"""
FRB Survey class for modeling telescope observations.
This module provides the Survey class, which encapsulates all properties of an
FRB survey including instrument characteristics, beam patterns, detection
efficiencies, and detected FRB data.
The Survey class handles:
- Loading survey definition files (ECSV format) with FRB metadata
- Beam pattern modeling and solid angle calculations
- Detection efficiency as a function of DM and width
- DM budget calculations (Galactic, halo, host contributions)
- Threshold calculations for FRB detection
Survey Definition Files
-----------------------
Survey files are stored in `zdm/data/Surveys/` and contain:
- Header metadata: telescope parameters, beam settings, frequency, etc.
- FRB table: TNS name, DM, position, width, fluence for each detection
Example
-------
>>> from zdm import survey
>>> from zdm.parameters import State
>>> state = State()
>>> dmvals = np.linspace(0, 3000, 1000)
>>> s = survey.Survey(state, 'CRAFT/ICS', 'CRAFT_ICS.ecsv', dmvals)
>>> s.NFRB # Number of FRBs
10
>>> s.DMEGs # Extragalactic DM values
array([...])
Author: C.W. James
"""
import numpy as np
import os
from scipy.integrate import quad
from dataclasses import dataclass, fields
import importlib.resources as resources
import pandas
from astropy.table import Table
import json
from ne2001 import density
from zdm import beams, parameters
from zdm import pcosmic
from zdm import survey_data
from zdm import galactic_dm_models
from zdm import misc_functions
import matplotlib.pyplot as plt
from IPython import embed
import warnings
from astropy import units as u
from astropy.coordinates import SkyCoord
# minimum threshold to use - it's a substitute for zero
MIN_THRESH = 1e-10
[docs]
class Survey:
"""Represents an FRB survey with instrument properties and detected FRBs.
The Survey class is the primary interface for defining telescope
characteristics and loading FRB detections. It computes detection
efficiencies across the DM-width-fluence parameter space.
Attributes
----------
name : str
Survey identifier (e.g., 'CRAFT/ICS', 'Parkes/Mb').
NFRB : int
Number of FRBs in the survey.
DMEGs : ndarray
Extragalactic DM values for detected FRBs.
Zs : ndarray or None
Redshifts for localized FRBs (None if unknown).
meta : dict
Survey metadata from file header.
efficiencies : ndarray
Detection efficiency grid as function of DM and width.
"""
[docs]
def __init__(self, state, survey_name: str,
filename: str,
dmvals: np.ndarray,
zvals: np.ndarray = None,
NFRB: int = None,
iFRB: int = 0,
edir=None,
rand_DMG=False,
survey_dict=None):
"""Initialize an FRB Survey.
Parameters
----------
state : parameters.State
State object containing model parameters.
survey_name : str
Identifier for the survey (e.g., 'CRAFT/ICS').
filename : str
Path to survey definition file (ECSV format).
dmvals : ndarray
Extragalactic DM grid values for efficiency calculations.
zvals : ndarray, optional
Redshift grid values. Required for some width methods.
NFRB : int, optional
Limit number of FRBs loaded from file.
iFRB : int, optional
Starting index for FRB selection. Default is 0.
edir : str, optional
Directory containing pre-computed efficiency files.
rand_DMG : bool, optional
If True, randomize Galactic DM within uncertainty. Default False.
survey_dict : dict, optional
Override survey metadata parameters.
"""
# Proceed
self.state = state
self.name = survey_name
self.dmvals = dmvals
self.zvals = zvals
self.NDM = dmvals.size
if zvals is not None:
self.NZ = zvals.size
self.edir = edir
# Load up
self.process_survey_file(filename, NFRB, iFRB, min_lat=state.analysis.min_lat,
dmg_cut=state.analysis.DMG_cut,survey_dict = survey_dict)
# Check if repeaters or not and set relevant parameters
# Now done in loading
# self.repeaters=False
# self.init_repeaters()
# DM EG
self.init_halo_coeffs()
if rand_DMG:
self.randomise_DMG(state.MW.sigmaDMG)
self.init_DMEG(state.MW.DMhalo, state.MW.halo_method)
# Zs
print("Initialising zs")
self.init_zs() # This should be redone every time DMhalo is changed IF we use a flat cutoff on DMEG
# Allows survey metadata to over-ride parameter defaults if present.
# This is required when mixing CHIME and non-CHIME FRBs
beam_method = self.meta['BMETHOD']
beam_thresh = self.meta['BTHRESH']
self.init_beam(
method=beam_method,
plot=False,
thresh=beam_thresh) # tells the survey to use the beam file
# initialise scattering/width and resulting efficiences
self.init_widths()
self.calc_max_dm()
self.init_frb_bvals() #initial;ise weights for FRBs with known beam values
self.init_frb_wvals()
self.calc_max_dm()
[docs]
def init_widths(self,state=None):
"""
Performs initialisation of width and scattering distributions
Args:
state (parameters.state object., optional): if set, assume
this state contains new scattering/width parameters
"""
if state is not None:
self.state = state
# copies over Width bin information
self.NWbins = self.state.width.WNbins
self.WMin = self.state.width.WMin
self.WMax = self.state.width.WMax
self.thresh = self.state.width.Wthresh
self.wlogmean = self.state.width.Wlogmean
self.wlogsigma = self.state.width.Wlogsigma
if self.meta['WBIAS'] == 'Quadrature' or self.meta['WBIAS'] == 'Sammons' or self.meta['WBIAS']=="StdDev":
# these allow width-dependent sensitivities
self.width_method = self.state.width.Wmethod
else:
# the sensitivity is a fixed function of DM. Don't waste
# time on multiple width bins! (though maybe we should...)
# so just set to a constant width of 1, i.e. a single bin.
self.width_method = 0
if self.width_method == 3 and self.zvals is None:
raise ValueError("Width method 3 requires z-values to be set")
self.NInternalBins=self.state.width.WNInternalBins
# records scattering information, scaling
# according to frequency
self.slogmean=self.state.scat.Slogmean \
+ self.state.scat.Sfpower*np.log10(
self.meta['FBAR']/self.state.scat.Sfnorm
)
self.slogsigma=self.state.scat.Slogsigma
self.maxsigma=self.state.scat.Smaxsigma
self.scatdist=self.state.scat.ScatDist
self.backproject=self.state.scat.Sbackproject
# sets internal functions
WF = self.state.width.WidthFunction
if WF ==0:
self.WidthFunction = constant
elif WF == 1:
self.WidthFunction = lognormal
elif WF == 2:
self.WidthFunction = halflognormal
else:
raise ValueError("state parameter scat.WidthFunction ",WF," not implemented, use 0-2 only")
SF = self.state.scat.ScatFunction
if SF ==0:
self.ScatFunction = constant
elif SF == 1:
self.ScatFunction = lognormal
elif SF == 2:
self.ScatFunction = halflognormal
else:
raise ValueError("state parameter scat.ScatFunction ",SF," not implemented, use 0-2 only")
# sets n width bins equal to zero for this survey
if self.width_method == 0 or self.width_method == 4:
self.NWbins = 1
###### calculate width bins. We fix these here ######
# Unless w and tau are explicitly being fit, it is not actually necessary
# to have constant bin values over z and DM. But best to do so!
# Here, wbins are the bin edges, w list the midpoint values used for calculations
# Nbins describes the number of bins, so Nedges is Nbins+1
wbins = np.zeros([self.NWbins+1])
if self.NWbins > 1:
wbins = np.logspace(np.log10(self.WMin),np.log10(self.WMax),self.NWbins+1)
dlogw = np.log10(wbins[2]/wbins[1])
#wbins[0] = wbins[1]-dlogw # no longer tint: 1.e-10 # set to a tiny value, to ensure we capture all small widths
# offsets the mean by half the log-spacing for each
wlist = np.logspace(np.log10(self.WMin)+dlogw/2.,np.log10(self.WMax)-dlogw/2.,self.NWbins)
wbins[0] -= 3 # ensures we capture low values!
else:
wbins[0] = self.WMin
wbins[1] = self.WMax
dlogw = np.log10(wbins[1]/wbins[0])
wlist = np.array([(self.WMax*self.WMin)**0.5])
self.wbins = wbins
self.wlist = wlist
self.dlogw = dlogw
####### generates internal width values of numerical calculation purposes #####
#minval = np.min([self.wlogmean - self.maxsigma*self.wlogsigma,
# self.slogmean - self.maxsigma*self.slogsigma,
# np.log10(self.WMin)])
minval = np.log10(self.WMin)-3
maxval = np.log10(self.WMax)
# I haven't decided yet where to put the value of 1000 internal bins
# in terms of parameters.
self.internal_logwvals = np.linspace(minval,maxval,self.NInternalBins)
# initialise probability bins
if self.width_method == 3:
# evaluate efficiencies at each redshift
self.efficiencies = np.zeros([self.NWbins,self.NZ,self.NDM])
self.wplist = np.zeros([self.NWbins,self.NZ])
self.DMlist = np.zeros([self.NZ,self.NDM])
self.mean_efficiencies = np.zeros([self.NZ,self.NDM])
if self.backproject:
self.pws = np.zeros([self.NZ,self.internal_logwvals.size,self.NWbins]) #[iz,:,:] = pw
self.ptaus = np.zeros([self.NZ,self.internal_logwvals.size,self.NWbins]) #[iz,:,:] = ptau
# we have a z-dependent scattering and width model
for iz,z in enumerate(self.zvals):
self.make_widths(iz)
#_ = self.get_efficiency_from_wlist(self.wlist,self.wplist[:,iz],
# model=self.meta['WBIAS'], edir=self.edir, iz=iz)
else:
self.wplist = np.zeros([self.NWbins])
self.make_widths()
if self.backproject:
self.pws = np.zeros([self.internal_logwvals.size,self.NWbins]) #[iz,:,:] = pw
self.ptaus = np.zeros([self.internal_logwvals.size,self.NWbins]) #[iz,:,:] = ptau
#_ = self.get_efficiency_from_wlist(self.wlist,self.wplist,
# model=self.meta['WBIAS'], edir=self.edir, iz=None)
self.do_efficiencies()
[docs]
def process_dm_mask(self,edir=None):
"""
Processes a DM mask into units of local DM
"""
# loads the mask
if edir is None:
edir = resources.files('zdm').joinpath('data/Efficiencies')
filename = os.path.expanduser(os.path.join(edir, self.meta['DMMASK']))
if not os.path.exists(filename):
raise ValueError("Could not find DM mask " + filename + " in directory "+edir)
# Should contain DM in the first row and efficiencies in the second row
sensitivity_array = np.load(filename)
if self.DMGs is not None:
effective_vals = self.dmvals + self.state.MW.DMhalo + np.median(self.DMGs)
else:
effective_vals = self.dmvals + self.state.MW.DMhalo + 30
dm_mask = np.interp(effective_vals, sensitivity_array[0,:], sensitivity_array[1,:], left=1.,right=0)
self.dm_mask = dm_mask
[docs]
def do_efficiencies(self):
"""
Function to handle calculation of efficiencies.
Allows this to be called externally, e.g. if DM halo model changes
"""
# process DM mask to get mask in units of used DM values
if self.meta['DMMASK'] is not None:
self.process_dm_mask()
else:
self.dm_mask = None
if self.width_method == 3:
for iz,z in enumerate(self.zvals):
_ = self.get_efficiency_from_wlist(self.wlist,self.wplist[:,iz],
model=self.meta['WBIAS'], edir=self.edir, iz=iz)
else:
_ = self.get_efficiency_from_wlist(self.wlist,self.wplist,
model=self.meta['WBIAS'], edir=self.edir, iz=None)
[docs]
def make_widths(self,iz=None):
"""
This routine calculates width distributions of FRBs, which
is required to estimate detection efficiency, and (potentially)
calculate p(w,tau).
The functionality depends on self.width_method, which
is set via
WMETHOD [int: 0-4]:
0: No distribution - calculations performed at total width = 1ms
1: FRB widths are a simple lognormal of self.wlogmean, self.wlogsigma
histogrammed into self.wbins. Describes widths as "total width",
i.e. intrinsic plus scattering.
2: FRB widths convolved with frequency-dependent scattering distribution.
Calculates lognormal intrinsic width distribution, and convolves
this with a lognormal scattering distribution. NOTE: if wmethod=2,
iz must be called with iz=None.
3: As above, but allows for redshift dependence of both intrinsic
width and scattering. Distributions both scale width redshift:
width with 1+z, scattering with (1+z)^-3
4: Use a specific width of a particular FRB. Used for detailed calcs
for individual FRB-by-FRB analyses.
"""
if self.width_method == 0:
# do not take a distribution, just use 1ms for everything
# this is done for tests, for complex surveys such as CHIME,
# or for estimating the properties of a single FRB
self.wlist[0] = 1.
self.wplist[0] = 1.
elif self.width_method == 1:
# take intrinsic width function only
for i in np.arange(self.NWbins):
norm=(2.*np.pi)**-0.5/self.wlogsigma
args=(self.wlogmean,self.wlogsigma,norm)
weight,err=quad(self.WidthFunction,
np.log10(self.wbins[i]),np.log10(self.wbins[i+1]),args=args)
self.wplist[i]=weight
elif self.width_method == 2 or self.width_method == 3:
# include scattering distribution. 3 means include z-dependence
#gets cumulative hist and bin edges
if iz is not None:
if self.width_method == 2:
print("Trying to calculate redshift dependence for redshift ",\
"independent WMETHOD=2. Internally inconsistent.")
exit()
z = self.zvals[iz]
else:
z=0.
# performs z-scaling (does nothing when z=0.)
wlogmean = self.wlogmean + np.log10(1+z)
wlogsigma = self.wlogsigma
slogmean = self.slogmean - 3.*np.log10(1+z)
slogsigma = self.slogsigma
# we need these for normalisation purposes. Otherwise, the
# scattering distribution gets diluted with a constant max value
# these are only used for normalisation purposes of otherwise
# unnormalisable functions.
smax = np.log10(self.WMax) - 3.*np.log10(1+z)
wmax = np.log10(self.WMax) + np.log10(1+z)
WidthArgs = (wlogmean,wlogsigma,wmax)
ScatArgs = (slogmean,slogsigma,smax)
if self.backproject:
dist,pw,ptau = quadrature_convolution(self.WidthFunction, WidthArgs, self.ScatFunction, ScatArgs,
self.internal_logwvals, self.wbins,backproject=self.backproject)
else:
dist = quadrature_convolution(self.WidthFunction, WidthArgs, self.ScatFunction, ScatArgs,
self.internal_logwvals, self.wbins,backproject=self.backproject)
if iz is not None:
self.wplist[:,iz] = dist
if self.backproject:
self.pws[iz,:,:] = pw
self.ptaus[iz,:,:] = ptau
else:
self.wplist[:] = dist
if self.backproject:
self.pws=pw
self.ptaus=ptau
elif self.width_method == 4:
# use specific width of FRB. This requires there to be only a single FRB in the survey
if s.meta['NFRB'] != 1:
raise ValueError("If width method in make_widths is 4 only one FRB should be specified in the survey but ", str(s.meta['NFRB']), " FRBs were specified")
else:
self.wplist[0] = 1.
self.wlist[0] = s.frbs['WIDTH'][0]
else:
raise ValueError("Width method in make_widths must be 0-4, not ",width_method)
[docs]
def init_repeaters(self):
"""
Checks to see if this is a repeater survey and if so ensures all the
relevant information is present.
"""
# self.repeaters = self.meta["REPEATERS"]
self.repeaters = True
# checks to see if NREP info is present. If not, assumes all single bursts
if not "NREP" in self.frbs:
print("Warning, no information of repetition provided")
print("Assuming all FRBs are once-off bursts")
self.frbs["NREP"] = np.full([self.NFRB],1,dtype='int')
# Set repeater/singles list
self.replist = np.where(self.frbs["NREP"] > 1)[0]
self.singleslist = np.where(self.frbs["NREP"] == 1)[0]
#------------------------------------------------------------------
self.drift_scan = self.meta['DRIFT_SCAN']
if self.drift_scan == 2:
self.Nfields = 1
self.Tfield = self.TOBS
elif self.drift_scan == 1:
# Check we have the necessary data to construct Nfields and Tfield
self.Nfields = self.meta['NFIELDS']
self.Tfield = self.meta['TFIELD']
if self.Nfields is None:
if self.Tfield is None or self.TOBS is None:
raise ValueError("At least 2 of NFIELDS, TFIELD and TOBS must be set in repeater surveys")
else:
self.Nfields = int(round(self.TOBS / self.Tfield))
# To account for rounding errors - TOBS is used anyways
self.Tfield = self.TOBS / self.Nfields
elif self.Tfield is None:
if self.TOBS is None:
raise ValueError("At least 2 of NFIELDS, TFIELD and TOBS must be set in repeater surveys")
else:
self.Tfield = self.TOBS / self.Nfields
elif self.TOBS is None:
self.TOBS = self.Nfields * self.Tfield
else:
# All three are set
if abs(self.Tfield - self.TOBS / self.Nfields) > 0.001:
raise ValueError("WARNING: Inconsistent values of Tfield, Tobs, Nfields: ",
self.Tfield, self.TOBS, self.Nfields)
#------------------------------------------------------------------
# Check we have number of repeaters / singles
self.NORM_REPS = self.meta['NORM_REPS']
self.NORM_SINGLES = self.meta['NORM_SINGLES']
if self.NORM_FRB == 0:
# Case of no detections
if (self.NORM_REPS is None and self.NORM_SINGLES is None) or (self.NORM_REPS == 0 or self.NORM_SINGLES == 0):
self.NORM_REPS = 0
self.NORM_SINGLES = 0
# Case invalid
elif self.NORM_REPS is None or self.NORM_SINGLES is None:
raise ValueError("At least 2 of NORM_FRB, NORM_REPS and NORM_SINGLES must be set in repeater surveys")
# Case of NORM_FRB not set
else:
self.NORM_FRB = self.NORM_REPS + self.NORM_SINGLES
# print("NORM_FRB set to NORM_REPS + NORM_SINGLES = " + str(self.NORM_FRB))
elif self.NORM_REPS is None:
# Case invalid
if self.NORM_SINGLES is None or self.NORM_SINGLES > self.NORM_FRB:
raise ValueError("At least 2 of NORM_FRB, NORM_REPS and NORM_SINGLES must be set in repeater surveys and NORM_FRB = NORM_REPS + NORM_SINGLES")
# Case NORM_REPS not set
else:
self.NORM_REPS = self.NORM_FRB - self.NORM_SINGLES
# print("NORM_REPS set to NORM_FRB - NORM_SINGLES = " + str(self.NORM_REPS))
elif self.NORM_SINGLES is None:
# Do not need to consider NORM_REPS == None as that is done in the previous elif
# Case invalid
if self.NORM_REPS > self.NORM_FRB:
raise ValueError("At least 2 of NORM_FRB, NORM_REPS and NORM_SINGLES must be set in repeater surveys and NORM_FRB = NORM_REPS + NORM_SINGLES")
# Case NORM_SINGLES not set
else:
self.NORM_SINGLES = self.NORM_FRB - self.NORM_REPS
# print("NORM_SINGLES set to NORM_FRB - NORM_REPS = " + str(self.NORM_SINGLES))
# Case all 3 are set
else:
# Common sense check
if self.NORM_FRB != self.NORM_REPS + self.NORM_SINGLES:
raise ValueError("NORM_FRB != NORM_REPS + NORM_SINGLES")
# Initialise repeater zs
self.init_zs_reps()
[docs]
def get_internal_coeffs(self,wlist):
"""
Returns indices and coefficients for linear interpolation between
intrinsic width values
wlist: np.ndarray of observed intrinsic widths or taus
Returns:
iws1 (int): index of lower value
iws2 (int): index of upper value
dkws1 (float): coefficient for iws1
dkws2 (float): coefficient for iws2
"""
# convert to log-widths - the bins are in log10 space
logwlist = np.log10(wlist)
dinternal = self.internal_logwvals[1]-self.internal_logwvals[0]
kws=(logwlist-self.internal_logwvals[0])/dinternal
Bin0 = np.where(kws < 0.)[0]
kws[Bin0] = 0.
iws1=kws.astype('int')
iws2=iws1+1
dkws2=kws-iws1 # applies to izs2
dkws1 = 1. - dkws2
# checks for values which are too large
toobigw = np.where(logwlist > self.internal_logwvals[-1])[0]
if len(toobigw) > 0:
raise ValueError("Width value ",wlist[toobigw],
" too large for max internal log value of ",10**self.internal_logwvals[-1])
return iws1, iws2, dkws1, dkws2
[docs]
def init_frb_bvals(self):
"""
Initialise frb-by-frb weights for each value of the beam histogram, based on
the value of the beam that the FRB was detected in.
Simply does this by linear interpolation
"""
# contains beam-dependent weights for each FRB
frb_bweights = np.zeros([self.NFRB,self.meta["NBINS"]])
lbs = np.log(self.beam_b)
for i,B in enumerate(self.frbs["B"]):
if B == -1: # code for "ignore it"
# still have to decide what to do here. Likely give equal weights?
frb_bweights[i,:] = 1./self.meta["NBINS"]
elif B > self.beam_b[-1]: # greater value than max, just use max
frb_bweights[i,-1] = 1.
elif B < self.beam_b[0]:
frb_bweights[i,0] = 1.
else:
# at least one value of beam_b will be greater and one lesser than B
iB2 = np.where(self.beam_b > B)[0][0]
iB1 = iB2 - 1
# do log-scaling
lB = np.log(B)
kB2 = (lB- lbs[iB1])/(lbs[iB2]-lbs[iB1])
kB1 = 1.-kB2
frb_bweights[i,iB1] = kB1
frb_bweights[i,iB2] = kB2
self.frb_bweights = frb_bweights
# speedups when iterating through 1D and 2D likelihoods
self.frb_zbweights = frb_bweights[self.zlist,:]
self.frb_nozbweights = frb_bweights[self.nozlist,:]
[docs]
def init_frb_wvals(self):
"""
Initialises frb width coefficients for linear interpolation
This is for a slightly different purpose than the init widths routine
"""
#wlist = survey.WIDTHs # measured total widths
nw = self.wlist.size
frb_wweights = np.zeros([self.NFRB,nw])
OKw = np.where(self.WIDTHs > 0.)
notOKw = np.where(self.WIDTHs <= 0.)
# equal weights for all FRBs with no measured width
frb_wweights[notOKw,:] = 1./nw
# iterature through the list
iws1,iws2,dkws1,dkws2 = self.get_w_coeffs(self.WIDTHs[OKw])
frb_wweights[OKw,iws1] = dkws1
frb_wweights[OKw,iws2] = dkws2
self.frb_wweights = frb_wweights
# speedups when iterating through 1D and 2D likelihoods
self.frb_zwweights = self.frb_wweights[self.zlist,:]
self.frb_nozwweights = self.frb_wweights[self.nozlist,:]
[docs]
def get_w_coeffs(self,wlist):
"""
Returns indices and coefficients for linear interpolation between width values
Bin edges run from [small~1e-10, self.WMin, self.WMin + self.dlowg, ..., self.Wmax]
We should use bin centres to determine which linear interpolations we sit between
wlist: np.ndarray of observed widths
Returns:
iws1 (int): index of lower value
iws2 (int): index of upper value
dkws1 (float): coefficient for iws1
dkws2 (float): coefficient for iws2
"""
# convert to log-widths - the bins are in log10 space
logwlist = np.log10(wlist)
if self.NWbins == 1:
# only when there is a single width bin
nfrb = logwlist.size
iws1 = np.full([nfrb],0,dtype='int')
iws2 = iws1
dkws1 = np.full([nfrb],1.,dtype='float')
dkws2 = dkws1
# dkws2 is identical to 1. This over-writes 1, but ensures
# that order of implementation of 1 and 2 does not matter
return iws1, iws2, dkws1, dkws2
# the below assumes that
kws=(logwlist-np.log10(self.WMin))/self.dlogw # now will assume it begins at Wmin+dlogw + 0.5
# forces any low values to zero
Bin0 = np.where(kws < 0.)[0]
kws[Bin0] = 0.
iws1=kws.astype('int')
iws2=iws1+1
dkws2=kws-iws1 # applies to izs2
dkws1 = 1. - dkws2
# in case iws1 is in the largets bin, then
# iws2 will be too large
toobig1 = np.where(iws2 >= self.wlist.size)[0]
iws2[toobig1]=self.wlist.size-1
# checks for values which are too large
toobigw = np.where(wlist > self.WMax)[0]
if len(toobigw) > 0:
raise ValueError("Width value ",wlist[toobigw],
" too large for Wmax of ",self.WMax)
return iws1, iws2, dkws1, dkws2
[docs]
def randomise_DMG(self, uDMG=0.5):
""" Change the DMG_ISM values to a random value within uDMG Gaussian uncertainty """
new_DMGs = np.random.normal(self.DMGs, uDMG*self.DMGs)
neg = np.where(new_DMGs < 0)[0]
while len(neg) != 0:
new_DMGs[neg] = np.random.normal(self.DMGs[neg], uDMG*self.DMGs[neg])
neg = np.where(new_DMGs < 0)[0]
self.DMGs = new_DMGs
[docs]
def init_DMEG(self,DMhalo,halo_method=0):
""" Calculates extragalactic DMs assuming halo DM """
self.DMhalo=DMhalo
self.process_dmhalo(halo_method)
self.DMEGs=self.DMs-self.DMGs - self.DMhalos
[docs]
def process_dmhalo(self, halo_method):
"""
Calculates directionally dependent DMhalo from Yamasaki and Totani 2020
and rescaling to an average of self.DMhalo
self.c, self.Gls and self.Gbs should be loaded in process_survey_file
"""
# Constant halo
if halo_method == 0:
self.DMhalos = np.ones(self.DMs.shape) * self.DMhalo
# self.DMGals = self.DMhalos + self.DMGs
# Yamasaki and Totani 2020
elif halo_method == 1:
no_coords = np.where(self.Gls == 1.0)[0]
# if np.any(np.isnan(self.RA[no_coords])) or np.any(np.isnan(self.Dec[no_coords])):
# raise ValueError('Galactic coordinates must be set if using directional dependence')
if len(no_coords) != 0:
for i in no_coords:
coords = SkyCoord(ra=self.RA[i], dec=self.Dec[i], frame='icrs', unit="deg")
self.Gls[i] = coords.galactic.l.value
self.Gbs[i] = coords.galactic.b.value
if np.any(self.Gls == 1.0) or np.any(self.Gbs == 1.0):
raise ValueError('Galactic coordinates must be set if using directional dependence')
for i in range(len(self.Gls)):
if self.Gls[i] > 180.0:
self.Gls[i] = 360.0 - self.Gls[i]
# Convert to rads
self.Gls = self.Gls * np.pi / 180
self.Gbs = self.Gbs * np.pi / 180
# Evaluate each one
self.DMhalos = np.zeros(self.DMs.shape, dtype='float')
for i in range(8):
for j in range(8-i):
self.DMhalos = self.DMhalos + self.c[i][j] * np.abs(self.Gls)**i * np.abs(self.Gbs)**j
self.DMhalos = self.DMhalos * self.DMhalo / 43
# self.DMGals = self.DMhalos + self.DMGs
# Sanskriti et al. 2020
elif halo_method == 2:
no_coords = np.where(self.Gls == 1.0)[0]
# if np.any(np.isnan(self.XRA[no_coords])) or np.any(np.isnan(self.Dec[no_coords])):
# raise ValueError('Galactic coordinates must be set if using directional dependence')
if len(no_coords) != 0:
for i in no_coords:
coords = SkyCoord(ra=self.RA[i], dec=self.Dec[i], frame='icrs', unit="deg")
self.Gls[i] = coords.galactic.l.value
self.Gbs[i] = coords.galactic.b.value
if np.any(self.Gls == 1.0) or np.any(self.Gbs == 1.0):
raise ValueError('Galactic coordinates must be set if using directional dependence')
self.DMhalos = np.zeros(len(self.frbs))
self.DMhalo = 0
# self.DMGals = np.zeros(len(self.frbs))
self.DMG_el = np.zeros(len(self.frbs))
self.DMG_eu = np.zeros(len(self.frbs))
DMMWs = np.zeros(len(self.frbs))
for i, (Gl, Gb) in enumerate(zip(self.Gls, self.Gbs)):
DMMWs[i], self.DMG_el[i], self.DMG_eu[i] = galactic_dm_models.dmg_sanskriti2020(Gl, Gb)
self.DMhalos = DMMWs - self.DMGs
self.DMhalo = np.median(self.DMhalos)
print(self.DMhalos)
# self.DMGal = np.median(self.DMGals)
[docs]
def init_halo_coeffs(self):
"""
Initialise coefficients for Yamasaki and Totani 2020 implementation of
directionally dependent DMhalo
"""
self.c = [
[250.12, -871.06, 1877.5, -2553.0, 2181.3, -1127.5, 321.72, -38.905],
[-154.82, 783.43, -1593.9, 1727.6, -1046.5, 332.09, -42.815],
[-116.72, -76.815, 428.49, -419.00, 174.60, -27.610],
[216.67, -193.30, 12.234, 32.145, -8.3602],
[-129.95, 103.80, -22.800, 0.44171],
[39.652, -21.398, 2.7694],
[-6.1926, 1.6162,],
[0.39346]
]
[docs]
def init_zs(self):
"""
Gets zlist and nozlist and determines which z values to use
"""
# Ignore redshifts above MAX_LOC_DMEG
self.min_noz = self.meta["MAX_LOC_DMEG"]
# Ignore redshifts above the minimum unlocalised DM if MAX_LOC_DMEG==0
if self.min_noz == 0:
nozlist = np.where((self.frbs["Z"] == -1.) | (self.frbs["Z"] is None))[0]
if len(nozlist != 0):
self.min_noz = np.min(self.DMEGs[nozlist] + self.DMhalos[nozlist])
imin = np.where(self.DMEGs + self.DMhalos == self.min_noz)[0][0]
# Do not get rid of redshifts if MAX_LOC_DMEG==-1
if self.min_noz > 0:
high_dm = np.where(self.DMEGs + self.DMhalos > self.min_noz)[0]
self.ignored_Zs = self.frbs["Z"].values[high_dm]
self.ignored_Zlist = high_dm[self.ignored_Zs > 0]
self.ignored_Zs = self.ignored_Zs[self.ignored_Zs > 0]
self.frbs["Z"].values[high_dm] = -1.0
print("Ignoring redshifts with DMEG >", str(self.min_noz))
else:
self.ignored_Zs = []
self.ignored_Zlist = []
# Pandas resolves None to Nan
if len(self.frbs["Z"])>0:
self.Zs=np.array(self.frbs["Z"].values).astype('float')
# checks for any redhsifts identically equal to zero
#exactly zero can be bad... only happens in MC generation
# 0.001 is chosen as smallest redshift in original fit
zeroz = np.where(self.Zs == 0.)[0]
if len(zeroz) >0:
self.Zs[zeroz]=0.001
# checks to see if there are any FRBs which are localised
self.zlist = np.where(self.Zs > 0.)[0]
if len(self.zlist) < self.NFRB:
self.nozlist = np.where(self.Zs < 0.)[0]
if len(self.nozlist) == len(self.Zs):
self.nD=1 # they all had -1 as their redshift!
self.zlist=None
else:
self.nD=3 # code for both
else:
self.nozlist = None
self.nD=2
else:
self.nD=1
self.Zs=None
self.nozlist=np.arange(self.NFRB)
self.zlist=None
[docs]
def init_zs_reps(self):
"""
Gets zlist and nozlist for repeaters and singles. Basically the same as init_zs but for repeaters.
"""
# Case of no repeaters detected
if len(self.replist) == 0:
self.nDr = 1
self.zreps = None
self.nozreps = np.arange(0)
# This also accounts for the case of no FRBs at all
self.nDs = self.nD
self.zsingles = self.zlist
self.nozsingles = self.nozlist
# Case of no singles (all repeaters)
elif len(self.singleslist) == 0:
self.nDs = 1
self.zsingles = None
self.nozsingles = np.arange(0)
self.nDr = self.nD
self.zreps = self.zlist
self.nozreps = self.nozlist
# Case of some singles and some repeaters
else:
if self.nD == 1:
self.zreps = None
self.zsingles = None
self.nozreps = np.array([i for i in self.replist if i in self.nozlist])
self.nozsingles = np.array([i for i in self.singleslist if i in self.nozlist])
self.nDr = 1
self.nDs = 1
elif self.nD == 2:
self.zreps = np.array([i for i in self.replist if i in self.zlist])
self.zsingles = np.array([i for i in self.singleslist if i in self.zlist])
self.nozreps = None
self.nozsingles = None
self.nDr = 2
self.nDs = 2
else:
self.nozreps = np.array([i for i in self.replist if i in self.nozlist])
self.zreps = np.array([i for i in self.replist if i in self.zlist])
self.nozsingles = np.array([i for i in self.singleslist if i in self.nozlist])
self.zsingles = np.array([i for i in self.singleslist if i in self.zlist])
# Repeater dimensions
if len(self.nozreps) == 0:
self.nozreps = None
self.nDr = 2
elif len(self.zreps) == 0:
self.zreps = None
self.nDr = 1
else:
self.nDr = 3
# Singles dimensions
if len(self.nozsingles) == 0:
self.nozsingles = None
self.nDs = 2
elif len(self.zsingles) == 0:
self.zsingles = None
self.nDs = 1
else:
self.nDs = 3
# initialise rep-dependent beam and width weights
self.frb_zbweights_singles = self.frb_bweights[self.zsingles,:]
self.frb_zbweights_reps = self.frb_bweights[self.zreps,:]
self.frb_nozbweights_singles = self.frb_bweights[self.nozsingles,:]
self.frb_nozbweights_reps = self.frb_bweights[self.nozreps,:]
self.frb_zwweights_singles = self.frb_wweights[self.zsingles,:]
self.frb_zwweights_reps = self.frb_wweights[self.zreps,:]
self.frb_nozwweights_singles = self.frb_wweights[self.nozsingles,:]
self.frb_nozwweights_reps = self.frb_wweights[self.nozreps,:]
[docs]
def process_survey_file(self,filename:str,
NFRB:int=None,
iFRB:int=0,
min_lat=None,
dmg_cut=None,
survey_dict = None):
""" Loads a survey file, then creates
dictionaries of the loaded variables
Args:
filename (str): Survey filename
NFRB (int, optional): Use only a subset of the FRBs in the Survey file.
Mainly used for Monte Carlo analysis
iFRB (int, optional): Start grabbing FRBs at this index
Mainly used for Monte Carlo analysis
Requires that NFRB be set
surveydict: overrides value in file
"""
self.iFRB = iFRB
self.NFRB = NFRB
self.meta = {}
# Read
frb_tbl = Table.read(filename, format='ascii.ecsv')
# Survey Data
self.survey_data = survey_data.SurveyData.from_jsonstr(
frb_tbl.meta['survey_data'])
# Meta -- for convenience for now;Â best to migrate away from this
default_telescope = survey_data.Telescope()
for key in self.survey_data.params:
DC = self.survey_data.params[key]
if DC == "telescope":
value = getattr(self.survey_data[DC],key)
if value == getattr(default_telescope,key):
# using default value - check if the FRBs have this
if key in frb_tbl.columns:
value = np.mean(frb_tbl[key])
self.meta[key] = value
else:
self.meta[key] = getattr(self.survey_data[DC],key)
# Get default values from default frb data
default_frb = survey_data.FRB()
# we now populate missing fields with the default values
for field in fields(default_frb):\
# checks to see if this is a field in metadata: if so, takes priority
if survey_dict is not None and field.name in survey_dict.keys():
default_value = survey_dict[field.name]
elif field.name in self.meta.keys():
default_value = self.meta[field.name]
else:
default_value = getattr(default_frb, field.name)
# now checks for missing data, fills with the default value
if field.name in frb_tbl.columns:
# iterate over fields, checking if they are populated.
# only replaces values that are []
for i,val in enumerate(frb_tbl[field.name]):
if isinstance(val,np.ma.core.MaskedArray):
frb_tbl[field.name][i] = default_value
else:
#default_value = getattr(default_frb, field.name)
frb_tbl[field.name] = default_value
print("WARNING: no ",field.name," found in survey",
"replacing with default value of ",default_value)
self.frbs = frb_tbl.to_pandas()
# Cut down?
# NFRB
if self.NFRB is not None:
self.NFRB=min(len(self.frbs), NFRB)
if self.NFRB < NFRB+iFRB:
raise ValueError("Cannot return sufficient FRBs, did you mean NFRB=None?")
# Not sure the following linematters given the Error above
themax = max(NFRB+iFRB,self.NFRB)
self.frbs=self.frbs[iFRB:themax]
# fills in missing coordinates if possible
# also converts RA and Dec strings to floats
self.fix_coordinates(verbose=False)
# Min latitude
if min_lat is not None and min_lat > 0.0:
tot = len(self.frbs)
mask = [(Gb is None) or (np.abs(Gb) > min_lat) for Gb in self.frbs['Gb'].values]
self.frbs = self.frbs[mask]
included = len(self.frbs)
print("Using minimum galactic latitude of " + str(min_lat) + ". Excluding " + str(tot - included) + " FRBs")
# Max DM
if dmg_cut is not None:
tot = len(self.frbs)
self.frbs = self.frbs[np.abs(self.frbs['DMG'].values) < dmg_cut]
included = len(self.frbs)
print("Using maximum DMG of " + str(dmg_cut) + ". Excluding " + str(tot - included) + " FRBs")
# Get new number of FRBs
self.NFRB = len(self.frbs)
# Vet
vet_frb_table(self.frbs, mandatory=True)
print("Loaded FRB info")
if len(self.frbs) > 0:
# first, replacing missing values with survey values
# only do this for values which are otherwise missing!
# replace default values with observed media values
# it's unclear if median or mean is the best here
keylist = ['SNRTHRESH','THRESH','BW','FBAR','FRES','TRES','WIDTH','DMG']
for key in keylist:
if not key in self.meta:
self.meta[key] = np.median(self.frbs[key])
#self.meta['SNRTHRESH'] = np.median(self.frbs['SNRTHRESH'])
#self.meta['THRESH'] = np.median(self.frbs['THRESH'])
#self.meta['BW'] = np.median(self.frbs['BW'])
#self.meta['FBAR'] = np.median(self.frbs['FBAR'])
#self.meta['FRES'] = np.median(self.frbs['FRES'])
#self.meta['TRES'] = np.median(self.frbs['TRES'])
#self.meta['WIDTH'] = np.median(self.frbs['WIDTH'])
#self.meta['DMG'] = np.mean(self.frbs['DMG'])
# over-rides survey data if applicable
if survey_dict is not None:
for key in survey_dict:
self.meta[key] = survey_dict[key]
print(self.meta[key], "overidden by ",survey_dict[key])
### processes galactic contributions
self.process_dmg()
### get pointers to correct results ,for better access
self.DMs=self.frbs['DM'].values
self.DMGs=self.frbs['DMG'].values
self.SNRs=self.frbs['SNR'].values
self.WIDTHs=self.frbs['WIDTH'].values
self.TRESs=self.frbs['TRES'].values
self.FRESs=self.frbs['FRES'].values
self.FBARs=self.frbs['FBAR'].values
self.BWs=self.frbs['BW'].values
self.THRESHs=self.frbs['THRESH'].values
self.SNRTHRESHs=self.frbs['SNRTHRESH'].values
self.Ss=self.SNRs/self.SNRTHRESHs
self.TOBS=self.meta['TOBS']
self.NORM_FRB=self.meta['NORM_FRB']
self.Gbs=self.frbs['Gb'].values
self.Gls=self.frbs['Gl'].values
# calculates intrinsic widths
# Uses the model of James et al 2025
# assumes we have S/N maximising widths
# if scattering dominates total width, expect tau = 0.816 w
tscale = 1.225 # scale scattering time to total width at +- 1 sigma
TEMP = self.frbs['WIDTH'].values**2 - (tscale*self.frbs['TAU'].values)**2
self.OKTAU = np.where(self.frbs['TAU'].values != -1.)[0] # code for non-existent
toolow = np.where(TEMP <= 0.)
TEMP[toolow] = 0.01*self.frbs['TAU'].values[toolow]**2 # 10% of scattering width
iwidths = TEMP**0.5 # scale to SNR max width assuming Gaussian shape
self.IWIDTHs = iwidths
self.TAUs = self.frbs['TAU'].values
# sets the 'beam' values to unity by default
self.beam_b=np.array([1])
self.beam_o=np.array([1])
self.NBEAMS=1
# checks for incorrectSNR values
toolow = np.where(self.Ss < 1.)[0]
if len(toolow) > 0:
raise ValueError("FRBs ",toolow," have SNR < SNRTHRESH!!! Please correct this. Exiting...")
print("FRB survey sucessfully initialised with ",self.NFRB," FRBs starting from", self.iFRB)
[docs]
def fix_coordinates(self,verbose=False):
"""
Takes and FRB, and fills out missing coordinate values
Note that now, RA, DEC, Gl, and Gb will be present
But their default values are None
"""
# converts to float if in string. Will do nothing if None
if isinstance(self.frbs['RA'][0],str):
RAs = np.zeros([len(self.frbs['RA'])])
for i,RA in enumerate(self.frbs['RA']):
RAs[i] = misc_functions.coord_string_to_deg(self.frbs['RA'][i],hr=True)
self.frbs['RA'] = RAs
if isinstance(self.frbs['DEC'][0],str):
DECs = np.zeros([len(self.frbs['DEC'])])
for i,DEC in enumerate(self.frbs['DEC']):
DECs[i] = misc_functions.coord_string_to_deg(self.frbs['DEC'][i],hr=False)
self.frbs['DEC'] = DECs
if isinstance(self.frbs['Gb'][0],str):
Gbs = np.zeros([len(self.frbs['Gb'])])
for i,Gb in enumerate(self.frbs['Gb']):
Gbs[i] = misc_functions.coord_string_to_deg(self.frbs['Gb'][i],hr=True)
self.frbs['Gb'] = Gbs
if isinstance(self.frbs['Gl'][0],str):
Gls = np.zeros([len(self.frbs['Gl'])])
for i,Gl in enumerate(self.frbs['Gl']):
Gls[i] = misc_functions.coord_string_to_deg(self.frbs['Gl'][i],hr=False)
self.frbs['Gl'] = Gls
for i,gl in enumerate(self.frbs['Gl']):
if gl is None or self.frbs['Gb'][i] is None:
# test RA
if self.frbs['RA'][i] is None or self.frbs['DEC'][i] is None:
if verbose:
print("WARNING: no coordinates calculable for FRB ",i)
else:
Gb,Gl = misc_functions.j2000_to_galactic(self.frbs['RA'][i], self.frbs['DEC'][i])
self.frbs[i,'Gb'] = Gb
self.frbs[i,'Gl'] = Gl
elif self.frbs['RA'][i] is None or self.frbs['DEC'][i] is None:
RA,Dec = misc_functions.galactic_to_j2000(self.frbs['Gl'][i], self.frbs['Gb'][i])
self.frbs[i,'RA'] = RA
self.frbs[i,'DEC'] = Dec
[docs]
def process_dmg(self):
""" Estimates galactic DM according to
Galactic lat and lon only if not otherwise provided
"""
if len(self.frbs["TNS"].values) != 0 and not np.isfinite(self.frbs["DMG"].values[0]):
print("Checking Gl and Gb")
if np.isfinite(self.frbs["Gl"].values[0]) and np.isfinite(self.frbs["Gb"].values[0]):
raise ValueError('Can not estimate Galactic contributions.\
Please enter Galactic coordinates, or else manually enter \
it as DMG')
print("Calculating DMG from NE2001. Please record this, it takes a while!")
ne = density.ElectronDensity()
DMGs=np.zeros([self.NFRB])
for i,l in enumerate(self.frbs["Gl"]):
b=self.frbs["Gb"][i]
ismDM = ne.DM(l, b, 100.)
print(i,l,b,ismDM)
DMGs=np.array(DMGs)
self.frbs["DMG"]=DMGs
self.DMGs=DMGs
[docs]
def init_beam(self,plot=False,
method=1,thresh=1e-3):
""" Initialises the beam """
# Gaussian beam if method == 0
if method==0:
b,omegab=beams.gauss_beam(thresh=thresh,
nbins=self.meta["NBINS"],
freq=self.meta["FBAR"],D=self.meta["DIAM"])
self.beam_b=b
self.beam_o=omegab*self.meta["NBEAMS"]
self.orig_beam_b=self.beam_b
self.orig_beam_o=self.beam_o
elif self.meta["BEAM"] is not None:
logb,omegab=beams.load_beam(self.meta["BEAM"])
self.orig_beam_b=10**logb
self.orig_beam_o=omegab
if plot:
savename='Plots/Beams/'+self.name+'_'+self.meta["BEAM"]+'_'+str(method)+'_'+str(thresh)+'_beam.pdf'
else:
savename=None
b2,o2=beams.simplify_beam(logb,omegab,self.meta["NBINS"],
savename=savename,method=method,thresh=thresh)
# there is a chance that this method alters the expected number of bins. Reset it!~
self.meta["NBINS"] = len(o2)
self.beam_b=b2
self.beam_o=o2
self.do_beam=True
# sets the 'beam' values to unity by default
self.NBEAMS=b2.size
else:
print("No beam found to initialise...")
[docs]
def calc_max_dm(self):
'''
Calculates the maximum searched DM.
Calculates bandwidth using
'''
fbar=self.meta['FBAR']
t_res=self.meta['TRES']
nu_res=self.meta['FRES']
max_idt=self.meta['MAX_IDT']
max_dm=self.meta['MAX_DM']
if max_dm is None and max_idt is not None:
k_DM=4.149 #ms GHz^2 pc^-1 cm^3
#f_low = fbar - (Nchan/2. - 1)*nu_res
#f_high = fbar + (Nchan/2. - 1)*nu_res
f_low = fbar - self.meta['BW']/2. # bottom of lowest band
f_high = fbar + self.meta['BW']/2. # top of highest band
max_dt = t_res * max_idt
max_dm = max_dt / (k_DM * ((f_low/1e3)**(-2) - (f_high/1e3)**(-2)))
self.max_dm = max_dm
# calculates max iDM value
if self.max_dm is not None:
max_dmeg = max_dm - np.median(self.DMhalos + self.DMGs)
max_idm = np.where(self.dmvals < max_dmeg)[0][-1]
self.max_idm = max_idm
self.max_dmeg = max_dmeg
else:
self.max_idm = None
self.max_dmeg = None
[docs]
def get_efficiency_from_wlist(self,wlist,plist,
model="Quadrature",
addGalacticDM=True,
edir=None, iz=None):
""" Gets efficiency to FRBs
Returns a list of relative efficiencies
as a function of dispersion measure for each width given in wlist
wlist:
list of intrinsic FRB widths
plist:
list of relative probabilities for FRBs to have widths of wlist
model: method of estimating efficiency as function of width, DM, and time resolution
Takes values of "Quadrature", "Sammons" (from Mawson Sammons summer project),
"CHIME" or a file name
addGalacticDM:
- True: this routine adds in contributions from the MW Halo and ISM, i.e.
it acts like DMlist is an extragalactic DM
- False: just use the supplied DMlist
edir:
- Directory where efficiency files are contained. Only relevant if specific FRB responses are used
iz:
- izth z-bin where these efficiencies are being calculated
"""
DMlist = self.dmvals
efficiencies=np.zeros([wlist.size,DMlist.size])
if addGalacticDM:
# toAdd = self.DMhalo + self.meta['DMG']
toAdd = np.median(self.DMhalos + self.DMGs)
# toAdd = self.DMGal
else:
toAdd = 0.
for i,w in enumerate(wlist):
efficiencies[i,:]=calc_relative_sensitivity(
None,DMlist+toAdd,w,
self.meta['FBAR'],
self.meta['TRES'],
self.meta['FRES'],
max_idt=self.meta['MAX_IDT'],
max_dm=self.meta['MAX_DM'],
model=model,
dsmear=False,
edir=edir,
max_iw=self.meta['MAX_IW'],
max_meth = self.meta['MAXWMETH'])
# keep an internal record of this
if iz is None:
self.efficiencies=efficiencies
self.DMlist=DMlist
mean_efficiencies=np.mean(efficiencies,axis=0)
self.mean_efficiencies=mean_efficiencies #be careful here!!! This may not be what we want!
else:
self.efficiencies[:,iz,:]=efficiencies
self.DMlist[iz,:]=DMlist
mean_efficiencies=np.mean(efficiencies,axis=0)
self.mean_efficiencies[iz,:]=mean_efficiencies #be careful here!!! This may not be what we want!
return efficiencies
[docs]
def __repr__(self):
""" Over-ride print representation
Returns:
str: Items of the FURBY
"""
repr = '<{:s}: \n'.format(self.__class__.__name__)
repr += f'name={self.name}'
return repr
# implements something like Mawson's formula for sensitivity
# t_res in ms
[docs]
def calc_relative_sensitivity(DM_frb,DM,w,fbar,t_res,nu_res,Nchan=336,max_idt=None,
max_dm=None,model='Quadrature',dsmear=True,edir=None,max_iw=None,
max_meth = 0):
""" Calculates DM-dependent sensitivity
This function adjusts sensitivity to a given burst as a function of DM.
It includes DM smearing between channels,
burst intrinsic width,
the observation frequency
time- and frequency-resolutions etc.
NOTE: DM_frb *only* used if dsmear = True: combine these to default to None?
Arguments:
DM_frb [float]: measured DM of a particular FRB. Used only if dsmear=True.
DMs [np.ndarray] DMs at which to calculate the DM bias effect. pc/cm3
w [float]: FRB width [ms]
fbar: mean frequency of the observation [Mhz]
t_res: time resolution of the observation [ms]
nu_res: frequency resolution of the observation [Mhz]
model: Quadrature,Sammons, or CHIME: method to calculate bias.
NOTE: Quadrature_s and Sammons_s should be input to this function as
just Quadrature and Sammons respectively
dsmear: subtract DM smearing from measured width to calculate intrinsic
edir [string, optional]: directory containing efficiency files to be loaded
max_iw [int, optional]: maximum integer width of the search
maxmeth [int, optional]:
0: ignore maximum width
1: truncate sensitivity at maximum width
2: scale sensitivity as 1/w at maximum width
"""
global MIN_THRESH
# this model returns the parameterised CHIME DM-dependent sensitivity
# it is independent of width
if model=='CHIME':
# polynomial coefficients for fit to CHIME DM bias data (4th order poly)
coeffs = np.array([ 7.79309074e-03, -2.09210057e-01, 1.93122752e+00,
-7.05813760e+00, 8.93355593e+00])
# this constant normalises the above to a peak efficiency of 100%
coeffs /= 1.118694423940629
# fit is to natural log of DM values
ldm = np.log(DM)
rate = np.polyval(coeffs,ldm)
# scale rate by assumed Cartesian logN-logS
sensitivity = rate**(2./3.)
# calculates relative sensitivity to bursts as a function of DM
# Check for Quadrature and Sammons
elif model == 'Quadrature' or model == 'Sammons' or model=="StdDev":
# constant of DM
k_DM=4.149 #ms GHz^2 pc^-1 cm^3
# total smearing factor within a channel
dm_smearing=2*(nu_res/1.e3)*k_DM*DM/(fbar/1e3)**3 #smearing factor of FRB in the band
# this assumes that what we see are measured widths including all the smearing factors
# hence we must first adjust for this prior to estimating the DM-dependence
# for this we use the *true* DM at which the FRB was observed
if dsmear==True:
# width is the total width
measured_dm_smearing=2*(nu_res/1.e3)*k_DM*DM_frb/(fbar/1e3)**3 #smearing factor of FRB in the band
if model == "StdDev":
uw = w**2 - dm_smearing**2/3. - t_res**2/3.
else:
uw = w**2-measured_dm_smearing**2-t_res**2 # uses the quadrature model to calculate intrinsic width uw
if uw < 0:
uw=1e-2 # replace this with some fraction of minimum width?
else:
uw=uw**0.5
else:
# w represents the intrinsic width
uw=w
if model == "StdDev":
# 2* to be +- one standard deviation. But we're now fixing Tres to be unity
totalw = (uw**2 + dm_smearing**2/3. + t_res**2)**0.5
nosmearw = (uw**2 + t_res**2)**0.5
else:
totalw = (uw**2 + dm_smearing**2 + t_res**2)**0.5
nosmearw = (uw**2 + t_res**2)**0.5
# calculates relative sensitivity to bursts as a function of DM
if model=='Quadrature' or model=="StdDev":
sensitivity=totalw**-0.5
elif model=='Sammons':
sensitivity=0.75*(0.93*dm_smearing + uw + 0.35*t_res)**-0.5
# implements max integer width cut.
if max_meth != 0 and max_iw is not None:
max_w = t_res*(max_iw+0.5)
if max_meth == 1 or max_meth == 2:
# NOTE: for CRAFT, dm smearing already accounted for prior to width search
# this means that the smearing cut is DM independent
if nosmearw > max_w:
toolong = np.arange(dm_smearing.size)
else:
toolong = []
#toolong = np.where(nosmearw > max_w)[0]
elif max_meth == 3 or max_meth == 4:
# NOTE: for CRAFT, dm smearing already accounted for prior to width search
toolong = np.where(totalw > max_w)[0]
if max_meth == 1 or max_meth == 3:
sensitivity[toolong] = MIN_THRESH # something close to zero
elif max_meth == 2 or max_meth == 4:
# we have already reduced it by \sqrt{t}
# we thus add a further sqrt{t} factor
sensitivity[toolong] *= (max_w / totalw[toolong])**0.5
# If model not CHIME, Quadrature or Sammons assume it is a filename
else:
if edir is None:
edir = resources.files('zdm').joinpath('data/Efficiencies')
filename = os.path.expanduser(os.path.join(edir, model + ".npy"))
if not os.path.exists(filename):
raise ValueError("Model is not CHIME, Quadrature or Sammons and hence is expected to be the name of a file containing the efficiencies but " + filename + " does not exist.")
# Should contain DM in the first row and efficiencies in the second row
sensitivity_array = np.load(filename)
sensitivity = np.interp(DM, sensitivity_array[0,:], sensitivity_array[1,:], right=1e-2)
return sensitivity
[docs]
def load_survey(survey_name:str, state:parameters.State,
dmvals:np.ndarray,
zvals:np.ndarray=None,
sdir:str=None, NFRB:int=None,
nbins=None, iFRB:int=0,
dummy=False,
edir=None,
rand_DMG=False,
survey_dict = None,
verbose=False):
"""Load a survey
Args:
survey_name (str): Name of the survey
e.g. CRAFT/FE
state (parameters.State): Parameters for the state
dmvals (np.ndarray): DM values
zvals (np.ndarray,optional): z values
sdir (str, optional): Path to survey files. Defaults to None.
nbins (int, optional): Sets number of bins for Beam analysis
[was NBeams]
NFRB (int, optional): Cut the total survey down to a random
subset [useful for testing]
iFRB (int, optional): Start grabbing FRBs at this index
Mainly used for Monte Carlo analysis
Requires that NFRB be set
dummy (bool,optional)
Skip many initialisation steps: used only when loading
survey parameters for conversion to the new survey format
survey_dict (dict, optional): dictionary of survey metadata to
over-ride values in file
verbose (bool): print output
Raises:
IOError: [description]
Returns:
Survey: instance of the class
"""
if verbose:
print(f"Loading survey: {survey_name}")
if sdir is None:
sdir = resources.files('zdm').joinpath('data/Surveys')
# Hard code real surveys
if survey_name == 'CRAFT/FE':
dfile = 'CRAFT_class_I_and_II'
elif survey_name == 'CRAFT/ICS':
dfile = 'CRAFT_ICS'
elif survey_name == 'CRAFT/ICS892':
dfile = 'CRAFT_ICS_892'
elif survey_name == 'CRAFT/ICS1632':
dfile = 'CRAFT_ICS_1632'
elif survey_name == 'PKS/Mb':
dfile = 'parkes_mb_class_I_and_II'
elif 'private' in survey_name:
dfile = survey_name
else:
dfile = survey_name
# allows a user to input the .ecsv themselves
if dfile[-6:] != ".ecsv":
dfile += '.ecsv'
print(f"Loading survey: {survey_name} from {dfile}")
# Do it
srvy = Survey(state,
survey_name,
os.path.join(sdir, dfile),
dmvals,
zvals=zvals,
NFRB=NFRB, iFRB=iFRB,
edir=edir, rand_DMG=rand_DMG,
survey_dict = survey_dict)
return srvy
[docs]
def vet_frb_table(frb_tbl:pandas.DataFrame,
mandatory:bool=False,
fill:bool=False):
"""
This should not be necessary anymore, since
all required FRB data should be populated with
default values. However, it's great as a check. If
this complains, it means we have a bug in the
replacement with default value procedure.
"""
frb_data = survey_data.FRB()
# Loop on the standard fields
for field in frb_data.__dataclass_fields__.keys():
if field in frb_tbl.keys():
not_none = frb_tbl[field].values != None
if np.any(not_none):
idx0 = np.where(not_none)[0][0]
assert isinstance(
frb_tbl.iloc[idx0][field],
frb_data.__dataclass_fields__[field].type), \
f'Bad data type for {field}'
elif mandatory:
raise ValueError(f'{field} is missing in your table!')
elif fill:
frb_tbl[field] = None
############ We now define some width/scattering functions ############
# These all return p(w) dlogw, and must take as arguments np.log10(widths)
[docs]
def quadrature_convolution(width_function, width_args, scat_function, scat_args,
internal_logvals, bins, backproject = False):
'''
Numerically evaluates the resulting distribution of y=\sqrt{x1^2+x2^2},
where x1 is the width distribution, and x2 is the scattering distribution.
Args:
width_function (float function(float,args)): function to call giving p(logw) dlogw
width_args (*list): arguments to pass to width function
scat_function (float function(float,args)): function to call giving p(logtau) dlogtau
scat_args (*list): arguments to pass to scattering function
internal_vals (np.ndarray): numpy array of length NIbins giving internal
values of log dw to use for internal calculation purposes.
bins (np.ndarray([NBINS+1],dtype='float')): bin edges for final width distribution
backproject (bool, optional): if True, calculates p(tau|totalw) and p(w|totalw)
for this redshift, and returns additional values.
Returns:
hist (np.ndarray): histogram of probability within bins
wfracs (np.ndarray,only if backproject): p(tau|tw)
taufracs (np.ndarray,only if backproject): p(w|tw)
'''
# these need to be normalised by the internal bin width
logbinwidth = internal_logvals[-1] - internal_logvals[-2]
# these functions should *not* be normalsid, since some true distribution
# may extend beyond the range of interest. But it means that the below functions
# absolutely should be correctly normalised
pw = width_function(internal_logvals, *width_args)*logbinwidth
ptau = scat_function(internal_logvals, *scat_args)*logbinwidth
# adds extra bits onto the lowest bin. Does this by integrating in
# log space. Assumes exp(-10) is small enough!
lowest = internal_logvals[0] - logbinwidth/2.
extrapw,err = quad(width_function,lowest-10,lowest,args=width_args)
extraptau,err = quad(scat_function,lowest-10,lowest,args=scat_args)
pw[0] += extrapw
ptau[0] += extraptau
linvals = 10**internal_logvals
# calculate total widths - all done in linear domain
Nbins = bins.size-1
hist = np.zeros([Nbins])
for i,x1 in enumerate(linvals):
totalwidths = (x1**2 + linvals**2)**0.5
probs = pw[i]*ptau
h,b = np.histogram(totalwidths,bins=bins,weights=probs)
hist += h
# calculate p(w) and p(tau) for each w for this z
if backproject:
# generate arrays to hold probabilities, so values of tau and
# w can be fit
wfracs = np.zeros([internal_logvals.size,Nbins])
taufracs = np.zeros([internal_logvals.size,Nbins])
# maps the probabilities as a function of intrinsic
# width to generate a p(w|total_width) and p(tau|total_width)
# note that these are p(observed) values, i.e. after z-correction
# arrays have dimensions(Nw,Ntotal_width) so for each total
# width, we get the probability
for i,x1 in enumerate(linvals):
# total widths corresponding to linvals for tau/iw x1
totalwidths = (x1**2 + linvals**2)**0.5
# ptau is the probability of achieving linvals, so combined
# probability is pw[i]*ptau
probs = pw[i]*ptau
h,b = np.histogram(totalwidths,bins=bins,weights=probs)
wfracs[i,:] = h
# pw is the probability of achieving linvals, so combined
# probability is ptau[i] * pw
probs = ptau[i]*pw
h,b = np.histogram(totalwidths,bins=bins,weights=probs)
taufracs[i,:] = h
# we need p(tau|w). This means sum for a given w must be 1!
wnorm = np.sum(wfracs,axis=0)
# where is p(tau=0 for a given w?)
bad = np.where(wnorm == 0.)[0]
wfracs[:,bad] = 1./internal_logvals.size # if a particular width value has no iw, equalise probability over all iw
wnorm[bad] = 1.
tnorm = np.sum(taufracs,axis=0)
bad = np.where(tnorm == 0.)[0]
taufracs[:,bad] = 1./internal_logvals.size # if a particular width value has no possible tau, equalise probability over all tau
tnorm[bad] = 1.
# normalise probabilities for each intrinsic w
#wfracs = (wfracs.T/wnorm).T
#taufracs = (taufracs.T/tnorm).T
wfracs = wfracs/wnorm
taufracs = taufracs/tnorm
# plot some examples. This code is kept here for internal analysis purposes
if False:
plt.figure()
plt.plot(internal_logvals,ptau,label="Intrinsic p(tau)")
plt.plot(internal_logvals,pw,label="Intrinsic p(w)")
for ib in np.arange(Nbins):
plt.plot(internal_logvals,taufracs[:,ib],label="p(tau) width "+str(ib))
plt.plot(internal_logvals,wfracs[:,ib],label="p(w) width "+str(ib))
plt.plot(np.log10([bins[ib],bins[ib]]),[0,1],linestyle=":",color=plt.gca().lines[-1].get_color())
plt.plot(np.log10([bins[ib+1],bins[ib+1]]),[0,1],linestyle=":",color=plt.gca().lines[-1].get_color())
break
plt.yscale("log")
#plt.xscale("log")
plt.xlabel("log10 Tau [ms]")
plt.ylabel("p(tau |w)")
plt.legend()
plt.tight_layout()
plt.show()
plt.close()
# exit now, to prevent very many such plots being generated
exit()
return hist,wfracs,taufracs
else:
return hist
[docs]
def lognormal(log10w, *args):
"""
Lognormal probability distribution. Note that the x values and args
could theoretically be in natural log (or other) space,
including linear.
Args:
log10w: log base 10 of widths
args: vector of [logmean,logsigma] mean and std dev
Returns:
result: p(logw) d logw
"""
logmean = args[0]
logsigma = args[1]
norm = (2.*np.pi)**-0.5/logsigma
result = norm * np.exp(-0.5 * ((log10w - logmean) / logsigma) ** 2)
return result
[docs]
def halflognormal(log10w, *args):#logmean,logsigma,minw,maxw,nbins):
"""
Generates a parameterised half-lognormal distribution.
This acts as a lognormal in the lower half, but
keeps a constant per-log-bin width in the upper half
There is nor formal way to normalise this function.
It can also be verified that changing the normalisation of these functions
only changes the P(N) calculation, which should in any case be
separately optimised. Note however that this changes the interpretation
of the log-constant, which may be incorrect to within such a normalisation factor.
Args:
log10w: log base 10 of widths
args: vector of [logmean,logsigma] mean and std dev and max value
Returns:
result: p(log10w) d log10w
"""
logmean = args[0]
logsigma = args[1]
logmax = args[2]
norm = (2.*np.pi)**-0.5/logsigma #Currently no normalisation
if hasattr(log10w,"__len__"):
large = np.where(log10w > logmean)[0]
modlogw = np.copy(log10w) # ensures we don't change the original values
modlogw[large] = logmean # subs mean value in for values larger than the mean
else:
if log10w > logmean:
modlogw = logmean
else:
modlogw = log10w
result = lognormal(modlogw,logmean,logsigma)
# normalises the distribution. We note that the lower half
# is correctly normalised to 0.5 via the lognormal function
# the upper half spans the range [logmax-logmean] at amplitude
# norm. Hence, the total integral is
# 0.5 + [logmax-logmean]*norm
result /= (0.5 + (logmax-logmean)*norm)
return result
[docs]
def constant(log10w,*args):
"""
Dummy function that returns a constant of unity, down
to a certain minimum, below which it is zero.
NOTE: to include 1+z scaling here, one will need to
reduce the minimum width argument with z. Feature
to be added. Maybe have args also contain min and max values?
Args:
log10w: log base 10 of widths
args: vector of [logmean,logsigma] mean and std dev and max width
Returns:
result: p(logw) d logw
"""
width = args[2] - args[0]
if hasattr(log10w,"__len__"):
good = np.where(log10w > args[0])[0]
result = np.zeros([log10w.size])
result[good] = 1./width
else:
if log10w < args[0]:
result = np.array([0])
else:
result = np.array([1./width])
return result