"""
Core z-DM grid class for FRB population modeling.
This module provides the Grid class, which computes 2D probability distributions
of FRB detection rates as a function of redshift (z) and dispersion measure (DM).
The Grid combines:
- Cosmological volume elements and source evolution
- p(DM|z) from the Macquart relation (cosmic + host)
- Telescope detection efficiency (fluence threshold, beam pattern)
- FRB luminosity/energy function
Key Features
------------
- Builds normalized 2D probability grids for expected FRB rates
- Handles beam response and detection efficiency
- Supports multiple luminosity functions (power-law, gamma)
- Efficient updating for MCMC parameter exploration
Example
-------
>>> from zdm import grid
>>> g = grid.Grid(survey, state, zDMgrid, zvals, dmvals, smear_mask)
>>> expected_rate = g.rates # Expected detection rate per (z, DM) bin
Author: C.W. James
"""
from IPython.terminal.embed import embed
import numpy as np
import datetime
from numpy import random
import scipy.ndimage as ndimage
from zdm import cosmology as cos
from zdm import misc_functions
from zdm import energetics
from zdm import pcosmic
from zdm import io
import time
import warnings
import importlib.resources as resources
[docs]
class Grid:
"""2D grid for computing FRB detection rates as a function of z and DM.
The Grid class is the core computational object in zdm. It builds a 2D
probability distribution representing expected FRB detection rates across
the redshift-DM plane for a given survey and parameter set.
Assumptions:
- Each z bin represents FRBs originating at that redshift (not integrated)
- Linear uniform spacing in both z and DM
- DM includes cosmic + host contributions convolved together
Attributes
----------
rates : ndarray
2D array of expected FRB rates per (z, DM) bin.
zvals : ndarray
Redshift bin centers.
dmvals : ndarray
DM bin centers in pc/cm^3.
state : parameters.State
Parameter state used for grid calculation.
survey : survey.Survey
Associated survey object.
"""
[docs]
def __init__(self, survey, state, zDMgrid, zvals, dmvals, smear_mask, wdist=None, prev_grid=None):
"""Initialize the Grid for a survey and parameter state.
Parameters
----------
survey : survey.Survey
Survey object with telescope properties and FRB data.
state : parameters.State
Parameter state defining the model. Note: grids share the same
State object, so modifications affect all grids.
zDMgrid : ndarray
2D array of p(DM|z) probabilities, shape (nz, ndm).
zvals : ndarray
Redshift bin centers. Bins span [z - dz/2, z + dz/2].
dmvals : ndarray
DM bin centers in pc/cm^3. Bins span [DM - dDM/2, DM + dDM/2].
smear_mask : ndarray
1D convolution kernel for host DM smearing.
wdist : bool, optional
If True, include width distribution effects.
prev_grid : Grid, optional
Another Grid with same z/DM values but different survey.
Allows reusing pre-computed cosmological quantities.
"""
self.grid = None
self.survey = survey
self.verbose = False
# Beam
self.beam_b = survey.beam_b
self.beam_o = survey.beam_o
self.b_fractions = None
self.w_fractions = None
# State
self.state = state
self.MCinit = False
self.source_function = cos.choose_source_evolution_function(
state.FRBdemo.source_evolution
)
# Energetics
if self.state.energy.luminosity_function in [3]:
self.use_log10 = True
else:
self.use_log10 = False
self.luminosity_function = self.state.energy.luminosity_function
self.init_luminosity_functions()
self.nuObs= survey.meta['FBAR']*1e6 #from MHz to Hz
# Init the grid
# THESE SHOULD BE THE SAME ORDER AS self.update()
self.parse_grid(zDMgrid.copy(), zvals.copy(), dmvals.copy())
if prev_grid is None:
self.calc_dV()
self.smear_dm(smear_mask.copy())
else:
self.dV = prev_grid.dV.copy()
self.smear = prev_grid.smear.copy()
self.smear_grid = prev_grid.smear_grid.copy()
if wdist is not None:
efficiencies = survey.efficiencies # two OR three dimensions
weights = survey.wplist
# Warning -- THRESH could be different for each FRB, but we don't treat it that way
self.calc_thresholds(survey.meta["THRESH"],
efficiencies,weights=weights)
else:
# this is called when the grid is not iterating over widths internally
efficiencies = survey.mean_efficiencies # one dimension
weights = None
self.calc_thresholds(survey.meta["THRESH"], efficiencies, weights=weights)
# Calculate
self.calc_pdv()
self.set_evolution() # sets star-formation rate scaling with z - here, no evoltion...
self.calc_rates() # includes sfr smearing factors and pdv mult
[docs]
def init_luminosity_functions(self):
""" Set the luminsoity function for FRB energetics """
if self.luminosity_function == 0: # Power-law
self.array_cum_lf = energetics.array_cum_power_law
self.vector_cum_lf = energetics.vector_cum_power_law
self.array_diff_lf = energetics.array_diff_power_law
self.vector_diff_lf = energetics.vector_diff_power_law
elif self.luminosity_function == 1: # Gamma function
embed(header="79 of grid -- BEST NOT TO USE THIS!!!!")
self.array_cum_lf = energetics.array_cum_gamma
self.vector_cum_lf = energetics.vector_cum_gamma
self.array_diff_lf = energetics.array_diff_gamma
self.vector_diff_lf = energetics.vector_diff_gamma
elif self.luminosity_function == 2: # Spline gamma function
self.array_cum_lf = energetics.array_cum_gamma_spline
self.vector_cum_lf = energetics.vector_cum_gamma_spline
self.array_diff_lf = energetics.array_diff_gamma
self.vector_diff_lf = energetics.vector_diff_gamma
elif self.luminosity_function == 3: # Linear + log10
self.array_cum_lf = energetics.array_cum_gamma_linear
self.vector_cum_lf = energetics.vector_cum_gamma_linear
self.array_diff_lf = energetics.array_diff_gamma
self.vector_diff_lf = energetics.vector_diff_gamma
else:
raise ValueError(
"Luminosity function must be 0, not ", self.luminosity_function
)
[docs]
def parse_grid(self, zDMgrid, zvals, dmvals):
self.grid = zDMgrid
self.zvals = zvals
self.dmvals = dmvals
#
self.check_grid()
# self.calc_dV()
# this contains all the values used to generate grids
# these parameters begin at None, and get filled when
# ever something is regenerated. They are semi-hierarchical
# in that if a low-level value is reset, high-level ones
# get put to None.
[docs]
def load_grid(self, gridfile, zfile, dmfile):
self.grid = io.load_data(gridfile)
self.zvals = io.load_data(zfile)
self.dmvals = io.load_data(dmfile)
self.check_grid()
self.volume_grid()
[docs]
def get_dm_coeffs(self, DMlist):
"""
Returns indices and coefficients for interpolating between DM values
dmlist: np.ndarray of dispersion measures (extragalactic!)
"""
# get indices in dm space
kdms=DMlist/self.ddm - 0.5 # idea: if DMobs = ddm, we are half-way between bin 0 and bin 1
Bin0 = np.where(kdms < 0.)[0] # if DMs are in the lower half of the lowest bin, use lowest bin only
kdms[Bin0] = 0.
idms1=kdms.astype('int') # rounds down
idms2=idms1+1
dkdms2=kdms-idms1 # applies to idms2, i.e. the upper bin. If DM = ddm, then this should be 0.5
dkdms1 = 1.-dkdms2 # applies to idms1
return idms1,idms2,dkdms1,dkdms2
[docs]
def get_z_coeffs(self,zlist):
"""
Returns indices and coefficients for interpolating between z values
zlist: np.ndarray of dispersion measures (extragalactic!)
"""
kzs=zlist/self.dz - 0.5
Bin0 = np.where(kzs < 0.)[0]
kzs[Bin0] = 0.
izs1=kzs.astype('int')
izs2=izs1+1
dkzs2=kzs-izs1 # applies to izs2
dkzs1 = 1. - dkzs2
# checks for values which are too large
toobigz = np.where(zlist > self.zvals[-1] + self.dz/2.)[0]
if len(toobigz) > 0:
raise ValueError("Redshift values ",zlist[toobigz],
" too large for grid max of ",self.zvals[-1] + self.dz/2.)
# checks for zs in top half of top bin - only works because of above bin
topbin = np.where(zlist > self.zvals[-1])[0]
if len(topbin) > 0:
izs2[topbin] = self.zvals.size-1
izs1[topbin] = self.zvals.size-2
dkzs2[topbin] = 1.
dkzs1[topbin] = 0.
return izs1, izs2, dkzs1, dkzs2
[docs]
def check_grid(self,TOLERANCE = 1e-6):
"""
Check that the grid values are behaving as expected
TOLERANCE: defines the max relative difference in expected
and found values that will be tolerated
"""
self.nz = self.zvals.size
self.ndm = self.dmvals.size
# check to see if these are log-spaced
if (self.zvals[-1] - self.zvals[-2]) / (self.zvals[1] - self.zvals[0]) > 1.01:
if (
np.abs(self.zvals[-1] * self.zvals[0] - self.zvals[-2] * self.zvals[1])
> 0.01
):
raise ValueError("Cannot determine scaling of zvals, exiting...")
self.zlog = True
self.dz = np.log(self.zvals[1] / self.zvals[0])
else:
self.zlog = False
self.dz = self.zvals[1] - self.zvals[0]
self.ddm = self.dmvals[1] - self.dmvals[0]
shape = self.grid.shape
if shape[0] != self.nz:
if shape[0] == self.ndm and shape[1] == self.nz:
print("Transposing grid, looks like first index is DM")
self.grid = self.grid.transpose
else:
raise ValueError("wrong shape of grid for zvals and dm vals")
else:
if shape[1] == self.ndm:
if self.verbose:
print("Grid successfully initialised")
else:
raise ValueError("wrong shape of grid for zvals and dm vals")
# checks that the grid is approximately linear to high precision
if self.zlog:
expectation = np.exp(np.arange(0, self.nz) * self.dz) * self.zvals[0]
else:
expectation = self.dz * np.arange(0, self.nz) + self.zvals[0]
diff = self.zvals - expectation
maxoff = np.max(diff ** 2)
if maxoff > TOLERANCE * self.dz:
raise ValueError(
"Maximum non-linearity in z-grid of ",
maxoff ** 0.5,
"detected, aborting",
)
# Ensures that log-spaced bins are truly bin centres
if not self.zlog and np.abs(self.zvals[0] - self.dz/2.) > TOLERANCE*self.dz:
raise ValueError(
"Linear z-grids *must* begin at dz/2. e.g. 0.05,0.15,0.25 etc, ",
" first value ",self.zvals[0]," expected to be half of spacing ",
self.dz,", aborting..."
)
expectation = self.ddm * np.arange(0, self.ndm) + self.dmvals[0]
diff = self.dmvals - expectation
maxoff = np.max(diff ** 2)
if maxoff > TOLERANCE * self.ddm:
raise ValueError(
"Maximum non-linearity in dm-grid of ",
maxoff ** 0.5,
"detected, aborting",
)
[docs]
def calc_dV(self, reINIT=False):
""" Calculates volume per steradian probed by a survey.
Does this only in the z-dimension (for obvious reasons!)
"""
if (cos.INIT is False) or reINIT:
# print('WARNING: cosmology not yet initiated, using default parameters.')
cos.init_dist_measures()
if self.zlog:
# if zlog, dz is actually .dlogz. And dlogz/dz=1/z, i.e. dz= z dlogz
self.dV = cos.dvdtau(self.zvals) * self.dz * self.zvals
else:
self.dV = cos.dvdtau(self.zvals) * self.dz
[docs]
def EF(self, alpha=0, bandwidth=1e9):
"""Calculates the fluence--energy conversion factors as a function of redshift
This does NOT account for the central frequency
"""
if self.state.FRBdemo.alpha_method == 0:
self.FtoE = cos.F_to_E(
1,
self.zvals,
alpha=alpha,
bandwidth=bandwidth,
Fobs=self.nuObs,
Fref=self.nuRef,
)
elif self.state.FRBdemo.alpha_method == 1:
self.FtoE = cos.F_to_E(1, self.zvals, alpha=0.0, bandwidth=bandwidth)
else:
raise ValueError("alpha method must be 0 or 1, not ", self.alpha_method)
[docs]
def set_evolution(self): # ,n,alpha=None):
""" Scales volumetric rate by SFR """
self.sfr=self.source_function(self.zvals,
self.state.FRBdemo.sfr_n)
if self.state.FRBdemo.alpha_method==1:
self.sfr *= (1.0 + self.zvals)**(-self.state.energy.alpha) #reduces rate with alpha
# changes absolute normalisation at z=0 according to central frequency
self.sfr *= (
self.nuObs / self.nuRef
) ** -self.state.energy.alpha # alpha positive, nuObs<nuref, expected rate increases
[docs]
def calc_pdv(self, beam_b=None, beam_o=None):
""" Calculates the rate per cell.
Assumed model: a power-law between Emin and Emax (erg)
with slope gamma.
Efficiencies: list of efficiency response to DM
So-far: does NOT include time x solid-angle factor
NOW: this includes a solid-angle and beam factor if initialised
This will recalculate beam factors if they are passed, however
during iteration this is not recalculated
"""
if beam_b is not None:
self.beam_b = beam_b
self.beam_o = beam_o
try:
x = beam_o.shape
x = beam_b.shape
except:
raise ValueError(
"Beam values must be numby arrays! Currently ", beam_o, beam_b
)
# linear weighted sum of probabilities: pdVdOmega now. Could also be used to include time factor
# For convenience and speed up
Emin = 10 ** self.state.energy.lEmin
Emax = 10 ** self.state.energy.lEmax
# this implementation allows us to access the b-fractions later on
if (self.b_fractions is None) or (beam_b is not None):
self.b_fractions = np.zeros(
[self.zvals.size, self.dmvals.size, self.beam_b.size]
)
# we can now access the width information later on
if (self.w_fractions is None):
self.w_fractions = np.zeros(
[self.zvals.size, self.dmvals.size, self.eff_weights.size]
)
# for some arbitrary reason, we treat the beamshape slightly differently... no need to keep an intermediate product!
main_beam_b = self.beam_b
# call log10 beam
if self.use_log10:
new_thresh = np.log10(
self.thresholds
) # use when calling in log10 space conversion
main_beam_b = np.log10(main_beam_b)
for i, b in enumerate(main_beam_b):
# if eff_weights is 2D (i.e., z-dependent) then w is a vector of length NZ
# It is a probability - the detection efficiency is encapusalted by thresh"
for j, w in enumerate(self.eff_weights):
# using log10 space conversion
if self.use_log10:
thresh = new_thresh[j, :, :] - b
else: # original
thresh = self.thresholds[j, :, :] / b
# the below is to ensure this works when w is a vector of length nz
w = np.array(w)
# this array gives the relative probability of detecting an FRB at this point
# in the beam, with this particular width. We may not have space to store this
# as a 4D (w,b,z,DM) array, hence we store two 3D arrays
temp_wb = self.beam_o[i] \
* (self.array_cum_lf(
thresh, Emin, Emax, self.state.energy.gamma, self.use_log10
).T * w.T).T
# partial sum over all beam values for a given width
self.b_fractions[:, :, i] += temp_wb
# partial sum over all width values for a given beam
self.w_fractions[:, :, j] += temp_wb
# here, b-fractions are unweighted according to the value of b.
self.fractions = np.sum(
self.b_fractions, axis=2
) # sums over b-axis [ we could ignore this step?]
self.pdv = np.multiply(self.fractions.T, self.dV).T
[docs]
def get_pw_dist(self):
"""
Function asking the grid to return the p(w) distribution.
This will be an "all-burst" distribution in case of a
repeater inherited class.
Note that a grid does not actually know what a "width" means:
it is simply an abstract category of FRBs corresponding to a
particular efficiency and fraction of the population.
Args: None
Returns:
Wtots (np.ndarray): Rate per width bin
Wzs (np.ndarray: Nw x Nz): Rate as a function of w and z
Wdms (np.ndarray: Nw x Ndm): Rate as a function of w and DM
"""
Wtots = np.zeros([self.nw])
Wzs = np.zeros([self.nw,self.nz])
Wdms = np.zeros([self.nw,self.ndm])
const = 10**self.state.FRBdemo.lC
# consider if beamb in log10 space
# call log10 beam
if self.use_log10:
new_thresh = np.log10(
self.thresholds
) # use when calling in log10 space conversion
main_beam_b = np.log10(self.beam_b)
else:
main_beam_b = self.beam_b
# For convenience and speed up
Emin = 10 ** self.state.energy.lEmin
Emax = 10 ** self.state.energy.lEmax
# we record b-fractions, but NOT the width increments in each
for j, w in enumerate(self.eff_weights):
# if eff_weights is 2D (i.e., z-dependent) then w is a vector of length NZ
# resets p(z,dm) for this w
Warray = np.zeros([self.nz,self.ndm]) # we re-use this array for each w
# sums over the beam values
for i, b in enumerate(self.beam_b):
# using log10 space conversion
if self.use_log10:
thresh = new_thresh[j, :, :] - b
else: # original
thresh = self.thresholds[j, :, :] / b
# the below is to ensure this works when w is a vector of length nz
w = np.array(w)
Warray[:, :] += (
self.beam_o[i]
* (self.array_cum_lf(
thresh, Emin, Emax, self.state.energy.gamma, self.use_log10
).T * w.T).T
)
# accounts for z-dependent volumetric fractions
Warray = np.multiply(Warray.T, self.dV).T
# multiply by the DM distribution and star-formation-rate scaling
# and constant number of FRBs
Warray *= self.sfr_smear * const
Wzs[j,:] = np.sum(Warray,axis = 1)
Wdms[j,:] = np.sum(Warray,axis = 0)
Wtots[j] = np.sum(Wzs[j,:])
return Wtots,Wzs,Wdms
[docs]
def calc_rates(self):
""" multiplies the rate per cell with the appropriate pdm plot """
try:
self.sfr
except:
print("WARNING: no evolutionary weight yet applied")
exit()
try:
self.smear_grid
except:
print("WARNING: no DMx smearing yet applied")
exit()
try:
self.pdv
except:
print("WARNING: no volumetric probability pdv yet calculated")
exit()
# zfraction describes the fraction of host galaxies estimated to be
# visible at a given redshift. Implementing zfraction then means this grid
# is calculating the *observable* z-DM space, rather than the intrinsic z-DM space
# zfractions are given as two arrays - the zvalues, and the f(z) values
if self.survey.survey_data.observing.Z_FRACTION is not None:
fdir = str(resources.files('zdm').joinpath('data/optical'))
ffile = fdir + "/fz_"+str(self.survey.survey_data.observing.Z_FRACTION)+".npy"
zfile = fdir + "/z_"+str(self.survey.survey_data.observing.Z_FRACTION)+".npy"
self.construct_fz(ffile,zfile)
self.sfr *= self.fz
self.sfr_smear = np.multiply(self.smear_grid.T, self.sfr).T
# below could pass more parameters internally, but this may not
# be the final implementation
self.rates = self.pdv * self.sfr_smear
self.zsigma = self.survey.survey_data.observing.Z_PHOTO
if self.zsigma > 0.:
self.smear_zgrid = self.smear_z(self.rates,self.zsigma)
self.rates=self.smear_zgrid
[docs]
def get_rates(self):
"""
Returns rates, multiplied by the relevant constant,
and accounting for any DM preference via a DM mask
"""
rates = np.zeros(self.rates.shape)
rates[:,:] = self.rates * 10**self.state.FRBdemo.lC
# multiplies by DM mask if applicable
if self.survey.dm_mask is not None:
rates = rates*self.survey.dm_mask
elif self.survey.max_dm is not None:
# in case a maximum DM is set in survey
if self.survey.max_idm < self.dmvals.size-1:
rates[:,self.survey.max_idm+1:]=0.
return rates
[docs]
def calc_thresholds(self, F0:float,
eff_table,
bandwidth=1e9,
nuRef=1.3e9, weights=None):
""" Sets the effective survey threshold on the zdm grid
Args:
F0 (float): base survey threshold
eff_table ([type]): table of efficiencies corresponding to DM-values. 1, 2, or 3 dimensions!
bandwidth ([type], optional): [description]. Defaults to 1e9.
nuObs ([float], optional): survey frequency (affects sensitivity via alpha - only for alpha method)
Defaults to 1.3e9.
nuRef ([float], optional): reference frequency we are calculating thresholds at
Defaults to 1.3e9.
weights ([type], optional): [description]. Defaults to None.
Raises:
ValueError: [description]
ValueError: [description]
"""
# keep the inputs for later use
self.F0 = F0
self.nuRef = nuRef
self.bandwidth = bandwidth
if eff_table.ndim == 1: # only a single FRB width: dimensions of NDM
self.nthresh = 1
self.eff_weights = np.array([1])
self.eff_table = np.array([eff_table]) # make it an extra dimension
else: # multiple FRB widths: dimensions nW x NDM
# check that the weights dimensions check out
self.nthresh = eff_table.shape[0] # number of width bins.
if weights is not None:
if weights.shape[0] != self.nthresh:
raise ValueError(
"Dimension of weights must equal first dimension of efficiency table"
)
else:
raise ValueError(
"For a multidimensional efficiency table, please set relative weights"
)
# I have removed weight normalisation here. In theory, normalisation to <1 is
# a feature, not a bug, representing more/less scattering moving into the
# observable range
self.eff_table = eff_table
self.eff_weights = weights
self.nw = self.eff_weights.shape[0]
# now two or three dimensions
Eff_thresh = F0 / self.eff_table
self.EF(self.state.energy.alpha, bandwidth) # sets FtoE values - could have been done *WAY* earlier
self.thresholds = np.zeros([self.nthresh, self.zvals.size, self.dmvals.size])
# Performs an outer multiplication of conversion from fluence to energy.
# The FtoE array has one value for each redshift.
# The effective threshold array has one value for each combination of
# FRB width (nthresh) and DM.
# We loop over nthesh and generate a NDM x Nz array for each
for i in np.arange(self.nthresh):
if self.eff_table.ndim == 2:
self.thresholds[i,:,:] = np.outer(self.FtoE, Eff_thresh[i,:])
else:
self.thresholds[i,:,:] = ((Eff_thresh[i,:,:]).T * self.FtoE).T
[docs]
def smear_dm(self, smear:np.ndarray): # ,mean:float,sigma:float):
""" Smears DM using the supplied array.
Example use: DMX contribution
smear_grid is created in place
Args:
smear (np.ndarray): Smearing array
"""
# just easier to have short variables for this
ls = smear.size
lz, ldm = self.grid.shape
if not hasattr(self, "smear_grid"):
self.smear_grid = np.zeros([lz, ldm])
self.smear = smear
# this method is O~7 times faster than the 'brute force' above for large arrays
for i in np.arange(lz):
# we need to get the length of mode='same', BUT
# we do not want it 'centred', hence must make cut on full
if smear.ndim == 1:
self.smear_grid[i, :] = np.convolve(
self.grid[i, :], smear, mode="full"
)[0:ldm]
elif smear.ndim == 2:
self.smear_grid[i, :] = np.convolve(
self.grid[i, :], smear[i, :], mode="full"
)[0:ldm]
else:
raise ValueError(
"Wrong number of dimensions for DM smearing ", smear.shape
)
[docs]
def get_p_zgdm(self, DMs: np.ndarray):
""" Calcuates the probability of redshift given a DM
We already have our grid of observed DM values.
Just take slices!
Args:
DMs (np.ndarray): array of DM values
Returns:
np.ndarray: array of priors for the DMs
"""
# first gets ids of matching DMs
priors = np.zeros([DMs.size, self.zvals.size])
for i, dm in enumerate(DMs):
DM2 = np.where(self.dmvals > dm)[0][0]
DM1 = DM2 - 1
kDM = (dm - self.dmvals[DM1]) / (self.dmvals[DM2] - self.dmvals[DM1])
priors[i, :] = kDM * self.rates[:, DM2] + (1.0 - kDM) * self.rates[:, DM1]
priors[i, :] /= np.sum(priors[i, :])
return priors
[docs]
def GenMCSample(self, N, Poisson=False):
"""
Generate a MC sample of FRB events
If Poisson=True, then interpret N as a Poisson expectation value
Otherwise, generate precisely N FRBs
Generated values are [MCz, MCDM, MCb, MCs, MCw]
NOTE: the routine GenMCFRB does not know 'w', merely
which w bin it generates.
"""
# Boost?
if self.state.energy.luminosity_function in [1, 2]:
Emax_boost = 3.0
else:
Emax_boost = 0.0
if Poisson:
# from np.random import poisson
NFRB = np.random.poisson(N)
else:
NFRB = int(N) # just to be sure...
sample = []
for i in np.arange(NFRB):
if (i % 100) == 0:
print(i)
# Regen if the survey would not find this FRB
frb = self.GenMCFRB(Emax_boost)
# This is a pretty naive method of generation.
if self.survey.max_dmeg is not None:
while frb[1] > self.survey.max_dm:
print("Regenerating MC FRB with too high DM ",frb[1],self.survey.max_dm)
frb = self.GenMCFRB(Emax_boost)
sample.append(frb)
sample = np.array(sample)
return sample
[docs]
def initMC(self):
"""
Initialises the MC sample, if it has not been doen already
This uses a great deal of RAM - hence, do not do this lightly!
"""
# shorthand
lEmin = self.state.energy.lEmin
lEmax = self.state.energy.lEmax
gamma = self.state.energy.gamma
Emin = 10 ** lEmin
Emax = 10 ** lEmax
# grid of beam values, weights
nw = self.nw
nb = self.beam_b.size
if self.eff_weights.ndim > 1:
raise ValueError("MC generation from z-dependent widths not currently enabled")
# holds array of probabilities in w,b space
pwb = np.zeros([nw * nb])
rates = []
pzcs = []
# gets list of DM probabilities to set to zero due to
# the survey missing these FRBs
if self.survey.max_dm is not None:
setDMzero = np.where(self.dmvals +self.ddm/2. > self.survey.max_dm)[0]
# Generates a joint distribution in B,w
for i, b in enumerate(self.beam_b):
for j, w in enumerate(self.eff_weights):
# each of the following is a 2D array over DM, z which we sum to generate B,w values
pzDM = self.array_cum_lf(
self.thresholds[j, :, :] / b,
Emin, Emax, gamma)
# sets to zero if we have a max survey DM
if self.survey.max_dm is not None:
pzDM [:,setDMzero] = 0.
# weighted pzDM
wb_fraction = (self.beam_o[i]* w * pzDM)
pdv = np.multiply(wb_fraction.T, self.dV).T
rate = pdv * self.sfr_smear
# We do not implement photo-z smearing here
# the MC generates truth values of parameters
# smearing can be done very simple afterwards
# this smears the
#if self.survey.observing.Z_PHOTO > 1.:
# rate = self.smear_z(rate,self.survey.observing.Z_PHOTO)
# #rate=np.copy(self.smear_zgrid)
rates.append(rate)
pwb[i * nw + j] = np.sum(rate)
pz = np.sum(rate, axis=1)
pzc = np.cumsum(pz)
pzc /= pzc[-1]
pzcs.append(pzc)
# generates cumulative distribution for sampling w,b
pwbc = np.cumsum(pwb)
pwbc /= pwbc[-1]
# saves cumulative distributions for sampling
self.MCpwbc = pwbc
# saves individal wxb zDM rates for sampling these distributions
self.MCrates = rates
# saves projections onto z-axis
self.MCpzcs = pzcs
self.MCinit = True
[docs]
def GenMCFRB(self, Emax_boost):
"""
Generates a single FRB according to the grid distributions
Samples beam position b, FRB DM, z, s=SNR/SNRth, and w
Currently: no interpolation included.
This should be implemented in s,DM, and z.
NOTE: currently, the actual FRB widths are not part of 'grid'
only the relative probabilities of any given width.
Hence, this routine only returns the integer of the width bin
not the width itelf.
Args:
pwb (optional): probability(width,beam)
Emax_boost (float, optional):
Allow for larger energies than Emax
The factor is logarithmic, i.e. Emax_boost = 2. allows
for 10**2 higher energies than Emax
Returns:
tuple: FRBparams=[MCz,MCDM,MCb,j,MCs], pwb values
These are:
MCz: redshift
MCDM: dispersion measure (extragalactic)
MCb: beam value
j:
MCs: SNR/SNRth value of FRB
MCw: width value of FRB
[MCz, MCDM, MCb, j, MCs, MCw]
"""
# shorthand
lEmin = self.state.energy.lEmin
lEmax = self.state.energy.lEmax
gamma = self.state.energy.gamma
Emin = 10 ** lEmin
Emax = 10 ** lEmax
# grid of beam values, weights
nw = self.nw
if self.eff_weights.ndim > 1:
raise ValueError("MC generation from z-dependent widths not currently enabled")
nb = self.beam_b.size
if not self.MCinit:
self.initMC()
# sample distribution in w,b
# we do NOT interpolate here - we treat these as qualitative values
# i.e. as if there was an irregular grid of them
r = np.random.rand(1)[0]
which = np.where(self.MCpwbc > r)[0][0]
i = int(which / nw)
j = which - i * nw
MCb = self.beam_b[i]
MCw = self.eff_weights[j]
# get p(z,DM) distribution for this b,w
pzDM = self.MCrates[which]
pzc = self.MCpzcs[which]
r = np.random.rand(1)[0]
# sampling in DM and z space
# First choose z: pzc is the cumulative distribution in z
# for all dm
# each probability represents the p(bin), i.e. z-dz/2. to z+dz/2
# first, find the bin where the cumulative probability is higher
# than the sampled amount.
# Alternative method: just use the distribution from the bin,
# then multiply the resulting DM linearly with deltaz/z.
# Would be better at low z, worse at high z
iz2 = np.where(pzc > r)[0][0]
if iz2 > 0:
iz1 = iz2 - 1
iz3 = iz2 + 1
dr = r - pzc[iz1]
fz = dr / (pzc[iz2] - pzc[iz1]) # fraction of way to upper z value
# move a fraction of kz2 between z-dz/0.5 and z + dz/0.5
#MCz = self.zvals[iz2] + (kz2-0.5)*dz
# weigts between iz1 and iz2
if fz < 0.5:
kz1 = 0.5 - fz
kz2 = 0.5 + fz
kz3 = 0.
iz3 = 0 # dummy
elif iz2 == self.zvals.size-1:
# we are in the last bin - don't extrapolate, just use it
kz1 = 0.
kz2 = 1.
kz3 = 0.
iz1 = 0 # dummy
iz3 = 0 # dummy
else:
kz1 = 0.
kz2 = (1.5-fz)
kz3 = fz-0.5
iz1 = 0 #dummy
pDM = pzDM[iz1, :] * kz1 + pzDM[iz2, :] * kz2 + pzDM[iz3, :] * kz3
MCz = self.zvals[iz1] * kz1 + self.zvals[iz2] * kz2 + self.zvals[iz3]*kz3
else:
# we perform a simple linear interpolation in z from 0 to minimum bin
fz = r / pzc[iz2]
kz1 = 0.
kz2 = 1.
kz3 = 0.
iz1 = 0 # dummy
iz3 = 0 # dummy
MCz = (self.zvals[iz2] + self.dz/0.5) * fz
# Just use the value of lowest bin.
# This is a gross and repugnant approximation
pDM = pzDM[iz2, :]
# NOW DO dm
# DM represents the distribution for the centre of z-bin
pDMc = np.cumsum(pDM)
pDMc /= pDMc[-1]
r = np.random.rand(1)[0]
iDM2 = np.where(pDMc > r)[0][0]
if iDM2 > 0:
iDM1 = iDM2 - 1
iDM3 = iDM2 + 1
dDM = r - pDMc[iDM1]
# fraction of value through DM bin
fDM = dDM / (pDMc[iDM2] - pDMc[iDM1])
# get the MC DM through interpolation
if fDM < 0.5:
kDM1 = 0.5 - fDM
kDM2 = 0.5 + fDM
kDM3 = 0.
iDM3 = 0 # dummy
# sets iDM3 to be safe at 0
MCDM = self.dmvals[iDM1] * kDM1 + self.dmvals[iDM2] * kDM2
elif iDM2 == self.dmvals.size-1:
kDM1 = 0.
kDM2 = 1. # for future use, not here
kDM3 = 0.
iDM1 = 0 # dummy
iDM3 = 0 # dummy
MCDM = self.dmvals[iDM2] + (fDM - 0.5)*self.dDM # upper DM bins
else:
kDM1 = 0.
kDM2 = 1.5-fDM
kDM3 = fDM - 0.5
iDM1 = 0 # dummy
MCDM = self.dmvals[iDM3] * kDM3 + self.dmvals[iDM2] * kDM2
else:
# interpolate linearly from 0 to the minimum value
fDM = r / pDMc[iDM2]
MCDM = (self.dmvals[iDM2] + self.ddm/2.) * fDM
kDM1 = 0.
kDM2 = 1.
kDM3 = 0.
iDM1 = 0 #dummy
iDM3 = 0 #dummy
# This is constructed such that weights and iz, iDM will work out
# for all cases of the above. Note that only four of these terms at
# most will ever be non-zero.
Eth = self.thresholds[j, iz1, iDM1] * kz1 * kDM1 \
+ self.thresholds[j, iz1, iDM2] * kz1 * kDM2 \
+ self.thresholds[j, iz1, iDM3] * kz1 * kDM3 \
+ self.thresholds[j, iz2, iDM1] * kz2 * kDM1 \
+ self.thresholds[j, iz2, iDM2] * kz2 * kDM2 \
+ self.thresholds[j, iz2, iDM3] * kz2 * kDM3 \
+ self.thresholds[j, iz3, iDM1] * kz3 * kDM1 \
+ self.thresholds[j, iz3, iDM2] * kz3 * kDM2 \
+ self.thresholds[j, iz3, iDM3] * kz3 * kDM3 \
# now account for beamshape
Eth /= MCb
# NOW GET snr
# Eth=self.thresholds[j,k,l]/MCb
Es = np.logspace(np.log10(Eth), lEmax + Emax_boost, 1000)
PEs = self.vector_cum_lf(Es, Emin, Emax, gamma)
PEs /= PEs[0] # normalises: this is now cumulative distribution from 1 to 0
r = np.random.rand(1)[0]
iE1 = np.where(PEs > r)[0][-1] # returns list starting at 0 and going upwards
iE2 = iE1 + 1
# iE1 should never be the highest energy, since it will always have a probability of 0 (or near 0 for Gamma)
kE1 = (r - PEs[iE2]) / (PEs[iE1] - PEs[iE2])
kE2 = 1.0 - kE1
MCE = 10 ** (np.log10(Es[iE1]) * kE1 + np.log10(Es[iE2]) * kE2)
MCs = MCE / Eth
FRBparams = [MCz, MCDM, MCb, MCs, MCw]
return FRBparams
[docs]
def build_sz(self):
pass
[docs]
def update(self, vparams: dict, ALL=False, prev_grid=None):
"""Update the grid based on a set of input
parameters
Hierarchy:
Each indent corresponds to one 'level'.
This is used in the program control below
to dictate how far each tree should proceed
in calculation.
Direct variable inputs are always listed first
We see that sfr evolution and dm smearing
lie just before the pdv step
Hence, we deal with these first, and
calc rates as a final step regardless
of what else has changed.
Args:
vparams (dict): dict containing the parameters
to be updated and their values
prev_grid (Grid, optional):
If provided, it is assumed this grid has been
updated on items that need not be repeated for
the current grid. i.e. Speed up!
ALL (bool, optional): If True, update the full grid
calc_rates:
calc_pdv
Emin
Emax
gamma
H0
calc_thresholds
F0
alpha
bandwidth
set_evolution
sfr_n
H0
smear_grid
grid
mask
dmx_params (lmean, lsigma)
dV
zdm_grid
H0
Note that the grid is independent of the constant C (trivial dependence)
Args:
vparams (dict): [description]
"""
warnings.warn("grid.update is deprecated, create a new instantiation instead", DeprecationWarning)
# Init
reset_cos, get_zdm, calc_dV = False, False, False
smear_mask, smear_dm, calc_pdv, set_evol = False, False, False, False
new_sfr_smear, new_pdv_smear, calc_thresh = False, False, False
# if we are updating a grid, the MC will in-general need to be
# re-initialised
self.MCinit = False
# Cosmology -- Only H0 so far
if self.chk_upd_param("H0", vparams, update=True):
reset_cos = True
get_zdm = True
calc_dV = True
smear_dm = True
calc_thresh = True
calc_pdv = True
set_evol = True
new_sfr_smear = True
# IGM
if self.chk_upd_param("logF", vparams, update=True):
get_zdm = True
smear_dm = True
# calc_thresh = False # JMB
# calc_pdv = False # JMB
# set_evol = False # JMB
new_sfr_smear = True
# DM_host
# IT IS IMPORTANT TO USE np.any so that each item is executed!!
if np.any(
[
self.chk_upd_param("lmean", vparams, update=True),
self.chk_upd_param("lsigma", vparams, update=True),
]
):
smear_mask = True
smear_dm = True
new_sfr_smear = True
# SFR?
if self.chk_upd_param("sfr_n", vparams, update=True):
set_evol = True
new_sfr_smear = True # True for either alpha_method
if self.chk_upd_param("alpha", vparams, update=True):
set_evol = True
if self.state.FRBdemo.alpha_method == 0:
calc_thresh = True
calc_pdv = True
new_pdv_smear = True
elif self.state.FRBdemo.alpha_method == 1:
new_sfr_smear = True
##### examines the 'pdv tree' affecting sensitivity #####
# begin with alpha
# alpha does not change thresholds under rate scaling, only spec index
if np.any(
[
self.chk_upd_param("lEmin", vparams, update=True),
self.chk_upd_param("lEmax", vparams, update=True),
self.chk_upd_param("gamma", vparams, update=True),
]
):
calc_pdv = True
new_pdv_smear = True
if self.chk_upd_param("DMhalo", vparams, update=True):
# Update survey params
self.survey.init_DMEG(vparams["DMhalo"])
# NOTE: In future we can change this to not need to recalc every time
self.survey.get_efficiency_from_wlist(self.survey.DMlist,self.survey.wlist,self.survey.wplist,model=self.survey.meta['WBIAS'])
self.eff_table = self.survey.efficiencies
calc_thresh = True
calc_pdv = True
new_pdv_smear = True
# ###########################
# NOW DO THE REAL WORK!!
# TODO -- For cubes with multiple surveys can we do these
# first two steps (even the first 5!) only once??
# Update cosmology?
if reset_cos and prev_grid is None:
cos.set_cosmology(self.state)
cos.init_dist_measures()
if get_zdm or ALL:
if prev_grid is None:
zDMgrid, zvals, dmvals = misc_functions.get_zdm_grid(
self.state,
new=True,
plot=False,
method="analytic",
save=False,
nz=self.zvals.size,
zmax=self.zvals[-1],
ndm=self.dmvals.size,
dmmax=self.dmvals[-1],
zlog=self.zlog,
)
self.parse_grid(zDMgrid, zvals, dmvals)
else:
# Pass a copy (just to be safe)
self.parse_grid(
prev_grid.grid.copy(),
prev_grid.zvals.copy(),
prev_grid.dmvals.copy(),
)
if calc_dV or ALL:
if prev_grid is None:
self.calc_dV()
else:
self.dV = prev_grid.dV.copy()
# Smear?
if smear_mask or ALL:
if prev_grid is None:
self.smear = pcosmic.get_dm_mask(
self.dmvals,
(self.state.host.lmean, self.state.host.lsigma),
self.zvals,
)
else:
self.smear = prev_grid.smear.copy()
if smear_dm or ALL:
if prev_grid is None:
self.smear_dm(self.smear)
else:
self.smear = prev_grid.smear.copy()
self.smear_grid = prev_grid.smear_grid.copy()
if calc_thresh or ALL:
self.calc_thresholds(
self.F0,
self.eff_table,
bandwidth=self.bandwidth,
weights=self.eff_weights,
)
if calc_pdv or ALL:
self.calc_pdv()
if set_evol or ALL:
self.set_evolution() # sets star-formation rate scaling with z - here, no evoltion...
if new_sfr_smear or ALL:
self.calc_rates() # includes sfr smearing factors and pdv mult
elif new_pdv_smear:
self.rates = self.pdv * self.sfr_smear # does pdv mult only, 'by hand'
# Catch all the changes just in case, e.g. lCf
# Can no longer do this because of repeat_grid
self.state.update_params(vparams)
return new_sfr_smear, new_pdv_smear, (get_zdm or smear_dm or calc_dV) # If either is true, need to also recalc repeater grids
[docs]
def chk_upd_param(self, param: str, vparams: dict, update=False):
""" Check to see whether a parameter is
differs from that in self.state
Args:
param (str): Paramter in question
vparams (dict): Dict holding the value
update (bool, optional): If True,
update the value in self.state.
Defaults to False.
Returns:
bool: True if the parameter is different
"""
updated = False
DC = self.state.params[param]
# In dict?
if param in vparams.keys():
# Changed?
if vparams[param] != getattr(self.state[DC], param):
updated = True
if update:
self.state.update_param(param, vparams[param])
#
return updated
[docs]
def smear_z(self,array,zsigma):
"""
Smear a 2-D z-DM grid along the redshift axis to account for
photometric redshift uncertainty.
When a survey uses photometric rather than spectroscopic redshifts,
the true redshift of each FRB host is uncertain. This method convolves
each column of the grid (i.e. each fixed-DM slice along the z axis)
with a Gaussian kernel whose standard deviation equals ``zsigma``,
redistributing probability across neighbouring redshift bins.
The kernel is truncated at ``state.photo.sigma_width`` standard
deviations on each side (default 6σ), rounded up to an odd number of
bins so that it is centred exactly on zero.
Parameters
----------
array : np.ndarray, shape (Nz, NDM)
Input 2-D grid with redshift along axis 0 and DM along axis 1.
zsigma : float
Photometric redshift uncertainty (1σ), in the same units as
``self.zvals`` (i.e. dimensionless redshift).
Returns
-------
smear_zgrid : np.ndarray, shape (Nz, NDM)
Copy of ``array`` with each DM column convolved along the z axis
by the Gaussian smearing kernel. Boundary effects are handled with
``np.convolve`` mode ``"same"``, so the output has the same shape
as the input.
Notes
-----
The kernel width in grid bins is ``zsigma / self.dz``. Values near the
grid edges will be underestimated because the convolution truncates to
zero outside the grid; for well-chosen grid extents this edge effect is
negligible.
In ``calc_rates``, this method is called with ``zsigma`` taken from
``self.survey.survey_data.observing.Z_PHOTO`` and applied to
``self.rates`` after the FRB rate grid has been computed.
"""
r,c=array.shape
# get sigma in grid units
sigma=zsigma/(self.dz)
smear_size=int(self.state.photo.sigma_width*sigma)
smear_size=smear_size-smear_size%2+1
smear_arr=np.linspace(-(smear_size-1)/2,(smear_size-1)//2,smear_size)
# makes the approximation of taking the central value in the bin.
smear_arr=np.exp(-(smear_arr**2)/(2*(sigma**2)))
#normalise
smear_arr/=np.sum(smear_arr)
smear_zgrid=np.zeros([r,c])
for i in range(c):
smear_zgrid[:,i]=np.convolve(array[:,i],smear_arr,mode="same")
return smear_zgrid
[docs]
def construct_fz(self,ffile,zfile):
"""
linearly interpolates passed fz values onto own zvals array
Args:
ffile (string): file containing fraction of hosts seen at given redshift
zfile(string): file containing z values of above
"""
fz = np.load(ffile)
z = np.load(zfile)
from scipy.interpolate import interp1d
f=interp1d(z,fz,kind="linear",bounds_error=False)
newfz=f(self.zvals)
# check for unphysical values
toolow = np.where(newfz < 0.)
newfz[toolow] = 0
toohigh = np.where(newfz > 1.)
newfz[toohigh] = 1.
self.fz = newfz
#np.save(path+"/"+name+"_fz",newfz)
#np.save(path+"/"+name+"_z",newz)