Source code for spt3g.dfmux.HardwareMapTools

from .. import core
from . import DfMuxWiringMap, DfMuxChannelMapping
import struct, socket

'''
Code to generate wiring map objects (and wiring frames in general) from
control system hardware maps.
'''

@core.indexmod
class GenerateFakeHardwareMap(object):
    '''
    Injects a caller-supplied (fake) hardware map into the data stream.

    The wiring frame handed to the constructor is emitted exactly once,
    immediately ahead of the first frame that reaches this module. Every
    later frame is passed through untouched.
    '''

    def __init__(self, frame):
        # Wiring frame to emit ahead of the first frame seen
        self.insert = frame
        # Becomes True once the wiring frame has been emitted
        self.ran = False

    def __call__(self, frame):
        if not self.ran:
            # First call: put the stored frame in front of the incoming one
            self.ran = True
            return [self.insert, frame]
        return frame
# Readout layout constants. NUM_MODS_PER_MEZZ is the stride used in this file
# to flatten a (mezzanine, module) pair into a single module index.
NUM_MEZZANINES = 2
NUM_MODS_PER_MEZZ = 4
# Product of the two counts above
NUM_MODULES = NUM_MODS_PER_MEZZ * NUM_MEZZANINES
@core.indexmod
class PyDfMuxWiringMapInjector(object):
    '''
    Insert a wiring map derived from a pydfmux hardware map into the data
    stream ahead of what would otherwise be the first frame. Optionally
    filter for detectors described by the mask in <pathstring> (see
    pydfmux documentation for hwm.channel_maps_from_pstring()) and
    detectors in one of the states identified by the state argument.

    The wiring frame is built once, in the constructor, and released after
    it has been emitted.
    '''

    def __init__(self, pydfmux_hwm, pathstring=None, state=()):
        '''
        pydfmux_hwm: pydfmux hardware map to read channel mappings from.
        pathstring: optional pydfmux pathstring mask; when set, only the
            channel mappings it selects are included.
        state: optional collection of bolometer states; when non-empty,
            bolometer states are refreshed and committed to the hardware
            map, and only bolometers in one of these states are included.
        '''
        self.ran = False

        # Imported lazily so the module can be loaded without pydfmux
        from pydfmux.core import dfmux as pydfmux

        self.hwmf = core.G3Frame(core.G3FrameType.Wiring)
        hwm = DfMuxWiringMap()

        if pathstring:
            chan_map_query = pydfmux_hwm.channel_maps_from_pstring(pathstring)
        else:
            chan_map_query = pydfmux_hwm.query(pydfmux.ChannelMapping)

        if len(state) > 0:
            # Refresh the stored state of every bolometer that has a readout
            # channel before filtering on it
            for bolo in pydfmux_hwm.query(pydfmux.Bolometer):
                if bolo.readout_channel:
                    bolo.state = bolo.retrieve_bolo_state().state
            pydfmux_hwm.commit()
            # in_() is the SQLAlchemy membership operator ("_in" does not
            # exist and raised AttributeError whenever a state was given)
            chan_map_query = chan_map_query.join(
                pydfmux.ChannelMapping, pydfmux.Bolometer
            ).filter(pydfmux.Bolometer.state.in_(state))

        for bolo in chan_map_query:
            mapping = DfMuxChannelMapping()
            # Resolve the board's hostname and store the address as the
            # integer obtained from the packed 4-byte form
            hostname = 'iceboard' + str(bolo.iceboard.serial) + '.local'
            mapping.board_ip = struct.unpack(
                "i", socket.inet_aton(socket.gethostbyname(hostname)))[0]
            mapping.board_serial = int(bolo.iceboard.serial)
            # Boards without a slot get -1 for both slot and crate serial
            mapping.board_slot = bolo.iceboard.slot if bolo.iceboard.slot else -1
            mapping.crate_serial = int(bolo.iceboard.crate.serial) if bolo.iceboard.slot else -1
            mezz = bolo.readout_channel.mezzanine.mezzanine
            mod = bolo.readout_channel.module.module
            # Flatten (mezzanine, module) to a single 0-indexed module number
            mapping.module = (mezz - 1) * NUM_MODS_PER_MEZZ + (mod - 1)
            # pydfmux HWMs use 1-indexing of channels, while FPGA uses 0-indexing
            mapping.channel = bolo.readout_channel.channel - 1
            hwm[str(bolo.bolometer.name)] = mapping

        self.hwmf['WiringMap'] = hwm
        self.hwmf['ReadoutSystem'] = 'ICE'
        try:
            from pydfmux import current_transferfunction
            self.hwmf['DfMuxTransferFunction'] = current_transferfunction
        except ImportError:
            # The transfer function entry is simply omitted when pydfmux
            # does not provide current_transferfunction
            pass

    def __call__(self, frame):
        if self.ran:
            return frame
        self.ran = True
        out = [self.hwmf, frame]
        # Drop our reference to the wiring frame once it has been emitted
        del self.hwmf
        return out
