Source code for posydon.popsyn.binarypopulation

"""Create, evolve and save a binary star population.

Large populations are RAM limited when holding an arbitrary
number of BinaryStar instances. Therefore, by default the BinaryPopulation
will generate binaries, evolve, and try to save them to disk one at a time.

Create a BinaryPopulation instance from an inifile:
I. CREATING A POPULATION
------------------------
a) One-liner for creating a BinaryPopulation from an inifile:

> BinaryPopulation.from_ini('<PATH_TO_POSYDON>' \
            '/posydon/popsyn/population_params_default.ini')
"""


__authors__ = [
    "Kyle Akira Rocha <kylerocha2024@u.northwestern.edu>",
    "Jeffrey Andrews <jeffrey.andrews@northwestern.edu>",
    "Konstantinos Kovlakas <Konstantinos.Kovlakas@unige.ch>",
    "Devina Misra <devina.misra@unige.ch>",
    "Simone Bavera <Simone.Bavera@unige.ch>",
    "Max Briel <max.briel@unige.ch>",
    "Matthias Kruckow <Matthias.Kruckow@unige.ch>",
]


__credits__ = ["Nam Tran <tranhn03@gmail.com>"]


import pandas as pd
import numpy as np
import traceback
import atexit
import os
from tqdm import tqdm
import psutil
import sys

if 'posydon.binary_evol.binarystar' not in sys.modules.keys():
    from posydon.binary_evol.binarystar import BinaryStar
from posydon.binary_evol.singlestar import (SingleStar,properties_massless_remnant)
from posydon.binary_evol.simulationproperties import SimulationProperties
from posydon.popsyn.star_formation_history import get_formation_times

from posydon.popsyn.independent_sample import (generate_independent_samples,
                                               binary_fraction_value)
from posydon.popsyn.sample_from_file import (get_samples_from_file,
                                             get_kick_samples_from_file)
from posydon.utils.common_functions import (orbital_period_from_separation,
                                            orbital_separation_from_period)
from posydon.popsyn.normalized_pop_mass import initial_total_underlying_mass

from posydon.popsyn.defaults import default_kwargs
from posydon.popsyn.io import binarypop_kwargs_from_ini
from posydon.utils.constants import Zsun
from posydon.utils.posydonerror import POSYDONError,initial_condition_message
from posydon.utils.posydonwarning import (Pwarn, Catch_POSYDON_Warnings)
from posydon.utils.common_functions import set_binary_to_failed

saved_ini_parameters = ['metallicity',
                        "number_of_binaries",
                   'binary_fraction_scheme',
                   'binary_fraction_const',
                   'star_formation',
                   'max_simulation_time',
                   'primary_mass_scheme',
                   'primary_mass_min',                              
                   'primary_mass_max',                                  
                   'secondary_mass_scheme',
                   'secondary_mass_min',
                   'secondary_mass_max',
                   'orbital_scheme',
                   'orbital_period_scheme',
                   'orbital_period_min',
                   'orbital_period_max',
                   'eccentricity_scheme']


HISTORY_MIN_ITEMSIZE = {'state': 30, 'event': 25, 'step_names': 21,
                        'S1_state': 31, 'S2_state': 31,
                        'mass_transfer_case': 16,
                        'S1_SN_type': 5, 'S2_SN_type': 5}
ONELINE_MIN_ITEMSIZE = {'state_i': 30, 'state_f': 30,
                        'event_i': 10, 'event_f': 31,
                        'step_names_i': 21, 'step_names_f': 21,
                        'S1_state_i': 31, 'S1_state_f': 31,
                        'S2_state_i': 31, 'S2_state_f': 31,
                        'mass_transfer_case_i': 7, 'mass_transfer_case_f': 20,
                        'S1_SN_type': 5, 'S2_SN_type': 5,
                        'interp_class_HMS_HMS' : 20, 'interp_class_CO_HeMS' : 15,
                        'interp_class_CO_HMS_RLO' : 15, 'interp_class_CO_HeMS_RLO' : 15,
                        'mt_history_HMS_HMS' : 40, 'mt_history_CO_HeMS' : 40,
                        'mt_history_CO_HMS_RLO' : 40, 'mt_history_CO_HeMS_RLO' : 40,
                        'culmulative_mt_case_HMS_HMS': 40, 'culmulative_mt_case_CO_HeMS': 40,
                        'culmulative_mt_case_CO_HMS_RLO': 40, 'culmulative_mt_case_CO_HeMS_RLO': 40,
                        }

