Source code for pysm3.models.cmb

import numpy as np
import healpy as hp
from scipy.special import factorial, comb

from .. import utils
from .. import units as u
from .template import Model


[docs]class CMBMap(Model): def __init__( self, nside, map_IQU=None, map_I=None, map_Q=None, map_U=None, map_dist=None ): super().__init__(nside=nside, map_dist=map_dist) if map_IQU is not None: self.map = self.read_map(map_IQU, unit=u.uK_CMB, field=(0, 1, 2)) elif map_I is not None: self.map = self.read_map(map_I, unit=u.uK_CMB, field=0) if map_Q is not None: self.map = [self.map] for m in [map_Q, map_U]: self.map.append(self.read_map(m, unit=u.uK_CMB)) self.map = u.Quantity(self.map, unit=u.uK_CMB) else: raise (ValueError("No input map provided"))
[docs] @u.quantity_input def get_emission(self, freqs: u.GHz, weights=None) -> u.uK_RJ: freqs = utils.check_freq_input(freqs) # do not use normalize weights because that includes a transformation # to spectral radiance and then back to RJ if weights is None: weights = np.ones(len(freqs), dtype=np.float) scaling_factor = utils.bandpass_unit_conversion( freqs * u.GHz, weights, output_unit=u.uK_RJ, input_unit=u.uK_CMB ) return u.Quantity(self.map * scaling_factor, unit=u.uK_RJ, copy=False)
"""The following code is edited from the taylens code: Naess, S. K. and Louis, T. 2013 'Lensing simulations by Taylor expansion - not so inefficient after all' Journal of Cosmology and Astroparticle Physics September 2013. Available at: https://github.com/amaurea/taylens """ def simulate_tebp_correlated(cl_tebp_arr, nside, lmax, seed): """This generates correlated T,E,B and Phi maps """ np.random.seed(seed) alms = hp.synalm(cl_tebp_arr, lmax=lmax, new=True) aphi = alms[-1] acmb = alms[0:-1] # Set to zero above map resolution to avoid aliasing beam_cut = np.ones(3 * nside) for ac in acmb: hp.almxfl(ac, beam_cut, inplace=True) cmb = np.array(hp.alm2map(acmb, nside, pol=True, verbose=False)) return cmb, aphi def taylor_interpol_iter(m, pos, order=3, verbose=False, lmax=None): """Given a healpix map m[npix], and a set of positions pos[{theta,phi},...], evaluate the values at those positions using harmonic Taylor interpolation to the given order (3 by default). Successively yields values for each cumulative order up to the specified one. If verbose is specified, it will print progress information to stderr. """ nside = hp.npix2nside(m.size) if lmax is None: lmax = 3 * nside # Find the healpix pixel centers closest to pos, # and our deviation from these pixel centers. ipos = hp.ang2pix(nside, pos[0], pos[1]) pos0 = np.array(hp.pix2ang(nside, ipos)) dpos = pos[:2] - pos0 # Take wrapping into account bad = dpos[1] > np.pi dpos[1, bad] = dpos[1, bad] - 2 * np.pi bad = dpos[1] < -np.pi dpos[1, bad] = dpos[1, bad] + 2 * np.pi # Since healpix' dphi actually returns dphi/sintheta, we choose # to expand in terms of dphi*sintheta instead. dpos[1] *= np.sin(pos0[0]) del pos0 # We will now Taylor expand our healpix field to # get approximations for the values at our chosen # locations. The structure of this section is # somewhat complicated by the fact that alm2map_der1 returns # two different derivatives at the same time. derivs = [[m]] res = m[ipos] yield res for o in range(1, order + 1): # Compute our derivatives derivs2 = [None for i in range(o + 1)] used = [False for i in range(o + 1)] # Loop through previous level in steps of two (except last) if verbose: print("order %d" % o) for i in range(o): # Each alm2map_der1 provides two derivatives, so avoid # doing double work. if i < o - 1 and i % 2 == 1: continue a = hp.map2alm(derivs[i], use_weights=True, lmax=lmax, iter=0) derivs[i] = None dtheta, dphi = hp.alm2map_der1(a, nside, lmax=lmax)[-2:] derivs2[i : i + 2] = [dtheta, dphi] del a, dtheta, dphi # Use these to compute the next level for j in range(i, min(i + 2, o + 1)): if used[j]: continue N = comb(o, j) / factorial(o) res += N * derivs2[j][ipos] * dpos[0] ** (o - j) * dpos[1] ** j used[j] = True # If we are at the last order, we don't need to waste memory # storing the derivatives any more if o == order: derivs2[j] = None derivs = derivs2 yield res def offset_pos(ipos, dtheta, dphi, pol=False, geodesic=False): """Offsets positions ipos on the sphere by a unit length step along the gradient dtheta, dphi/sintheta, taking the curvature of the sphere into account. If pol is passed, also computes the cos and sin of the angle by which (Q,U) must be rotated to take into account the change in local coordinate system. If geodesic is passed, a quick and dirty, but quite accurate, approximation is used. Uses the memory of 2 maps (4 if pol) (plus that of the input maps). """ opos = np.zeros(ipos.shape) if pol and not geodesic: orot = np.zeros(ipos.shape) else: orot = None if not geodesic: # Loop over chunks in order to conserve memory step = 0x10000 for i in range(0, ipos.shape[1], step): small_opos, small_orot = offset_pos_helper( ipos[:, i : i + step], dtheta[i : i + step], dphi[i : i + step], pol ) opos[:, i : i + step] = small_opos if pol: orot[:, i : i + step] = small_orot else: opos[0] = ipos[0] + dtheta opos[1] = ipos[1] + dphi / np.sin(ipos[0]) opos = fixang(opos) return opos, orot def offset_pos_helper(ipos, dtheta, dphi, pol): grad = np.array((dtheta, dphi)) dtheta, dphi = None, None d = np.sum(grad ** 2, 0) ** 0.5 grad /= d cosd, sind = np.cos(d), np.sin(d) cost, sint = np.cos(ipos[0]), np.sin(ipos[0]) ocost = cosd * cost - sind * sint * grad[0] osint = (1 - ocost ** 2) ** 0.5 ophi = ipos[1] + np.arcsin(sind * grad[1] / osint) if not pol: return np.array([np.arccos(ocost), ophi]), None A = grad[1] / (sind * cost / sint + grad[0] * cosd) nom1 = grad[0] + grad[1] * A denom = 1 + A ** 2 cosgam = 2 * nom1 ** 2 / denom - 1 singam = 2 * nom1 * (grad[1] - grad[0] * A) / denom return np.array([np.arccos(ocost), ophi]), np.array([cosgam, singam]) def fixang(pos): """Handle pole wraparound.""" a = np.array(pos) bad = np.where(a[0] < 0) a[0, bad] = -a[0, bad] a[1, bad] = a[1, bad] + np.pi bad = np.where(a[0] > np.pi) a[0, bad] = 2 * np.pi - a[0, bad] a[1, bad] = a[1, bad] + np.pi return a def apply_rotation(m, rot): """Update Q,U components in polarized map by applying the rotation rot, represented as [cos2psi,sin2psi] per pixel. Rot is one of the outputs from offset_pos. """ if len(m) < 3: return m if rot is None: return m m = np.asarray(m) res = m.copy() res[1] = rot[0] * m[1] - rot[1] * m[2] res[2] = rot[1] * m[1] + rot[0] * m[2] return m
[docs]class CMBLensed(CMBMap): # intherit from CMBMap so we get the `get_emission` method def __init__( self, nside, cmb_spectra, cmb_seed=None, apply_delens=False, delensing_ells=None, map_dist=None, ): """Lensed CMB Takes an input unlensed CMB and lensing spectrum from CAMB and uses Taylens to apply lensing, it optionally simulates delensing by suppressing the lensing power at specific scales with the user provided `delensing_ells`. Parameters ---------- cmb_spectra : path Input text file from CAMB, spectra unlensed cmb_seed : int Numpy random seed for synfast, set to None for a random seed apply_delens : bool If true, simulate delensing with taylens delensing_ells : path Space delimited file with ells in the first columns and suppression factor (1 for no suppression) in the second column """ try: super().__init__(nside=nside, map_dist=map_dist) except ValueError: pass # suppress exception about not providing any input map self.cmb_spectra = self.read_txt(cmb_spectra, unpack=True) self.cmb_seed = cmb_seed self.apply_delens = apply_delens self.delensing_ells = ( None if delensing_ells is None else self.read_txt(delensing_ells) ) self.map = u.Quantity(self.run_taylens(), unit=u.uK_CMB, copy=False)
[docs] def run_taylens(self): """Returns CMB (T, Q, U) maps as a function of observing frequency, nu. This code is extracted from the taylens code (reference). :return: function -- CMB maps. """ synlmax = 8 * self.nside # this used to be user-defined. data = self.cmb_spectra lmax_cl = len(data[0]) + 1 ell = np.arange(int(lmax_cl + 1)) synlmax = min(synlmax, ell[-1]) # Reading input spectra in CAMB format. CAMB outputs l(l+1)/2pi hence the corrections. cl_tebp_arr = np.zeros([10, lmax_cl + 1]) cl_tebp_arr[0, 2:] = 2 * np.pi * data[1] / (ell[2:] * (ell[2:] + 1)) # TT cl_tebp_arr[1, 2:] = 2 * np.pi * data[2] / (ell[2:] * (ell[2:] + 1)) # EE cl_tebp_arr[2, 2:] = 2 * np.pi * data[3] / (ell[2:] * (ell[2:] + 1)) # BB cl_tebp_arr[4, 2:] = 2 * np.pi * data[4] / (ell[2:] * (ell[2:] + 1)) # TE cl_tebp_arr[5, :] = np.zeros(lmax_cl + 1) # EB cl_tebp_arr[7, :] = np.zeros(lmax_cl + 1) # TB if self.apply_delens: cl_tebp_arr[3, 2:] = ( 2 * np.pi * data[5] * self.delensing_ells[1] / (ell[2:] * (ell[2:] + 1)) ** 2 ) # PP cl_tebp_arr[6, :] = np.zeros(lmax_cl + 1) # BP cl_tebp_arr[8, 2:] = ( 2 * np.pi * data[7] * np.sqrt(self.delensing_ells[1]) / (ell[2:] * (ell[2:] + 1)) ** 1.5 ) # EP cl_tebp_arr[9, 2:] = ( 2 * np.pi * data[6] * np.sqrt(self.delensing_ells[1]) / (ell[2:] * (ell[2:] + 1)) ** 1.5 ) # TP else: cl_tebp_arr[3, 2:] = ( 2 * np.pi * data[5] / (ell[2:] * (ell[2:] + 1)) ** 2 ) # PP cl_tebp_arr[6, :] = np.zeros(lmax_cl + 1) # BP cl_tebp_arr[8, 2:] = ( 2 * np.pi * data[7] / (ell[2:] * (ell[2:] + 1)) ** 1.5 ) # EP cl_tebp_arr[9, 2:] = ( 2 * np.pi * data[6] / (ell[2:] * (ell[2:] + 1)) ** 1.5 ) # TP # Coordinates of healpix pixel centers ipos = np.array(hp.pix2ang(self.nside, np.arange(hp.nside2npix(self.nside)))) # Simulate a CMB and lensing field cmb, aphi = simulate_tebp_correlated( cl_tebp_arr, self.nside, synlmax, self.cmb_seed ) if cmb.ndim == 1: cmb = np.reshape(cmb, [1, cmb.size]) # Compute the offset positions phi, phi_dtheta, phi_dphi = hp.alm2map_der1(aphi, self.nside, lmax=synlmax) del aphi opos, rot = offset_pos( ipos, phi_dtheta, phi_dphi, pol=True, geodesic=False ) # geodesic used to be used defined. del phi, phi_dtheta, phi_dphi # Interpolate maps one at a time maps = [] for comp in cmb: for m in taylor_interpol_iter( comp, opos, 3, verbose=False, lmax=None ): # lmax here needs to be fixed. order of taylor expansion is fixed to 3. pass maps.append(m) del opos, cmb return np.array(apply_rotation(maps, rot))