Source code for setup_pipeline

#!/usr/bin/env python
__authors__ = [
    "Simone Bavera <Simone.Bavera@unige.ch>",
    "Kyle Akira Rocha <kylerocha2024@u.northwestern.edu>",
    "Matthias Kruckow <Matthias.Kruckow@unige.ch>",
]

##############################################################################
#  IMPORT ALL NECESSARY PYTHON PACKAGES
##############################################################################
import os
import sys
import ast
import shutil
import pandas as pd
import numpy as np
from pprint import pformat
from posydon.popsyn.io import parse_inifile
from posydon.utils.common_functions import PATH_TO_POSYDON
from posydon.utils.gridutils import get_new_grid_name

# this data processing pipeline was designed assuming POSYDON v2 data structure
'''
Data tree structure

PATH_TO_GRIDS/
    /HMS-HMS/
    /CO-HMS_RLO/
    /CO-HeMS/
    /single_HMS/
    /single_HeMS/
        /v1/
        /v2/
            /1e+00_Zsun/
            /1e-01_Zsun/
            /1e-02_Zsun/
            ...
                /grid_low_res_0/
                /grid_low_res_1/
                /grid_rerun_1/
                ...
                /LITE/
                /ORIGINAL/
                /plots/
                    /grid_low_res_combined/
                        /TF1/
                        /TF2/
                        ...
'''
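# Illustrative note (assumption, not part of the original module): the steps
# below compose file paths by joining the levels of this tree, e.g. a
# combined LITE grid for the HMS-HMS grid type at solar metallicity would be
#   os.path.join(PATH_TO_GRIDS, 'HMS-HMS', 'v2', '1e+00_Zsun', 'LITE',
#                'grid_low_res_combined.h5')
# where 'v2' is the data-release VERSION and 'LITE' the compression folder.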

# pipeline steps
ACTION_TO_STEP_NUM = {
    'CREATE_GRID_SLICES'    : 'step_1',
    'COMBINE_GRID_SLICES'   : 'step_2',
    'CALCULATE_EXTRA_VALUES': 'step_3',
    'TRAIN_INTERPOLATORS'   : 'step_4',
    'EXPORT_DATASET'        : 'step_9',
    'RERUN'                 : 'rerun'
}
# pipeline substeps
SUB_STEPS_TO_EXTENSION = {
    'CREATE_PLOTS' : '_plots',
    'DO_CHECKS'    : '_checks'
}
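# Illustrative note: a requested sub-step is submitted as a separate job
# named after its parent step plus the extension, e.g. plots requested for
# 'step_2' become the job 'step_2_plots' with its own csv and slurm file.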
# predefined plotting sets
PLOTTING_SETS = {
    'PLOT_AFTER_CREATE'   : [],
    'PLOT_AFTER_COMBINE'  : ['combined_TF12', 'termination_flag_1',\
                             'termination_flag_2', 'termination_flag_3',\
                             'termination_flag_4', 'rl_relative_overflow_1',\
                             'rl_relative_overflow_2', 'lg_mtransfer_rate'],
    'PLOT_AFTER_EXTRA'    : ['S1_MODEL01_CO_type', 'S1_MODEL01_SN_type',\
                             'S1_MODEL01_mass', 'S1_MODEL01_spin',\
                             'S1_MODEL01_m_disk_radiated',
                             'S1_MODEL02_CO_type', 'S1_MODEL02_SN_type',\
                             'S1_MODEL02_mass', 'S1_MODEL02_spin',\
                             'S1_MODEL02_m_disk_radiated',
                             'S1_MODEL03_CO_type', 'S1_MODEL03_SN_type',\
                             'S1_MODEL03_mass', 'S1_MODEL03_spin',\
                             'S1_MODEL03_m_disk_radiated',
                             'S1_MODEL04_CO_type', 'S1_MODEL04_SN_type',\
                             'S1_MODEL04_mass', 'S1_MODEL04_spin',\
                             'S1_MODEL04_m_disk_radiated',
                             'S1_MODEL05_CO_type', 'S1_MODEL05_SN_type',\
                             'S1_MODEL05_mass', 'S1_MODEL05_spin',\
                             'S1_MODEL05_m_disk_radiated',
                             'S1_MODEL06_CO_type', 'S1_MODEL06_SN_type',\
                             'S1_MODEL06_mass', 'S1_MODEL06_spin',\
                             'S1_MODEL06_m_disk_radiated',
                             'S1_MODEL07_CO_type', 'S1_MODEL07_SN_type',\
                             'S1_MODEL07_mass', 'S1_MODEL07_spin',\
                             'S1_MODEL07_m_disk_radiated',
                             'S1_MODEL08_CO_type', 'S1_MODEL08_SN_type',\
                             'S1_MODEL08_mass', 'S1_MODEL08_spin',\
                             'S1_MODEL08_m_disk_radiated',
                             'S1_MODEL09_CO_type', 'S1_MODEL09_SN_type',\
                             'S1_MODEL09_mass', 'S1_MODEL09_spin',\
                             'S1_MODEL09_m_disk_radiated',
                             'S1_MODEL10_CO_type', 'S1_MODEL10_SN_type',\
                             'S1_MODEL10_mass', 'S1_MODEL10_spin',\
                             'S1_MODEL10_m_disk_radiated'],
    'PLOT_AFTER_TRAINING' : ['INTERP_ERROR_age', 'INTERP_ERROR_star_1_mass',\
                             'INTERP_ERROR_star_2_mass',\
                             'INTERP_ERROR_period_days',\
                             'INTERP_ERROR_S1_co_core_mass',\
                             'INTERP_ERROR_S1_co_core_radius',\
                             'INTERP_ERROR_S1_he_core_mass',\
                             'INTERP_ERROR_S1_he_core_radius',\
                             'INTERP_ERROR_S1_center_h1',\
                             'INTERP_ERROR_S1_center_he4',\
                             'INTERP_ERROR_S1_surface_h1',\
                             'INTERP_ERROR_S1_surface_he4',\
                             'INTERP_ERROR_S1_surf_avg_omega_div_omega_crit',\
                             'INTERP_ERROR_S1_log_Teff',\
                             'INTERP_ERROR_S1_log_L', 'INTERP_ERROR_S1_log_R',\
                             'INTERP_ERROR_S1_spin_parameter',\
                             'INTERP_ERROR_S1_lambda_CE_10cent',\
                             'INTERP_ERROR_S2_co_core_mass',\
                             'INTERP_ERROR_S2_co_core_radius',\
                             'INTERP_ERROR_S2_he_core_mass',\
                             'INTERP_ERROR_S2_he_core_radius',\
                             'INTERP_ERROR_S2_center_h1',\
                             'INTERP_ERROR_S2_center_he4',\
                             'INTERP_ERROR_S2_surface_h1',\
                             'INTERP_ERROR_S2_surface_he4',\
                             'INTERP_ERROR_S2_surf_avg_omega_div_omega_crit',\
                             'INTERP_ERROR_S2_log_Teff',\
                             'INTERP_ERROR_S2_log_L', 'INTERP_ERROR_S2_log_R',\
                             'INTERP_ERROR_S2_spin_parameter',\
                             'INTERP_ERROR_S2_lambda_CE_10cent',\
                             'INTERP_ERROR_S1_MODEL01_mass',\
                             'INTERP_ERROR_S1_MODEL01_spin',\
                             'INTERP_ERROR_S1_MODEL01_m_disk_radiated',\
                             'INTERP_ERROR_S1_MODEL05_mass',\
                             'INTERP_ERROR_S1_MODEL05_spin',\
                             'INTERP_ERROR_S1_MODEL05_m_disk_radiated',\
                             'INTERP_ERROR_S1_MODEL06_mass',\
                             'INTERP_ERROR_S1_MODEL06_spin',\
                             'INTERP_ERROR_S1_MODEL06_m_disk_radiated',\
                             'INTERP_ERROR_S1_MODEL10_mass',\
                             'INTERP_ERROR_S1_MODEL10_spin',\
                             'INTERP_ERROR_S1_MODEL10_m_disk_radiated']
}
# predefined checking sets
CHECKING_SETS = {
    'CHECK_AFTER_CREATE'   : [],
    'CHECK_AFTER_COMBINE'  : ['failure_rate'],
    'CHECK_AFTER_EXTRA'    : ['CO_type', 'SN_type'],
    'CHECK_AFTER_TRAINING' : []
}
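# Illustrative note: if a step's CREATE_PLOTS or DO_CHECKS list in the ini
# file contains one of the predefined set names above, get_step_kwargs()
# replaces the name by the set's content, e.g.
#   CREATE_PLOTS = ['PLOT_AFTER_COMBINE']
# expands to the termination-flag and mass-transfer plots listed above.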
# common attributes for all steps
ATTRIBUTES_TO_CHECK = ['GRID_TYPES', 'METALLICITIES', 'GRID_SLICES',\
                       'COMPRESSIONS']
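
# Minimal sketch of an ini file this module expects (the values are
# placeholders/assumptions; section and key names are taken from the checks
# in this module, and all values must be Python literals because
# parse_setup_params() passes them through ast.literal_eval; the official
# POSYDON ini templates may differ in layout):
#
#   [account]
#   ACCOUNT = 'my_account'
#   PARTITION = 'my_partition'
#   WALLTIME = '23:00:00'
#
#   [pipeline setup]
#   PATH_TO_GRIDS = '/path/to/grids'
#   PATH = '.'
#   VERSION = 'v2'
#   VERBOSE = True
#   CREATE_GRID_SLICES = True
#   COMBINE_GRID_SLICES = True
#
#   [step_1]
#   GRID_TYPES = ['HMS-HMS']
#   METALLICITIES = [['1e+00_Zsun']]
#   GRID_SLICES = [['grid_low_res_0']]
#   COMPRESSIONS = [['LITE']]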

class PostProcessingPipeline:
    """A class to handle the post-processing pipeline."""

    def __init__(self, path_to_inifile=None):
        """Initialize a pipeline.

        Arguments
        ---------
        path_to_inifile : path
            The location of an ini file to read the parameters for the
            pipeline (default: None)
        """
        self.PATH_TO_POSYDON = os.getenv('PATH_TO_POSYDON')
        self.PATH_TO_INIFILE = path_to_inifile
        if self.PATH_TO_INIFILE is not None:
            self.pipeline_kwargs = self.parse_setup_params(
                self.PATH_TO_INIFILE)
    @staticmethod
    def parse_setup_params(path=None):
        """Parse inifile for running post-processing pipelines.

        Arguments
        ---------
        path : path
            The location of an ini file to read the parameters for the
            pipeline (default: None)

        Returns
        -------
        dictionary
        """
        if path is None:
            return
        else:
            parser = parse_inifile(path)

        pipeline_kwargs = dict()
        for section in parser.sections():
            # create a sub-dictionary for each section
            section_dict = dict()
            for key, val in parser[section].items():
                # interpret all input as python types
                # MK: this is probably not a safe way to do this
                section_dict[key] = ast.literal_eval(val)
            pipeline_kwargs[section] = section_dict
        return pipeline_kwargs
    def create_csv_and_slurm_job_files(self):
        """Create all files the pipeline needs."""

        def expand_runfile(runfile, last_step='', step_number='step_X'):
            """Add next entry to the script in runfile."""
            if last_step == '':
                # if there was no previous step the current step has no
                # dependency
                runfile.write(f"ID{step_number}=$(sbatch --parsable "
                              f"{step_number}.slurm)\n")
            else:
                # steps waiting for the previous step
                runfile.write(f"ID{step_number}=$(sbatch --parsable "
                              "--dependency=afterok:$""{"f"ID{last_step}""} "
                              "--kill-on-invalid-dep=yes "
                              f"{step_number}.slurm)\n")
            runfile.write(f"echo '{step_number}.slurm submitted as '$"
                          "{"f"ID{step_number}""}"f"\n")
            return step_number

        def get_step_kwargs(self, last_step='', step_number='step_X'):
            """Provide the attributes of a step.

            Additionally, it checks for missing attributes and may copy them
            from a previous step.
            """
            if step_number not in self.pipeline_kwargs.keys():
                raise KeyError(f'{step_number} not in pipeline_kwargs!')
            elif 'step' in step_number:
                # check for attributes
                for attribute in ATTRIBUTES_TO_CHECK:
                    if (attribute not in
                            self.pipeline_kwargs[step_number].keys()
                            or not isinstance(
                                self.pipeline_kwargs[step_number][attribute],
                                list)):
                        if last_step not in self.pipeline_kwargs.keys():
                            raise KeyError(f'{step_number} has no '
                                           f'{attribute}')
                        else:
                            # infer missing attribute from last step
                            print(f"\ncopy {attribute} from {last_step} to "
                                  f"{step_number}")
                            self.pipeline_kwargs[step_number][attribute] = \
                                self.pipeline_kwargs[last_step][attribute]
            # replace predefined sets
            if 'CREATE_PLOTS' in self.pipeline_kwargs[step_number].keys():
                for set_name, set_content in PLOTTING_SETS.items():
                    if set_name in self.pipeline_kwargs[step_number][
                            'CREATE_PLOTS']:
                        self.pipeline_kwargs[step_number][
                            'CREATE_PLOTS'].remove(set_name)
                        self.pipeline_kwargs[step_number][
                            'CREATE_PLOTS'].extend(set_content)
            if 'DO_CHECKS' in self.pipeline_kwargs[step_number].keys():
                for set_name, set_content in CHECKING_SETS.items():
                    if set_name in self.pipeline_kwargs[step_number][
                            'DO_CHECKS']:
                        self.pipeline_kwargs[step_number][
                            'DO_CHECKS'].remove(set_name)
                        self.pipeline_kwargs[step_number][
                            'DO_CHECKS'].extend(set_content)
            # return attributes
            return self.pipeline_kwargs[step_number]

        def has_substep(self, step_number='step_X', substep='ALL'):
            """Check whether a step has substeps."""
            if step_number not in self.pipeline_kwargs.keys():
                raise KeyError(f'{step_number} not in pipeline_kwargs!')
            if substep == 'ALL':
                # check all possible substeps
                substeps = SUB_STEPS_TO_EXTENSION.keys()
            else:
                # check the specified substep
                substeps = [substep]
            for sub_step in substeps:
                if sub_step in self.pipeline_kwargs[step_number].keys():
                    SUB_STEP_LIST = self.pipeline_kwargs[step_number][sub_step]
                    if (isinstance(SUB_STEP_LIST, list)
                            and len(SUB_STEP_LIST) > 0):
                        # at least one entry is found in the substep
                        return True
            # no entry for the substep(s)
            return False

        # check for sections 'account' and 'pipeline setup' and store their
        # attributes
        if 'account' not in self.pipeline_kwargs.keys():
            raise KeyError('account not in pipeline_kwargs!')
        if 'pipeline setup' not in self.pipeline_kwargs.keys():
            raise KeyError('pipeline setup not in pipeline_kwargs!')
        account_kwargs = self.pipeline_kwargs['account']
        setup_kwargs = self.pipeline_kwargs['pipeline setup']

        # take verbose value from setup arguments
        if 'VERBOSE' in setup_kwargs.keys():
            VERBOSE = setup_kwargs['VERBOSE']
        else:
            VERBOSE = False
        if VERBOSE:
            print("\n{:+^45s} \n{}\n\n{:+^45s}\n{}".format(
                'ACCOUNT', pformat(account_kwargs, indent=2),
                'SETUP', pformat(setup_kwargs, indent=2)))

        previously_created_files = []
        # create a runfile script
        with open('run_pipeline.sh', 'w') as runfile:
            runfile.write("#!/bin/bash\n")
            last_step = ''
            for action, step_number in ACTION_TO_STEP_NUM.items():
                if action in setup_kwargs.keys():
                    do_step_bool = setup_kwargs[action]
                else:
                    do_step_bool = False
                if VERBOSE:
                    print("\n\n{:-^45s} {:^8s}:{:^6s}".format(
                        action, step_number, str(do_step_bool)))
                if do_step_bool or has_substep(self, step_number=step_number):
                    step_kwargs = get_step_kwargs(self, last_step=last_step,
                                                  step_number=step_number)
                    if VERBOSE:
                        print(pformat(step_kwargs, indent=2))
                    if do_step_bool:
                        # create csv file and remember therein created files
                        previously_created_files += create_csv(
                            step_name=step_number,
                            previously_created_files=previously_created_files,
                            **{**step_kwargs, **setup_kwargs}
                        )
                        # create slurm file
                        slurm_job(
                            job_name=step_number,
                            step_name=step_number,
                            PATH_TO_POSYDON=self.PATH_TO_POSYDON,
                            **{**step_kwargs, **setup_kwargs,
                               **account_kwargs}
                        )
                        # add job to the list in the runfile script
                        expand_runfile(runfile, last_step=last_step,
                                       step_number=step_number)
                        # remember that this step was done
                        last_step = step_number
                    for sub_step, step_extension in \
                            SUB_STEPS_TO_EXTENSION.items():
                        if sub_step not in \
                                self.pipeline_kwargs[step_number].keys():
                            continue
                        if has_substep(self, step_number=step_number,
                                       substep=sub_step):
                            # get sub-step number to be a concatenation of
                            # the main step and the sub-step extension.
                            sub_step_number = step_number + step_extension
                            # create csv file for sub step
                            create_csv(
                                step_name=sub_step_number,
                                previously_created_files=\
                                    previously_created_files,
                                **{**step_kwargs, **setup_kwargs}
                            )
                            # create slurm file for sub step
                            slurm_job(
                                job_name=sub_step_number,
                                step_name=step_number,
                                PATH_TO_POSYDON=self.PATH_TO_POSYDON,
                                **{**step_kwargs, **setup_kwargs,
                                   **account_kwargs}
                            )
                            # add job to the list in the runfile script
                            expand_runfile(runfile, last_step=last_step,
                                           step_number=sub_step_number)
                            # there are no dependencies supported on sub
                            # steps
        # make the runfile script executable
        os.system("chmod 755 run_pipeline.sh")
    def create_log_dirs(self):
        """Create directories to store the log files."""
        # use work directory given by PATH in the setup attributes
        if 'pipeline setup' not in self.pipeline_kwargs.keys():
            raise KeyError('pipeline setup not in pipeline_kwargs!')
        setup_kwargs = self.pipeline_kwargs['pipeline setup']
        if 'PATH' in setup_kwargs.keys():
            logs_path = os.path.join(setup_kwargs['PATH'], 'logs')
        else:
            logs_path = os.path.join('.', 'logs')
        # create logs in working dir to store all slurm outputs
        if not os.path.isdir(logs_path):
            os.makedirs(logs_path)
        # create sub directories for each step
        for action, step_number in ACTION_TO_STEP_NUM.items():
            if ((action in setup_kwargs.keys()) and setup_kwargs[action]):
                step_log_path = os.path.join(logs_path, step_number)
                if not os.path.isdir(step_log_path):
                    os.makedirs(step_log_path)
    def create_export_dirs(self):
        """Create directories for the finally exported data."""
        if 'pipeline setup' not in self.pipeline_kwargs.keys():
            raise KeyError('pipeline setup not in pipeline_kwargs!')
        setup_kwargs = self.pipeline_kwargs['pipeline setup']
        # create data dir tree to export the datasets
        if (('EXPORT_DATASET' in setup_kwargs.keys()) and
                setup_kwargs['EXPORT_DATASET']):
            if 'PATH' in setup_kwargs.keys():
                data_path = os.path.join(setup_kwargs['PATH'],
                                         'POSYDON_data')
            else:
                data_path = os.path.join('.', 'POSYDON_data')
            # create main directory for the POSYDON data
            if not os.path.isdir(data_path):
                os.makedirs(data_path)
            # determine sub directories
            dirs = []
            grid_dirs = ['HMS-HMS', 'CO-HMS_RLO', 'CO-HeMS', 'CO-HeMS_RLO',
                         'single_HMS', 'single_HeMS']
            interp_dirs = ['interpolators', 'interpolators/1NN_1NN',
                           'interpolators/linear3c_kNN']
            for name1 in grid_dirs:
                dirs.append(os.path.join(data_path, name1))
                for name2 in interp_dirs:
                    dirs.append(os.path.join(data_path, name1, name2))
            # create sub directories
            for dir_ in dirs:
                if not os.path.isdir(dir_):
                    os.makedirs(dir_)
def slurm_job(job_name, step_name, PATH_TO_GRIDS=None, PATH_TO_POSYDON=None,
              PATH='.', ACCOUNT=None, PARTITION=None, WALLTIME=None,
              MAILTYPE=None, EMAIL=None, CREATE_GRID_SLICES=False,
              COMBINE_GRID_SLICES=False, CALCULATE_EXTRA_VALUES=False,
              TRAIN_INTERPOLATORS=False, EXPORT_DATASET=False, RERUN=False,
              VERBOSE=False, **kwargs):
    """Create slurm file."""

    def copy_old_log_file(path_to_logs='.', log_file='log'):
        """Make a copy of an old out file (and remove an old copy)."""
        path_to_out_file = os.path.join(path_to_logs, log_file+'.out')
        if os.path.exists(path_to_out_file):
            # move old data to a copy
            path_to_old_out_file = os.path.join(path_to_logs,
                                                log_file+'.old.out')
            if os.path.exists(path_to_old_out_file):
                # delete old copy
                os.remove(path_to_old_out_file)
            shutil.move(path_to_out_file, path_to_old_out_file)
        return path_to_out_file

    # get path to csv file
    path_to_csv_file = os.path.join(PATH, f'{job_name}.csv')
    # create slurm file
    with open(f'{job_name}.slurm', 'w') as f:
        # get STEPID
        if ('step' in job_name) and (len(job_name.split("_")) > 1):
            STEPID = job_name.split("_")[1]
            if (len(job_name.split("_")) > 2):
                STEPID += job_name.split("_")[2]
        elif job_name == 'rerun':
            STEPID = 'R'
        else:
            STEPID = ''
        # write slurm file content
        f.write("#!/bin/bash\n")
        if ACCOUNT is not None:
            f.write(f"#SBATCH --account={ACCOUNT}\n")
        if PARTITION is not None:
            f.write(f"#SBATCH --partition={PARTITION}\n")
        f.write("#SBATCH -N 1\n")
        f.write("#SBATCH --cpus-per-task 1\n")
        f.write("#SBATCH --ntasks-per-node 1\n")
        if WALLTIME is not None:
            f.write(f"#SBATCH --time={WALLTIME}\n")
        f.write(f"#SBATCH --job-name=psygrid{STEPID}\n")
        f.write("#SBATCH --mem-per-cpu=4G\n")
        if ((EMAIL is not None) and (MAILTYPE is not None)):
            f.write(f"#SBATCH --mail-type={MAILTYPE}\n")
            f.write(f"#SBATCH --mail-user={EMAIL}\n")
        # extract array size
        if ((job_name in ['step_1', 'step_3', 'step_4', 'step_9', 'rerun'])
                or ('plot' in job_name) or ('check' in job_name)):
            df = pd.read_csv(path_to_csv_file)
            N = df.shape[0]-1
        elif job_name in ['step_2']:
            df = pd.read_csv(path_to_csv_file)
            N = df.shape[1]-1
        else:
            raise ValueError('This should never happen! '
                             f'job_name={job_name}')
        if N < 0:
            raise ValueError(f'{job_name} has no jobs to run, please check '
                             f'{path_to_csv_file}')
        f.write(f"#SBATCH --array=0-{N}\n")
        slurm_array = '$SLURM_ARRAY_TASK_ID'
        # get logs path and set the output of the slurm job
        if step_name in job_name:
            path_to_logs = os.path.join(PATH, 'logs', step_name)
        else:
            path_to_logs = os.path.join(PATH, 'logs', job_name)
        if job_name == 'step_1':  # CREATE_GRID_SLICES
            f.write("#SBATCH --open-mode=truncate\n")
            f.write(f"#SBATCH --output={path_to_logs}/grid_slice_%a.out\n")
        if job_name == 'step_2':  # COMBINE_GRID_SLICES
            path_to_out_file = copy_old_log_file(
                path_to_logs=path_to_logs, log_file='combine_grid_slices')
            f.write("#SBATCH --open-mode=append\n")
            f.write(f"#SBATCH --output={path_to_out_file}\n")
        if job_name == 'step_3':  # CALCULATE_EXTRA_VALUES
            f.write("#SBATCH --open-mode=truncate\n")
            f.write(f"#SBATCH --output={path_to_logs}/"
                    "post_processing_%a.out\n")
            f.write(f"export PATH_TO_POSYDON={PATH_TO_POSYDON}\n")
        if job_name == 'step_4':  # TRAIN_INTERPOLATORS
            f.write("#SBATCH --open-mode=truncate\n")
            f.write(f"#SBATCH --output={path_to_logs}/"
                    "train_interpolators_%a.out\n")
        if job_name == 'step_9':  # EXPORT_DATASET
            path_to_out_file = copy_old_log_file(
                path_to_logs=path_to_logs, log_file='export_dataset')
            f.write("#SBATCH --open-mode=append\n")
            f.write(f"#SBATCH --output={path_to_out_file}\n")
        if job_name == 'rerun':  # RERUN
            path_to_out_file = copy_old_log_file(
                path_to_logs=path_to_logs, log_file='rerun')
            f.write("#SBATCH --open-mode=append\n")
            f.write(f"#SBATCH --output={path_to_out_file}\n")
        if 'plot' in job_name:  # PLOTS
            path_to_out_file = copy_old_log_file(
                path_to_logs=path_to_logs, log_file='plots_'+step_name)
            f.write("#SBATCH --open-mode=append\n")
            f.write(f"#SBATCH --output={path_to_out_file}\n")
            f.write("unset DISPLAY\n")
        if 'check' in job_name:  # CHECKS
            path_to_out_file = copy_old_log_file(
                path_to_logs=path_to_logs, log_file='checks_'+step_name)
            f.write("#SBATCH --open-mode=append\n")
            f.write(f"#SBATCH --output={path_to_out_file}\n")
        # get path to run the pipeline
        path_to_run_pipeline = os.path.join(PATH_TO_POSYDON, 'bin',
                                            'run-pipeline')
        # encode the verbose information for running the pipeline as a
        # command line argument
        if VERBOSE and 'check' not in job_name:
            f.write(f"\nsrun python {path_to_run_pipeline} {PATH_TO_GRIDS} "
                    f"{path_to_csv_file} {slurm_array} 1")
        else:
            f.write(f"\nsrun python {path_to_run_pipeline} {PATH_TO_GRIDS} "
                    f"{path_to_csv_file} {slurm_array} 0")
def create_csv(GRID_TYPES=[], METALLICITIES=[], GRID_SLICES=[],
               COMPRESSIONS=[], step_name='', VERSION='', GRIDS_COMBINED=[],
               INTERPOLATION_METHODS=[], CONTROL_GRIDS=[],
               STOP_BEFORE_CARBON_DEPLETION=0, RERUN_TYPE='',
               DROP_MISSING_FILES=False, CREATE_PLOTS=[], DO_CHECKS=[],
               PATH_TO_GRIDS='', PATH='', previously_created_files=[],
               **kwargs):
    """Create csv file with a list containing all data to process a step."""
    # number of grid types
    N = len(GRID_TYPES)
    if (N != len(METALLICITIES) or N != len(GRID_SLICES)
            or N != len(COMPRESSIONS)):
        raise ValueError('Mismatch between the len of GRID_TYPES, '
                         'METALLICITIES, GRID_SLICES, COMPRESSIONS.')
    if step_name == 'step_2':
        # there need to be as many combined files specified as recipes are
        # given to create them.
        if N != len(GRIDS_COMBINED):
            raise ValueError('len(GRID_TYPES) != len(GRIDS_COMBINED)!')
    elif 'step_4_' in step_name:
        # sub steps of the interpolator training need a control grid
        if len(GRID_SLICES) != len(CONTROL_GRIDS):
            raise ValueError('len(GRID_SLICES) != len(CONTROL_GRIDS)!')

    grids = []
    grids_compression = []
    stop_before_carbon_depletion = []
    grids_ORIGINAL = []
    processed_grids = []
    interpolators = []
    export_path = []
    rerun_path = []
    rerun_type = []
    plot_dirs = []
    plot_quantities = []
    checks = []
    newly_created_files = []
    df = pd.DataFrame()

    # loop over all grid types
    for l, grid_type in enumerate(GRID_TYPES):
        METALLICITIES_ = METALLICITIES[l]
        GRID_SLICES_ = GRID_SLICES[l]
        COMPRESSIONS_ = COMPRESSIONS[l]
        # loop over all metallicities, grid slices and compressions
        for metallicity in METALLICITIES_:
            for i, grid_slice in enumerate(GRID_SLICES_):
                for compression in COMPRESSIONS_:
                    # get RLO extension text depending on compression
                    if 'RLO' in compression:
                        RLO = '_RLO'
                    else:
                        RLO = ''
                    # create data lists depending on the (sub)step
                    if step_name == 'step_1':  # CREATE_GRID_SLICES
                        grids.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            grid_slice))
                        grids_compression.append(compression)
                        stop_before_carbon_depletion.append(
                            STOP_BEFORE_CARBON_DEPLETION)
                    elif step_name == 'step_2':  # COMBINE_GRID_SLICES
                        if len(GRID_SLICES[l]) != len(GRIDS_COMBINED[l]):
                            raise ValueError(f'len(GRID_SLICES[{l}]) != '
                                             f'len(GRIDS_COMBINED[{l}])!')
                        combine_grid_slices = []
                        # grid_slice is a batch
                        for grid_slice_ in grid_slice:
                            path_to_grid = os.path.join(
                                PATH_TO_GRIDS, grid_type, VERSION,
                                metallicity, compression, grid_slice_+'.h5')
                            combine_grid_slices.append(path_to_grid)
                        path_to_grid_combined = os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            compression, GRIDS_COMBINED[l][i]+'.h5')
                        df_tmp = pd.DataFrame()
                        df_tmp[path_to_grid_combined] = combine_grid_slices
                        df = pd.concat([df, df_tmp], axis=1)
                    elif step_name == 'step_3':  # CALCULATE_EXTRA_VALUES
                        grids.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            compression, grid_slice+'.h5'))
                        grids_ORIGINAL.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            'ORIGINAL'+RLO, grid_slice+'.h5'))
                        processed_grids.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            compression, grid_slice+'_processed.h5'))
                    elif step_name == 'step_4':  # TRAIN_INTERPOLATORS
                        for method in INTERPOLATION_METHODS:
                            grids.append(os.path.join(
                                PATH_TO_GRIDS, grid_type, VERSION,
                                metallicity, compression, grid_slice+'.h5'))
                            interpolators.append(os.path.join(
                                PATH_TO_GRIDS, grid_type, VERSION,
                                metallicity, 'interpolation_objects',
                                'IF_'+method+RLO+'.pkl'))
                    elif step_name == 'step_9':  # EXPORT_DATASET
                        if "RLO" in grid_type:
                            export_dir = grid_type
                        else:
                            export_dir = grid_type+RLO
                        grids.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            compression, grid_slice+'.h5'))
                        export_path.append(os.path.join(
                            PATH, 'POSYDON_data', export_dir,
                            metallicity+'.h5'))
                        for method in [['linear', 'linear3c_kNN'],
                                       ['1NN', '1NN_1NN']]:
                            grids.append(os.path.join(
                                PATH_TO_GRIDS, grid_type, VERSION,
                                metallicity, 'interpolation_objects',
                                'IF_'+method[0]+RLO+'.pkl'))
                            export_path.append(os.path.join(
                                PATH, 'POSYDON_data', export_dir,
                                'interpolators', method[1],
                                metallicity+'.pkl'))
                    elif step_name == 'rerun':  # RERUN
                        grids.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            compression, grid_slice+'.h5'))
                        rerun_path.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            'rerun_'+RERUN_TYPE+'_'+grid_slice))
                        rerun_type.append(RERUN_TYPE)
                    if (('plot' in step_name) or ('check' in step_name)):
                        # get output grid names for sub steps
                        if 'step_2' in step_name:
                            grid_s = GRIDS_COMBINED[l][i]
                        elif 'step_3' in step_name:
                            grid_s = grid_slice+'_processed'
                        elif 'step_4' in step_name:
                            grid_s = CONTROL_GRIDS[l][i]
                        else:
                            grid_s = grid_slice
                        if grid_s == '':
                            # skip that slice for sub steps
                            continue
                        if 'step_4' not in step_name:
                            # all steps without interpolation method get an
                            # empty method
                            if len(INTERPOLATION_METHODS) == 0:
                                INTERPOLATION_METHODS = ['']
                        if 'plot' in step_name:  # PLOTS
                            # only plot first compression type, but allow for
                            # with and without RLO flag
                            if ((compression not in COMPRESSIONS_[0]) and
                                    (COMPRESSIONS_[0] not in compression)):
                                continue
                            # loop over interpolation methods and plots
                            for method in INTERPOLATION_METHODS:
                                # for to_plot in CREATE_PLOTS:
                                #     plot_quantities.append(to_plot)
                                if len(CREATE_PLOTS) > 0:
                                    plot_quantities.append(CREATE_PLOTS)
                                    grids.append(os.path.join(
                                        PATH_TO_GRIDS, grid_type, VERSION,
                                        metallicity, compression,
                                        grid_s+'.h5'))
                                    plot_dirs.append(os.path.join(
                                        PATH_TO_GRIDS, grid_type, VERSION,
                                        metallicity, 'plots',
                                        RLO[1:]+RLO[:1]+grid_s))
                                    if method != '':
                                        # if there is a method
                                        interpolators.append(os.path.join(
                                            PATH_TO_GRIDS, grid_type,
                                            VERSION, metallicity,
                                            'interpolation_objects',
                                            'IF_'+method+RLO+'.pkl'))
                        elif 'check' in step_name:  # CHECKS
                            # loop over interpolation methods and checks
                            for method in INTERPOLATION_METHODS:
                                # for to_check in DO_CHECKS:
                                #     checks.append(to_check)
                                if len(DO_CHECKS) > 0:
                                    checks.append(DO_CHECKS)
                                    grids.append(os.path.join(
                                        PATH_TO_GRIDS, grid_type, VERSION,
                                        metallicity, compression,
                                        grid_s+'.h5'))
                                    if method != '':
                                        # if there is a method
                                        interpolators.append(os.path.join(
                                            PATH_TO_GRIDS, grid_type,
                                            VERSION, metallicity,
                                            'interpolation_objects',
                                            'IF_'+method+RLO+'.pkl'))

    # saving dataset to csv file
    if step_name != 'step_2':
        grids = np.array(grids)
        df['path_to_grid'] = grids
        if step_name == 'step_1':
            df['compression'] = grids_compression
            df['stop_before_carbon_depletion'] = \
                stop_before_carbon_depletion
        elif step_name == 'step_3':
            df['path_to_grid_ORIGINAL'] = grids_ORIGINAL
            df['path_to_processed_grid'] = processed_grids
        elif step_name == 'step_4':
            df['path_to_interpolator'] = interpolators
        elif step_name == 'step_9':
            df['export_path'] = export_path
        elif step_name == 'rerun':
            df['rerun_path'] = rerun_path
            df['rerun_type'] = rerun_type
        elif 'plot' in step_name:
            df['quantities_to_plot'] = plot_quantities
            df['path_to_plot'] = plot_dirs
            if len(interpolators) > 0:
                df['path_to_interpolator'] = interpolators
        elif 'check' in step_name:
            df['checks_to_do'] = checks
            if len(interpolators) > 0:
                df['path_to_interpolator'] = interpolators
        # drop lines when grid directories/files are not found
        if DROP_MISSING_FILES:
            drop_rows = []
            for row in df.index:
                # grid file is always needed
                path = df.at[row, 'path_to_grid']
                if (not os.path.exists(path)
                        and path not in previously_created_files):
                    drop_rows.append(row)
                elif step_name == 'step_3':
                    # grid file with ORIGINAL compression needed
                    path = df.at[row, 'path_to_grid_ORIGINAL']
                    if (not os.path.exists(path)
                            and path not in previously_created_files):
                        drop_rows.append(row)
                elif 'step_4_' in step_name:
                    # interpolator files needed
                    path = df.at[row, 'path_to_interpolator']
                    if (not os.path.exists(path)
                            and path not in previously_created_files):
                        drop_rows.append(row)
            # report about missing files and remove tasks, which need them
            if len(drop_rows) > 0:
                print("\n{:-^54s}".format(step_name))
                print('The following grids will not be processed because '
                      'the files/directories are missing! If this warning '
                      'message is unexpected to you, please check the file '
                      'paths!')
                for row in drop_rows:
                    print(df.at[row, 'path_to_grid'])
                    if step_name == 'step_3':
                        print(df.at[row, 'path_to_grid_ORIGINAL'])
                    elif 'step_4_' in step_name:
                        print(df.at[row, 'path_to_interpolator'])
                print('')
                df = df.drop(index=drop_rows)
        # keep track of files created by a step
        if step_name == 'step_1':
            for row in df.index:
                path = df.at[row, 'path_to_grid']
                compression = df.at[row, 'compression']
                newly_created_files.append(get_new_grid_name(path,
                                                             compression))
        elif step_name == 'step_3':
            newly_created_files = df['path_to_processed_grid'].to_list()
        elif step_name == 'step_4':
            newly_created_files = df['path_to_interpolator'].to_list()
        elif step_name == 'step_9':
            newly_created_files = df['export_path'].to_list()
        elif step_name == 'rerun':
            newly_created_files = df['rerun_path'].to_list()
    else:  # step 2
        # drop columns when psygrid files are not found
        if DROP_MISSING_FILES:
            first_time = True
            drop_columns = []
            for column in df.keys():
                drop_rows = []
                for row in df.index:
                    path = df.at[row, column]
                    if pd.notna(path):
                        if (not os.path.exists(path)
                                and path not in previously_created_files):
                            drop_rows.append(row)
                # report about missing files and adjust column
                if len(drop_rows) > 0:
                    if first_time:
                        print("\n{:-^54s}".format(step_name))
                        print('If this warning message is unexpected to '
                              'you, please check that step 1 occurred '
                              'successfully!')
                        first_time = False
                    print(f'In {column} the following grids are skipped!')
                    for row in drop_rows:
                        print(df.at[row, column])
                        df.at[row, column] = np.NaN
                    print('')
                    newcol = df[column].dropna().to_list()
                    if len(newcol) == 0:
                        drop_columns.append(column)
                    newcol.extend((df.shape[0]-len(newcol))*[np.NaN])
                    df[column] = newcol
            # report about empty tasks and remove them
            if len(drop_columns) > 0:
                if first_time:
                    print("\n{:-^54s}".format(step_name))
                    print('If this warning message is unexpected to you, '
                          'please check that step 1 occurred successfully!')
                    first_time = False
                print('The following grids will not be combined as all '
                      'grid slices are missing!')
                for column in drop_columns:
                    print(column)
                print('')
            df = df.dropna(axis=0, how='all').dropna(axis=1, how='all')
        newly_created_files = df.keys().to_list()

    # finally save data frame
    output_fname = f'{step_name}.csv'
    df.to_csv(os.path.join(PATH, output_fname), index=False)
    # return list of new files created by that step
    return newly_created_files
if __name__ == '__main__':
    if len(sys.argv) >= 2:
        ini_file_path = str(sys.argv[1])
        if '.' not in ini_file_path:
            ini_file_path = './' + ini_file_path
    else:
        raise ValueError('No ini file specified.')

    pipeline = PostProcessingPipeline(ini_file_path)
    pipeline.create_csv_and_slurm_job_files()
    pipeline.create_log_dirs()
    pipeline.create_export_dirs()
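
# Illustrative usage (an assumption based on the __main__ block above, the
# script/module name may differ in an installed POSYDON distribution):
#   python setup_pipeline.py /path/to/pipeline.ini
# This writes one <step>.csv and <step>.slurm file per activated (sub-)step,
# creates the logs/ and POSYDON_data/ directory trees, and produces an
# executable run_pipeline.sh that submits the jobs with sbatch, chaining
# them via --dependency=afterok on the previous step's job id.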