mpi

mpi contains several modules built to interface with HPC systems using the installed MPI library (through mpi4py). This comes in two pieces: parallelization of frame processing and frame accumulation.

Frame IO Parallelization

The MPIFileIO sub-module contains a few modules that add frame-level parallelism to a G3Pipeline by reading files within an observation in parallel and (optionally, with MPIIODistributor or MPIFrameParallelizer) distributing the data frame-by-frame across any number of processes with an M:N IO:CPU node distribution.

This processing ensures that all metadata frames are seen by all processes in the same order, both with respect to each other and with respect to all data frames; metadata frames are thus strongly ordered across all processes. Data frames (typically scans) are ordered on a node, but are not ordered between nodes and thus should be considered weakly ordered. For a frame sequence MN12345PQ67, where letters denote metadata frames (calibration, etc.) and numbers denote data frames, a possible ordering seen by the pipelines on two processing nodes would be MN134PQ7 and MN25PQ6, respectively. A consequence of these ordering rules is that you must not use any modules that depend on frame ordering and continuity. Examples would be modules that buffer multiple scans together to obtain longer-time-period data; by their nature, these work only with observation-level parallelism, rather than frame-level or file-level parallelism.

Parallelization in this sense can be achieved by replacing G3Reader in a normal G3Pipeline with MPIIODistributor.
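
For concreteness, a minimal sketch of such a pipeline is shown below. It assumes mpi4py and an MPI-enabled build of spt3g are available and that the script is launched under mpirun/srun; the file names and the downstream processing module are placeholders:

  from mpi4py import MPI
  from spt3g import core, mpi

  pipe = core.G3Pipeline()
  # Where a serial pipeline would start with
  #   pipe.Add(core.G3Reader, filename=['offline_calibration.g3', '0000.g3'])
  # the parallel version instead starts with MPIIODistributor, which reads
  # the files on a few IO processes and hands frames to the other processes.
  pipe.Add(mpi.MPIFileIO.MPIIODistributor, mpicomm=MPI.COMM_WORLD, n_io=2,
           files=['offline_calibration.g3', ['0000.g3', '0001.g3', '0002.g3']])
  pipe.Add(core.Dump)  # placeholder for per-frame processing modules
  pipe.Run()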

Frame Accumulation

At the end of a sequence of parallel pipelines, you generally want to join the data from all the processes in the communicator back together. Typical cases are stitching processed scans back together, for example to feed to a maximum-likelihood map-maker or some other algorithm that needs very large quantities of distributed data, or performing a parallel reduction, for example by coadding the map frames produced by N parallel map-makers.

The first of these cases (restitching an observation processed on many parallel processes) is more complicated than the second, so this library provides a helper module (MPIAccumulator) that does it for you, placing the result in a member variable that can be inspected after the pipelines end.

The second case is easier, but potentially common enough that a module should be added in future.
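
As a rough illustration of the reduction case, the sketch below coadds per-process maps with a plain mpi4py reduction; the array shape and variable names are illustrative only, and a real pipeline would first extract the map from a G3 map frame:

  from mpi4py import MPI
  import numpy as np

  comm = MPI.COMM_WORLD
  local_map = np.zeros((300, 300))   # filled by this process's map-maker
  coadd = np.zeros_like(local_map)
  # Sum the partial maps from all processes onto rank 0
  comm.Reduce(local_map, coadd, op=MPI.SUM, root=0)
  if comm.rank == 0:
      print('coadd of', comm.size, 'partial maps complete')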

Interface to TOAST

There is an experimental module (TOASTFiller) that uses MPIAccumulator to fill frame contents after processing in many parallel pipelines into a TOAST TOD object in order to use TOAST’s set of parallel timestream algorithms and map-making. Basic functionality is present, but it should be considered in-development at present.

Modules in spt3g.mpi

spt3g.mpi.MPIAccumulator.MPIAccumulator

Accumulate data from many frames in memory, sharing metadata for all the frames between nodes in an MPI communicator, potentially organized into groups (for example by observation ID).

The result will be stored in the member variable ‘fullobs’ after pipeline termination. ‘fullobs’ is a dictionary containing entries for each data group (see the extractfunc parameter), which in turn are lists of 3-element tuples with the following content:

  • First element is the metadata for the frame (see extractfunc)

  • Second element is the node on which the full data exist

  • Third element is the data from the frame (see extractfunc) or None if the data are on a remote process.
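
A short sketch of walking through this structure after G3Pipeline.Run() returns, assuming acc is the MPIAccumulator instance that was added to the pipeline and comm the communicator it was constructed with (both names are placeholders):

  for group_id, frames in acc.fullobs.items():
      for metadata, node, data in frames:
          if data is None:
              continue   # the full data for this frame live on another process
          # metadata was shared with every process; data is node-local
          print(comm.rank, group_id, node, metadata)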

Constructor:

MPIAccumulator(mpicomm, extractfunc=None, sorter=None, dataframes=[spt3g.core.G3FrameType.Scan, spt3g.core.G3FrameType.Timepoint])

Data will be shared between nodes participating in the communicator mpicomm. The data stored are based on frames of the types in the dataframes argument and are organized based on the results of extractfunc and sorter.

extractfunc is expected to return a three-element tuple:

  • First element is a group ID for the data (for example, an observation ID). This is used only for organization and can be anything that Python can compare. By default, this is SourceName-ObservationID.

  • Second element is metadata. This information is shared between all processes participating in the communicator at completion and should include any information you need to assess whether the data are interesting for future calculations (for example, the start time of the frame). By default, frames are sorted by this value. It is None by default.

  • Third element is the data. This stays node-local. It could be just a copy of the frame (the default) or just the information (in any format understandable to Python) from the frame you expect to use later.

Note that extractfunc is called on all frame types (including calibration data), but its result is stored only if it returns a non-None value. Thus, it is the responsibility of extractfunc either to cache any applicable calibration, etc. information and include it in the return value for data frames, or to explicitly return a value for calibration data, if needed. The default extractfunc saves only the frames of the types in the “dataframes” argument.

sorter is used, if defined, to sort the list of frame data in each group. It is passed to the Python sorted() function as the ‘key’ argument and receives the three-element tuple stored for each set of frame data (metadata, node, data). By default, no sorting is done (unsorted frames will still appear in a common order across nodes, determined by the data distribution).
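
Putting the pieces together, a hedged sketch of constructing an MPIAccumulator with a custom extractfunc and sorter; the frame keys used here (‘SourceName’, ‘ObservationID’, ‘ScanStartTime’, ‘RawTimestreams_I’) are hypothetical and stand in for whatever your frames actually contain:

  from mpi4py import MPI
  from spt3g import core, mpi

  def extractfunc(frame):
      if frame.type != core.G3FrameType.Scan:
          return None                     # store nothing for other frame types
      group = str(frame['SourceName']) + '-' + str(frame['ObservationID'])  # hypothetical keys
      metadata = frame['ScanStartTime']   # hypothetical key; shared with all processes
      data = frame['RawTimestreams_I']    # hypothetical key; stays node-local
      return (group, metadata, data)

  acc = mpi.MPIAccumulator.MPIAccumulator(MPI.COMM_WORLD, extractfunc=extractfunc,
                                          sorter=lambda entry: entry[0])  # sort by metadata

  pipe = core.G3Pipeline()
  # ... reader and processing modules go here ...
  pipe.Add(acc)
  pipe.Run()
  # acc.fullobs now maps each group ID to a sorted list of (metadata, node, data)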

spt3g.mpi.MPIFileIO.MPIFileReader

Do parallel I/O and processing across an MPI communicator. The style of parallelism here is that each process in the communicator gets a file, reads the file, and processes the file. Shared files are supported: they are read by the lead process and then broadcast to the others before each process reads its own files.

The list of files is specified as a list of lists. For example, [‘offline_calibration.g3’, [‘0000.g3’, ‘0002.g3’, ‘0003.g3’]] will cause all nodes to process ‘offline_calibration.g3’, after which the remaining files (in the parallel block denoted by the nested list) will be read and processed by the nodes in a distributed way.

Constructor:

MPIFileReader(mpicomm, files)
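
A minimal usage sketch, assuming mpi4py is available and using placeholder file names:

  from mpi4py import MPI
  from spt3g import core, mpi

  pipe = core.G3Pipeline()
  # The flat entry is shared (read by the lead process and broadcast); the
  # nested list is divided among the processes in the communicator.
  pipe.Add(mpi.MPIFileIO.MPIFileReader, mpicomm=MPI.COMM_WORLD,
           files=['offline_calibration.g3', ['0000.g3', '0002.g3', '0003.g3']])
  pipe.Add(core.Dump)  # placeholder for per-frame processing modules
  pipe.Run()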

spt3g.mpi.MPIFileIO.MPIFrameParallelizer

Do parallel I/O and processing across an MPI communicator. The style of parallelism here is that a set of IO processes read files from disk and distribute frames round-robin-style across a worker communicator. Frames will arrive in order on each process, with gaps between time segments, but no guarantees are made about relative order between processes. All CPU nodes receive all metadata frames.

Rules to make this work:

  • First module on IO nodes must be MPIFileReader

  • Second module on IO nodes must be DeduplicateMetadata

  • Third module on IO nodes, first on workers, must be MPIFrameParallelizer

You may want to use the MPIIODistributor pipe segment to make sure these rules are followed.

Constructor:

MPIFrameParallelizer(iocomm, cpucomm, cpucomm_startrank, dataframetype=[spt3g.core.G3FrameType.Timepoint, spt3g.core.G3FrameType.Scan])

Parallelizes a frame stream across a communicator, distributing frames from M IO processes that are reading them to N CPU processes that are analyzing them. iocomm is a communicator containing exactly the set of IO nodes. cpucomm is a communicator containing (at least) all the IO and all the CPU nodes that the IO and CPU nodes can use to communicate with each other. The set of CPU nodes is the set of processes in cpucomm with rank greater than or equal to cpucomm_startrank. All frame types other than those in dataframetype appear on all CPU nodes, while frames of types in dataframetype are processed only by a single (random) CPU node.

spt3g.mpi.MPIFileIO.MPIIODistributor

Read files from disk using the first n_io processes in mpicomm (COMM_WORLD by default), with processing of frames in those files occurring on the other processes in mpicomm. See documentation for MPIFileReader for the format of the files argument and MPIFrameParallelizer for information on the semantics of processing. Add this as the first module in your pipeline in place of core.G3Reader.

Definition:

MPIIODistributor(pipe, mpicomm=None, n_io=10, files=[])
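
A brief usage sketch under the same assumptions as above (mpi4py available, placeholder file names); with n_io=4, the first four ranks of mpicomm read files and the remaining ranks process the frames:

  from mpi4py import MPI
  from spt3g import core, mpi

  pipe = core.G3Pipeline()
  pipe.Add(mpi.MPIFileIO.MPIIODistributor, mpicomm=MPI.COMM_WORLD, n_io=4,
           files=['offline_calibration.g3', ['0000.g3', '0001.g3', '0002.g3']])
  # ... frame processing modules for the CPU ranks go here ...
  pipe.Run()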