Main simulation class and command line client
%load_ext autoreload
%autoreload 2
hp.disable_warnings()

md5sum_string[source]

md5sum_string(string)

md5sum_file[source]

md5sum_file(filename)

Compute md5 checksum of the contents of a file
import hashlib


def md5sum_string(string):
    return hashlib.md5(string.encode("utf-8")).hexdigest()


def md5sum_file(filename):
    """Compute md5 checksum of the contents of a file"""

    return md5sum_string(open(filename, "r").read())

parse_channels[source]

parse_channels(channels)

Parse a comma separated list of channels or all or SAT/LAT into channel tag list

Generate list of channels

This is hardcoded above so we don't need to read the instrument model for processing channels

from s4_design_sim_tool.core import read_instrument_model
s4 = read_instrument_model()
exported_s4_channels = {}

for telescope, rows in zip(s4.group_by("telescope").groups.keys, s4.group_by("telescope").groups):
    exported_s4_channels[telescope[0]] = list(rows["band"])
exported_s4_channels == s4_channels
True
assert parse_channels("LFL1") == ["LFL1"]
assert parse_channels(["LFL1"]) == ["LFL1"]
assert parse_channels("LFL1,LFL2") == ["LFL1", "LFL2"]
assert parse_channels("LAT") == ['ULFPL1',
  'LFL1',
  'LFPL1',
  'LFL2',
  'LFPL2',
  'MFPL1',
  'MFL1',
  'MFL2',
  'MFPL2',
  'HFL1',
  'HFPL1',
  'HFL2',
  'HFPL2']
assert len(parse_channels("all")) == 21

merge_dict[source]

merge_dict(d1, d2)

Modifies d1 in-place to contain values from d2.  If any value
in d1 is a dictionary (or dict-like), *and* the corresponding
value in d2 is also a dictionary, then merge them in-place.

parse_config[source]

parse_config(*config_files)

Parse TOML configuration files

Later TOML configuration files override the previous ones,
dictionaries at the same level are merged.

Parameters
----------
config_files : one or more str
    paths to TOML configuration files

Returns
-------
config : dict
    parsed dictionary
import collections


def merge_dict(d1, d2):
    """
    Modifies d1 in-place to contain values from d2.  If any value
    in d1 is a dictionary (or dict-like), *and* the corresponding
    value in d2 is also a dictionary, then merge them in-place.
    """
    for k, v2 in d2.items():
        v1 = d1.get(k)  # returns None if v1 has no value for this key
        if isinstance(v1, collections.Mapping) and isinstance(v2, collections.Mapping):
            merge_dict(v1, v2)
        else:
            d1[k] = v2


def parse_config(*config_files):
    """Parse TOML configuration files

    Later TOML configuration files override the previous ones,
    dictionaries at the same level are merged.

    Parameters
    ----------
    config_files : one or more str
        paths to TOML configuration files

    Returns
    -------
    config : dict
        parsed dictionary"""
    config = toml.load(config_files[0])
    for conf in config_files[1:]:
        merge_dict(config, toml.load(conf))
    return config
console_md5sum = !md5sum s4_design.toml
console_md5sum
['Warning: Python module not loaded, you already have Python loaded via conda init',
 '2291dbc6667564aed8259956f968cba7  s4_design.toml']
assert md5sum_file("s4_design.toml") == console_md5sum[-1].split()[0]

class S4RefSimTool[source]

S4RefSimTool(config_filename, output_folder='output')

