# Source code for posydon.CLI.popsyn.check

"""Module for checking the status of a population synthesis run.
"""

# Module authors, each given as a "Name <email>" string.
__authors__ = [
    "Max Briel <max.briel@gmail.com>",
]

import glob
import os
import subprocess

import numpy as np

from posydon.CLI.io import (
    clear_previous_lines,
    create_bash_submit_rescue_script,
    create_batch_rescue_script,
    print_error,
    print_separator_line,
    print_success,
)
from posydon.popsyn.io import binarypop_kwargs_from_ini
from posydon.popsyn.synthetic_population import Population
from posydon.utils.common_functions import convert_metallicity_to_string
from posydon.utils.posydonwarning import Pwarn

# File naming patterns
# The ``{met}`` placeholder is filled through ``str.format`` with the string
# returned by ``convert_metallicity_to_string``.

# Merge job script; handle_batches_complete submits it from the run folder.
MERGE_SCRIPT_PATTERN = "{met}_Zsun_merge_popsyn.slurm"
# Array job script; get_expected_batch_count reads its "--array=" line.
ARRAY_SCRIPT_PATTERN = "{met}_Zsun_slurm_array.slurm"
# Rescue job script name. Not referenced in the visible part of this module;
# presumably used by the rescue-script helpers -- TODO confirm.
RESCUE_SCRIPT_PATTERN = "{met}_Zsun_rescue.slurm"
# Shell script name that check_popsyn_function tells the user to run with `sh`.
RESUBMIT_SCRIPT = "resubmit_slurm.sh"

def get_ini_file(args):
    """Locate the INI file of the population synthesis run.

    All ``*.ini`` files directly inside ``args.run_folder`` are collected.
    When more than one is found, the user is asked on standard input to
    pick one by index; an out-of-range or non-integer answer falls back
    to the first file found.

    Parameters
    ----------
    args : argparse.Namespace
        Command line arguments; must provide the ``run_folder`` path.

    Returns
    -------
    str
        Path to the selected INI file.

    Raises
    ------
    FileNotFoundError
        If the run folder contains no INI file.
    """
    candidates = glob.glob(os.path.join(args.run_folder, '*.ini'))
    if not candidates:
        raise FileNotFoundError("No INI file found in the run folder.")

    # The first match is the default; only a valid answer overrides it.
    chosen = candidates[0]

    if len(candidates) > 1:
        print("Multiple INI files found:\n")
        for position, path in enumerate(candidates):
            print(f"{position}: {path}")
        print("")

        answer = input("Enter the index of the INI file you want to use: ")
        try:
            position = int(answer)
        except ValueError:
            print("Invalid input; using the first INI file.")
        else:
            if 0 <= position < len(candidates):
                chosen = candidates[position]
            else:
                print("Invalid index; using the first INI file.")

    print(f"\nUsing INI file:\n{chosen}")
    return chosen
def validate_run_folder(run_folder):
    """Ensure the run folder exists, can be listed, and is not empty.

    Parameters
    ----------
    run_folder : str
        Path to the run folder to validate

    Raises
    ------
    FileNotFoundError
        If the folder does not exist or its contents cannot be listed.
    ValueError
        If the folder exists but contains no entries.
    """
    folder_present = os.path.exists(run_folder)
    if not folder_present:
        raise FileNotFoundError(
            f"Run folder '{run_folder}' does not exist.\n"
            "Please provide a valid path to a population run folder."
        )

    try:
        entries = os.listdir(run_folder)
    except (OSError, PermissionError) as err:  # pragma: no cover
        # Listing failures are reported with the same exception type as a
        # missing folder.
        raise FileNotFoundError(
            f"Cannot access run folder '{run_folder}': {err}\n"
            "Please check folder permissions."
        )

    if len(entries) == 0:
        raise ValueError(
            f"Run folder '{run_folder}' is empty.\n"
            "This folder does not contain any population run files."
        )
