from . import indexmod, pipesegment, G3FrameType, log_fatal, G3Reader, G3NetworkSender
[docs]
@indexmod
def Delete(frame, keys=[], type=None):
'''
Deletes specified keys from frame. If optional type specified, only acts on frames of the given type.
'''
if type is not None and frame.type != type:
return
for key in keys:
if key in frame:
del frame[key]
[docs]
@indexmod
def Rename(frame, keys={}, type=None):
'''
Renames specified keys in frame. If optional type specified, only acts on frames of the given type. Argument is a dictionary mapping old names to new ones.
'''
if type is not None and frame.type != type:
return
for key in keys:
if key in frame:
frame[keys[key]] = frame[key]
del frame[key]
[docs]
@indexmod
def Dump(frame, type=None, added_message = None):
'''
Prints frames to console. If optional type specified, only acts on frames of the given type.
'''
if type is not None and frame.type != type:
return
if frame.type == G3FrameType.EndProcessing:
return
if added_message:
print(added_message)
print(frame)
[docs]
@indexmod
class InjectFrame(object):
"""
Inject an arbitrary frame into a pipeline.
Arguments
---------
frame : G3Frame
The frame to inject
"""
def __init__(self, frame):
self.frame = frame
def __call__(self, frame):
if self.frame is None:
return
out = [self.frame, frame]
self.frame = None
return out
[docs]
@indexmod
def InjectDebug(frame, type=None, debug_start_func = None):
'''Starts a pdb session when a frame of type shows up.
The frame data is stored in the variable named "frame".
If ``debug_start_func`` is not None, only starts a debug session when
``debug_start_func(frame) == True``.
'''
if type is None or frame.type == type:
if ((debug_start_func is None) or debug_start_func(frame)):
import pdb, rlcompleter
pdb.Pdb.complete = rlcompleter.Completer(locals()).complete
pdb.set_trace()
[docs]
@indexmod
class AbortAfterNFrames(object):
'''Stops processing after n_frames frames go by'''
def __init__(self, type, n_frames):
self.n_desired_frames = n_frames
self.num_frames = 0
self.type = type
def __call__(self, frame):
if self.num_frames >= self.n_desired_frames:
log_fatal("Manual Abort Triggered")
if frame.type == self.type:
self.num_frames += 1
return
@pipesegment
def G3NetworkReceiver(pipe, hostname='localhost', port=5978):
'''
Emulation of old G3NetworkReceiver class. Equivalent to pointing
G3Reader at a TCP URL.
'''
pipe.Add(G3Reader, filename='tcp://' + hostname + ':' + str(port))
[docs]
@indexmod
class G3ThrottledNetworkSender(object):
'''
Send every Nth frame of certain types using a wrapped G3NetworkSender.
All instances of frames not in the dictionary frame_decimation will be sent
at their full rate.
'''
def __init__(self, hostname='*', port=5978, frame_decimation = {G3FrameType.Timepoint: 10}, max_queue_size=0):
self.sender = G3NetworkSender(hostname=hostname, port=port, max_queue_size=max_queue_size)
self.decimation = frame_decimation
self.counts = {}
for k in self.decimation.keys():
self.counts[k] = 0
def __call__(self, frame):
if frame.type in self.counts:
if self.decimation[frame.type] == 0:
return
self.counts[frame.type] += 1
if self.counts[frame.type] % self.decimation[frame.type] != 0:
return
self.sender(frame)
del indexmod
del pipesegment