class S4RefSimTool:
    def __init__(self, config_filename, output_folder="output"):
        """Simulate CMB-S4 maps based on the experiment configuration

        Parameters
        ----------
        config : str or Path or List
            CMB-S4 configuration stored in a TOML file
            see for example s4_design.toml in the repository
            It also supports multiple TOML files as a List, in this case
            later files override configuration files of the earlier files.
            check the `config` attribute to verify that the parsing behaved
            as expected.
        output_folder : str or Path
            Output path
        """
        self.config_filename = (
            [config_filename]
            if isinstance(config_filename, (str, Path))
            else config_filename
        )

        self.config = parse_config(*self.config_filename)
        self.output_filename_template = "cmbs4_KCMB_{telescope}-{band}_{site}_nside{nside}_{split}_of_{nsplits}.fits"
        self.output_folder = Path(output_folder)
        self.output_folder.mkdir(parents=True, exist_ok=True)

    def run(self, channels="all", sites=["Pole", "Chile"]):
        """Run the simulation

        Parameters
        ----------
        channels : str or list[str]
            list of channel tags, e.g.
            * ["LFS1", "LFS2"] or
            * "SAT" or "LAT"
            * "all" (default)
        site : list[str]
            ['Pole'] or ['Chile'], by default ["Pole", "Chile"]
        """
        nsplits = self.config["experiment"].get("number_of_splits", 0)
        if nsplits == 1:
            nsplits = 0
        assert (
            nsplits < 8
        ), "We currently only have 7 independent realizations of atmosphere and noise"
        conf_md5 = ",".join(map(md5sum_file, self.config_filename))
        for site in sites:
            for channel in parse_channels(channels):

                if get_telecope_years(self.config, site, channel) == 0:
                    continue
                telescope = get_telescope(channel)
                subfolder = self.output_folder / f"{telescope}-{channel}_{site.lower()}"
                subfolder.mkdir(parents=True, exist_ok=True)
                log.info("Created output folder %s", str(subfolder))

                for split in range(nsplits + 1):
                    nside = 512 if telescope == "SAT" else 4096
                    output_filename = self.output_filename_template.format(
                        nside=nside,
                        telescope=telescope,
                        band=channel,
                        site=site.lower(),
                        split=max(1, split),  # split=0 is full mission and we want 1
                        nsplits=1 if split == 0 else nsplits,
                    )
                    if os.path.exists(subfolder / output_filename):
                        log.info("File %s already exists, SKIP", output_filename)
                        continue

                    if split == 0:
                        log.info(f"Simulate channel {channel} at {site}")
                        sky_emission = load_sky_emission(
                            self.config["sky_emission"], site, channel
                        )

                    output_map = np.zeros_like(sky_emission)
                    if self.config["experiment"].get("include_atmosphere", True):
                        output_map += load_atmosphere(
                            self.config, site, channel, realization=split
                        )
                    else:
                        log.info("Skip the atmosphere noise")

                    if self.config["experiment"].get("include_noise", True):
                        output_map += load_noise(
                            self.config, site, channel, realization=split
                        )

                    else:
                        log.info("Skip the instrument noise")

                    if split > 0:
                        output_map *= np.sqrt(nsplits)
                    output_map += sky_emission
                    # Use UNSEEN instead of nan for missing pixels
                    output_map[np.isnan(output_map)] = hp.UNSEEN

                    log.info(f"Writing {output_filename}")
                    noise_version = "1.0"
                    hp.write_map(
                        subfolder / output_filename,
                        output_map,
                        column_units="K_CMB",
                        extra_header=[
                            ("SOFTWARE", "s4_design_sim_tool"),
                            ("SW_VERS", __version__),
                            ("SKY_VERS", "1.0"),
                            ("ATM_VERS", "1.0"),
                            ("NOI_VERS", noise_version),
                            ("SITE", site),
                            ("SPLIT", split),
                            ("NSPLITS", nsplits),
                            ("CHANNEL", channel),
                            ("DATE", str(date.today())),
                            ("CONFMD5", conf_md5),
                        ],
                        coord="Q",
                        overwrite=True,
                    )
                    # only run of full mission and the first split
                    if split in [0, 1] and self.config["experiment"].get(
                        "include_noise", True
                    ):

                        log.info(f"Loading hitmap and white noise covariance matrix")
                        if split == 0:
                            hitmap, wcov = load_hitmap_wcov(
                                self.config, site, channel, realization=0
                            )
                        else:
                            hitmap = np.round(hitmap / nsplits).astype(np.int64)
                            wcov = hp.ma(wcov) * nsplits

                        hitmap_filename = output_filename.replace("KCMB", "hitmap")
                        log.info(f"Writing {hitmap_filename}")
                        hp.write_map(
                            subfolder / hitmap_filename,
                            hitmap,
                            column_units="hits",
                            extra_header=[
                                ("SOFTWARE", "s4_design_sim_tool"),
                                ("SW_VERS", __version__),
                                ("NOI_VERS", noise_version),
                                ("SITE", site),
                                ("SPLIT", split),
                                ("NSPLITS", nsplits),
                                ("CHANNEL", channel),
                                ("DATE", str(date.today())),
                                ("CONFMD5", conf_md5),
                            ],
                            coord="Q",
                            overwrite=True,
                        )

                        wcov_filename = output_filename.replace("KCMB", "wcov")
                        log.info(f"Writing {wcov_filename}")

                        hp.write_map(
                            subfolder / wcov_filename,
                            wcov,
                            column_units="K_CMB**2",
                            extra_header=[
                                ("SOFTWARE", "s4_design_sim_tool"),
                                ("SW_VERS", __version__),
                                ("NOI_VERS", noise_version),
                                ("SITE", site),
                                ("SPLIT", split),
                                ("NSPLITS", nsplits),
                                ("CHANNEL", channel),
                                ("DATE", str(date.today())),
                                ("CONFMD5", conf_md5),
                            ],
                            coord="Q",
                            overwrite=True,
                        )
                        if split == 1:
                            del hitmap, wcov

