Source code for spt3g.calibration.build_cal_frames

from .. import core
from . import BolometerProperties, BolometerPropertiesMap
from . import PointingProperties, PointingPropertiesMap
import numpy, os, re

'''
Utilities for bringing together calibration data from a number of sources
to produce unified calibration data to go in folders with the data for
analysis processing.
'''

[docs] @core.indexmod class BuildBoloPropertiesMap(object): ''' Build bolometer properties map from raw calibration sub-frames emitted by other processing scripts. Input data are maps of bolo ID to floats or, in the case of names, strings. If multiple instance of each data class appear, the final bolometer properties map will be the median of the input values. Expects to be passed frames from: - RCW38 Relative Pointing Offset Calibration (Keys: 'PointingOffsetX', 'PointingOffsetY') - CenA Angle Calibration (Keys: 'PolarizationAngle', 'PolarizationEfficiency') - Band Calibration (Key: 'BoloBands') - Physical Name Data (Key: 'PhysicalBoloIDs') ''' def __init__(self, drop_original_frames=True, fiducial_detectors=[], bpm_name='NominalBolometerProperties', use_bpm_pointing=False): ''' If drop_original_frames is True, will drop all input Calibration frames. If fiducial_detectors is set, will use the average of the position[s] of whatever detector[s] are specified to center each set of relative offsets encountered (NB: this recentering is done in a Cartesian way). If it is *not* specified, five detectors near the middle of the focal plane present in every observation given will be chosen automatically and printed to the console. bpm_name is the name of the key containing the input BolometerPropertiesMap that will be merged into the output map. If use_bpm_pointing is True, then the pointing information is extracted from the input BolometerPropertiesMap. If False, pointing information must be supplied in an additional input frame. ''' self.drop_original_frames = drop_original_frames self.fiducial_detectors = fiducial_detectors self.bpm_name = bpm_name self.use_bpm_pointing = use_bpm_pointing self.props = {} self.dfmux_tf = None def __call__(self, frame): if frame.type == core.G3FrameType.EndProcessing: boloprops = BolometerPropertiesMap() # Technique to average together points while ignoring outliers def robust_avg(data): from scipy.stats import sigmaclip gooddata = numpy.asarray(data)[numpy.isfinite(data)] if len(gooddata) == 1: return gooddata[0] return numpy.median(sigmaclip(gooddata, low=2.0, high=2.0)[0]) if len(self.fiducial_detectors) == 0: always_dets = set([bolo for bolo in self.props if numpy.isfinite(self.props[bolo].get('xoffsets', [numpy.nan])).all()]) self.fiducial_detectors = sorted(always_dets, key=lambda bolo: robust_avg(self.props[bolo]['xoffsets'])**2 + robust_avg(self.props[bolo]['yoffsets'])**2)[:5] print('Using %s as the set of fiducial detectors for pointing' % self.fiducial_detectors) for bolo in self.props.keys(): p = BolometerProperties() if 'xoffsets' in self.props[bolo]: # Ideally we would have error bars on these measurements... p.x_offset = robust_avg([self.props[bolo]['xoffsets'][i] - numpy.mean([self.props[fd]['xoffsets'][i] for fd in self.fiducial_detectors]) for i in range(len(self.props[bolo]['xoffsets']))]) p.y_offset = robust_avg([self.props[bolo]['yoffsets'][i] - numpy.mean([self.props[fd]['yoffsets'][i] for fd in self.fiducial_detectors]) for i in range(len(self.props[bolo]['xoffsets']))]) if 'physname' in self.props[bolo]: p.physical_name = self.props[bolo]['physname'] if 'band' in self.props[bolo]: p.band = self.props[bolo]['band'] if 'polangle' in self.props[bolo]: # Ideally we would have error bars on these measurements. # Strategy: to avoid wraparound issues, form a 2-D vector # and then take the median X/Y position # XXX: Do anything about vectors being headless? angles = numpy.asarray(self.props[bolo]['polangle']) efficiencies = numpy.asarray(self.props[bolo]['poleff']) poly = robust_avg(numpy.sin(angles/core.G3Units.rad)*efficiencies) polx = robust_avg(numpy.cos(angles/core.G3Units.rad)*efficiencies) p.pol_angle = numpy.arctan2(poly, polx) p.pol_efficiency = numpy.hypot(poly, polx) if 'coupling' in self.props[bolo]: p.coupling = self.props[bolo]['coupling'] if 'pixel_id' in self.props[bolo]: p.pixel_id = self.props[bolo]['pixel_id'] if 'wafer_id' in self.props[bolo]: p.wafer_id = self.props[bolo]['wafer_id'] if 'pixel_type' in self.props[bolo]: p.pixel_type = self.props[bolo]['pixel_type'] boloprops[bolo] = p cframe = core.G3Frame(core.G3FrameType.Calibration) cframe['BolometerProperties'] = boloprops cframe['FiducialPointingDetectors'] = core.G3VectorString(self.fiducial_detectors) cframe['DfMuxTransferFunction'] = self.dfmux_tf return [cframe, frame] if self.bpm_name in frame: bpm = frame[self.bpm_name] if len(self.props) > 0: npointings = max([len(p.get('xoffsets', [])) for p in self.props.values()]) else: npointings = 0 for bolo in bpm.keys(): bp = bpm[bolo] if bolo not in self.props: self.props[bolo] = {} if self.use_bpm_pointing: if 'xoffsets' not in self.props[bolo]: self.props[bolo]['xoffsets'] = [numpy.nan]*npointings self.props[bolo]['yoffsets'] = [numpy.nan]*npointings self.props[bolo]['xoffsets'].append(bp.x_offset) self.props[bolo]['yoffsets'].append(bp.y_offset) self.props[bolo]['band'] = bp.band self.props[bolo]['coupling'] = bp.coupling self.props[bolo]['physname'] = bp.physical_name self.props[bolo]['pixel_id'] = bp.pixel_id self.props[bolo]['wafer_id'] = bp.wafer_id self.props[bolo]['pixel_type'] = bp.pixel_type if 'polangle' not in self.props[bolo]: self.props[bolo]['polangle'] = [] self.props[bolo]['poleff'] = [] self.props[bolo]['polangle'].append(bp.pol_angle) self.props[bolo]['poleff'].append(bp.pol_efficiency) # Book NaNs for any non-present detectors if self.use_bpm_pointing: for bolo,p in self.props.items(): if bolo in bpm: continue if 'xoffsets' in p: self.props[bolo]['xoffsets'].append(numpy.nan) self.props[bolo]['yoffsets'].append(numpy.nan) # Pointing calibration if 'PointingOffsetX' in frame: if len(self.props) > 0: npointings = max([len(p.get('xoffsets', [])) for p in self.props.values()]) else: npointings = 0 for bolo in frame['PointingOffsetX'].keys(): if bolo not in self.props: self.props[bolo] = {} if 'xoffsets' not in self.props[bolo]: self.props[bolo]['xoffsets'] = [numpy.nan]*npointings self.props[bolo]['yoffsets'] = [numpy.nan]*npointings self.props[bolo]['xoffsets'].append(frame['PointingOffsetX'][bolo]) self.props[bolo]['yoffsets'].append(frame['PointingOffsetY'][bolo]) for bolo,p in self.props.items(): if bolo in frame['PointingOffsetX']: continue if 'xoffsets' in p: self.props[bolo]['xoffsets'].append(numpy.nan) self.props[bolo]['yoffsets'].append(numpy.nan) # Band calibration if 'BoloBands' in frame: for bolo in frame['BoloBands'].keys(): if bolo not in self.props: self.props[bolo] = {} #assert('band' not in self.props[bolo] or self.props[bolo]['band'] == frame['BoloBands'][bolo]) self.props[bolo]['band'] = frame['BoloBands'][bolo] # Optical coupling if 'BoloCoupling' in frame: for bolo in frame['BoloCoupling'].keys(): if bolo not in self.props: self.props[bolo] = {} self.props[bolo]['coupling'] = frames['BoloCoupling'][bolo] # Names if 'PhysicalBoloIDs' in frame: for bolo in frame['PhysicalBoloIDs'].keys(): if bolo not in self.props: self.props[bolo] = {} #assert('physname' not in self.props[bolo] or self.props[bolo]['physname'] == frame['PhysicalBoloIDs'][bolo]) self.props[bolo]['physname'] = frame['PhysicalBoloIDs'][bolo] if 'PixelIDs' in frame: for bolo in frame['PixelIDs'].keys(): if bolo not in self.props: self.props[bolo] = {} #assert('pixel_id' not in self.props[bolo] or self.props[bolo]['pixel_id'] == frame['PixelIDs'][bolo]) self.props[bolo]['pixel_id'] = frame['PixelIDs'][bolo] if 'WaferIDs' in frame: for bolo in frame['WaferIDs'].keys(): if bolo not in self.props: self.props[bolo] = {} #assert('wafer_id' not in self.props[bolo] or self.props[bolo]['wafer_id'] == frame['WaferIDs'][bolo]) self.props[bolo]['wafer_id'] = frame['WaferIDs'][bolo] if 'PixelTypes' in frame: for bolo in frame['PixelTypes'].keys(): if bolo not in self.props: self.props[bolo] = {} # assert('pixel_type' not in self.props[bolo] or self.props[bolo]['pixel_type'] == frame['PixelTypes'][bolo]) self.props[bolo]['pixel_type'] = frame['PixelTypes'][bolo] # Polarization Angles if 'PolarizationAngle' in frame: for bolo in frame['PolarizationAngle'].keys(): if bolo not in self.props: self.props[bolo] = {} if 'polangle' not in self.props[bolo]: self.props[bolo]['polangle'] = [] self.props[bolo]['poleff'] = [] self.props[bolo]['polangle'].append(frame['PolarizationAngle'][bolo]) self.props[bolo]['poleff'].append(frame['PolarizationEfficiency'][bolo]) # DfMux Transfer Function if 'DfMuxTransferFunction' in frame: self.dfmux_tf = frame['DfMuxTransferFunction'] if frame.type == core.G3FrameType.Calibration and self.drop_original_frames: return []
[docs] @core.indexmod def ExplodeBolometerProperties(frame, bpmname='NominalBolometerProperties'): ''' Take a bolometer properties map (usually the nominal one) and convert it into its constituent keys as though they came from real calibration. This is the inverse of BuildBoloPropertiesMap and mostly is useful when combining hardware map information with real calibration. ''' if bpmname not in frame: return bpm = frame[bpmname] polangle = core.G3MapDouble() poleff = core.G3MapDouble() bands = core.G3MapDouble() names = core.G3MapString() pixels = core.G3MapString() wafers = core.G3MapString() pixtypes = core.G3MapString() xoff = core.G3MapDouble() yoff = core.G3MapDouble() coupling = core.G3MapInt() for bolo, p in bpm.items(): polangle[bolo] = p.pol_angle poleff[bolo] = p.pol_efficiency bands[bolo] = p.band names[bolo] = p.physical_name xoff[bolo] = p.x_offset yoff[bolo] = p.y_offset pixels[bolo] = p.pixel_id wafers[bolo] = p.wafer_id pixtypes[bolo] = p.pixel_type coupling[bolo] = p.coupling frame['PolarizationAngle'] = polangle frame['PolarizationEfficiency'] = poleff frame['PhysicalBoloIDs'] = names frame['BoloBands'] = bands frame['PointingOffsetX'] = xoff frame['PointingOffsetY'] = yoff frame['PixelIDs'] = pixels frame['WaferIDs'] = wafers frame['PixelTypes'] = pixtypes frame['BoloCoupling'] = coupling
[docs] @core.indexmod class BuildPointingProperties(object): ''' Build pointing properties from raw calibration sub-frames emitted by other processing scripts. Input data are floats, including tilt information, and eventually other pointing model values. Expects to be passed frames from: - Az tilt fit parameters (Keys: 'tiltAngle', 'tiltHA', 'tiltLat', 'tiltMag') ''' def __init__(self, drop_original_frames=True): ''' If drop_original_frames is True, will drop all input Calibration frames. ''' self.drop_original_frames = drop_original_frames self.props = {} def __call__(self, frame): if frame.type == core.G3FrameType.EndProcessing: pointingprops = PointingPropertiesMap() p = PointingProperties() if 'tiltLat' in self.props: p['tiltLat'] = self.props['tiltLat'] if 'tiltHA' in self.props: p['tiltHA'] = self.props['tiltHA'] if 'tiltMag' in self.props: p['tiltMag'] = self.props['tiltMag'] if 'tiltAngle' in self.props: p['tiltAngle'] = self.props['tiltAngle'] if frame.type == core.G3FrameType.Calibration and self.drop_original_frames: return []
[docs] @core.indexmod class MergeCalibrationFrames(object): ''' Merge the keys from a sequence of calibration frames. Will throw an exception if a key recurs in more than one calibration frame. The merged calibration frame will be emitted before the first non-calibration frame that follows a calibration frame or at the end of processing, whichever comes first. Other non-calibration frames will be ignored. ''' def __init__(self, KeysToIgnore=['PointingOffsetX', 'PointingOffsetY']): ''' Ignores keys in the KeysToIgnore list during merging. By default, set to values written by the flux/pointing calibration that are stored to the BolometerPropertiesMap. ''' self.outframe = core.G3Frame(core.G3FrameType.Calibration) self.ignore_keys = KeysToIgnore def __call__(self, frame): if frame.type == core.G3FrameType.Calibration: for k in frame.keys(): if k not in self.ignore_keys: self.outframe[k] = frame[k] return [] # Allow extra calibration frames to follow proper calframe files # (which likely have PipelineInfo frames in them) if frame.type == core.G3FrameType.PipelineInfo: return # Ignore random wiring frames etc. if self.outframe is None or len(self.outframe.keys()) == 0: return # Return merged frame before whatever follows if we have new data out = [self.outframe, frame] self.outframe = None return out