from .. import core
from . import DfMuxHousekeepingMap, HkBoardInfo, HkMezzanineInfo, HkModuleInfo, HkChannelInfo, DfMuxWiringMap, DfMuxChannelMapping
from .TuberClient import TuberClient
import socket, struct, time
import numpy
[docs]
@core.indexmod
class HousekeepingConsumer(object):
    '''
    Collect housekeeping data from the mux boards defined in the wiring map.
    Will add a key called 'DfMuxHousekeeping' to any housekeeping frame that
    goes by containing the data as of the arrival of the housekeeping frame.
    Use in conjunction with a dfmux.PeriodicHousekeepingCollector to get
    data at fixed intervals.

    Also emits a Wiring frame from the housekeeping data.  This requires a
    recent (as of November 2018) version of pydfmux in order to read mapped
    channel names from each board.  If ignore_wiring=True, assumes that
    the wiring map is handled by a separate process: no Wiring frame is
    built or emitted here, and Wiring frames passing through only trigger
    a warning.

    If collecting real-time data, you may want to set subprocess=True when
    adding this module.
    '''
    def __init__(self, ignore_wiring=False):
        # board serial (4-digit string) -> TuberClient for that board
        self.tuber = {}
        # board serial -> (packed IP, crate serial, slot); -1 if unknown
        self.board_map = {}
        # serials seen in the most recent Timepoint frame
        self.board_serials = []
        # frames held back until the first Timepoint frame arrives
        self.buffer = []
        # last Wiring frame emitted, used to detect wiring changes
        self.hwmf = None
        self.ignore_wiring = ignore_wiring

    def map_boards(self):
        '''
        Cache IP address, crate serial and slot number and
        create a TuberClient for each IceBoard.

        Only boards listed in self.board_serials that are not yet in
        self.board_map are queried.  A crate or slot that cannot be read
        (the reply carries an 'error' entry) is recorded as -1.
        '''
        boards = [b for b in self.board_serials if b not in self.board_map]
        if not boards:
            return

        ips = {}
        crates = {}
        slots = {}

        # create tubers for each missing board
        for board in boards:
            ip = socket.inet_aton(socket.gethostbyname('iceboard{}.local'.format(board)))
            ips[board] = struct.unpack("i", ip)[0]
            self.tuber[board] = TuberClient(socket.inet_ntoa(ip), timeout=5.0)

        # get crate ID for each missing board; issue all requests first,
        # then collect the replies
        for board in boards:
            self.tuber[board].CallMethod('Dfmux', '_get_backplane_serial', False)
        for board in boards:
            reply = self.tuber[board].GetReply()[0]
            if reply.get('error', None):
                crates[board] = -1
            else:
                crates[board] = int(reply['result'])

        # get slot ID for each missing board; boards without a crate are
        # not queried, so no reply must be read for them either
        for board in boards:
            if crates[board] >= 0:
                self.tuber[board].CallMethod('Dfmux', 'get_backplane_slot', False)
        for board in boards:
            if crates[board] < 0:
                slots[board] = -1
            else:
                reply = self.tuber[board].GetReply()[0]
                if reply.get('error', None):
                    slots[board] = -1
                else:
                    slots[board] = int(reply['result'])

        # store
        for board in boards:
            self.board_map[board] = (ips[board], crates[board], slots[board])

    def __call__(self, frame):
        # If boards have been mapped already, then process every frame
        # as received.
        if self.board_map:
            return self.ProcessBuffered(frame)

        # Otherwise, process at least one Timepoint frame first to gather
        # a list of IceBoards for which housekeeping data are required,
        # Then process any buffered frames after Timepoint frame,
        # but emit before the Timepoint
        if frame.type == core.G3FrameType.Timepoint:
            tp_frames = self.ProcessBuffered(frame)
            frames = []
            for fr in self.buffer:
                frames.extend(self.ProcessBuffered(fr))
            frames += tp_frames
            self.buffer = []
            return frames

        # Buffer until a Timepoint frame is received
        self.buffer.append(frame)
        return False

    def ProcessBuffered(self, frame):
        '''
        Process frames in the buffered order.  Returned value should
        always be a list of frames, possibly empty.

        Timepoint frames refresh the list of board serials.  Housekeeping
        frames are populated with 'DfMuxHousekeeping' and, unless
        ignore_wiring is set, are preceded by a Wiring frame the first time
        and whenever the wiring map changes.  A Housekeeping frame is
        dropped (empty list returned) if collecting the data fails.
        '''
        if frame.type == core.G3FrameType.Timepoint:
            self.board_serials = [('%04d' % k) for k in frame['DfMux'].keys()]
            return [frame]

        if frame.type == core.G3FrameType.Wiring:
            if self.ignore_wiring:
                core.log_warn(
                    "Received a wiring frame, which may be inconsistent with "
                    "board housekeeping data.  Store mapped channel names on the "
                    "board using updated pydfmux/hidfmux software.",
                    unit="HousekeepingConsumer",
                )
            else:
                core.log_fatal(
                    "Received spurious wiring frame.  Do not use "
                    "PyDfMuxHardwareMapInjector with the HousekeepingConsumer.  "
                    "You may update pydfmux/hidfmux to a newer version that "
                    "stores mapped channel names on the boards, and rearrange "
                    "your data acquisition script.",
                    unit="HousekeepingConsumer",
                )

        if frame.type == core.G3FrameType.Housekeeping:
            self.map_boards()

            hwm = DfMuxWiringMap()
            hkdata = DfMuxHousekeepingMap()
            try:
                for board in self.board_serials:
                    self.tuber[board].CallMethod('Dfmux', '_dump_housekeeping', False)
                    time.sleep(0.02)  # Stagger return data transfer a little to
                                      # avoid overloading the network on the return

                found = False
                ismkid = False
                for board in self.board_serials:
                    dat = self.tuber[board].GetReply()[0]['result']

                    boardhk = self.HousekeepingFromJSON(dat)
                    if boardhk.firmware_name:
                        ismkid = ismkid or "mkid" in boardhk.firmware_name.lower()
                    hkdata[int(boardhk.serial)] = boardhk

                    if self.ignore_wiring:
                        continue

                    ip, crate, slot = self.board_map[board]
                    boardw = self.WiringFromJSON(dat, ip, crate, slot)
                    for key in boardw.keys():
                        hwm[key] = boardw[key]
                        found = True

                if not found and not self.ignore_wiring:
                    core.log_fatal("No mapped channels found on any IceBoards. "
                                   "You may need to update pydfmux to a newer version of pydfmux "
                                   "that stores mapped channel names on the boards, and reload "
                                   "the hardware map.",
                                   unit='HousekeepingConsumer')

                frame['DfMuxHousekeeping'] = hkdata

                if self.ignore_wiring:
                    return [frame]

                hwmf = core.G3Frame(core.G3FrameType.Wiring)
                hwmf['WiringMap'] = hwm
                hwmf['ReadoutSystem'] = 'RF-ICE' if ismkid else 'ICE'

                if self.hwmf is None:
                    self.hwmf = hwmf
                    # If this is the first time the consumer is triggered, make sure
                    # a Housekeeping and WiringMap frame are issued before any
                    # Timepoint frames
                    frames = [hwmf, frame]
                    return frames
                else:
                    # Compare wiring maps and re-issue frame if anything has changed
                    old_hwm = self.hwmf['WiringMap']
                    if (set(hwm.keys()) ^ set(old_hwm.keys())):
                        self.hwmf = hwmf
                        return [hwmf, frame]
                    for k in hwm.keys():
                        try:
                            if vars(hwm[str(k)]) != vars(old_hwm[str(k)]):
                                self.hwmf = hwmf
                                return [hwmf, frame]
                        except Exception:
                            # Narrowed from a bare except so that
                            # KeyboardInterrupt / SystemExit still propagate
                            core.log_error("Invalid HWM key %r" % k)

                # If we get here then the wiring map hasn't changed,
                # so return the populated Housekeeping frame as it is
                return [frame]

            except socket.timeout:
                core.log_error('Timeout collecting housekeeping data from mux boards. Dropping housekeeping sample', unit='HousekeepingConsumer')
                return []
            except Exception as e:
                core.log_error('Error (%s) collecting housekeeping data from mux boards. Dropping housekeeping sample' % e, unit='HousekeepingConsumer')
                return []

        return [frame]

    @classmethod
    def HousekeepingFromJSON(cls, dat):
        '''
        Build HKBoardInfo object from a JSON blob returned by the
        _dump_housekeeping call

        Mezzanines, modules and channels are stored under 1-based keys in
        the order they appear in the blob.
        '''
        # Board-global quantities
        boardhk = HkBoardInfo()
        boardhk.is128x = dat.get('is128x', False)

        ts = dat['timestamp']
        year = ts['y']
        if year == 0:
            # It probably isn't 1900
            systime = time.gmtime()
            # Check for New Year's, assuming no more than 24 hours clock slew
            if ts['d'] == 1 and systime.tm_yday >= 365:
                year = systime.tm_year + 1
            elif ts['d'] >= 365 and systime.tm_yday == 1:
                year = systime.tm_year - 1
            else:
                year = systime.tm_year
            year -= 2000
        boardhk.timestamp = core.G3Time(y=year, d=ts['d'], h=ts['h'], m=ts['m'], s=ts['s'], ss=ts['ss'])

        boardhk.timestamp_port = str(dat['timestamp_port'])
        boardhk.serial = str(dat['serial'])
        if 'firmware_name' in dat:
            boardhk.firmware_name = str(dat['firmware_name'])
            boardhk.firmware_version = str(dat['firmware_version'])
        boardhk.fir_stage = dat['fir_stage']
        for name, value in dat['currents'].items():
            boardhk.currents[str(name)] = value
        for name, value in dat['voltages'].items():
            boardhk.voltages[str(name)] = value
        for name, value in dat['temperatures'].items():
            boardhk.temperatures[str(name)] = value

        # Mezzanines
        for n, mezz in enumerate(dat['mezzanines']):
            mezzhk = HkMezzanineInfo()

            mezzhk.present = mezz['present']
            mezzhk.power = mezz['power']
            if mezzhk.present:
                if 'ipmi' in mezz:
                    product = mezz['ipmi']['product']
                    mezzhk.serial = str(product['serial_number'])
                    mezzhk.part_number = str(product['part_number'])
                    mezzhk.revision = str(product['version_number'])
                if 'currents' in mezz:
                    for name, value in mezz['currents'].items():
                        mezzhk.currents[str(name)] = value
                if 'voltages' in mezz:
                    for name, value in mezz['voltages'].items():
                        mezzhk.voltages[str(name)] = value

            powered = mezzhk.present and mezzhk.power
            if powered:
                if 'temperature' in mezz:
                    mezzhk.temperature = mezz['temperature']
                # these parameters are not in the 64x housekeeping tuber
                mezzhk.squid_heater = mezz.get('squid_heater', 0.0)
                mezzhk.squid_controller_power = mezz.get('squid_controller_power', False)
                mezzhk.squid_controller_temperature = mezz.get('squid_controller_temperature', 0.0)

            # Modules
            for m, mod in enumerate(mezz['modules']):
                modhk = HkModuleInfo()
                modhk.routing_type = str(mod['routing'][0])
                modhk.module_number = m + 1
                if powered:
                    if 'gains' in mod:
                        modhk.carrier_gain = mod['gains']['carrier']
                        modhk.nuller_gain = mod['gains']['nuller']
                    if 'overload' in mod:
                        modhk.carrier_railed = mod['overload']['carrier']
                        modhk.nuller_railed = mod['overload']['nuller']
                        modhk.demod_railed = mod['overload']['demod']
                    if 'squid_current_bias' in mod:
                        modhk.squid_current_bias = mod['squid_current_bias']
                    if 'squid_flux_bias' in mod:
                        # Previously written to squid_current_bias, which
                        # clobbered the current bias and lost the flux bias
                        modhk.squid_flux_bias = mod['squid_flux_bias']
                    if 'squid_feedback' in mod:
                        modhk.squid_feedback = str(mod['squid_feedback'])
                    if 'nco_frequency' in mod:
                        modhk.nco_frequency = mod['nco_frequency'] * core.G3Units.Hz

                if 'squid_tuning' in mod and mod['squid_tuning'] is not None:
                    sq = mod['squid_tuning']
                    modhk.squid_state = str(sq['state'])
                    # Missing (None) tuning values are stored as NaN
                    modhk.squid_transimpedance = sq['transimpedance'] if sq['transimpedance'] is not None else numpy.nan
                    modhk.squid_p2p = sq['p2p'] if sq['p2p'] is not None else numpy.nan

                for k, chan in enumerate(mod['channels']):
                    chanhk = HkChannelInfo()
                    chanhk.channel_number = k + 1
                    chanhk.carrier_amplitude = chan['carrier_amplitude']
                    chanhk.nuller_amplitude = chan['nuller_amplitude']
                    if 'dan_gain' in chan:
                        chanhk.dan_gain = chan['dan_gain']
                    if 'dan_streaming_enable' in chan:
                        chanhk.dan_streaming_enable = chan['dan_streaming_enable']
                    if 'frequency' in chan:
                        # A single 'frequency' entry is used for both the
                        # carrier and the demodulator
                        chanhk.carrier_frequency = chan['frequency'] * core.G3Units.Hz
                        chanhk.demod_frequency = chan['frequency'] * core.G3Units.Hz
                    else:
                        chanhk.carrier_frequency = chan['carrier_frequency'] * core.G3Units.Hz
                        chanhk.demod_frequency = chan['demod_frequency'] * core.G3Units.Hz
                    if 'carrier_phase' in chan:
                        chanhk.carrier_phase = chan['carrier_phase'] * core.G3Units.deg
                        chanhk.nuller_phase = chan['nuller_phase'] * core.G3Units.deg
                        chanhk.demod_phase = chan['demod_phase'] * core.G3Units.deg
                    if 'dan_accumulator_enable' in chan:
                        chanhk.dan_accumulator_enable = chan['dan_accumulator_enable']
                    if 'dan_feedback_enable' in chan:
                        chanhk.dan_feedback_enable = chan['dan_feedback_enable']
                    if 'dan_railed' in chan:
                        chanhk.dan_railed = chan['dan_railed']
                    if 'tuning' in chan and chan['tuning'] is not None:
                        if "tune" in chan["tuning"] and "state" not in chan["tuning"]:
                            # KIDs
                            chanhk.state = "tuned" if chan["tuning"]["tune"] else "unknown"
                        else:
                            chanhk.state = str(chan["tuning"]["state"])
                        # HkChannelInfo attribute -> (tuning key, unit factor)
                        attrs = {
                            "rlatched": ("rlatched", core.G3Units.ohm),
                            "rnormal": ("rnormal", core.G3Units.ohm),
                            "rfrac_achieved": ("rfrac_achieved", 1),
                            "loopgain": ("loopgain", 1),
                            "i_slope": ("dIr", 1. / core.G3Units.Hz),
                            "q_slope": ("dQr", 1. / core.G3Units.Hz),
                            "internal_phase": ("internal_phase", core.G3Units.rad),
                            "external_phase": ("external_phase", core.G3Units.rad),
                            "bias_frequency": ("bias_freq", core.G3Units.Hz),
                        }
                        for attr, (na, ua) in attrs.items():
                            if chan["tuning"].get(na, None) is not None:
                                setattr(chanhk, attr, float(chan["tuning"][na]) * ua)
                    modhk.channels[k + 1] = chanhk
                mezzhk.modules[m + 1] = modhk
            boardhk.mezz[n + 1] = mezzhk

        return boardhk

    @classmethod
    def WiringFromJSON(cls, dat, ip, crate, slot):
        '''
        Build WiringMap object from a JSON blob returned by the
        _dump_housekeeping call

        Only channels whose tuning data carries a non-empty 'name' are
        mapped.  Module and channel indices stored in the mapping are
        0-based, with modules numbered consecutively across mezzanines.
        An error is logged if the board has no mapped channels.
        '''
        found = False
        hwm = DfMuxWiringMap()
        serial = int(dat['serial'])
        for imezz, mezz in enumerate(dat['mezzanines']):
            for imod, mod in enumerate(mezz['modules']):
                module = imod + len(mezz['modules']) * imezz
                for ichan, chan in enumerate(mod['channels']):
                    # 'tuning' may be absent or None
                    name = (chan.get('tuning', {}) or {}).get('name', None)
                    if not name:
                        continue
                    mapping = DfMuxChannelMapping()
                    mapping.board_ip = ip
                    mapping.board_serial = serial
                    mapping.board_slot = slot
                    mapping.crate_serial = crate
                    mapping.module = module
                    mapping.channel = ichan
                    try:
                        name = str(name)
                        hwm[name] = mapping
                        found = True
                    except Exception:
                        # Narrowed from a bare except so that
                        # KeyboardInterrupt / SystemExit still propagate
                        core.log_error("Invalid channel name %r" % (name))
        if not found:
            core.log_error("No mapped channels found on iceboard%04d. "
                           "You may need to update pydfmux to a newer version of pydfmux "
                           "that stores mapped channel names on the boards, and reload "
                           "the hardware map." % (serial),
                           unit='HousekeepingConsumer')
        return hwm

    @classmethod
    def HousekeepingForBoard(cls, hostname):
        '''
        Return HkBoardInfo object from a board.  Useful for debugging.
        '''
        t = TuberClient(hostname)
        data = t.CallMethod('Dfmux', '_dump_housekeeping')
        return cls.HousekeepingFromJSON(data[0]['result'])
