Source code for spt3g.maps.azel

import numpy as np
import os

from .. import core

__all__ = [
    "convert_azel_to_radec",
    "convert_radec_to_azel",
    "convert_radec_to_gal",
    "convert_gal_to_radec",
    "LocalToAstronomicalPointing",
    "EquatorialToGalacticPointing",
]


def get_location(location="spt"):
    """
    Return the astropy EarthLocation for the given location name.

    Arguments
    ---------
    location : str or EarthLocation instance
        If a string, must be a recognized location name.  Currently
        only "spt" is supported.
    """
    from astropy.coordinates import EarthLocation
    import astropy.units

    if isinstance(location, EarthLocation):
        return location

    location = str(location).lower()
    if location == "spt":
        return EarthLocation(
            lat=-89.991066 * astropy.units.deg,
            lon=-44.65 * astropy.units.deg,
            height=2835.0 * astropy.units.meter,
        )

    raise NotImplementedError


iers_checked = False


def check_iers(mjd):
    """
    Check whether IERS calculations will work, and load an IERS database file
    from backup if all else fails.

    Arguments
    ---------
    mjd : Array-like
        MJD timestamps for which an IERS calculation must be computed.

    Returns
    -------
    t : astropy.time.Time instance
        Time instance for the input MJD(s).
    """
    from astropy.utils import iers
    import astropy.time

    global iers_checked
    if iers_checked:
        return astropy.time.Time(mjd, format="mjd")

    try:
        if os.getenv("SPT3G_IERS_AUTO_URL"):
            iers.conf.iers_auto_url = os.getenv("SPT3G_IERS_AUTO_URL")
        if os.getenv("SPT3G_IERS_REMOTE_TIMEOUT"):
            iers.conf.remote_timeout = float(os.getenv("SPT3G_IERS_REMOTE_TIMEOUT"))
    except:
        pass

    mjd1 = np.atleast_1d(mjd)[-1]
    t1 = astropy.time.Time(mjd1, format="mjd")

    # check if accessing the IERS table outright works.
    try:
        t1.ut1
        iers_checked = True
        return astropy.time.Time(mjd, format="mjd")
    except:
        pass

    # if that fails, allow extrapolation
    iers.conf.auto_max_age = None
    t1 = astropy.time.Time(mjd1, format="mjd")
    try:
        t1.ut1
        core.log_warn("IERS auto update failed, allowing extrapolation", unit="IERS")
        iers_checked = True
        return astropy.time.Time(mjd, format="mjd")
    except:
        pass

    # and if that fails, use a locally cached file that is hopefully setup correctly.
    fname = os.path.join(os.path.dirname(os.path.abspath(__file__)), "finals2000A.all")
    iers.conf.auto_download = False
    iers.IERS.iers_table = iers.IERS_A.open(fname)
    t1 = astropy.time.Time(mjd1, format="mjd")
    t1.ut1
    core.log_warn("Using IERS table from local cache {}".format(fname), unit="IERS")
    iers_checked = True
    return astropy.time.Time(mjd, format="mjd")


def convert_deg(d, system="g3"):
    """
    Convert the input array to the appropriate system of units.

    Arguments
    ---------
    d : array_like
        Data vector
    system : str
        System to convert to, either "g3" or "astropy".  Assumes conversion from
        the other system.
    """
    import astropy.units

    system = str(system).lower()
    if system == "astropy":
        return np.asarray(d) / core.G3Units.deg * astropy.units.deg
    elif system == "g3":
        return np.asarray(d / astropy.units.deg) * core.G3Units.deg
    raise NotImplementedError


