#!/usr/bin/env python
__authors__ = [
"Simone Bavera <Simone.Bavera@unige.ch>",
"Kyle Akira Rocha <kylerocha2024@u.northwestern.edu>",
"Matthias Kruckow <Matthias.Kruckow@unige.ch>",
]
##############################################################################
# IMPORT ALL NECESSARY PYTHON PACKAGES
##############################################################################
import os
import sys
import ast
import shutil
import pandas as pd
import numpy as np
from pprint import pformat
from posydon.popsyn.io import parse_inifile
from posydon.utils.common_functions import PATH_TO_POSYDON
from posydon.utils.gridutils import get_new_grid_name
from posydon.utils.posydonwarning import Pwarn
# this data processing pipeline was designed assuming POSYDON v2 data structure
'''
Data tree structure

PATH_TO_GRIDS/
    /HMS-HMS/
    /CO-HMS_RLO/
    /CO-HeMS/
    /single_HMS/
    /single_HeMS/
        /v1/
        /v2/
            /1e+00_Zsun/
            /1e-01_Zsun/
            /1e-02_Zsun/
            ...
                /grid_low_res_0/
                /grid_low_res_1/
                /grid_rerun_1/
                ...
                /LITE/
                /ORIGINAL/
                /plots/
                    /grid_low_res_combined/
                        /TF1/
                        /TF2/
                        ...
'''
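# For orientation, a hypothetical LITE slice of the solar-metallicity HMS-HMS
# grid would sit at a path like (illustrative names, not a real data set):
#   PATH_TO_GRIDS/HMS-HMS/v2/1e+00_Zsun/LITE/grid_low_res_0.h5
# while its plots would end up below
#   PATH_TO_GRIDS/HMS-HMS/v2/1e+00_Zsun/plots/grid_low_res_combined/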
# pipeline steps
ACTION_TO_STEP_NUM = {
'CREATE_GRID_SLICES' : 'step_1',
'COMBINE_GRID_SLICES' : 'step_2',
'CALCULATE_EXTRA_VALUES' : 'step_3',
'TRAIN_INTERPOLATORS' : 'step_4',
'TRAIN_PROFILE_INTERPOLATORS': 'step_5',
'EXPORT_DATASET' : 'step_9',
'RERUN' : 'rerun'
}
# pipeline substeps
SUB_STEPS_TO_EXTENSION = {
'CREATE_PLOTS' : '_plots',
'DO_CHECKS' : '_checks'
}
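# For example, enabling TRAIN_INTERPOLATORS in the ini file maps to 'step_4';
# its plotting sub-step gets the extension '_plots', so the corresponding job
# files are named 'step_4_plots.csv' and 'step_4_plots.slurm'.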
# predefined plotting sets
PLOTTING_SETS = {
'PLOT_AFTER_CREATE' : [],
'PLOT_AFTER_COMBINE' : ['combined_TF12', 'termination_flag_1',\
'termination_flag_2', 'termination_flag_3',\
'termination_flag_4', 'rl_relative_overflow_1',\
'rl_relative_overflow_2', 'lg_mtransfer_rate'],
'PLOT_AFTER_EXTRA' : ['S1_MODEL01_CO_type', 'S1_MODEL01_SN_type',\
'S1_MODEL01_mass', 'S1_MODEL01_spin',\
'S1_MODEL01_m_disk_radiated',
'S1_MODEL02_CO_type', 'S1_MODEL02_SN_type',\
'S1_MODEL02_mass', 'S1_MODEL02_spin',\
'S1_MODEL02_m_disk_radiated',
'S1_MODEL03_CO_type', 'S1_MODEL03_SN_type',\
'S1_MODEL03_mass', 'S1_MODEL03_spin',\
'S1_MODEL03_m_disk_radiated',
'S1_MODEL04_CO_type', 'S1_MODEL04_SN_type',\
'S1_MODEL04_mass', 'S1_MODEL04_spin',\
'S1_MODEL04_m_disk_radiated',
'S1_MODEL05_CO_type', 'S1_MODEL05_SN_type',\
'S1_MODEL05_mass', 'S1_MODEL05_spin',\
'S1_MODEL05_m_disk_radiated',
'S1_MODEL06_CO_type', 'S1_MODEL06_SN_type',\
'S1_MODEL06_mass', 'S1_MODEL06_spin',\
'S1_MODEL06_m_disk_radiated',
'S1_MODEL07_CO_type', 'S1_MODEL07_SN_type',\
'S1_MODEL07_mass', 'S1_MODEL07_spin',\
'S1_MODEL07_m_disk_radiated',
'S1_MODEL08_CO_type', 'S1_MODEL08_SN_type',\
'S1_MODEL08_mass', 'S1_MODEL08_spin',\
'S1_MODEL08_m_disk_radiated',
'S1_MODEL09_CO_type', 'S1_MODEL09_SN_type',\
'S1_MODEL09_mass', 'S1_MODEL09_spin',\
'S1_MODEL09_m_disk_radiated',
'S1_MODEL10_CO_type', 'S1_MODEL10_SN_type',\
'S1_MODEL10_mass', 'S1_MODEL10_spin',\
'S1_MODEL10_m_disk_radiated'],
'PLOT_AFTER_TRAINING' : ['INTERP_ERROR_age', 'INTERP_ERROR_star_1_mass',\
'INTERP_ERROR_star_2_mass',\
'INTERP_ERROR_period_days',\
'INTERP_ERROR_S1_co_core_mass',\
'INTERP_ERROR_S1_co_core_radius',\
'INTERP_ERROR_S1_he_core_mass',\
'INTERP_ERROR_S1_he_core_radius',\
'INTERP_ERROR_S1_center_h1',\
'INTERP_ERROR_S1_center_he4',\
'INTERP_ERROR_S1_surface_h1',\
'INTERP_ERROR_S1_surface_he4',\
'INTERP_ERROR_S1_surf_avg_omega_div_omega_crit',\
'INTERP_ERROR_S1_log_Teff',\
'INTERP_ERROR_S1_log_L', 'INTERP_ERROR_S1_log_R',\
'INTERP_ERROR_S1_spin_parameter',\
'INTERP_ERROR_S1_lambda_CE_10cent',\
'INTERP_ERROR_S2_co_core_mass',\
'INTERP_ERROR_S2_co_core_radius',\
'INTERP_ERROR_S2_he_core_mass',\
'INTERP_ERROR_S2_he_core_radius',\
'INTERP_ERROR_S2_center_h1',\
'INTERP_ERROR_S2_center_he4',\
'INTERP_ERROR_S2_surface_h1',\
'INTERP_ERROR_S2_surface_he4',\
'INTERP_ERROR_S2_surf_avg_omega_div_omega_crit',\
'INTERP_ERROR_S2_log_Teff',\
'INTERP_ERROR_S2_log_L', 'INTERP_ERROR_S2_log_R',\
'INTERP_ERROR_S2_spin_parameter',\
'INTERP_ERROR_S2_lambda_CE_10cent',\
'INTERP_ERROR_S1_MODEL01_mass',\
'INTERP_ERROR_S1_MODEL01_spin',\
'INTERP_ERROR_S1_MODEL01_m_disk_radiated',\
'INTERP_ERROR_S1_MODEL05_mass',\
'INTERP_ERROR_S1_MODEL05_spin',\
'INTERP_ERROR_S1_MODEL05_m_disk_radiated',\
'INTERP_ERROR_S1_MODEL06_mass',\
'INTERP_ERROR_S1_MODEL06_spin',\
'INTERP_ERROR_S1_MODEL06_m_disk_radiated',\
'INTERP_ERROR_S1_MODEL10_mass',\
'INTERP_ERROR_S1_MODEL10_spin',\
'INTERP_ERROR_S1_MODEL10_m_disk_radiated'],
'PLOT_AFTER_PROFILE_TRAINING' : []
}
# predefined checking sets
CHECKING_SETS = {
'CHECK_AFTER_CREATE' : [],
'CHECK_AFTER_COMBINE' : ['failure_rate'],
'CHECK_AFTER_EXTRA' : ['CO_type', 'SN_type'],
'CHECK_AFTER_TRAINING' : [],
'CHECK_AFTER_PROFILE_TRAINING' : []
}
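# These set names can be used in the ini file in place of explicit lists; e.g.
# CREATE_PLOTS = ['PLOT_AFTER_COMBINE'] is expanded by get_step_kwargs below
# into the individual termination-flag and mass-transfer plots listed above.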
# common attributes for all steps
ATTRIBUTES_TO_CHECK = ['GRID_TYPES', 'METALLICITIES', 'GRID_SLICES',\
'COMPRESSIONS']
class PostProcessingPipeline:
"""A class to handle the post-processing pipeline."""
def __init__(self, path_to_inifile=None):
"""Initialize a pipeline
Parameters
----------
path_to_inifile : path
The location of an ini file to read the parameters for the pipeline
(default: None)
"""
# set base variables
self.PATH_TO_POSYDON = PATH_TO_POSYDON
self.PATH_TO_INIFILE = path_to_inifile
# if there is an ini file extract the parameters from it
if self.PATH_TO_INIFILE is not None:
if os.path.isfile(self.PATH_TO_INIFILE):
self.pipeline_kwargs = self.parse_setup_params(
self.PATH_TO_INIFILE)
else:
raise FileNotFoundError("Can't find ini-file: "
f"{self.PATH_TO_INIFILE}")
@staticmethod
def parse_setup_params(path=None):
"""Parse inifile for running post-processing pipelines.
Parameters
----------
path : path
The location of an ini file to read the parameters for the pipeline
(default: None)
Returns
-------
dict
A dictionary with all the parameters in the ini file. It accounts
for one sectioning level.
"""
if path is None:
return
else:
parser = parse_inifile(path)
pipeline_kwargs = dict()
for section in parser.sections():
# create a subdirectory for each section
section_dict = dict()
for key, val in parser[section].items():
# interpret all input as Python types
# MK: this is probably not a safe way to do this
section_dict[key] = ast.literal_eval(val)
pipeline_kwargs[section] = section_dict
return pipeline_kwargs
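# A minimal sketch of how parse_setup_params maps an ini file to a nested
# dict (the section and values below are hypothetical, and key case is
# assumed to be preserved by parse_inifile, as the look-ups in this module
# expect; values must be valid Python literals for ast.literal_eval):
#
#   [pipeline setup]
#   PATH_TO_GRIDS = '/path/to/grids'
#   CREATE_GRID_SLICES = True
#   VERBOSE = False
#
# would be parsed as
#
#   {'pipeline setup': {'PATH_TO_GRIDS': '/path/to/grids',
#                       'CREATE_GRID_SLICES': True,
#                       'VERBOSE': False}}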
def create_csv_and_slurm_job_files(self):
"""Creates all files the pipeline needs."""
def expand_runfile(runfile, step_number='step_X', last_step=''):
"""Add next entry to the script in runfile.
Parameters
----------
runfile : file object (text file)
IO object of an opened file to write to
step_number : str
String representation of the current step
last_step : str
String representation of the last step
Returns
-------
str
On success it returns step_number
"""
if not runfile.writable():
raise PermissionError(f"Can't write to {runfile.name}")
if not os.path.isfile(f"{step_number}.slurm"):
raise FileNotFoundError(f"Can't find file {step_number}.slurm")
if last_step == '':
# if there was no previous step the current step has no
# dependency
runfile.write(f"ID{step_number}=$(sbatch --parsable "
f"{step_number}.slurm)\n")
else:
# steps waiting for the previous step
runfile.write(f"ID{step_number}=$(sbatch --parsable "
"--dependency=afterok:$""{"f"ID{last_step}""} "
"--kill-on-invalid-dep=yes "
f"{step_number}.slurm)\n")
runfile.write(f"echo '{step_number}.slurm submitted as '$"
"{"f"ID{step_number}""}"f"\n")
return step_number
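# For illustration, the resulting run_pipeline.sh contains lines of the form
# (step names here are just examples):
#
#   IDstep_1=$(sbatch --parsable step_1.slurm)
#   echo 'step_1.slurm submitted as '${IDstep_1}
#   IDstep_2=$(sbatch --parsable --dependency=afterok:${IDstep_1} --kill-on-invalid-dep=yes step_2.slurm)
#   echo 'step_2.slurm submitted as '${IDstep_2}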
def get_step_kwargs(self, step_number='step_X', last_step=''):
"""Provides the attributes of a step. Additionally, it replaces
defined sets and it checks for missing attributes and may copies
them from a previous step.
Parameters
----------
step_number : str
String representation of the current step
last_step : str
String representation of the last step
Returns
-------
dict
A dictionary containing all the parameters for the current step
"""
if step_number not in self.pipeline_kwargs.keys():
raise KeyError(f'{step_number} not in pipeline_kwargs.')
elif (('step' in step_number) or (step_number=='rerun')):
# check for attributes
for attribute in ATTRIBUTES_TO_CHECK:
if (attribute not in self.pipeline_kwargs[step_number].\
keys() or not isinstance(self.pipeline_kwargs\
[step_number][attribute], list)):
if (last_step in self.pipeline_kwargs.keys()\
and attribute in\
self.pipeline_kwargs[last_step].keys()):
# infer missing attribute from last step
Pwarn(f"Copy {attribute} from {last_step} to "
f"{step_number}", "ReplaceValueWarning")
self.pipeline_kwargs[step_number][attribute] = \
self.pipeline_kwargs[last_step][attribute]
else:
raise KeyError(f'{step_number} has no {attribute}')
# replace predefined sets
if 'CREATE_PLOTS' in self.pipeline_kwargs[step_number].keys():
for set_name, set_content in PLOTTING_SETS.items():
if set_name in self.pipeline_kwargs[step_number]\
['CREATE_PLOTS']:
self.pipeline_kwargs[step_number]['CREATE_PLOTS'].\
remove(set_name)
self.pipeline_kwargs[step_number]['CREATE_PLOTS'].\
extend(set_content)
# remove all plot entries in case the global plotting flag
# is off
if (('pipeline setup' in self.pipeline_kwargs.keys()) and\
('MAKE_PLOTS' in\
self.pipeline_kwargs['pipeline setup'].keys())
and not (self.pipeline_kwargs['pipeline setup']\
['MAKE_PLOTS'])):
self.pipeline_kwargs[step_number]['CREATE_PLOTS'] = []
if 'DO_CHECKS' in self.pipeline_kwargs[step_number].keys():
for set_name, set_content in CHECKING_SETS.items():
if set_name in self.pipeline_kwargs[step_number]\
['DO_CHECKS']:
self.pipeline_kwargs[step_number]['DO_CHECKS'].\
remove(set_name)
self.pipeline_kwargs[step_number]['DO_CHECKS'].\
extend(set_content)
# remove all check entries in case the global checking flag
# is off
if (('pipeline setup' in self.pipeline_kwargs.keys()) and\
('MAKE_CHECKS' in\
self.pipeline_kwargs['pipeline setup'].keys())
and not (self.pipeline_kwargs['pipeline setup']\
['MAKE_CHECKS'])):
self.pipeline_kwargs[step_number]['DO_CHECKS'] = []
# return attributes
return self.pipeline_kwargs[step_number]
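# E.g. if step_2 omits METALLICITIES in the ini file, get_step_kwargs copies
# the value from the previously enabled step (typically step_1) and emits a
# ReplaceValueWarning instead of failing.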
def has_substep(self, step_number='step_X', substep='ALL'):
"""Checks whether a step has substeps.
Parameters
----------
step_number : str
String representation of the current step
substep : str
String representation of the substep
Returns
-------
bool
Only True if a valid substep is found
"""
if step_number not in self.pipeline_kwargs.keys():
raise KeyError(f'{step_number} not in pipeline_kwargs!')
if substep=='ALL': # check all possible substeps
substeps = SUB_STEPS_TO_EXTENSION.keys()
else: # check the specified substep
substeps = [substep]
for sub_step in substeps:
if sub_step in self.pipeline_kwargs[step_number].keys():
SUB_STEP_LIST = self.pipeline_kwargs[step_number][sub_step]
if (isinstance(SUB_STEP_LIST, list) and
len(SUB_STEP_LIST)>0):
# at least one entry is found in the substep
return True
# no entry for the substep(s)
return False
# check for sections 'account' and 'pipeline setup' and store their
# attributes
if 'account' not in self.pipeline_kwargs.keys():
raise KeyError('account not in pipeline_kwargs!')
if 'pipeline setup' not in self.pipeline_kwargs.keys():
raise KeyError('pipeline setup not in pipeline_kwargs!')
account_kwargs = self.pipeline_kwargs['account']
setup_kwargs = self.pipeline_kwargs['pipeline setup']
# take verbose value from setup arguments
if 'VERBOSE' in setup_kwargs.keys():
VERBOSE = setup_kwargs['VERBOSE']
else:
Pwarn("VERBOSE not given, use False", "ReplaceValueWarning")
VERBOSE = False
if VERBOSE:
print( "\n{:+^45s} \n{}\n\n{:+^45s}\n{}".format(
'ACCOUNT', pformat(account_kwargs, indent=2),
'SETUP', pformat(setup_kwargs, indent=2) )
)
previously_created_files = []
plot_dirs = []
pipeline_steps = []
if os.path.exists('run_pipeline.sh'):
Pwarn('Replace run_pipeline.sh', "OverwriteWarning")
# create a runfile script
with open('run_pipeline.sh', 'w') as runfile:
if runfile.writable():
runfile.write("#!/bin/bash\n")
else:
raise PermissionError(f"Can't write to {runfile.name}")
# loop through all steps
last_step = ''
for action, step_number in ACTION_TO_STEP_NUM.items():
if action in setup_kwargs.keys():
do_step_bool = setup_kwargs[action]
else:
do_step_bool = False
if VERBOSE:
print( "\n\n{:-^45s} {:^8s}:{:^6s}".format(
action, step_number, str(do_step_bool)) )
if do_step_bool or has_substep(self, step_number=step_number):
step_kwargs = get_step_kwargs(self,
step_number=step_number,
last_step=last_step)
if VERBOSE:
print( pformat(step_kwargs, indent=2) )
if do_step_bool:
# create csv file and keep track of the files it will create
previously_created_files += create_csv(
step_name=step_number,
previously_created_files=previously_created_files,
**{**step_kwargs, **setup_kwargs}
)
# create slurm file
slurm_job(
job_name=step_number,
step_name=step_number,
PATH_TO_POSYDON=self.PATH_TO_POSYDON,
**{**step_kwargs, **setup_kwargs, **account_kwargs}
)
pipeline_steps.append(step_number)
# add job to the list in the runfile script
expand_runfile(runfile, step_number=step_number,
last_step=last_step)
# remember that this step was done last
last_step = step_number
# add cleanup
if create_cleanup_slurm_job(job_name=step_number,\
step_name=step_number,\
**{**step_kwargs, **setup_kwargs, **account_kwargs}):
pipeline_steps.append(step_number+"_cleanup")
expand_runfile(runfile,
step_number=step_number+"_cleanup",
last_step=last_step)
# try to load the profile interpolation module, because it
# will be needed when running step 5
if step_number == "step_5":
from posydon.interpolation.profile_interpolation\
import CompileData, ProfileInterpolator
for sub_step, step_extension in SUB_STEPS_TO_EXTENSION.items():
if sub_step not in\
self.pipeline_kwargs[step_number].keys():
continue
if has_substep(self, step_number=step_number,
substep=sub_step):
# get sub-step number to be a concatenation of the main
# step and the sub-step extension.
sub_step_number = step_number+step_extension
# create csv file for sub step
plot_dirs += create_csv(
step_name=sub_step_number,
previously_created_files=previously_created_files,
**{**step_kwargs, **setup_kwargs}
)
# create slurm file for sub step
slurm_job(
job_name=sub_step_number,
step_name=step_number,
PATH_TO_POSYDON=self.PATH_TO_POSYDON,
**{**step_kwargs, **setup_kwargs, **account_kwargs}
)
pipeline_steps.append(sub_step_number)
# add job to the list in the runfile script
expand_runfile(runfile, step_number=sub_step_number,
last_step=last_step)
# add cleanup
if create_cleanup_slurm_job(job_name=sub_step_number,\
step_name=step_number, **{**step_kwargs,\
**setup_kwargs, **account_kwargs}):
pipeline_steps.append(sub_step_number+"_cleanup")
expand_runfile(runfile,
step_number=sub_step_number+"_cleanup",
last_step=sub_step_number)
# no dependencies are supported on sub-steps
# besides the cleanup
# add cleanup for pipeline
if create_cleanup_slurm_job(job_name='pipeline',\
step_name='pipeline',\
**{**step_kwargs, **setup_kwargs, **account_kwargs}):
# create csv with all created files
df = pd.DataFrame()
if 'PATH' in setup_kwargs.keys():
PATH = setup_kwargs['PATH']
else:
Pwarn("PATH not given, use current directory",
"ReplaceValueWarning")
PATH = '.'
df['pipeline_files'] = previously_created_files +\
list(set(plot_dirs))
df.to_csv(os.path.join(PATH, 'pipeline_files.csv'),
header=False, index=False)
# add cleanup to run script
submission_line = "IDpipeline_cleanup=$(sbatch --parsable "
for i, step in enumerate(pipeline_steps):
if i==0:
submission_line += "--dependency="
else:
submission_line += ","
submission_line += "afterany:$""{"f"ID{step}""}"
submission_line += " --kill-on-invalid-dep=yes "
submission_line += "pipeline_cleanup.slurm)\n"
if runfile.writable():
if not os.path.isfile("pipeline_cleanup.slurm"):
raise FileNotFoundError("Can't find file "
"pipeline_cleanup.slurm")
runfile.write(submission_line)
runfile.write("echo 'pipeline_cleanup.slurm submitted as '"
"${IDpipeline_cleanup}\n")
else:
raise PermissionError(f"Can't write to {runfile.name}")
# make the runfile script executable
os.system("chmod a+x run_pipeline.sh")
def create_log_dirs(self):
"""Create directories to store the log files."""
# use work directory given by PATH in the setup attributes
if 'pipeline setup' not in self.pipeline_kwargs.keys():
raise KeyError('pipeline setup not in pipeline_kwargs!')
setup_kwargs = self.pipeline_kwargs['pipeline setup']
if 'PATH' in setup_kwargs.keys():
logs_path = os.path.join(setup_kwargs['PATH'], 'logs')
else:
Pwarn("PATH not given, use current directory",
"ReplaceValueWarning")
logs_path = os.path.join('.', 'logs')
# create logs in working dir to store all slurm outputs
if not os.path.isdir(logs_path):
os.makedirs(logs_path)
# create sub directories for each step
for action, step_number in ACTION_TO_STEP_NUM.items():
if ((action in setup_kwargs.keys()) and setup_kwargs[action]):
step_log_path = os.path.join(logs_path, step_number)
if not os.path.isdir(step_log_path):
os.makedirs(step_log_path)
# create sub directory for cleanup
step_log_path = os.path.join(logs_path, 'cleanup')
if not os.path.isdir(step_log_path):
os.makedirs(step_log_path)
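# The resulting directory layout under PATH looks like (illustrative,
# assuming only step_1 and step_2 are enabled):
#   PATH/logs/step_1/
#   PATH/logs/step_2/
#   PATH/logs/cleanup/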
def slurm_job(job_name,
step_name,
PATH_TO_GRIDS=None,
PATH_TO_POSYDON=None,
PATH='.',
ACCOUNT=None,
PARTITION=None,
WALLTIME=None,
MAILTYPE=None,
EMAIL=None,
VERBOSE=False,
**kwargs):
"""Create slurm file.
Parameters
----------
job_name : str
String representation of the job. It will be used in the file name.
step_name : str
String representation of the parent step. It will be used for the logs.
PATH_TO_GRIDS : str
Path where to find the root directory of the grids.
PATH_TO_POSYDON : str
Path where to find the root directory of POSYDON.
PATH : str
Path of the working directory.
ACCOUNT : str
The slurm account name. Passed to slurm's option '--account'.
PARTITION : str
The slurm partition name. Passed to slurm's option '--partition'.
WALLTIME : str
Maximum time for the slurm job. Passed to slurm's option '--time'.
MAILTYPE : str
Specifier for which emails slurm should send. Passed to slurm's option
'--mail-type'. It requires EMAIL to be set, too.
EMAIL : str
Email address to send mails to. Passed to slurm's option '--mail-user'.
It requires MAILTYPE to be set, too.
VERBOSE : bool
Enables/Disables additional output.
kwargs : dict
Other parameters (all being ignored).
"""
if not os.path.isdir(PATH_TO_GRIDS):
raise NotADirectoryError(f"PATH_TO_GRIDS={PATH_TO_GRIDS} not found.")
# get path to csv file
path_to_csv_file = os.path.join(PATH, f'{job_name}.csv')
if not os.path.isfile(path_to_csv_file):
raise FileNotFoundError(f"Missing csv file: {path_to_csv_file}")
if os.path.exists(f'{job_name}.slurm'):
Pwarn(f'Replace {job_name}.slurm', "OverwriteWarning")
# create slurm file
with open(f'{job_name}.slurm', 'w') as f:
# get STEPID
if ('step' in job_name) and (len(job_name.split("_")) > 1):
STEPID = job_name.split("_")[1]
if (len(job_name.split("_")) > 2):
STEPID += job_name.split("_")[2]
elif job_name == 'rerun':
STEPID = 'R'
else:
Pwarn("No step ID detected, leave it empty", "ReplaceValueWarning")
STEPID = ''
# write slurm file content
f.write("#!/bin/bash\n")
if ACCOUNT is not None:
f.write(f"#SBATCH --account={ACCOUNT}\n")
if PARTITION is not None:
f.write(f"#SBATCH --partition={PARTITION}\n")
f.write("#SBATCH -N 1\n")
f.write("#SBATCH --cpus-per-task 1\n")
f.write("#SBATCH --ntasks-per-node 1\n")
if WALLTIME is not None:
f.write(f"#SBATCH --time={WALLTIME}\n")
f.write(f"#SBATCH --job-name=psygrid{STEPID}\n")
f.write("#SBATCH --mem-per-cpu=4G\n")
if ((EMAIL is not None) and (MAILTYPE is not None)):
f.write(f"#SBATCH --mail-type={MAILTYPE}\n")
f.write(f"#SBATCH --mail-user={EMAIL}\n")
# extract array size
if ((job_name in ['step_1', 'step_3', 'step_4', 'step_5', 'step_9',\
'rerun']) or ('plot' in job_name) or ('check' in job_name)):
df = pd.read_csv(path_to_csv_file)
N = df.shape[0]-1
elif job_name in ['step_2']:
df = pd.read_csv(path_to_csv_file)
N = df.shape[1]-1
else:
raise ValueError(f'This should never happen! job_name={job_name}')
if N<0:
raise ValueError(f'{job_name} has no jobs to run, please check '
f'{path_to_csv_file}')
f.write(f"#SBATCH --array=0-{N}\n")
slurm_array = '$SLURM_ARRAY_TASK_ID'
# get logs path and set the output of the slurm job
if step_name in job_name:
path_to_logs = os.path.join(PATH, 'logs', step_name)
else:
path_to_logs = os.path.join(PATH, 'logs', job_name)
log_file_name = get_log_file_name(job_name=job_name,\
step_name=step_name)
f.write("#SBATCH --open-mode=truncate\n")
f.write(f"#SBATCH --output={path_to_logs}/{log_file_name}\n")
# set PATH_TO_POSYDON where needed
if job_name == 'step_3': # CALCULATE_EXTRA_VALUES
f.write(f"export PATH_TO_POSYDON={PATH_TO_POSYDON}\n")
# reset display settings
if 'plot' in job_name: # PLOTS
f.write("unset DISPLAY\n")
# get path to run the pipeline
path_to_run_pipeline = os.path.join(PATH_TO_POSYDON, 'bin',
'posydon-run-pipeline')
if not os.path.isfile(path_to_run_pipeline):
raise FileNotFoundError("Pipeline's runfile not found at "
f"{path_to_run_pipeline}.")
# encode the verbose information for running the pipeline as a command
# line argument
if VERBOSE and 'check' not in job_name:
f.write(f"\nsrun python {path_to_run_pipeline} {PATH_TO_GRIDS} "
f"{path_to_csv_file} {slurm_array} 1")
else:
f.write(f"\nsrun python {path_to_run_pipeline} {PATH_TO_GRIDS} "
f"{path_to_csv_file} {slurm_array} 0")
def create_csv(GRID_TYPES=[],
METALLICITIES=[],
GRID_SLICES=[],
COMPRESSIONS=[],
step_name='',
VERSION='',
PLOT_EXTENSION='pdf',
GRIDS_COMBINED=[],
ORIGINAL_COMPRESSIONS=[],
INTERPOLATION_METHODS=[],
PROFILE_NAMES=[],
CONTROL_GRIDS=[],
STOP_BEFORE_CARBON_DEPLETION=0,
RERUN_TYPE='',
CLUSTER='quest',
DROP_MISSING_FILES=False,
CREATE_PLOTS=[],
DO_CHECKS=[],
PATH_TO_GRIDS='',
PATH='.',
previously_created_files=[],
**kwargs):
"""Create csv file with a list containing all data to process a step.
Parameters
----------
GRID_TYPES : list
Grid types in the directory tree in PATH_TO_GRIDS.
METALLICITIES : list
Metallicities in the directory tree below grid types (or version).
GRID_SLICES : list
Grid slices in the directory tree below the metallicities.
COMPRESSIONS: list
Compression types (e.g. LITE or ORIGINAL).
step_name : str
String representation of the step. It will be used in the file name.
VERSION : str
Version in the directory tree level below the grid type. (outdated)
PLOT_EXTENSION : str
The file extension for plots; either one recognized by matplotlib or
'multipage-pdf'.
GRIDS_COMBINED : list
The names of the new grids combined from several slices.
ORIGINAL_COMPRESSIONS : list
Grids with the original compression to get detailed data from them.
INTERPOLATION_METHODS : list
The interpolation methods (e.g. linear or 1NN).
PROFILE_NAMES : list
The profiles which should get interpolated (e.g. radius, logRho, ...)
CONTROL_GRIDS : list
Grids used to check the interpolators.
STOP_BEFORE_CARBON_DEPLETION : int
Flag to stop high mass stars before carbon depletion because of chaotic
behavior.
RERUN_TYPE : str
Rerun types (e.g. PISN, reverse_MT, ...).
CLUSTER : str
Name of the cluster (e.g. yggdrasil or quest).
DROP_MISSING_FILES : bool
Enable/Disable ignoring files which neither exist nor are created
beforehand.
CREATE_PLOTS : list
Requested plots to be created (e.g. combined_TF12, termination_flag_1,
...).
DO_CHECKS : list
Requested checks (e.g. failure_rate, CO_type).
PATH_TO_GRIDS : str
Path where to find the root directory of the grids.
PATH : str
Path of the working directory.
previously_created_files : list
List of filenames created by previous steps.
kwargs : dict
Other parameters (all being ignored).
Returns
-------
list
List of names of files which will be created by this step.
"""
if not os.path.isdir(PATH_TO_GRIDS):
raise NotADirectoryError(f"PATH_TO_GRIDS={PATH_TO_GRIDS} not found.")
if not os.path.isdir(PATH):
raise NotADirectoryError(f"PATH={PATH} not found.")
# number of grid types
N = len(GRID_TYPES)
if ( N != len(METALLICITIES) or N != len(GRID_SLICES) or
N != len(COMPRESSIONS)):
raise ValueError('Mismatch between the lengths of GRID_TYPES, '
'METALLICITIES, GRID_SLICES, COMPRESSIONS. '
f'Lengths: {N}, {len(METALLICITIES)}, '
f'{len(GRID_SLICES)}, {len(COMPRESSIONS)}')
if step_name == 'step_2':
# there need to be as many combined files specified as recipes are
# given to create them.
if N != len(GRIDS_COMBINED):
raise ValueError(f'len(GRID_TYPES) = {len(GRID_TYPES)} != '
f'{len(GRIDS_COMBINED)} = len(GRIDS_COMBINED)!')
elif step_name == 'step_3':
# here we need a reference grid with ORIGINAL compression to
# calculate the extra quantities from.
if N != len(ORIGINAL_COMPRESSIONS):
raise ValueError(f'len(GRID_TYPES) = {len(GRID_TYPES)} != '
f'{len(ORIGINAL_COMPRESSIONS)} = '
'len(ORIGINAL_COMPRESSIONS)!')
elif (('step_4_' in step_name) or ('step_5' in step_name)):
# sub steps of the interpolator training need a control grid
if len(GRID_SLICES) != len(CONTROL_GRIDS):
raise ValueError(f'len(GRID_SLICES) = {len(GRID_SLICES)} != '
f'{len(CONTROL_GRIDS)} = len(CONTROL_GRIDS)!')
grids = []
grids_compression = []
grids_type = []
stop_before_carbon_depletion = []
grids_ORIGINAL = []
processed_grids = []
interpolation_methods = []
interpolators = []
profile_quantities = []
profile_interpolators = []
export_path = []
rerun_metallicities = []
rerun_path = []
rerun_type = []
clusters = []
plot_dirs = []
plot_quantities = []
plot_extensions = []
checks = []
newly_created_files = []
df = pd.DataFrame()
# loop over all grid types
for l, grid_type in enumerate(GRID_TYPES):
METALLICITIES_ = METALLICITIES[l]
GRID_SLICES_ = GRID_SLICES[l]
COMPRESSIONS_ = COMPRESSIONS[l]
if (isinstance(ORIGINAL_COMPRESSIONS, list) and\
len(ORIGINAL_COMPRESSIONS)>0):
ORIGINAL_COMPRESSIONS_ = ORIGINAL_COMPRESSIONS[l]
# loop over all metallicities, grid slices and compressions
for metallicity in METALLICITIES_:
for i, grid_slice in enumerate(GRID_SLICES_):
for compression in COMPRESSIONS_:
# get RLO extension text depending on compression
if 'RLO' in compression:
RLO = '_RLO'
else:
RLO = ''
# check for combined grid in step 2 and its substeps
if 'step_2' in step_name:
if len(GRID_SLICES[l]) != len(GRIDS_COMBINED[l]):
raise ValueError(f'len(GRID_SLICES[{l}]) = '
f'{len(GRID_SLICES[l])} != '
f'{len(GRIDS_COMBINED[l])} = '
f'len(GRIDS_COMBINED[{l}])!')
# check for combined grid in steps 4 and 5 (incl. substeps)
if (('step_4_' in step_name) or ('step_5' in step_name)):
if len(GRID_SLICES[l]) != len(CONTROL_GRIDS[l]):
raise ValueError(f'len(GRID_SLICES[{l}]) = '
f'{len(GRID_SLICES[l])} != '
f'{len(CONTROL_GRIDS[l])} = '
f'len(CONTROL_GRIDS[{l}])!')
# create data lists depending on the (sub)step
if step_name == 'step_1': # CREATE_GRID_SLICES
# This step needs the location of the grid slice, the
# compression applied to it, the type of the grid, and
# the information whether it is OK to cut the evolution
# of massive stars short before carbon depletion.
grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
VERSION, metallicity,
grid_slice))
grids_compression.append(compression)
grids_type.append(grid_type)
stop_before_carbon_depletion.append(
STOP_BEFORE_CARBON_DEPLETION)
if i==0:
compression_dir = os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity,
compression)
# the compression directory will be created if not
# existing already
if not os.path.isdir(compression_dir):
newly_created_files.append(compression_dir)
elif step_name == 'step_2': # COMBINE_GRID_SLICES
# This step needs the name of the combined grid and the
# information which slices get combined for each
# combined grid. It will be stored in a data frame,
# with the name of the combined grid as column key and
# the column containing the slices to be combined.
combine_grid_slices = []
# here, grid_slice is a batch (list) of slice names
for grid_slice_ in grid_slice:
path_to_grid = os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
grid_slice_+'.h5')
combine_grid_slices.append(path_to_grid)
path_to_grid_combined = os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
GRIDS_COMBINED[l][i]+'.h5')
df_tmp = pd.DataFrame()
df_tmp[path_to_grid_combined] = combine_grid_slices
df = pd.concat([df,df_tmp], axis=1)
elif step_name == 'step_3': # CALCULATE_EXTRA_VALUES
if (isinstance(ORIGINAL_COMPRESSIONS_, list) and\
len(ORIGINAL_COMPRESSIONS_)>0):
# use one original for all compressions, hence use the
# first one and ignore the others
original_compression = ORIGINAL_COMPRESSIONS_[0]
else:
# if not defined assume 'ORIGINAL'
original_compression = 'ORIGINAL'
# This step needs the grid to process, the grid type,
# the grid containing the ORIGINAL data, and the name
# of the new grid containing the processed data.
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION, metallicity,
compression, grid_slice+'.h5'))
grids_type.append(grid_type)
grids_ORIGINAL.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity,
original_compression+RLO,
grid_slice+'.h5'))
processed_grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
grid_slice+'_processed.h5'))
elif step_name == 'step_4': # TRAIN_INTERPOLATORS
# This step needs the grids to use for the training,
# the grid type, the interpolation methods, and the
# names of the files where the interpolator data gets
# written to.
for method in INTERPOLATION_METHODS:
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION, metallicity,
compression, grid_slice+'.h5'))
grids_type.append(grid_type)
interpolation_methods.append(method+RLO)
interpolators.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity,
'interpolation_objects',
'IF_'+method+RLO+'.pkl'))
if i==0:
interpolator_dir = os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity,
'interpolation_objects')
# the interpolator directory will be created if not
# existing already
if not os.path.isdir(interpolator_dir):
newly_created_files.append(interpolator_dir)
elif step_name == 'step_5': # TRAIN_PROFILE_INTERPOLATORS
# This step needs the grids to use for the training,
# the grid type, the names of the files of the IF
# interpolators, the quantities to get profiles
# interpolated for, and the names of the files where
# the interpolator data gets written to.
if (isinstance(PROFILE_NAMES, list) and\
(len(PROFILE_NAMES)>0)):
for method in INTERPOLATION_METHODS:
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION, metallicity,
compression, grid_slice+'.h5'))
grids_type.append(grid_type)
interpolators.append(os.path.join(
PATH_TO_GRIDS, grid_type, VERSION,
metallicity, 'interpolation_objects',
'IF_'+method+RLO+'.pkl'))
profile_quantities.append(PROFILE_NAMES)
profile_interpolators.append(os.path.join(
PATH_TO_GRIDS, grid_type, VERSION,
metallicity, 'interpolation_objects',
'profile_'+method+RLO+'.pkl'))
if i==0:
interpolator_dir = os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity,
'interpolation_objects')
# the interpolator directory will be created if
# not existing already
if not os.path.isdir(interpolator_dir):
newly_created_files.append(
interpolator_dir)
elif step_name == 'step_9': # EXPORT_DATASET
if "RLO" in grid_type:
export_dir = grid_type
else:
export_dir = grid_type+RLO
# This step needs the grids/interpolators to export and
# the path where to export them to.
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION, metallicity,
compression, grid_slice+'.h5'))
full_export_dir = os.path.join(PATH, 'POSYDON_data',
export_dir)
if not os.path.isdir(full_export_dir):
os.makedirs(full_export_dir)
export_path.append(os.path.join(full_export_dir,
metallicity+'.h5'))
for method in [['linear','linear3c_kNN'],
['1NN','1NN_1NN']]:
grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
VERSION, metallicity,
'interpolation_objects',
'IF_'+method[0]+RLO+'.pkl'))
full_export_dir = os.path.join(PATH,
'POSYDON_data', export_dir,
'interpolators', method[1])
if not os.path.isdir(full_export_dir):
os.makedirs(full_export_dir)
export_path.append(os.path.join(full_export_dir,
metallicity+'.pkl'))
elif step_name == 'rerun': # RERUN
# This step needs the grid to get the runs from, the
# path for the new runs, the grid type, the
# metallicity, the rerun type and the cluster name.
grids.append(os.path.join(PATH_TO_GRIDS, grid_type,
VERSION, metallicity, compression,
grid_slice+'.h5'))
rerun_path.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION, metallicity,
'rerun_'+RERUN_TYPE+'_'+grid_slice))
grids_type.append(grid_type)
rerun_metallicities.append(metallicity)
rerun_type.append(RERUN_TYPE)
clusters.append(CLUSTER)
if (('plot' in step_name) or ('check' in step_name)):
# get output grid names for sub steps
if 'step_2' in step_name:
grid_s = GRIDS_COMBINED[l][i]
elif 'step_3' in step_name:
grid_s = grid_slice+'_processed'
elif (('step_4' in step_name) or
('step_5' in step_name)):
grid_s = CONTROL_GRIDS[l][i]
else:
grid_s = grid_slice
if grid_s == '': # skip that slice for sub steps
continue
if (('step_4' not in step_name) and
('step_5' not in step_name)):
# all steps without interpolation method get an
# empty method
if len(INTERPOLATION_METHODS)==0:
INTERPOLATION_METHODS = ['']
if 'plot' in step_name: # PLOTS
# only plot the first compression type, but allow for
# versions with and without the RLO flag
if ((compression not in COMPRESSIONS_[0]) and
(COMPRESSIONS_[0] not in compression)):
continue
# loop over interpolation methods and plots
for method in INTERPOLATION_METHODS:
# for to_plot in CREATE_PLOTS:
# plot_quantities.append(to_plot)
if len(CREATE_PLOTS)>0:
# Plots need to know what to plot, the
# grid to get the data from, its grid type,
# the directory to put the plots in, the
# extension for the plot files, and the
# name of an interpolator, when using one.
plot_quantities.append(CREATE_PLOTS)
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
grid_s+'.h5'))
grids_type.append(grid_type)
plot_dirs.append(os.path.join(
PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, 'plots',
RLO[1:]+RLO[:1]+grid_s))
plot_extensions.append(PLOT_EXTENSION)
if method != '': # if there is a method
if 'step_4' in step_name:
interpolators.append(os.path.join(
PATH_TO_GRIDS, grid_type,
VERSION, metallicity,
'interpolation_objects',
'IF_'+method+RLO+'.pkl'))
elif 'step_5' in step_name:
interpolators.append(os.path.join(
PATH_TO_GRIDS, grid_type,
VERSION, metallicity,
'interpolation_objects',
'profile_'+method+RLO+'.pkl'))
if i==0:
plots_dir = os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, 'plots')
# the plots directory will be created if not
# existing already
if not os.path.isdir(plots_dir):
newly_created_files.append(plots_dir)
elif 'check' in step_name: # CHECKS
# loop over interpolation methods and checks
for method in INTERPOLATION_METHODS:
# for to_check in DO_CHECKS:
# checks.append(to_check)
if len(DO_CHECKS)>0:
# Checks need to know what kind of
# check to do, the grid to get the data
# from, and the name of an interpolator,
# when using one.
checks.append(DO_CHECKS)
grids.append(os.path.join(PATH_TO_GRIDS,
grid_type, VERSION,
metallicity, compression,
grid_s+'.h5'))
if method != '': # if there is a method
if 'step_4' in step_name:
interpolators.append(os.path.join(
PATH_TO_GRIDS, grid_type,
VERSION, metallicity,
'interpolation_objects',
'IF_'+method+RLO+'.pkl'))
elif 'step_5' in step_name:
interpolators.append(os.path.join(
PATH_TO_GRIDS, grid_type,
VERSION, metallicity,
'interpolation_objects',
'profile_'+method+RLO+'.pkl'))
# saving dataset to csv file (step 2 is special here, because the data
# frame was already created.)
if step_name != 'step_2':
# create a data frame to export to a csv file (each step requires
# different data columns)
grids = np.array(grids)
df['path_to_grid'] = grids
if step_name == 'step_1':
df['compression'] = grids_compression
df['grid_type'] = grids_type
df['stop_before_carbon_depletion'] = stop_before_carbon_depletion
elif step_name == 'step_3':
df['grid_type'] = grids_type
df['path_to_grid_ORIGINAL'] = grids_ORIGINAL
df['path_to_processed_grid'] = processed_grids
elif step_name == 'step_4':
df['grid_type'] = grids_type
df['interpolation_method'] = interpolation_methods
df['path_to_interpolator'] = interpolators
elif step_name == 'step_5':
df['grid_type'] = grids_type
df['profile_names'] = profile_quantities
df['path_to_IF_interpolator'] = interpolators
df['path_to_profile_interpolator'] = profile_interpolators
elif step_name == 'step_9':
df['export_path'] = export_path
elif step_name == 'rerun':
df['rerun_path'] = rerun_path
df['grid_type'] = grids_type
df['rerun_metallicity'] = rerun_metallicities
df['rerun_type'] = rerun_type
df['cluster'] = clusters
elif 'plot' in step_name:
df['grid_type'] = grids_type
df['quantities_to_plot'] = plot_quantities
df['path_to_plot'] = plot_dirs
df['plot_extension'] = plot_extensions
if len(interpolators)>0:
df['path_to_interpolator'] = interpolators
elif 'check' in step_name:
df['checks_to_do'] = checks
if len(interpolators)>0:
df['path_to_interpolator'] = interpolators
# drop lines when grid directories/files are not found
if DROP_MISSING_FILES:
drop_rows = []
for row in df.index:
# grid file is always needed
path = df.at[row,'path_to_grid']
if (not os.path.exists(path) and
path not in previously_created_files):
drop_rows.append(row)
elif step_name == 'step_3':
# grid file with ORIGINAL compression needed
path = df.at[row,'path_to_grid_ORIGINAL']
if (not os.path.exists(path) and
path not in previously_created_files):
drop_rows.append(row)
elif (('step_4_' in step_name) or ('step_5_' in step_name)):
# interpolator files needed
path = df.at[row,'path_to_interpolator']
if (not os.path.exists(path) and
path not in previously_created_files):
drop_rows.append(row)
elif 'step_5' in step_name:
# interpolator files needed
path = df.at[row,'path_to_IF_interpolator']
if (not os.path.exists(path) and
path not in previously_created_files):
drop_rows.append(row)
# report about missing files and remove tasks, which need them
if len(drop_rows)>0:
print("\n{:-^54s}".format(step_name))
print('The following grids will not be processed because the '
'files/directories are missing! If this warning message '
'is unexpected to you, please check the file paths!')
for row in drop_rows:
print(df.at[row,'path_to_grid'])
if step_name == 'step_3':
print(df.at[row,'path_to_grid_ORIGINAL'])
elif (('step_4_' in step_name) or
('step_5_' in step_name)):
print(df.at[row,'path_to_interpolator'])
elif 'step_5' in step_name:
print(df.at[row,'path_to_IF_interpolator'])
print('')
df = df.drop(index=drop_rows)
# keep track of files created by a step
if step_name == 'step_1':
for row in df.index:
path = df.at[row,'path_to_grid']
compression = df.at[row,'compression']
newly_created_files.append(get_new_grid_name(path,compression))
elif step_name == 'step_3':
newly_created_files.extend(df['path_to_processed_grid'].to_list())
elif step_name == 'step_4':
newly_created_files.extend(df['path_to_interpolator'].to_list())
elif step_name == 'step_5':
newly_created_files.extend(
df['path_to_profile_interpolator'].to_list())
elif step_name == 'step_9':
newly_created_files.extend(df['export_path'].to_list())
elif step_name == 'rerun':
newly_created_files.extend(df['rerun_path'].to_list())
elif 'plot' in step_name:
newly_created_files.extend(plot_dirs)
else: # step 2
# drop columns when psygrid files are not found
if DROP_MISSING_FILES:
first_time = True
drop_columns = []
for column in df.keys():
drop_rows = []
for row in df.index:
path = df.at[row,column]
if pd.notna(path):
if (not os.path.exists(path) and
path not in previously_created_files):
drop_rows.append(row)
# report about missing files and adjust column
if len(drop_rows)>0:
if first_time:
print("\n{:-^54s}".format(step_name))
print('If this warning message is unexpected to you, '
'please check that step 1 completed '
'successfully!')
first_time = False
print(f'In {column} the following grids are skipped!')
for row in drop_rows:
print(df.at[row,column])
df.at[row,column] = np.nan
print('')
newcol = df[column].dropna().to_list()
if len(newcol)==0:
drop_columns.append(column)
newcol.extend((df.shape[0]-len(newcol))*[np.nan])
df[column] = newcol
# report about empty tasks and remove them
if len(drop_columns)>0:
if first_time:
print("\n{:-^54s}".format(step_name))
print('If this warning message is unexpected to you, '
'please check that step 1 completed '
'successfully!')
first_time = False
print('The following grids will not be combined as all grid '
'slices are missing!')
for column in drop_columns:
print(column)
print('')
df = df.dropna(axis=0, how='all').dropna(axis=1, how='all')
newly_created_files.extend(df.keys().to_list())
# finally save data frame
output_fname = f'{step_name}.csv'
df.to_csv(os.path.join(PATH, output_fname), index=False)
# return list of new files created by that step
return newly_created_files
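# As an illustration, the step_1.csv written by this function has one row per
# grid slice with the columns (example values are hypothetical):
#
#   path_to_grid,compression,grid_type,stop_before_carbon_depletion
#   /path/to/grids/HMS-HMS/v2/1e+00_Zsun/grid_low_res_0,LITE,HMS-HMS,0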
def create_cleanup_slurm_job(job_name,
step_name,
PATH='.',
ACCOUNT=None,
PARTITION=None,
WALLTIME=None,
MAILTYPE=None,
EMAIL=None,
GROUP=None,
VERBOSE=False,
**kwargs):
"""Create slurm file for cleanup job.
Parameters
----------
job_name : str
String representation of the job. It will be used in the file name.
step_name : str
String representation of the parent step. It will be used for the logs.
PATH : str
Path of the working directory.
ACCOUNT : str
The slurm account name. Passed to slurm's option '--account'.
PARTITION : str
The slurm partition name. Passed to slurm's option '--partition'.
WALLTIME : str
Maximum time for the slurm job. Passed to slurm's option '--time'.
MAILTYPE : str
Specifier for which emails slurm should send. Ignored by cleanup jobs,
which always use '--mail-type=FAIL' when EMAIL is set.
EMAIL : str
Email address to send mails to. Passed to slurm's option '--mail-user'.
GROUP : str
Group name to get ownership and permissions on files created by the
pipeline.
VERBOSE : bool
Enables/Disables additional output.
kwargs : dict
Other parameters (all being ignored).
"""
if ((job_name == 'step_2') or (job_name == 'step_9') or
(job_name == 'rerun') or ('plot' in job_name) or
('check' in job_name) or
((job_name == 'pipeline') and (GROUP is not None))):
if not os.path.isdir(PATH):
raise NotADirectoryError(f"PATH={PATH} not found.")
if os.path.exists(f'{job_name}_cleanup.slurm'):
Pwarn(f'Replace {job_name}_cleanup.slurm', "OverwriteWarning")
with open(f'{job_name}_cleanup.slurm', 'w') as f:
# write slurm file content
f.write("#!/bin/bash\n")
if ACCOUNT is not None:
f.write(f"#SBATCH --account={ACCOUNT}\n")
if PARTITION is not None:
f.write(f"#SBATCH --partition={PARTITION}\n")
f.write("#SBATCH -N 1\n")
f.write("#SBATCH --cpus-per-task 1\n")
f.write("#SBATCH --ntasks-per-node 1\n")
if WALLTIME is not None:
f.write(f"#SBATCH --time={WALLTIME}\n")
f.write(f"#SBATCH --job-name=cleanup\n")
f.write("#SBATCH --mem-per-cpu=4G\n")
if (EMAIL is not None):
f.write(f"#SBATCH --mail-type=FAIL\n")
f.write(f"#SBATCH --mail-user={EMAIL}\n")
path_to_logs = os.path.join(PATH, 'logs', 'cleanup')
log_file_name = get_log_file_name(job_name="cleanup",\
step_name=job_name)
f.write("#SBATCH --open-mode=truncate\n")
f.write(f"#SBATCH --output={path_to_logs}/{log_file_name}\n")
# get logs path and set the output of the slurm job
if step_name in job_name:
path_to_log_files = os.path.join(PATH, 'logs', step_name)
else:
path_to_log_files = os.path.join(PATH, 'logs', job_name)
if ((job_name == 'step_2') or (job_name == 'step_9') or
(job_name == 'rerun') or ('plot' in job_name) or
('check' in job_name)):
# combine logs
old_log_file_names = get_log_file_name(job_name=job_name,\
step_name=step_name)
new_log_file_name = old_log_file_names.replace("_%4a", "s")
old_log_file_names = old_log_file_names.replace("_%4a", "_*")
f.write(f"\ncat {path_to_log_files}/{old_log_file_names} >"
f" {path_to_log_files}/{new_log_file_name}")
f.write(f"\nrm {path_to_log_files}/{old_log_file_names}")
# the overall cleanup of the pipeline
if (job_name == 'pipeline'):
if GROUP is not None:
f.write(f"\necho \"Change group to {GROUP} and group "
"permission to rwX at least for all files stated "
f"in {PATH}/pipeline_files.csv\"")
f.write(f"\nfor FILE in $(cat {PATH}/pipeline_files.csv); do")
f.write(f"\n chgrp -fR {GROUP} $""{FILE};")
f.write("\n chmod -fR g+rwX ${FILE};")
f.write("\ndone")
f.write(f"\necho \"Acting on {PATH} and all subdirectories"
"/files in there\"")
f.write(f"\necho \"Change group to {GROUP}\"")
f.write(f"\nchgrp -fR {GROUP} {PATH}")
f.write("\necho \"Change group permission to rwX at least\"")
f.write(f"\nchmod -fR g+rwX {PATH}")
f.write(f"\necho \"Done, check {PATH}:\"")
f.write(f"\nls -al {PATH}")
else:
return False
return True
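# E.g. the step_2_cleanup job concatenates the per-task logs
# logs/step_2/combine_grid_slice_*.out into a single combine_grid_slices.out
# and removes the per-task files; the final 'pipeline' cleanup additionally
# fixes group ownership and permissions when GROUP is set.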
def get_log_file_name(job_name, step_name):
"""Get name of log file
Parameters
----------
job_name : str
Name of the job.
step_name : str
Name of the step/parent job the job belongs to.
Returns
-------
str
Name of the log file.
"""
if job_name == 'step_1': # CREATE_GRID_SLICES
return "grid_slice_%4a.out"
if job_name == 'step_2': # COMBINE_GRID_SLICES
return "combine_grid_slice_%4a.out"
if job_name == 'step_3': # CALCULATE_EXTRA_VALUES
return "post_processing_%4a.out"
if job_name == 'step_4': # TRAIN_INTERPOLATORS
return "train_interpolator_%4a.out"
if job_name == 'step_5': # TRAIN_PROFILE_INTERPOLATORS
return "train_profile_interpolator_%4a.out"
if job_name == 'step_9': # EXPORT_DATASET
return "export_dataset_%4a.out"
if job_name == 'rerun': # RERUN
return "rerun_%4a.out"
if 'plot' in job_name: # PLOTS
return "plots_"+step_name+"_slice_%4a.out"
if 'check' in job_name: # CHECKS
return "checks_"+step_name+"_slice_%4a.out"
if 'cleanup' in job_name: # CLEANUPS
return "cleanup_"+step_name+".out"
return "log.out"
if __name__ == '__main__':
if len(sys.argv) >= 2:
if str(sys.argv[1])=='--help':
print('Program to set up the POSYDON pipeline')
print('syntax: posydon-setup-pipeline INI_FILE_PATH')
print('INI_FILE_PATH: path to the ini file containing the data')
sys.exit(0)
ini_file_path = str(sys.argv[1])
if '.' not in ini_file_path:
ini_file_path = './' + ini_file_path
else:
raise ValueError('No ini file specified.')
pipeline = PostProcessingPipeline(ini_file_path)
pipeline.create_csv_and_slurm_job_files()
pipeline.create_log_dirs()
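# Typical usage (the ini file name is just an example): running
#   posydon-setup-pipeline pipeline.ini
# in the working directory creates the step_*.csv, step_*.slurm and
# run_pipeline.sh files as well as the logs/ directories; the jobs are then
# submitted by executing ./run_pipeline.sh.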