def get_binary_params(ini_file):
    """Read the binary population parameters from the INI file.

    Parameters
    ----------
    ini_file : str
        The path to the INI file

    Returns
    -------
    int
        The number of metallicities
    int
        The number of binaries (0 when the INI file does not set it)
    list of floats
        The list of metallicities in solar units (empty when not set)
    dict
        The population synthesis parameters as returned by
        ``binarypop_kwargs_from_ini``
    """
    params = binarypop_kwargs_from_ini(ini_file)

    # Missing keys fall back to "no metallicities" / "zero binaries".
    met_values = params.get('metallicity', [])
    met_count = len(met_values)
    binary_count = params.get('number_of_binaries', 0)

    return met_count, binary_count, met_values, params
def get_run_configuration(args):
    """Validate the run folder and load the run configuration from it.

    The folder is validated, an INI file is selected, its parameters are
    read, and a short summary of the requested parameters is printed.

    Parameters
    ----------
    args : argparse.Namespace
        Arguments containing run_folder path

    Returns
    -------
    tuple
        (ini_file, num_metallicities, number_of_binaries, metallicities,
        synpop_params)

    Raises
    ------
    FileNotFoundError
        If folder validation or INI file retrieval fails
    ValueError
        If the run folder is empty (raised by ``validate_run_folder``)
    """
    validate_run_folder(args.run_folder)

    selected_ini = get_ini_file(args)
    met_count, binary_count, met_values, params = get_binary_params(selected_ini)

    print("REQUESTED PARAMETERS")
    print(f"# metallicities: {met_count}")
    print(f"# binaries: {binary_count}\n")

    return selected_ini, met_count, binary_count, met_values, params
# Check the population files and binary counts
def check_population_files(run_folder, metallicities):
    """Report which merged population files are present in the run folder.

    For every metallicity the file ``<met>_Zsun_population.h5`` is looked
    up inside ``run_folder`` and one status row is printed. When every
    file is present, the printed table is cleared and replaced by a
    single OK line.

    Parameters
    ----------
    run_folder : str
        Path to the folder where the population run is located
    metallicities : list of floats
        List of metallicities to check in solar units

    Returns
    -------
    bool
        True if all files exist, False otherwise
    dict
        Dictionary with metallicity as key and existence status as value
    """
    print("POPULATION FILES CHECK ....................\n")
    print("MET\t\tPOP_FILE")

    status_dict = {}
    all_exist = True
    for met in metallicities:
        label = convert_metallicity_to_string(met)
        population_path = os.path.join(run_folder,
                                       f'{label}_Zsun_population.h5')
        present = os.path.exists(population_path)
        status_dict[met] = present
        if present:
            print_success(f"{label} \t\tOK")
        else:
            print_error(f"{label}\t\tNO")
            all_exist = False

    if all_exist:
        # Remove the header, the blank line, the column titles and one row
        # per metallicity before printing the summary line.
        clear_previous_lines(len(metallicities) + 3)
        print_success("POPULATION FILES CHECK ....................OK")
    else:
        print_error("POPULATION FILES CHECK ....................ERROR")

    return all_exist, status_dict
