Source code for spt3g.dfmux.DataQualityTools

from .. import core
from . import DfMuxMetaSample
import numpy as np

def get_empty_timepoint(sample_time):
    tp_frame = core.G3Frame(core.G3FrameType.Timepoint)
    tp_frame["EventHeader"] = sample_time
    tp_frame["DfMux"] = DfMuxMetaSample()
    return tp_frame

class AddMissingTimepoints(object):
    def __init__(self, expected_sample_rate = (20e6 * core.G3Units.Hz)/ 2**17):
        self.samp_spacing_ = 1.0 / expected_sample_rate
        self.prev_sample_time_ = None

    def __call__(self, frame):
        if frame.type != core.G3FrameType.Timepoint:
            return            
        if 'EventHeader' not in frame:
            return
        new_time = frame['EventHeader']
        if self.prev_sample_time_ == None:
            self.prev_sample_time_ = new_time
            return
        t_delt = float(new_time - self.prev_sample_time_)
        n_missing_samples = int(round(t_delt / samp_spacing_ )) - 1
        samp_times = [new_time - samp_spacing_ * i for i in range(1, 1+n_missing_samples)]
        new_packets = [get_empty_timepoint(st) for st in samp_times]
        new_packets.append(frame)
        return new_packets

[docs] @core.indexmod class FillMissingTimepointFrames(object): ''' Search for Timepoint frames that come at larger-than-expected intervals. Fill in the missin frames with garbage, and mark them as garbage for conversion to NaNs later. ''' def __init__(self): self.dt = None self.last_frame = None self.buffer = [] def __call__(self, fr): if fr.type != core.G3FrameType.Timepoint: return if self.dt is None: # Calculate the sample rate from the first 20 frames self.buffer.append(fr) if len(self.buffer) < 20: return [] else: times = [_fr['EventHeader'].time for _fr in self.buffer] intervals = np.diff(times) self.dt = min(intervals[intervals > 0]) # Now process the frames in the buffer out = sum([self._find_and_fill_frames(_fr) for _fr in self.buffer], []) self.buffer = [] return out return self._find_and_fill_frames(fr) def _find_and_fill_frames(self, fr): # This function does most of the work: # Find frames with separation that is too large, and fill the gaps # with new frames. if self.last_frame is None: self.last_frame = fr return [fr] this_dt = fr['EventHeader'].time - self.last_frame['EventHeader'].time if this_dt > 1.5 * self.dt: # We're missing at least one frame nframes = int(np.round(this_dt / self.dt) - 1) injected_dt = this_dt / (nframes + 1) injected_frs = [core.G3Frame(core.G3FrameType.Timepoint) for i in range(nframes)] for i, new_fr in enumerate(injected_frs): new_fr['EventHeader'] = self.last_frame['EventHeader'] + \ injected_dt * (i + 1) new_fr['DfMux'] = fr['DfMux'].copy() try: new_fr['CalibratorOn'] = fr['CalibratorOn'] except KeyError: pass new_fr['GarbageData'] = True self.last_frame = fr return injected_frs + [fr] self.last_frame = fr return [fr]