Source code for posydon.utils.compress_mesa_files

"""Functions for bin/compress-mesa to handle the compression of files created
with MESA

"""

__authors__ = [
    "Konstantinos Kovlakas <Konstantinos.Kovlakas@unige.ch>",
    "Philipp Moura Srivastava <philipp.msrivastava@gmail.com>",
    "Matthias Kruckow <Matthias.Kruckow@unige.ch>",
]

import os
import sys
import shutil
import random
import argparse
from tqdm import tqdm
from posydon.utils.posydonwarning import Pwarn

def _parse_commandline():
    """Read the command line options of the MESA compression tool.

    Returns
    -------
    Namespace
        The parsed arguments (`test_dir`, `dsr`, `verbose`, `debug` and
        `mesa_dir`); options not given on the command line carry their
        defaults.

    """
    # Each entry holds the option strings and the keyword arguments which are
    # handed to `add_argument`; the order here is the order in the help text.
    option_table = (
        (("-td", "--test_dir"),
         dict(type=str,
              default=None,
              help="the path to where the testing directory should "
                   "be set up instead of compressing files")),
        (("-dsr", "--dsr"),
         dict(type=float,
              default=0.01,
              help="downsampling rate when creating test directory")),
        (("-v", "--verbose"),
         dict(action='store_true',
              default=False,
              help="enable outputs")),
        (("-d", "--debug"),
         dict(action='store_true',
              default=False,
              help="enable debugging outputs")),
        (("mesa_dir",),
         dict(type=str,
              default=None,
              help="the path to the directory containing "
                   "MESA-generated data")),
    )
    cli = argparse.ArgumentParser(description="Compressing MESA files")
    for option_strings, settings in option_table:
        cli.add_argument(*option_strings, **settings)
    return cli.parse_args()


def textsize(filesize, floatfmt=".3g", base=1024, threshold=1000):
    """Format a size in bytes as a short human-readable string.

    Parameters
    ----------
    filesize : int or float
        The size of the file, or directory, ... (in bytes). Negative values
        are formatted like their absolute value with a leading "-".
    floatfmt : str
        The format for the number in front of the unit (e.g., 2.34K).
    base : int
        The base of the units; either 1000 or 1024.
    threshold : int or float
        The number of units from which on the next larger unit is used. It
        has to be positive and not larger than `base`. For example, with
        base=1024 and threshold=1000 a size of 1000 bytes is given in K
        ("0.977K" with the default `floatfmt`).

    Returns
    -------
    str
        The size using the units b, K, M, G, T, P, E, Z, or Y. Sizes too
        large for the unit Y are returned in the form "<size> bytes".

    Raises
    ------
    ValueError
        If `base` or `threshold` is outside the allowed values.

    """
    if base not in (1000, 1024):
        raise ValueError(f"base={base} should be 1000 or 1024")
    if threshold <= 0:
        raise ValueError(f"threshold={threshold} should be larger than 0")
    if threshold > base:
        raise ValueError(f"threshold={threshold} should be smaller or equal "
                         f"to base={base}")
    # handle the sign separately, hence only the magnitude gets scaled
    sign = ""
    magnitude = filesize
    if filesize < 0:
        sign = "-"
        magnitude = -filesize
    # value of the current unit in bytes: base**0, base**1, ...
    scale = 1
    for symbol in "bKMGTPEZY":
        if magnitude < scale * threshold:
            return f"{sign}{magnitude / scale:{floatfmt}}{symbol}"
        scale *= base
    # too large for the largest unit
    return f"{sign}{magnitude:.3g} bytes"
def set_up_test(args):
    """Create a downsampled copy of MESA grids for testing.

    Every sub-directory of `mesa_dir` which contains at least one entry with
    "_grid_index_" in its name is recreated inside `test_dir` and a random
    subset of those entries is copied into it.

    Parameters (keys in `args`)
    ---------------------------
    mesa_dir : string
        The directory where the MESA tracks are stored.
    test_dir : string
        The existing directory in which the test data are placed.
    dsr : float
        Downsampling rate: the number of copied entries per grid is the
        number of found entries times this rate (rounded towards zero).

    Raises
    ------
    NameError
        If `test_dir` or `mesa_dir` is None.
    NotADirectoryError
        If `test_dir` or `mesa_dir` is not an existing directory.

    """
    if args.test_dir is None:
        raise NameError("--test_dir needs to be specified for set_up_test")
    if not os.path.isdir(args.test_dir):
        raise NotADirectoryError(f"Directory {args.test_dir} does not exist.")
    if args.mesa_dir is None:
        raise NameError("mesa_dir needs to be specified for set_up_test")
    if not os.path.isdir(args.mesa_dir):
        raise NotADirectoryError(f"Directory {args.mesa_dir} does not exist.")

    for grid_name in os.listdir(args.mesa_dir):
        grid_path = os.path.join(args.mesa_dir, grid_name)
        if not os.path.isdir(grid_path):
            continue
        # all entries of the grid which belong to a MESA run
        tracks = [os.path.join(args.mesa_dir, grid_name, entry)
                  for entry in os.listdir(grid_path)
                  if "_grid_index_" in entry]
        if not tracks:
            # not a directory of MESA runs
            continue
        os.mkdir(os.path.join(args.test_dir, grid_name))
        # choosing which tracks to copy over
        n_selected = int(len(tracks) * args.dsr)
        for track in random.sample(tracks, n_selected):
            destination = os.path.join(args.test_dir, grid_name,
                                       os.path.basename(track))
            # directories are copied recursively, anything else as a file
            copier = shutil.copytree if os.path.isdir(track) else shutil.copy
            copier(track, destination)

    print(f"Created Test Directory at {args.test_dir}.")