def check_binary_counts(run_folder, metallicities, expected_count):
    """Compare the number of systems in each population file to a target.

    Each ``<met>_Zsun_population.h5`` file is opened as a ``Population``
    and its ``number_of_systems`` is compared with ``expected_count``.
    A file that cannot be read is reported as an error row and counts as
    a mismatch; it gets no entry in the returned dictionary.

    Parameters
    ----------
    run_folder : str
        Path to the folder where the population run is located
    metallicities : list of floats
        List of metallicities to check in solar units
    expected_count : int
        Expected number of binaries per file

    Returns
    -------
    bool
        True if all counts match, False otherwise
    dict
        Dictionary with metallicity as key and binary count as value
    """
    print("BINARY COUNT CHECK ....................\n")
    print("MET\t\tEXPECTED\tFOUND\tSTATUS")

    counts_dict = {}
    all_match = True
    for met in metallicities:
        label = convert_metallicity_to_string(met)
        population_path = os.path.join(run_folder,
                                       f'{label}_Zsun_population.h5')
        try:
            found = Population(population_path).number_of_systems
            counts_dict[met] = found
            if found == expected_count:
                print_success(f"{label}\t\t{expected_count}\t\t{found}\tOK")
            else:
                print_error(f"{label}\t\t{expected_count}\t\t{found}"
                            "\tMISMATCH")
                all_match = False
        except Exception as e:
            # Any failure while opening or reading the file is shown in the
            # table instead of aborting the whole check.
            print_error(f"{label}\t\t{expected_count}\t\t-"
                        f"\tERROR: {str(e)}")
            all_match = False

    if all_match:
        clear_previous_lines(len(metallicities) + 3)
        print_success("BINARY COUNT CHECK ....................OK")
    else:
        print_error("BINARY COUNT CHECK ....................ERROR")

    return all_match, counts_dict
def check_run_status(run_folder, metallicities, number_of_binaries):
    """Run the file-existence check and, if it passes, the count check.

    The binary counts are only inspected when every population file
    exists; otherwise ``counts_match`` is reported as False.

    Parameters
    ----------
    run_folder : str
        Path to the run folder
    metallicities : list
        List of metallicities to check
    number_of_binaries : int
        Expected number of binaries

    Returns
    -------
    tuple
        (files_exist, counts_match, file_status)
    """
    print("Checking the status of the population run")
    print_separator_line()

    files_exist, file_status = check_population_files(run_folder, metallicities)
    if not files_exist:
        return files_exist, False, file_status

    # The per-metallicity counts are not needed by the caller.
    counts_match, _ = check_binary_counts(
        run_folder, metallicities, number_of_binaries
    )
    return files_exist, counts_match, file_status
# Check individual batch for reasons of the failed runs
def get_expected_batch_count(run_folder, str_met):
    """Parse the SLURM array script to get the expected batch count.

    The script named by ``ARRAY_SCRIPT_PATTERN`` is scanned for the first
    line starting with ``#SBATCH --array=`` that holds a ``start-end``
    range. An optional ``%N`` throttle suffix (for example ``0-99%10``) is
    ignored. Lines whose range cannot be parsed as two integers are
    skipped rather than raising.

    Parameters
    ----------
    run_folder : str
        Path to the run folder
    str_met : str
        String representation of metallicity

    Returns
    -------
    int or None
        Expected batch count (``end - start + 1``), or None if the script
        is missing or holds no parsable ``start-end`` array range
    """
    slurm_script = os.path.join(run_folder,
                                ARRAY_SCRIPT_PATTERN.format(met=str_met))
    if not os.path.exists(slurm_script):
        return None

    directive = '#SBATCH --array='
    with open(slurm_script, 'r') as f:
        for line in f:
            if not line.startswith(directive):
                continue

            array_range = line[len(directive):].strip()
            # "%N" only limits how many tasks run at once; it does not
            # change the number of array tasks.
            array_range = array_range.split('%', 1)[0]
            if '-' not in array_range:
                continue

            try:
                start, end = map(int, array_range.split('-'))
            except ValueError:
                # Not a plain "start-end" range (e.g. a step or a list).
                continue
            return end - start + 1

    return None
