Source code for spt3g.core.timestreamextensions

import numpy
from . import G3Timestream, DoubleVector, G3VectorDouble, G3TimestreamMap, G3VectorTime, G3Time, Int64Vector, G3VectorInt, \
    G3VectorComplexDouble, ComplexDoubleVector, BoolVector, G3VectorBool, IntVector, FloatVector, ComplexFloatVector
from . import G3Units, log_fatal, log_warn, usefulfunc, G3FrameObject

__all__ = ['concatenate_timestreams']

# Use numpy bindings rather than vector_indexing_suite for G3Timestream
def G3Timestream_getitem(x, y):
    if isinstance(y, slice):
        # If the argument is a slice, and the value would be a numpy array,
        # turn it into an appropriately sliced timestream, with appropriate
        # start and stop times. This is done in C++ for speed.
        it = x._cxxslice(y)
    else:
        it = numpy.asarray(x).__getitem__(y)

    return it

@property
def timestream_dtype(ts):
    """
    Numpy data type of stored data.
    """
    return numpy.asarray(ts).dtype

G3Timestream.__getitem__ = G3Timestream_getitem
G3Timestream.__setitem__ = lambda x, y, z: numpy.asarray(x).__setitem__(y, z)
G3Timestream.dtype = timestream_dtype
G3TimestreamMap.dtype = timestream_dtype

def timestreamastype(a, dtype):
    '''
    Convert timestream to a different data type. See numpy.array.astype()
    '''
    if a.dtype == numpy.dtype(dtype):
        return a
    out = G3Timestream(numpy.asarray(a).astype(dtype))
    out.units = a.units
    out.start = a.start
    out.stop = a.stop
    return out
G3Timestream.astype = timestreamastype

def timestreammapastype(a, dtype):
    '''
    Convert timestream map to a different data type.  See numpy.array.astype()
    '''
    if a.dtype == numpy.dtype(dtype):
        return a
    return G3TimestreamMap(a.keys(), numpy.asarray(a).astype(dtype), a.start, a.stop, a.units, 0, False)
G3TimestreamMap.astype = timestreammapastype

# XXX consider replacing all of this with the numpy __array_ufunc__ machinery

# Provide all the numpy binary arithmetic operators, first non-in-place, then
# in-place, with slightly different semantics
def numpybinarywrap(a, b, op):
    is_g3 = isinstance(a, G3FrameObject) or isinstance(b, G3FrameObject)
    is_tsa = isinstance(a, G3Timestream)
    is_tsb = isinstance(b, G3Timestream)
    is_cxa = isinstance(a, (G3VectorComplexDouble, ComplexDoubleVector, ComplexFloatVector))
    is_cxb = isinstance(b, (G3VectorComplexDouble, ComplexDoubleVector, ComplexFloatVector))
    cls = a.__class__
    if is_tsa:
        if is_tsb:
            a._assert_congruence(b)
        elif is_cxb:
            return NotImplemented
    elif is_cxa:
        pass
    elif is_tsb or is_cxb:
        return NotImplemented
    out = op(numpy.asarray(a), numpy.asarray(b))
    k = out.dtype.kind
    s = out.dtype.itemsize
    if k == 'c':
        if is_g3:
            return G3VectorComplexDouble(out)
        return ComplexDoubleVector(out) if s == 16 else ComplexFloatVector(out)
    elif k == 'b':
        return G3VectorBool(out) if is_g3 else BoolVector(G3VectorBool(out))
    elif is_tsa:
        out = G3Timestream(out)
        out.units = a.units
        out.start = a.start
        out.stop = a.stop
        return out
    elif k == 'f':
        if is_g3:
            return G3VectorDouble(out)
        return DoubleVector(out) if s == 8 else FloatVector(out)
    elif k in 'iu':
        out = out.astype(int)
        if is_g3:
            return G3VectorInt(out)
        return Int64Vector(out) if s == 8 else IntVector(out)
    return NotImplemented

def numpyinplacebinarywrap(a, b, op):
    if isinstance(a, G3Timestream) and isinstance(b, G3Timestream):
        a._assert_congruence(b)
    op(numpy.asarray(a), numpy.asarray(b))
    return a

all_cls = [G3Timestream, G3VectorDouble, DoubleVector, G3VectorInt, Int64Vector,
           G3VectorComplexDouble, ComplexDoubleVector, G3VectorBool, BoolVector,
           FloatVector, IntVector, ComplexFloatVector]