def get_size(start_path="."):
    """Measure a directory tree and list the MESA files to clean up.

    All files below `start_path`, which are not symbolic links, contribute
    to the size. Files located in a directory with "_grid_index_" anywhere
    in its path are additionally sorted: the ones called "core" (with any or
    no extension) are marked for removal, other ones with the extension
    .data, .mod, or .txt are marked for compression.

    Parameters
    ----------
    start_path : string
        The directory root to start the file system walk.

    Returns
    -------
    total_size : int
        The size in bytes.
    remove_files : list
        List of tuples (directory, list of file names) to remove.
    compress_files : list
        List of tuples (directory, list of file names) to compress.
    n_runs : int
        Number of directories with "_grid_index_" in their own name.
    n_remove_files : int
        Number of files to remove.
    n_compress_files : int
        Number of files to compress.

    """
    compressible = (".data", ".mod", ".txt")
    size_in_bytes = 0
    removal_plan = []
    compression_plan = []
    run_count = 0
    for current_dir, _, entries in os.walk(start_path):
        # files get only sorted inside of MESA runs (incl. sub-directories)
        inside_run = "_grid_index_" in current_dir
        if inside_run and "_grid_index_" in os.path.basename(current_dir):
            # the top directory of a run
            run_count += 1
        core_dumps = []
        to_gzip = []
        for entry in entries:
            full_path = os.path.join(current_dir, entry)
            if os.path.islink(full_path):
                # symbolic links are ignored completely
                continue
            size_in_bytes += os.path.getsize(full_path)
            if not inside_run:
                continue
            stem, suffix = os.path.splitext(entry)
            if stem == "core":
                core_dumps.append(entry)
            elif suffix in compressible:
                to_gzip.append(entry)
        if core_dumps:
            removal_plan.append((current_dir, core_dumps))
        if to_gzip:
            compression_plan.append((current_dir, to_gzip))
    removal_count = sum(len(names) for _, names in removal_plan)
    compression_count = sum(len(names) for _, names in compression_plan)
    return (size_in_bytes, removal_plan, compression_plan, run_count,
            removal_count, compression_count)
def compress_dir(args):
    """Compress a directory containing tracks evolved with MESA.

    The files selected by `get_size` are processed in two passes: first the
    core dump files are deleted, then the remaining selected files are
    compressed in place with `gzip -1`. Afterwards the directory is scanned
    again to report the new size and to warn about files left over.

    Parameters (keys in `args`)
    ---------------------------
    verbose : bool
        Enable/Disable additional output.
    debug : bool
        Enable/Disable printing each file which gets removed or compressed.
    mesa_dir : string
        The directory where the MESA tracks are stored.

    Raises
    ------
    NameError
        If `mesa_dir` is None.
    NotADirectoryError
        If `mesa_dir` is not an existing directory.
    FileNotFoundError
        If a file selected for removal or compression is not a regular file
        (any more) when it is about to be processed.

    """
    # local import: only this function needs it
    import subprocess

    if args.mesa_dir is None:
        raise NameError("mesa_dir needs to be specified for compress_dir")
    elif not os.path.isdir(args.mesa_dir):
        raise NotADirectoryError(f"Directory {args.mesa_dir} does not exist.")

    og_size, to_remove, to_compress, n_runs, n_remove_files, n_compress_files\
        = get_size(args.mesa_dir)

    if args.verbose:
        print("remove", n_remove_files, "core dump files in", len(to_remove),
              "directories of", n_runs, "MESA runs")
    for folder, files in tqdm(to_remove):
        for remove_file in files:
            remove_path = os.path.join(folder, remove_file)
            if os.path.isfile(remove_path):
                if args.debug:
                    print("remove:", remove_path)
                try:
                    os.remove(remove_path)
                except OSError: #pragma: no cover
                    # best effort: a leftover file is reported by the final
                    # check below
                    print("Could not remove:", remove_file, "in", folder)
            else: #pragma: no cover
                raise FileNotFoundError(f"{remove_path} is not a file.")

    if args.verbose:
        print("compress", n_compress_files, "files in", len(to_compress),
              "directories of", n_runs, "MESA runs")
    for folder, files in tqdm(to_compress):
        for compress_file in files:
            compress_path = os.path.join(folder, compress_file)
            if os.path.isfile(compress_path):
                if args.debug:
                    print("compress:", compress_path)
                try:
                    # The argument list avoids the shell, hence paths with
                    # spaces or special characters are passed on unchanged;
                    # "--" stops gzip from reading the path as an option.
                    # The exit status is not checked: an uncompressed file
                    # is reported by the final check below.
                    subprocess.run(["gzip", "-1", "--", compress_path],
                                   check=False)
                except OSError: #pragma: no cover
                    # e.g. gzip is not available
                    print("Could not compress:", compress_file, "in", folder)
            else: #pragma: no cover
                raise FileNotFoundError(f"{compress_path} is not a file.")

    # scan again to get the new size and the files which were not processed
    new_size, to_remove, to_compress, *_ = get_size(args.mesa_dir)

    if args.verbose:
        print("")
        print("Compressed MESA tracks")
        print(f"Original size {textsize(og_size)} | "
              f"Compressed size {textsize(new_size)}")
    if len(to_remove)>0: #pragma: no cover
        Pwarn("Still files to remove: {}".format(to_remove),
              "IncompletenessWarning")
    if len(to_compress)>0: #pragma: no cover
        Pwarn("Still files to compress: {}".format(to_compress),
              "IncompletenessWarning")
def _compress_MESA():
    """Entry point: parse the command line and run the requested action.

    Without a test directory on the command line the MESA directory gets
    compressed, otherwise the test directory is set up instead.

    """
    cli_args = _parse_commandline()
    if cli_args.test_dir is None:
        compress_dir(cli_args)
        return
    set_up_test(cli_args)