def find_missing_batch_indices(batch_folder, expected_count):
    """Find which batch indices are missing.

    Batch files are matched with ``evolution.combined.*`` and the batch
    index is read from the second-to-last dot-separated field of the file
    name (``evolution.combined.<index>.<ext>``). Files whose index field
    is not an integer are ignored, so a stray file does not abort the
    check.

    Parameters
    ----------
    batch_folder : str
        Path to the batch folder
    expected_count : int
        Expected number of batches; indices ``0 .. expected_count - 1``
        are expected to be present

    Returns
    -------
    set
        Set of missing batch indices
    """
    found_indices = set()
    pattern = os.path.join(batch_folder, 'evolution.combined.*')
    for batch_file in glob.glob(pattern):
        fields = os.path.basename(batch_file).split('.')
        try:
            found_indices.add(int(fields[-2]))
        except ValueError:
            # Index field is not numeric: not a batch output file.
            continue

    return set(range(expected_count)) - found_indices
def select_job_id(run_folder, str_met):
    """Pick the SLURM job ID whose logs should be inspected.

    Job IDs are taken from the log files ``<str_met>_logs/popsyn_*.out``
    inside ``run_folder``: the ID is the field between the first and the
    second underscore of the file name. If several distinct IDs exist,
    the user is asked to choose one by index until a valid index is
    entered.

    Parameters
    ----------
    run_folder : str
        Path to the run folder
    str_met : str
        String representation of metallicity

    Returns
    -------
    int or None
        Selected job ID (an element of the array produced by
        ``numpy.unique``), or None if no logs found
    """
    log_paths = glob.glob(os.path.join(run_folder,
                                       f'{str_met}_logs/popsyn_*.out'))
    if not log_paths:
        print("\n\033[33mNo log files found. Cannot determine failure reasons.\033[0m")
        print("This may indicate that the jobs were never submitted or the log directory is missing.")
        return None

    # Reduce the log file names to the sorted set of job IDs they carry.
    parsed_ids = []
    for path in log_paths:
        id_field = os.path.basename(path).split('_')[1]
        parsed_ids.append(int(id_field.split('.')[0]))
    job_ids = np.unique(parsed_ids)

    if len(job_ids) <= 1:
        return job_ids[0]

    print("Please select a job ID to use: ")
    for position, job_id in enumerate(job_ids):
        print(f"{position}: {job_id}")

    # Keep asking until the answer is a valid index.
    while True:  # pragma: no cover
        try:
            position = int(input("\nEnter the index to the job ID: "))
        except ValueError:  # pragma: no cover
            print_error("Please enter a valid number.")
            continue
        if 0 <= position < len(job_ids):
            return job_ids[position]
        print_error("Invalid selection. Please try again.")  # pragma: no cover
def read_batch_log_file(log_file_path, batch_index, str_met, jobID):
    """Read a batch log file and print the likely reason for its failure.

    The last (up to three) lines of the log are inspected. If any of them
    contains ``DUE TO TIME LIMIT`` the batch is reported as having
    exceeded its wall time; otherwise those lines are printed so the user
    can read the error. For logs shorter than three lines every line is
    searched for the time-limit marker, not only the first one.

    Parameters
    ----------
    log_file_path : str
        Path to the log file
    batch_index : int
        Index of the batch being analyzed
    str_met : str
        String representation of the metallicity
    jobID : int
        SLURM job ID

    Returns
    -------
    None
        Prints the analysis results directly
    """
    if not os.path.exists(log_file_path):
        print_error(f"Batch {batch_index}: Log file not found")
        return

    # Only the file access is guarded; the analysis below cannot fail on I/O.
    try:
        with open(log_file_path, 'r') as f:
            lines = f.readlines()
    except (OSError, UnicodeError) as e:  # pragma: no cover
        print_error(f"Batch {batch_index}: Error reading log file - {str(e)}")
        return

    if not lines:
        print(f"Batch {batch_index}: <empty file>")
        return

    tail = [line.strip() for line in lines[-3:]]

    if len(lines) >= 3:
        timeout_message = f"Batch {batch_index}: Wall time exceeded"
    else:
        # Short logs get a header line, then the verdict below it.
        print(f"Batch {batch_index}: File contains {len(lines)} line(s):")
        timeout_message = " Wall time exceeded"

    if any("DUE TO TIME LIMIT" in line for line in tail):
        print(timeout_message)
        return

    print(f"{str_met}_logs/popsyn_{jobID}_{batch_index}.out:")
    for number, line in enumerate(tail, start=1):
        print(f" {number}: {line}")