for attr in ['add', 'sub', 'mul', 'div', 'truediv', 'floordiv', 'mod', 'pow',
             'and', 'or', 'xor', 'lshift', 'rshift']:
    for cls in all_cls:
        x = '__{}__'.format(attr)
        if x in numpy.ndarray.__dict__:
            setattr(cls, x,
                    lambda a, b, op=numpy.ndarray.__dict__[x]: numpybinarywrap(a, b, op))
        x = '__r{}__'.format(attr)
        if x in numpy.ndarray.__dict__:
            setattr(cls, x,
                    lambda a, b, op=numpy.ndarray.__dict__[x]: numpybinarywrap(a, b, op))
        x = '__i{}__'.format(attr)
        if x in numpy.ndarray.__dict__:
            setattr(cls, x,
                    lambda a, b, op=numpy.ndarray.__dict__[x]: numpyinplacebinarywrap(a, b, op))

# unary operators
for x in ['__neg__', '__pos__', '__invert__', '__abs__', '__bool__']:
    if x not in numpy.ndarray.__dict__:
        continue
    for cls in all_cls:
        setattr(cls, x,
                lambda a, op=numpy.ndarray.__dict__[x]: a.__class__(op(numpy.asarray(a))))

# Bind some useful nativish binary operators
for x in ['__eq__', '__ge__', '__gt__', '__le__', '__lt__', '__ne__']:
    if x not in numpy.ndarray.__dict__:
        continue
    for cls in all_cls:
        setattr(cls, x,
                lambda a, b, op=numpy.ndarray.__dict__[x]: numpybinarywrap(a, b, op))

# Bind some useful methods
for x in ["sum", "mean", "any", "all", "min", "max", "argmin", "argmax", "var", "std"]:
    if x not in numpy.ndarray.__dict__:
        continue
    for cls in all_cls:
        def ufunc_wrapper(op):
            def ufunc(a, *args, **kwargs):
                return numpy.ndarray.__dict__[op](numpy.asarray(a), *args, **kwargs)
            ufunc.__doc__ = numpy.ndarray.__dict__[op].__doc__
            return ufunc
        setattr(cls, x, ufunc_wrapper(x))

# Bind some memory-efficient methods for G3TimestreamMap
for op in ["std", "var"]:
    def ufunc_wrapper(op):
        def ufunc(a, axis=None, dtype=None, out=None, **kwargs):
            bound_args = {}
            if op in ["std", "var"]:
                bound_args["ddof"] = kwargs.pop("ddof", 0)
            if not len(kwargs) and (axis == -1 or axis == 1):
                ret = numpy.asarray(getattr(a, "_c" + op)(**bound_args))
                if out is not None:
                    out[:] = ret
                    return out
                if dtype is not None:
                    return ret.astype(dtype)
                return ret
            kwargs.update(**bound_args)
            return numpy.ndarray.__dict__[op](numpy.asarray(a), axis, dtype, out, **kwargs)
        ufunc.__doc__ = numpy.ndarray.__dict__[op].__doc__
        return ufunc

    setattr(G3TimestreamMap, op, ufunc_wrapper(op))