[docs] @core.usefulfunc def convert_azel_to_radec(az, el, location="spt", mjd=None): """ Convert timestreams of local azimuth and elevation to right ascension and declination. Arguments --------- az, el : np.ndarray or G3Timestream Array of local coordinates. If inputs are G3Timestream objects, G3Timestreams are also returned. location : str or astropy.coordinates.EarthLocation instance The telescope location on Earth. mjd : np.ndarray An array of times for each az/el sample. If input az and el are not G3Timestreams, this argument is required. Returns ------- ra, dec : np.ndarray or G3Timestream """ import astropy.coordinates singleton = False if isinstance(az, core.G3Timestream): assert az.start == el.start assert az.stop == el.stop assert az.n_samples == el.n_samples mjd = np.asarray([i.mjd for i in az.times]) else: try: len(az) except TypeError: singleton = True if singleton: az = np.atleast_1d(az) el = np.atleast_1d(el) mjd = np.atleast_1d(mjd) assert len(az) == len(el) t = check_iers(mjd) # record locations of bad elevation values to mark them later badel_inds = np.where( (el < -90.0 * core.G3Units.deg) | (el > 90.0 * core.G3Units.deg) ) el[badel_inds] = 0.0 * core.G3Units.deg k = astropy.coordinates.AltAz( az=convert_deg(az, "astropy"), alt=convert_deg(el, "astropy"), obstime=t, location=get_location(location), pressure=0, ) kt = k.transform_to(astropy.coordinates.FK5()) ra = convert_deg(kt.ra, "g3") dec = convert_deg(kt.dec, "g3") dec[badel_inds] = np.nan if isinstance(az, core.G3Timestream): ra = core.G3Timestream(ra) dec = core.G3Timestream(dec) ra.start = dec.start = az.start ra.stop = dec.stop = az.stop elif singleton: ra = ra[0] dec = dec[0] return (ra, dec)
[docs] @core.usefulfunc def convert_radec_to_azel(ra, dec, location="spt", mjd=None): """ Convert timestreams of right ascension and declination to local azimuth and elevation. Arguments --------- ra, dec : np.ndarray or G3Timestream Array of Equatorial sky coordinates. If inputs are G3Timestream objects, G3Timestreams are also returned. location : str or astropy.coordinates.EarthLocation instance The telescope location on Earth. mjd : np.ndarray An array of times for each ra/dec sample. If input ra and dec are not G3Timestreams, this argument is required. Returns ------- az, el : np.ndarray or G3Timestream """ import astropy.coordinates singleton = False if isinstance(ra, core.G3Timestream): assert ra.start == dec.start assert ra.stop == dec.stop assert ra.n_samples == dec.n_samples mjd = np.asarray([i.mjd for i in ra.times]) else: try: len(ra) except TypeError: singleton = True if singleton: ra = np.atleast_1d(ra) dec = np.atleast_1d(dec) mjd = np.atleast_1d(mjd) assert len(ra) == len(dec) t = check_iers(mjd) k = astropy.coordinates.FK5( ra=convert_deg(ra, "astropy"), dec=convert_deg(dec, "astropy"), ) kt = k.transform_to( astropy.coordinates.AltAz( obstime=t, location=get_location(location), pressure=0, ) ) az = convert_deg(kt.az, "g3") el = convert_deg(kt.alt, "g3") if isinstance(ra, core.G3Timestream): az = core.G3Timestream(az) el = core.G3Timestream(el) az.start = el.start = ra.start az.stop = el.stop = ra.stop elif singleton: az = az[0] el = el[0] return (az, el)
[docs] @core.usefulfunc def convert_radec_to_gal(ra, dec): """ Convert timestreams of right ascension and declination to Galactic longitude and latitude. Arguments --------- ra, dec : np.ndarray or G3Timestream Array of Equatorial sky coordinates. If inputs are G3Timestream objects, G3Timestreams are also returned. Returns ------- glon, glat : np.ndarray or G3Timestream """ import astropy.coordinates singleton = False if isinstance(ra, core.G3Timestream): assert ra.start == dec.start assert ra.stop == dec.stop assert ra.n_samples == dec.n_samples else: try: len(ra) except TypeError: singleton = True if singleton: ra = np.atleast_1d(ra) dec = np.atleast_1d(dec) assert len(ra) == len(dec) k = astropy.coordinates.FK5( ra=convert_deg(ra, "astropy"), dec=convert_deg(dec, "astropy"), ) kt = k.transform_to(astropy.coordinates.Galactic()) glon = convert_deg(kt.l, "g3") glat = convert_deg(kt.b, "g3") if isinstance(ra, core.G3Timestream): glon = core.G3Timestream(glon) glat = core.G3Timestream(glat) glon.start = glat.start = ra.start glon.stop = glat.stop = ra.stop elif singleton: glon = glon[0] glat = glat[0] return (glon, glat)
[docs] @core.usefulfunc def convert_gal_to_radec(glon, glat): """ Convert timestreams of Galactic longitude and latitude to right ascension and declination. Arguments --------- glon, glat : np.ndarray or G3Timestream Array of Galactic sky coordinates. If inputs are G3Timestream objects, G3Timestreams are also returned. Returns ------- ra, dec : np.ndarray or G3Timestream """ import astropy.coordinates singleton = False if isinstance(glon, core.G3Timestream): assert glon.start == glat.start assert glon.stop == glat.stop assert glon.n_samples == glat.n_samples else: try: len(glon) except TypeError: singleton = True if singleton: glon = np.atleast_1d(glon) glat = np.atleast_1d(glat) assert len(glon) == len(glat) k = astropy.coordinates.Galactic( l=convert_deg(glon, "astropy"), b=convert_deg(glat, "astropy"), ) kt = k.transform_to(astropy.coordinates.FK5()) ra = convert_deg(kt.ra, "g3") dec = convert_deg(kt.dec, "g3") if isinstance(ra, core.G3Timestream): ra = core.G3Timestream(ra) dec = core.G3Timestream(dec) ra.start = dec.start = glon.start ra.stop = dec.stop = glon.stop elif singleton: ra = ra[0] dec = dec[0] return (ra, dec)
[docs] @core.indexmod def LocalToAstronomicalPointing( frame, az_timestream="BoresightAz", el_timestream="BoresightEl", ra_timestream="BoresightRa", dec_timestream="BoresightDec", Telescope="spt", ): """ Converts a set of timestreams in Scan frames representing Az and El pointing of the telescope into RA and Declination timestreams, stored in the frame under their respective names. """ if frame.type != core.G3FrameType.Scan: return ra, dec = convert_azel_to_radec( frame[az_timestream], frame[el_timestream], location=Telescope ) frame[ra_timestream] = ra frame[dec_timestream] = dec
[docs] @core.indexmod def EquatorialToGalacticPointing( frame, ra_timestream="BoresightRa", dec_timestream="BoresightDec", glon_timestream="BoresightGalLon", glat_timestream="BoresightGalLat", ): """ Converts a set of timestreams in Scan frames representing RA and Declination pointing of the telescope into Galactic longitude and latitude timestreams, stored in the frame under their respective names. """ if frame.type != core.G3FrameType.Scan: return glon, glat = convert_radec_to_gal(frame[ra_timestream], frame[dec_timestream]) frame[glon_timestream] = glon frame[glat_timestream] = glat