def analyze_missing_batch_logs(run_folder, str_met, missing_indices):
    """Print the failure reason found in the log of every missing batch.

    Nothing happens when there are no missing batches or when no job ID
    can be selected from the log folder.

    Parameters
    ----------
    run_folder : str
        Path to the run folder
    str_met : str
        String representation of metallicity
    missing_indices : set
        Set of missing batch indices
    """
    if len(missing_indices) < 1:
        return

    jobID = select_job_id(run_folder, str_met)
    if jobID is None:
        return

    print(f"\nREADING THE LOGS OF {jobID} ....")
    print("Reasons for missing files:")

    for batch_index in missing_indices:
        # One log per array task: popsyn_<jobID>_<index>.out
        batch_log = os.path.join(
            run_folder, f'{str_met}_logs/popsyn_{jobID}_{batch_index}.out')
        read_batch_log_file(batch_log, batch_index, str_met, jobID)
def check_batch(run_folder, metallicity, batch_folder_name):
    """Inspect the batch files of one metallicity whose population is missing.

    The expected number of batches is read from the SLURM array script
    and compared with the number of ``evolution.combined.*`` files in the
    batch folder ``<met>_Zsun_<batch_folder_name>``. A status row is
    printed and, for missing batches, the corresponding logs are analyzed.

    Parameters
    ----------
    run_folder : str
        Path to the folder where the population run is located
    metallicity : float
        Metallicity to check
    batch_folder_name : str
        Name of the folder containing batch files

    Returns
    -------
    dict
        Dictionary with batch information including:
        - status: str ('folder_missing' when the batch folder does not
          exist, otherwise the value returned by ``print_batch_status``)
        - expected_count: int or None
        - found_count: int
        - metallicity: float
        - batch_folder: str
        - missing_indices: set when status is 'incomplete', else None
    """
    str_met = convert_metallicity_to_string(metallicity)
    expected = get_expected_batch_count(run_folder, str_met)
    batch_folder = os.path.join(run_folder,
                                f'{str_met}_Zsun_{batch_folder_name}')

    def _report(status, found, missing):
        # Single place that fixes the layout of the returned dictionary.
        return {
            "status": status,
            "expected_count": expected,
            "found_count": found,
            "metallicity": metallicity,
            "batch_folder": batch_folder,
            "missing_indices": missing,
        }

    if not os.path.exists(batch_folder):
        print_error(f"{str_met}\t\t-\t-\tNO BATCH DIR")
        return _report("folder_missing", 0, None)

    found = len(glob.glob(os.path.join(batch_folder, 'evolution.combined.*')))

    # Missing indices are only computed when fewer files than expected exist.
    too_few = expected is not None and found < expected
    missing_indices = (find_missing_batch_indices(batch_folder, expected)
                       if too_few else set())

    # NOTE(review): print_batch_status is neither imported nor defined in the
    # visible part of this file -- confirm it is in scope at runtime.
    status = print_batch_status(str_met, expected, found, missing_indices)

    analyze_missing_batch_logs(run_folder, str_met, missing_indices)

    return _report(status, found,
                   missing_indices if status == "incomplete" else None)
# Get the status of all batches for missing populations
def get_batches_status(run_folder, missing_files, synpop_params):
    """Collect the batch status of every metallicity with a missing file.

    The batch folder name is taken from ``synpop_params['temp_directory']``
    and defaults to ``'batches'``.

    Parameters
    ----------
    run_folder : str
        Path to the run folder
    missing_files : dict
        Dictionary of metallicities with missing files
    synpop_params : dict
        Population synthesis parameters

    Returns
    -------
    dict
        Batch status information for each metallicity; empty when there
        are no missing files
    """
    if not missing_files:
        return {}

    print("\nChecking batch files for missing populations:")
    batch_folder_name = synpop_params.get('temp_directory', 'batches')

    collected = {}
    for met in missing_files:
        print_separator_line()
        collected[met] = check_batch(run_folder, met, batch_folder_name)
    print_separator_line()

    return collected