#add concatenation routines to g3timestream objects
def _concatenate_timestreams(cls, ts_lst, ts_rounding_error=0.6, ts_interp_threshold=0):
    """
    Concatenate G3Timestream objects together.

    Arguments
    ---------
    ts_lst : list
        list of G3Timestream objects.
    ts_rounding_error : float
        allowed error in timestream separation such that timestreams are
        contiguous, as a fraction of the sample rate. This should be
        0 by default, but is 0.5 to allow for downsampler shifting,
        and then bumpted again to 0.6 to allow for floating-point
        errors in what 0.5 is.
    ts_interp_threshold : float
        allowed timestream separation below which gaps between timestreams are
        interpolated to be made continuous

    Returns
    -------
    ts : G3Timestream instance
        The concatenation of the input list of G3Timestream objects
    """
    #check for contiguous timestreams
    for i in range(1, len(ts_lst)):
        ts_sep = (ts_lst[i].start.time - ts_lst[i-1].stop.time) * ts_lst[i].sample_rate
        if numpy.abs(ts_sep - 1) > ts_rounding_error:
            if (ts_interp_threshold > 0) and ((ts_sep - 1) < ts_interp_threshold):
                log_warn("Timestreams are not contiguous: timestreams %d and %d "
                          "separated by %f samples (%s).  Interpolating." %
                         (i, i-1, ts_sep - 1, str(ts_lst[i].start)))
                v = numpy.linspace(ts_lst[i-1][-1], ts_lst[i][0], ts_sep + 1)[1:-1]
                ts_interp = cls(v)
                ts_interp.units = ts_lst[0].units
                ts_interp.start = ts_lst[i-1].stop + 1. / ts_lst[i].sample_rate
                ts_interp.stop = ts_lst[i].start - 1. / ts_lst[i].sample_rate
                ts_lst = ts_lst[:i] + [ts_interp] + ts_lst[i:]
            else:
                log_fatal("Timestreams are not contiguous: timestreams %d and %d "
                          "separated by %f samples (%s)" % (i, i-1, ts_sep - 1, str(ts_lst[i].start)))
        if (ts_lst[i].units != ts_lst[0].units):
            log_fatal("Timestreams are not the same units")
    out_ts = cls(numpy.concatenate(ts_lst))
    out_ts.units = ts_lst[0].units
    out_ts.start = ts_lst[0].start
    out_ts.stop = ts_lst[-1].stop
    return out_ts

G3Timestream.concatenate = classmethod(_concatenate_timestreams)


def _concatenate_timestream_maps(cls, ts_map_lst, ts_rounding_error=0.6, ts_interp_threshold=0, skip_missing=False):
    """
    Concatenate G3TimestreamMap objects together.

    Arguments
    ---------
    ts_map_lst : list
        list of G3TimestreamMap objects.
    ts_rounding_error : float
        allowed error in timestream separation such that timestreams are
        contiguous, as a fraction of the sample rate. This should be
        0 by default, but is 0.5 to allow for downsampler shifting,
        and then bumpted again to 0.6 to allow for floating-point
        errors in what 0.5 is.
    ts_interp_threshold : float
        allowed timestream separation below which gaps between timestreams are
        interpolated to be made continuous
    skip_missing : bool
        If True, include only the channels that are present in all of the
        input G3TimestreamMap object.  Otherwise, raises an error if any
        map does not have the same keys.

    Returns
    -------
    tsm : G3TimestreamMap instance
        The concatenation of the input list of G3TimestreamMap objects
    """
    keys = list(ts_map_lst[0].keys())
    skeys = set(keys)
    for tsm in ts_map_lst[1:]:
        if skip_missing:
            skeys &= set(tsm.keys())
            continue
        if set(tsm.keys()) != skeys:
            log_fatal("Timestream maps do not have the same keys")
    if skip_missing:
        # preserve key order
        keys = [k for k in keys if k in skeys]
    out_tsm = cls()
    for k in keys:
        ts_lst = [tsm[k] for tsm in ts_map_lst]
        out_tsm[k] = G3Timestream.concatenate(ts_lst, ts_rounding_error, ts_interp_threshold)
    return out_tsm

G3TimestreamMap.concatenate = classmethod(_concatenate_timestream_maps)


# global concatenation function
[docs] @usefulfunc def concatenate_timestreams(ts_lst, ts_rounding_error=0.6, ts_interp_threshold=0): """ Concatenate G3Timestream or G3TimestreamMap objects together. Arguments --------- ts_lst : list list of G3Timestream or G3TimestreamMap objects. Must all be the same type. ts_rounding_error : float allowed error in timestream separation such that timestreams are contiguous, as a fraction of the sample rate. This should be 0 by default, but is 0.5 to allow for downsampler shifting, and then bumpted again to 0.6 to allow for floating-point errors in what 0.5 is. ts_interp_threshold : float allowed timestream separation below which gaps between timestreams are interpolated to be made continuous Returns ------- ts : G3Timestream or G3TimestreamMap object The concatenation of the input list of objects """ return ts_lst[0].concatenate(ts_lst, ts_rounding_error, ts_interp_threshold)
@property def tsm_names(self): ''' Get timestream map channel names. ''' return list(self.keys()) G3TimestreamMap.names = tsm_names @property def tsm_data(self): ''' Return a numpy array view into the underlying 2D array of the timestream map ''' return numpy.asarray(self) G3TimestreamMap.data = tsm_data