Source code for posydon.utils.gridutils

"""Various utility functions used for the manipulating grid data."""

import os
import gzip

import numpy as np
import pandas as pd

from posydon.utils.constants import clight, Msun, Rsun
from posydon.utils.constants import standard_cgrav as cgrav
from posydon.utils.constants import secyer as secyear
from posydon.utils.limits_thresholds import LG_MTRANSFER_RATE_THRESHOLD
from posydon.utils.posydonwarning import Pwarn


__authors__ = [
    "Konstantinos Kovlakas <Konstantinos.Kovlakas@unige.ch>",
    "Scott Coughlin <scottcoughlin2014@u.northwestern.edu>",
    "Simone Bavera <Simone.Bavera@unige.ch>",
    "Emmanouil Zapartas <ezapartas@gmail.com>",
    "Kyle Akira Rocha <kylerocha2024@u.northwestern.edu>",
    "Jeffrey Andrews <jeffrey.andrews@northwestern.edu>",
    "Matthias Kruckow <Matthias.Kruckow@unige.ch>",
]


[docs] def join_lists(A, B): """Make a joint list of A and B without repetitions and keeping the order. Parameters ---------- A : iterable The baseline list to be extended. B : iterable The second list from which additional elements will be appened. Returns ------- list The joint list by merging A and B. """ result = [] for element in A: result.append(element) A_set = set(A) for element in B: if element not in A_set: result.append(element) return result
[docs] def read_MESA_data_file(path, columns): """Read specific columns from a MESA output file to an array. Note that this function also works with `.gz` files. Parameters ---------- path : str The path to the file. columns : list of strings The list of names of the columns to be returned. Returns ------- array or None The array containing the MESA data, or `None` if path was `None`. """ if path is None: return None elif os.path.exists(path): try: return np.atleast_1d(np.genfromtxt(path, skip_header=5, names=True, usecols=columns, invalid_raise=False)) except: Pwarn("Problems with reading file "+path, "MissingFilesWarning") return None else: return None
[docs] def read_EEP_data_file(path, columns): """Read an EEP file (can be `.gz`) - similar to `read_MESA_data_file()`.""" try: return np.atleast_1d(np.genfromtxt(path, skip_header=11, names=True, usecols=columns, invalid_raise=False)) except: Pwarn("Problems with reading file "+path, "MissingFilesWarning") return None
[docs] def fix_He_core(history): """Make He core mass/radius at least equal to CO core mass/radius.""" if history is not None: columns = history.dtype.names if "he_core_mass" in columns and "co_core_mass" in columns: history["he_core_mass"] = np.maximum(history["he_core_mass"], history["co_core_mass"]) if "he_core_radius" in columns and "co_core_radius" in columns: history["he_core_radius"] = np.maximum(history["he_core_radius"], history["co_core_radius"]) return history
[docs] def add_field(a, descr): """Return a new array that is like `a`, but has additional fields. The contents of `a` are copied over to the appropriate fields in the new array, whereas the new fields are uninitialized. The arguments are not modified. Parameters ---------- a : structured numpy array The base array. descr : str A numpy type description of the new fields. Returns ------- type Description of returned object. Example ------- >>> sa = numpy.array([(1, 'Foo'), (2, 'Bar')], dtype=[('id', int), ('name', 'S3')]) >>> sa.dtype.descr == numpy.dtype([('id', int), ('name', 'S3')]) True >>> sb = add_field(sa, [('score', float)]) >>> sb.dtype.descr == numpy.dtype([('id', int), ('name', 'S3'), ('score', float)]) True >>> numpy.all(sa['id'] == sb['id']) True >>> numpy.all(sa['name'] == sb['name']) True """ if a.dtype.fields is None: raise ValueError("`A' must be a structured numpy array") b = np.empty(a.shape, dtype=a.dtype.descr + descr) for name in a.dtype.names: b[name] = a[name] return b
[docs] def get_cell_edges(grid_x, grid_y): """Return the edges of grid cells to be used for plotting with pcolormesh. Parameters ---------- grid_x : array Center values of grid cells for X. grid_y : array Center values of grid cells for Y. Returns ------- array X grid values. array Y grid values. """ q, p = grid_x, grid_y shift = (q[1]-q[0])/2 new_q = q-shift last = q[-1]+shift new_q = list(new_q) new_q.append(last) shift = (p[1]-p[0])/2 new_p = p-shift last = p[-1]+shift new_p = list(new_p) new_p.append(last) # make mesh QQ, PP = np.meshgrid(new_q, new_p) return QQ, PP
[docs] def find_nearest(val, array): """Find the element of `array` closest to the value `val`.""" nearest_idx = (abs(val-array)).argmin() return array[nearest_idx]
[docs] def find_index_nearest_neighbour(array, value): """Find the index of `array` closest to the value.""" idex = np.argmin(np.abs(array - value)) return idex
[docs] def get_final_proposed_points(proposed_x, grid_x, proposed_y, grid_y): """Map proposed simulation points to the center of a correspoding cell. Only one mapped value is returned if more than one proposed point falls within one cell. Parameters ---------- proposed_x : array Proposed points for X. grid_x : array The values of X at the center of the grid cells. proposed_y : array Proposed points for Y. grid_y : type The values of Y at the center of the grid cells. Returns ------- array Mapped X values. array Mapped Y values. """ mapped_x = [] mapped_y = [] for i, x in enumerate(proposed_x): mapped_x.append(find_nearest(x, grid_x)) for i, y in enumerate(proposed_y): mapped_y.append(find_nearest(y, grid_y)) # filtering: I only want one unique mapped value in each cell coords = np.array([mapped_x, mapped_y]) unique_coord = np.unique(coords.T, axis=0) mapped_x, mapped_y = unique_coord.T[0], unique_coord.T[1] print('Number of unique mapped coordinates:', len(mapped_x)) return mapped_x, mapped_y
[docs] def T_merger_P(P, m1, m2): """Merger time given initial orbital period and masses of binary. Parameters ---------- P : float Orbital period (days). m1 : float Mass of first star (Msun). m2 : float Mass of second star (Msun). Returns ------- float Merger time (Gyr) """ return T_merger_a(kepler3_a(P, m1, m2), m1, m2)
[docs] def beta_gw(m1, m2): """Evaluate the "beta" (equation 5.9) from Peters (1964). Parameters ---------- m1 : float Mass of the first star. m2 : type Mass of the second star. Returns ------- float Peters' beta in cgs units. """ return 64. / 5. * cgrav**3 / clight**5 * m1 * m2 * (m1 + m2)
[docs] def kepler3_a(P, m1, m2): """Calculate the semimajor axis of a binary from its period and masses. Parameters ---------- P : float Orbital period (days). m1 : float Mass of first star. m2 : type Mass of second star. Returns ------- float Semi-major axis (Rsun) using Kepler's third law. """ return ((P * 24.0 * 3600.0)**2.0 * cgrav * (m1 + m2) * Msun / (4.0 * np.pi**2))**(1.0 / 3.0) / Rsun
[docs] def T_merger_a(a, m1, m2): """Merger time given initial orbital separation and masses of binary. Parameters ---------- a : float Orbital separation (Rsun). m1 : float Mass of first star (Msun). m2 : float Mass of second star (Msun). Returns ------- float Merger time (Gyr) following Peters (1964), eq. (5.10). """ return (a * Rsun)**4 / (4. * beta_gw(m1, m2) * Msun**3) / (secyear * 1.0e9)
[docs] def convert_output_to_table( output_file, binary_history_file=None, star1_history_file=None, star2_history_file=None, column_names=[ "result", "CE_flag", "M_1f(Msun)", "M_2f(Msun)", "Porb_f(d)", "tmerge(Gyr)", "log_L_1", "log_T_1", "He_core_1(Msun)", "C_core_1(Msun)", "log_L_2", "log_T_2", "He_core_2(Msun)", "C_core_2(Msun)"]): """Convert output of a run, to a pandas dataframe. Note that this function also works with `.gz` files. Parameters ---------- output_file : str Path to MESA output file. binary_history_file : str Path to binary history data. star1_history_file : str Path to history data for star1. star2_history_file : str Path to history data for star1. column_names : list of str Which columns to return. Returns ------- pandas dataframe The table containing the requested columns from the run's histories. """ values = {} # read out.txt file (simulation terminal output) if not os.path.isfile(output_file): raise ValueError("File {0} does not exist".format(output_file)) elif os.stat(output_file).st_size == 0: raise ValueError('The output file does not have any data.') if binary_history_file is not None: binary_history = pd.DataFrame(np.genfromtxt(binary_history_file, skip_header=5, names=True)) else: Pwarn("You have not supplied a binary history file to parse. " "This will cause all binary history columns to be dashes.", "MissingFilesWarning") if star1_history_file is not None: star1_history = pd.DataFrame(np.genfromtxt(star1_history_file, skip_header=5, names=True)) else: Pwarn("You have not supplied a star1 history file to parse. " "This will cause all star1 history columns to be dashes.", "MissingFilesWarning") if star2_history_file is not None: star2_history = pd.DataFrame(np.genfromtxt(star2_history_file, skip_header=5, names=True)) else: Pwarn("You have not supplied a star2 history file to parse. " "This will cause all star2 history columns to be dashes.", "MissingFilesWarning") # TODO: is this necessary to be done with `popen`? # outdata = os.popen('cat ' + output_file).read() if output_file.endswith(".gz"): with gzip.open(output_file, "rt") as f: outdata = f.read() else: with open(output_file, "r") as f: outdata = f.read() # Checking for individual terminating conditions # carbon depletion depleted_c = outdata.find('Terminate due to primary depleting carbon') # evolved into contact contact = outdata.find("Terminate because accretor " "(r-rl)/rl > accretor_overflow_terminate") # CE happened CE_flag = outdata.find("TURNING ON CE") # initial RLOF overflow, too compact orbit overflow = outdata.find("model is overflowing at ZAMS") # CE merger CE_merge = outdata.find("below CE_xa_diff_to_terminate") # CE merger CE_merge2 = outdata.find("CE_terminate_when_core_overflows") if overflow != -1: values["result"] = "ZAMS_RLOF" else: if CE_flag != -1: values["CE_flag"] = 1 else: values["CE_flag"] = 0 if (contact != -1) and (CE_flag == -1): values["result"] = "contact" elif ((CE_flag != -1) and (CE_merge != -1 or CE_merge2 != -1 or contact != -1)): values["result"] = "CE_merger" elif depleted_c != -1: if star1_history_file is not None: hist = star1_history values["log_L_1"] = hist["log_L"].iloc[-1] values["log_T_1"] = hist["log_Teff"].iloc[-1] values["He_core_1(Msun)"] = hist["he_core_mass"].iloc[-1] values["C_core_1(Msun)"] = hist["c_core_mass"].iloc[-1] if star2_history_file is not None: hist = star2_history values["log_L_2"] = hist["log_L"].iloc[-1] values["log_T_2"] = hist["log_Teff"].iloc[-1] values["He_core_2(Msun)"] = hist["he_core_mass"].iloc[-1] values["C_core_2(Msun)"] = hist["c_core_mass"].iloc[-1] if binary_history_file is not None: tmerge = T_merger_P( binary_history["period_days"].iloc[-1], binary_history["star_1_mass"].iloc[-1], binary_history["star_2_mass"].iloc[-1]) max_lg_mtransfer_rate = binary_history[ "lg_mtransfer_rate"].max() values["M_1f(Msun)"] = binary_history["star_1_mass"].iloc[-1] values["M_2f(Msun)"] = binary_history["star_2_mass"].iloc[-1] values["Porb_f(d)"] = binary_history["period_days"].iloc[-1] values["tmerge(Gyr)"] = tmerge if max_lg_mtransfer_rate < LG_MTRANSFER_RATE_THRESHOLD: values["result"] = "no_interaction" elif CE_flag != -1: if tmerge > 13.8: values["result"] = "CE_ejection" else: values["result"] = "CE_ejection_m" else: if tmerge > 13.8: values["result"] = "stable_MT_to_wide_binary" else: values["result"] = "stable_MT_to_merger" else: values["result"] = "error" # If there is any reason that a column cannot be reported based on # the MESA simulation we fill this value with a "-" for column in column_names: if column not in values.keys(): values[column] = "-" return pd.DataFrame.from_dict(values, orient='index').transpose().apply( pd.to_numeric, errors='ignore')
[docs] def clean_inlist_file(inlist, **kwargs): """Get dictionary of possible parameters/values from the default inlist. Parameters ---------- inlist : str Path to inlist file. **kwargs : dict TODO Returns ------- dict Dictionary of parameters and values from inlist. """ section = kwargs.pop('section', None) with open(inlist, 'r') as f: # clean inlist into nice list param_value_list = [] for i in f.readlines(): # strip away all the comments and whitespace, etc param_and_value = i.strip('\n').strip().split('!')[0].strip() # check that this line actually has a parameter and value par if ((param_and_value) and ('=' in param_and_value or '&' in param_and_value)): param_value_list.append(param_and_value) # does this inlist have multiple sections? dict_of_parameters = {k: {} for k in param_value_list if '&' in k} if not dict_of_parameters: # mesa default files do not have sections, # because controls and jobs are in separate files dict_of_parameters[section] = { i.split('=', 1)[0].strip(): i.split('=', 1)[1].strip() for i in param_value_list if (i.split('=', 1)[1].strip() not in ["''", "'.'"])} else: # probably other people are sending the same inlist with # both job and controls sections for i in param_value_list: if '&' in i: current_key = i elif (i.split('=', 1)[1].strip() not in ["''", "'.'"]): dict_of_parameters[current_key][i.split('=', 1)[0].strip()] = \ i.split('=', 1)[1].strip() return dict_of_parameters
[docs] def get_new_grid_name(path, compression, create_missing_directories=False): """Get the name of a new grid slice based on the path and the compression. Parameters ---------- path : str Path to grid slice data. compression : str Compression value. (Directory to put the new grid slice in.) create_missing_directories : bool Flag to create missing directories. Returns ------- grid_output File name for the new grid slice. """ grid_path, grid_name = os.path.split(os.path.normpath(path)) output_path = os.path.join(grid_path, compression) grid_output = os.path.join(output_path, grid_name+'.h5') if create_missing_directories: # check that LITE/ or ORIGINAL/ directory exists if not os.path.isdir(output_path): os.makedirs(output_path) return grid_output