command_line_script[source]

command_line_script(args=None)

def command_line_script(args=None):

    import logging as log

    log.basicConfig(level=log.INFO)

    import argparse

    parser = argparse.ArgumentParser(description="Run s4_design_sim_tool")
    parser.add_argument("config", type=str, nargs="*", help="TOML Configuration files")
    parser.add_argument(
        "--channels",
        type=str,
        help="Channels e.g. all, SAT, LAT, LFL1 or comma separated list of channels",
        required=False,
        default="all",
    )
    parser.add_argument(
        "--site",
        type=str,
        help="Pole, Chile or all, default all",
        required=False,
        default="all",
    )
    parser.add_argument(
        "--output_folder",
        type=str,
        help="Output folder, optional",
        required=False,
        default="output",
    )
    res = parser.parse_args(args)
    if res.site == "all":
        sites = ["Chile", "Pole"]
    else:
        sites = [res.site]
    sim = S4RefSimTool(res.config, output_folder=res.output_folder)
    sim.run(channels=res.channels, sites=sites)
log.basicConfig(level = log.INFO)
sim = S4RefSimTool("s4_design.toml")
!ls output/
Warning: Python module not loaded, you already have Python loaded via conda init
C_ell_LAT_1.pkl
s4_reference_design_cmb_r0
s4_reference_design_cmb_tensor_only_r3e-3
s4_reference_design_foregrounds
s4_reference_design_noise_atmo_7splits
S4RefSimTool?
Init signature: S4RefSimTool(config_filename, output_folder='output')
Docstring:      <no docstring>
Init docstring:
Simulate CMB-S4 maps based on the experiment configuration

Parameters
----------
config : str or Path or List
    CMB-S4 configuration stored in a TOML file
    see for example s4_design.toml in the repository
    It also supports multiple TOML files as a List, in this case
    later files override configuration files of the earlier files.
    check the `config` attribute to verify that the parsing behaved
    as expected.
output_folder : str or Path
    Output path
