Source code for pysm3.models.spdust

import numpy as np
from .. import units as u
from numba import njit
from .. import utils

from .template import Model


[docs]class SpDust(Model): """ Implementation of the SpDust2 code of (Ali-Haimoud et al 2012) evaluated for a Cold Neutral Medium. See: * https://arxiv.org/abs/1003.4732 * https://arxiv.org/abs/0812.2904 """ def __init__( self, map_I, freq_ref_I, emissivity, freq_peak, freq_ref_peak, nside, unit_I=None, map_dist=None, ): """ This function initializes the spinning dust model Parameters ---------- map_I : `pathlib.Path` object Paths to the map to be used as I templates. unit_I : string or Unit Unit string or Unit object for all input FITS maps, if None, the input file should have a unit defined in the FITS header. freq_ref_I : Quantity or string Reference frequencies at which the templates are defined. They should be a astropy Quantity object or a string (e.g. "1500 MHz") compatible with GHz. freq_peak : `pathlib.Path` object or string Path to the map to be used as frequency of the peak of the emission or its scalar value as a Quantity or a string convertible to a Quantity freq_ref_peak : Quantity or string Reference frequency for the peak frequency map They should be a astropy Quantity object or a string (e.g. "1500 MHz") compatible with GHz. nside: int Resolution parameter at which this model is to be calculated. """ super().__init__(nside=nside, map_dist=map_dist) # do model setup self.I_ref = self.read_map(map_I, unit=unit_I) # This does unit conversion in place so we do not copy the data # we do not keep the original unit because otherwise we would need # to make a copy of the array when we run the model self.I_ref <<= u.uK_RJ self.freq_ref_I = u.Quantity(freq_ref_I).to(u.GHz) try: # input is a number self.freq_peak = u.Quantity(freq_peak).to(u.GHz) except TypeError: # input is a path self.freq_peak = self.read_map(freq_peak, unit=u.GHz) freq_ref_peak = u.Quantity(freq_ref_peak).to(u.GHz) self.freq_peak /= freq_ref_peak self.emissivity = self.read_txt(emissivity, unpack=True)
[docs] @u.quantity_input def get_emission(self, freqs: u.GHz, weights=None): freqs = utils.check_freq_input(freqs) weights = utils.normalize_weights(freqs, weights) outputs = ( compute_spdust_emission_numba( freqs, weights, self.I_ref.value, self.freq_ref_I.value, self.freq_peak.value, self.emissivity, ) << u.uK_RJ ) return outputs
@njit def compute_spdust_scaling_numba(freq, freq_ref_I, freq_peak, emissivity): scaled_freq = freq / freq_peak scaled_ref_freq = freq_ref_I / freq_peak return ( (freq_ref_I / freq) ** 2 * np.interp(scaled_freq, emissivity[0], emissivity[1]) / np.interp(scaled_ref_freq, emissivity[0], emissivity[1]) ) @njit(parallel=True) def compute_spdust_emission_numba( freqs, weights, I_ref, freq_ref_I, freq_peak, emissivity ): output = np.zeros((3, len(I_ref)), dtype=I_ref.dtype) for i, (freq, weight) in enumerate(zip(freqs, weights)): scaling = compute_spdust_scaling_numba(freq, freq_ref_I, freq_peak, emissivity) utils.trapz_step_inplace(freqs, weights, i, scaling * I_ref, output[0]) return output
[docs]class SpDustPol(SpDust): """SpDust2 model with Polarized emission""" def __init__( self, map_I, freq_ref_I, emissivity, freq_peak, freq_ref_peak, pol_frac, angle_Q, angle_U, nside, unit_I=None, map_dist=None, ): super().__init__( map_I=map_I, freq_ref_I=freq_ref_I, emissivity=emissivity, freq_peak=freq_peak, freq_ref_peak=freq_ref_peak, nside=nside, unit_I=unit_I, map_dist=map_dist, ) self.pol_angle = np.arctan2(self.read_map(angle_U), self.read_map(angle_Q)) self.pol_frac = pol_frac
[docs] @u.quantity_input def get_emission(self, freqs: u.GHz, weights=None): freqs = utils.check_freq_input(freqs) weights = utils.normalize_weights(freqs, weights) outputs = ( compute_spdust_emission_pol_numba( freqs, weights, self.I_ref.value, self.freq_ref_I.value, self.freq_peak.value, self.emissivity, self.pol_angle, self.pol_frac, ) << u.uK_RJ ) return outputs
@njit(parallel=True) def compute_spdust_emission_pol_numba( freqs, weights, I_ref, freq_ref_I, freq_peak, emissivity, pol_angle, pol_frac ): output = np.zeros((3, len(I_ref)), dtype=I_ref.dtype) I, Q, U = 0, 1, 2 for i, (freq, weight) in enumerate(zip(freqs, weights)): scaling = compute_spdust_scaling_numba(freq, freq_ref_I, freq_peak, emissivity) utils.trapz_step_inplace(freqs, weights, i, scaling * I_ref, output[I]) utils.trapz_step_inplace( freqs, weights, i, scaling * I_ref * pol_frac * np.cos(pol_angle), output[Q] ) utils.trapz_step_inplace( freqs, weights, i, scaling * I_ref * pol_frac * np.sin(pol_angle), output[U] ) return output