Main simulation class and command line client
%load_ext autoreload
%autoreload 2
hp.disable_warnings()
import hashlib
def md5sum_string(string):
return hashlib.md5(string.encode("utf-8")).hexdigest()
def md5sum_file(filename):
"""Compute md5 checksum of the contents of a file"""
return md5sum_string(open(filename, "r").read())
from s4_design_sim_tool.core import read_instrument_model
s4 = read_instrument_model()
exported_s4_channels = {}
for telescope, rows in zip(s4.group_by("telescope").groups.keys, s4.group_by("telescope").groups):
exported_s4_channels[telescope[0]] = list(rows["band"])
exported_s4_channels == s4_channels
assert parse_channels("LFL1") == ["LFL1"]
assert parse_channels(["LFL1"]) == ["LFL1"]
assert parse_channels("LFL1,LFL2") == ["LFL1", "LFL2"]
assert parse_channels("LAT") == ['ULFPL1',
'LFL1',
'LFPL1',
'LFL2',
'LFPL2',
'MFPL1',
'MFL1',
'MFL2',
'MFPL2',
'HFL1',
'HFPL1',
'HFL2',
'HFPL2']
assert len(parse_channels("all")) == 21
import collections
def merge_dict(d1, d2):
"""
Modifies d1 in-place to contain values from d2. If any value
in d1 is a dictionary (or dict-like), *and* the corresponding
value in d2 is also a dictionary, then merge them in-place.
"""
for k, v2 in d2.items():
v1 = d1.get(k) # returns None if v1 has no value for this key
if isinstance(v1, collections.Mapping) and isinstance(v2, collections.Mapping):
merge_dict(v1, v2)
else:
d1[k] = v2
def parse_config(*config_files):
"""Parse TOML configuration files
Later TOML configuration files override the previous ones,
dictionaries at the same level are merged.
Parameters
----------
config_files : one or more str
paths to TOML configuration files
Returns
-------
config : dict
parsed dictionary"""
config = toml.load(config_files[0])
for conf in config_files[1:]:
merge_dict(config, toml.load(conf))
return config
console_md5sum = !md5sum s4_design.toml
console_md5sum
assert md5sum_file("s4_design.toml") == console_md5sum[-1].split()[0]
class S4RefSimTool:
def __init__(self, config_filename, output_folder="output"):
"""Simulate CMB-S4 maps based on the experiment configuration
Parameters
----------
config : str or Path or List
CMB-S4 configuration stored in a TOML file
see for example s4_design.toml in the repository
It also supports multiple TOML files as a List, in this case
later files override configuration files of the earlier files.
check the `config` attribute to verify that the parsing behaved
as expected.
output_folder : str or Path
Output path
"""
self.config_filename = (
[config_filename]
if isinstance(config_filename, (str, Path))
else config_filename
)
self.config = parse_config(*self.config_filename)
self.output_filename_template = "cmbs4_KCMB_{telescope}-{band}_{site}_nside{nside}_{split}_of_{nsplits}.fits"
self.output_folder = Path(output_folder)
self.output_folder.mkdir(parents=True, exist_ok=True)
def run(self, channels="all", sites=["Pole", "Chile"]):
"""Run the simulation
Parameters
----------
channels : str or list[str]
list of channel tags, e.g.
* ["LFS1", "LFS2"] or
* "SAT" or "LAT"
* "all" (default)
site : list[str]
['Pole'] or ['Chile'], by default ["Pole", "Chile"]
"""
nsplits = self.config["experiment"].get("number_of_splits", 0)
if nsplits == 1:
nsplits = 0
assert (
nsplits < 8
), "We currently only have 7 independent realizations of atmosphere and noise"
conf_md5 = ",".join(map(md5sum_file, self.config_filename))
for site in sites:
for channel in parse_channels(channels):
if get_telecope_years(self.config, site, channel) == 0:
continue
telescope = get_telescope(channel)
subfolder = self.output_folder / f"{telescope}-{channel}_{site.lower()}"
subfolder.mkdir(parents=True, exist_ok=True)
log.info("Created output folder %s", str(subfolder))
for split in range(nsplits + 1):
nside = 512 if telescope == "SAT" else 4096
output_filename = self.output_filename_template.format(
nside=nside,
telescope=telescope,
band=channel,
site=site.lower(),
split=max(1, split), # split=0 is full mission and we want 1
nsplits=1 if split == 0 else nsplits,
)
if os.path.exists(subfolder / output_filename):
log.info("File %s already exists, SKIP", output_filename)
continue
if split == 0:
log.info(f"Simulate channel {channel} at {site}")
sky_emission = load_sky_emission(
self.config["sky_emission"], site, channel
)
output_map = np.zeros_like(sky_emission)
if self.config["experiment"].get("include_atmosphere", True):
output_map += load_atmosphere(
self.config, site, channel, realization=split
)
else:
log.info("Skip the atmosphere noise")
if self.config["experiment"].get("include_noise", True):
output_map += load_noise(
self.config, site, channel, realization=split
)
else:
log.info("Skip the instrument noise")
if split > 0:
output_map *= np.sqrt(nsplits)
output_map += sky_emission
# Use UNSEEN instead of nan for missing pixels
output_map[np.isnan(output_map)] = hp.UNSEEN
log.info(f"Writing {output_filename}")
noise_version = "1.0"
hp.write_map(
subfolder / output_filename,
output_map,
column_units="K_CMB",
extra_header=[
("SOFTWARE", "s4_design_sim_tool"),
("SW_VERS", __version__),
("SKY_VERS", "1.0"),
("ATM_VERS", "1.0"),
("NOI_VERS", noise_version),
("SITE", site),
("SPLIT", split),
("NSPLITS", nsplits),
("CHANNEL", channel),
("DATE", str(date.today())),
("CONFMD5", conf_md5),
],
coord="Q",
overwrite=True,
)
# only run of full mission and the first split
if split in [0, 1] and self.config["experiment"].get(
"include_noise", True
):
log.info(f"Loading hitmap and white noise covariance matrix")
if split == 0:
hitmap, wcov = load_hitmap_wcov(
self.config, site, channel, realization=0
)
else:
hitmap = np.round(hitmap / nsplits).astype(np.int64)
wcov = hp.ma(wcov) * nsplits
hitmap_filename = output_filename.replace("KCMB", "hitmap")
log.info(f"Writing {hitmap_filename}")
hp.write_map(
subfolder / hitmap_filename,
hitmap,
column_units="hits",
extra_header=[
("SOFTWARE", "s4_design_sim_tool"),
("SW_VERS", __version__),
("NOI_VERS", noise_version),
("SITE", site),
("SPLIT", split),
("NSPLITS", nsplits),
("CHANNEL", channel),
("DATE", str(date.today())),
("CONFMD5", conf_md5),
],
coord="Q",
overwrite=True,
)
wcov_filename = output_filename.replace("KCMB", "wcov")
log.info(f"Writing {wcov_filename}")
hp.write_map(
subfolder / wcov_filename,
wcov,
column_units="K_CMB**2",
extra_header=[
("SOFTWARE", "s4_design_sim_tool"),
("SW_VERS", __version__),
("NOI_VERS", noise_version),
("SITE", site),
("SPLIT", split),
("NSPLITS", nsplits),
("CHANNEL", channel),
("DATE", str(date.today())),
("CONFMD5", conf_md5),
],
coord="Q",
overwrite=True,
)
if split == 1:
del hitmap, wcov
def command_line_script(args=None):
import logging as log
log.basicConfig(level=log.INFO)
import argparse
parser = argparse.ArgumentParser(description="Run s4_design_sim_tool")
parser.add_argument("config", type=str, nargs="*", help="TOML Configuration files")
parser.add_argument(
"--channels",
type=str,
help="Channels e.g. all, SAT, LAT, LFL1 or comma separated list of channels",
required=False,
default="all",
)
parser.add_argument(
"--site",
type=str,
help="Pole, Chile or all, default all",
required=False,
default="all",
)
parser.add_argument(
"--output_folder",
type=str,
help="Output folder, optional",
required=False,
default="output",
)
res = parser.parse_args(args)
if res.site == "all":
sites = ["Chile", "Pole"]
else:
sites = [res.site]
sim = S4RefSimTool(res.config, output_folder=res.output_folder)
sim.run(channels=res.channels, sites=sites)
log.basicConfig(level = log.INFO)
sim = S4RefSimTool("s4_design.toml")
!ls output/
S4RefSimTool?
sim.run(channels="LFS1", sites=["Pole"])
%matplotlib inline
from astropy.io import fits
output_map, header = hp.read_map(
"output/SAT-LFS1_pole/cmbs4_KCMB_SAT-LFS1_pole_nside512_1_of_1.fits", (0,1,2),
h=True)
header
header_dict = {k:v for k,v in header}
assert header_dict["SW_VERS"] == __version__
assert header_dict["SOFTWARE"] == "s4_design_sim_tool"
np.min(output_map[1][output_map[1] != hp.UNSEEN]), np.max(output_map[1])
assert np.min(output_map[1][output_map[1] != hp.UNSEEN]) > -1e-2 and np.max(output_map[1]) < 1e-2, \
"Amplitude check failed"
hp.mollview(output_map[0], min=-1e-4, max=1e-4, unit="K", title="Total I")
hp.mollview(output_map[1], min=-1e-5, max=1e-5, unit="K", title="Total Q")
hp.mollview(output_map[2], min=-1e-5, max=1e-5, unit="K", title="Total U")
!rm -r output/SAT-LFS1_pole
%%file only_CMB_scalar.toml
[sky_emission]
foreground_emission = 0
CMB_unlensed = 1
CMB_lensing_signal = 1
CMB_tensor_to_scalar_ratio = 0
sim2 = S4RefSimTool(["s4_design.toml", "only_CMB_scalar.toml"])
assert sim2.config["sky_emission"]["foreground_emission"] == 0
assert sim2.config["sky_emission"]["CMB_tensor_to_scalar_ratio"] == 0
assert sim.config["telescopes"] == sim2.config["telescopes"]
!rm only_CMB_scalar.toml