Source code for spt3g.dfmux.unittransforms

import numpy
from .. import core
from .Housekeeping import HousekeepingForBolo

# Transfer functions for 3G and SPTpol boards
from .IceboardConversions import convert_TF, convert_adc_samples
from .wtl_ConvertUtils import convert_squid, convert_demod, convert_mb

def counts_to_rms_amps(wiringmap, hkmap, bolo, system, tf=None):
    boardhk, mezzhk, modhk, chanhk = HousekeepingForBolo(hkmap, wiringmap, bolo, True)

    if system == 'ICE':
        if chanhk.dan_streaming_enable:
            tf_I = convert_TF(modhk.nuller_gain, target='nuller', custom_TF=tf, unit='RAW', frequency=chanhk.carrier_frequency/core.G3Units.Hz)
        else:
            tf_V = convert_adc_samples('Streamer')
            tf_I = tf_V/modhk.squid_transimpedance
        if chanhk.carrier_frequency != 0:
            tf_I /= numpy.sqrt(2) # Use RMS amps
    elif system == 'DfMux':
        if chanhk.dan_streaming_enable:
            squid_converter = convert_squid(units='Normalized')
            tf_I = 1. / (2.**24) * squid_converter.theo_AnToIbolo(1., Gnuller=modhk.nuller_gain, eff=1.)
        else:
            dfmux_converter = convert_demod('gcp')
            resistor='10K' # Assume this for DAN Off
            # Handle the three exceptions for SPTpol
            if bolo in ['Sq3SBpol23Ch7', 'Sq3SBpol23Ch3', 'Sq3SBpol23Ch5']:
                resistor='5K'
            # XXX above list is wrong for 100d data
            if boardhk.timestamp.mjd < 56293:
                core.log_fatal('Set correct resistor values for 2012 SPTpol data', unit='dfmux.unittransforms')
            tf_I = dfmux_converter.theo_AdcToIbolo(1., modhk.demod_gain, resistor, f_carrier_equals_f_demod=True)
            tf_I *= 256 # 3G software shifts 8 bits right compared to SPTpol, so account for difference in meaning of "counts"
    else:
        core.log_fatal('Unknown transfer function profile %s' % system, unit='dfmux.unittransforms')

    return tf_I * core.G3Units.amp

def bolo_bias_voltage_rms(wiringmap, hkmap, bolo, system, tf=None):
    boardhk, mezzhk, modhk, chanhk = HousekeepingForBolo(hkmap, wiringmap, bolo, True)

    if system == 'ICE':
        tf_V = convert_TF(modhk.carrier_gain, target='carrier', custom_TF=tf, unit='NORMALIZED', frequency=chanhk.carrier_frequency/core.G3Units.Hz)
        volts = tf_V * chanhk.carrier_amplitude * core.G3Units.V
        if chanhk.carrier_frequency != 0:
            volts /= numpy.sqrt(2) # Use RMS amps
    elif system == 'DfMux':
        squid_converter = convert_squid(units='Normalized')
        volts = squid_converter.theo_AcToVbias(chanhk.carrier_amplitude, modhk.carrier_gain) * core.G3Units.V
    else:
        core.log_fatal('Unknown transfer function profile %s' % system, unit='dfmux.unittransforms')

    return volts

