Source code for posydon_setup_pipeline

#!/usr/bin/env python
__authors__ = [
    "Simone Bavera <Simone.Bavera@unige.ch>",
    "Kyle Akira Rocha <kylerocha2024@u.northwestern.edu>",
    "Matthias Kruckow <Matthias.Kruckow@unige.ch>",
]

##############################################################################
#  IMPORT ALL NECESSARY PYTHON PACKAGES
##############################################################################
import os
import sys
import ast
import shutil
import pandas as pd
import numpy as np
from pprint import pformat
from posydon.popsyn.io import parse_inifile
from posydon.utils.common_functions import PATH_TO_POSYDON
from posydon.utils.gridutils import get_new_grid_name
from posydon.utils.posydonwarning import Pwarn

# this data processing pipeline was designed assuming POSYDON v2 data structure
'''
Data tree structure

PATH_TO_GRIDS/
    /HMS-HMS/
    /CO-HMS_RLO/
    /CO-HeMS/
    /single_HMS/
    /single_HeMS/
        /v1/
        /v2/
            /1e+00_Zsun/
            /1e-01_Zsun/
            /1e-02_Zsun/
            ...
                /grid_low_res_0/
                /grid_low_res_1/
                /grid_rerun_1/
                ...
                /LITE/
                /ORIGINAL/
                /plots/
                    /grid_low_res_combined/
                        /TF1/
                        /TF2/
                        ...
'''
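# Example (illustrative only): in this tree, a LITE-compressed slice of the
# HMS-HMS grid at solar metallicity would be addressed as
#   os.path.join(PATH_TO_GRIDS, 'HMS-HMS', 'v2', '1e+00_Zsun', 'LITE',
#                'grid_low_res_combined.h5')
# which is how create_csv() below assembles the paths written to the csv
# files handed to the slurm jobs.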

# pipeline steps
ACTION_TO_STEP_NUM = {
    'CREATE_GRID_SLICES'         : 'step_1',
    'COMBINE_GRID_SLICES'        : 'step_2',
    'CALCULATE_EXTRA_VALUES'     : 'step_3',
    'TRAIN_INTERPOLATORS'        : 'step_4',
    'TRAIN_PROFILE_INTERPOLATORS': 'step_5',
    'EXPORT_DATASET'             : 'step_9',
    'RERUN'                      : 'rerun'
}
# pipeline substeps
SUB_STEPS_TO_EXTENSION = {
    'CREATE_PLOTS' : '_plots',
    'DO_CHECKS'    : '_checks'
}
# predefined plotting sets
PLOTTING_SETS = {
    'PLOT_AFTER_CREATE'   : [],
    'PLOT_AFTER_COMBINE'  : ['combined_TF12', 'termination_flag_1',\
                             'termination_flag_2', 'termination_flag_3',\
                             'termination_flag_4', 'rl_relative_overflow_1',\
                             'rl_relative_overflow_2', 'lg_mtransfer_rate'],
    'PLOT_AFTER_EXTRA'    : ['S1_MODEL01_CO_type', 'S1_MODEL01_SN_type',\
                             'S1_MODEL01_mass', 'S1_MODEL01_spin',\
                             'S1_MODEL01_m_disk_radiated',
                             'S1_MODEL02_CO_type', 'S1_MODEL02_SN_type',\
                             'S1_MODEL02_mass', 'S1_MODEL02_spin',\
                             'S1_MODEL02_m_disk_radiated',
                             'S1_MODEL03_CO_type', 'S1_MODEL03_SN_type',\
                             'S1_MODEL03_mass', 'S1_MODEL03_spin',\
                             'S1_MODEL03_m_disk_radiated',
                             'S1_MODEL04_CO_type', 'S1_MODEL04_SN_type',\
                             'S1_MODEL04_mass', 'S1_MODEL04_spin',\
                             'S1_MODEL04_m_disk_radiated',
                             'S1_MODEL05_CO_type', 'S1_MODEL05_SN_type',\
                             'S1_MODEL05_mass', 'S1_MODEL05_spin',\
                             'S1_MODEL05_m_disk_radiated',
                             'S1_MODEL06_CO_type', 'S1_MODEL06_SN_type',\
                             'S1_MODEL06_mass', 'S1_MODEL06_spin',\
                             'S1_MODEL06_m_disk_radiated',
                             'S1_MODEL07_CO_type', 'S1_MODEL07_SN_type',\
                             'S1_MODEL07_mass', 'S1_MODEL07_spin',\
                             'S1_MODEL07_m_disk_radiated',
                             'S1_MODEL08_CO_type', 'S1_MODEL08_SN_type',\
                             'S1_MODEL08_mass', 'S1_MODEL08_spin',\
                             'S1_MODEL08_m_disk_radiated',
                             'S1_MODEL09_CO_type', 'S1_MODEL09_SN_type',\
                             'S1_MODEL09_mass', 'S1_MODEL09_spin',\
                             'S1_MODEL09_m_disk_radiated',
                             'S1_MODEL10_CO_type', 'S1_MODEL10_SN_type',\
                             'S1_MODEL10_mass', 'S1_MODEL10_spin',\
                             'S1_MODEL10_m_disk_radiated'],
    'PLOT_AFTER_TRAINING' : ['INTERP_ERROR_age', 'INTERP_ERROR_star_1_mass',\
                             'INTERP_ERROR_star_2_mass',\
                             'INTERP_ERROR_period_days',\
                             'INTERP_ERROR_S1_co_core_mass',\
                             'INTERP_ERROR_S1_co_core_radius',\
                             'INTERP_ERROR_S1_he_core_mass',\
                             'INTERP_ERROR_S1_he_core_radius',\
                             'INTERP_ERROR_S1_center_h1',\
                             'INTERP_ERROR_S1_center_he4',\
                             'INTERP_ERROR_S1_surface_h1',\
                             'INTERP_ERROR_S1_surface_he4',\
                             'INTERP_ERROR_S1_surf_avg_omega_div_omega_crit',\
                             'INTERP_ERROR_S1_log_Teff',\
                             'INTERP_ERROR_S1_log_L', 'INTERP_ERROR_S1_log_R',\
                             'INTERP_ERROR_S1_spin_parameter',\
                             'INTERP_ERROR_S1_lambda_CE_10cent',\
                             'INTERP_ERROR_S2_co_core_mass',\
                             'INTERP_ERROR_S2_co_core_radius',\
                             'INTERP_ERROR_S2_he_core_mass',\
                             'INTERP_ERROR_S2_he_core_radius',\
                             'INTERP_ERROR_S2_center_h1',\
                             'INTERP_ERROR_S2_center_he4',\
                             'INTERP_ERROR_S2_surface_h1',\
                             'INTERP_ERROR_S2_surface_he4',\
                             'INTERP_ERROR_S2_surf_avg_omega_div_omega_crit',\
                             'INTERP_ERROR_S2_log_Teff',\
                             'INTERP_ERROR_S2_log_L', 'INTERP_ERROR_S2_log_R',\
                             'INTERP_ERROR_S2_spin_parameter',\
                             'INTERP_ERROR_S2_lambda_CE_10cent',\
                             'INTERP_ERROR_S1_MODEL01_mass',\
                             'INTERP_ERROR_S1_MODEL01_spin',\
                             'INTERP_ERROR_S1_MODEL01_m_disk_radiated',\
                             'INTERP_ERROR_S1_MODEL05_mass',\
                             'INTERP_ERROR_S1_MODEL05_spin',\
                             'INTERP_ERROR_S1_MODEL05_m_disk_radiated',\
                             'INTERP_ERROR_S1_MODEL06_mass',\
                             'INTERP_ERROR_S1_MODEL06_spin',\
                             'INTERP_ERROR_S1_MODEL06_m_disk_radiated',\
                             'INTERP_ERROR_S1_MODEL10_mass',\
                             'INTERP_ERROR_S1_MODEL10_spin',\
                             'INTERP_ERROR_S1_MODEL10_m_disk_radiated'],
    'PLOT_AFTER_PROFILE_TRAINING' : []
}
# predefined checking sets
CHECKING_SETS = {
    'CHECK_AFTER_CREATE'           : [],
    'CHECK_AFTER_COMBINE'          : ['failure_rate'],
    'CHECK_AFTER_EXTRA'            : ['CO_type', 'SN_type'],
    'CHECK_AFTER_TRAINING'         : [],
    'CHECK_AFTER_PROFILE_TRAINING' : []
}
# common attributes for all steps
ATTRIBUTES_TO_CHECK = ['GRID_TYPES', 'METALLICITIES', 'GRID_SLICES',\
                       'COMPRESSIONS']
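
# Illustrative sketch (not executed by the pipeline): get_step_kwargs() inside
# PostProcessingPipeline.create_csv_and_slurm_job_files() expands a predefined
# set name found in a step's CREATE_PLOTS/DO_CHECKS list into its members:
#
#     step_plots = ['PLOT_AFTER_COMBINE']
#     for set_name, set_content in PLOTTING_SETS.items():
#         if set_name in step_plots:
#             step_plots.remove(set_name)
#             step_plots.extend(set_content)
#     # step_plots == ['combined_TF12', 'termination_flag_1', ...,
#     #                'lg_mtransfer_rate']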

class PostProcessingPipeline:
    """A class to handle the post-processing pipeline."""

    def __init__(self, path_to_inifile=None):
        """Initialize a pipeline.

        Parameters
        ----------
        path_to_inifile : path
            The location of an ini file to read the parameters for the
            pipeline (default: None)

        """
        # set base variables
        self.PATH_TO_POSYDON = PATH_TO_POSYDON
        self.PATH_TO_INIFILE = path_to_inifile
        # if there is an ini file extract the parameters from it
        if self.PATH_TO_INIFILE is not None:
            if os.path.isfile(self.PATH_TO_INIFILE):
                self.pipeline_kwargs = self.parse_setup_params(
                    self.PATH_TO_INIFILE)
            else:
                raise FileNotFoundError("Can't find ini-file: "
                                        f"{self.PATH_TO_INIFILE}")

    @staticmethod
    def parse_setup_params(path=None):
        """Parse inifile for running post-processing pipelines.

        Parameters
        ----------
        path : path
            The location of an ini file to read the parameters for the
            pipeline (default: None)

        Returns
        -------
        dict
            A dictionary with all the parameters in the ini file. It accounts
            for one sectioning level.

        """
        if path is None:
            return
        else:
            parser = parse_inifile(path)

        pipeline_kwargs = dict()
        for section in parser.sections():
            # create a sub-dictionary for each section
            section_dict = dict()
            for key, val in parser[section].items():
                # interpret all input as python types
                # MK: this is probably not a safe way to do this
                section_dict[key] = ast.literal_eval(val)
            pipeline_kwargs[section] = section_dict
        return pipeline_kwargs
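
    # Sketch of the parsing (values are hypothetical): an ini section such as
    #
    #   [pipeline setup]
    #   PATH_TO_GRIDS = '/projects/POSYDON_GRIDS/'
    #   VERBOSE = True
    #
    # is returned as
    #
    #   {'pipeline setup': {'PATH_TO_GRIDS': '/projects/POSYDON_GRIDS/',
    #                       'VERBOSE': True}}
    #
    # Every value goes through ast.literal_eval, so strings need to be quoted
    # in the ini file.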

    def create_csv_and_slurm_job_files(self):
        """Create all files the pipeline needs."""
        def expand_runfile(runfile, step_number='step_X', last_step=''):
            """Add the next entry to the script in runfile.

            Parameters
            ----------
            runfile : file object (text file)
                IO object of an opened file to write to
            step_number : str
                String representation of the current step
            last_step : str
                String representation of the last step

            Returns
            -------
            str
                On success it returns step_number

            """
            if not runfile.writable():
                raise PermissionError(f"Can't write to {runfile.name}")
            if not os.path.isfile(f"{step_number}.slurm"):
                raise FileNotFoundError(f"Can't find file {step_number}.slurm")
            if last_step == '':
                # if there was no previous step the current step has no
                # dependency
                runfile.write(f"ID{step_number}=$(sbatch --parsable "
                              f"{step_number}.slurm)\n")
            else:
                # steps waiting for the previous step
                runfile.write(f"ID{step_number}=$(sbatch --parsable "
                              "--dependency=afterok:$""{"f"ID{last_step}""} "
                              "--kill-on-invalid-dep=yes "
                              f"{step_number}.slurm)\n")
            runfile.write(f"echo '{step_number}.slurm submitted as '$"
                          "{"f"ID{step_number}""}"f"\n")
            return step_number

        def get_step_kwargs(self, step_number='step_X', last_step=''):
            """Provide the attributes of a step.

            Additionally, it replaces predefined sets, checks for missing
            attributes and may copy them from a previous step.

            Parameters
            ----------
            step_number : str
                String representation of the current step
            last_step : str
                String representation of the last step

            Returns
            -------
            dict
                A dictionary containing all the parameters for the current
                step

            """
            if step_number not in self.pipeline_kwargs.keys():
                raise KeyError(f'{step_number} not in pipeline_kwargs.')
            elif (('step' in step_number) or (step_number=='rerun')):
                # check for attributes
                for attribute in ATTRIBUTES_TO_CHECK:
                    if (attribute not in
                            self.pipeline_kwargs[step_number].keys()
                            or not isinstance(
                                self.pipeline_kwargs[step_number][attribute],
                                list)):
                        if (last_step in self.pipeline_kwargs.keys()
                                and attribute in
                                self.pipeline_kwargs[last_step].keys()):
                            # infer missing attribute from last step
                            Pwarn(f"Copy {attribute} from {last_step} to "
                                  f"{step_number}", "ReplaceValueWarning")
                            self.pipeline_kwargs[step_number][attribute] = \
                                self.pipeline_kwargs[last_step][attribute]
                        else:
                            raise KeyError(f'{step_number} has no '
                                           f'{attribute}')
                # replace predefined sets
                if 'CREATE_PLOTS' in self.pipeline_kwargs[step_number].keys():
                    for set_name, set_content in PLOTTING_SETS.items():
                        if set_name in (self.pipeline_kwargs[step_number]
                                        ['CREATE_PLOTS']):
                            self.pipeline_kwargs[step_number][
                                'CREATE_PLOTS'].remove(set_name)
                            self.pipeline_kwargs[step_number][
                                'CREATE_PLOTS'].extend(set_content)
                    # remove all plot entries in case the global plotting flag
                    # is off
                    if (('pipeline setup' in self.pipeline_kwargs.keys())
                            and ('MAKE_PLOTS' in
                                 self.pipeline_kwargs[
                                     'pipeline setup'].keys())
                            and not (self.pipeline_kwargs['pipeline setup']
                                                         ['MAKE_PLOTS'])):
                        self.pipeline_kwargs[step_number]['CREATE_PLOTS'] = []
                if 'DO_CHECKS' in self.pipeline_kwargs[step_number].keys():
                    for set_name, set_content in CHECKING_SETS.items():
                        if set_name in (self.pipeline_kwargs[step_number]
                                        ['DO_CHECKS']):
                            self.pipeline_kwargs[step_number][
                                'DO_CHECKS'].remove(set_name)
                            self.pipeline_kwargs[step_number][
                                'DO_CHECKS'].extend(set_content)
                    # remove all check entries in case the global checking
                    # flag is off
                    if (('pipeline setup' in self.pipeline_kwargs.keys())
                            and ('MAKE_CHECKS' in
                                 self.pipeline_kwargs[
                                     'pipeline setup'].keys())
                            and not (self.pipeline_kwargs['pipeline setup']
                                                         ['MAKE_CHECKS'])):
                        self.pipeline_kwargs[step_number]['DO_CHECKS'] = []
            # return attributes
            return self.pipeline_kwargs[step_number]

        def has_substep(self, step_number='step_X', substep='ALL'):
            """Check whether a step has substeps.

            Parameters
            ----------
            step_number : str
                String representation of the current step
            substep : str
                String representation of the substep

            Returns
            -------
            bool
                Only True if a valid substep is found

            """
            if step_number not in self.pipeline_kwargs.keys():
                raise KeyError(f'{step_number} not in pipeline_kwargs!')
            if substep=='ALL':
                # check all possible substeps
                substeps = SUB_STEPS_TO_EXTENSION.keys()
            else:
                # check the specified substep
                substeps = [substep]
            for sub_step in substeps:
                if sub_step in self.pipeline_kwargs[step_number].keys():
                    SUB_STEP_LIST = self.pipeline_kwargs[step_number][sub_step]
                    if (isinstance(SUB_STEP_LIST, list)
                            and len(SUB_STEP_LIST)>0):
                        # at least one entry is found in the substep
                        return True
            # no entry for the substep(s)
            return False

        # check for sections 'account' and 'pipeline setup' and store their
        # attributes
        if 'account' not in self.pipeline_kwargs.keys():
            raise KeyError('account not in pipeline_kwargs!')
        if 'pipeline setup' not in self.pipeline_kwargs.keys():
            raise KeyError('pipeline setup not in pipeline_kwargs!')
        account_kwargs = self.pipeline_kwargs['account']
        setup_kwargs = self.pipeline_kwargs['pipeline setup']
        # take verbose value from setup arguments
        if 'VERBOSE' in setup_kwargs.keys():
            VERBOSE = setup_kwargs['VERBOSE']
        else:
            Pwarn("VERBOSE not given, use False", "ReplaceValueWarning")
            VERBOSE = False
        if VERBOSE:
            print(
                "\n{:+^45s} \n{}\n\n{:+^45s}\n{}".format(
                    'ACCOUNT', pformat(account_kwargs, indent=2),
                    'SETUP', pformat(setup_kwargs, indent=2)
                )
            )

        previously_created_files = []
        plot_dirs = []
        pipeline_steps = []
        if os.path.exists('run_pipeline.sh'):
            Pwarn('Replace run_pipeline.sh', "OverwriteWarning")
        # create a runfile script
        with open('run_pipeline.sh', 'w') as runfile:
            if runfile.writable():
                runfile.write("#!/bin/bash\n")
            else:
                raise PermissionError(f"Can't write to {runfile.name}")
            # loop through all steps
            last_step = ''
            for action, step_number in ACTION_TO_STEP_NUM.items():
                if action in setup_kwargs.keys():
                    do_step_bool = setup_kwargs[action]
                else:
                    do_step_bool = False
                if VERBOSE:
                    print(
                        "\n\n{:-^45s} {:^8s}:{:^6s}".format(
                            action, step_number, str(do_step_bool))
                    )
                if do_step_bool or has_substep(self, step_number=step_number):
                    step_kwargs = get_step_kwargs(self,
                                                  step_number=step_number,
                                                  last_step=last_step)
                    if VERBOSE:
                        print(
                            pformat(step_kwargs, indent=2)
                        )
                    if do_step_bool:
                        # create csv file and remember therein created files
                        previously_created_files += create_csv(
                            step_name=step_number,
                            previously_created_files=previously_created_files,
                            **{**step_kwargs, **setup_kwargs}
                        )
                        # create slurm file
                        slurm_job(
                            job_name=step_number,
                            step_name=step_number,
                            PATH_TO_POSYDON=self.PATH_TO_POSYDON,
                            **{**step_kwargs, **setup_kwargs,
                               **account_kwargs}
                        )
                        pipeline_steps.append(step_number)
                        # add job to the list in the runfile script
                        expand_runfile(runfile, step_number=step_number,
                                       last_step=last_step)
                        # remember that this step was done last
                        last_step = step_number
                        # add cleanup
                        if create_cleanup_slurm_job(
                                job_name=step_number,
                                step_name=step_number,
                                **{**step_kwargs, **setup_kwargs,
                                   **account_kwargs}):
                            pipeline_steps.append(step_number+"_cleanup")
                            expand_runfile(
                                runfile,
                                step_number=step_number+"_cleanup",
                                last_step=last_step)
                        # try to load the profile interpolation module,
                        # because it will be needed when running step 5
                        if step_number == "step_5":
                            from posydon.interpolation.profile_interpolation\
                                import CompileData, ProfileInterpolator
                    for sub_step, step_extension in \
                            SUB_STEPS_TO_EXTENSION.items():
                        if sub_step not in \
                                self.pipeline_kwargs[step_number].keys():
                            continue
                        if has_substep(self, step_number=step_number,
                                       substep=sub_step):
                            # get sub-step number to be a concatenation of the
                            # main step and the sub-step extension.
                            sub_step_number = step_number+step_extension
                            # create csv file for sub step
                            plot_dirs += create_csv(
                                step_name=sub_step_number,
                                previously_created_files=\
                                    previously_created_files,
                                **{**step_kwargs, **setup_kwargs}
                            )
                            # create slurm file for sub step
                            slurm_job(
                                job_name=sub_step_number,
                                step_name=step_number,
                                PATH_TO_POSYDON=self.PATH_TO_POSYDON,
                                **{**step_kwargs, **setup_kwargs,
                                   **account_kwargs}
                            )
                            pipeline_steps.append(sub_step_number)
                            # add job to the list in the runfile script
                            expand_runfile(runfile,
                                           step_number=sub_step_number,
                                           last_step=last_step)
                            # add cleanup
                            if create_cleanup_slurm_job(
                                    job_name=sub_step_number,
                                    step_name=step_number,
                                    **{**step_kwargs, **setup_kwargs,
                                       **account_kwargs}):
                                pipeline_steps.append(
                                    sub_step_number+"_cleanup")
                                expand_runfile(
                                    runfile,
                                    step_number=sub_step_number+"_cleanup",
                                    last_step=sub_step_number)
                            # there are no dependencies supported on sub steps
                            # beside the cleanup
            # add cleanup for pipeline
            if create_cleanup_slurm_job(job_name='pipeline',
                                        step_name='pipeline',
                                        **{**step_kwargs, **setup_kwargs,
                                           **account_kwargs}):
                # create csv with all created files
                df = pd.DataFrame()
                if 'PATH' in setup_kwargs.keys():
                    PATH = setup_kwargs['PATH']
                else:
                    Pwarn("PATH not given, use current directory",
                          "ReplaceValueWarning")
                    PATH = '.'
                df['pipeline_files'] = previously_created_files +\
                                       list(set(plot_dirs))
                df.to_csv(os.path.join(PATH, 'pipeline_files.csv'),
                          header=False, index=False)
                # add cleanup to run script
                submission_line = "IDpipeline_cleanup=$(sbatch --parsable "
                for i, step in enumerate(pipeline_steps):
                    if i==0:
                        submission_line += "--dependency="
                    else:
                        submission_line += ","
                    submission_line += "afterany:$""{"f"ID{step}""}"
                submission_line += " --kill-on-invalid-dep=yes "
                submission_line += "pipeline_cleanup.slurm)\n"
                if runfile.writable():
                    if not os.path.isfile("pipeline_cleanup.slurm"):
                        raise FileNotFoundError("Can't find file "
                                                "pipeline_cleanup.slurm")
                    runfile.write(submission_line)
                    runfile.write("echo 'pipeline_cleanup.slurm submitted as '"
                                  "${IDpipeline_cleanup}\n")
                else:
                    raise PermissionError(f"Can't write to {runfile.name}")
        # make the runfile script executable
        os.system("chmod a+x run_pipeline.sh")
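
    # For reference, expand_runfile() above produces a run_pipeline.sh roughly
    # of the form (job names depend on the enabled steps; each sbatch call is
    # written on a single line):
    #
    #   #!/bin/bash
    #   IDstep_1=$(sbatch --parsable step_1.slurm)
    #   echo 'step_1.slurm submitted as '${IDstep_1}
    #   IDstep_2=$(sbatch --parsable --dependency=afterok:${IDstep_1} --kill-on-invalid-dep=yes step_2.slurm)
    #   echo 'step_2.slurm submitted as '${IDstep_2}
    #   ...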

    def create_log_dirs(self):
        """Create directories to store the log files."""
        # use work directory given by PATH in the setup attributes
        if 'pipeline setup' not in self.pipeline_kwargs.keys():
            raise KeyError('pipeline setup not in pipeline_kwargs!')
        setup_kwargs = self.pipeline_kwargs['pipeline setup']
        if 'PATH' in setup_kwargs.keys():
            logs_path = os.path.join(setup_kwargs['PATH'], 'logs')
        else:
            Pwarn("PATH not given, use current directory",
                  "ReplaceValueWarning")
            logs_path = os.path.join('.', 'logs')
        # create logs in working dir to store all slurm outputs
        if not os.path.isdir(logs_path):
            os.makedirs(logs_path)
        # create sub directories for each step
        for action, step_number in ACTION_TO_STEP_NUM.items():
            if ((action in setup_kwargs.keys()) and setup_kwargs[action]):
                step_log_path = os.path.join(logs_path, step_number)
                if not os.path.isdir(step_log_path):
                    os.makedirs(step_log_path)
        # create sub directory for cleanup
        step_log_path = os.path.join(logs_path, 'cleanup')
        if not os.path.isdir(step_log_path):
            os.makedirs(step_log_path)


def slurm_job(job_name, step_name, PATH_TO_GRIDS=None, PATH_TO_POSYDON=None,
              PATH='.', ACCOUNT=None, PARTITION=None, WALLTIME=None,
              MAILTYPE=None, EMAIL=None, VERBOSE=False, **kwargs):
    """Create slurm file.

    Parameters
    ----------
    job_name : str
        String representation of the job. It will be used in the file name.
    step_name : str
        String representation of the parent step. It will be used for the
        logs.
    PATH_TO_GRIDS : str
        Path where to find the root directory of the grids.
    PATH_TO_POSYDON : str
        Path where to find the root directory of POSYDON.
    PATH : str
        Path of the working directory.
    ACCOUNT : str
        The slurm account name. Passed to slurm's option '--account'.
    PARTITION : str
        The slurm partition name. Passed to slurm's option '--partition'.
    WALLTIME : str
        Maximum time for the slurm job. Passed to slurm's option '--time'.
    MAILTYPE : str
        Specifier, which emails to send by slurm. Passed to slurm's option
        '--mail-type'. It requires EMAIL to be set, too.
    EMAIL : str
        Email address to send mails to. Passed to slurm's option
        '--mail-user'. It requires MAILTYPE to be set, too.
    VERBOSE : bool
        Enables/Disables additional output.
    kwargs : dict
        Other parameters (all being ignored).

    """
    if not os.path.isdir(PATH_TO_GRIDS):
        raise NotADirectoryError(f"PATH_TO_GRIDS={PATH_TO_GRIDS} not found.")
    # get path to csv file
    path_to_csv_file = os.path.join(PATH, f'{job_name}.csv')
    if not os.path.isfile(path_to_csv_file):
        raise FileNotFoundError(f"Missing csv file: {path_to_csv_file}")
    if os.path.exists(f'{job_name}.slurm'):
        Pwarn(f'Replace {job_name}.slurm', "OverwriteWarning")
    # create slurm file
    with open(f'{job_name}.slurm', 'w') as f:
        # get STEPID
        if ('step' in job_name) and (len(job_name.split("_")) > 1):
            STEPID = job_name.split("_")[1]
            if (len(job_name.split("_")) > 2):
                STEPID += job_name.split("_")[2]
        elif job_name == 'rerun':
            STEPID = 'R'
        else:
            Pwarn("No step ID detected, leave it empty",
                  "ReplaceValueWarning")
            STEPID = ''
        # write slurm file content
        f.write("#!/bin/bash\n")
        if ACCOUNT is not None:
            f.write(f"#SBATCH --account={ACCOUNT}\n")
        if PARTITION is not None:
            f.write(f"#SBATCH --partition={PARTITION}\n")
        f.write("#SBATCH -N 1\n")
        f.write("#SBATCH --cpus-per-task 1\n")
        f.write("#SBATCH --ntasks-per-node 1\n")
        if WALLTIME is not None:
            f.write(f"#SBATCH --time={WALLTIME}\n")
        f.write(f"#SBATCH --job-name=psygrid{STEPID}\n")
        f.write("#SBATCH --mem-per-cpu=4G\n")
        if ((EMAIL is not None) and (MAILTYPE is not None)):
            f.write(f"#SBATCH --mail-type={MAILTYPE}\n")
            f.write(f"#SBATCH --mail-user={EMAIL}\n")
        # extract array size
        if ((job_name in ['step_1', 'step_3', 'step_4', 'step_5', 'step_9',
                          'rerun'])
                or ('plot' in job_name) or ('check' in job_name)):
            df = pd.read_csv(path_to_csv_file)
            N = df.shape[0]-1
        elif job_name in ['step_2']:
            df = pd.read_csv(path_to_csv_file)
            N = df.shape[1]-1
        else:
            raise ValueError(f'This should never happen! job_name={job_name}')
        if N<0:
            raise ValueError(f'{job_name} has no jobs to run, please check '
                             f'{path_to_csv_file}')
        f.write(f"#SBATCH --array=0-{N}\n")
        slurm_array = '$SLURM_ARRAY_TASK_ID'
        # get logs path and set the output of the slurm job
        if step_name in job_name:
            path_to_logs = os.path.join(PATH, 'logs', step_name)
        else:
            path_to_logs = os.path.join(PATH, 'logs', job_name)
        log_file_name = get_log_file_name(job_name=job_name,
                                          step_name=step_name)
        f.write("#SBATCH --open-mode=truncate\n")
        f.write(f"#SBATCH --output={path_to_logs}/{log_file_name}\n")
        # set PATH_TO_POSYDON where needed
        if job_name == 'step_3':  # CALCULATE_EXTRA_VALUES
            f.write(f"export PATH_TO_POSYDON={PATH_TO_POSYDON}\n")
        # reset display settings
        if 'plot' in job_name:  # PLOTS
            f.write("unset DISPLAY\n")
        # get path to run the pipeline
        path_to_run_pipeline = os.path.join(PATH_TO_POSYDON, 'bin',
                                            'posydon-run-pipeline')
        if not os.path.isfile(path_to_run_pipeline):
            raise FileNotFoundError("Pipeline's runfile not found at "
                                    f"{path_to_run_pipeline}.")
        # encode the verbose information for running the pipeline as a
        # command line argument
        if VERBOSE and 'check' not in job_name:
            f.write(f"\nsrun python {path_to_run_pipeline} {PATH_TO_GRIDS} "
                    f"{path_to_csv_file} {slurm_array} 1")
        else:
            f.write(f"\nsrun python {path_to_run_pipeline} {PATH_TO_GRIDS} "
                    f"{path_to_csv_file} {slurm_array} 0")

def create_csv(GRID_TYPES=[], METALLICITIES=[], GRID_SLICES=[],
               COMPRESSIONS=[], step_name='', VERSION='',
               PLOT_EXTENSION='pdf', GRIDS_COMBINED=[],
               ORIGINAL_COMPRESSIONS=[], INTERPOLATION_METHODS=[],
               PROFILE_NAMES=[], CONTROL_GRIDS=[],
               STOP_BEFORE_CARBON_DEPLETION=0, RERUN_TYPE='',
               CLUSTER='quest', DROP_MISSING_FILES=False, CREATE_PLOTS=[],
               DO_CHECKS=[], PATH_TO_GRIDS='', PATH='.',
               previously_created_files=[], **kwargs):
    """Create csv file with a list containing all data to process a step.

    Parameters
    ----------
    GRID_TYPES : list
        Grid types in the directory tree in PATH_TO_GRIDS.
    METALLICITIES : list
        Metallicities in the directory tree below grid types (or version).
    GRID_SLICES : list
        Grid slices in the directory tree below the metallicities.
    COMPRESSIONS : list
        Compression types (e.g. LITE or ORIGINAL).
    step_name : str
        String representation of the step. It will be used in the file name.
    VERSION : str
        Version in the directory tree level below the grid type. (outdated)
    PLOT_EXTENSION : str
        The file extension for plots recognized by matplotlib or
        'multipage-pdf'.
    GRIDS_COMBINED : list
        The names of the new grids combined from several slices.
    ORIGINAL_COMPRESSIONS : list
        Grids with the original compression to get detailed data from them.
    INTERPOLATION_METHODS : list
        The interpolation methods (e.g. linear or 1NN).
    PROFILE_NAMES : list
        The profiles which should get interpolated (e.g. radius, logRho, ...).
    CONTROL_GRIDS : list
        Grids used to check the interpolators.
    STOP_BEFORE_CARBON_DEPLETION : int
        Flag to stop high mass stars before carbon depletion because of
        chaotic behavior.
    RERUN_TYPE : str
        Rerun types (e.g. PISN, reverse_MT, ...).
    CLUSTER : str
        Name of the cluster (e.g. yggdrasil or quest).
    DROP_MISSING_FILES : bool
        Enable/Disable to ignore files which neither exist nor were created
        beforehand.
    CREATE_PLOTS : list
        Requested plots to be created (e.g. combined_TF12,
        termination_flag_1, ...).
    DO_CHECKS : list
        Requested checks (e.g. failure_rate, CO_type).
    PATH_TO_GRIDS : str
        Path where to find the root directory of the grids.
    PATH : str
        Path of the working directory.
    previously_created_files : list
        List of filenames created by previous steps.
    kwargs : dict
        Other parameters (all being ignored).

    Returns
    -------
    list
        List of names of files which will be created by this step.

    """
    if not os.path.isdir(PATH_TO_GRIDS):
        raise NotADirectoryError(f"PATH_TO_GRIDS={PATH_TO_GRIDS} not found.")
    if not os.path.isdir(PATH):
        raise NotADirectoryError(f"PATH={PATH} not found.")
    # number of grid types
    N = len(GRID_TYPES)
    if (N != len(METALLICITIES) or N != len(GRID_SLICES)
            or N != len(COMPRESSIONS)):
        raise ValueError('Mismatch between the len of GRID_TYPES, '
                         'METALLICITIES, GRID_SLICES, COMPRESSIONS. '
                         f'Lengths: {N}, {len(METALLICITIES)}, '
                         f'{len(GRID_SLICES)}, {len(COMPRESSIONS)}')
    if step_name == 'step_2':
        # there need to be as many combined files specified as recipes are
        # given to create them.
        if N != len(GRIDS_COMBINED):
            raise ValueError(f'len(GRID_TYPES) = {len(GRID_TYPES)} != '
                             f'{len(GRIDS_COMBINED)} = len(GRIDS_COMBINED)!')
    elif step_name == 'step_3':
        # here we need a reference grid with ORIGINAL compression to
        # calculate the extra quantities from.
        if N != len(ORIGINAL_COMPRESSIONS):
            raise ValueError(f'len(GRID_TYPES) = {len(GRID_TYPES)} != '
                             f'{len(ORIGINAL_COMPRESSIONS)} = '
                             'len(ORIGINAL_COMPRESSIONS)!')
    elif (('step_4_' in step_name) or ('step_5' in step_name)):
        # sub steps of the interpolator training need a control grid
        if len(GRID_SLICES) != len(CONTROL_GRIDS):
            raise ValueError(f'len(GRID_SLICES) = {len(GRID_SLICES)} != '
                             f'{len(CONTROL_GRIDS)} = len(CONTROL_GRIDS)!')

    grids = []
    grids_compression = []
    grids_type = []
    stop_before_carbon_depletion = []
    grids_ORIGINAL = []
    processed_grids = []
    interpolation_methods = []
    interpolators = []
    profile_quantities = []
    profile_interpolators = []
    export_path = []
    rerun_metallicities = []
    rerun_path = []
    rerun_type = []
    clusters = []
    plot_dirs = []
    plot_quantities = []
    plot_extensions = []
    checks = []
    newly_created_files = []
    df = pd.DataFrame()

    # loop over all grid types
    for l, grid_type in enumerate(GRID_TYPES):
        METALLICITIES_ = METALLICITIES[l]
        GRID_SLICES_ = GRID_SLICES[l]
        COMPRESSIONS_ = COMPRESSIONS[l]
        if (isinstance(ORIGINAL_COMPRESSIONS, list)
                and len(ORIGINAL_COMPRESSIONS)>0):
            ORIGINAL_COMPRESSIONS_ = ORIGINAL_COMPRESSIONS[l]
        # loop over all metallicities, grid slices and compressions
        for metallicity in METALLICITIES_:
            for i, grid_slice in enumerate(GRID_SLICES_):
                for compression in COMPRESSIONS_:
                    # get RLO extension text depending on compression
                    if 'RLO' in compression:
                        RLO = '_RLO'
                    else:
                        RLO = ''
                    # check for combined grid in step 2 and its substeps
                    if 'step_2' in step_name:
                        if len(GRID_SLICES[l]) != len(GRIDS_COMBINED[l]):
                            raise ValueError(f'len(GRID_SLICES[{l}]) = '
                                             f'{len(GRID_SLICES[l])} != '
                                             f'{len(GRIDS_COMBINED[l])} = '
                                             f'len(GRIDS_COMBINED[{l}])!')
                    # check for control grids in steps 4 and 5
                    # (incl. substeps)
                    if (('step_4_' in step_name) or ('step_5' in step_name)):
                        if len(GRID_SLICES[l]) != len(CONTROL_GRIDS[l]):
                            raise ValueError(f'len(GRID_SLICES[{l}]) = '
                                             f'{len(GRID_SLICES[l])} != '
                                             f'{len(CONTROL_GRIDS[l])} = '
                                             f'len(CONTROL_GRIDS[{l}])!')
                    # create data lists depending on the (sub)step
                    if step_name == 'step_1':  # CREATE_GRID_SLICES
                        # This step needs the location of the grid slice, the
                        # compression applied to it, the type of the grid, and
                        # the information whether it is OK to cut some
                        # evolution short before carbon depletion for massive
                        # stars.
                        grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
                                                  VERSION, metallicity,
                                                  grid_slice))
                        grids_compression.append(compression)
                        grids_type.append(grid_type)
                        stop_before_carbon_depletion.append(
                            STOP_BEFORE_CARBON_DEPLETION)
                        if i==0:
                            compression_dir = os.path.join(PATH_TO_GRIDS,
                                                           grid_type, VERSION,
                                                           metallicity,
                                                           compression)
                            # the compression directory will be created if not
                            # existing already
                            if not os.path.isdir(compression_dir):
                                newly_created_files.append(compression_dir)
                    elif step_name == 'step_2':  # COMBINE_GRID_SLICES
                        # This step needs the name of the combined grid and
                        # the information which slices get combined for each
                        # combined grid. It will be stored in a data frame,
                        # with the name of the combined grid as column key and
                        # the column containing the slices to be combined.
                        combine_grid_slices = []
                        # grid_slice is a batch
                        for grid_slice_ in grid_slice:
                            path_to_grid = os.path.join(PATH_TO_GRIDS,
                                                        grid_type, VERSION,
                                                        metallicity,
                                                        compression,
                                                        grid_slice_+'.h5')
                            combine_grid_slices.append(path_to_grid)
                        path_to_grid_combined = os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            compression, GRIDS_COMBINED[l][i]+'.h5')
                        df_tmp = pd.DataFrame()
                        df_tmp[path_to_grid_combined] = combine_grid_slices
                        df = pd.concat([df, df_tmp], axis=1)
                    elif step_name == 'step_3':  # CALCULATE_EXTRA_VALUES
                        if (isinstance(ORIGINAL_COMPRESSIONS_, list)
                                and len(ORIGINAL_COMPRESSIONS_)>0):
                            # use one original for all compressions, hence use
                            # the first one and ignore others
                            original_compression = ORIGINAL_COMPRESSIONS_[0]
                        else:
                            # if not defined assume 'ORIGINAL'
                            original_compression = 'ORIGINAL'
                        # This step needs the grid to process, the grid type,
                        # the grid containing the ORIGINAL data, and the name
                        # of the new grid containing the processed data.
                        grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
                                                  VERSION, metallicity,
                                                  compression,
                                                  grid_slice+'.h5'))
                        grids_type.append(grid_type)
                        grids_ORIGINAL.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            original_compression+RLO, grid_slice+'.h5'))
                        processed_grids.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            compression, grid_slice+'_processed.h5'))
                    elif step_name == 'step_4':  # TRAIN_INTERPOLATORS
                        # This step needs the grids to use for the training,
                        # the grid type, the interpolation methods, and the
                        # names of the files where the interpolator data gets
                        # written to.
                        for method in INTERPOLATION_METHODS:
                            grids.append(os.path.join(PATH_TO_GRIDS,
                                                      grid_type, VERSION,
                                                      metallicity,
                                                      compression,
                                                      grid_slice+'.h5'))
                            grids_type.append(grid_type)
                            interpolation_methods.append(method+RLO)
                            interpolators.append(os.path.join(
                                PATH_TO_GRIDS, grid_type, VERSION,
                                metallicity, 'interpolation_objects',
                                'IF_'+method+RLO+'.pkl'))
                        if i==0:
                            interpolator_dir = os.path.join(
                                PATH_TO_GRIDS, grid_type, VERSION,
                                metallicity, 'interpolation_objects')
                            # the interpolator directory will be created if
                            # not existing already
                            if not os.path.isdir(interpolator_dir):
                                newly_created_files.append(interpolator_dir)
                    elif step_name == 'step_5':
                        # TRAIN_PROFILE_INTERPOLATORS
                        # This step needs the grids to use for the training,
                        # the grid type, the names of the files of the IF
                        # interpolators, the quantities to get profiles
                        # interpolated for, and the names of the files where
                        # the interpolator data gets written to.
                        if (isinstance(PROFILE_NAMES, list)
                                and (len(PROFILE_NAMES)>0)):
                            for method in INTERPOLATION_METHODS:
                                grids.append(os.path.join(PATH_TO_GRIDS,
                                                          grid_type, VERSION,
                                                          metallicity,
                                                          compression,
                                                          grid_slice+'.h5'))
                                grids_type.append(grid_type)
                                interpolators.append(os.path.join(
                                    PATH_TO_GRIDS, grid_type, VERSION,
                                    metallicity, 'interpolation_objects',
                                    'IF_'+method+RLO+'.pkl'))
                                profile_quantities.append(PROFILE_NAMES)
                                profile_interpolators.append(os.path.join(
                                    PATH_TO_GRIDS, grid_type, VERSION,
                                    metallicity, 'interpolation_objects',
                                    'profile_'+method+RLO+'.pkl'))
                            if i==0:
                                interpolator_dir = os.path.join(
                                    PATH_TO_GRIDS, grid_type, VERSION,
                                    metallicity, 'interpolation_objects')
                                # the interpolator directory will be created
                                # if not existing already
                                if not os.path.isdir(interpolator_dir):
                                    newly_created_files.append(
                                        interpolator_dir)
                    elif step_name == 'step_9':  # EXPORT_DATASET
                        if "RLO" in grid_type:
                            export_dir = grid_type
                        else:
                            export_dir = grid_type+RLO
                        # This step needs the grids/interpolators to export
                        # and the path where to export them to.
                        grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
                                                  VERSION, metallicity,
                                                  compression,
                                                  grid_slice+'.h5'))
                        full_export_dir = os.path.join(PATH, 'POSYDON_data',
                                                       export_dir)
                        if not os.path.isdir(full_export_dir):
                            os.makedirs(full_export_dir)
                        export_path.append(os.path.join(full_export_dir,
                                                        metallicity+'.h5'))
                        for method in [['linear', 'linear3c_kNN'],
                                       ['1NN', '1NN_1NN']]:
                            grids.append(os.path.join(
                                PATH_TO_GRIDS, grid_type, VERSION,
                                metallicity, 'interpolation_objects',
                                'IF_'+method[0]+RLO+'.pkl'))
                            full_export_dir = os.path.join(
                                PATH, 'POSYDON_data', export_dir,
                                'interpolators', method[1])
                            if not os.path.isdir(full_export_dir):
                                os.makedirs(full_export_dir)
                            export_path.append(os.path.join(
                                full_export_dir, metallicity+'.pkl'))
                    elif step_name == 'rerun':  # RERUN
                        # This step needs the grid to get the runs from, the
                        # path for the new runs, the grid type, the
                        # metallicity, the rerun type and the cluster name.
                        grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
                                                  VERSION, metallicity,
                                                  compression,
                                                  grid_slice+'.h5'))
                        rerun_path.append(os.path.join(
                            PATH_TO_GRIDS, grid_type, VERSION, metallicity,
                            'rerun_'+RERUN_TYPE+'_'+grid_slice))
                        grids_type.append(grid_type)
                        rerun_metallicities.append(metallicity)
                        rerun_type.append(RERUN_TYPE)
                        clusters.append(CLUSTER)
                    if (('plot' in step_name) or ('check' in step_name)):
                        # get output grid names for sub steps
                        if 'step_2' in step_name:
                            grid_s = GRIDS_COMBINED[l][i]
                        elif 'step_3' in step_name:
                            grid_s = grid_slice+'_processed'
                        elif (('step_4' in step_name)
                              or ('step_5' in step_name)):
                            grid_s = CONTROL_GRIDS[l][i]
                        else:
                            grid_s = grid_slice
                        if grid_s == '':
                            # skip that slice for sub steps
                            continue
                        if (('step_4' not in step_name)
                                and ('step_5' not in step_name)):
                            # all steps without interpolation method get an
                            # empty method
                            if len(INTERPOLATION_METHODS)==0:
                                INTERPOLATION_METHODS = ['']
                        if 'plot' in step_name:  # PLOTS
                            # only plot first compression type, but allow for
                            # with and without RLO flag
                            if ((compression not in COMPRESSIONS_[0]) and
                                    (COMPRESSIONS_[0] not in compression)):
                                continue
                            # loop over interpolation methods and plots
                            for method in INTERPOLATION_METHODS:
                                # for to_plot in CREATE_PLOTS:
                                #     plot_quantities.append(to_plot)
                                if len(CREATE_PLOTS)>0:
                                    # Plots need to know what to plot, the
                                    # grid to get the data from, its grid
                                    # type, the directory to put the plots
                                    # to, the extension for the plot files,
                                    # and the name of an interpolator, when
                                    # using it.
                                    plot_quantities.append(CREATE_PLOTS)
                                    grids.append(os.path.join(
                                        PATH_TO_GRIDS, grid_type, VERSION,
                                        metallicity, compression,
                                        grid_s+'.h5'))
                                    grids_type.append(grid_type)
                                    plot_dirs.append(os.path.join(
                                        PATH_TO_GRIDS, grid_type, VERSION,
                                        metallicity, 'plots',
                                        RLO[1:]+RLO[:1]+grid_s))
                                    plot_extensions.append(PLOT_EXTENSION)
                                    if method != '':
                                        # if there is a method
                                        if 'step_4' in step_name:
                                            interpolators.append(os.path.join(
                                                PATH_TO_GRIDS, grid_type,
                                                VERSION, metallicity,
                                                'interpolation_objects',
                                                'IF_'+method+RLO+'.pkl'))
                                        elif 'step_5' in step_name:
                                            interpolators.append(os.path.join(
                                                PATH_TO_GRIDS, grid_type,
                                                VERSION, metallicity,
                                                'interpolation_objects',
                                                'profile_'+method+RLO
                                                + '.pkl'))
                            if i==0:
                                plots_dir = os.path.join(PATH_TO_GRIDS,
                                                         grid_type, VERSION,
                                                         metallicity, 'plots')
                                # the plots directory will be created if not
                                # existing already
                                if not os.path.isdir(plots_dir):
                                    newly_created_files.append(plots_dir)
                        elif 'check' in step_name:  # CHECKS
                            # loop over interpolation methods and checks
                            for method in INTERPOLATION_METHODS:
                                # for to_check in DO_CHECKS:
                                #     checks.append(to_check)
                                if len(DO_CHECKS)>0:
                                    # Checks need to know what kind of check
                                    # to do, the grid to get the data from,
                                    # and the name of an interpolator, when
                                    # using it.
                                    checks.append(DO_CHECKS)
                                    grids.append(os.path.join(
                                        PATH_TO_GRIDS, grid_type, VERSION,
                                        metallicity, compression,
                                        grid_s+'.h5'))
                                    if method != '':
                                        # if there is a method
                                        if 'step_4' not in step_name:
                                            interpolators.append(os.path.join(
                                                PATH_TO_GRIDS, grid_type,
                                                VERSION, metallicity,
                                                'interpolation_objects',
                                                'IF_'+method+RLO+'.pkl'))
                                        elif 'step_5' not in step_name:
                                            interpolators.append(os.path.join(
                                                PATH_TO_GRIDS, grid_type,
                                                VERSION, metallicity,
                                                'interpolation_objects',
                                                'profile_'+method+RLO
                                                + '.pkl'))

    # saving dataset to csv file (step 2 is special here, because the data
    # frame was already created.)
    if step_name != 'step_2':
        # create a data frame to export it to a csv file (each step requires
        # other data columns)
        grids = np.array(grids)
        df['path_to_grid'] = grids
        if step_name == 'step_1':
            df['compression'] = grids_compression
            df['grid_type'] = grids_type
            df['stop_before_carbon_depletion'] = stop_before_carbon_depletion
        elif step_name == 'step_3':
            df['grid_type'] = grids_type
            df['path_to_grid_ORIGINAL'] = grids_ORIGINAL
            df['path_to_processed_grid'] = processed_grids
        elif step_name == 'step_4':
            df['grid_type'] = grids_type
            df['interpolation_method'] = interpolation_methods
            df['path_to_interpolator'] = interpolators
        elif step_name == 'step_5':
            df['grid_type'] = grids_type
            df['profile_names'] = profile_quantities
            df['path_to_IF_interpolator'] = interpolators
            df['path_to_profile_interpolator'] = profile_interpolators
        elif step_name == 'step_9':
            df['export_path'] = export_path
        elif step_name == 'rerun':
            df['rerun_path'] = rerun_path
            df['grid_type'] = grids_type
            df['rerun_metallicity'] = rerun_metallicities
            df['rerun_type'] = rerun_type
            df['cluster'] = clusters
        elif 'plot' in step_name:
            df['grid_type'] = grids_type
            df['quantities_to_plot'] = plot_quantities
            df['path_to_plot'] = plot_dirs
            df['plot_extension'] = plot_extensions
            if len(interpolators)>0:
                df['path_to_interpolator'] = interpolators
        elif 'check' in step_name:
            df['checks_to_do'] = checks
            if len(interpolators)>0:
                df['path_to_interpolator'] = interpolators
        # drop lines when grid directories/files are not found
        if DROP_MISSING_FILES:
            drop_rows = []
            for row in df.index:
                # grid file is always needed
                path = df.at[row, 'path_to_grid']
                if (not os.path.exists(path)
                        and path not in previously_created_files):
                    drop_rows.append(row)
                elif step_name == 'step_3':
                    # grid file with ORIGINAL compression needed
                    path = df.at[row, 'path_to_grid_ORIGINAL']
                    if (not os.path.exists(path)
                            and path not in previously_created_files):
                        drop_rows.append(row)
                elif (('step_4_' in step_name) or ('step_5_' in step_name)):
                    # interpolator files needed
                    path = df.at[row, 'path_to_interpolator']
                    if (not os.path.exists(path)
                            and path not in previously_created_files):
                        drop_rows.append(row)
                elif 'step_5' in step_name:
                    # interpolator files needed
                    path = df.at[row, 'path_to_IF_interpolator']
                    if (not os.path.exists(path)
                            and path not in previously_created_files):
                        drop_rows.append(row)
            # report about missing files and remove tasks, which need them
            if len(drop_rows)>0:
                print("\n{:-^54s}".format(step_name))
                print('The following grids will not be processed because the '
                      'files/directories are missing! If this warning '
                      'message is unexpected to you, please check the file '
                      'paths!')
                for row in drop_rows:
                    print(df.at[row, 'path_to_grid'])
                    if step_name == 'step_3':
                        print(df.at[row, 'path_to_grid_ORIGINAL'])
                    elif (('step_4_' in step_name)
                          or ('step_5_' in step_name)):
                        print(df.at[row, 'path_to_interpolator'])
                    elif 'step_5' in step_name:
                        print(df.at[row, 'path_to_IF_interpolator'])
                print('')
                df = df.drop(index=drop_rows)
        # keep track of files created by a step
        if step_name == 'step_1':
            for row in df.index:
                path = df.at[row, 'path_to_grid']
                compression = df.at[row, 'compression']
                newly_created_files.append(get_new_grid_name(path,
                                                             compression))
        elif step_name == 'step_3':
            newly_created_files.extend(df['path_to_processed_grid'].to_list())
        elif step_name == 'step_4':
            newly_created_files.extend(df['path_to_interpolator'].to_list())
        elif step_name == 'step_5':
            newly_created_files.extend(
                df['path_to_profile_interpolator'].to_list())
        elif step_name == 'step_9':
            newly_created_files.extend(df['export_path'].to_list())
        elif step_name == 'rerun':
            newly_created_files.extend(df['rerun_path'].to_list())
        elif 'plot' in step_name:
            newly_created_files.extend(plot_dirs)
    else:  # step 2
        # drop columns when psygrid files are not found
        if DROP_MISSING_FILES:
            first_time = True
            drop_columns = []
            for column in df.keys():
                drop_rows = []
                for row in df.index:
                    path = df.at[row, column]
                    if pd.notna(path):
                        if (not os.path.exists(path)
                                and path not in previously_created_files):
                            drop_rows.append(row)
                # report about missing files and adjust column
                if len(drop_rows)>0:
                    if first_time:
                        print("\n{:-^54s}".format(step_name))
                        print('If this warning message is unexpected to you, '
                              'please check that step 1 occurred '
                              'successfully!')
                        first_time = False
                    print(f'In {column} the following grids are skipped!')
                    for row in drop_rows:
                        print(df.at[row, column])
                        df.at[row, column] = np.nan
                    print('')
                    newcol = df[column].dropna().to_list()
                    if len(newcol)==0:
                        drop_columns.append(column)
                    newcol.extend((df.shape[0]-len(newcol))*[np.nan])
                    df[column] = newcol
            # report about empty tasks and remove them
            if len(drop_columns)>0:
                if first_time:
                    print("\n{:-^54s}".format(step_name))
                    print('If this warning message is unexpected to you, '
                          'please check that step 1 occurred successfully!')
                    first_time = False
                print('The following grids will not be combined as all grid '
                      'slices are missing!')
                for column in drop_columns:
                    print(column)
                print('')
            df = df.dropna(axis=0, how='all').dropna(axis=1, how='all')
        newly_created_files.extend(df.keys().to_list())

    # finally save data frame
    output_fname = f'{step_name}.csv'
    df.to_csv(os.path.join(PATH, output_fname), index=False)
    # return list of new files created by that step
    return newly_created_files
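
# For orientation (sketch; actual paths depend on the ini file): step_1.csv
# written by create_csv() has one row per grid slice and compression, e.g.
#
#   path_to_grid,compression,grid_type,stop_before_carbon_depletion
#   <PATH_TO_GRIDS>/HMS-HMS/v2/1e+00_Zsun/grid_low_res_0,LITE,HMS-HMS,0
#
# and slurm_job() turns the number of rows into the slurm array size.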

def create_cleanup_slurm_job(job_name, step_name, PATH='.', ACCOUNT=None,
                             PARTITION=None, WALLTIME=None, MAILTYPE=None,
                             EMAIL=None, GROUP=None, VERBOSE=False, **kwargs):
    """Create slurm file for cleanup job.

    Parameters
    ----------
    job_name : str
        String representation of the job. It will be used in the file name.
    step_name : str
        String representation of the parent step. It will be used for the
        logs.
    PATH : str
        Path of the working directory.
    ACCOUNT : str
        The slurm account name. Passed to slurm's option '--account'.
    PARTITION : str
        The slurm partition name. Passed to slurm's option '--partition'.
    WALLTIME : str
        Maximum time for the slurm job. Passed to slurm's option '--time'.
    MAILTYPE : str
        Specifier, which emails to send by slurm. Passed to slurm's option
        '--mail-type'. It requires EMAIL to be set, too.
    EMAIL : str
        Email address to send mails to. Passed to slurm's option
        '--mail-user'. It requires MAILTYPE to be set, too.
    GROUP : str
        Group name to get ownership and permissions on files created by the
        pipeline.
    VERBOSE : bool
        Enables/Disables additional output.
    kwargs : dict
        Other parameters (all being ignored).

    """
    if ((job_name == 'step_2') or (job_name == 'step_9')
            or (job_name == 'rerun') or ('plot' in job_name)
            or ('check' in job_name)
            or ((job_name == 'pipeline') and (GROUP is not None))):
        if not os.path.isdir(PATH):
            raise NotADirectoryError(f"PATH={PATH} not found.")
        if os.path.exists(f'{job_name}_cleanup.slurm'):
            Pwarn(f'Replace {job_name}_cleanup.slurm', "OverwriteWarning")
        with open(f'{job_name}_cleanup.slurm', 'w') as f:
            # write slurm file content
            f.write("#!/bin/bash\n")
            if ACCOUNT is not None:
                f.write(f"#SBATCH --account={ACCOUNT}\n")
            if PARTITION is not None:
                f.write(f"#SBATCH --partition={PARTITION}\n")
            f.write("#SBATCH -N 1\n")
            f.write("#SBATCH --cpus-per-task 1\n")
            f.write("#SBATCH --ntasks-per-node 1\n")
            if WALLTIME is not None:
                f.write(f"#SBATCH --time={WALLTIME}\n")
            f.write("#SBATCH --job-name=cleanup\n")
            f.write("#SBATCH --mem-per-cpu=4G\n")
            if (EMAIL is not None):
                f.write("#SBATCH --mail-type=FAIL\n")
                f.write(f"#SBATCH --mail-user={EMAIL}\n")
            path_to_logs = os.path.join(PATH, 'logs', 'cleanup')
            log_file_name = get_log_file_name(job_name="cleanup",
                                              step_name=job_name)
            f.write("#SBATCH --open-mode=truncate\n")
            f.write(f"#SBATCH --output={path_to_logs}/{log_file_name}\n")
            # get logs path and set the output of the slurm job
            if step_name in job_name:
                path_to_log_files = os.path.join(PATH, 'logs', step_name)
            else:
                path_to_log_files = os.path.join(PATH, 'logs', job_name)
            if ((job_name == 'step_2') or (job_name == 'step_9')
                    or (job_name == 'rerun') or ('plot' in job_name)
                    or ('check' in job_name)):
                # combine logs
                old_log_file_names = get_log_file_name(job_name=job_name,
                                                       step_name=step_name)
                new_log_file_name = old_log_file_names.replace("_%4a", "s")
                old_log_file_names = old_log_file_names.replace("_%4a", "_*")
                f.write(f"\ncat {path_to_log_files}/{old_log_file_names} >"
                        f" {path_to_log_files}/{new_log_file_name}")
                f.write(f"\nrm {path_to_log_files}/{old_log_file_names}")
            # the overall cleanup of the pipeline
            if (job_name == 'pipeline'):
                if GROUP is not None:
                    f.write(f"\necho \"Change group to {GROUP} and group "
                            "permission to rwX at least for all files stated "
                            f"in {PATH}/pipeline_files.csv\"")
                    f.write(f"\nfor FILE in $(cat {PATH}/pipeline_files.csv);"
                            " do")
                    f.write(f"\n chgrp -fR {GROUP} $""{FILE};")
                    f.write("\n chmod -fR g+rwX ${FILE};")
                    f.write("\ndone")
                    f.write(f"\necho \"Acting on {PATH} and all "
                            "subdirectories/files in there\"")
                    f.write(f"\necho \"Change group to {GROUP}\"")
                    f.write(f"\nchgrp -fR {GROUP} {PATH}")
                    f.write("\necho \"Change group permission to rwX at "
                            "least\"")
                    f.write(f"\nchmod -fR g+rwX {PATH}")
                    f.write(f"\necho \"Done, check {PATH}:\"")
                    f.write(f"\nls -al {PATH}")
    else:
        return False
    return True

def get_log_file_name(job_name, step_name):
    """Get the name of the log file.

    Parameters
    ----------
    job_name : str
        Name of the job.
    step_name : str
        Name of the step/parent job the job belongs to.

    Returns
    -------
    str
        Name of the log file.

    """
    if job_name == 'step_1':  # CREATE_GRID_SLICES
        return "grid_slice_%4a.out"
    if job_name == 'step_2':  # COMBINE_GRID_SLICES
        return "combine_grid_slice_%4a.out"
    if job_name == 'step_3':  # CALCULATE_EXTRA_VALUES
        return "post_processing_%4a.out"
    if job_name == 'step_4':  # TRAIN_INTERPOLATORS
        return "train_interpolator_%4a.out"
    if job_name == 'step_5':  # TRAIN_PROFILE_INTERPOLATORS
        return "train_profile_interpolator_%4a.out"
    if job_name == 'step_9':  # EXPORT_DATASET
        return "export_dataset_%4a.out"
    if job_name == 'rerun':  # RERUN
        return "rerun_%4a.out"
    if 'plot' in job_name:  # PLOTS
        return "plots_"+step_name+"_slice_%4a.out"
    if 'check' in job_name:  # CHECKS
        return "checks_"+step_name+"_slice_%4a.out"
    if 'cleanup' in job_name:  # CLEANUPS
        return "cleanup_"+step_name+".out"
    return "log.out"

if __name__ == '__main__':
    if len(sys.argv) >= 2:
        if str(sys.argv[1])=='--help':
            print('Program to set up the POSYDON pipeline')
            print('syntax: posydon-setup-pipeline INI_FILE_PATH')
            print('INI_FILE_PATH: path to the ini file containing the data')
            sys.exit(0)
        ini_file_path = str(sys.argv[1])
        if '.' not in ini_file_path:
            ini_file_path = './' + ini_file_path
    else:
        raise ValueError('No ini file specified.')
    pipeline = PostProcessingPipeline(ini_file_path)
    pipeline.create_csv_and_slurm_job_files()
    pipeline.create_log_dirs()
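
# Usage sketch (assuming an ini file named pipeline.ini in the working
# directory; adjust the name to your setup):
#
#   posydon-setup-pipeline pipeline.ini
#
# This writes the step_*.csv and step_*.slurm files plus run_pipeline.sh,
# which chains the slurm jobs with sbatch --dependency=afterok:<previous job
# id>. Since run_pipeline.sh is made executable, the pipeline is then
# launched with:
#
#   ./run_pipeline.sh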