def get_user_confirmation(prompt, valid_yes=None, valid_no=None):
    """Ask a yes/no question on standard input.

    The answer is stripped and lower-cased before it is compared with the
    accepted words. Anything that is neither a "yes" nor a "no" word is
    reported as unrecognized and treated as "no".

    Parameters
    ----------
    prompt : str
        The prompt to show the user
    valid_yes : list, optional
        List of strings considered as "yes" (default: ['yes', 'y'])
    valid_no : list, optional
        List of strings considered as "no" (default: ['no', 'n'])

    Returns
    -------
    bool
        True if user confirmed, False otherwise
    """
    yes_words = ['yes', 'y'] if valid_yes is None else valid_yes
    no_words = ['no', 'n'] if valid_no is None else valid_no

    answer = input(prompt).strip().lower()

    if answer in yes_words:
        return True
    if answer not in no_words:
        print_error(f"Unrecognized input '{answer}'. Treating as 'no'.")
    return False
def submit_slurm_job(script_path):
    """Submit a SLURM script with ``sbatch`` and report the outcome.

    Parameters
    ----------
    script_path : str
        Path to the SLURM script to submit

    Returns
    -------
    bool
        True if submission succeeded, False if ``sbatch`` exited with an
        error or the ``sbatch`` executable could not be found
    """
    command = ['sbatch', script_path]
    try:
        completed = subprocess.run(
            command,
            check=True,
            capture_output=True,
            text=True
        )
    except subprocess.CalledProcessError as e:
        # sbatch ran but rejected the submission; show what it wrote to stderr.
        print_error(f"Failed to submit job: {e.stderr.strip()}")
        return False
    except FileNotFoundError:
        print_error("sbatch command not found. Is SLURM installed?")
        return False
    else:
        print_success(f" Job submitted: {completed.stdout.strip()}")
        return True
def handle_batches_complete(args, missing_files, batch_status):
    """Offer to resubmit the merge jobs when every batch is complete.

    If any entry of ``batch_status`` has a status other than
    ``"complete"`` nothing is done. Otherwise the user is asked whether
    the merge script of every metallicity in ``missing_files`` should be
    submitted. An empty ``batch_status`` counts as "all complete".

    Parameters
    ----------
    args : argparse.Namespace
        Command line arguments
    missing_files : dict
        Dictionary of metallicities with missing files
    batch_status : dict
        Batch status information

    Returns
    -------
    bool
        True if merge jobs were handled (resubmitted or user declined),
        False if batches incomplete
    """
    for entry in batch_status.values():
        if entry["status"] != "complete":
            return False

    print_success("\nAll batch files are complete.")
    print("Please resubmit the merge jobs to generate the population files.")

    wants_resubmit = get_user_confirmation(
        "Would you like to resubmit the merge jobs? (yes/no): ")
    if not wants_resubmit:
        print("Merge jobs not resubmitted.")
        return True

    failures = 0
    for met in missing_files:
        merge_script = os.path.join(
            args.run_folder,
            MERGE_SCRIPT_PATTERN.format(met=convert_metallicity_to_string(met))
        )
        if not submit_slurm_job(merge_script):  # pragma: no cover
            failures += 1

    if failures == 0:
        print_success("All merge jobs submitted successfully.")
    else:
        print_error("Some merge jobs failed to submit. Please check the errors above.")  # pragma: no cover

    return True