# Compatibility PyDfMuxHardwareMapInjector = PyDfMuxWiringMapInjector
@core.usefulfunc
def PathStringForBolo(wiringmap, bolo):
    '''
    Obtain the channel pathstring for a bolometer named "bolo"
    using the passed wiring map.

    wiringmap: mapping from bolometer name to a wiring entry with
        crate_serial, board_slot, board_serial, module and channel fields.
    bolo: name of the bolometer to look up in wiringmap.

    Returns a '/'-separated string. It begins with "<crate>/<slot>" when
    the entry has a non-negative crate serial, otherwise with the
    zero-padded board serial; this is followed by the 1-indexed mezzanine,
    module and channel numbers.

    Raises whatever wiringmap raises for an unknown bolometer name.
    '''
    wiringentry = wiringmap[bolo]

    if wiringentry.crate_serial >= 0:
        path = '/'.join([
            '%03d' % wiringentry.crate_serial,
            '%d' % wiringentry.board_slot,
        ])
    else:
        path = '%04d' % wiringentry.board_serial

    # The wiring map stores module as (mezz - 1) * NUM_MODS_PER_MEZZ + (mod - 1),
    # so it has to be split with the same stride (not the total module count)
    # to recover the mezzanine and the module within that mezzanine.
    mezz_index, mod_index = divmod(wiringentry.module, NUM_MODS_PER_MEZZ)

    path = '/'.join([
        path,
        '%d' % (mezz_index + 1),
        '%d' % (mod_index + 1),
        '%d' % (wiringentry.channel + 1),
    ])
    return path
@core.indexmod
def PyDfMuxBolometerPropertiesInjector(frame, pydfmux_hwm=None,
                                       angle_per_mm = 4.186*core.G3Units.deg/1000):
    '''
    Insert a calibration frame following any wiring frame containing a
    BolometerPropertiesMap named "NominalBolometerProperties" that
    has the properties of each bolometer as defined by the given
    pydfmux hardware map.

    frame: incoming frame; anything other than a Wiring frame is ignored
        (the function returns None for it).
    pydfmux_hwm: pydfmux hardware map queried for Bolometer entries.
    angle_per_mm: factor applied to the x_mm / y_mm positions to obtain
        the x / y offsets.

    Returns [frame, calibration_frame] for Wiring frames.
    Raises ValueError for a bolometer with an unrecognized coupling type.
    '''
    if frame.type != core.G3FrameType.Wiring:
        return

    # Imported lazily so the module can be loaded without pydfmux
    from .. import calibration
    from pydfmux.core import dfmux as pydfmux

    # pydfmux coupling names -> calibration coupling types
    coupling_types = {
        "optical": calibration.BolometerCouplingType.Optical,
        "dark_tres": calibration.BolometerCouplingType.DarkTermination,
        "dark_xover": calibration.BolometerCouplingType.DarkCrossover,
        "resistor": calibration.BolometerCouplingType.Resistor,
    }

    cal = core.G3Frame(core.G3FrameType.Calibration)
    bpm = calibration.BolometerPropertiesMap()

    for bolo in pydfmux_hwm.query(pydfmux.Bolometer):
        bp = calibration.BolometerProperties()
        wafer = bolo.wafer

        # The physical name is prefixed with the wafer name, so it can only
        # be formed for bolometers that have a wafer
        if hasattr(bolo, 'physical_name') and wafer is not None:
            bp.physical_name = str(wafer.name) + '_' + str(bolo.physical_name)

        # Each remaining property is optional: it is copied only when the
        # attribute exists on the bolometer and is not None
        x_mm = getattr(bolo, 'x_mm', None)
        if x_mm is not None:
            bp.x_offset = x_mm * angle_per_mm
        y_mm = getattr(bolo, 'y_mm', None)
        if y_mm is not None:
            bp.y_offset = y_mm * angle_per_mm

        if wafer is not None:
            bp.wafer_id = str(wafer.name)

        pixel = getattr(bolo, 'pixel', None)
        if pixel is not None:
            bp.pixel_id = str(pixel)
        pixel_type = getattr(bolo, 'pixel_type', None)
        if pixel_type is not None:
            bp.pixel_type = str(pixel_type)

        observing_band = getattr(bolo, 'observing_band', None)
        if observing_band is not None:
            bp.band = float(observing_band)*core.G3Units.GHz

        polarization_angle = getattr(bolo, 'polarization_angle', None)
        if polarization_angle is not None:
            bp.pol_angle = float(polarization_angle)*core.G3Units.deg
            bp.pol_efficiency = 1.0

        coupling = getattr(bolo, 'coupling', None)
        if coupling is not None:
            if coupling not in coupling_types:
                raise ValueError("Unrecognized coupling type %s" % coupling)
            bp.coupling = coupling_types[coupling]

        bpm[str(bolo.name)] = bp

    cal['NominalBolometerProperties'] = bpm

    # Treated as optional, matching PyDfMuxWiringMapInjector: the entry is
    # omitted when pydfmux does not provide current_transferfunction
    try:
        from pydfmux import current_transferfunction
        cal['DfMuxTransferFunction'] = current_transferfunction
    except ImportError:
        pass

    return [frame, cal]
