#!/usr/bin/env python
__authors__ = [
"Simone Bavera <Simone.Bavera@unige.ch>",
"Kyle Akira Rocha <kylerocha2024@u.northwestern.edu>",
"Matthias Kruckow <Matthias.Kruckow@unige.ch>",
]
##############################################################################
# IMPORT ALL NECESSARY PYTHON PACKAGES
##############################################################################
import os
import sys
import ast
import shutil
import pandas as pd
import numpy as np
from pprint import pformat
from posydon.popsyn.io import parse_inifile
from posydon.utils.common_functions import PATH_TO_POSYDON
from posydon.utils.gridutils import get_new_grid_name
# This data processing pipeline was designed assuming the POSYDON v2 data
# structure:
'''
Data tree structure

PATH_TO_GRIDS/
    /HMS-HMS/
    /CO-HMS_RLO/
    /CO-HeMS/
    /single_HMS/
    /single_HeMS/
        /v1/
        /v2/
            /1e+00_Zsun/
            /1e-01_Zsun/
            /1e-02_Zsun/
            ...
                /grid_low_res_0/
                /grid_low_res_1/
                /grid_rerun_1/
                ...
                /LITE/
                /ORIGINAL/
                /plots/
                    /grid_low_res_combined/
                        /TF1/
                        /TF2/
                        ...
'''
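# For orientation, the pipeline is configured via an ini file parsed by
# ``PostProcessingPipeline.parse_setup_params`` below. The sketch below is
# purely illustrative (all values are placeholders, not shipped defaults);
# every value must be a valid python literal, because it is read with
# ast.literal_eval:
#
#   [account]
#   ACCOUNT = 'my_account'
#   PARTITION = 'my_partition'
#   WALLTIME = '23:59:59'
#   MAILTYPE = 'ALL'
#   EMAIL = 'me@example.com'
#
#   [pipeline setup]
#   PATH_TO_GRIDS = '/path/to/grids'
#   VERSION = 'v2'
#   PATH = '.'
#   VERBOSE = True
#   CREATE_GRID_SLICES = True
#   COMBINE_GRID_SLICES = True
#
#   [step_1]
#   GRID_TYPES = ['HMS-HMS']
#   METALLICITIES = [['1e+00_Zsun']]
#   GRID_SLICES = [['grid_low_res_0', 'grid_low_res_1']]
#   COMPRESSIONS = [['LITE']]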
# pipeline steps
ACTION_TO_STEP_NUM = {
'CREATE_GRID_SLICES' : 'step_1',
'COMBINE_GRID_SLICES' : 'step_2',
'CALCULATE_EXTRA_VALUES': 'step_3',
'TRAIN_INTERPOLATORS' : 'step_4',
'EXPORT_DATASET' : 'step_9',
'RERUN' : 'rerun'
}
# pipeline substeps
SUB_STEPS_TO_EXTENSION = {
'CREATE_PLOTS' : '_plots',
'DO_CHECKS' : '_checks'
}
# predefined plotting sets
PLOTTING_SETS = {
'PLOT_AFTER_CREATE' : [],
'PLOT_AFTER_COMBINE' : ['combined_TF12', 'termination_flag_1',\
'termination_flag_2', 'termination_flag_3',\
'termination_flag_4', 'rl_relative_overflow_1',\
'rl_relative_overflow_2', 'lg_mtransfer_rate'],
'PLOT_AFTER_EXTRA' : ['S1_MODEL01_CO_type', 'S1_MODEL01_SN_type',\
'S1_MODEL01_mass', 'S1_MODEL01_spin',\
'S1_MODEL01_m_disk_radiated',
'S1_MODEL02_CO_type', 'S1_MODEL02_SN_type',\
'S1_MODEL02_mass', 'S1_MODEL02_spin',\
'S1_MODEL02_m_disk_radiated',
'S1_MODEL03_CO_type', 'S1_MODEL03_SN_type',\
'S1_MODEL03_mass', 'S1_MODEL03_spin',\
'S1_MODEL03_m_disk_radiated',
'S1_MODEL04_CO_type', 'S1_MODEL04_SN_type',\
'S1_MODEL04_mass', 'S1_MODEL04_spin',\
'S1_MODEL04_m_disk_radiated',
'S1_MODEL05_CO_type', 'S1_MODEL05_SN_type',\
'S1_MODEL05_mass', 'S1_MODEL05_spin',\
'S1_MODEL05_m_disk_radiated',
'S1_MODEL06_CO_type', 'S1_MODEL06_SN_type',\
'S1_MODEL06_mass', 'S1_MODEL06_spin',\
'S1_MODEL06_m_disk_radiated',
'S1_MODEL07_CO_type', 'S1_MODEL07_SN_type',\
'S1_MODEL07_mass', 'S1_MODEL07_spin',\
'S1_MODEL07_m_disk_radiated',
'S1_MODEL08_CO_type', 'S1_MODEL08_SN_type',\
'S1_MODEL08_mass', 'S1_MODEL08_spin',\
'S1_MODEL08_m_disk_radiated',
'S1_MODEL09_CO_type', 'S1_MODEL09_SN_type',\
'S1_MODEL09_mass', 'S1_MODEL09_spin',\
'S1_MODEL09_m_disk_radiated',
'S1_MODEL10_CO_type', 'S1_MODEL10_SN_type',\
'S1_MODEL10_mass', 'S1_MODEL10_spin',\
'S1_MODEL10_m_disk_radiated'],
'PLOT_AFTER_TRAINING' : ['INTERP_ERROR_age', 'INTERP_ERROR_star_1_mass',\
'INTERP_ERROR_star_2_mass',\
'INTERP_ERROR_period_days',\
'INTERP_ERROR_S1_co_core_mass',\
'INTERP_ERROR_S1_co_core_radius',\
'INTERP_ERROR_S1_he_core_mass',\
'INTERP_ERROR_S1_he_core_radius',\
'INTERP_ERROR_S1_center_h1',\
'INTERP_ERROR_S1_center_he4',\
'INTERP_ERROR_S1_surface_h1',\
'INTERP_ERROR_S1_surface_he4',\
'INTERP_ERROR_S1_surf_avg_omega_div_omega_crit',\
'INTERP_ERROR_S1_log_Teff',\
'INTERP_ERROR_S1_log_L', 'INTERP_ERROR_S1_log_R',\
'INTERP_ERROR_S1_spin_parameter',\
'INTERP_ERROR_S1_lambda_CE_10cent',\
'INTERP_ERROR_S2_co_core_mass',\
'INTERP_ERROR_S2_co_core_radius',\
'INTERP_ERROR_S2_he_core_mass',\
'INTERP_ERROR_S2_he_core_radius',\
'INTERP_ERROR_S2_center_h1',\
'INTERP_ERROR_S2_center_he4',\
'INTERP_ERROR_S2_surface_h1',\
'INTERP_ERROR_S2_surface_he4',\
'INTERP_ERROR_S2_surf_avg_omega_div_omega_crit',\
'INTERP_ERROR_S2_log_Teff',\
'INTERP_ERROR_S2_log_L', 'INTERP_ERROR_S2_log_R',\
'INTERP_ERROR_S2_spin_parameter',\
'INTERP_ERROR_S2_lambda_CE_10cent',\
'INTERP_ERROR_S1_MODEL01_mass',\
'INTERP_ERROR_S1_MODEL01_spin',\
'INTERP_ERROR_S1_MODEL01_m_disk_radiated',\
'INTERP_ERROR_S1_MODEL05_mass',\
'INTERP_ERROR_S1_MODEL05_spin',\
'INTERP_ERROR_S1_MODEL05_m_disk_radiated',\
'INTERP_ERROR_S1_MODEL06_mass',\
'INTERP_ERROR_S1_MODEL06_spin',\
'INTERP_ERROR_S1_MODEL06_m_disk_radiated',\
'INTERP_ERROR_S1_MODEL10_mass',\
'INTERP_ERROR_S1_MODEL10_spin',\
'INTERP_ERROR_S1_MODEL10_m_disk_radiated']
}
# predefined checking sets
CHECKING_SETS = {
'CHECK_AFTER_CREATE' : [],
'CHECK_AFTER_COMBINE' : ['failure_rate'],
'CHECK_AFTER_EXTRA' : ['CO_type', 'SN_type'],
'CHECK_AFTER_TRAINING' : []
}
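# For example (illustrative): an ini entry CREATE_PLOTS = ['PLOT_AFTER_COMBINE']
# is expanded by get_step_kwargs below into the full list above, from
# 'combined_TF12' through 'lg_mtransfer_rate'; entries that are not set names
# are kept as individual plot/check keys.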
# common attributes for all steps
ATTRIBUTES_TO_CHECK = ['GRID_TYPES', 'METALLICITIES', 'GRID_SLICES',\
'COMPRESSIONS']
class PostProcessingPipeline:
"""A class to handle the post-processing pipeline."""
def __init__(self, path_to_inifile=None):
"""Initialize a pipeline
Arguments
---------
path_to_inifile : path
The location of an ini file to read the parameters for the pipeline
(default: None)
"""
self.PATH_TO_POSYDON = os.getenv('PATH_TO_POSYDON')
self.PATH_TO_INIFILE = path_to_inifile
if self.PATH_TO_INIFILE is not None:
self.pipeline_kwargs = self.parse_setup_params(self.PATH_TO_INIFILE)
@staticmethod
def parse_setup_params(path=None):
"""Parse inifile for running post-processing pipelines.
Arguments
---------
path : path
The location of an ini file to read the parameters for the pipeline
(default: None)
Returns
-------
dictionary
"""
if path is None:
return
else:
parser = parse_inifile(path)
pipeline_kwargs = dict()
for section in parser.sections():
# create a subdirectory for each section
section_dict = dict()
for key, val in parser[section].items():
                    # interpret all input values as python literals
                    # MK: this is probably not a safe way to do this
section_dict[key] = ast.literal_eval(val)
pipeline_kwargs[section] = section_dict
return pipeline_kwargs
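    # Example usage (illustrative; 'pipeline.ini' is a placeholder path):
    #   kwargs = PostProcessingPipeline.parse_setup_params('pipeline.ini')
    #   kwargs['pipeline setup']  # -> dict of the [pipeline setup] section
    # Each ini section becomes one sub-dictionary keyed by its section name.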
def create_csv_and_slurm_job_files(self):
"""Creates all files the pipeline needs."""
        def expand_runfile(runfile, last_step='', step_number='step_X'):
            """Add next entry to the script in runfile."""
            if last_step == '':
                # if there was no previous step, the current step has no
                # dependency
                runfile.write(f"ID{step_number}=$(sbatch --parsable "
                              f"{step_number}.slurm)\n")
            else:
                # steps waiting for the previous step
                runfile.write(f"ID{step_number}=$(sbatch --parsable "
                              f"--dependency=afterok:${{ID{last_step}}} "
                              "--kill-on-invalid-dep=yes "
                              f"{step_number}.slurm)\n")
            runfile.write(f"echo '{step_number}.slurm submitted as '"
                          f"${{ID{step_number}}}\n")
            return step_number
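        # The generated run_pipeline.sh then reads like (illustrative, for
        # two active steps):
        #   #!/bin/bash
        #   IDstep_1=$(sbatch --parsable step_1.slurm)
        #   echo 'step_1.slurm submitted as '${IDstep_1}
        #   IDstep_2=$(sbatch --parsable --dependency=afterok:${IDstep_1} --kill-on-invalid-dep=yes step_2.slurm)
        #   echo 'step_2.slurm submitted as '${IDstep_2}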
        def get_step_kwargs(self, last_step='', step_number='step_X'):
            """Provide the attributes of a step. Additionally, check for
            missing attributes and, where possible, copy them from the
            previous step."""
            if step_number not in self.pipeline_kwargs.keys():
                raise KeyError(f'{step_number} not in pipeline_kwargs!')
            elif 'step' in step_number:
                # check for attributes
                for attribute in ATTRIBUTES_TO_CHECK:
                    if (attribute not in self.pipeline_kwargs[step_number]
                            or not isinstance(
                                self.pipeline_kwargs[step_number][attribute],
                                list)):
                        if last_step not in self.pipeline_kwargs.keys():
                            raise KeyError(f'{step_number} has no '
                                           f'{attribute}')
                        else:  # infer the missing attribute from the last step
                            print(f"\ncopy {attribute} from {last_step} to "
                                  f"{step_number}")
                            self.pipeline_kwargs[step_number][attribute] = \
                                self.pipeline_kwargs[last_step][attribute]
            # replace predefined sets by their content
            if 'CREATE_PLOTS' in self.pipeline_kwargs[step_number]:
                plots = self.pipeline_kwargs[step_number]['CREATE_PLOTS']
                for set_name, set_content in PLOTTING_SETS.items():
                    if set_name in plots:
                        plots.remove(set_name)
                        plots.extend(set_content)
            if 'DO_CHECKS' in self.pipeline_kwargs[step_number]:
                checks = self.pipeline_kwargs[step_number]['DO_CHECKS']
                for set_name, set_content in CHECKING_SETS.items():
                    if set_name in checks:
                        checks.remove(set_name)
                        checks.extend(set_content)
            # return attributes
            return self.pipeline_kwargs[step_number]
def has_substep(self, step_number='step_X', substep='ALL'):
"""Checks whether a step has substeps."""
if step_number not in self.pipeline_kwargs.keys():
raise KeyError(f'{step_number} not in pipeline_kwargs!')
if substep=='ALL': # check all possible substeps
substeps = SUB_STEPS_TO_EXTENSION.keys()
else: # check the specified substep
substeps = [substep]
for sub_step in substeps:
if sub_step in self.pipeline_kwargs[step_number].keys():
SUB_STEP_LIST = self.pipeline_kwargs[step_number][sub_step]
if (isinstance(SUB_STEP_LIST, list) and
len(SUB_STEP_LIST)>0):
# at least one entry is found in the substep
return True
# no entry for the substep(s)
return False
# check for sections 'account' and 'pipeline setup' and store their
# attributes
if 'account' not in self.pipeline_kwargs.keys():
raise KeyError('account not in pipeline_kwargs!')
if 'pipeline setup' not in self.pipeline_kwargs.keys():
raise KeyError('pipeline setup not in pipeline_kwargs!')
account_kwargs = self.pipeline_kwargs['account']
setup_kwargs = self.pipeline_kwargs['pipeline setup']
# take verbose value from setup arguments
if 'VERBOSE' in setup_kwargs.keys():
VERBOSE = setup_kwargs['VERBOSE']
else:
VERBOSE = False
if VERBOSE:
print( "\n{:+^45s} \n{}\n\n{:+^45s}\n{}".format(
'ACCOUNT', pformat(account_kwargs, indent=2),
'SETUP', pformat(setup_kwargs, indent=2) )
)
previously_created_files = []
# create a runfile script
with open('run_pipeline.sh', 'w') as runfile:
runfile.write("#!/bin/bash\n")
last_step = ''
for action, step_number in ACTION_TO_STEP_NUM.items():
if action in setup_kwargs.keys():
do_step_bool = setup_kwargs[action]
else:
do_step_bool = False
if VERBOSE:
print( "\n\n{:-^45s} {:^8s}:{:^6s}".format(
action, step_number, str(do_step_bool)) )
if do_step_bool or has_substep(self, step_number=step_number):
step_kwargs = get_step_kwargs(self, last_step=last_step,
step_number=step_number)
if VERBOSE:
print( pformat(step_kwargs, indent=2) )
if do_step_bool:
                        # create the csv file and remember the files this
                        # step will create
previously_created_files += create_csv(
step_name=step_number,
previously_created_files=previously_created_files,
**{**step_kwargs, **setup_kwargs}
)
# create slurm file
slurm_job(
job_name=step_number,
step_name=step_number,
PATH_TO_POSYDON=self.PATH_TO_POSYDON,
**{**step_kwargs, **setup_kwargs, **account_kwargs}
)
# add job to the list in the runfile script
expand_runfile(runfile, last_step=last_step,
step_number=step_number)
# remember that this step was done
last_step = step_number
for sub_step, step_extension in SUB_STEPS_TO_EXTENSION.items():
if sub_step not in self.pipeline_kwargs[step_number].keys():
continue
if has_substep(self, step_number=step_number,
substep=sub_step):
# get sub-step number to be a concatenation of the main
# step and the sub-step extension.
sub_step_number = step_number+step_extension
# create csv file for sub step
create_csv(
step_name=sub_step_number,
previously_created_files=previously_created_files,
**{**step_kwargs, **setup_kwargs}
)
# create slurm file for sub step
slurm_job(
job_name=sub_step_number,
step_name=step_number,
PATH_TO_POSYDON=self.PATH_TO_POSYDON,
**{**step_kwargs, **setup_kwargs, **account_kwargs}
)
# add job to the list in the runfile script
expand_runfile(runfile, last_step=last_step,
step_number=sub_step_number)
                            # no later job depends on a sub-step, so
                            # last_step is not updated here
# make the runfile script executable
os.system("chmod 755 run_pipeline.sh")
def create_log_dirs(self):
"""Create directories to store the log files."""
# use work directory given by PATH in the setup attributes
if 'pipeline setup' not in self.pipeline_kwargs.keys():
raise KeyError('pipeline setup not in pipeline_kwargs!')
setup_kwargs = self.pipeline_kwargs['pipeline setup']
if 'PATH' in setup_kwargs.keys():
logs_path = os.path.join(setup_kwargs['PATH'], 'logs')
else:
logs_path = os.path.join('.', 'logs')
# create logs in working dir to store all slurm outputs
if not os.path.isdir(logs_path):
os.makedirs(logs_path)
# create sub directories for each step
for action, step_number in ACTION_TO_STEP_NUM.items():
if ((action in setup_kwargs.keys()) and setup_kwargs[action]):
step_log_path = os.path.join(logs_path, step_number)
if not os.path.isdir(step_log_path):
os.makedirs(step_log_path)
def create_export_dirs(self):
"""Create directories for the finally exported data."""
if 'pipeline setup' not in self.pipeline_kwargs.keys():
raise KeyError('pipeline setup not in pipeline_kwargs!')
setup_kwargs = self.pipeline_kwargs['pipeline setup']
        # create the data directory tree to export the datasets
if (('EXPORT_DATASET' in setup_kwargs.keys()) and
setup_kwargs['EXPORT_DATASET']):
if 'PATH' in setup_kwargs.keys():
data_path = os.path.join(setup_kwargs['PATH'], 'POSYDON_data')
else:
data_path = os.path.join('.', 'POSYDON_data')
# create main directory for the POSYDON data
if not os.path.isdir(data_path):
os.makedirs(data_path)
# determine sub directories
dirs = []
grid_dirs = ['HMS-HMS', 'CO-HMS_RLO', 'CO-HeMS', 'CO-HeMS_RLO',
'single_HMS', 'single_HeMS']
interp_dirs = ['interpolators', 'interpolators/1NN_1NN',
'interpolators/linear3c_kNN']
for name1 in grid_dirs:
dirs.append(os.path.join(data_path, name1))
for name2 in interp_dirs:
dirs.append(os.path.join(data_path, name1, name2))
# create sub directories
for dir_ in dirs:
if not os.path.isdir(dir_):
os.makedirs(dir_)
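        # The resulting export tree (sketch) contains, per grid-type dir:
        #   POSYDON_data/HMS-HMS/interpolators/1NN_1NN/
        #   POSYDON_data/HMS-HMS/interpolators/linear3c_kNN/
        # and likewise for the other entries of grid_dirs.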
def slurm_job(job_name,
step_name,
PATH_TO_GRIDS=None,
PATH_TO_POSYDON=None,
PATH='.',
ACCOUNT=None,
PARTITION=None,
WALLTIME=None,
MAILTYPE=None,
EMAIL=None,
CREATE_GRID_SLICES=False,
COMBINE_GRID_SLICES=False,
CALCULATE_EXTRA_VALUES=False,
TRAIN_INTERPOLATORS=False,
EXPORT_DATASET=False,
RERUN=False,
VERBOSE=False,
**kwargs):
"""Create slurm file."""
def copy_old_log_file(path_to_logs='.', log_file='log'):
"""Make a copy of an old out file (and remove an old copy)."""
path_to_out_file = os.path.join(path_to_logs, log_file+'.out')
if os.path.exists(path_to_out_file): # move old data to a copy
path_to_old_out_file = os.path.join(path_to_logs,
log_file+'.old.out')
if os.path.exists(path_to_old_out_file): # delete old copy
os.remove(path_to_old_out_file)
shutil.move(path_to_out_file, path_to_old_out_file)
return path_to_out_file
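    # e.g. an existing logs/step_2/combine_grid_slices.out is preserved once
    # as combine_grid_slices.old.out before the new job writes its output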
# get path to csv file
path_to_csv_file = os.path.join(PATH, f'{job_name}.csv')
# create slurm file
with open(f'{job_name}.slurm', 'w') as f:
# get STEPID
if ('step' in job_name) and (len(job_name.split("_")) > 1):
STEPID = job_name.split("_")[1]
if (len(job_name.split("_")) > 2):
STEPID += job_name.split("_")[2]
elif job_name == 'rerun':
STEPID = 'R'
else:
STEPID = ''
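        # e.g. job_name 'step_1' yields STEPID '1' (job name psygrid1), a
        # sub-step 'step_4_plots' yields '4plots', and 'rerun' yields 'R'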
# write slurm file content
f.write("#!/bin/bash\n")
if ACCOUNT is not None:
f.write(f"#SBATCH --account={ACCOUNT}\n")
if PARTITION is not None:
f.write(f"#SBATCH --partition={PARTITION}\n")
f.write("#SBATCH -N 1\n")
f.write("#SBATCH --cpus-per-task 1\n")
f.write("#SBATCH --ntasks-per-node 1\n")
if WALLTIME is not None:
f.write(f"#SBATCH --time={WALLTIME}\n")
f.write(f"#SBATCH --job-name=psygrid{STEPID}\n")
f.write("#SBATCH --mem-per-cpu=4G\n")
if ((EMAIL is not None) and (MAILTYPE is not None)):
f.write(f"#SBATCH --mail-type={MAILTYPE}\n")
f.write(f"#SBATCH --mail-user={EMAIL}\n")
# extract array size
if ((job_name in ['step_1', 'step_3', 'step_4', 'step_9', 'rerun']) or
('plot' in job_name) or ('check' in job_name)):
df = pd.read_csv(path_to_csv_file)
N = df.shape[0]-1
elif job_name in ['step_2']:
df = pd.read_csv(path_to_csv_file)
N = df.shape[1]-1
else:
raise ValueError(f'This should never happen! job_name={job_name}')
if N<0:
raise ValueError(f'{job_name} has no jobs to run, please check '
f'{path_to_csv_file}')
f.write(f"#SBATCH --array=0-{N}\n")
slurm_array = '$SLURM_ARRAY_TASK_ID'
# get logs path and set the output of the slurm job
if step_name in job_name:
path_to_logs = os.path.join(PATH, 'logs', step_name)
else:
path_to_logs = os.path.join(PATH, 'logs', job_name)
if job_name == 'step_1': # CREATE_GRID_SLICES
f.write("#SBATCH --open-mode=truncate\n")
f.write(f"#SBATCH --output={path_to_logs}/grid_slice_%a.out\n")
if job_name == 'step_2': # COMBINE_GRID_SLICES
path_to_out_file = copy_old_log_file(path_to_logs=path_to_logs,
log_file='combine_grid_slices')
f.write("#SBATCH --open-mode=append\n")
f.write(f"#SBATCH --output={path_to_out_file}\n")
if job_name == 'step_3': # CALCULATE_EXTRA_VALUES
f.write("#SBATCH --open-mode=truncate\n")
f.write(f"#SBATCH --output={path_to_logs}/post_processing_%a.out\n")
f.write(f"export PATH_TO_POSYDON={PATH_TO_POSYDON}\n")
if job_name == 'step_4': # TRAIN_INTERPOLATORS
f.write("#SBATCH --open-mode=truncate\n")
f.write(f"#SBATCH --output={path_to_logs}/train_interpolators_%a.out\n")
if job_name == 'step_9': # EXPORT_DATASET
path_to_out_file = copy_old_log_file(path_to_logs=path_to_logs,
log_file='export_dataset')
f.write("#SBATCH --open-mode=append\n")
f.write(f"#SBATCH --output={path_to_out_file}\n")
if job_name == 'rerun': # RERUN
path_to_out_file = copy_old_log_file(path_to_logs=path_to_logs,
log_file='rerun')
f.write("#SBATCH --open-mode=append\n")
f.write(f"#SBATCH --output={path_to_out_file}\n")
if 'plot' in job_name: # PLOTS
path_to_out_file = copy_old_log_file(path_to_logs=path_to_logs,
log_file='plots_'+step_name)
f.write("#SBATCH --open-mode=append\n")
f.write(f"#SBATCH --output={path_to_out_file}\n")
f.write("unset DISPLAY\n")
if 'check' in job_name: # CHECKS
path_to_out_file = copy_old_log_file(path_to_logs=path_to_logs,
log_file='checks_'+step_name)
f.write("#SBATCH --open-mode=append\n")
f.write(f"#SBATCH --output={path_to_out_file}\n")
# get path to run the pipeline
path_to_run_pipeline = os.path.join(PATH_TO_POSYDON, 'bin',
'run-pipeline')
# encode the verbose information for running the pipeline as a command
# line argument
if VERBOSE and 'check' not in job_name:
f.write(f"\nsrun python {path_to_run_pipeline} {PATH_TO_GRIDS} "
f"{path_to_csv_file} {slurm_array} 1")
else:
f.write(f"\nsrun python {path_to_run_pipeline} {PATH_TO_GRIDS} "
f"{path_to_csv_file} {slurm_array} 0")
def create_csv(GRID_TYPES=[],
METALLICITIES=[],
GRID_SLICES=[],
COMPRESSIONS=[],
step_name='',
VERSION='',
GRIDS_COMBINED=[],
INTERPOLATION_METHODS=[],
CONTROL_GRIDS=[],
STOP_BEFORE_CARBON_DEPLETION=0,
RERUN_TYPE='',
DROP_MISSING_FILES=False,
CREATE_PLOTS=[],
DO_CHECKS=[],
PATH_TO_GRIDS='',
PATH='',
previously_created_files=[],
**kwargs):
"""Create csv file with a list containing all data to process a step."""
# number of grid types
N = len(GRID_TYPES)
if ( N != len(METALLICITIES) or N != len(GRID_SLICES) or
N != len(COMPRESSIONS)):
        raise ValueError('Mismatch between the lengths of GRID_TYPES, '
                         'METALLICITIES, GRID_SLICES, and COMPRESSIONS.')
if step_name == 'step_2':
        # there must be as many combined grid files specified as recipes
        # given to create them
if N != len(GRIDS_COMBINED):
raise ValueError('len(GRID_TYPES) != len(GRIDS_COMBINED)!')
elif 'step_4_' in step_name:
# sub steps of the interpolator training need a control grid
if len(GRID_SLICES) != len(CONTROL_GRIDS):
raise ValueError('len(GRID_SLICES) != len(CONTROL_GRIDS)!')
grids = []
grids_compression = []
stop_before_carbon_depletion = []
grids_ORIGINAL = []
processed_grids = []
interpolators = []
export_path = []
rerun_path = []
rerun_type = []
plot_dirs = []
plot_quantities = []
checks = []
newly_created_files = []
df = pd.DataFrame()
# loop over all grid types
for l, grid_type in enumerate(GRID_TYPES):
METALLICITIES_ = METALLICITIES[l]
GRID_SLICES_ = GRID_SLICES[l]
COMPRESSIONS_ = COMPRESSIONS[l]
# loop over all metallicities, grid slices and compressions
for metallicity in METALLICITIES_:
for i, grid_slice in enumerate(GRID_SLICES_):
for compression in COMPRESSIONS_:
# get RLO extension text depending on compression
if 'RLO' in compression:
RLO = '_RLO'
else:
RLO = ''
# create data lists depending on the (sub)step
if step_name == 'step_1': # CREATE_GRID_SLICES
grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
VERSION, metallicity,
grid_slice))
grids_compression.append(compression)
stop_before_carbon_depletion.append(STOP_BEFORE_CARBON_DEPLETION)
elif step_name == 'step_2': # COMBINE_GRID_SLICES
if len(GRID_SLICES[l]) != len(GRIDS_COMBINED[l]):
raise ValueError(f'len(GRID_SLICES[{l}]) != '
f'len(GRIDS_COMBINED[{l}])!')
combine_grid_slices = []
                        # for step_2, grid_slice is itself a list of slices
for grid_slice_ in grid_slice:
path_to_grid = os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
grid_slice_+'.h5')
combine_grid_slices.append(path_to_grid)
path_to_grid_combined = os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
GRIDS_COMBINED[l][i]+'.h5')
df_tmp = pd.DataFrame()
df_tmp[path_to_grid_combined] = combine_grid_slices
df = pd.concat([df,df_tmp], axis=1)
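                        # illustrative step_2.csv layout: each column header
                        # is a combined grid file and the rows below it are
                        # the slice files merged into it, e.g.
                        #   .../LITE/grid_low_res_combined.h5   <- header
                        #   .../LITE/grid_low_res_0.h5
                        #   .../LITE/grid_low_res_1.h5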
elif step_name == 'step_3': # CALCULATE_EXTRA_VALUES
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION, metallicity,
compression, grid_slice+'.h5'))
grids_ORIGINAL.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, 'ORIGINAL'+RLO,
grid_slice+'.h5'))
processed_grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
grid_slice+'_processed.h5'))
elif step_name == 'step_4': # TRAIN_INTERPOLATORS
for method in INTERPOLATION_METHODS:
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION, metallicity,
compression, grid_slice+'.h5'))
interpolators.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity,
'interpolation_objects',
'IF_'+method+RLO+'.pkl'))
elif step_name == 'step_9': # EXPORT_DATASET
if "RLO" in grid_type:
export_dir = grid_type
else:
export_dir = grid_type+RLO
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION, metallicity,
compression, grid_slice+'.h5'))
export_path.append(os.path.join(PATH, 'POSYDON_data',
export_dir, metallicity+'.h5'))
for method in [['linear','linear3c_kNN'],
['1NN','1NN_1NN']]:
grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
VERSION, metallicity,
'interpolation_objects',
'IF_'+method[0]+RLO+'.pkl'))
export_path.append(os.path.join(PATH,
'POSYDON_data', export_dir,
'interpolators', method[1],
metallicity+'.pkl'))
elif step_name == 'rerun': # RERUN
grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
VERSION, metallicity, compression,
grid_slice+'.h5'))
rerun_path.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION, metallicity,
'rerun_'+RERUN_TYPE+'_'+grid_slice))
rerun_type.append(RERUN_TYPE)
if (('plot' in step_name) or ('check' in step_name)):
# get output grid names for sub steps
if 'step_2' in step_name:
grid_s = GRIDS_COMBINED[l][i]
elif 'step_3' in step_name:
grid_s = grid_slice+'_processed'
elif 'step_4' in step_name:
grid_s = CONTROL_GRIDS[l][i]
else:
grid_s = grid_slice
if grid_s == '': # skip that slice for sub steps
continue
if 'step_4' not in step_name:
# all steps without interpolation method get an
# empty method
if len(INTERPOLATION_METHODS)==0:
INTERPOLATION_METHODS = ['']
if 'plot' in step_name: # PLOTS
                            # only plot the first compression type, but allow
                            # it both with and without the RLO extension
if ((compression not in COMPRESSIONS_[0]) and
(COMPRESSIONS_[0] not in compression)):
continue
# loop over interpolation methods and plots
for method in INTERPOLATION_METHODS:
# for to_plot in CREATE_PLOTS:
# plot_quantities.append(to_plot)
if len(CREATE_PLOTS)>0:
plot_quantities.append(CREATE_PLOTS)
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
grid_s+'.h5'))
                                # RLO[1:]+RLO[:1] turns '_RLO' into a
                                # 'RLO_' prefix ('' stays empty)
                                plot_dirs.append(
                                    os.path.join(PATH_TO_GRIDS, grid_type,
                                                 VERSION, metallicity,
                                                 'plots',
                                                 RLO[1:]+RLO[:1]+grid_s))
if method != '': # if there is a method
interpolators.append(os.path.join(PATH_TO_GRIDS,
grid_type,
VERSION,
metallicity,
'interpolation_objects',
'IF_'+method+RLO+'.pkl'))
elif 'check' in step_name: # CHECKS
# loop over interpolation methods and checks
for method in INTERPOLATION_METHODS:
# for to_check in DO_CHECKS:
# checks.append(to_check)
if len(DO_CHECKS)>0:
checks.append(DO_CHECKS)
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
grid_s+'.h5'))
if method != '': # if there is a method
interpolators.append(os.path.join(PATH_TO_GRIDS,
grid_type,
VERSION,
metallicity,
'interpolation_objects',
'IF_'+method+RLO+'.pkl'))
# saving dataset to csv file
if step_name != 'step_2':
grids = np.array(grids)
df['path_to_grid'] = grids
if step_name == 'step_1':
df['compression'] = grids_compression
df['stop_before_carbon_depletion'] = stop_before_carbon_depletion
elif step_name == 'step_3':
df['path_to_grid_ORIGINAL'] = grids_ORIGINAL
df['path_to_processed_grid'] = processed_grids
elif step_name == 'step_4':
df['path_to_interpolator'] = interpolators
elif step_name == 'step_9':
df['export_path'] = export_path
elif step_name == 'rerun':
df['rerun_path'] = rerun_path
df['rerun_type'] = rerun_type
elif 'plot' in step_name:
df['quantities_to_plot'] = plot_quantities
df['path_to_plot'] = plot_dirs
if len(interpolators)>0:
df['path_to_interpolator'] = interpolators
elif 'check' in step_name:
df['checks_to_do'] = checks
if len(interpolators)>0:
df['path_to_interpolator'] = interpolators
# drop lines when grid directories/files are not found
if DROP_MISSING_FILES:
drop_rows = []
for row in df.index:
# grid file is always needed
path = df.at[row,'path_to_grid']
if (not os.path.exists(path) and
path not in previously_created_files):
drop_rows.append(row)
elif step_name == 'step_3':
# grid file with ORIGINAL compression needed
path = df.at[row,'path_to_grid_ORIGINAL']
if (not os.path.exists(path) and
path not in previously_created_files):
drop_rows.append(row)
elif 'step_4_' in step_name:
# interpolator files needed
path = df.at[row,'path_to_interpolator']
if (not os.path.exists(path) and
path not in previously_created_files):
drop_rows.append(row)
            # report missing files and remove the tasks that need them
if len(drop_rows)>0:
print("\n{:-^54s}".format(step_name))
print('The following grids will not be processed because the '
'files/directories are missing! If this warning message '
'is unexpected to you, please check the file paths!')
for row in drop_rows:
print(df.at[row,'path_to_grid'])
if step_name == 'step_3':
print(df.at[row,'path_to_grid_ORIGINAL'])
elif 'step_4_' in step_name:
print(df.at[row,'path_to_interpolator'])
print('')
df = df.drop(index=drop_rows)
# keep track of files created by a step
if step_name == 'step_1':
for row in df.index:
path = df.at[row,'path_to_grid']
compression = df.at[row,'compression']
newly_created_files.append(get_new_grid_name(path,compression))
elif step_name == 'step_3':
newly_created_files = df['path_to_processed_grid'].to_list()
elif step_name == 'step_4':
newly_created_files = df['path_to_interpolator'].to_list()
elif step_name == 'step_9':
newly_created_files = df['export_path'].to_list()
elif step_name == 'rerun':
newly_created_files = df['rerun_path'].to_list()
else: # step 2
# drop columns when psygrid files are not found
if DROP_MISSING_FILES:
first_time = True
drop_columns = []
for column in df.keys():
drop_rows = []
for row in df.index:
path = df.at[row,column]
if pd.notna(path):
if (not os.path.exists(path) and
path not in previously_created_files):
drop_rows.append(row)
# report about missing files and adjust column
if len(drop_rows)>0:
if first_time:
print("\n{:-^54s}".format(step_name))
                        print('If this warning message is unexpected to you, '
                              'please check that step 1 occurred '
                              'successfully!')
first_time = False
print(f'In {column} the following grids are skipped!')
for row in drop_rows:
print(df.at[row,column])
                        df.at[row,column] = np.nan
print('')
newcol = df[column].dropna().to_list()
if len(newcol)==0:
drop_columns.append(column)
                newcol.extend((df.shape[0]-len(newcol))*[np.nan])
df[column] = newcol
# report about empty tasks and remove them
if len(drop_columns)>0:
if first_time:
print("\n{:-^54s}".format(step_name))
                    print('If this warning message is unexpected to you, '
                          'please check that step 1 occurred '
                          'successfully!')
first_time = False
print('The following grids will not be combined as all grid '
'slices are missing!')
for column in drop_columns:
print(column)
print('')
df = df.dropna(axis=0, how='all').dropna(axis=1, how='all')
newly_created_files = df.keys().to_list()
# finally save data frame
output_fname = f'{step_name}.csv'
df.to_csv(os.path.join(PATH, output_fname), index=False)
# return list of new files created by that step
return newly_created_files
if __name__ == '__main__':
if len(sys.argv) >= 2:
ini_file_path = str(sys.argv[1])
if '.' not in ini_file_path:
ini_file_path = './' + ini_file_path
else:
raise ValueError('No ini file specified.')
pipeline = PostProcessingPipeline(ini_file_path)
pipeline.create_csv_and_slurm_job_files()
pipeline.create_log_dirs()
pipeline.create_export_dirs()
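# Typical usage (illustrative): point this script at a pipeline ini file,
#   python <path/to/this/script> pipeline.ini
# and then submit all generated slurm jobs in dependency order with
#   ./run_pipeline.sh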