########################################### ## Main function for checking a population ###########################################
def check_popsyn_function(args):
    """Function to check the status of a population run.

    The run configuration is loaded, the merged population files and
    their binary counts are checked and, if anything is wrong, the batch
    files are inspected. Depending on what is found, the user is offered
    to resubmit the merge jobs or to create (and optionally submit)
    rescue scripts for the incomplete batches.

    Parameters
    ----------
    args : argparse.Namespace
        The arguments passed to the function; ``run_folder`` is read here
        and ``args`` is forwarded to the rescue-script helpers

    Returns
    -------
    int
        0 if all checks passed or the rescue scripts were submitted,
        1 if a batch folder is missing or submitting the rescue scripts
        failed, 2 if the run needs further action (merge jobs handled,
        rescue scripts created but not submitted, or not created)
    """
    print(f"Checking the status of the population run in {args.run_folder}\n")

    # Only the binary count, the metallicities and the parameters are needed.
    _, _, number_of_binaries, metallicities, synpop_params = \
        get_run_configuration(args)

    files_exist, counts_match, file_status = check_run_status(
        args.run_folder, metallicities, number_of_binaries
    )

    if files_exist and counts_match:
        print_success("\nAll checks passed successfully.")
        return 0

    print_error("\nOne or more checks failed.")
    print("We will attempt to rescue the failed runs.\n")
    print_separator_line()

    # NOTE(review): when every file exists but a binary count mismatches,
    # missing_files is empty and handle_batches_complete still reports that
    # all batch files are complete -- confirm this is the intended flow.
    missing_files = {met: status for met, status in file_status.items()
                     if not status}
    batch_status = get_batches_status(args.run_folder, missing_files,
                                      synpop_params)

    # Complete batches only need their merge jobs to be rerun.
    if handle_batches_complete(args, missing_files, batch_status):
        return 2

    print("\nOne or more batch files are incomplete.\n"
          "We need to resubmit some of the batch jobs.\n")

    if any(status["status"] == "folder_missing"
           for status in batch_status.values()):
        print_error("One or more batch folders are missing.")
        print("Cannot generate rescue scripts without the batch folders.")
        print("Please ensure that the batch folders exist and try again.")
        return 1

    # One message built by literal concatenation, so no separator space is
    # inserted in front of the second sentence.
    print("We can generate rescue scripts for the incomplete batches.\n"
          "These scripts will resubmit only the missing batch jobs.\n"
          "You can change the script parameters by giving them as arguments\n"
          "to the 'posydon-popsyn rescue' command.\n")

    # NOTE(review): in SOURCE this prompt literal was split across a raw line
    # break; confirm whether a '\n' belongs in front of "(yes/no)".
    if not get_user_confirmation(
            "Do you want to create rescue scripts for the incomplete batches? "
            "(yes/no): "):
        print("Rescue scripts not created. Exiting.")
        return 2

    # NOTE(review): a script is requested for every inspected metallicity,
    # including ones whose status is "complete" -- confirm that
    # create_batch_rescue_script handles those.
    rescue_scripts = []
    for status in batch_status.values():
        print_separator_line()
        print("\nGENERATING A RESCUE SCRIPT ....................")
        rescue_scripts.append(create_batch_rescue_script(args, status))
    print_separator_line()

    resubmit_sh_file = create_bash_submit_rescue_script(args.run_folder,
                                                        rescue_scripts)

    if not get_user_confirmation(
            "Do you want to submit the rescue scripts now? (yes/no): "):
        print("Please submit the rescue scripts using the following command:")
        print("\n")
        print(f"sh {RESUBMIT_SCRIPT}")
        print("\n")
        return 2

    # Argument list instead of a shell string: the path is passed verbatim
    # (spaces and shell metacharacters are safe) and the exit status is checked.
    try:
        submitted = subprocess.run(['sh', resubmit_sh_file]).returncode == 0
    except OSError:
        submitted = False

    if not submitted:
        print_error("Failed to submit the rescue scripts. "
                    "Please check the errors above.")
        return 1

    print("Rescue scripts submitted.")
    return 0