Source code for spt3g.gcp.ARCHKExtractor

import numpy
from .. import core, dfmux

[docs] @core.indexmod class UnpackSPTpolHKData(object): ''' Extracts SPTpol housekeeping information from ARC files ''' def __init__(self): self.hk_seen = False def __call__(self, f): if f.type != core.G3FrameType.GcpSlow: return hkregs = f['array']['muxHousekeeping'] # HK registers are just duplicated when unchanged. Deduplicate them here if self.hk_seen and not numpy.asarray(hkregs['hkIsValid']).any(): return self.hk_seen = True hkout = dfmux.DfMuxHousekeepingMap() for i, serial in enumerate(hkregs['moboId']): board = dfmux.HkBoardInfo() board.timestamp = hkregs['utc'] # TimeStamp is off by exactly 30 days, so ignore it board.timestamp_port = 'UNKNOWN' board.serial = str(serial) board.fir_stage = hkregs['firStage'][i] board.voltages['Ext5V5N'] = hkregs['mbExt5V5N'][i] board.voltages['Ext5V5P'] = hkregs['mbExt5V5P'][i] board.voltages['Int5V5N'] = hkregs['mbInt5V5N'][i] board.voltages['Int5V5P'] = hkregs['mbInt5V5P'][i] board.voltages['Squid8V5N'] = hkregs['mbSquid8V5N'][i] board.voltages['Squid8V5P'] = hkregs['mbSquid8V5P'][i] board.voltages['VAuxN'] = hkregs['mbVAuxN'][i] board.voltages['VAuxP'] = hkregs['mbVAuxP'][i] # XXX: mbSensorX? for j, mezz in enumerate(['A', 'B']): mezzhk = dfmux.HkMezzanineInfo() mezzhk.power = True mezzhk.present = True mezzhk.serial = 'N/A' mezzhk.part_number = 'SPTpol Mezz' mezzhk.revision = '' for v in ['V3P', 'V5N', '_VDemodA', '_VDemodB']: mezzhk.voltages[v.strip('_')] = hkregs['mezz' + mezz + v][i] for k, mod in enumerate(['A', 'B']): modhk = dfmux.HkModuleInfo() modhk.module_number = k modhk.carrier_gain = hkregs['carrierGain'][i][j*2 + k] modhk.nuller_gain = hkregs['nullerGain'][i][j*2 + k] modhk.demod_gain = hkregs['demodGain'][i][j*2 + k] if 'mezz' + mezz + 'Car' + mod + 'Overload' in hkregs: modhk.carrier_railed = hkregs['mezz' + mezz + 'Car' + mod + 'Overload'][i] modhk.nuller_railed = hkregs['mezz' + mezz + 'Nul' + mod + 'Overload'][i] modhk.demod_railed = hkregs['mezz' + mezz + 'Adc' + mod + 'Overload'][i] nchan = len(hkregs['carrierSettings'][0])//(3*2*2) # 3 fields per mezz/module for chan in range(nchan): chanhk = dfmux.HkChannelInfo() chanhk.channel_number = chan+1 arcchan = (j*2 + k)*nchan + chan if 'danGain' in hkregs: chanhk.dan_gain = hkregs['danGain'][i][arcchan] chanhk.dan_feedback_enable = hkregs['danFeedbackEnable'][i][arcchan] chanhk.dan_streaming_enable = hkregs['danStreamingEnable'][i][arcchan] chanhk.dan_accumulator_enable = hkregs['danAccumulatorRailed'][i][arcchan] chanhk.dan_railed = hkregs['danAccumulatorRailed'][i][arcchan] # Decrypt xSettings registers: (freq, phase, amp) chanhk.carrier_frequency = hkregs['carrierSettings'][i][arcchan*3 + 0]*25.e6/(2.**32-1)*core.G3Units.Hz chanhk.carrier_amplitude = hkregs['carrierSettings'][i][arcchan*3 + 2]/(2.**23 - 1.) chanhk.nuller_amplitude = hkregs['nullerSettings'][i][arcchan*3 + 2]/(2.**23 - 1.) chanhk.demod_frequency = hkregs['demodSettings'][i][arcchan*2 + 0]*25.e6/(2.**32-1)*core.G3Units.Hz modhk.channels[chan+1] = chanhk mezzhk.modules[k+1] = modhk board.mezz[j+1] = mezzhk hkout[serial] = board hkframe = core.G3Frame(core.G3FrameType.Housekeeping) hkframe['DfMuxHousekeeping'] = hkout return [f, hkframe]