Source code for spt3g.gcp.InfluxDB

import numpy as np
import re
import datetime as dt
from .. import core
from ..core import G3Units as U
from . import ARCExtractor

def build_field_list(fr):
    """
    All the fields you can upload from gcp to InfluxDB
    """
    r = {
        # tracker status
        'az_actual': ['TrackerStatus', 'az_pos', U.deg],
        'el_actual': ['TrackerStatus', 'el_pos', U.deg],
        'az_rate_actual': ['TrackerStatus', 'az_rate', U.deg/U.sec],
        'el_rate_actual': ['TrackerStatus', 'el_rate', U.deg/U.sec],
        'az_command': ['TrackerStatus', 'az_command', U.deg],
        'el_command': ['TrackerStatus', 'el_command', U.deg],
        'az_rate_command': ['TrackerStatus', 'az_rate_command', U.deg/U.sec],
        'el_rate_command': ['TrackerStatus', 'el_rate_command', U.deg/U.sec],
        'tracker_state': ['TrackerStatus', 'state', None],
        'acu_seq': ['TrackerStatus', 'acu_seq', None],
        'in_control_int': ['TrackerStatus', 'in_control_int', None],
        'scan_flag': ['TrackerStatus', 'scan_flag', None],
        'lst': ['TrackerStatus', 'lst', U.hour],
        'source_acquired': ['TrackerStatus', 'source_acquired', None],
        'source_acquired_thresh': ['TrackerStatus', 'source_acquired_threshold', None],
        'tracker_mode': ['TrackerStatus', 'tracker_mode', None],
        'tracker_lacking': ['TrackerStatus', 'tracker_lacking', None],
        'time_status': ['TrackerStatus', 'time_status', None],
        'schedule': ['TrackerStatus', 'schedule_name', None],
        'raw_encoder_1': ['antenna0', 'tracker', 'raw_encoder', 0, U.deg],
        'raw_encoder_2': ['antenna0', 'tracker', 'raw_encoder', 1, U.deg],
        'drive_currents_el1': ['array', 'dc', 'currents', 0, U.volt],
        'drive_currents_el2': ['array', 'dc', 'currents', 1, U.volt],
        'drive_currents_el3': ['array', 'dc', 'currents', 2, U.volt],
        'drive_currents_el4': ['array', 'dc', 'currents', 3, U.volt],
        'drive_currents_az1': ['array', 'dc', 'currents', 4, U.volt],
        'drive_currents_az2': ['array', 'dc', 'currents', 5, U.volt],
        'drive_currents_az3': ['array', 'dc', 'currents', 6, U.volt],
        'drive_currents_az4': ['array', 'dc', 'currents', 7, U.volt],

        # tracker pointing
        'features': ['TrackerPointing', 'features', 1],
        'encoder_off_x': ['TrackerPointing', 'encoder_off_x', U.deg],
        'encoder_off_y': ['TrackerPointing', 'encoder_off_y', U.deg],
        'low_limit_az': ['TrackerPointing', 'low_limit_az', U.deg],
        'high_limit_az': ['TrackerPointing', 'high_limit_az', U.deg],
        'low_limit_el': ['TrackerPointing', 'low_limit_el', U.deg],
        'high_limit_el': ['TrackerPointing', 'high_limit_el', U.deg],
        'tilts_x': ['TrackerPointing', 'tilts_x', U.deg],
        'tilts_y': ['TrackerPointing', 'tilts_y', U.deg],
        'refraction': ['TrackerPointing', 'refraction', U.deg],
        'horiz_mount_x': ['TrackerPointing', 'horiz_mount_x', U.deg],
        'horiz_mount_y': ['TrackerPointing', 'horiz_mount_y', U.deg],
        'horiz_topo_az': ['TrackerPointing', 'horiz_topo_az', U.deg],
        'horiz_topo_el': ['TrackerPointing', 'horiz_topo_el', U.deg],
        'horiz_off_x': ['TrackerPointing', 'horiz_off_x', U.deg],
        'horiz_off_y': ['TrackerPointing', 'horiz_off_y', U.deg],
        'scan_off_x': ['TrackerPointing', 'scan_off_x', U.deg],
        'scan_off_y': ['TrackerPointing', 'scan_off_y', U.deg],
        'sky_off_x': ['TrackerPointing', 'sky_off_x', U.deg],
        'sky_off_y': ['TrackerPointing', 'sky_off_y', U.deg],
        'equat_off_x': ['TrackerPointing', 'equat_off_x', U.deg],
        'equat_off_y': ['TrackerPointing', 'equat_off_y', U.deg],
        'source_ra': ['TrackerPointing', 'equat_geoc_ra', U.rahr],
        'source_dec': ['TrackerPointing', 'equat_geoc_dec', U.deg],
        'error_az': ['TrackerPointing', 'error_az', U.deg],
        'error_el': ['TrackerPointing', 'error_el', U.deg],
        'linsens_avg_l1': ['TrackerPointing', 'linsens_avg_l1', U.mm],
        'linsens_avg_l2': ['TrackerPointing', 'linsens_avg_l2', U.mm],
        'linsens_avg_r1': ['TrackerPointing', 'linsens_avg_r1', U.mm],
        'linsens_avg_r2': ['TrackerPointing', 'linsens_avg_r2', U.mm],
        'linsens_daz': ['LinearSensorDeltas', 'delta_az', U.deg],
        'linsens_del': ['LinearSensorDeltas', 'delta_el', U.deg],
        'linsens_det': ['LinearSensorDeltas', 'delta_et', U.deg],

        # Weather
        'telescope_temp': ['Weather', 'telescope_temp', 'C'],
        'inside_dsl_temp': ['Weather', 'inside_dsl_temp', 'C'],
        'telescope_pressure': ['Weather', 'telescope_pressure', U.mb],
        'wind_speed': ['Weather', 'wind_speed', U.m / U.s],
        'wind_direction': ['Weather', 'wind_direction', U.deg],
        'battery': ['Weather', 'battery', U.mV],
        'rel_humidity': ['Weather', 'rel_humidity', None],
        'power': ['Weather', 'power', None],
        'tau': ['Weather', 'tau', None],
        'tatm': ['Weather', 'tatm', None],

        # Cryo -- units appear to just be in K. Don't recalibrate.
        # He10
        'uc_head': ['CryoStatus', 'uc_head', 1],
        'ic_head': ['CryoStatus', 'ic_head', 1],
        'he4_head': ['CryoStatus', 'he4_head', 1],
        'he4_fb': ['CryoStatus', 'he4_fb', 1],
        'he4_pump': ['CryoStatus', 'he4_pump', 1],
        'ic_pump': ['CryoStatus', 'ic_pump', 1],
        'uc_pump': ['CryoStatus', 'uc_pump', 1],
        'he4_sw': ['CryoStatus', 'he4_sw', 1],
        'ic_sw': ['CryoStatus', 'ic_sw', 1],
        'uc_sw': ['CryoStatus', 'uc_sw', 1],
        'uc_stage': ['CryoStatus', 'uc_stage', 1],
        'lc_tower': ['CryoStatus', 'lc_tower', 1],
        'ic_stage': ['CryoStatus', 'ic_stage', 1],
        '4k_head': ['CryoStatus', 't4k_head', 1],
        '4k_squid_strap': ['CryoStatus', 't4k_squid_strap', 1],
        '50k_head': ['CryoStatus', 't50k_head', 1],
        # Optics
        'b1_50k_wbp_near': ['CryoStatus', 'b1_50k_wbp_near', 1],
        'b2_50k_wbp_far': ['CryoStatus', 'b2_50k_wbp_far', 1],
        'b3_50k_diving_board': ['CryoStatus', 'b3_50k_diving_board', 1],
        'b4_50k_top_bot_ptc': ['CryoStatus', 'b4_50k_top_bot_ptc', 1],
        'y1_50k_head': ['CryoStatus', 'y1_50k_head', 1],
        'y2_50k_window_strap_near': ['CryoStatus', 'y2_50k_window_strap_near', 1],
        'y3_50k_tube_strap_near': ['CryoStatus', 'y3_50k_tube_strap_near', 1],
        'y4_50k_tube': ['CryoStatus', 'y4_50k_tube', 1],
        'g1_4k_head': ['CryoStatus', 'g1_4k_head', 1],
        'g2_4k_strap': ['CryoStatus', 'g2_4k_strap', 1],
        'g3_4k_lens_tab': ['CryoStatus', 'g3_4k_lens_tab', 1],
        'g4_4k_lens_tab_far': ['CryoStatus', 'g4_4k_lens_tab_far', 1],
        'r1_4k_top_top_ptc': ['CryoStatus', 'r1_4k_top_top_ptc', 1],
        'r2_50k_midop_bot_ptc': ['CryoStatus', 'r2_50k_midop_bot_ptc', 1],
        'r3_4k_lyot_flange': ['CryoStatus', 'r3_4k_lyot_flange', 1],
        'r4_4k_lyot': ['CryoStatus', 'r4_4k_lyot', 1],
        # Receiver
        '4k_plate_far': ['CryoStatus', 't4k_plate_far', 1],
        '4k_strap_optics': ['CryoStatus', 't4k_strap_optics', 1],
        '4k_plate_mid': ['CryoStatus', 't4k_plate_mid', 1],
        '4k_plate_top': ['CryoStatus', 't4k_plate_top', 1],
        '4k_plate_ptc': ['CryoStatus', 't4k_plate_ptc', 1],
        '50k_harness_middle': ['CryoStatus', 't50k_harness_middle', 1],
        '50k_strap': ['CryoStatus', 't50k_strap', 1],
        'squid_wh1_sl1': ['CryoStatus', 'squid_wh1_sl1', 1],
        'squid_wh5_sl1': ['CryoStatus', 'squid_wh5_sl1', 1],
        'squid_wh3_sl7': ['CryoStatus', 'squid_wh3_sl7', 1],
        'cal_filament': ['CryoStatus', 'cal_filament', 1],
        'cal_ambient1': ['CryoStatus', 'cal_ambient1', 1],
        'cal_ambient2': ['CryoStatus', 'cal_ambient2', 1],
        'cal_ambient3': ['CryoStatus', 'cal_ambient3', 1],
        # heaters
        'heat_he4_pump': ['CryoStatus', 'heat_he4_pump', 1],
        'heat_ic_pump': ['CryoStatus', 'heat_ic_pump', 1],
        'heat_uc_pump': ['CryoStatus', 'heat_uc_pump', 1],
        'heat_he4_sw': ['CryoStatus', 'heat_he4_sw', 1],
        'heat_ic_sw': ['CryoStatus', 'heat_ic_sw', 1],
        'heat_uc_sw': ['CryoStatus', 'heat_uc_sw', 1],
        # status bit
        'cryo_is_valid': ['CryoStatus', 'cryo_is_valid', None],

        # PT status
        'optics_low_p_now': ['PTStatus', 'optics_lowp', None],
        'optics_low_p_min': ['PTStatus', 'min_optics_lowp', None],
        'optics_low_p_max': ['PTStatus', 'max_optics_lowp', None],
        'optics_high_p_now': ['PTStatus', 'optics_highp', None],
        'optics_high_p_min': ['PTStatus', 'min_optics_highp', None],
        'optics_high_p_max': ['PTStatus', 'max_optics_highp', None],
        'optics_tempoil_now': ['PTStatus', 'optics_tempoil', None],
        'optics_tempoil_min': ['PTStatus', 'min_optics_tempoil', None],
        'optics_tempoil_max': ['PTStatus', 'max_optics_tempoil', None],

        'receiver_low_p_now': ['PTStatus', 'receiver_lowp', None],
        'receiver_low_p_min': ['PTStatus', 'min_receiver_lowp', None],
        'receiver_low_p_max': ['PTStatus', 'max_receiver_lowp', None],
        'receiver_high_p_now': ['PTStatus', 'receiver_highp', None],
        'receiver_high_p_min': ['PTStatus', 'min_receiver_highp', None],
        'receiver_high_p_max': ['PTStatus', 'max_receiver_highp', None],
        'receiver_tempoil_now': ['PTStatus', 'receiver_tempoil', None],
        'receiver_tempoil_min': ['PTStatus', 'min_receiver_tempoil', None],
        'receiver_tempoil_max': ['PTStatus', 'max_receiver_tempoil', None],

        'optics_is_valid': ['PTStatus', 'optics_is_valid', None],
        'receiver_is_valid': ['PTStatus', 'receiver_is_valid', None],

        # Online Pointing Model
        'tilts_hr_angle': ['OnlinePointingModel', 'tilts', 0, U.deg],
        'tilts_lat': ['OnlinePointingModel', 'tilts', 1, U.deg],
        'tilts_el': ['OnlinePointingModel', 'tilts', 2, U.deg],
        'flexure_sin': ['OnlinePointingModel', 'flexure', 0, U.deg],
        'flexure_cos': ['OnlinePointingModel', 'flexure', 1, U.deg],
        'fixed_collimation_x': ['OnlinePointingModel', 'fixedCollimation', 0, U.deg],
        'fixed_collimation_y': ['OnlinePointingModel', 'fixedCollimation', 1, U.deg],
        'tilts_hr_angle_adjust': ['OnlinePointingModelCorrection', 'tilts', 0, U.deg],
        'tilts_lat_adjust': ['OnlinePointingModelCorrection', 'tilts', 1, U.deg],
        'tilts_el_adjust': ['OnlinePointingModelCorrection', 'tilts', 2, U.deg],
        'flexure_sin_adjust': ['OnlinePointingModelCorrection', 'flexure', 0, U.deg],
        'flexure_cos_adjust': ['OnlinePointingModelCorrection', 'flexure', 1, U.deg],
        'fixed_collimation_x_adjust': ['OnlinePointingModelCorrection', 'fixedCollimation', 0, U.deg],
        'fixed_collimation_y_adjust': ['OnlinePointingModelCorrection', 'fixedCollimation', 1, U.deg],
        'linsens_coeff_az': ['OnlinePointingModel', 'linsensCoeffs', 0, None],
        'linsens_coeff_el': ['OnlinePointingModel', 'linsensCoeffs', 1, None],
        'linsens_coeff_et': ['OnlinePointingModel', 'linsensCoeffs', 2, None],
        'linsens_enabled': ['OnlinePointingModel', 'linsensEnabled', 0, None],

        # Other
        'obs_id': ['ObservationID', None],
        'source_name': ['SourceName', None],

        # ACUStatus
        'acu_state': ['ACUStatus', 'state', None],
        'acu_status': ['ACUStatus', 'status', None],
        'acu_error': ['ACUStatus', 'error', None],

        # Bench
        'bench_command_y1': ['BenchCommandedPosition', 'y1', U.mm],
        'bench_command_y2': ['BenchCommandedPosition', 'y2', U.mm],
        'bench_command_y3': ['BenchCommandedPosition', 'y3', U.mm],
        'bench_command_x4': ['BenchCommandedPosition', 'x4', U.mm],
        'bench_command_x5': ['BenchCommandedPosition', 'x5', U.mm],
        'bench_command_z6': ['BenchCommandedPosition', 'z6', U.mm],

        'bench_actual_y1': ['BenchPosition', 'y1', U.mm],
        'bench_actual_y2': ['BenchPosition', 'y2', U.mm],
        'bench_actual_y3': ['BenchPosition', 'y3', U.mm],
        'bench_actual_x4': ['BenchPosition', 'x4', U.mm],
        'bench_actual_x5': ['BenchPosition', 'x5', U.mm],
        'bench_actual_z6': ['BenchPosition', 'z6', U.mm],

        'bench_zero_y1': ['BenchZeros', 'y1', U.mm],
        'bench_zero_y2': ['BenchZeros', 'y2', U.mm],
        'bench_zero_y3': ['BenchZeros', 'y3', U.mm],
        'bench_zero_x4': ['BenchZeros', 'x4', U.mm],
        'bench_zero_x5': ['BenchZeros', 'x5', U.mm],
        'bench_zero_z6': ['BenchZeros', 'z6', U.mm],

        'bench_offset_y1': ['BenchOffsets', 'y1', U.mm],
        'bench_offset_y2': ['BenchOffsets', 'y2', U.mm],
        'bench_offset_y3': ['BenchOffsets', 'y3', U.mm],
        'bench_offset_x4': ['BenchOffsets', 'x4', U.mm],
        'bench_offset_x5': ['BenchOffsets', 'x5', U.mm],
        'bench_offset_z6': ['BenchOffsets', 'z6', U.mm],

        'bench_error_y1': ['BenchErrors', 'y1', U.mm],
        'bench_error_y2': ['BenchErrors', 'y2', U.mm],
        'bench_error_y3': ['BenchErrors', 'y3', U.mm],
        'bench_error_x4': ['BenchErrors', 'x4', U.mm],
        'bench_error_x5': ['BenchErrors', 'x5', U.mm],
        'bench_error_z6': ['BenchErrors', 'z6', U.mm],

        'bench_focus': ['BenchInfo', 'benchFocus', U.mm],
        'bench_dead_band': ['BenchInfo', 'benchDeadBand', U.mm],
        'bench_acquired_thresh': ['BenchInfo', 'benchAcquiredThreshold', U.mm],
        'bench_primary_state': ['BenchInfo', 'benchPrimaryState', None],
        'bench_secondary_state': ['BenchInfo', 'benchSecondaryState', None],
        'bench_fault': ['BenchInfo', 'benchFault', None],
        'bench_time_locked': ['BenchInfo', 'timeLocked', None],
    }

    # mux housekeeping
    for i in range(32):
        i = str(i)
        r['fpga_temp_ib{}'.format(i)] = ['MuxFPGATemp', i, None]
        r['name_ib{}'.format(i)] = ['MuxBoardName', i, None]

    # scu.temp - all temps documented given a name, others just a number
    scu_temps = {
        0: 'yoke_air',
        1: 'ctrl_room_air',
        2: 'glycol_supply',
        3: 'glycol_return',
        4: 'ctrl_room',
        20: 'secondary',
        21: 'icecrate',
        22: 'bench',
        23: 'attic',
        24: 'cabin',
        25: 'cryoboard',
    }
    for i in range(60):
        key = 't_scu_{}'.format(scu_temps.get(i, i))
        r[key] = ['TrackerPointing', 'scu_temp', i, 'C']

    return r


