Source code for spt3g.core.modconstruct

from . import G3Module, G3Pipeline, G3PipelineInfo, G3Frame, G3FrameType, G3Time, G3ModuleConfig
import types
import re
import textwrap


def g3decorator(func):
    """
    Flag ``func`` as a decorator so that automated documentation tools can
    discover it.

    Sets ``func.__g3decorator__`` to True and hands the very same object
    back, so this can itself be applied with ``@g3decorator``.
    """
    setattr(func, "__g3decorator__", True)
    return func


@g3decorator
def usefulfunc(func):
    '''
    Mark argument as a useful function that can be found by automated
    documentation tools.

    The argument is tagged with ``__g3usefulfunc__ = True`` and returned
    unchanged.

    Example
    -------

    ::

        @core.usefulfunc
        def do_some_science(data):
            science(data)
    '''
    setattr(func, '__g3usefulfunc__', True)
    return func
# document thyself
usefulfunc.__g3usefulfunc__ = True


class PipeSegmentDocstring(type):
    # Metaclass used by ``pipesegment``.  ``pipesegment`` defines ``__doc__``
    # as an instance-level property, so the text shown for the decorator class
    # itself is served from here instead.  A regular docstring is deliberately
    # not used in this class body: the ``__doc__`` property defined below
    # would replace it anyway.

    @property
    def __doc__(cls):
        # Static help text for the class that uses this metaclass.
        decorator_help = '''
    Use as a decorator for a pre-assembled set of pipeline modules. Makes a
    pseudo-module consisting of several inputs. Use this to introspect the
    segment to find out what it does, or use :func:`pipesegment_nodoc` if your
    module does anything complicated.

    Example
    -------

    ::

        @core.pipesegment
        def standardfiltering(
            pipe,
            PolyOrder=4,
            MaskedHighPassEll=6000,
            Input='CalTimestreams',
            Output='FilteredTimestreams',
        ):
            pipe.Add(analysis.PolyFilter, PolyOrder=PolyOrder, Input=Input,
                     Output='__Temp' + Output)
            pipe.Add(analysis.MaskedHighPass, MaskedHighPassEll=MaskedHighPassEll,
                     Input='__Temp' + Output, Output=Output)
            def cleanup(frame):
                del frame['__Temp' + Output]
            pipe.Add(cleanup)

        pipe.Add(standardfiltering, PolyOrder=3)
    '''
        return decorator_help
@g3decorator
class pipesegment(metaclass=PipeSegmentDocstring):
    # No class docstring on purpose: ``__doc__`` is a property below (per
    # instance), and the class-level text comes from the metaclass.

    def __init__(self, func):
        # Keep the wrapped segment reachable both as ``func`` and under the
        # conventional ``__wrapped__`` name, and tag both the wrapper and the
        # wrapped function as pipeline segments.
        self.func = self.__wrapped__ = func
        self.__pipesegment__ = func.__pipesegment__ = True

    def __call__(self, pipe, *args, **kwargs):
        # Calling the wrapper simply runs the segment against ``pipe``.
        return self.func(pipe, *args, **kwargs)

    @property
    def __doc__(self):
        """
        Create a dummy pipeline for introspection.  Generates a docstring when
        the __doc__ attribute is accessed.

        The wrapped function is run once against a stand-in pipe whose
        ``Add`` only records a textual ``pipe.Add(...)`` line per call; the
        result is appended to the function's own docstring and cached in
        ``_autodoc``.
        """
        if hasattr(self, "_autodoc"):
            return self._autodoc

        # An undocumented function still *has* a ``__doc__`` attribute, but
        # it is None, so the getattr default alone is not enough: dedent()
        # would be handed None and raise.  Fall back to "" explicitly.
        introdoc = textwrap.dedent(getattr(self.func, "__doc__", None) or "")
        if introdoc:
            introdoc += "\n\n"
        introdoc += '\nEquivalent to\n-------------\n\n::\n\n'
        doclines = []

        class PotemkinPipe(object):
            def Add(self, thing, *args, **kwargs):
                # Report wrapped objects under the module of what they wrap.
                if hasattr(thing, '__wrapped__'):
                    modname = thing.__wrapped__.__module__
                else:
                    modname = thing.__module__
                doc = 'pipe.Add(%s.%s' % (modname, thing.__name__)
                for arg in args:
                    doc += ', %s' % repr(arg)
                for arg in kwargs:
                    # remove object hashes
                    s = re.sub('<(.*) at (.*)>', '<\\1>', repr(kwargs[arg]))
                    doc += ', %s=%s' % (arg, s)
                doc += ')'
                doclines.append(doc)

        fake = PotemkinPipe()
        try:
            self.func(fake)
        except Exception as e:
            # Segments that do more than call pipe.Add may not survive the
            # stand-in pipe; note the failure in the text rather than raise.
            doclines.append('Exception evaluating equivalence (%s)' % (str(e), ))
        introdoc += ' ' + '\n '.join(doclines) + '\n'

        self._autodoc = introdoc
        return self._autodoc

    @property
    def __name__(self):
        # Present the wrapped function's name as the wrapper's own.
        return self.func.__name__
