from .. import core
from . import HealpixSkyMap, FlatSkyMap, G3SkyMapWeights, G3SkyMapMask
from . import MapPolType, MapPolConv, MapCoordReference, MapProjection
import numpy as np
import os
import warnings
__all__ = [
'load_skymap_fits',
'save_skymap_fits',
'load_skymapmask_fits',
'save_skymapmask_fits',
'SaveMapFrame',
]
[docs]
@core.usefulfunc
def load_skymap_fits(filename, hdu=None, keys=None, memmap=False, apply_units=False):
"""
Load a fits file containing a sky map.
Arguments
---------
filename : str
Path to fits file
hdu : int, optional
If supplied, the data are extract from the given HDU index.
keys : list of strings, optional
If supplied, return only these keys in the output dictionary.
Options are: T, Q, U, W.
memmap : bool, optional
Argument passed to astropy.io.fits.open. If True, the map is not read
into memory, but only the required pixels are read when needed. Default:
False.
apply_units : bool, optional
If True, and input maps have known units, multiply by the appropriate
conversion factor to return maps in G3Units.
Returns
-------
a dictionary of maps keyed with e.g. 'T', 'Q', 'U' and 'W'.
"""
import astropy.io.fits
assert(os.path.exists(filename))
map_type = None
pol = None
map_opts = {}
output = {}
# defaults for missing header entries
pol_conv = None
units = 'Tcmb'
coord_ref = 'Equatorial'
weighted = False
unit_dict = {
'k_cmb': ('Tcmb', core.G3Units.K),
'kcmb': ('Tcmb', core.G3Units.K),
'kcmb^2': ('Tcmb', core.G3Units.K ** 2),
}
if keys is None:
keys = ['T', 'Q', 'U', 'W']
with astropy.io.fits.open(filename, memmap=memmap) as hdulist:
for hidx, H in enumerate(hdulist):
hdr = H.header
mtype = hdr.get('MAPTYPE', None)
if mtype == 'FLAT' or 'PROJ' in hdr or 'WCSAXES' in hdr or 'CTYPE1' in hdr:
# flat map
if not map_type:
map_type = 'flat'
elif map_type != 'flat':
raise ValueError(
"Expected a {} sky map in HDU {}, found a flat map".format(
map_type, hidx
)
)
# expect that flat sky maps on disk are probably in IAU
if not pol_conv:
pol_conv = 'IAU'
elif mtype == 'HEALPIX' or hdr.get('PIXTYPE', None) == 'HEALPIX' or 'NSIDE' in hdr:
# healpix map
if not map_type:
map_type = 'healpix'
elif map_type != 'healpix':
raise ValueError(
"Expected a {} sky map in HDU {}, found a healpix map".format(
map_type, hidx
)
)
# expect that healpix maps on disk are probably in COSMO
if not pol_conv:
pol_conv = 'COSMO'
if 'POLAR' in hdr:
pdict = {'T': True, 'F': False}
pol = pdict.get(hdr['POLAR'], hdr['POLAR'])
if pol and map_type is not None:
if hdr.get('POLCCONV', '').upper() not in ['IAU', 'COSMO']:
core.log_warn('Polarization convention not set, assuming %s' % pol_conv,
unit='load_skymap_fits')
else:
pol_conv = hdr['POLCCONV'].upper()
uconv = None
if 'TUNIT' in hdr:
u = hdr['TUNIT'].lower()
if u in unit_dict:
units, uconv = unit_dict[u]
else:
units = hdr.get('UNITS', units)
if 'COORDSYS' in hdr:
cdict = {'C': 'Equatorial', 'G': 'Galactic', 'L': 'Local'}
coord_ref = cdict.get(hdr['COORDSYS'], coord_ref)
else:
coord_ref = hdr.get('COORDREF', coord_ref)
overflow = hdr.get('OVERFLOW', 0)
map_opts.update(
units=getattr(core.G3TimestreamUnits, units),
coord_ref=getattr(MapCoordReference, coord_ref),
pol_conv=getattr(MapPolConv, pol_conv) if pol_conv else None,
)
if map_type == 'flat':
map_opts.update(flat_pol=hdr.get('FLATPOL', False))
map_opts.update(parse_wcs_header(hdr))
elif map_type == 'healpix':
nside = hdr['NSIDE']
nested = hdr['ORDERING'].strip().lower() in ['nest', 'nested']
map_opts.update(nested=nested)
# primary HDU
if H.data is None:
continue
# extracting a particular HDU
if map_type is not None and hdu is not None and hidx != hdu:
continue
if map_type == 'flat':
if hdr.get('ISWEIGHT', None):
if 'W' not in keys:
continue
else:
if hdr.get('POLTYPE', 'T') not in keys:
continue
data = np.array(H.data, dtype=float)
if map_opts.pop('transpose', False):
data = np.array(data.T)
if uconv is not None:
data *= uconv
# map type must be known if the HDU contains map data
if map_type not in ['flat', 'healpix']:
raise ValueError("Unknown map type in HDU {}".format(hidx))
if map_type == 'flat' and hdr.get('ISWEIGHT', None):
# flat map weights
assert('T' in output)
if 'W' not in output:
output['W'] = G3SkyMapWeights()
weight_map = output['W']
pol_type = getattr(MapPolType, hdr['WTYPE'], None)
fm = FlatSkyMap(data, pol_type=pol_type, **map_opts)
fm.overflow = overflow
setattr(weight_map, hdr['WTYPE'], fm)
del data
elif map_type == 'flat' and not hdr.get('ISWEIGHT', None):
# flat map data
ptype = hdr.get('POLTYPE', 'T')
pol_type = getattr(MapPolType, ptype, None)
fm = FlatSkyMap(data, pol_type=pol_type, **map_opts)
fm.overflow = overflow
output[ptype] = fm
del data
elif map_type == 'healpix':
# healpix map data
col_dict = {
'TEMPERATURE': 'T',
'Q_POLARISATION': 'Q',
'U_POLARISATION': 'U',
'Q-POLARISATION': 'Q',
'U-POLARISATION': 'U',
'I_STOKES': 'T',
'Q_STOKES': 'Q',
'U_STOKES': 'U',
'I': 'T',
'II': 'TT',
'IQ': 'TQ',
'IU': 'TU',
'II_COV': 'TT',
'IQ_COV': 'TQ',
'IU_COV': 'TU',
'QQ_COV': 'QQ',
'QU_COV': 'QU',
'UU_COV': 'UU',
'I_MEAN': 'T',
'Q_MEAN': 'Q',
'U_MEAN': 'U',
}
pix = None
partial = hdr.get('INDXSCHM') == 'EXPLICIT' or hdr.get('OBJECT') == 'PARTIAL'
for cidx, hcol in enumerate(H.data.names):
col = col_dict.get(hcol, hcol)
if col in 'TQU' and col not in keys:
continue
elif col in ['TT', 'TQ', 'TU', 'QQ', 'QU', 'UU'] and 'W' not in keys:
continue
data = np.array(H.data[hcol], dtype=float).ravel()
if col == 'PIXEL' or (partial and cidx == 0):
pix = np.array(data, dtype=int).ravel()
del data
continue
uconv = None
u = 'TUNIT{:d}'.format(cidx + 1)
if u in hdr:
u = hdr[u].lower()
if u in unit_dict:
units, uconv = unit_dict[u.lower()]
if uconv is not None:
data *= uconv
overflow = hdr.get('TOFLW{:d}'.format(cidx + 1), overflow)
weighted = hdr.get(
'TISWGT{:d}'.format(cidx + 1),
col in ['TT', 'TQ', 'TU', 'QQ', 'QU', 'UU'],
)
pol_type = getattr(MapPolType, col, None)
mdata = (pix, data, nside) if pix is not None else (data,)
if weighted:
assert('T' in output)
if 'W' not in output:
output['W'] = G3SkyMapWeights()
weight_map = output['W']
hm = HealpixSkyMap(*mdata, pol_type=pol_type, **map_opts)
hm.overflow = overflow
setattr(weight_map, col, hm)
else:
hm = HealpixSkyMap(*mdata, pol_type=pol_type, **map_opts)
output[col] = hm
del mdata, data
del pix
break
del H.data
for k, m in output.items():
m.pol_conv = map_opts['pol_conv']
if k == 'W':
continue
m.weighted = 'W' in output
return output
[docs]
@core.usefulfunc
def load_skymapmask_fits(filename, hdu=None, memmap=False):
"""
Load a fits file containing a binary sky map mask. Floating pointing or integer
data are converted to booleans. Assumes the fits file contains a single mask,
with flat sky data stored in a single ``ImageHDU``, and healpix masks stored in a
single ``BinTableHDU`` in full-sky format with implicit indexing over all pixels.
Arguments
---------
filename : str
Path to fits file
hdu : int, optional
If supplied, the data are extract from the given HDU index.
memmap : bool, optional
Argument passed to astropy.io.fits.open. If True, the map is not read into
memory, but only the required pixels are read when needed. Default: False.
Returns
-------
G3SkyMapMask :
The binary mask.
"""
import astropy.io.fits
map_type = None
coord_ref = "Equatorial"
map_opts = {}
with astropy.io.fits.open(filename, memmap=memmap) as hdulist:
for hidx, H in enumerate(hdulist):
hdr = H.header
mtype = hdr.get('MAPTYPE', None)
if mtype == 'FLAT' or 'PROJ' in hdr or 'WCSAXES' in hdr or 'CTYPE1' in hdr:
map_type = 'flat'
elif mtype == 'HEALPIX' or hdr.get('PIXTYPE', None) == 'HEALPIX' or 'NSIDE' in hdr:
map_type = 'healpix'
if 'COORDSYS' in hdr:
cdict = {'C': 'Equatorial', 'G': 'Galactic', 'L': 'Local'}
coord_ref = cdict.get(hdr['COORDSYS'], coord_ref)
else:
coord_ref = hdr.get('COORDREF', coord_ref)
map_opts.update(coord_ref=getattr(MapCoordReference, coord_ref))
if map_type == 'flat':
map_opts.update(parse_wcs_header(hdr))
elif map_type == 'healpix':
nside = hdr['NSIDE']
nested = hdr['ORDERING'].strip().lower() in ['nest', 'nested']
map_opts.update(nested=nested)
# primary HDU
if H.data is None:
continue
# extracting a particular HDU
if map_type is not None and hdu is not None and hidx != hdu:
continue
# map type must be known if the HDU contains map data
if map_type not in ['flat', 'healpix']:
raise ValueError("Unknown map type in HDU {}".format(hidx))
data = np.array(H.data, dtype=bool)
if map_type == 'flat':
# flat map data
if map_opts.pop('transpose', False):
data = np.array(data.T)
parent = FlatSkyMap(*data.shape, **map_opts)
elif map_type == 'healpix':
# healpix map data
parent = HealpixSkyMap(nside, **map_opts)
mask = G3SkyMapMask(parent, data.ravel())
del H.data, data
return mask
def load_proj_dict(inverse=False):
"""
Dictionary that associates projection names to their three-letter codes in WCS.
"""
projdict = {
'Proj8': '!!!',
'ProjSFL': 'SFL',
'ProjCAR': 'CAR',
'ProjSIN': 'SIN',
'ProjARC': 'ARC',
'ProjSTG': 'STG',
'ProjZEA': 'ZEA',
'ProjTAN': 'TAN',
'ProjCEA': 'CEA',
'ProjBICEP': 'CAR',
'ProjHealpix': 'HPX',
}
if inverse:
projdict_inverse = {}
for k, v in projdict.items():
projdict_inverse.setdefault(v, []).append(k)
return projdict_inverse
return projdict
def get_wcs(skymap, reset=False):
"""
Creates WCS (world coordinate system) information from information in the
input FlatSkyMap object.
"""
if hasattr(skymap, '_wcs') and not reset:
return skymap._wcs
projdict = load_proj_dict()
proj_abbr = projdict[str(skymap.proj)]
from astropy.wcs import WCS
w = WCS(naxis=2)
if skymap.coord_ref == MapCoordReference.Equatorial:
w.wcs.ctype = ['RA---{}'.format(proj_abbr), 'DEC--{}'.format(proj_abbr)]
w.wcs.radesys = 'FK5'
w.wcs.equinox = 2000
elif skymap.coord_ref == MapCoordReference.Galactic:
w.wcs.ctype = ['GLON-{}'.format(proj_abbr), 'GLAT-{}'.format(proj_abbr)]
elif skymap.coord_ref == MapCoordReference.Local:
w.wcs.ctype = ['RA---{}'.format(proj_abbr), 'DEC--{}'.format(proj_abbr)]
w.wcs.radesys = 'ALTAZ'
w.wcs.cdelt = [
-skymap.x_res / core.G3Units.deg,
skymap.res / core.G3Units.deg,
]
w.wcs.cunit = ['deg', 'deg']
w.wcs.crpix = [skymap.x_center + 1, skymap.y_center + 1]
w.wcs.crval = [
skymap.alpha_center / core.G3Units.deg,
skymap.delta_center / core.G3Units.deg,
]
if proj_abbr in ['SFL', 'CAR', 'CEA']:
w.wcs.crval[1] = 0.0
if proj_abbr == 'CEA':
v = np.sin(skymap.delta_center / core.G3Units.rad) / skymap.y_res
else:
v = skymap.delta_center / skymap.y_res
w.wcs.crpix[1] -= v
if str(skymap.proj) == 'ProjBICEP':
w.wcs.pc[0][0] = 1. / np.cos(skymap.delta_center / core.G3Units.rad)
skymap._wcs = w
return w
# set property
wcs_doc = "astropy.wcs.WCS instance containing projection information"
setattr(FlatSkyMap, 'wcs', property(get_wcs, doc=wcs_doc))
def create_wcs_header(skymap):
"""
Return a FITS header for the WCS information in the FlatSkyMap object.
"""
header = skymap.wcs.to_header()
header['PROJ'] = str(skymap.proj)
header['ALPHA0'] = skymap.alpha_center / core.G3Units.deg
header['DELTA0'] = skymap.delta_center / core.G3Units.deg
header['X0'] = skymap.x_center
header['Y0'] = skymap.y_center
return header
def parse_wcs_header(header):
"""
Extract flat sky map keywords from a WCS fits header. Raises an error if
the header is malformed.
"""
from astropy.wcs import WCS
w = WCS(header)
# parse projection
ctype = w.wcs.ctype
wcsproj = ctype[0][-3:]
proj = header.get('PROJ', None)
projdict = load_proj_dict(inverse=True)
if wcsproj not in projdict or wcsproj == '!!!':
raise ValueError('Unknown WCS projection {}'.format(wcsproj))
projopts = projdict[wcsproj]
if proj is None:
proj = projopts[0]
elif proj not in projopts:
raise ValueError('PROJ keyword inconsistent with WCS coordinate type')
# parse coordinate system
coord_ref = None
if ctype[0].startswith('RA-') or ctype[0].startswith('DEC-'):
if w.wcs.radesys == 'FK5':
coord_ref = MapCoordReference.Equatorial
elif w.wcs.radesys == 'ALTAZ':
coord_ref = MapCoordReference.Local
elif ctype[0].startswith('GLON-') or ctype[0].startswith('GLAT-'):
coord_ref = MapCoordReference.Galactic
# parse coordinate order
if ctype[0].startswith('RA-') or ctype[0].startswith('GLON-'):
order = [0, 1]
transpose = False
elif ctype[0].startswith('DEC-') or ctype[0].startswith('GLAT-'):
order = [1, 0]
transpose = True
else:
raise ValueError('Unable to determine WCS coordinate axes')
# parse resolution
cdelt = w.wcs.get_cdelt()
x_res = -cdelt[order[0]] * core.G3Units.deg
res = cdelt[order[1]] * core.G3Units.deg
# parse map center
crval = w.wcs.crval
alpha_center = header.get('ALPHA0', crval[order[0]]) * core.G3Units.deg
delta_center = header.get('DELTA0', crval[order[1]]) * core.G3Units.deg
crpix = w.wcs.crpix
x_center = header.get('X0', crpix[order[0]] - 1)
y_center = header.get('Y0', crpix[order[1]] - 1)
if proj == 'ProjBICEP':
v = 1. / np.cos(delta_center / core.G3Units.rad)
if not np.isclose(v, w.wcs.pc[order[0]][order[0]]):
raise ValueError("Inconsistent BICEP projection resolution")
if wcsproj == 'CEA':
for i, m, v in w.wcs.get_pv():
if i == order[1] + 1 and m == 1:
if not np.isclose(v, 1):
raise ValueError(
"CEA projection with non-conformal scaling parameter "
"PV%d_1 not supported" % i
)
# construct arguments
map_opts = dict(
proj=getattr(MapProjection, proj),
res=res,
x_res=x_res,
alpha_center=alpha_center,
delta_center=delta_center,
x_center=x_center,
y_center=y_center,
transpose=transpose,
)
if coord_ref is not None:
map_opts.update(coord_ref=coord_ref)
return map_opts
[docs]
@core.usefulfunc
def save_skymap_fits(filename, T, Q=None, U=None, W=None, overwrite=False,
compress=False, quantize_level=16.0, hdr=None):
"""
Save G3 map objects to a fits file.
`FlatSkyMap` objects are stored in a series of (optionally compressed)
`ImageHDU` entries, in which each HDU contains the projection information in
its header in standard WCS format, along with the image data for a single
map (one of the Stokes maps or a weight map component).
`HealpixSkyMap` objects are stored in a `BinTableHDU` extension, which
contains the necessary header information for compatiblity with healpix map
readers (e.g. `healpix.read_map`), and a single table with one column per
Stokes map or weight map component. Sparse maps are stored as cut-sky
pixel-indexed tables, while dense maps are stored with implicit indexing
over all pixels. The former produces output that is equivalent to using
`healpy.write_map` with the `partial=True` option.
Arguments
---------
filename : str
Path to output file. Must not exist, unless overwrite is True.
T, Q, U : FlatSkyMap or HealpixSkyMap
Maps to save
W : G3SkyMapWeights
Weights to save with the maps
overwrite : bool
If True, any existing file with the same name will be ovewritten.
compress : str or bool
If defined, and if input maps are FlatSkyMap objects, store these in a
series of compressed image HDUs, one per map. Otherwise, store input
maps in a series of standard ImageHDUs, which are readable with older
FITS readers (e.g. idlastro). If defined, the compression algorithm to
be used by the Astropy class astropy.io.fits.CompImageHDU.
Can be: 'RICE_1', 'RICE_ONE', 'PLIO_1', 'GZIP_1', 'GZIP_2' or
'HCOMPRESS_1'. Only GZIP_1 and GZIP_2 are lossless, although only
for integer data.
quantize_level : float
Floating point quantization level for compression. Higher values result
in more accurate floating point representation, but worse compression
ratio. See the astropy FITS image documention for details:
https://docs.astropy.org/en/stable/io/fits/api/images.html
hdr : dict
If defined, extra keywords to be appened to the FITS header. The dict
can contain entries such as ``hdr['NEWKEY'] = 'New value'`` or
``hdr['NEWKEY'] = ('New value', "Comment for New value")``.
"""
import astropy.io.fits
if isinstance(T, (FlatSkyMap, HealpixSkyMap)):
flat = isinstance(T, FlatSkyMap)
else:
raise TypeError("Input map must be a FlatSkyMap or HealpixSkyMap instance")
ctype = None
if compress == True:
ctype = 'GZIP_2'
elif isinstance(compress, str):
ctype = compress
if Q is not None:
assert(U is not None)
assert(U.pol_conv == MapPolConv.IAU or U.pol_conv == MapPolConv.COSMO)
assert(T.compatible(Q))
assert(T.compatible(U))
pol = True
maps = [T, Q, U]
if flat:
names = ['T', 'Q', 'U']
else:
names = ['TEMPERATURE', 'Q_POLARISATION', 'U_POLARISATION']
else:
assert(U is None)
pol = False
maps = [T]
names = ['T' if flat else 'TEMPERATURE']
if flat:
header = create_wcs_header(T)
bitpix = {
np.dtype(np.int16): 16,
np.dtype(np.int32): 32,
np.dtype(np.float32): -32,
np.dtype(np.float64): -64,
}
header['BITPIX'] = bitpix[np.asarray(T).dtype]
header['NAXIS'] = 2
else:
header = astropy.io.fits.Header()
header['PIXTYPE'] = 'HEALPIX'
header['ORDERING'] = 'NEST' if T.nested else 'RING'
header['NSIDE'] = T.nside
if T.dense:
header['INDXSCHM'] = 'IMPLICIT'
header['OBJECT'] = 'FULLSKY'
else:
header['INDXSCHM'] = 'EXPLICIT'
header['OBJECT'] = 'PARTIAL'
cdict = {'Equatorial': 'C', 'Galactic': 'G', 'Local': 'L'}
header['COORDSYS'] = cdict[str(T.coord_ref)]
if header['COORDSYS'] == 'C':
header['RADESYS'] = 'FK5'
header['EQUINOX'] = 2000.0
conv = {
np.dtype(np.int16): 'I',
np.dtype(np.int32): 'J',
np.dtype(np.int64): 'K',
np.dtype(np.float32): 'E',
np.dtype(np.float64): 'D',
}
pix = None
header['MAPTYPE'] = 'FLAT' if flat else 'HEALPIX'
header['COORDREF'] = str(T.coord_ref)
if pol:
header['POLCCONV'] = str(U.pol_conv)
header['POLAR'] = pol
header['UNITS'] = str(T.units)
header['WEIGHTED'] = T.weighted
# Append extra metadata in hdr if defined
if hdr:
for keyword in hdr.keys():
header[keyword] = hdr[keyword]
hdulist = astropy.io.fits.HDUList()
cols = astropy.io.fits.ColDefs([])
try:
for m, name in zip(maps, names):
if flat:
if compress:
hdu = astropy.io.fits.CompImageHDU(
np.asarray(m),
header=header,
compression_type=ctype,
quantize_level=quantize_level,
)
else:
hdu = astropy.io.fits.ImageHDU(np.asarray(m), header=header)
hdu.header['ISWEIGHT'] = False
hdu.header['POLTYPE'] = name
hdu.header['OVERFLOW'] = m.overflow
hdu.header['FLATPOL'] = m.flat_pol
hdulist.append(hdu)
del hdu
else:
if not T.dense:
pix1, data = m.nonzero_pixels()
pix1 = np.asarray(pix1)
data = np.asarray(data)
data = data[np.argsort(pix1)]
pix1.sort()
if pix is None:
pix = pix1
fmt = conv.get(np.min_scalar_type(-pix.max()), 'K')
col = astropy.io.fits.Column(
name='PIXEL', format=fmt, array=pix, unit=None
)
cols.add_col(col)
del col
else:
# XXX different sparse maps can have different nonzero pixels
assert(len(pix) == len(pix1) and (pix == pix1).all())
del pix1
else:
data = np.asarray(m)
fmt = conv.get(data.dtype, 'D')
col = astropy.io.fits.Column(
name=name, format=fmt, array=data, unit=str(m.units)
)
cols.add_col(col)
del col
del data
idx = len(cols)
header['TISWGT{:d}'.format(idx)] = False
header['TOFLW{:d}'.format(idx)] = m.overflow
if W is not None:
if pol:
assert(W.polarized)
wnames = ['TT', 'TQ', 'TU', 'QQ', 'QU', 'UU']
else:
assert(not W.polarized)
wnames = ['TT']
for wt in wnames:
m = getattr(W, wt)
assert T.compatible(m), 'Weight {} is not compatible with T'.format(wt)
if flat:
if compress:
hdu = astropy.io.fits.CompImageHDU(
np.asarray(m),
header=header,
compression_type=ctype,
quantize_level=quantize_level,
)
else:
hdu = astropy.io.fits.ImageHDU(np.asarray(m), header=header)
hdu.header['ISWEIGHT'] = True
hdu.header['WTYPE'] = wt
hdu.header['OVERFLOW'] = m.overflow
hdulist.append(hdu)
del hdu
else:
if not T.dense:
pix1, data = m.nonzero_pixels()
pix1 = np.asarray(pix1)
data = np.asarray(data)
data = data[np.argsort(pix1)]
pix1.sort()
assert(len(pix) == len(pix1) and (pix == pix1).all())
del pix1
else:
data = np.asarray(m)
fmt = conv.get(data.dtype, 'D')
col = astropy.io.fits.Column(
name=wt, format=fmt, array=data, unit=str(m.units)
)
cols.add_col(col)
del col
del data
idx = len(cols)
header['TISWGT{:d}'.format(idx)] = False
header['TOFLW{:d}'.format(idx)] = m.overflow
if not flat:
del pix
hdu = astropy.io.fits.BinTableHDU.from_columns(cols, header=header)
hdulist.append(hdu)
del hdu
if overwrite:
if os.path.exists(filename):
os.remove(filename)
else:
assert(not os.path.exists(filename))
hdulist.writeto(filename)
finally:
for col in cols.names:
del cols[col].array
cols.del_col(col)
del cols
for hdu in hdulist:
del hdu.data
del hdulist
[docs]
@core.usefulfunc
def save_skymapmask_fits(filename, mask, overwrite=False, compress=False, hdr=None):
"""
Save G3 mask objects to a fits file.
Masks with ``FlatSkyMap`` parents are stored in a (optionally compressed)
``ImageHDU`` entry, which contains the projection information in its header in
standard WCS format, along with the image data for a single mask.
Masks with ``HealpixSkyMap`` parents are stored in a ``BinTableHDU`` extension,
which contains the necessary header information for compatiblity with healpix map
readers (e.g. `healpix.read_map`), and a single table with one column for the
mask. Masks are stored densely with implicit indexing over all pixels.
Arguments
---------
filename : str
Path to output file. Must not exist, unless overwrite is True.
mask : G3SkyMapMask
Mask to save
overwrite : bool
If True, any existing file with the same name will be ovewritten.
compress : str or bool
If defined, and if input mask has a ``FlatSkyMap`` parent, store this in a
compressed image HDU. Otherwise, store input mask in a standard ImageHDU,
which is readable with older FITS readers (e.g. idlastro). If defined, the
compression algorithm to be used by the Astropy class
astropy.io.fits.CompImageHDU. Can be: 'RICE_1', 'RICE_ONE', 'PLIO_1',
'GZIP_1', 'GZIP_2' or 'HCOMPRESS_1'. Only GZIP_1 and GZIP_2 are lossless,
although only for integer data.
hdr : dict
If defined, extra keywords to be appened to the FITS header. The dict
can contain entries such as ``hdr['NEWKEY'] = 'New value'`` or
``hdr['NEWKEY'] = ('New value', "Comment for New value")``.
"""
import astropy.io.fits
flat = isinstance(mask.parent, FlatSkyMap)
if flat:
header = create_wcs_header(mask.parent)
header["BITPIX"] = 8
header['NAXIS'] = 2
else:
header = astropy.io.fits.Header()
header['PIXTYPE'] = 'HEALPIX'
header['ORDERING'] = 'NEST' if mask.parent.nested else 'RING'
header['NSIDE'] = mask.parent.nside
header['INDXSCHM'] = 'IMPLICIT'
header['OBJECT'] = 'FULLSKY'
cdict = {'Equatorial': 'C', 'Galactic': 'G', 'Local': 'L'}
header['COORDSYS'] = cdict[str(mask.parent.coord_ref)]
if header['COORDSYS'] == 'C':
header['RADESYS'] = 'FK5'
header['EQUINOX'] = 2000.0
header['MAPTYPE'] = 'FLAT' if flat else 'HEALPIX'
header['COORDREF'] = str(mask.parent.coord_ref)
# Append extra metadata in hdr if defined
if hdr:
for keyword in hdr.keys():
header[keyword] = hdr[keyword]
hdulist = astropy.io.fits.HDUList()
data = np.asarray(mask)
if flat:
if compress:
ctype = None
if compress is True:
ctype = 'GZIP_2'
elif isinstance(compress, str):
ctype = compress
hdu = astropy.io.fits.CompImageHDU(
data, header=header, compression_type=ctype,
)
else:
hdu = astropy.io.fits.ImageHDU(data, header=header)
else:
cols = astropy.io.fits.ColDefs([])
col = astropy.io.fits.Column(name="MASK", format="B", array=data)
cols.add_col(col)
hdu = astropy.io.fits.BinTableHDU.from_columns(cols, header=header)
hdulist.append(hdu)
hdulist.writeto(filename, overwrite=overwrite)
[docs]
@core.indexmod
def SaveMapFrame(
frame,
output_file=None,
hdr=None,
compress=False,
quantize_level=16.0,
overwrite=False,
):
"""
Save a map frame to a FITS file. See ``save_skymap_fits`` for details. The
map frame should contain T maps and (optionally) unpolarized weights, or T/Q/U
maps and (optionally) polarized weights to store in the output file.
Arguments
---------
output_file : str or callable
Fits filename to which the map will be written. Maybe a callable
function that takes a frame object as its sole input argument and
returns a string filename.
hdr : dict
If defined, extra keywords to be appened to the FITS header. The dict
can contain entries such as ``hdr['NEWKEY'] = 'New value'`` or
``hdr['NEWKEY'] = ('New value', "Comment for New value")``.
compress : str or bool
If defined, and if input maps are FlatSkyMap objects, store these in a
series of compressed image HDUs, one per map. Otherwise, store input
maps in a series of standard ImageHDUs, which are readable with older
FITS readers (e.g. idlastro). If defined, the compression algorithm to
be used by the Astropy class astropy.io.fits.CompImageHDU.
Can be: 'RICE_1', 'RICE_ONE', 'PLIO_1', 'GZIP_1', 'GZIP_2' or
'HCOMPRESS_1'. Only GZIP_1 and GZIP_2 are lossless, although only
for integer data.
quantize_level : float
Floating point quantization level for compression. Higher values result
in more accurate floating point representation, but worse compression
ratio. See the astropy FITS image documention for details:
https://docs.astropy.org/en/stable/io/fits/api/images.html
overwrite : bool
If True, any existing file with the same name will be ovewritten.
"""
if frame.type != core.G3FrameType.Map:
return
T = frame['T']
Q = frame.get('Q', None)
U = frame.get('U', None)
W = frame.get('Wpol', frame.get('Wunpol', None))
if callable(output_file):
output_file = output_file(frame)
save_skymap_fits(
output_file,
T=T,
Q=Q,
U=U,
W=W,
overwrite=overwrite,
compress=compress,
quantize_level=quantize_level,
hdr=hdr,
)