[docs] @core.usefulfunc def get_timestream_unit_conversion(from_units, to_units, bolo, wiringmap=None, hkmap=None, system=None, tf=None): ''' Return the scalar conversion factor to move timestream data from a given system of units (Power, Resistance, Current, Counts) to another one. Requires a wiring map and recent housekeeping data. Returned quantities are RMS for currents and time-averaged for power. Note that this does not handle conversions to on-sky quantities (e.g. K_cmb) ''' # Rather than special-casing the full mesh of conversions, # first calculate a conversion to watts from whatever units # we have, then a conversion from watts if wiringmap is None: core.log_fatal('Try to convert data with no wiring map', unit='dfmux.unittransforms') if hkmap is None: core.log_fatal('Try to convert data with no housekeeping', unit='dfmux.unittransforms') if from_units == to_units: return 1. if system not in ["ICE", "DfMux"]: core.log_fatal(f"Cannot convert data for {system} readout system", unit="dfmux.unittransforms") I = counts_to_rms_amps(wiringmap, hkmap, bolo, system, tf) V = bolo_bias_voltage_rms(wiringmap, hkmap, bolo, system, tf) if from_units == core.G3TimestreamUnits.Resistance: if to_units == core.G3TimestreamUnits.Counts: return V / I if to_units == core.G3TimestreamUnits.Current: return V if to_units == core.G3TimestreamUnits.Power: return V * V if to_units == core.G3TimestreamUnits.Resistance: if from_units == core.G3TimestreamUnits.Counts: return V / I if from_units == core.G3TimestreamUnits.Current: return V if from_units == core.G3TimestreamUnits.Power: return V * V # First, convert to watts if from_units == core.G3TimestreamUnits.Counts: to_watts = I * V elif from_units == core.G3TimestreamUnits.Current: to_watts = V elif from_units == core.G3TimestreamUnits.Power: to_watts = 1. else: core.log_fatal('Unknown units scheme %s in timestream for bolo %s' % (repr(from_units), bolo), unit='dfmux.unittransforms') # Now the conversion from watts if to_units == core.G3TimestreamUnits.Counts: from_watts = I * V if from_watts != 0: from_watts = 1./from_watts elif to_units == core.G3TimestreamUnits.Current: from_watts = V if from_watts != 0: from_watts = 1./from_watts elif to_units == core.G3TimestreamUnits.Power: from_watts = 1. else: core.log_fatal('Trying to convert timestreams to unknown units %s' % repr(to_units), unit='dfmux.unittransforms') # Multiply them together return to_watts * from_watts
[docs] @core.indexmod class ConvertTimestreamUnits(object): ''' Changes timestream units from one set of units (e.g. ADC counts) to another (e.g. Power). Currents and power are time averaged quantities (i.e. currents give RMS values). Note that this does not handle conversions to on-sky quantities (e.g. K_cmb) ''' def __init__(self, Input='RawTimestreams', Output='CalTimestreams', Units=core.G3TimestreamUnits.Power, SkipUncalibratable=False, KeepConversionsForObservation=True): ''' Copy data in the timestream map Input to the timestream map Output, converting the units from whatever they were to those specified by Units. If SkipUncalibratable is true, copy timestreams for which the unit conversions could not be evaluated into the output timestream in their original units. If false, throws an exception if this occurs. If KeepConversionsForObservation is True (default), conversion factors will only be evaluated once per observation. Note that this will cause the wrong conversions to be applied if any parameters of the mux system are modified during the observation but will substantially increase performance. ''' self.units = Units self.input = Input self.output = Output self.wiringmap = None self.system = None self.default_tf = None self.hkmap = None self.convfactors = {} self.skiperrors = SkipUncalibratable self.keepconversions = KeepConversionsForObservation def __call__(self, frame): if 'DfMuxTransferFunction' in frame: self.default_tf = frame['DfMuxTransferFunction'] if frame.type == core.G3FrameType.Wiring: self.wiringmap = frame['WiringMap'] self.system = frame['ReadoutSystem'] self.convfactors = {} return if frame.type == core.G3FrameType.Housekeeping: self.hkmap = frame['DfMuxHousekeeping'] self.convfactors = {} return if self.keepconversions and frame.type == core.G3FrameType.Observation: self.convfactors = {} self.hkmap = None return if frame.type != core.G3FrameType.Scan: return # Housekeeping data can also be in Scan frames if 'DfMuxHousekeeping' in frame: if self.hkmap is None or (list(frame['DfMuxHousekeeping'].values())[0].timestamp != list(self.hkmap.values())[0].timestamp and not self.keepconversions): # XXX: finer-grained check for same values? self.hkmap = frame['DfMuxHousekeeping'] self.convfactors = {} # Now the meat newts = core.G3TimestreamMap() oldts = frame[self.input] for bolo,ts in oldts.items(): if ts.units not in self.convfactors: self.convfactors[ts.units] = {} if ts.units == self.units: newts[bolo] = ts continue elif bolo in self.convfactors[ts.units]: convfactor = self.convfactors[ts.units][bolo] else: tf = self.default_tf # XXX: might get from HK data try: convfactor = get_timestream_unit_conversion(ts.units, self.units, bolo, wiringmap=self.wiringmap, hkmap=self.hkmap, system=self.system, tf=tf) except KeyError: if not self.skiperrors: raise newts[bolo] = ts continue self.convfactors[ts.units][bolo] = convfactor # And cache it # Convert timestream and store results convts = core.G3Timestream(ts) if core.G3TimestreamUnits.Resistance in [self.units, ts.units] and self.units != ts.units: convts = 1. / convts convts.units = self.units convts *= convfactor if convts.units != core.G3TimestreamUnits.Counts: convts.SetFLACCompression(False) newts[bolo] = convts frame[self.output] = newts