import numpy as np
import copy
from .. import core
from . import ACUStatus, ACUState, TrackerStatus, TrackerState, TrackerPointing, CalFile
def UnitValue(caldict_entry):
'''Turn unit name into floating point unit value'''
if 'UnitValue' in caldict_entry:
return caldict_entry['UnitValue']
try:
uname = caldict_entry['UnitName']
if uname and uname != 'None':
try:
if '/' in uname:
unames = list(filter(None,uname.split('/')))
uvalue1 = getattr(core.G3Units,
list(filter(None,unames[0].split(' ')))[0])
uvalue2 = getattr(core.G3Units,
list(filter(None,unames[1].split(' ')))[0])
uvalue = uvalue1 / uvalue2
else:
uvalue = getattr(core.G3Units, uname)
except AttributeError:
uvalue = 1.
core.log_warn('No entry in G3Units for ' + uname + '. Setting UnitValue to 1.0\n')
else:
uvalue = 1.
except KeyError:
uvalue = 1.
caldict_entry['UnitValue'] = uvalue
return uvalue
def CalibrateValue(data, caldict_entry):
'''Apply gain / offset units from G3 cal file to register'''
uvalue = UnitValue(caldict_entry)
# if a register has units, it can't be an int anymore. well, actually,
# it can't be an int if we're adding floats to it or multiplying it by
# floats either, so convert everything that has an entry in the cal file
# to float/double.
if 'OutputType' not in caldict_entry:
g3type = type(data)
if g3type == core.G3VectorInt:
g3type = core.G3VectorDouble
elif g3type == core.G3MapInt:
g3type = core.G3MapDouble
elif g3type == core.G3Int:
g3type = core.G3Double
caldict_entry['OutputType'] = g3type
g3type = caldict_entry['OutputType']
# make a copy
if np.size(data) == 1:
data = data.value
data2 = np.asarray(data, dtype='float64')
# calibrate units
offset, recip = caldict_entry['Offset'], caldict_entry['ReciprocalFactor']
if offset != 0:
data2 += float(offset)
data2 *= float(uvalue / recip)
if not data2.shape:
data2 = data2.tolist()
return g3type(data2)
[docs]
@core.indexmod
class CalibrateFrame:
'''Apply gain / offset / units from G3 cal file'''
def __init__(self, calibration_file=None):
cf = CalFile.CalFileReader()
self.cal = cf.readCalFile(calibration_file)
def __call__(self, frame):
if frame.type != core.G3FrameType.GcpSlow or 'Calibrated' in frame:
return
for board in list(frame.keys()):
cboard = frame.pop(board)
if board not in self.cal:
continue
bcal = self.cal[board]
for rmap, crmap in cboard.items():
if rmap not in bcal:
continue
rcal = bcal[rmap]
for reg, creg in crmap.items():
if reg not in rcal:
continue
rcd = rcal[reg]
rsize = np.size(creg)
rshape = np.shape(creg)
if rsize > 1 and len(rshape) > 1:
for i in range(rshape[0]):
try:
rcdi = rcd[i]
except KeyError:
rcdi = rcd
creg[i] = CalibrateValue(creg[i], rcdi)
else:
try:
rcdi = rcd[0]
except KeyError:
rcdi = rcd
crmap[reg] = CalibrateValue(creg, rcdi)
frame[board] = cboard
frame['Calibrated'] = True
[docs]
@core.indexmod
def UnpackACUData(f):
'''Extracts ACU status information to ACUStatus key in frame'''
if f.type != core.G3FrameType.GcpSlow:
return
board = f['antenna0']['acu']
a = ACUStatus()
a.time = f['antenna0']['frame']['utc']
a.az_pos = board['az_pos'].value
a.el_pos = board['el_pos'].value
a.az_rate = board['az_rate'].value
a.el_rate = board['el_rate'].value
# 'new_*' registers not actually filled by GCP; ignore them
a.px_checksum_error_count = board['px_checksum_error_count'].value
a.px_resync_count = board['px_resync_count'].value
a.px_resync_timeout_count = board['px_resync_timeout_count'].value
a.px_resyncing = board['px_resyncing'].value
a.px_timeout_count = board['px_timeout_count'].value
a.restart_count = board['restart_count'].value
a.state = ACUState(board['state'].value)
a.status = board['acu_status'].value
try:
a.error = board['acu_error'].value
except KeyError:
# This register was some time in early 2018. In order to read
# older data, just set the error code to 0.
a.error = 0
f['ACUStatus'] = a
[docs]
@core.indexmod
def UnpackTrackerMinimal(f, rewrite_source_from_feature_bits=True):
'''
Construct SourceName and ObservationId keys from frame.
If rewrite_source_from_feature_bits is True (the default), will try to
rewrite source names if DecryptFeatureBit() has been run and either
"elnod", "calibrator", or "noise" is present in the feature bit list
to that value.
'''
if f.type != core.G3FrameType.GcpSlow:
return
# Grab the GCP source name. If it is "current", fill in something more
# helpful from the feature bits if possible.
source = f['antenna0']['tracker']['source'].value
if rewrite_source_from_feature_bits and 'GCPFeatureBits' in f:
if 'elnod' in f['GCPFeatureBits']:
source = 'elnod'
if 'calibrator' in f['GCPFeatureBits'] and 'source_scan' not in f['GCPFeatureBits']:
source = 'calibrator'
if 'noise' in f['GCPFeatureBits']:
source = 'noise'
if 'debug' in f['GCPFeatureBits']:
source = 'debug-forced-scanify'
if 'every_pixel_on_src' in f['GCPFeatureBits']:
source = source + '-pixelraster' # NB: Do NOT use in-place +=
f['SourceName'] = source
# And observation ID, if present
if 'obs_id' in f['antenna0']['tracker']:
f['ObservationID'] = f['antenna0']['tracker']['obs_id']
[docs]
@core.indexmod
def UnpackTrackerData(f, rewrite_source_from_feature_bits=True):
'''
Extracts tracker status information to frame into the TrackerStatus key,
along with the observation processing handled by UnpackTrackerMinimal.
If rewrite_source_from_feature_bits is True (the default), will try to
rewrite source names if DecryptFeatureBit() has been run and either
"elnod", "calibrator", or "noise" is present in the feature bit list
to that value.
'''
if f.type != core.G3FrameType.GcpSlow:
return
UnpackTrackerMinimal(f, rewrite_source_from_feature_bits)
board = f['antenna0']['tracker']
t = TrackerStatus()
# List comprehensions are due to funny business with G3VectorFrameObject
t.time = board['utc'][0]
# Measured values
t.az_pos = np.asarray(board['actual'][0])
t.el_pos = np.asarray(board['actual'][1])
# XXX units for rates seem to be wrong. I think this is in encoder counts
t.az_rate = np.asarray(board['actual_rates'][0], dtype = float)
t.el_rate = np.asarray(board['actual_rates'][1], dtype = float)
# Expected values
t.az_command = np.asarray(board['expected'][0])
t.el_command = np.asarray(board['expected'][1])
t.az_rate_command = np.asarray(board['expected_rates'][0], dtype = float)
t.el_rate_command = np.asarray(board['expected_rates'][1], dtype = float)
# Status params
if isinstance(board['state'][0], core.G3String):
# If state is all zero (LACKING), for example due to an ACU glitch,
# the ARC reader may decide that the 8-bit array field is a string.
# Treat it as one.
t.state = [TrackerState(0) for s in board['inControl'][0]]
else:
t.state = [TrackerState(s) for s in board['state'][0]]
t.acu_seq = board['acu_seq'][0]
t.in_control = core.BoolVector(board['inControl'][0])
t.in_control_int = core.IntVector(board['inControl'][0])
t.scan_flag = core.BoolVector(board['scan_flag'][0])
t.lst = np.asarray(board['lst'][0], dtype=float)
t.source_acquired = np.asarray(board['off_source'][0])
t.source_acquired_threshold = np.asarray(board['source_acquired_threshold'])
t.tracker_mode = np.asarray(board['mode'][0])
t.tracker_lacking = np.asarray(board['lacking'][0])
t.time_status = np.asarray(board['time_status'][0])
try:
t.schedule_name = np.asarray(board['schedule_name'].value)
except AttributeError:
t.schedule_name = np.asarray(''.join([chr(x) for x in board['schedule_name']]))
f['TrackerStatus'] = t
[docs]
@core.indexmod
def UnpackTrackerPointingData(f):
'''
Extracts tracker registers relevant to online and offline pointing.
Calibration values (offsets and multiplicative constants) are from
gcp/control/conf/spt/cal.
'''
if f.type != core.G3FrameType.GcpSlow:
return
board = f['antenna0']['tracker']
t = TrackerPointing()
t.time = board['utc'][0]
t.scu_temp = np.asarray(f['antenna0']['scu']['temp'])
t.features = core.IntVector([f['array']['frame']['features'].value])
t.encoder_off_x = np.asarray([board['encoder_off'][0]], dtype=np.double)
t.encoder_off_y = np.asarray([board['encoder_off'][1]], dtype=np.double)
t.low_limit_az = np.asarray([board['az_limits'][0]], dtype=np.double)
t.high_limit_az = np.asarray([board['az_limits'][1]], dtype=np.double)
t.low_limit_el = np.asarray([board['el_limits'][0]], dtype=np.double)
t.high_limit_el = np.asarray([board['el_limits'][1]], dtype=np.double)
t.tilts_x = np.asarray(board['tilt_xy_avg'][0], dtype=np.double)
t.tilts_y = np.asarray(board['tilt_xy_avg'][1], dtype=np.double)
t.refraction = np.asarray(board['refraction'][2], dtype=np.double)
t.horiz_mount_x = np.asarray(board['horiz_mount'][0])
t.horiz_mount_y = np.asarray(board['horiz_mount'][1])
t.horiz_off_x = np.asarray(board['horiz_off'][0])
t.horiz_off_y = np.asarray(board['horiz_off'][1])
t.scan_off_x = np.asarray(board['scan_off'][0])
t.scan_off_y = np.asarray(board['scan_off'][1])
t.sky_off_x = np.asarray(board['sky_xy_off'][0])
t.sky_off_y = np.asarray(board['sky_xy_off'][1])
t.equat_off_x = np.asarray(board['equat_off'][0])
t.equat_off_y = np.asarray(board['equat_off'][1])
t.equat_geoc_ra = np.asarray(board['equat_geoc'][0])
t.equat_geoc_dec = np.asarray(board['equat_geoc'][1])
t.horiz_topo_az = np.asarray(board['horiz_topo'][0])
t.horiz_topo_el = np.asarray(board['horiz_topo'][1])
t.error_az = np.asarray(board['errors'][0])
t.error_el = np.asarray(board['errors'][1])
t.linsens_avg_l1 = np.asarray(board['linear_sensor_avg'][0])
t.linsens_avg_l2 = np.asarray(board['linear_sensor_avg'][1])
t.linsens_avg_r1 = np.asarray(board['linear_sensor_avg'][2])
t.linsens_avg_r2 = np.asarray(board['linear_sensor_avg'][3])
t.telescope_temp = np.asarray([f['array']['weather']['airTemperature'].value])
t.telescope_pressure = np.asarray([f['array']['weather']['pressure'].value])
f['TrackerPointing'] = t
p = core.G3MapVectorDouble()
pc = core.G3MapVectorDouble()
# core model parameters
for k in ["tilts", "flexure", "fixedCollimation"]:
p[k] = np.asarray(board[k], dtype=np.double)
kadj = k + "Adjust"
if kadj in board.keys():
v = np.asarray(board[kadj], dtype=np.double)
if np.any(v):
pc[k] = v
p['time'] = np.asarray(t.time, dtype=np.double)
if 'linear_sensor_enabled' in board.keys():
p['linsensEnabled'] = np.asarray(board['linear_sensor_enabled'][0], dtype=np.double)
if 'linear_sensor_coeff' in board.keys():
p['linsensCoeffs'] = np.asarray(board['linear_sensor_coeff'], dtype=np.double)
else:
p['linsensEnabled'] = np.zeros_like(p['time'], dtype=np.double)
f['OnlinePointingModel'] = p
if len(pc.keys()):
pc['time'] = p['time']
f['OnlinePointingModelCorrection'] = pc
[docs]
@core.indexmod
def UpdateLinearSensorDeltas(f):
if 'TrackerPointing' not in f:
return
if 'LinearSensorDeltas' in f:
return
# Yoke dimensions in mm.
Rs = 1652.0 * core.G3Units.mm
Rh = 3556.0 * core.G3Units.mm
Ry = 6782.0 * core.G3Units.mm
l1 = f["TrackerPointing"].linsens_avg_l1
l2 = f["TrackerPointing"].linsens_avg_l2
r1 = f["TrackerPointing"].linsens_avg_r1
r2 = f["TrackerPointing"].linsens_avg_r2
# Calculate corrections in radians.
p = core.G3MapVectorDouble()
p['time'] = np.asarray(f['TrackerPointing'].time, dtype=np.double)
p["delta_az"] = (Rh / (Ry * Rs)) * (l1 - l2 - r1 + r2) * core.G3Units.rad
p["delta_el"] = (1. / (2. * Rs)) * (l2 - l1 + r2 - r1) * core.G3Units.rad
p["delta_et"] = (1. / (2. * Ry)) * (l1 + l2 - r1 - r2) * core.G3Units.rad
f['LinearSensorDeltas'] = p
[docs]
@core.indexmod
def DecryptFeatureBit(f):
'''
Unpacks the GCP feature flags
'''
if f.type != core.G3FrameType.GcpSlow:
return
flag_array = core.G3VectorString()
feature_bit = f['array']['frame']['features'].value
flags = ['analyze', 'source_scan', 'cabin_shutter', 'elnod', 'pol_cal',
'calibrator', 'every_pixel_on_src', 'skydip', 'optical', 'noise',
'trail', 'el_scan', None, None, None, None, None, None, None,
'debug']
# Sorry... NDH
for i in enumerate(flags):
if feature_bit & (1 << i[0]):
if i[1] is None:
core.log_error('Got an unused feature bit: {:d}'.format(i[0]))
flag_array.append(i[1])
f['GCPFeatureBits'] = flag_array
[docs]
@core.indexmod
def AddBenchData(f):
'''
Add the optical bench positions to the frame.
'''
if f.type != core.G3FrameType.GcpSlow:
return
bench_axes = ['y1', 'y2', 'y3', 'x4', 'x5', 'z6']
board = f['antenna0']['scu']
# As of 2017-08-03, SCU time is not trustworthy
# time = board['benchSampleTime'][0]
# For now, do this bit of evil
time = f['antenna0']['tracker']['utc'][0]
start = time[0]
stop = time[-1]
benchcom = core.G3TimestreamMap()
benchpos = core.G3TimestreamMap()
benchzero = core.G3TimestreamMap()
benchoff = core.G3TimestreamMap()
bencherr = core.G3TimestreamMap()
bench_info = core.G3TimestreamMap()
for i, key in enumerate(bench_axes):
benchcom[key] = core.G3Timestream(board['benchExpected'][i])
benchcom[key].start = start
benchcom[key].stop = stop
benchpos[key] = core.G3Timestream(board['benchActual'][i])
benchpos[key].start = start
benchpos[key].stop = stop
benchzero[key] = core.G3Timestream(board['benchZeros'][i])
benchzero[key].start = start
benchzero[key].stop = stop
benchoff[key] = core.G3Timestream(board['benchOffsets'][i])
benchoff[key].start = start
benchoff[key].stop = stop
bencherr[key] = core.G3Timestream(board['benchErrors'][i])
bencherr[key].start = start
bencherr[key].stop = stop
info_items = ['benchFocus', 'benchDeadBand', 'benchAcquiredThreshold',
'benchPrimaryState', 'benchSecondaryState',
'benchFault', 'timeLocked']
bench_info = core.G3TimestreamMap()
for i, key in enumerate(info_items):
bench_info[key] = core.G3Timestream(board[key][0])
bench_info[key].start = start
bench_info[key].stop = stop
f['BenchPosition'] = benchpos
f['BenchCommandedPosition'] = benchcom
f['BenchZeros'] = benchzero
f['BenchOffsets'] = benchoff
f['BenchErrors'] = bencherr
f['BenchInfo'] = bench_info
f['BenchSampleTime'] = board['benchSampleTime'][0]
[docs]
@core.indexmod
def UnpackCryoData(f):
'''
Extracts cryo information into CryoStatus key
'''
if f.type != core.G3FrameType.GcpSlow:
return
if 'cryo' not in f['array']:
return
board = f['array']['cryo']
temps = board['temperature']
heaters = board['heater_dac']
t = core.G3MapDouble()
t['cryo_is_valid'] = board['cryoIsValid'][0]
# Measured values
# He10
t['uc_head'] = temps[0][0]
t['ic_head'] = temps[0][1]
t['he4_head'] = temps[0][2]
t['he4_fb'] = temps[0][3]
t['he4_pump'] = temps[0][4]
t['ic_pump'] = temps[0][5]
t['uc_pump'] = temps[0][6]
t['he4_sw'] = temps[0][7]
t['ic_sw'] = temps[0][8]
t['uc_sw'] = temps[0][9]
t['uc_stage'] = temps[0][10]
t['lc_tower'] = temps[0][11]
t['ic_stage'] = temps[0][12]
t['t4k_head'] = temps[0][13]
t['t4k_squid_strap'] = temps[0][14]
t['t50k_head'] = temps[0][15]
# Optics
t['b1_50k_wbp_near'] = temps[1][0]
t['b2_50k_wbp_far'] = temps[1][1]
t['b3_50k_diving_board'] = temps[1][2]
t['b4_50k_top_bot_ptc'] = temps[1][3]
t['y1_50k_head'] = temps[1][4]
t['y2_50k_window_strap_near'] = temps[1][5]
t['y3_50k_tube_strap_near'] = temps[1][6]
t['y4_50k_tube'] = temps[1][7]
t['g1_4k_head'] = temps[1][8]
t['g2_4k_strap'] = temps[1][9]
t['g3_4k_lens_tab'] = temps[1][10]
t['g4_4k_lens_tab_far'] = temps[1][11]
t['r1_4k_top_top_ptc'] = temps[1][12]
t['r2_50k_midop_bot_ptc'] = temps[1][13]
t['r3_4k_lyot_flange'] = temps[1][14]
t['r4_4k_lyot'] = temps[1][15]
# Receiver
t['t4k_plate_far'] = temps[2][0]
t['t4k_strap_optics'] = temps[2][1]
t['t4k_plate_mid'] = temps[2][2]
t['t4k_plate_top'] = temps[2][3]
t['t4k_plate_ptc'] = temps[2][4]
t['t50k_harness_middle'] = temps[2][6]
t['t50k_strap'] = temps[2][7]
t['squid_wh1_sl1'] = temps[2][8]
t['squid_wh5_sl1'] = temps[2][9]
t['squid_wh3_sl7'] = temps[2][10]
t['cal_filament'] = temps[2][11]
t['cal_ambient1'] = temps[2][12]
t['cal_ambient2'] = temps[2][13]
t['cal_ambient3'] = temps[2][14]
# Heaters
t['heat_he4_pump'] = heaters[0][3]
t['heat_ic_pump'] = heaters[0][4]
t['heat_uc_pump'] = heaters[0][5]
t['heat_he4_sw'] = heaters[0][0]
t['heat_ic_sw'] = heaters[0][1]
t['heat_uc_sw'] = heaters[0][2]
f['CryoStatus'] = t
f['CryoStatusTime'] = board['utc']
[docs]
@core.indexmod
def UnpackPTData(f):
'''Extracts pulse tube status information to PTStatus key
in frame'''
if f.type != core.G3FrameType.GcpSlow:
return
if 'pt415' not in f['array']:
return
board = f['array']['pt415']
p = core.G3MapDouble()
p['optics_lowp'] = board['pressure_low'][0]
p['min_optics_lowp'] = board['min_pressure_low'][0]
p['max_optics_lowp'] = board['max_pressure_low'][0]
p['optics_highp'] = board['pressure_high'][0]
p['min_optics_highp'] = board['min_pressure_high'][0]
p['max_optics_highp'] = board['max_pressure_high'][0]
p['optics_tempoil'] = board['temp_oil'][0]
p['min_optics_tempoil'] = board['min_temp_oil'][0]
p['max_optics_tempoil'] = board['max_temp_oil'][0]
p['receiver_lowp'] = board['pressure_low'][1]
p['min_receiver_lowp'] = board['min_pressure_low'][1]
p['max_receiver_lowp'] = board['max_pressure_low'][1]
p['receiver_highp'] = board['pressure_high'][1]
p['min_receiver_highp'] = board['min_pressure_high'][1]
p['max_receiver_highp'] = board['max_pressure_high'][1]
p['receiver_tempoil'] = board['temp_oil'][1]
p['min_receiver_tempoil'] = board['min_temp_oil'][1]
p['max_receiver_tempoil'] = board['max_temp_oil'][1]
p['optics_is_valid'] = board['deviceIsValid'][0]
p['receiver_is_valid'] = board['deviceIsValid'][1]
f['PTStatus'] = p
f['PTStatusTime'] = board['utc']
[docs]
@core.indexmod
def UnpackMuxData(f):
'''
Add the DFMux data to the frame.
'''
if f.type != core.G3FrameType.GcpSlow:
return
try:
mux = f['array']['muxHousekeeping']
boards = mux['boardname']
except KeyError:
return
fpga_temp = core.G3MapDouble()
board_name = core.G3MapString()
for i, bn in enumerate(boards):
bn = str(bn).replace('"', '') # get rid of extra quotes in board name
if bn != "":
board_name[str(i)] = bn
fpga_temp[str(i)] = mux['MB_TEMPERATURE_FPGA_DIE'][i]
f['MuxFPGATemp'] = fpga_temp
f['MuxBoardName'] = board_name
f['MuxTime'] = mux['utc']
[docs]
@core.indexmod
def UnpackWeatherData(f):
'''Extracts weather status information to Weather key
in frame'''
if f.type != core.G3FrameType.GcpSlow:
return
if 'weather' not in f['array']:
return
board = f['array']['weather']
t = core.G3MapDouble()
t['telescope_temp'] = board['airTemperature'].value
t['telescope_pressure'] = board['pressure'].value
t['inside_dsl_temp'] = board['internalTemperature'].value
t['wind_speed'] = board['windSpeed'].value
t['wind_direction'] = board['windDirection'].value
t['battery'] = board['battery'].value
t['rel_humidity'] = board['relativeHumidity'].value
t['power'] = board['power'].value
t['tau'] = f['array']['tipper']['tau'].value
t['tatm'] = f['array']['tipper']['tatm'].value
f['Weather'] = t
f['WeatherTime'] = board['utc']
f['TipperTime'] = f['array']['tipper']['utc']
@core.pipesegment
def ARCExtract(pipe):
'''Extract GCP registers into native objects'''
pipe.Add(CalibrateFrame)
pipe.Add(UnpackACUData)
pipe.Add(UnpackTrackerPointingData)
pipe.Add(UpdateLinearSensorDeltas)
pipe.Add(DecryptFeatureBit)
pipe.Add(UnpackTrackerData)
pipe.Add(AddBenchData)
pipe.Add(UnpackCryoData)
pipe.Add(UnpackPTData)
pipe.Add(UnpackMuxData)
pipe.Add(UnpackWeatherData)
@core.pipesegment
def ARCExtractMinimal(pipe):
'''
Extract bare minimum GCP registers into native objects.
Includes only GCPFeatureBits, SourceName and ObservationID keys.
Use ARCExtract to calibrate and extract the complete frame.
'''
pipe.Add(DecryptFeatureBit)
pipe.Add(UnpackTrackerMinimal)
# Need tool for tilt meter next