# BinaryPopulation will enforce a constant metallicity accross all steps that
# load stellar or binary models by checked this list of steps.
STEP_NAMES_LOADING_GRIDS = [
    'step_HMS_HMS', 'step_CO_HeMS', 'step_CO_HMS_RLO', 'step_CO_HeMS_RLO', 'step_detached','step_isolated','step_disrupted','step_initially_single', 'step_merged'
]


[docs] class BinaryPopulation: """Handle a binary star population.""" def __init__(self, **kwargs): """Initialize the binary population object. Parameters ---------- number_of_binaries : int Size of the population population_properties : SimulationProperties Instance of simulationproperties holding steps. """ # Set population kwargs - first set defaults, then add updates self.kwargs = default_kwargs.copy() for key, arg in kwargs.items(): self.kwargs[key] = arg self.number_of_binaries = self.kwargs.get('number_of_binaries') self.population_properties = self.kwargs.get('population_properties', SimulationProperties()) atexit.register(lambda: BinaryPopulation.close(self)) self.metallicity = self.kwargs.get('metallicity', 1) # grab all metallicities in population or use single metallicity self.metallicities = self.kwargs.get('metallicities', [self.metallicity]) # force the metallicity on to the simulation properties for key in STEP_NAMES_LOADING_GRIDS: if key in self.population_properties.kwargs: self.population_properties.kwargs[key][1].update({'metallicity': self.metallicity}) self.population_properties.max_simulation_time = self.kwargs.get( 'max_simulation_time') # years self.entropy = self.kwargs.get('entropy', None) seq = np.random.SeedSequence(entropy=self.entropy) self.comm = self.kwargs.pop('comm', None) self.JOB_ID = self.kwargs.pop('JOB_ID', None) if self.comm is not None and self.JOB_ID is not None: raise ValueError('MPI and Job array runs are not compatible.') # local MPI run elif self.comm is not None: # To guarantee reproducibility, we need to set the seed sequence # to be the same across all processes. if self.entropy is None: raise ValueError('A local MPI run requires an entropy value to be set.') self.rank = self.comm.Get_rank() self.size = self.comm.Get_size() # Make seed sequence unique per metallicity met_shift = self.metallicities.index(self.metallicity) seq = np.random.SeedSequence(entropy=self.entropy + met_shift) seed_seq = [i for i in seq.spawn(self.size)][self.rank] # Job array runs elif self.JOB_ID is not None: self.rank = self.kwargs.pop('RANK', None) self.size = self.kwargs.pop('size', None) # Make sure each of the processes has the same entropy # But unique per metallicity if self.entropy is None: met_shift = self.metallicities.index(self.metallicity) seq = np.random.SeedSequence(entropy=self.JOB_ID + met_shift) # Split the seed sequence between processes for uniqueness seed_seq = [i for i in seq.spawn(self.size)][self.rank] else: seed_seq = seq self.RNG = np.random.default_rng(seed=seed_seq) self.kwargs['RNG'] = self.RNG self.entropy = self.RNG.bit_generator._seed_seq.entropy self.kwargs['entropy'] = self.entropy self.manager = PopulationManager(**self.kwargs) # use manager methods self.to_df = self.manager.to_df self.to_oneline_df = self.manager.to_oneline_df self.find_failed = self.manager.find_failed
[docs] @classmethod def from_ini(cls, path, verbose=False): """Create a BinaryPopulation instance from an inifile. Parameters ---------- path : str Path to an inifile to load in. verbose : bool Print useful info. Returns ------- BinaryPopulation A new instance of a BinaryPopulation. """ kwargs = binarypop_kwargs_from_ini(path, verbose=verbose) return cls(**kwargs)
[docs] def evolve(self, **kwargs): """Evolve a binary population. Parameters ---------- indices : list, optional Custom binary indices to use. Default is range(number_of_binaries). If running with MPI, indices are split between processes if given. breakdown_to_df : bool, True Breakdown a binary after evolution, converting to dataframe and removing the binary instance from memory. tqdm : bool, False Show tqdm progress bar during evolution. Returns ------- None """ # combine kw defined at init and any passed here kw = {**self.kwargs, **kwargs} tqdm_bool = kw.get('tqdm', False) breakdown_to_df_bool = kw.get('breakdown_to_df', True) from_hdf_bool = kw.get('from_hdf', False) if self.JOB_ID is None and self.comm is None: # do regular evolution indices = kw.get('indices', list(range(self.number_of_binaries))) params = {'indices':indices, 'tqdm':tqdm_bool, 'breakdown_to_df':breakdown_to_df_bool, 'from_hdf':from_hdf_bool} self.kwargs.update(params) self._safe_evolve(**self.kwargs) else: # do local MPI or cluster job array evolution indices = kw.get('indices', list(range(self.number_of_binaries))) indices_split = np.array_split(indices, self.size) batch_indices = indices_split[self.rank] mpi_tqdm_bool = True if (tqdm_bool and self.rank == 0) else False params = {'indices':batch_indices, 'tqdm':mpi_tqdm_bool, 'breakdown_to_df':breakdown_to_df_bool, 'from_hdf':from_hdf_bool} self.kwargs.update(params) self._safe_evolve(**self.kwargs)
def _safe_evolve(self, **kwargs): """Evolve binaries in a population, catching warnings/exceptions.""" if not self.population_properties.steps_loaded: # Enforce the same metallicity for all grid steps for step_name, tup in self.population_properties.kwargs.items(): if step_name in STEP_NAMES_LOADING_GRIDS: step_function, step_kwargs = tup # unpack params step_kwargs['metallicity'] = self.kwargs.get('metallicity', 1) # update the step kwargs, override metallicity modified_tup = (step_function, step_kwargs) self.population_properties.kwargs[step_name] = modified_tup self.population_properties.load_steps() indices = kwargs.get('indices', list(range(self.number_of_binaries))) indices_for_iter = (tqdm(indices) if kwargs.get('tqdm', False) else indices) breakdown_to_df = kwargs.get('breakdown_to_df', True) optimize_ram = kwargs.get("optimize_ram", True) self.kwargs['optimize_ram'] = optimize_ram ram_per_cpu = kwargs.get("ram_per_cpu", None) if optimize_ram: dump_rate = kwargs.get( "dump_rate", int(len(indices_for_iter) / 10)) dump_rate = np.max([dump_rate, 1]) # Set temporary directory for population batches temp_directory = os.path.abspath( kwargs.get("temp_directory", "batches")) self.kwargs['temp_directory'] = temp_directory kw = self.kwargs.copy() # Create temporary directory if it doesn't exist # Built to handle MPI if self.JOB_ID is None and self.comm is None: if not os.path.exists(temp_directory): os.makedirs(temp_directory) else: # Create a directory for parallel runs if not os.path.exists(temp_directory): try: os.makedirs(temp_directory) except FileExistsError: pass filenames = [] for j, index in enumerate(indices_for_iter): if kwargs.get('from_hdf', False): #generator binary = self.manager.from_hdf(index, restore=True).pop() else: binary = self.manager.generate(index=index, **self.kwargs) binary.properties = self.population_properties # catch POSYDON warnings: record them to be possibly printed at # then and of the "with" context; use a new registry for this # context instead of the global one with Catch_POSYDON_Warnings(record=True, own_registry=True) as cpw: try: binary.evolve() except POSYDONError as posydon_error: set_binary_to_failed(binary) binary.traceback = traceback.format_exc() if self.kwargs.get("error_checking_verbose", False): posydon_error.add_note(initial_condition_message(binary)) traceback.print_exception(posydon_error) except Exception as e: set_binary_to_failed(binary) binary.traceback = traceback.format_exc() e.add_note(initial_condition_message(binary)) traceback.print_exception(e) # record if there were warnings caught during the binary # evolution, this is needed to update the WARNINGS column in # the oneline dataframe; this will only be updated if POSYDON # warnings occur, NOT general python warnings if cpw.got_called(): binary.warnings = True # if the user wants to print all POSYDON warnings to stderr # (warnings_verbose=True), no action is needed, because it will # be printed at the end of the "with" context; to avoid the # printing clear the warnings cache before the end of the # context if not self.kwargs.get("warnings_verbose", False): cpw.reset_cache() if breakdown_to_df: self.manager.breakdown_to_df(binary, **self.kwargs) # Save data at some frequency if (optimize_ram and j % dump_rate == 0 and j != 0 and ram_per_cpu is None): # Create filenames for each batch if(self.JOB_ID is None and self.comm is None): path = os.path.join(temp_directory, f"{j}_evolution.batch") else: path = os.path.join(temp_directory, f"{j}_evolution.batch.{self.rank}") # save binaries to disk for RAM optimization self.manager.save(path, mode="w", **kw) filenames.append(path) if(breakdown_to_df): self.manager.clear_dfs() else: self.manager.remove(self.manager.binaries.copy()) # Check to see if used memory is greater than 99% of allowed memory # rss gives memory usage in bytes, so divide by 2^30 to get GBs elif (optimize_ram and ram_per_cpu is not None and psutil.Process().memory_info().rss / (1024**3) >= 0.9 * ram_per_cpu): if(self.JOB_ID is None and self.comm is None): path = os.path.join(temp_directory, f"{j}_evolution.batch") else: path = os.path.join(temp_directory, f"{j}_evolution.batch.{self.rank}") # save binaries to disk for RAM optimization self.manager.save(path, mode="w", **kw) filenames.append(path) if(breakdown_to_df): self.manager.clear_dfs() else: self.manager.remove(self.manager.binaries.copy()) # handling case if dump rate is not multiple of population size if ((len(self.manager.binaries) != 0 or len(self.manager.history_dfs) != 0) and optimize_ram): if(self.JOB_ID is None and self.comm is None): path = os.path.join(temp_directory, "leftover_evolution.batch") else: path = os.path.join(temp_directory, f"leftover_evolution.batch.{self.rank}") # save binaries to disk for RAM optimization self.manager.save(path, mode="w", **kw) filenames.append(path) if(breakdown_to_df): self.manager.clear_dfs() else: self.manager.remove(self.manager.binaries.copy()) if optimize_ram: # combining files if self.JOB_ID is None and self.comm is None: self.combine_saved_files(os.path.join(temp_directory, "evolution.combined"), filenames, mode = "w") else: self.combine_saved_files( os.path.join(temp_directory, f"evolution.combined.{self.rank}"), filenames, mode = "w") else: if self.JOB_ID is None and self.comm is None: self.manager.save(os.path.join(temp_directory, "evolution.combined"), mode='w', **kwargs) else: self.manager.save( os.path.join(temp_directory, f"evolution.combined.{self.rank}"), mode='w', **kwargs)
[docs] def save(self, save_path, **kwargs): """Save BinaryPopulation to hdf file.""" optimize_ram = self.kwargs['optimize_ram'] temp_directory = self.kwargs['temp_directory'] mode = self.kwargs.get('mode', 'a') if self.JOB_ID is None and self.comm is None: if optimize_ram: os.rename(os.path.join(temp_directory, "evolution.combined"), save_path) else: self.manager.save(save_path, mode=mode, **kwargs) else: absolute_filepath = os.path.abspath(save_path) dir_name = os.path.dirname(absolute_filepath) file_name = os.path.basename(absolute_filepath) # get the temporary files in the directory tmp_files = [os.path.join(temp_directory, f) \ for f in os.listdir(temp_directory) \ if os.path.isfile(os.path.join(temp_directory, f))] if os.path.isdir(absolute_filepath): file_name = 'backup_save_pop_data.h5' file_path = os.path.join(dir_name, file_name) Pwarn('The provided path is a directory - saving ' 'to {0} instead.'.format(file_path), "ReplaceValueWarning") self.combine_saved_files(absolute_filepath, tmp_files, **kwargs)
[docs] def make_temp_fname(self): """Get a valid filename for the temporary file.""" temp_directory = self.kwargs['temp_directory'] return os.path.join(temp_directory, f"evolution.combined.{self.rank}")
# return os.path.join(dir_name, '.tmp{}_'.format(rank) + file_name)
[docs] def combine_saved_files(self, absolute_filepath, file_names, **kwargs): """Combine various temporary files in a given folder. Parameters ---------- absolute_filepath : str Absolute path to the file to be saved. file_names : list List of absolute paths to the temporary files. """ dir_name = os.path.dirname(absolute_filepath) history_cols = pd.read_hdf(file_names[0], key='history').columns oneline_cols = pd.read_hdf(file_names[0], key='oneline').columns #history_tmp = pd.read_hdf(file_names[0], key='history') history_min_itemsize = {key: val for key, val in HISTORY_MIN_ITEMSIZE.items() if key in history_cols} oneline_min_itemsize = {key: val for key, val in ONELINE_MIN_ITEMSIZE.items() if key in oneline_cols} mode = kwargs.get('mode', 'a') complib = kwargs.get('complib', 'zlib') complevel = kwargs.get('complevel', 9) with pd.HDFStore(absolute_filepath, mode=mode, complevel=complevel, complib=complib) as store: simulated_mass = 0.0 simulated_mass_single = 0.0 simulated_mass_binaries = 0.0 number_of_systems=0 for f in file_names: # strings itemsize set by first append max value, # which may not be largest string try: store.append('history', pd.read_hdf(f, key='history'), min_itemsize=history_min_itemsize) oneline = pd.read_hdf(f, key='oneline') # split weight between single and binary stars mask = oneline["state_i"] == "initially_single_star" filtered_data_single = oneline[mask] filtered_data_binaries = oneline[~mask] simulated_mass_binaries += np.nansum(filtered_data_binaries[["S1_mass_i", "S2_mass_i"]].to_numpy()) simulated_mass_single += np.nansum(filtered_data_single[["S1_mass_i"]].to_numpy()) simulated_mass = simulated_mass_single + simulated_mass_binaries if 'metallicity' not in oneline.columns: met_df = pd.DataFrame(data={'metallicity': [self.metallicity] * len(oneline)}, index=oneline.index) oneline = pd.concat([oneline, met_df], axis=1) number_of_systems += len(oneline) store.append('oneline', oneline, min_itemsize=oneline_min_itemsize) except Exception: print(traceback.format_exc(), flush=True) # store population metadata tmp_df = pd.DataFrame() for c in saved_ini_parameters: tmp_df[c] = [self.kwargs[c]] store.append('ini_parameters', tmp_df) tmp_df = pd.DataFrame( index=[self.metallicity], data={'simulated_mass': simulated_mass, 'simulated_mass_single': simulated_mass_single, 'simulated_mass_binaries': simulated_mass_binaries, 'number_of_systems': number_of_systems}) tmp_df.index.name = 'metallicity' store.append('mass_per_metallicity', tmp_df) # only remove the files once they've been written to the new file for f in file_names: os.remove(f)
[docs] def close(self): """Close loaded h5 files from SimulationProperties.""" self.population_properties.close()
def __getstate__(self): """Prepare the BinaryPopulation to be 'pickled'.""" # In order to be generally picklable, we need to discard the # communication object before picking d = self.__dict__ d['comm'] = None prop = d['population_properties'] if prop.steps_loaded: prop.close() return d def __iter__(self): """Iterate the binaries.""" return iter(self.manager) def __getitem__(self, key): """Get the k-th binary.""" return self.manager[key] def __len__(self): """Get the number of binaries in the population.""" return len(self.manager) def __repr__(self): """Report key properties of the object.""" s = "<{}.{} at {}>\n".format( self.__class__.__module__, self.__class__.__name__, hex(id(self)) ) for key, arg in self.kwargs.items(): s += "{}: {}\n".format(key, arg) return s
[docs] class PopulationManager: """Manage a population of binaries.""" def __init__(self, file_name=None, **kwargs): """Initialize a PopulationManager instance.""" self.kwargs = kwargs.copy() self.binaries = [] self.indices = [] self.history_dfs = [] self.oneline_dfs = [] if (('read_samples_from_file' in self.kwargs) and (self.kwargs['read_samples_from_file'] != '')): self.binary_generator = BinaryGenerator(\ sampler=get_samples_from_file, **kwargs) else: self.binary_generator = BinaryGenerator(\ sampler=generate_independent_samples,\ **kwargs) self.entropy = self.binary_generator.entropy if file_name: self.store_file = file_name self.store_file = file_name
[docs] def append(self, binary): """Add a binary instance internaly.""" if isinstance(binary, (list, np.ndarray)): self.indices.append([b.index for b in binary]) self.binaries.extend(list(binary)) elif isinstance(binary, BinaryStar): self.indices.append(binary.index) self.binaries.append(binary) else: raise ValueError('Must be BinaryStar or list of BinaryStars')
[docs] def remove(self, binary): """Remove a binary instance.""" if isinstance(binary, (list, np.ndarray)): for b in binary: self.binaries.remove(b) self.indices.remove(b.index) elif isinstance(binary, BinaryStar): self.binaries.remove(binary) self.indices.remove(binary.index) else: raise ValueError('Must be BinaryStar or list of BinaryStars')
[docs] def clear_dfs(self): """Remove all dfs.""" self.history_dfs = [] self.oneline_dfs = []
[docs] def breakdown_to_df(self, binary, **kwargs): """Breakdown to a pandas DataFrame. Breakdown a binary into more convenient data type, store it, and remove the BinaryStar instance from self. """ try: history = binary.to_df(**kwargs) self.history_dfs.append(history) oneline = binary.to_oneline_df(**kwargs) self.oneline_dfs.append(oneline) self.remove(binary) except Exception as err: print("Error during breakdown of {0}:\n{1}". format(str(binary), err))
[docs] def to_df(self, selection_function=None, **kwargs): """Convert all binaries to dataframe.""" if len(self.binaries) == 0 and len(self.history_dfs) == 0: return is_callable = callable(selection_function) holder = [] if len(self.binaries) > 0: for binary in self.binaries: if not is_callable or (is_callable and selection_function(binary)): holder.append(binary.to_df(**kwargs)) elif len(self.history_dfs) > 0: holder.extend(self.history_dfs) if len(holder) > 0: return pd.concat(holder, axis=0, ignore_index=False)
[docs] def to_oneline_df(self, selection_function=None, **kwargs): """Convert all binaries to oneline dataframe.""" if len(self.binaries) == 0 and len(self.oneline_dfs) == 0: return is_callable = callable(selection_function) holder = [] if len(self.binaries) > 0: for binary in self.binaries: if not is_callable or (is_callable and selection_function(binary)): holder.append(binary.to_oneline_df(**kwargs)) elif len(self.oneline_dfs) > 0: holder.extend(self.oneline_dfs) if len(holder) > 0: return pd.concat(holder, axis=0, ignore_index=False)
[docs] def find_failed(self,): """Find any failed binaries in the population.""" if len(self) > 0: return [b for b in self if b.event == 'FAILED'] elif len(self.history_dfs) > 0: failed_dfs = [f for f in self.history_dfs if f['event'].iloc[-1] == 'FAILED'] if bool(failed_dfs): return pd.concat(failed_dfs, axis=0, ignore_index=False) else: return failed_dfs
[docs] def generate(self, **kwargs): """Generate a binary by drawing from the binary_generator. This can be a callable or a generator. """ binary = self.binary_generator.draw_initial_binary(**kwargs) self.append(binary) return binary
[docs] def from_hdf(self, indices=None, where=None, restore=False): """Load a BinaryStar instance from an hdf file of a saved population. Parameters ---------- indices : int, list Selects the binaries to load. where : str Query performed on disk to select history and oneline DataFrames. restore : bool If true, restore binaries back to initial conditions. """ # check if numpy64, since where does not support this if isinstance(indices, np.int64): indices = int(indices) if where is None: if indices is None: raise ValueError("You must specify either the binary indices or a query string " "to read from file.") else: query_str = 'index==indices' else: query_str = str(where) with pd.HDFStore(self.store_file, mode='r') as store: hist = store.select(key='history', where=query_str) oneline = store.select(key='oneline', where=query_str) binary_holder = [] for i in np.unique(hist.index): binary = BinaryStar.from_df( hist.loc[[i]], extra_columns=self.kwargs.get('extra_columns', [])) # if the binary has failed if bool(oneline.loc[[i]]['FAILED'].values[-1]): setattr(binary, 'event', 'FAILED') bin_scalars = self.kwargs.get('scalar_names', []) s1_scalars = self.kwargs.get( 'S1_kwargs', {}).get('scalar_names', []) s2_scalars = self.kwargs.get( 'S2_kwargs', {}).get('scalar_names', []) if any([bool(bin_scalars), bool(s1_scalars), bool(s2_scalars)]): oline_bin = BinaryStar.from_oneline_df( oneline.loc[[i]], extra_columns=self.kwargs.get('extra_columns', []) ) for name in bin_scalars: val = getattr(oline_bin, name) setattr(binary, name, val) for name in s1_scalars: val = getattr(oline_bin.star_1, name) setattr(binary.star_1, name, val) for name in s2_scalars: val = getattr(oline_bin.star_2, name) setattr(binary.star_2, name, val) del oline_bin binary_holder.append(binary) self.append(binary) if restore: [b.restore() for b in binary_holder] return binary_holder
[docs] def save(self, fname, **kwargs): """Save binaries to an hdf file using pandas HDFStore. Any object dtype columns not parsed by infer_objects() is converted to a string. Parameters ---------- fname : str Name of hdf file saved. mode : {'a', 'w', 'r', 'r+'}, default 'a' See pandas HDFStore docs complib : {'zlib', 'lzo', 'bzip2', 'blosc'}, default 'zlib' Compression library. See HDFStore docs complevel : int, 0-9, default 9 Level of compression. See HDFStore docs kwargs : dict Arguments for `BinaryStar` methods `to_df` and `to_oneline_df`. Returns ------- None """ mode = kwargs.get('mode', 'a') complib = kwargs.get('complib', 'zlib') complevel = kwargs.get('complevel', 9) with pd.HDFStore(fname, mode=mode, complevel=complevel, complib=complib) as store: history_df = self.to_df(**kwargs) store.append('history', history_df) online_df = self.to_oneline_df(**kwargs) store.append('oneline', online_df) return
def __getitem__(self, key): """Return the key-th binary.""" return self.binaries[key] def __iter__(self): """Iterate the binaries in the population.""" return iter(self.binaries) def __len__(self): """Return the number of binaries in the population.""" return len(self.binaries) def __bool__(self): """Evaluate as True if binaries have been appended.""" return len(self) > 0
[docs] class BinaryGenerator: """Generate binaries to be included in a BinaryPopulation.""" def __init__(self, sampler=generate_independent_samples, RNG=None, **kwargs): """Initialize the BinaryGenerator instance.""" self._num_gen = 0 if RNG is None: self.RNG = np.random.default_rng() self.entropy = self.RNG.bit_generator._seed_seq.entropy else: assert isinstance(RNG, np.random.Generator) self.RNG = RNG self.entropy = self.RNG.bit_generator._seed_seq.entropy self.kwargs = kwargs.copy() self.sampler = sampler self.star_formation = kwargs.get('star_formation', 'burst') self.binary_fraction_generator = binary_fraction_value
[docs] def reset_rng(self): """Reset the RNG with the stored entropy.""" self._num_gen = 0 self.RNG = self.get_original_rng()
[docs] def get_original_rng(self): """Return the random-number generator.""" seq = np.random.SeedSequence(entropy=self.entropy) RNG = np.random.default_rng(seed=seq) return RNG
[docs] def get_binary_by_iter(self, n=1, **kwargs): """Get the nth binary as if n calls were made to draw_intial_binary.""" RNG = self.get_original_rng() original_num_gen = self._num_gen if n != 0: # draw n-1 samples with new original RNG self.sampler( number_of_binaries=int(n-1), RNG=RNG, **kwargs) binary = self.draw_initial_binary(index=n, RNG=RNG, **kwargs) self._num_gen = original_num_gen return binary
[docs] def draw_initial_samples(self, orbital_scheme='separation', **kwargs): """Generate all random varibles.""" if not ('RNG' in kwargs.keys()): kwargs['RNG'] = self.RNG # a, e, M_1, M_2, P sampler_output = self.sampler(orbital_scheme, **kwargs) if orbital_scheme == 'separation': separation, eccentricity, m1, m2 = sampler_output orbital_period = orbital_period_from_separation(separation, m1, m2) elif orbital_scheme == 'period': orbital_period, eccentricity, m1, m2 = sampler_output separation = orbital_separation_from_period(orbital_period, m1, m2) else: raise ValueError("Allowed orbital schemes: separation or period.") # formation times N_binaries = len(orbital_period) formation_times = get_formation_times(N_binaries, **kwargs) #Get the binary_fraction binary_fraction = self.binary_fraction_generator(m1=m1, **kwargs) # indices indices = np.arange(self._num_gen, self._num_gen+N_binaries, 1) # kicks if (('read_samples_from_file' in kwargs) and (kwargs['read_samples_from_file'] != '')): kick1, kick2 = get_kick_samples_from_file(**kwargs) else: if 'number_of_binaries' in kwargs: number_of_binaries = kwargs['number_of_binaries'] else: number_of_binaries = 1 kick1 = np.array(number_of_binaries*[[None, None, None, None]]) kick2 = np.array(number_of_binaries*[[None, None, None, None]]) # output output_dict = { 'binary_index': indices, 'binary_fraction': binary_fraction, 'time': formation_times, 'separation': separation, 'eccentricity': eccentricity, 'orbital_period': orbital_period, 'S1_mass': m1, 'S2_mass': m2, 'S1_natal_kick_array': kick1, 'S2_natal_kick_array': kick2, } self._num_gen += N_binaries return output_dict
[docs] def draw_initial_binary(self, **kwargs): """Return a binary for evolution in a population. Parameters ---------- index : int Sets binary index. Defaults to number of generated binaries. Returns ------- binary : BinaryStar """ sampler_kwargs = kwargs.copy() sampler_kwargs['number_of_binaries'] = 1 sampler_kwargs['RNG'] = kwargs.get('RNG', self.RNG) # Randomly generated variables output = self.draw_initial_samples(**sampler_kwargs) default_index = output['binary_index'].item() binary_fraction = output['binary_fraction'] formation_time = output['time'].item() m1 = output['S1_mass'].item() Z_div_Zsun = kwargs.get('metallicity', 1.) zams_table = {2.: 2.915e-01, 1.: 2.703e-01, 0.45: 2.586e-01, 0.2: 2.533e-01, 0.1: 2.511e-01, 0.01: 2.492e-01, 0.001: 2.49e-01, 0.0001: 2.49e-01} Z = Z_div_Zsun*Zsun if Z_div_Zsun in zams_table.keys(): Y = zams_table[Z_div_Zsun] else: raise KeyError(f"{Z_div_Zsun} is a not defined metallicity") X = 1. - Z - Y kick1 = output['S1_natal_kick_array'][0] if self.RNG.uniform() < binary_fraction: separation = output['separation'].item() orbital_period = output['orbital_period'].item() eccentricity = output['eccentricity'].item() m2 = output['S2_mass'].item() kick2 = output['S2_natal_kick_array'][0] binary_params = dict( index=kwargs.get('index', default_index), time=formation_time, state="detached", event="ZAMS", separation=separation, orbital_period=orbital_period, eccentricity=eccentricity, ) star1_params = dict( mass=m1, state="H-rich_Core_H_burning", metallicity=Z, center_h1=X, center_he4=Y, natal_kick_array=kick1, ) star2_params = dict( mass=m2, state="H-rich_Core_H_burning", metallicity=Z, center_h1=X, center_he4=Y, natal_kick_array=kick2, ) #If binary_fraction not default a initially single star binary is created. else: separation = np.nan orbital_period = np.nan eccentricity = np.nan binary_params = dict( index=kwargs.get('index', default_index), time=formation_time, state="initially_single_star", event="ZAMS", separation=separation, orbital_period=orbital_period, eccentricity=eccentricity, ) star1_params = dict( mass=m1, state="H-rich_Core_H_burning", metallicity=Z, center_h1=X, center_he4=Y, natal_kick_array=kick1, ) star2_params = properties_massless_remnant() binary = BinaryStar(**binary_params, star_1=SingleStar(**star1_params), star_2=SingleStar(**star2_params)) return binary
def __repr__(self,): """Report key properties of the BinaryGenerator instance.""" s = "<{}.{} at {}>\n".format( self.__class__.__module__, self.__class__.__name__, hex(id(self)) ) s += 'RNG: {}\n'.format(repr(self.RNG)) s += 'entropy: {}\n'.format(self.entropy) for key, arg in self.kwargs.items(): s += "{}: {}\n".format(key, arg) return s