[docs]
@core.indexmod
class PeriodicHousekeepingCollector(object):
    '''
    Inserts housekeeping frames every N timepoints.

    An empty Housekeeping frame is emitted immediately ahead of the first
    Timepoint frame seen and ahead of every N-th Timepoint frame after it.
    All other frames pass through unchanged.
    '''
    def __init__(self, N=15200):
        self.N = N
        # number of Timepoint frames seen so far
        self.count = 0

    def __call__(self, frame):
        # Only Timepoint frames advance the counter
        if frame.type != core.G3FrameType.Timepoint:
            return [frame]

        out = []
        if self.count % self.N == 0:
            out.append(core.G3Frame(core.G3FrameType.Housekeeping))
        self.count += 1
        out.append(frame)
        return out
[docs]
@core.usefulfunc
def HousekeepingForBolo(hkmap, wiringmap, bolo, all_hk=False):
    '''
    Obtain the channel housekeeping information for a bolometer named "bolo"
    using the passed housekeeping and wiring maps.

    If all_hk is True, returns a tuple of the (board, mezz, module, channel)
    HK data instead of just the channel.
    '''
    entry = wiringmap[bolo]
    board = hkmap[entry.board_serial]

    # The wiring map's module index runs across mezzanines; the housekeeping
    # containers are indexed from 1, hence the +1 on every lookup.
    modules_on_first_mezz = len(board.mezz[1].modules)
    mezz = board.mezz[entry.module // modules_on_first_mezz + 1]
    mod = mezz.modules[entry.module % len(mezz.modules) + 1]
    chan = mod.channels[entry.channel + 1]

    return (board, mezz, mod, chan) if all_hk else chan