@g3decorator
def pipesegment_nodoc(func):
    """
    Use as a decorator for a pre-assembled set of pipeline modules. Makes a
    pseudo-module consisting of several inputs. Use this variant instead of
    :class:`pipesegment` to avoid introspection if your pipeline does anything
    complicated.

    The argument is tagged with ``__pipesegment__ = True`` and returned
    unchanged.

    Example
    -------

    ::

        @core.pipesegment_nodoc
        def standardfiltering(
            pipe,
            PolyOrder=4,
            MaskedHighPassEll=6000,
            Input='CalTimestreams',
            Output='FilteredTimestreams',
        ):
            pipe.Add(analysis.PolyFilter, PolyOrder=PolyOrder, Input=Input,
                     Output='__Temp' + Output)
            pipe.Add(analysis.MaskedHighPass, MaskedHighPassEll=MaskedHighPassEll,
                     Input='__Temp' + Output, Output=Output)
            def cleanup(frame):
                del frame['__Temp' + Output]
            pipe.Add(cleanup)

        pipe.Add(standardfiltering, PolyOrder=3)
    """
    setattr(func, "__pipesegment__", True)
    return func
@g3decorator
def indexmod(func):
    '''
    Mark argument as a processing module that can be found by automated
    documentation tools.

    The argument is tagged with ``__g3module__ = True`` and returned
    unchanged.

    Example
    -------

    ::

        @core.indexmod
        def dostuff(frame):
            dosomestuff()
    '''
    setattr(func, '__g3module__', True)
    return func
def build_pymodule(pycallable, *args, **kwargs):
    '''
    Convert a python callable and arguments into a core.G3Module by hook or
    by crook

    - A G3Module instance is returned as-is (any extra arguments are not
      used in that case).
    - A plain function is wrapped in a module whose Process calls it with
      the frame followed by ``*args`` / ``**kwargs``.
    - A class is instantiated with ``*args`` / ``**kwargs``; the instance is
      returned directly if it is a G3Module, otherwise it is wrapped.
    - Any other callable instance is wrapped and called with the frame only;
      passing extra arguments alongside it is reported through log_fatal.

    A non-callable argument is reported through log_fatal.
    '''
    from .g3logging import log_fatal

    if isinstance(pycallable, G3Module):
        return pycallable

    if not callable(pycallable):
        log_fatal('Argument not a python callable', unit = 'G3Pipeline')

    if isinstance(pycallable, types.FunctionType):
        class PyFuncModule(G3Module):
            def Process(self, fr):
                return pycallable(fr, *args, **kwargs)

        return PyFuncModule()

    # If this is not a function, and it is callable, it is a class. If it is
    # a non-instantiated class, instantiate it.
    try:
        # issubclass() throws TypeError if argument is instantiated
        issubclass(pycallable, G3Module)
        isclass = True
    except TypeError:
        isclass = False

    # Instantiate if necessary. Not in try block so we don't miss TypeError
    if isclass:
        pycallable = pycallable(*args, **kwargs)
    else:
        # No way to handle arguments in this call; assert there are none
        if len(args) != 0 or len(kwargs) != 0:
            log_fatal('Cannot pass through arguments when passed instantiated class', unit = 'G3Pipeline')

    # See if it was a Python G3Module subclass
    if isinstance(pycallable, G3Module):
        return pycallable

    # This is a python callable that is not a module, so wrap it
    class PyCallObjModule(G3Module):
        def Process(self, fr):
            return pycallable(fr)

    return PyCallObjModule()