def make_lines(measurement, field, time, dat, tags=None):
    '''
    Return list of string lines to add to database
    '''
    if isinstance(dat[0], str):
        dat = ['"' + x + '"' for x in dat]
    if tags is None:
        fmt_str = measuremnt +' '+ field + '={val} {time}'
    else:
        fmt_str = measurement
        for tag, tag_val in tags.items():
            fmt_str += ',{}={}'.format(tag, tag_val)
        fmt_str += ' ' + field + '={val} {time}'
    # time in int because it's in nanoseconds for InfluxDB
    lines = [fmt_str.format(val=x, time=int(t0)) for x, t0 in zip(dat, time)]
    return lines


[docs] @core.indexmod def WriteDB(fr, client, fields=None): ''' Write points to the database for each field Arguments --------- client : InfluxDB client fields : Which gcp fields to add to database. See parse_field for options. If None, add all. ''' assert client is not None, "InfluxDB client required" from influxdb.exceptions import InfluxDBClientError from influxdb.exceptions import InfluxDBServerError if fr.type != core.G3FrameType.GcpSlow: return all_fields = build_field_list(fr) if fields is None: fields = all_fields.keys() dict_list = [] for f in fields: field_dat = all_fields[f] if len(field_dat) == 5: # raw register stat = 'TrackerStatus' tmp, brd, attr, ind, unit = field_dat try: dat = fr[tmp][brd][attr][ind] except: continue try: if 'utc' in fr[tmp][brd].keys(): time = fr[tmp][brd]['utc'][0] else: time = fr['antenna0']['tracker']['utc'][0][:len(dat)] except: continue elif len(field_dat) == 4: stat, attr, ind, unit = field_dat if stat not in fr: # OnlinePointingModelCorrection continue try: dat = getattr(fr[stat], attr)[ind] time = getattr(fr[stat], 'time') except AttributeError: # OnlinePointingModel try: dat = fr[stat][attr][ind] except KeyError: # OnlinePointingModelCorrection continue time = fr[stat]['time'] elif len(field_dat) == 3: stat, attr, unit = field_dat if stat not in fr: # Field only exists in live data stream continue try: dat = getattr(fr[stat], attr) except AttributeError: try: dat = fr[stat][attr] except KeyError: # Field only exists in live data stream continue if 'Bench' in stat: # funny time field for bench positions time = fr['BenchSampleTime'] elif 'Mux' in stat: time = fr['MuxTime'] elif stat in ['CryoStatus', 'Weather', 'PTStatus']: time = fr['{}Time'.format(stat)] else: try: time = getattr(fr[stat], 'time') except AttributeError: time = fr[stat]['time'] elif len(field_dat) == 2: stat, unit = field_dat try: dat = fr[stat] except KeyError: #eg, no obsid core.log_warn('No key {}'.format(stat), unit='InfluxDB') continue try: time = getattr(fr[stat], 'time') except AttributeError as err: time = fr['antenna0']['tracker']['utc'][0] # InfluxDB wants time in nanoseconds since the UNIX epoch in UTC if isinstance(time, core.G3Time): time = np.asarray([time.time / U.nanosecond]) elif isinstance(time, core.G3VectorTime): time = np.asarray(time) / U.nanosecond else: try: time = np.asarray(core.G3VectorTime(np.atleast_1d(time))) / U.nanosecond except Exception as e: core.log_error("Error converting time: {}".format(str(e)), unit="InfluxDB") continue # Avoid out-of-range errors in parsing time # time < 0 occurs when, e.g., the SCU loses the clock (e.g. during a brownout) time[time < 0] = 0 if dat is None: core.log_warn('{} dat is None'.format(f), unit='InfluxDB') continue dat = np.atleast_1d(dat) try: dlen = len(dat) except TypeError: # sometimes source_name is a weird non-none value continue if unit is not None: if unit == 'C': zeropt_K = 273.15 cal_dat = dat / U.K - zeropt_K else: cal_dat = dat / unit else: cal_dat = dat try: if np.any(np.isnan(cal_dat)): continue except TypeError: pass if 'heat' not in f: tag = f else: tag = f.replace('heat_', '') # for fields that have az/el components az_el_names = ['az', 'el', 'az', 'el', 'ra', 'dec', 'x', 'y', 'hr_angle', 'sin', 'cos', 'lat'] tag2 = f for name in az_el_names: # require name_ at beginning or _name at end match1 = re.findall('^{}_'.format(name), f) match2 = re.findall('_{}$'.format(name), f) if len(match1): tag2 = f.replace(match1[0], '') if len(match2): tag2 = f.replace(match2[0], '') # also group source names if 'source' in f: tag2 = 'source' stat = 'TrackerPointing' if stat == 'PTStatus': groups = ['now', 'min', 'max'] for g in groups: match = re.findall('_{}$'.format(g), f) if len(match): tag2 = f.replace(match[0], '') # group bench positions # require bench_ at beginning match = re.findall('^bench', f) if len(match): tag2 = attr # y1, y2, etc stat = 'Bench' # group Mux properties if 'Mux' in stat: stat = 'muxHousekeeping' tag2 = 'ib'+f.split('ib')[-1] if 'linsens_coeff' in f: tag2 = 'linsens_coeff' if 'drive_currents_az' in f: tag2 = 'drive_currents_az' if 'drive_currents_el' in f: tag2 = 'drive_currents_el' dict_list += make_lines( measurement=stat, field=f, time=time, dat=cal_dat, tags={'label': tag, 'label2': tag2}, ) try: now = core.G3Time.Now() delay = float(now.time/U.nanosecond - time[-1])/1e9 if delay > 5: core.log_info( '{} Delay: {} s'.format(now.isoformat(), delay), unit='InfluxDB' ) except RuntimeError: # sometimes timestamp gets screwed up pass try: client.write_points( dict_list, batch_size=len(dict_list), protocol='line' ) except (InfluxDBClientError, InfluxDBServerError) as v: core.log_error( 'Error writing to database. {}'.format(v), unit='InfluxDB' )
@core.pipesegment def UpdateDB(pipe, client=None, fields=None): ''' Update InfluxDB with data in frame Arguments --------- client : InfluxDB client fields : Which gcp fields to add to database. See parse_field for options. If None, add all. ''' pipe.Add(ARCExtractor.ARCExtract) pipe.Add(WriteDB, client=client, fields=fields)