@core.indexmod
class PyDfMuxWiringMapInjectorAllChannels(object):
    '''
    Insert a wiring map derived from a pydfmux hardware map, but
    ignoring the channel mappings and adding all the channels on each
    defined module from the HWM.

    The wiring frame is built and emitted ahead of the first frame that
    reaches this module; later frames pass through unchanged.
    '''

    def __init__(self, pydfmux_hwm, channels=64):
        '''
        Use readout modules from provided pydfmux_hwm and add channels to
        it up to the index specified in channels.

        pydfmux_hwm: pydfmux hardware map queried for ReadoutModule entries.
        channels: number of channels added per module (0 .. channels - 1).
        '''
        self.ran = False
        self.hwm = pydfmux_hwm
        self.channels = channels

    def __call__(self, frame):
        if self.ran:
            return frame

        # Imported lazily so the module can be loaded without pydfmux
        from pydfmux.core import dfmux as pydfmux

        hwmf = core.G3Frame(core.G3FrameType.Wiring)
        hwm = DfMuxWiringMap()

        for mod in self.hwm.query(pydfmux.ReadoutModule):
            # Everything except the channel number is the same for all
            # channels of a module, so resolve it (including the hostname
            # lookup) once per module rather than once per channel.
            iceboard = mod.iceboard
            hostname = 'iceboard' + str(iceboard.serial) + '.local'
            board_ip = struct.unpack(
                "i", socket.inet_aton(socket.gethostbyname(hostname)))[0]
            board_serial = int(iceboard.serial)
            # Boards without a slot get -1 for both slot and crate serial
            board_slot = iceboard.slot if iceboard.slot else -1
            crate_serial = int(iceboard.crate.serial) if iceboard.slot else -1
            mezz = mod.mezzanine.mezzanine
            # pydfmux HWMs use 1-indexing of modules, while FPGA uses 0-indexing
            module_index = (mezz - 1) * NUM_MODS_PER_MEZZ + mod.module - 1

            for channel in range(self.channels):
                mapping = DfMuxChannelMapping()
                mapping.board_ip = board_ip
                mapping.board_serial = board_serial
                mapping.board_slot = board_slot
                mapping.crate_serial = crate_serial
                mapping.module = module_index
                mapping.channel = channel
                # No bolometer names here: key each entry by the string
                # form of its own mapping
                hwm[str(mapping)] = mapping

        hwmf['WiringMap'] = hwm
        hwmf['ReadoutSystem'] = 'ICE'
        self.ran = True
        return [hwmf, frame]

# Compatibility
PyDfMuxHardwareMapInjectorAllChannels = PyDfMuxWiringMapInjectorAllChannels
@core.indexmod
class DfmlHardwareMapInjector(object):
    '''
    Insert a wiring map built from a dfml hardware map into the data
    stream ahead of what would otherwise be the first frame.

    The wiring map is assembled in the constructor. The hardware map is
    read by calling it as dfml_hwm(channel, field_name) for every entry of
    dfml_hwm.channels.
    '''

    def __init__(self, dfml_hwm):
        self.ran = False
        self.dfml_hwm = dfml_hwm
        self.hwm = DfMuxWiringMap()

        for chan in self.dfml_hwm.channels:
            # Entries are keyed by the channel_id field
            key = str(self.dfml_hwm(chan, 'channel_id'))

            # Check bolo_id, which is set iff this is a channel we care about
            if self.dfml_hwm(chan, 'bolo_id') == '':
                continue

            entry = DfMuxChannelMapping()
            packed_ip = socket.inet_aton(self.dfml_hwm(chan, 'dfmux_ip'))
            entry.board_ip = struct.unpack("i", packed_ip)[0]
            entry.board_serial = int(self.dfml_hwm(chan, 'dfmux_id'))
            # board_slot keeps its default when no crate slot is recorded
            if self.dfml_hwm(chan, 'crate_slot') is not None:
                entry.board_slot = int(self.dfml_hwm(chan, 'crate_slot'))
            entry.crate_serial = -1
            # Stored module and channel numbers are one less than the
            # values read from the hardware map
            entry.module = self.dfml_hwm(chan, 'module') - 1
            entry.channel = self.dfml_hwm(chan, 'chan_num') - 1

            self.hwm[key] = entry

    def __call__(self, frame):
        if not self.ran:
            wiring = core.G3Frame(core.G3FrameType.Wiring)
            wiring['WiringMap'] = self.hwm
            wiring['ReadoutSystem'] = 'DfMux'
            self.ran = True
            return [wiring, frame]
        return frame