class _add_pipeline_info(G3Module):
    # Internal module that stamps the frame stream with a G3PipelineInfo
    # record (version-control details from the package's ``version`` module,
    # host name, user name, and the module configurations appended by
    # PipelineAddCallable).

    def __init__(self):
        from .. import version
        import socket, getpass

        G3Module.__init__(self)
        self.infoemitted = False
        self.buffer = []
        # Keys of the incoming PipelineInfo frame that was extended, if any.
        # Assigned for real in Process; initialised here so the attribute
        # always exists.
        self.originalpi = None
        self.pipelineinfo = G3PipelineInfo()
        self.pipelineinfo.vcs_url = version.upstream_url
        self.pipelineinfo.vcs_branch = version.upstream_branch
        self.pipelineinfo.vcs_revision = version.revision
        self.pipelineinfo.vcs_localdiffs = version.localdiffs
        self.pipelineinfo.vcs_versionname = version.versionname
        self.pipelineinfo.vcs_fullversion = version.fullversion
        self.pipelineinfo.vcs_githash = version.gitrevision
        self.pipelineinfo.hostname = socket.gethostname()
        self.pipelineinfo.user = getpass.getuser()

    def Process(self, fr):
        # Once the info has been emitted, frames pass straight through,
        # except for PipelineInfo frames matching the one already extended.
        if self.infoemitted:
            if fr.type == G3FrameType.PipelineInfo and \
               self.originalpi is not None and \
               self.originalpi == list(fr.keys()):
                # Deduplicate PipelineInfo frames identical to one that we
                # added to earlier, which avoids false semi-duplicates down
                # the line when processing multiple files.
                return False
            return fr

        # Allow limited reordering of metadata
        if fr.type in [G3FrameType.Observation, G3FrameType.Wiring, G3FrameType.Calibration]:
            self.buffer.append(fr)
            return []

        self.infoemitted = True
        if fr.type == G3FrameType.PipelineInfo:
            # Extend the existing PipelineInfo frame, remembering its
            # original keys for the deduplication check above.
            self.originalpi = list(fr.keys())
            fr[str(G3Time.Now())] = self.pipelineinfo
            self.buffer.append(fr)
        else:
            # No PipelineInfo frame arrived first: emit a fresh one ahead of
            # the current frame.
            self.originalpi = None
            f = G3Frame(G3FrameType.PipelineInfo)
            f[str(G3Time.Now())] = self.pipelineinfo
            self.buffer += [f, fr]
        # The buffer is not needed again once infoemitted is set.
        rv = self.buffer
        del self.buffer
        return rv


def PipelineAddCallable(self, callable, name=None, subprocess=False, **kwargs):
    '''
    Add a processing module to the pipeline. It can be any subclass of
    spt3g.core.G3Module or any Python callable, either an instance or a
    class. Keyword arguments are passed through to the argument's
    constructor (if a class) or as additional arguments to a function.
    If subprocess is set to True, this module will be run in a separate
    process.

    If name is None, the module is named ``<module>.<callable name>``.
    Callables marked with ``__pipesegment__`` are invoked with the pipeline
    itself, and the names of the modules they add are prefixed with the
    segment name followed by ``/``.

    Returns whatever the segment call or the underlying ``_Add_`` returns.
    Raises RuntimeError if no name can be established for the callable.
    '''
    addpipelineinfo = False
    if not hasattr(self, '_pipelineinfo'):
        # First Add on this pipeline: create the info module now so that
        # configurations can be recorded; it is inserted at the end of this
        # call.
        self._pipelineinfo = _add_pipeline_info()
        addpipelineinfo = True

    if not hasattr(self, 'nameprefix'):
        self.nameprefix = ''

    if (hasattr(callable, '__name__')):
        callable_name = callable.__name__
    elif (hasattr(callable, '__class__')):
        callable_name = callable.__class__.__name__
    else:
        raise RuntimeError("Cannot establish name of pipeline module")

    if name is None:
        name = '%s.%s' % (callable.__module__, callable_name)
    name = self.nameprefix + name

    # Record module configuration for root objects
    if self.nameprefix == '':
        modconfig = G3ModuleConfig()
        modconfig.instancename = name
        modconfig.modname = '%s.%s' % (callable.__module__, callable_name)
        for k,v in kwargs.items():
            tostore = v
            try:
                if v.npix_allocated > 0:
                    # Don't store full sky maps as configuration options. It
                    # just wastes a ton of disk space with simulations.
                    tostore = v.clone(False)
            except Exception:
                # If that threw an exception, it either isn't a map or dropping
                # data didn't work, so just don't bother.  (Not a bare except:
                # KeyboardInterrupt / SystemExit must still propagate.)
                pass
            modconfig[k] = tostore
        self._pipelineinfo.pipelineinfo.modules.append(modconfig)

    # Deal with the segment case
    if hasattr(callable, '__pipesegment__'):
        # Prefix module names added by segments with the segment name
        oldnameprefix = self.nameprefix
        self.nameprefix = name + '/'
        try:
            rv = callable(self, **kwargs)
        finally:
            # Restore the prefix even if the segment raised, so later Add
            # calls are not named as if they belonged to this segment.
            self.nameprefix = oldnameprefix
    else:
        # Otherwise it's a module
        pymod = build_pymodule(callable, **kwargs)
        if subprocess:
            from .multiprocess import Subproc
            pymod = build_pymodule(Subproc(pymod, name=name))
        rv = self._Add_(pymod, name=name)

    if addpipelineinfo:
        self._Add_(self._pipelineinfo, name='_pipelineinfo')

    return rv


# Add this as G3Pipeline's Add method so it takes any Python callable
G3Pipeline.Add = PipelineAddCallable

# Show the recorded pipeline info once something has been added; before
# that, show a constructor-like placeholder string.
G3Pipeline.__repr__ = lambda self: repr(self._pipelineinfo.pipelineinfo) if hasattr(self, '_pipelineinfo') else 'pipe = {}.G3Pipeline()'.format(G3Pipeline.__module__)