Type:           type
Subclasses:     
sim.run(channels="LFS1", sites=["Pole"])
INFO:root:Created output folder output/SAT-LFS1_pole
INFO:root:Simulate channel LFS1 at Pole
INFO:root:Configuration {'foreground_emission': 1, 'CMB_unlensed': 1, 'CMB_lensing_signal': 1, 'CMB_tensor_to_scalar_ratio': 0.005}
INFO:root:Processing foreground_emission
INFO:root:Processing CMB_unlensed
INFO:root:Processing CMB_lensing_signal
INFO:root:Processing CMB_tensor_to_scalar_ratio
INFO:root:Skip the atmosphere noise
INFO:root:Skip the instrument noise
INFO:root:Writing cmbs4_KCMB_SAT-LFS1_pole_nside512_1_of_1.fits
%matplotlib inline
from astropy.io import fits
output_map, header = hp.read_map(
    "output/SAT-LFS1_pole/cmbs4_KCMB_SAT-LFS1_pole_nside512_1_of_1.fits", (0,1,2),
h=True)
header
[('XTENSION', 'BINTABLE'),
 ('BITPIX', 8),
 ('NAXIS', 2),
 ('NAXIS1', 12288),
 ('NAXIS2', 3072),
 ('PCOUNT', 0),
 ('GCOUNT', 1),
 ('TFIELDS', 3),
 ('TTYPE1', 'TEMPERATURE'),
 ('TFORM1', '1024E'),
 ('TUNIT1', 'K_CMB'),
 ('TTYPE2', 'Q_POLARISATION'),
 ('TFORM2', '1024E'),
 ('TUNIT2', 'K_CMB'),
 ('TTYPE3', 'U_POLARISATION'),
 ('TFORM3', '1024E'),
 ('TUNIT3', 'K_CMB'),
 ('PIXTYPE', 'HEALPIX'),
 ('ORDERING', 'RING'),
 ('COORDSYS', 'Q'),
 ('EXTNAME', 'xtension'),
 ('NSIDE', 512),
 ('FIRSTPIX', 0),
 ('LASTPIX', 3145727),
 ('INDXSCHM', 'IMPLICIT'),
 ('OBJECT', 'FULLSKY'),
 ('SOFTWARE', 's4_design_sim_tool'),
 ('SW_VERS', '1.1.3'),
 ('SKY_VERS', '1.0'),
 ('ATM_VERS', '1.0'),
 ('NOI_VERS', '1.0'),
 ('SITE', 'Pole'),
 ('SPLIT', 0),
 ('NSPLITS', 0),
 ('CHANNEL', 'LFS1'),
 ('DATE', '2021-04-23'),
 ('CONFMD5', '2291dbc6667564aed8259956f968cba7')]
header_dict = {k:v for k,v in header}
assert header_dict["SW_VERS"] == __version__
assert header_dict["SOFTWARE"] == "s4_design_sim_tool"
np.min(output_map[1][output_map[1] != hp.UNSEEN]), np.max(output_map[1])
(-7.488029496016679e-06, 7.983601972227916e-05)
assert np.min(output_map[1][output_map[1] != hp.UNSEEN]) > -1e-2 and np.max(output_map[1]) < 1e-2, \
    "Amplitude check failed"
hp.mollview(output_map[0], min=-1e-4, max=1e-4, unit="K", title="Total I")
hp.mollview(output_map[1], min=-1e-5, max=1e-5, unit="K", title="Total Q")
hp.mollview(output_map[2], min=-1e-5, max=1e-5, unit="K", title="Total U")
!rm -r output/SAT-LFS1_pole
Warning: Python module not loaded, you already have Python loaded via conda init

Test multiple TOML files

%%file only_CMB_scalar.toml
[sky_emission]
foreground_emission = 0
CMB_unlensed = 1
CMB_lensing_signal = 1
CMB_tensor_to_scalar_ratio = 0
Writing only_CMB_scalar.toml
sim2 = S4RefSimTool(["s4_design.toml", "only_CMB_scalar.toml"])
/global/homes/z/zonca/condajupynersc/lib/python3.7/site-packages/ipykernel_launcher.py:14: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
  
assert sim2.config["sky_emission"]["foreground_emission"] == 0
assert sim2.config["sky_emission"]["CMB_tensor_to_scalar_ratio"] == 0
assert sim.config["telescopes"] == sim2.config["telescopes"]
!rm only_CMB_scalar.toml
Warning: Python module not loaded, you already have Python loaded via conda init