"""Processing of population files.
This module contains classes and functions to process population files.
Population files are HDF5 files containing the history and oneline dataframes
of a population of binary systems. The history dataframe contains the detailed
evolution of each binary system, while the oneline dataframe contains the
final state of each binary system.
Classes
-------
PopulationRunner
A class to handle the evolution of binary populations.
Population
A class to handle population files.
History
A class to handle the history dataframe of a population file.
Oneline
A class to handle the oneline dataframe of a population file.
TransientPopulation
A class to handle transient populations.
Rates
A class to handle the cosmic rates of in a population file.
"""
__authors__ = [
"Simone Bavera <Simone.Bavera@unige.ch>",
"Kyle Akira Rocha <kylerocha2024@u.northwestern.edu>",
"Monica Gallegos-Garcia <monicagallegosgarcia2024@u.northwestern.edu>",
"Max Briel <max.briel@unige.ch>",
]
import numpy as np
import pandas as pd
from tqdm import tqdm
import os
from matplotlib import pyplot as plt
from posydon.utils.constants import Zsun
from posydon.popsyn.io import binarypop_kwargs_from_ini
from posydon.popsyn.normalized_pop_mass import initial_total_underlying_mass
import posydon.visualization.plot_pop as plot_pop
from posydon.utils.common_functions import convert_metallicity_to_string
from posydon.utils.posydonwarning import Pwarn
from astropy.cosmology import Planck15 as cosmology
from astropy import constants as const
from posydon.popsyn.rate_calculation import (
get_shell_comoving_volume,
get_comoving_distance_from_redshift,
get_cosmic_time_from_redshift,
redshift_from_cosmic_time_interpolator,
DEFAULT_MODEL,
get_redshift_bin_edges,
get_redshift_bin_centers,
)
from posydon.popsyn.star_formation_history import (
star_formation_rate,
SFR_Z_fraction_at_given_redshift,
)
from posydon.popsyn.binarypopulation import (
BinaryPopulation,
HISTORY_MIN_ITEMSIZE,
ONELINE_MIN_ITEMSIZE,
)
###############################################################################
parameter_array = [
"number_of_binaries",
"binary_fraction_scheme",
"binary_fraction_const",
"star_formation",
"max_simulation_time",
"primary_mass_scheme",
"primary_mass_min",
"primary_mass_max",
"secondary_mass_scheme",
"secondary_mass_min",
"secondary_mass_max",
"orbital_scheme",
"orbital_period_scheme",
"orbital_period_min",
"orbital_period_max",
"eccentricity_scheme",
]
[docs]
class PopulationRunner:
"""A class to handle the evolution of binary populations.
Attributes
----------
pop_params : dict
The parameters of the population population.
solar_metallicities : list of float
The metallicities of the populations in solar units.
binary_populations : list of BinaryPopulation
The binary populations.
verbose : bool
If `True`, print additional information.
"""
def __init__(self, path_to_ini, verbose=False):
"""Initialize the binary populations from an ini file.
Parameters
----------
path_to_ini : str
The path to the ini file.
verbose : bool, optional
If True, print additional information. Default is False.
Raises
------
ValueError
If the provided `path_to_ini` does not have a '.ini' extension.
Notes
-----
This method initializes the `PopulationRunner` object by reading the binary population parameters from an ini file specified by `path_to_ini`.
It also allows for optional verbosity, which, when set to True, enables additional information to be printed in operation of the class.
The `path_to_ini` should be a valid path to an ini file. If the provided `path_to_ini` does not have a '.ini' extension, a `ValueError` is raised.
Examples
--------
>>> sp = PopulationRunner('/path/to/ini/file.ini')
>>> sp = PopulationRunner('/path/to/ini/file.ini', verbose=True)
"""
if ".ini" not in path_to_ini:
raise ValueError("You did not provide a valid path_to_ini!")
else:
self.pop_params = binarypop_kwargs_from_ini(path_to_ini)
self.solar_metallicities = self.pop_params["metallicity"]
self.verbose = verbose
if not isinstance(self.solar_metallicities, list):
self.solar_metallicities = [self.solar_metallicities]
self.binary_populations = []
for MET in self.solar_metallicities:
ini_kw = binarypop_kwargs_from_ini(path_to_ini)
# overwrite the ini_kw verbose parameter
ini_kw["verbose"] = self.verbose
ini_kw['tqdm'] = self.verbose
ini_kw["metallicity"] = MET
ini_kw["temp_directory"] = (
convert_metallicity_to_string(MET)
+ "_Zsun_"
+ ini_kw["temp_directory"]
)
self.binary_populations.append(BinaryPopulation(**ini_kw))
[docs]
def evolve(self, overwrite=False):
"""Evolve the binary populations.
This method is responsible for evolving the binary populations. It iterates over each population
in the `binary_populations` list and calls the `evolve` method on each population. After evolving
each population, it merges them using the `merge_parallel_runs` method.
Note:
The `merge_parallel_runs` method is called only if the `comm` attribute of the population is `None`.
"""
for pop in self.binary_populations:
# check if the temp directory exists
if os.path.exists(pop.kwargs["temp_directory"]) and not overwrite:
raise FileExistsError(f"The {pop.kwargs['temp_directory']} directory already exists! Please remove it or rename it before running the population.")
elif os.path.exists(pop.kwargs["temp_directory"]) and overwrite:
if self.verbose:
print(f"Removing {pop.kwargs['temp_directory']} directory...")
os.removedirs(pop.kwargs["temp_directory"])
pop.evolve()
if pop.comm is None:
self.merge_parallel_runs(pop)
[docs]
def merge_parallel_runs(self, pop):
"""Merge the parallel runs of the population.
Parameters
----------
pop : BinaryPopulation
The binary population whose files have to be merged.
"""
if os.path.exists(convert_metallicity_to_string(pop.metallicity) + "_Zsun_population.h5"):
raise FileExistsError(
f"{convert_metallicity_to_string(pop.metallicity)}_Zsun_population.h5 already exists!\n"
+"Files were not merged. You can use PopulationRunner.merge_parallel_runs() to merge the files manually."
)
path_to_batch = pop.kwargs["temp_directory"]
tmp_files = [
os.path.join(path_to_batch, f)
for f in os.listdir(path_to_batch)
if os.path.isfile(os.path.join(path_to_batch, f))
]
if self.verbose:
print(f"Merging {len(tmp_files)} files...")
pop.combine_saved_files(
convert_metallicity_to_string(pop.metallicity) + "_Zsun_population.h5",
tmp_files,
)
if self.verbose:
print("Files merged!")
print(f"Removing files in {path_to_batch}...")
# remove files
if len(os.listdir(path_to_batch)) == 0:
os.rmdir(path_to_batch)
##################
# Helper classes #
##################
[docs]
class DFInterface:
"""A class to handle the interface between the population file and the History and Oneline classes."""
def __init__(self):
self.filename = None
self.chunksize = None
[docs]
def head(self, key, n=10):
"""Return the first n rows of the key table
Parameters
----------
key: str
The key of the table
n : int, optional
The number of rows to return. Default is 10.
Returns
-------
pandas.DataFrame
The first n rows of the key table.
"""
with pd.HDFStore(self.filename, mode="r") as store:
return store.select(key, start=0, stop=n)
[docs]
def tail(self, key, n=10):
"""
Get the last n rows of the key table.
Parameters
----------
key : str
The key of the table.
n : int, optional
The number of rows to return. Default is 10.
Returns
-------
pd.DataFrame
The last n rows of the key table.
"""
with pd.HDFStore(self.filename, mode="r") as store:
return store.select(key, start=-n)
[docs]
def select(self, key, where=None, start=None, stop=None, columns=None):
'''Select a subset of the key table based on the given conditions.
Parameters
----------
key : str
The key of the table to select from.
where : str, optional
A string representing the query condition to apply to the data.
start : int, optional
The starting index of the data to select.
stop : int, optional
The ending index of the data to select.
columns : list, optional
A list of column names to select.
Returns
-------
pandas.DataFrame
The selected data as a DataFrame.
'''
# we have to chunk the read because of memory issues
with pd.HDFStore(self.filename, mode="r") as store:
iterator = store.select(key, where=where, start=start, stop=stop, columns=columns, chunksize=self.chunksize)
# read the data in chunks and concatenate once (faster than concat every chunk!)
out = []
for chunk in iterator:
out.append(chunk)
out = pd.concat(out, axis=0)
return out
[docs]
def get_repr(self, key):
'''Return a string representation of the key table.
Parameters
----------
key : str
The key of the table to return the string representation of.
Returns
-------
str
The string representation of the key table.
'''
with pd.HDFStore(self.filename, mode="r") as store:
return store.select(key, start=0, stop=10).__repr__()
[docs]
def get_html_repr(self, key):
"""Return the HTML representation of the key table.
Parameters
----------
key : str
The key of the table to return the HTML representation of.
Returns
-------
str
The HTML representation of the key table.
"""
with pd.HDFStore(self.filename, mode="r") as store:
return store.select(key, start=0, stop=10)._repr_html_()
[docs]
class History(DFInterface):
"""A class to handle the history dataframe of a population file.
This class provides methods to handle the history dataframe of a population file.
It allows accessing and manipulating the history table based on various keys and conditions.
Attributes
----------
filename : str
The path to the population file.
verbose : bool
If `True`, print additional information.
chunksize : int
The chunksize to use when reading the history file.
lengths : pd.DataFrame
The number of rows of each binary in the history dataframe.
number_of_systems : int
The number of systems in the history dataframe.
columns : list of str
The columns of the history dataframe.
indices : np.ndarray
The binary indices of the history dataframe.
"""
def __init__(self, filename, verbose=False, chunksize=100000):
"""Initialise the history dataframe.
This class is used to handle the history dataframe of a population file.
On initialisation, the history_lengths are calculated and stored in the population file,
if not present in the file. The history dataframe is not loaded into memory.
Parameters
----------
filename : str
The path to the population file.
verbose : bool
If `True`, print additional information.
chunksize : int (default=100000)
The chunksize to use when reading the history file.
"""
self.filename = filename
self.verbose = verbose
self.chunksize = chunksize
self.lengths = None
self.number_of_systems = None
self.columns = None
if not os.path.exists(filename):
raise FileNotFoundError(f"{filename} does not exist!")
# add history_lengths
with pd.HDFStore(filename, mode="a") as store:
# get the history lengths from the file
if "/history_lengths" in store.keys():
self.lengths = store["history_lengths"]
else:
if self.verbose:
print(
"history_lengths not found in population file. Calculating history lengths..."
)
history_events = store.select_column("history", "index")
tmp_df = pd.DataFrame(
history_events.groupby(history_events).count(),
)
tmp_df.rename(columns={"index": "length"}, inplace=True)
self.lengths = tmp_df
del tmp_df
if self.verbose:
print("Storing history lengths in population file!")
store.put("history_lengths", pd.DataFrame(self.lengths), format="table")
del history_events
self.columns = store.select("history", start=0, stop=0).columns.to_list()
self.indices = self.lengths.index.to_numpy()
self.number_of_systems = len(self.lengths)
def __getitem__(self, key):
"""Return the history table based on the provided key.
Parameters
----------
key : int, list of int, np.ndarray, or str
The key to use for indexing the history dataframe.
Returns
-------
pd.DataFrame
The history table based on the provided key.
Raises
------
ValueError
If the key type is invalid or if the column name(s) are not valid.
Examples
--------
# Get a single row by index
>>> population[0]
Returns the history table for the row with index 0.
# Get multiple rows by index
>>> population[[0, 1, 2]]
Returns the history table for the rows with indices 0, 1, and 2.
# Get rows based on a boolean mask
>>> mask = population['age'] > 30
>>> population[mask]
Returns the history table for the rows where the 'age' column is greater than 30.
# Get a specific column
>>> population['time']
Returns the 'age' column from the history table.
# Get multiple columns
>>> population[['S1_mass', 'S2_mass']]
Returns the 'S1_mass' and 'S2_mass' columns from the history table.
"""
if isinstance(key, slice):
if key.start is None:
pre = 0
else:
pre = key.start
if key.stop is None:
chunk = self.number_of_systems
else:
chunk = key.stop - pre
indices = list(range(pre, pre + chunk))
return self.select(where=f'index in {indices}')
# single index
elif isinstance(key, int):
return self.select(where=f"index == {key}")
# list of indices
elif isinstance(key, list) and all(isinstance(x, int) for x in key):
if len(key) == 0:
return pd.DataFrame()
else:
return self.select(where=f"index in {key}")
# numpy array
elif isinstance(key, np.ndarray) and (key.dtype == int):
if len(key) == 0:
return pd.DataFrame()
else:
indices = key.tolist()
return self.select(where=f"index in {indices}")
# boolean mask
elif (isinstance(key, np.ndarray) and key.dtype == bool) or (isinstance(key, pd.DataFrame) and all(key.dtypes == bool)):
# return empty if no values
if len(key) == 0:
return pd.DataFrame()
# We cannot use self.select because we're using a boolean mask across the entire table
# This can be optimized by only selecting indices that are True instead of the entire table
with pd.HDFStore(self.filename, mode="r") as store:
iterator = store.select("history", chunksize=self.chunksize)
out = []
for i, chunk in enumerate(iterator):
out.append(chunk[key[i : i + self.chunksize]])
return pd.concat(out, axis=0)
# single column
elif isinstance(key, str):
if key in self.columns:
return self.select(columns=[key])
else:
raise ValueError(f"{key} is not a valid column name!")
# multiple columns
elif isinstance(key, list) and all(isinstance(x, str) for x in key):
if all(x in self.columns for x in key):
return self.select(columns=key)
else:
raise ValueError(f"Not all columns in {key} are valid column names!")
else:
raise ValueError("Invalid key type!")
def __len__(self):
"""Return the number of rows in the history table
Returns
-------
int
The number of rows in the history table.
"""
return np.sum(self.lengths.values)
[docs]
def head(self, n=10):
"""Return the first n rows of the history table
Parameters
----------
n : int, optional
The number of rows to return. Default is 10.
Returns
-------
pandas.DataFrame
The first n rows of the history table.
"""
return super().head("history", n)
[docs]
def tail(self, n=10):
"""Return the last n rows of the history table.
Parameters:
-----------
n : int, optional
Number of rows to return. Default is 10.
Returns:
--------
pandas.DataFrame
The last n rows of the history table.
"""
return super().tail("history", n)
def __repr__(self):
"""Return a string representation of the object.
Returns:
str: A string representation of the object.
"""
return super().get_repr("history")
def _repr_html_(self):
"""Return the HTML representation of the history dataframe.
This method reads the history data from an HDF file and returns
the HTML representation of the data using the `_repr_html_` method of the
pandas DataFrame.
Returns
-------
str
The HTML representation of the history dataframe.
"""
return super().get_html_repr("history")
[docs]
def select(self, where=None, start=None, stop=None, columns=None):
"""Select a subset of the history table based on the given conditions.
This method allows you to query and retrieve a subset of data from the history table
stored in an HDFStore file. You can specify conditions using the `where` parameter,
which is a string representing the query condition to apply to the data. You can also
specify the starting and ending indices of the data to select using the `start` and
`stop` parameters. Additionally, you can choose to select specific columns by providing
a list of column names using the `columns` parameter.
Parameters
----------
where : str, optional
A string representing the query condition to apply to the data.
It is only possible to query on the index or string columns.
start : int, optional
The starting index of the data to select.
stop : int, optional
The ending index of the data to select.
columns : list, optional
A list of column names to select.
Returns
-------
pandas.DataFrame
The selected data as a DataFrame.
"""
return super().select(key='history',
where=where,
start=start,
stop=stop,
columns=columns)
[docs]
class Oneline(DFInterface):
"""A class to handle the oneline dataframe of a population file.
The `Oneline` class provides methods to manipulate and retrieve data from the oneline dataframe of a population file.
Attributes
----------
filename : str
The path to the population file.
verbose : bool
If `True`, print additional information.
chunksize : int
The chunksize to use when reading the oneline file.
number_of_systems : int
The number of systems in the oneline dataframe.
indices : np.ndarray
The binary indices of the oneline dataframe.
columns : list of str
"""
def __init__(self, filename, verbose=False, chunksize=100000):
"""Initialize a Oneline class instance.
Parameters
----------
filename : str
The path to the HDFStore file containing the Oneline population data.
verbose : bool, optional
If True, print additional information during initialization. Default is False.
chunksize : int, optional
The number of rows to read from the HDFStore file at a time. Default is 10000.
"""
self.filename = filename
self.verbose = verbose
self.chunksize = chunksize
self.number_of_systems = None
self.indices = None
if not os.path.exists(filename):
raise FileNotFoundError(f"{filename} does not exist!")
with pd.HDFStore(filename, mode="r") as store:
self.indices = store.select_column("oneline", "index").to_numpy()
self.columns = store.select("oneline", start=0, stop=0).columns.to_list()
self.number_of_systems = len(self.indices)
def __getitem__(self, key):
"""Get a subset of the oneline table based on the given key.
Parameters
----------
key : slice, int, list, np.ndarray, str
The key to select the subset of the oneline table.
Returns
-------
pd.DataFrame
The subset of the oneline table.
Raises
------
ValueError
If the key is of invalid type or contains invalid values.
Examples
--------
# Get a slice of the oneline table
>>> subset = population[10:20]
# Get a single row from the oneline table
>>> row = population[5]
# Get multiple rows from the oneline table using a list of indices
>>> rows = population[[1, 3, 5]]
# Get rows from the oneline table using a boolean array
>>> mask = population['age'] > 30
>>> filtered_rows = population[mask]
# Get a specific column from the oneline table
>>> column = population['age']
# Get multiple columns from the oneline table using a list of column names
>>> columns = population[['age', 'gender']]
"""
if isinstance(key, slice):
if key.start is None:
pre = 0
else:
pre = key.start
if key.stop is None:
chunk = self.number_of_systems
else:
chunk = key.stop - pre
indices = list(range(pre, pre + chunk))
return self.select(where=f'index in {indices}')
elif isinstance(key, int):
return self.select(where=f"index == {key}")
elif isinstance(key, list) and all(isinstance(x, int) for x in key):
return self.select(where=f"index in {key}")
elif isinstance(key, np.ndarray) and (key.dtype == int):
indices = key.tolist()
return self.select(where=f"index in {indices}")
elif isinstance(key, list) and all(isinstance(x, float) for x in key):
raise ValueError("elements in list are not integers! Try casting to int.")
elif isinstance(key, pd.DataFrame) and all(key.dtypes == bool):
indices = self.indices[key.to_numpy().flatten()].tolist()
return self.select(where=f"index in {indices}")
elif isinstance(key, np.ndarray) and key.dtype == bool:
indices = self.indices[key].tolist()
return self.select(where=f"index in {indices}")
elif isinstance(key, str):
if key in self.columns:
return self.select(columns=[key])
else:
raise ValueError(f"{key} is not a valid column!")
elif isinstance(key, list) and all(isinstance(x, str) for x in key):
if all(x in self.columns for x in key):
return self.select(columns=key)
else:
raise ValueError(f"Not all columns in {key} are valid column names!")
else:
raise ValueError("Invalid key type!")
def __len__(self):
"""
Get the number of systems in the oneline table.
Returns
-------
int
The number of systems in the oneline table.
"""
return self.number_of_systems
[docs]
def head(self, n=10):
"""Get the first n rows of the oneline table.
Parameters
----------
n : int, optional
The number of rows to return. Default is 10.
Returns
-------
pd.DataFrame
The first n rows of the oneline table.
"""
return super().head("oneline", n)
[docs]
def tail(self, n=10):
"""
Get the last n rows of the oneline table.
Parameters
----------
n : int, optional
The number of rows to return. Default is 10.
Returns
-------
pd.DataFrame
The last n rows of the oneline table.
"""
return super().tail("oneline", n)
def __repr__(self):
"""
Get a string representation of the oneline table.
Returns
-------
str
The string representation of the oneline table.
"""
return super().get_repr("oneline")
def _repr_html_(self):
"""
Get an HTML representation of the oneline table.
Returns
-------
str
The HTML representation of the oneline table.
"""
return super().get_html_repr("oneline")
[docs]
def select(self, where=None, start=None, stop=None, columns=None):
"""Select a subset of the oneline table based on the given conditions.
This method allows you to filter and extract a subset of rows from the oneline table stored in an HDF file.
You can specify conditions to filter the rows, define the range of rows to select, and choose specific columns to include in the subset.
Parameters
----------
where : str, optional
A condition to filter the rows of the oneline table. Default is None.
It is only possible to query on the index or string columns.
start : int, optional
The starting index of the subset. Default is None.
stop : int, optional
The ending index of the subset. Default is None.
columns : list, optional
The column names to include in the subset. Default is None.
Returns
-------
pd.DataFrame
The selected subset of the oneline table.
Examples
--------
# Select rows based on a condition
>>> df = Oneline.select(where="event == 'ZAMS'")
# Select rows from index 10 to 20
>>> df = Oneline.select(start=10, stop=20)
# Select specific columns
>>> df = Oneline.select(columns=['S1_mass_i', 'S1_mass_f'])
"""
return super().select(key='oneline',
where=where,
start=start,
stop=stop,
columns=columns)
[docs]
class PopulationIO:
"""A class to handle the input/output of population files.
This class provides methods to load and save population files in HDF5 format.
It also includes methods to load and save metadata and ini parameters.
Attributes
----------
mass_per_metallicity : pandas.DataFrame
A DataFrame containing mass per metallicity data.
ini_params : dict
A dictionary containing some ini parameters, described in parameter_array.
"""
def __init__(self):
self.verbose = False
def _load_metadata(self, filename):
"""Load the metadata from the file.
Parameters
----------
filename : str
The name of the file to load the metadata from.
Raises
------
ValueError
If the filename does not contain '.h5' extension.
"""
if ".h5" not in filename:
raise ValueError(
f"{filename} does not contain .h5 in the se.\n Is this a valid population file?"
)
self._load_ini_params(filename)
self._load_mass_per_metallicity(filename)
def _save_mass_per_metallicity(self, filename):
"""Save the mass per metallicity data to the file.
Parameters
----------
filename : str
The name of the file to save the mass per metallicity data to.
"""
with pd.HDFStore(filename, mode="a") as store:
store.put("mass_per_metallicity", self.mass_per_metallicity)
if self.verbose:
print("mass_per_metallicity table written to population file!")
def _load_mass_per_metallicity(self, filename):
"""Load the mass per metallicity data from the file.
Parameters
----------
filename : str
The name of the file to load the mass per metallicity data from.
"""
with pd.HDFStore(filename, mode="r") as store:
self.mass_per_metallicity = store["mass_per_metallicity"]
if self.verbose:
print("mass_per_metallicity table read from population file!")
def _save_ini_params(self, filename):
"""Save the ini parameters to the file.
Parameters
----------
filename : str
The name of the file to save the ini parameters to.
"""
with pd.HDFStore(filename, mode="a") as store:
# write ini parameters to file
tmp_df = pd.DataFrame()
for c in parameter_array:
tmp_df[c] = [self.ini_params[c]]
store.put("ini_parameters", tmp_df)
def _load_ini_params(self, filename):
"""Load the ini parameters from the file.
The values loaded from file are stored in the parameter_array.
Parameters
----------
filename : str
The name of the file to load the ini parameters from.
"""
# load ini parameters
with pd.HDFStore(filename,mode="r",) as store:
tmp_df = store["ini_parameters"]
self.ini_params = {}
for c in parameter_array:
self.ini_params[c] = tmp_df[c][0]
##########################
# Main interface classes #
##########################
[docs]
class Population(PopulationIO):
"""A class to handle population files.
This class provides methods to handle population files. It includes methods to read and write population files,
as well as methods to access and manipulate the history and oneline dataframes.
Attributes
----------
history : History
The history dataframe of the population.
oneline : Oneline
The oneline dataframe of the population.
formation_channels : pd.DataFrame
The formation channels dataframe of the population.
ini_params : dict
The parameters from the ini file used to create the population.
mass_per_metallicity : pd.DataFrame
The mass per metallicity dataframe of the population.
solar_metallicities : np.ndarray
The solar metallicities of the population.
metallicities : np.ndarray
The metallicities of the population.
indices : np.ndarray
The indices of the binaries in the population.
verbose : bool
If `True`, print additional information.
chunksize : int
The chunksize to use when reading the population file.
filename : str
The path to the population file.
number_of_systems : int
The number of systems in the population.
history_lengths : pd.DataFrame
The number of rows of each binary in the history dataframe. The index is the binary index.
"""
def __init__(
self, filename, metallicity=None, ini_file=None, verbose=False, chunksize=1000000
):
"""Initialize the Population object.
The Population object is initialised by creating History and Oneline objects,
which refer back to the population file (filename). The formation channels are also
linked to the population file, if present. The mass per metallicity data is loaded
from the file, if present, or calculated and saved to the file if not present.
If the mass per metallicity data is not present in the file, you can provide a metallicity
and the ini file (used to create the population) to calculate and save the mass per metallicity data.
You only need to do this once for a given population file. However, all systems in the population
will be given the same metallicity.
Parameters
-----------
filename : str
The path to the population file
metallicity : float, optional
The metallicity of the population in solar units.
ini_file : str, optional
The path to the ini file used to create the population.
verbose : bool, optional
If `True`, print additional information.
chunksize : int, optional
The chunksize to use when reading the population file.
Raises
------
ValueError
If the provided filename does not contain '.h5' extension.
If the population file does not contain a history table.
If the population file does not contain an oneline table.
If the population file does not contain an ini_parameters table.
If the population file does not contain a mass_per_metallicity table and no metallicity for the file was given.
Examples
--------
# When the population file contains a mass_per_metallicity table
>>> pop = Population('/path/to/population_file.h5')
# When the population file does not contain a mass_per_metallicity table
>>> pop = Population('/path/to/population_file.h5', metallicity=0.02, ini_file='/path/to/ini_file.ini')
"""
self.filename = filename
self.verbose = verbose
self.chunksize = chunksize
self.mass_per_metallicity = None
self.number_of_systems = None
self.history_lengths = None
# if the user provided a single string instead of a list of strings
if not (".h5" in filename):
raise ValueError(
f"{filename} does not contain .h5 in the se.\n Is this a valid population file?"
)
# read the population file
with pd.HDFStore(filename, mode="r") as store:
keys = store.keys()
# check if pop contains history
if "/history" not in keys:
raise ValueError(f"{filename} does not contain a history table!")
else:
self.history = History(filename, self.verbose, self.chunksize)
# check if pop contains oneline
if "/oneline" not in keys:
raise ValueError(f"{filename} does not contain an oneline table!")
else:
self.oneline = Oneline(filename, self.verbose, self.chunksize)
# check if formation channels are present
if "/formation_channels" not in keys:
if self.verbose:
print(f"{filename} does not contain formation channels!")
self._formation_channels = None
else:
self._formation_channels = pd.read_hdf(
self.filename, key="formation_channels"
)
# if an ini file is given, read the parameters from the ini file
if ini_file is not None:
self.ini_params = binarypop_kwargs_from_ini(ini_file)
self._save_ini_params(filename)
self._load_ini_params(filename)
else:
if "/ini_parameters" not in keys:
raise ValueError(
f"{filename} does not contain an ini_parameters table!"
)
else:
self._load_ini_params(filename)
# check if pop contains mass_per_metallicity table
if "/mass_per_metallicity" in keys and metallicity is None:
self._load_mass_per_metallicity(filename)
self.solar_metallicities = self.mass_per_metallicity.index.to_numpy()
self.metallicities = self.solar_metallicities * Zsun
elif metallicity is None:
raise ValueError(
f"{filename} does not contain a mass_per_metallicity table and no metallicity for the file was given!"
)
# calculate the metallicity information. This assumes the metallicity is for the whole file!
if metallicity is not None and ini_file is not None:
if "/mass_per_metallicity" in keys:
Pwarn(f"{filename} already contains a mass_per_metallicity "
"table. Overwriting the table!", "OverwriteWarning")
# only load in required columns
tmp_data = self.oneline[['state_i', 'S1_mass_i', 'S2_mass_i']]
mask = tmp_data["state_i"] == "initially_single_star"
filtered_data_single = tmp_data[mask]
filtered_data_binaries = tmp_data[~mask]
simulated_mass_single = np.nansum(filtered_data_single[["S1_mass_i"]].to_numpy())
simulated_mass_binaries = np.nansum(filtered_data_binaries[["S1_mass_i", "S2_mass_i"]].to_numpy())
simulated_mass = simulated_mass_single + simulated_mass_binaries
del tmp_data, filtered_data_single, filtered_data_binaries
self.mass_per_metallicity = pd.DataFrame(
index=[metallicity],
data={"simulated_mass": simulated_mass,
"simulated_mass_single": simulated_mass_single,
"simulated_mass_binaries": simulated_mass_binaries,
"number_of_systems": len(self.oneline),
},
)
self._save_mass_per_metallicity(filename)
self.solar_metallicities = self.mass_per_metallicity.index.to_numpy()
self.metallicities = self.solar_metallicities * Zsun
elif metallicity is not None and ini_file is None:
raise ValueError(
f"{filename} does not contain a mass_per_metallicity table and no ini file was given!"
)
# add number of systems
self.history_lengths = self.history.lengths
self.number_of_systems = self.oneline.number_of_systems
self.indices = self.history.indices
[docs]
def calculate_underlying_mass(self, f_bin=0.7, overwrite=False):
"""Calculate the underlying mass of the population.
Adds the underlying mass of the population to the mass_per_metallicity table.
This method calculates the underlying mass of the population based on the simulated mass
of the population and the boundaries of the sampled mass distribution.
You can specify the fraction of binaries in the population using the `f_bin` parameter.
Parameters
----------
f_bin : float, optional
The fraction of binaries in the population. Default is 0.7.
overwrite : bool, optional
If `True`, overwrite the underlying mass values if they already exist. Default is `False`.
Returns
-------
np.ndarray
The underlying mass of the population.
"""
if 'underlying_mass' in self.mass_per_metallicity.columns:
warn_text="underlying_mass already exists in the mass_per_metallicity table."
if overwrite:
Pwarn(warn_text+" Overwriting the underlying_mass values.", "OverwriteWarning")
else:
Pwarn(warn_text+" Not overwriting the underlying_mass values, skipping it.", "IncompletenessWarning")
return
underlying_mass = np.zeros(len(self.mass_per_metallicity))
for i in range(len(self.mass_per_metallicity)):
underlying_mass[i] = initial_total_underlying_mass(
simulated_mass=self.mass_per_metallicity['simulated_mass'].iloc[i],
simulated_mass_single=self.mass_per_metallicity['simulated_mass_single'].iloc[i],
simulated_mass_binaries=self.mass_per_metallicity['simulated_mass_binaries'].iloc[i],
f_bin=f_bin,
**self.ini_params)[0]
self.mass_per_metallicity['underlying_mass'] = underlying_mass
# save it to the file
self._save_mass_per_metallicity(self.filename)
return underlying_mass
[docs]
def export_selection(self, selection, filename, overwrite=False, append=False, history_chunksize=1000000):
"""Export a selection of the population to a new file
This method exports a selection of systems from the population to a new file.
The selected systems are specified by their indices in the population.
By default the selected systems will overwrite the file if it already exists.
If overwrite is set to False, the selected systems will be appended to
the existing file if it already exists and the indices will be shifted
based on the current length of data in the file that is being appended to.
Parameters
----------
selection : list of int
The indices of the systems to export.
filename : str
The name of the export file to create or append to.
chunksize : int, optional
The number of systems to export at a time. Default is 1000000.
Raises
------
ValueError
If the filename does not contain ".h5" extension.
Warnings
--------
ReplaceValueWarning
If there is no "metallicity" column in the oneline dataframe and the population file contains multiple metallicities.
Notes
-----
- The exported systems will be appended to the existing file if it already exists.
- The indices of the exported systems will be shifted based on the current length of data in the file.
- The "oneline" and "history" dataframes of the selected systems will be written to the file.
- If available, the "formation_channels" dataframe and "history_lengths" dataframe of the selected systems will also be written to the file.
- The "metallicity" column of the oneline dataframe will be added if it is not present, using the metallicity of the population file.
- The "mass_per_metallicity" dataframe will be updated with the number of selected systems.
Examples
--------
# Export systems with indices [0, 1, 2] to a new file named "selected.h5"
>>> population.export_selection([0, 1, 2], "selected.h5")
# Export systems with indices [3, 4, 5] to an existing file named "existing.h5"
>>> population.export_selection([3, 4, 5], "existing.h5")
# Export systems with indices [6, 7, 8] to a new file named "selected.h5" in chunks of 2
>>> population.export_selection([6, 7, 8], "selected.h5", history_chunksize=2)
"""
if not (".h5" in filename):
raise ValueError(
f"{filename} does not contain .h5 in the se.\n Is this a valid population file?"
)
# overwrite and append cannot both be True
if append and overwrite:
raise ValueError("Both overwrite and append cannot be True!")
# check for file existence
if os.path.exists(filename) and not overwrite and not append:
raise FileExistsError(f"{filename} already exists! Set overwrite or append to True to continue!")
if overwrite:
mode = 'w'
elif append:
mode = 'a'
else:
mode = 'w'
history_cols = self.history.columns
oneline_cols = self.oneline.columns
history_min_itemsize = {
key: val for key, val in HISTORY_MIN_ITEMSIZE.items() if key in history_cols
}
oneline_min_itemsize = {
key: val for key, val in ONELINE_MIN_ITEMSIZE.items() if key in oneline_cols
}
with pd.HDFStore(filename, mode=mode) as store:
# shift all new indices by the current length of data in the file
last_index_in_file = -1
if "/oneline" in store.keys():
last_index_in_file = np.sort(store["oneline"].index)[-1]
elif "/history" in store.keys():
last_index_in_file = np.sort(store["history"].index)[-1]
if "/history" in store.keys() and self.verbose:
print("history in file. Appending to file")
if "/oneline" in store.keys() and self.verbose:
print("oneline in file. Appending to file")
if "/formation_channels" in store.keys() and self.verbose:
print("formation_channels in file. Appending to file")
if "/history_lengths" in store.keys() and self.verbose:
print("history_lengths in file. Appending to file")
# TODO: I need to shift the indices of the binaries or should I reindex them?
# since I'm storing the information, reindexing them should be fine.
if last_index_in_file == -1:
last_index_in_file = 0
reindex = {
i: j
for i, j in zip(selection,
np.arange(last_index_in_file, last_index_in_file + len(selection), 1),
)
}
else:
reindex = {
i: j
for i, j in zip(selection,
np.arange(last_index_in_file + 1, last_index_in_file + len(selection) + 1,1,),
)
}
if "metallicity" not in self.oneline.columns:
Pwarn("No metallicity column in oneline dataframe! Using the "
"metallicity of the population file and adding it to the"
" oneline.", "ReplaceValueWarning")
if len(self.metallicities) > 1:
raise ValueError(
"The population file contains multiple metallicities. Please add a metallicity column to the oneline dataframe!"
)
if self.verbose:
print("Writing selected systems to population file...")
# write oneline of selected systems
for i in tqdm(
range(0, len(selection), self.chunksize),
total=len(selection) // self.chunksize,
disable=not self.verbose,
):
tmp_df = self.oneline[selection[i : i + self.chunksize]]
if "metallicity" in tmp_df.columns:
tmp_df["metallicity"] = tmp_df["metallicity"].astype("float")
else:
tmp_df["metallicity"] = self.metallicities[0]
tmp_df.rename(index=reindex, inplace=True)
store.append(
"oneline",
tmp_df,
format="table",
min_itemsize=oneline_min_itemsize,
index=False,
)
if self.verbose:
print("Oneline: Done")
# write history of selected systems
for i in tqdm(
range(0, len(selection), history_chunksize),
total=len(selection) // history_chunksize,
disable=not self.verbose,
):
tmp_df = self.history[selection[i : i + history_chunksize]]
tmp_df.rename(index=reindex, inplace=True)
store.append(
"history",
tmp_df,
format="table",
min_itemsize=history_min_itemsize,
index=False,
)
if self.verbose:
print("History: Done")
# write formation channels of selected systems
if self.formation_channels is not None:
for i in tqdm(
range(0, len(selection), self.chunksize),
total=len(selection) // self.chunksize,
disable=not self.verbose,
):
tmp_df = self.formation_channels.loc[selection[i : i + self.chunksize]]
tmp_df.rename(index=reindex, inplace=True)
store.append(
"formation_channels",
tmp_df,
format="table",
min_itemsize={"channel_debug": 100, "channel": 100},
index=False,
)
## METADATA
# write the history lengths
for i in tqdm(
range(0, len(selection), self.chunksize),
total=len(selection) // self.chunksize,
disable=not self.verbose,
):
tmp_df = self.history.lengths.loc[selection[i : i + self.chunksize]]
tmp_df.rename(index=reindex, inplace=True)
store.append(
"history_lengths", pd.DataFrame(tmp_df), format="table", index=False
)
# write mass_per_metallicity
if "/mass_per_metallicity" in store.keys():
self_mass = self.mass_per_metallicity
self_mass["number_of_systems"] = len(selection)
tmp_df = pd.concat([store["mass_per_metallicity"], self_mass])
mass_per_metallicity = tmp_df.groupby(tmp_df.index).sum()
store.put("mass_per_metallicity", mass_per_metallicity)
else:
self_mass = self.mass_per_metallicity
self_mass["number_of_systems"] = len(selection)
store.put("mass_per_metallicity", self_mass)
# write ini parameters
self._save_ini_params(filename)
@property
def formation_channels(self):
"""
Retrieves the formation channels from the population file.
Returns:
pandas.DataFrame or None: The formation channels if available, otherwise None.
"""
with pd.HDFStore(self.filename, mode="r") as store:
if "/formation_channels" in store.keys():
self._formation_channels = pd.read_hdf(
self.filename, key="formation_channels"
)
else:
if self.verbose:
print("No formation channels in the population file!")
self._formation_channels = None
return self._formation_channels
def _write_formation_channels(self, filename, df):
"""Write the formation channels to the population file
This will append the formation channels to the population file, while restricting the maximum
length of the channel and channel_debug columns to 100 characters.
Parameters
----------
filename : str
The name of the file to write the formation channels to.
df : pd.DataFrame
The dataframe containing the formation channels.
"""
str_length = 100
with pd.HDFStore(filename, mode="a") as store:
df['channel'] = df['channel'].str.slice(0, str_length)
df['channel_debug'] = df['channel_debug'].str.slice(0, str_length)
store.append(
"formation_channels",
df,
format="table",
min_itemsize={"channel_debug": str_length, "channel": str_length},
)
def __len__(self):
"""Get the number of systems in the population.
Returns
-------
int
The number of systems in the population.
"""
return self.number_of_systems
@property
def columns(self):
"""
Returns a dictionary containing the column names of the history and oneline dataframes.
Returns
-------
dict
A dictionary with keys 'history' and 'oneline', where the values are the column names of the respective dataframes.
"""
return {"history": self.history.columns, "oneline": self.oneline.columns}
[docs]
def create_transient_population(
self, func, transient_name, oneline_cols=None, hist_cols=None
):
"""Given a function, create a TransientPopulation
This method creates a transient population using the provided function.
`func` is given the history, oneline, and formation channels dataframes as arguments.
The function should return a dataframe containing the transient population, which needs to contain the columns 'time' and 'metallicity'.
Processing is done in chunks to avoid memory issues and a pandas DataFrame is stored at `'/transients/transient_name'` in the population file.
The creation of the transient population can be sped up by limiting the oneline_cols and hist_cols to only the columns needed for the function.
If you do not provide these, all columns will be used, which can be slow for large populations.
Parameters
----------
func : function
Function to apply to the parsed population to create a transient population.
The function needs to take 3 arguments:
- history_chunk : pd.DataFrame
- oneline_chunk : pd.DataFrame
- formation_channels_chunk : pd.DataFrame
and return a pd.DataFrame containing the transient population, which needs to contain the columns 'time' and 'metallicity'.
oneline_cols : list of str, optional
Columns to extract from the oneline dataframe. Default is all columns.
hist_cols : list of str, optional
Columns to extract from the history dataframe. Default is all columns.
Returns
-------
TransientPopulation or None
A TransientPopulation object for interfacing with the transient population or None if no systems are present in the TransientPopulation.
Raises
------
ValueError
If the transient population requires a time column or if the transient population contains duplicate columns.
Examples
--------
See the tutorials for examples of how to use this method.
"""
with pd.HDFStore(self.filename, mode="a") as store:
if f"/transients/{transient_name}" in store.keys():
print("overwriting transient population")
del store["transients/" + transient_name]
min_itemsize = {
"channel": 100,
}
if hist_cols is not None:
if "time" not in hist_cols:
raise ValueError("The transient population requires a time column!")
min_itemsize.update(
{
key: val
for key, val in HISTORY_MIN_ITEMSIZE.items()
if key in hist_cols
}
)
else:
hist_cols = self.history.columns
min_itemsize.update(HISTORY_MIN_ITEMSIZE)
if oneline_cols is not None:
min_itemsize.update(
{
key: val
for key, val in ONELINE_MIN_ITEMSIZE.items()
if key in oneline_cols
}
)
else:
oneline_cols = self.oneline.columns
min_itemsize.update(ONELINE_MIN_ITEMSIZE)
# setup a mapping to the size of each history colummn
history_lengths = self.history_lengths
unique_binary_indices = self.indices
previous = 0
for i in tqdm(
range(0, len(unique_binary_indices), self.chunksize),
disable=not self.verbose,
):
end = previous + history_lengths[i : i + self.chunksize].sum().iloc[0]
oneline_chunk = self.oneline.select(
start=i, stop=i + self.chunksize, columns=oneline_cols
)
history_chunk = self.history.select(
start=previous, stop=end, columns=hist_cols
)
if self.formation_channels is not None:
formation_channels_chunk = self.formation_channels[
i : i + self.chunksize
]
else:
formation_channels_chunk = None
syn_df = func(history_chunk, oneline_chunk, formation_channels_chunk)
if len(syn_df.columns) != len(syn_df.columns.unique()):
raise ValueError("Transient population contains duplicate columns!")
# filter out the columns in min_itemsize that are not in the dataframe
min_itemsize = {
key: val for key, val in min_itemsize.items() if key in syn_df.columns
}
with pd.HDFStore(self.filename, mode="a") as store:
store.append(
"transients/" + transient_name,
syn_df,
format="table",
min_itemsize=min_itemsize,
)
previous = end
# it can happen that no systems are selected, in which case nothing has been appended to the file in the loop
with pd.HDFStore(self.filename, mode="r") as store:
if '/transients/'+transient_name not in store.keys():
Pwarn("No systems selected for the transient population!", "POSYDONWarning")
return None
synth_pop = TransientPopulation(
self.filename, transient_name, verbose=self.verbose
)
return synth_pop
[docs]
def plot_binary_evolution(self, index):
"""Plot the binary evolution of a system
This method is not currently implemented.
"""
pass
[docs]
class TransientPopulation(Population):
"""A class representing a population of transient events.
This class allows you to calculate additional properties of the population,
such as the efficiency of events per Msun for each solar metallicity, and to
calculate the cosmic weights of the transient population.
Attributes
----------
population : pandas.DataFrame
DataFrame containing the whole transient population.
transient_name : str
Name of the transient population.
efficiency : pandas.DataFrame
DataFrame containing the efficiency of events per Msun for each solar metallicity.
columns : list
List of columns in the transient population.
"""
def __init__(self, filename, transient_name, verbose=False, chunksize=100000):
"""Initialise the TransientPopulation object.
This method initializes the TransientPopulation object by linking it to the population file.
The transient population linked is located at '/transients/{transient_name}' in the population file.
Parameters
----------
filename : str
The name of the file containing the population. The file should be in HDF5 format.
transient_name : str
The name of the transient population within the file.
verbose : bool, optional
If `True`, additional information will be printed during the initialization process.
chunksize : int, optional
The chunksize to use when reading the population file (default is 100000).
Raises
------
ValueError
If the specified transient population name is not found in the file.
Notes
-----
The population data is stored in an HDF5 file format. The file should contain a group named '/transients' which
holds all the transient populations. The specified transient population name should be a valid group name within
'/transients'. If the transient population has associated efficiencies, they will be loaded as well.
Examples
--------
>>> filename = 'population_data.h5'
>>> transient_name = 'BBH'
>>> population = TransientPopulation(filename, transient_name, verbose=True)
"""
super().__init__(filename, verbose=verbose, chunksize=chunksize)
with pd.HDFStore(self.filename, mode="r") as store:
if "/transients/" + transient_name not in store.keys():
raise ValueError(
f"{transient_name} is not a valid transient population in {filename}!"
)
self.transient_name = transient_name
if "/transients/" + transient_name + "/efficiencies" in store.keys():
self._load_efficiency(filename)
@property
def population(self):
"""Returns the entire transient population as a pandas DataFrame.
This method retrieves the transient population data from a file and returns it as a pandas DataFrame.
Please note that if the transient population is too large, it may consume a significant amount of memory.
Returns
-------
pd.DataFrame
A DataFrame containing the transient population data.
"""
return pd.read_hdf(self.filename, key="transients/" + self.transient_name)
def _load_efficiency(self, filename):
"""Load the efficiency from the file
Parameters:
filename (str): The path to the file containing the efficiency data.
Returns:
None
Raises:
None
"""
with pd.HDFStore(filename, mode="r") as store:
self.efficiency = store[
"transients/" + self.transient_name + "/efficiencies"
]
if self.verbose:
print("Efficiency table read from population file!")
def _save_efficiency(self, filename):
"""Save the efficiency to the file.
Args:
filename (str): The name of the file to save the efficiency to.
Returns:
None
"""
with pd.HDFStore(filename, mode="a") as store:
store.put(
"transients/" + self.transient_name + "/efficiencies", self.efficiency
)
@property
def columns(self):
"""Return the columns of the transient population.
Returns:
list: A list of column names in the transient population.
"""
if not hasattr(self, "_columns"):
with pd.HDFStore(self.filename, mode="r") as store:
self._columns = store.select(
"transients/" + self.transient_name, start=0, stop=0
).columns
return self._columns
[docs]
def select(self, where=None, start=None, stop=None, columns=None):
"""
Select a subset of the transient population.
This method allows you to filter and extract a subset of rows from the transient table stored in an HDF file.
You can specify conditions to filter the rows, define the range of rows to select, and choose specific columns to include in the subset.
Parameters
----------
where : str, optional
A condition to filter the rows of the transient table. Default is None.
It is only possible to search on the index or string columns.
start : int, optional
The starting index of the subset. Default is None.
stop : int, optional
The ending index of the subset. Default is None.
columns : list, optional
The column names to include in the subset. Default is None.
Returns
-------
pd.DataFrame
The selected subset of the transient table.
Examples
--------
# Select rows based on a condition
>>> df = transpop.select(where="S1_state == 'BH'")
# Select rows from index 10 to 20
>>> df = transpop.select(start=10, stop=20)
# Select specific columns
>>> df = transpop.select(columns=['time', 'metallicity'])
"""
return pd.read_hdf(
self.filename,
key="transients/" + self.transient_name,
where=where,
start=start,
stop=stop,
columns=columns,
)
[docs]
def calculate_cosmic_weights(self, SFH_identifier, MODEL_in=None):
"""
Calculate the cosmic weights of the transient population.
This method calculates the cosmic weights of the transient population based on the provided star formation history identifier and model parameters.
It performs various calculations and stores the results in an HDF5 file at the location '/transients/{transient_name}/rates/{SFH_identifier}'.
This allows for multiple star formation histories to be used with the same transient population.
The default MODEL parameters are used if none are provided, which come from the IllustrisTNG model.
Parameters
----------
SFH_identifier : str
Identifier for the star formation history.
MODEL_in : dict, optional
Dictionary containing the model parameters. If not provided, the default model parameters will be used.
Returns
-------
Rates
An instance of the Rates class.
Raises
------
ValueError
If a parameter name in MODEL_in is not valid.
Notes
-----
This function calculates the cosmic weights of the transient population based on the provided star formation history
identifier and model parameters. It performs various calculations and stores the results in an HDF5 file.
The cosmic weights are computed for each event in the population, taking into account the metallicity, redshift,
and birth time of the events. The weights are calculated using the provided model parameters and the underlying mass
distribution.
The calculated weights, along with the corresponding redshifts of the events, are stored in the HDF5 file for further analysis.
These can be accessed using the Rates class.
Examples
--------
>>> transient_population = TransientPopulation('filename.h5', 'transient_name')
>>> transient_population.calculate_cosmic_weights('IllustrisTNG', MODEL_in=DEFAULT_MODEL)
"""
# Set model to DEFAULT or provided MODEL parameters
# Allows for partial model specification
if MODEL_in is None:
MODEL = DEFAULT_MODEL
else:
for key in MODEL_in:
if key not in DEFAULT_MODEL:
raise ValueError(key + " is not a valid parameter name!")
# write the DEFAULT_MODEL with updates parameters to self.MODEL.
MODEL = DEFAULT_MODEL
MODEL.update(MODEL_in)
path_in_file = (
"/transients/" + self.transient_name + "/rates/" + SFH_identifier + "/"
)
if 'underlying_mass' not in self.mass_per_metallicity.columns:
raise ValueError("Underlying mass not calculated! Please calculate the underlying mass first!")
with pd.HDFStore(self.filename, mode="a") as store:
if path_in_file + "MODEL" in store.keys():
store.remove(path_in_file + "MODEL")
if self.verbose:
print("Cosmic weights already computed! Overwriting them!")
if path_in_file + "weights" in store.keys():
store.remove(path_in_file + "weights")
if path_in_file + "z_events" in store.keys():
store.remove(path_in_file + "z_events")
if path_in_file + "birth" in store.keys():
store.remove(path_in_file + "birth")
self._write_MODEL_data(self.filename, path_in_file, MODEL)
rates = Rates(
self.filename, self.transient_name, SFH_identifier, verbose=self.verbose
)
z_birth = rates.centers_redshift_bins
t_birth = get_cosmic_time_from_redshift(z_birth)
nr_of_birth_bins = len(z_birth)
# write birth to the population file
with pd.HDFStore(self.filename, mode="a") as store:
store.put(
path_in_file + "birth", pd.DataFrame(data={"z": z_birth, "t": t_birth})
)
get_redshift_from_cosmic_time = redshift_from_cosmic_time_interpolator()
indices = self.indices
# sample the SFH for only the events that are within the Hubble time
# only need to sample the SFH at each metallicity and z_birth
# Not for every event!
SFR_at_z_birth = star_formation_rate(rates.MODEL["SFR"], z_birth)
# get metallicity bin edges
met_edges = rates.edges_metallicity_bins
# get the fractional SFR at each metallicity and z_birth
fSFR = SFR_Z_fraction_at_given_redshift(
z_birth,
rates.MODEL["SFR"],
rates.MODEL["sigma_SFR"],
met_edges,
rates.MODEL["Z_max"],
rates.MODEL["select_one_met"],
)
# simulated mass per given metallicity corrected for the unmodeled
# single and binary stellar mass
M_model = rates.mass_per_metallicity.loc[rates.centers_metallicity_bins / Zsun][
"underlying_mass"
].values
# speed of light
c = const.c.to("Mpc/yr").value # Mpc/yr
# delta cosmic time bin
deltaT = rates.MODEL["delta_t"] * 10**6 # yr
for i in tqdm(
range(0, len(indices), self.chunksize),
desc="event loop",
disable=not self.verbose,
):
selected_indices = (
self.select(start=i, stop=i + self.chunksize, columns=["index"])
.index.to_numpy()
.flatten()
)
if len(selected_indices) == 0:
continue
# selected_indices = indices[i:i+self.chunksize]
delay_time = (
self.select(
start=i, stop=i + self.chunksize, columns=["time"]
).to_numpy()
* 1e-3
) # Gyr
t_events = t_birth + delay_time
hubble_time_mask = t_events <= cosmology.age(1e-08).value * 0.9999999
# get the redshift of the events
z_events = np.full(t_events.shape, np.nan)
z_events[hubble_time_mask] = get_redshift_from_cosmic_time(
t_events[hubble_time_mask]
)
D_c = get_comoving_distance_from_redshift(z_events) # Mpc
# the events have to be in solar metallicity
met_events = (
self.select(start=i, stop=i + self.chunksize, columns=["metallicity"])
.to_numpy()
.flatten()
* Zsun
)
weights = np.zeros((len(met_events), nr_of_birth_bins))
for i, met in enumerate(rates.centers_metallicity_bins):
mask = met_events == met
weights[mask, :] = (
4.0
* np.pi
* c
* D_c[mask] ** 2
* deltaT
* (fSFR[:, i] * SFR_at_z_birth)
/ M_model[i]
) # yr^-1
with pd.HDFStore(self.filename, mode="a") as store:
store.append(
path_in_file + "weights",
pd.DataFrame(data=weights, index=selected_indices),
format="table",
)
store.append(
path_in_file + "z_events",
pd.DataFrame(data=z_events, index=selected_indices),
format="table",
)
return rates
[docs]
def plot_delay_time_distribution(
self, metallicity=None, ax=None, bins=100, color="black"
):
"""
Plot the delay time distribution of the transient population.
This method plots the delay time distribution of the transient population. If a specific metallicity is provided,
the delay time distribution of the population at that metallicity will be plotted. Otherwise, the delay time distribution
of the entire population will be plotted.
Parameters
----------
metallicity : float or None
The metallicity value to select a specific population. If None, the delay time distribution of the entire population will be plotted.
ax : matplotlib.axes.Axes or None
The axes object to plot the distribution on. If None, a new figure and axes will be created.
bins : int
The number of bins to use for the histogram.
color : str
The color of the histogram.
Raises
------
ValueError
If the specified metallicity is not present in the population.
Notes
-----
- The delay time distribution is normalized by the total mass of the population if no metallicity is specified.
Otherwise, it is normalized by the mass of the population at the specified metallicity.
"""
if 'underlying_mass' not in self.mass_per_metallicity.columns:
raise ValueError("Underlying mass not calculated! Please calculate the underlying mass first!")
if ax is None:
fig, ax = plt.subplots()
if metallicity is None:
time = self.select(columns=["time"]).values
time = time * 1e6 # yr
h, bin_edges = np.histogram(time, bins=bins)
h = h / np.diff(bin_edges) / self.mass_per_metallicity["underlying_mass"].sum()
else:
if not any(np.isclose(metallicity, self.solar_metallicities)):
raise ValueError("The metallicity is not present in the population!")
time = self.select(columns=['metallicity', 'time'])
time = time[time['metallicity'] == metallicity].drop(columns=['metallicity']).values
time = time * 1e6 # yr
h, bin_edges = np.histogram(time, bins=bins)
h = (
h
/ np.diff(bin_edges)
/ self.mass_per_metallicity["underlying_mass"][metallicity]
)
ax.step(bin_edges[:-1], h, where="post", color=color)
ax.set_xscale("log")
ax.set_yscale("log")
ax.set_xlabel("Time [yr]")
ax.set_ylabel("Number of events/Msun/yr")
[docs]
def plot_popsyn_over_grid_slice(self, grid_type, met_Zsun, **kwargs):
"""
Plot the transients over the grid slice.
Parameters
----------
grid_type : str
The type of grid to plot.
met_Zsun : float
The metallicity of the Sun.
**kwargs
Additional keyword arguments to pass to the plot_pop.plot_popsyn_over_grid_slice function.
"""
plot_pop.plot_popsyn_over_grid_slice(
pop=self, grid_type=grid_type, met_Zsun=met_Zsun, **kwargs
)
def _write_MODEL_data(self, filename, path_in_file, MODEL):
"""
Write the MODEL data to the HDFStore file.
Parameters
----------
filename : str
The path to the HDFStore file.
path_in_file : str
The path within the HDFStore file to store the MODEL data.
MODEL : dict
The MODEL data to be stored.
"""
with pd.HDFStore(filename, mode="a") as store:
if (MODEL["dlogZ"] is not None) and (not isinstance(MODEL["dlogZ"], float)):
store.put(path_in_file + "MODEL", pd.DataFrame(MODEL))
else:
store.put(path_in_file + "MODEL", pd.DataFrame(MODEL, index=[0]))
if self.verbose:
print("MODEL written to population file!")
[docs]
class Rates(TransientPopulation):
"""Class representing rates of a transient population.
Attributes
----------
SFH_identifier : str
The identifier for the star formation history.
base_path : str
The base path for accessing the rates data.
MODEL : dict
The model data for the star formation history.
weights : pandas.DataFrame
The weights of the transient population.
z_birth : pandas.DataFrame
The redshift of the birth bins.
z_events : pandas.DataFrame
The redshift of the events.
intrinsic_rate_density : pandas.DataFrame
The intrinsic rate density of the transient population.
observable_population_names : list
The names of the observable populations.
edges_metallicity_bins : np.ndarray
The edges of the metallicity bins of the star formation history.
centers_metallicity_bins : np.ndarray
The centers of the metallicity bins of the star formation history.
edges_redshift_bins : np.ndarray
The edges of the redshift bins of the star formation history.
centers_redshift_bins : np.ndarray
The centers of the redshift bins of the star formation history.
"""
def __init__(self, filename, transient_name, SFH_identifier, verbose=False, chunksize=100000):
"""
Initialize the Rates object.
This method initializes a Rates object by linking it to the population file
with the specified transient name and star formation history identifier.
The path in the file is '/transients/{transient_name}/rates/{SFH_identifier}'.
Parameters:
-----------
filename : str
The path to the file containing the transient population data.
transient_name : str
The name of the transient.
SFH_identifier : str
The identifier for the star formation history.
verbose : bool, optional
Whether to print verbose output. Default is False.
chunksize : int, optional
The chunksize to use when reading the population file (Default is 100000).
"""
super().__init__(filename, transient_name, verbose=verbose, chunksize=chunksize)
self.SFH_identifier = SFH_identifier
self.base_path = (
"/transients/" + self.transient_name + "/rates/" + self.SFH_identifier + "/"
)
with pd.HDFStore(self.filename, mode="r") as store:
if (
"/transients/"
+ self.transient_name
+ "/rates/"
+ self.SFH_identifier
+ "/MODEL"
not in store.keys()
):
raise ValueError(
f"{self.SFH_identifier} is not a valid SFH_identifier in {filename}!"
)
# load in the SFH_model
self._read_MODEL_data(self.filename)
def _read_MODEL_data(self, filename):
"""
Reads the MODEL data from the specified file.
Parameters
----------
filename : str
The path to the file containing the MODEL data.
"""
with pd.HDFStore(filename, mode="r") as store:
tmp_df = store[self.base_path + "MODEL"]
if len(tmp_df) > 1:
self.MODEL = tmp_df.iloc[0].to_dict()
self.MODEL["dlogZ"] = [tmp_df["dlogZ"].min(), tmp_df["dlogZ"].max()]
else:
self.MODEL = tmp_df.iloc[0].to_dict()
if self.verbose:
print("MODEL read from population file!")
@property
def weights(self):
"""
Retrieves the weights from the HDFStore.
The rows are indexed by the binary index of the events, while to columns are indexed by the redshift of the birth bins.
Returns
-------
pandas.DataFrame
The weights DataFrame.
"""
with pd.HDFStore(self.filename, mode="r") as store:
return store[self.base_path + "weights"]
@property
def z_birth(self):
"""
Retrieves the 'birth' data from the HDFStore.
The 'birth' DataFrame contains the redshift and age of the Universe of the birth bins with columns 'z' and 't'.
Returns
-------
pandas.DataFrame
The 'birth' DataFrame, which contains the redshift and age of the Universe of the birth bins.
"""
with pd.HDFStore(self.filename, mode="r") as store:
return store[self.base_path + "birth"]
@property
def z_events(self):
"""
Returns the 'z_events' data from the HDFStore.
The 'z_events' data contains the redshifts at which the events occur.
The rows of the returned DataFrame are indexed by the binary index of the events.
The columns of the returned DataFrame are indexed by the redshift of the birth bins.
Returns
-------
pandas.DataFrame
The 'z_events' data from the HDFStore.
"""
with pd.HDFStore(self.filename, mode="r") as store:
return store[self.base_path + "z_events"]
[docs]
def select_rate_slice(self, key, start=None, stop=None):
"""Selects a slice of a rates dataframe at key.
This method allows you to select a slice in rows from the diffferent rates dataframes.
The slice is selected based on the start and stop indices.
The key specifies which rates dataframe to select, and must be one of ['weights', 'z_events', 'birth'].
Parameters
----------
key : str
The key to select the slice from. Must be one of ['weights', 'z_events', 'birth'].
start : int, optional
The starting index of the slice. Defaults to None.
stop : int, optional
The ending index of the slice. Defaults to None.
Returns
-------
pandas.DataFrame
The selected slice of rates.
Raises
------
ValueError
If the key is not one of ['weights', 'z_events', 'birth'].
"""
if key not in ["weights", "z_events", "birth"]:
raise ValueError("key not in [weights, z_events, birth]")
with pd.HDFStore(self.filename, mode="r") as store:
return store.select(self.base_path + key, start=start, stop=stop)
[docs]
def calculate_intrinsic_rate_density(self, channels=False):
"""
Compute the intrinsic rate density over redshift of the transient population.
Besides returning the intrinsic rate density, this method also stores the results in the HDF5 file for further analysis.
This can be accessed using the intrinsic_rate_density attribute of the Rates class.
Parameters
----------
channels : bool, optional
Flag indicating whether to calculate the intrinsic rate density for each channel separately. Default is False.
Returns
-------
pandas.DataFrame
DataFrame containing the intrinsic rate density values.
"""
z_events = self.z_events.to_numpy()
weights = self.weights.to_numpy()
z_horizon = self.edges_redshift_bins
n = len(z_horizon)
if channels:
channels = self.select(columns=["channel"])
unique_channels = np.unique(channels)
else:
unique_channels = []
intrinsic_rate_density = pd.DataFrame(index=z_horizon[:-1], columns=["total"])
normalisation = np.zeros(n - 1)
for i in tqdm(range(1, n), total=n - 1, disable=not self.verbose):
normalisation[i - 1] = get_shell_comoving_volume(
z_horizon[i - 1], z_horizon[i], "infinite"
)
for i in tqdm(range(1, n), total=n - 1, disable=not self.verbose):
mask = (z_events > z_horizon[i - 1]) & (z_events <= z_horizon[i])
for ch in unique_channels:
mask_ch = channels.to_numpy() == ch
intrinsic_rate_density.loc[z_horizon[i - 1], ch] = (
np.nansum(weights[mask & mask_ch]) / normalisation[i - 1]
)
intrinsic_rate_density.loc[z_horizon[i - 1], "total"] = (
np.nansum(weights[mask]) / normalisation[i - 1]
)
with pd.HDFStore(self.filename, mode="a") as store:
store.put(self.base_path + "intrinsic_rate_density", intrinsic_rate_density)
return intrinsic_rate_density
[docs]
def calculate_observable_population(self, observable_func, observable_name):
"""
Calculate an observable population.
The observable population is calculated based on the provided observable function.
It should recalculate your weights based on an observability probability given certain transinet parameters.
This function requires the following input chunks:
1. transient_pop_chunk
2. z_events_chunk
3. weights_chunk
The observable function should output a DataFrame with the same shape as the weights_chunk.
The observable population is stored in the HDF5 file at
the location '/transients/{transient_name}/rates/observable/{observable_name}'.
Parameters
----------
observable_func : function
The observability function that takes the TransientPopulation as input.
observable_name : str
The name of the observable.
Note
----
- If the observable population already exists in the file, it will be overwritten.
"""
with pd.HDFStore(self.filename, mode="a") as store:
# remove the observable population if it already exists
if (
"/transients/"
+ self.transient_name
+ "/rates/observable/"
+ observable_name
in store.keys()
):
if self.verbose:
print("Overwriting observable population!")
del store[
"transients/"
+ self.transient_name
+ "/rates/observable/"
+ observable_name
]
# loop over the transient population and calculate the new weights, while writing to the file
for i in tqdm(
range(0, len(self), self.chunksize),
total=len(self) // self.chunksize,
disable=not self.verbose,
):
transient_pop_chunk = self.select(start=i, stop=i + self.chunksize)
weights_chunk = self.select_rate_slice(
"weights", start=i, stop=i + self.chunksize
)
z_events_chunk = self.select_rate_slice(
"z_events", start=i, stop=i + self.chunksize
)
new_weights = observable_func(
transient_pop_chunk, z_events_chunk, weights_chunk
)
with pd.HDFStore(self.filename, mode="a") as store:
store.append(
"transients/"
+ self.transient_name
+ "/rates/observable/"
+ observable_name,
new_weights,
format="table",
)
[docs]
def observable_population(self, observable_name):
"""Return the observable population based on the provided name.
This method returns the observable population based on the provided name,
which can take a while to load if the population is large.
It loads the observable population from '/transients/{transient_name}/rates/observable/{observable_name}' in the HDF5 file.
Parameters
----------
observable_name : str
The name of the observable population to return.
Returns
-------
pandas.DataFrame
The observable population based on the provided name.
"""
with pd.HDFStore(self.filename, mode="r") as store:
if (
"/transients/"
+ self.transient_name
+ "/rates/observable/"
+ observable_name
not in store.keys()
):
raise ValueError(
f"{observable_name} is not a valid observable population!"
)
else:
return store[
"transients/"
+ self.transient_name
+ "/rates/observable/"
+ observable_name
]
@property
def observable_population_names(self):
"""Return the names of the observable populations in the associated file.
Returns
-------
list
The names of the observable populations.
"""
with pd.HDFStore(self.filename, mode="r") as store:
return [
key.split("/")[-1]
for key in store.keys()
if "/transients/" + self.transient_name + "/rates/observable/" in key
]
@property
def intrinsic_rate_density(self):
"""Return the intrinsic rate density of the transient population at the specified SFH_identifier and transient_name.
The data is read from the HDF5 file at '/transients/{transient_name}/rates/{SFH_identifier}/intrinsic_rate_density'.
Returns
-------
pandas.DataFrame
The intrinsic rate density of the transient population.
"""
with pd.HDFStore(self.filename, mode="r") as store:
if self.base_path + "intrinsic_rate_density" not in store.keys():
raise ValueError(
"First you need to compute the intrinsic rate density!"
)
else:
return store[self.base_path + "intrinsic_rate_density"]
[docs]
def plot_hist_properties(
self, prop, intrinsic=True, observable=None, bins=50, channel=None, **kwargs
):
"""Plot a histogram of a given property available in the transient population.
This method plots a histogram of a given property available in the transient population.
The property can be intrinsic or observable, and the histogram can be plotted for a specific channel if provided.
Parameters
----------
prop : str
The property to plot the histogram for.
intrinsic : bool, optional
If True, plot the intrinsic property. Default is True.
observable : str, optional
The observable population name to plot the histogram for. Default is None.
bins : int, optional
The number of bins to use for the histogram. Default is 50.
channel : str, optional
The channel to plot the histogram for. Default is None.
A channel column must be present in the transient population.
**kwargs
Additional keyword arguments to pass to the plot
Raises
------
ValueError
If the specified property is not a valid property in the transient population.
If the specified observable is not a valid observable population.
If the specified channel is not present in the transient population.
"""
if prop not in self.columns:
raise ValueError(
f"{prop} is not a valid property in the transient population!"
)
# get the property and its associated weights in the population.
df = self.select(columns=[prop])
kwargs['xlabel'] = prop
df["property"] = df[prop]
del df[prop]
if intrinsic:
df["intrinsic"] = np.sum(self.weights, axis=1)
if observable is not None:
with pd.HDFStore(self.filename, mode="r") as store:
if (
"/transients/"
+ self.transient_name
+ "/rates/observable/"
+ observable
not in store.keys()
):
raise ValueError(
f"{observable} is not a valid observable population!"
)
else:
df["observable"] = np.sum(
store[
"transients/"
+ self.transient_name
+ "/rates/observable/"
+ observable
],
axis=1,
)
if channel is not None:
df["channel"] = self.select(columns=["channel"])
df = df[df["channel"] == channel]
if len(df) == 0:
raise ValueError(
f"{channel} is not present in the transient population!"
)
plot_pop.plot_hist_properties(df, bins=bins, **kwargs)
else:
# plot the histogram using plot_pop.plot_hist_properties
plot_pop.plot_hist_properties(df, bins=bins, **kwargs)
[docs]
def plot_intrinsic_rate(self, channels=False, **kwargs):
"""Plot the intrinsic rate density of the transient population."""
plot_pop.plot_rate_density(self.intrinsic_rate_density, channels=channels, **kwargs)
@property
def edges_metallicity_bins(self):
"""Return the edges of the metallicity bins.
Returns
-------
array float
Returns the edges of all metallicity bins. We assume metallicities
were binned in log-space.
"""
met_val = np.log10(self.centers_metallicity_bins)
bin_met = np.zeros(len(met_val) + 1)
# if more than one metallicty bin
if len(met_val) > 1:
bin_met[0] = met_val[0] - (met_val[1] - met_val[0]) / 2.0
bin_met[-1] = met_val[-1] + (met_val[-1] - met_val[-2]) / 2.0
bin_met[1:-1] = met_val[:-1] + (met_val[1:] - met_val[:-1]) / 2.0
# one metallicty bin
elif len(met_val) == 1:
if self.MODEL["dlogZ"] is None:
bin_met[0] = -9
bin_met[-1] = 0
elif isinstance(self.MODEL["dlogZ"], float):
bin_met[0] = met_val[0] - self.MODEL["dlogZ"] / 2.0
bin_met[-1] = met_val[0] + self.MODEL["dlogZ"] / 2.0
elif isinstance(self.MODEL["dlogZ"], list) or isinstance(
self.MODEL["dlogZ"], np.array
):
bin_met[0] = self.MODEL["dlogZ"][0]
bin_met[-1] = self.MODEL["dlogZ"][1]
return 10**bin_met
@property
def centers_metallicity_bins(self):
"""Return the centers of the metallicity bins.
Returns
-------
array float
Returns sampled metallicities of the population. This corresponds
to the center of each metallicity bin.
"""
return np.sort(self.metallicities)
@property
def edges_redshift_bins(self):
"""Compute redshift bin edges.
Returns
-------
array floats
We devide the cosmic time history of the Universe in equally spaced
bins of cosmic time of self.MODEL['delta_t'] (100 Myr default) an compute the
redshift corresponding to edges of these bins.
"""
return get_redshift_bin_edges(self.MODEL["delta_t"])
@property
def centers_redshift_bins(self):
"""Compute redshift bin centers.
Returns
-------
array floats
We devide the cosmic time history of the Universe in equally spaced
bins of cosmic time of self.MODEL['delta_t'] (100 Myr default) an compute the
redshift corresponding to center of these bins.
"""
return get_redshift_bin_centers(self.MODEL["delta_t"])