Source code for compress_mesa

#!/usr/bin/env python
import os
import sys
import shutil
import random
import argparse
from tqdm import tqdm
from posydon.utils.posydonwarning import Pwarn


[docs] def textsize(filesize, floatfmt=".3g", base=1024, threshold=1000): """Get a human-readable file size in string. Parameters ---------- filesize : int or float The size of the file, or directory, ... (in bytes) floatfmt : str The format for the float before the size descriptor (e.g., 2.34K). base : int The base of the units. Typically 1024 but 1000 can be used as well. threshold : int or float The threshold for using the next unit. For example, if base is 1024 and threshold is 1000, then 1000 bytes will be returned as 0.98K. Returns ------- str The filesize in string format using b, K, M, ... """ if base not in [1000, 1024]: raise ValueError(f"base={base} should be 1000 or 1024") if threshold <= 0: raise ValueError(f"threshold={threshold} should be larger than 0") if base < threshold: raise ValueError(f"threshold={threshold} should be smaller or equal "\ f"to base={base}") if filesize < 0: return "-" + textsize(-filesize, floatfmt=floatfmt, base=base,\ threshold=threshold) units = ["b", "K", "M", "G", "T", "P", "E", "Z", "Y"] unit_values = [base**i for i in range(len(units))] for unit, unit_value in zip(units, unit_values): if filesize < unit_value * threshold: quantity = filesize / unit_value return f"{quantity:{floatfmt}}{unit}" return f"{filesize:.3g} bytes"
[docs] def set_up_test(args): """Set up a testing directory in the requested directory. Parameters (keys in `args`) --------------------------- mesa_dir : string The directory where the MESA tracks are stored. test_dir : string The directory where the test directory is to be set up. dsr : float Downsampling rate when creating testing directory. """ if args.test_dir is None: raise NameError("--test_dir needs to be specified for set_up_test") elif not os.path.isdir(args.test_dir): raise NotADirectoryError(f"Directory {args.test_dir} does not exist.") if args.mesa_dir is None: raise NameError("mesa_dir needs to be specified for set_up_test") elif not os.path.isdir(args.mesa_dir): raise NotADirectoryError(f"Directory {args.mesa_dir} does not exist.") for folder in os.listdir(args.mesa_dir): if os.path.isdir(os.path.join(args.mesa_dir, folder)): is_mesa_run = False sub_dir = os.listdir(os.path.join(args.mesa_dir, folder)) track_dirs = [] for _f in sub_dir: if "_grid_index_" in _f: is_mesa_run = True track_dirs.append(os.path.join(args.mesa_dir, folder, _f)) if is_mesa_run: # checking if directory is a mesa run os.mkdir(os.path.join(args.test_dir, folder)) # choosing which tracks to copy over inds = random.sample(list(range(len(track_dirs))), int(len(track_dirs) * args.dsr)) for ind in inds: if os.path.isdir(track_dirs[ind]): shutil.copytree(track_dirs[ind], os.path.join( args.test_dir, folder, os.path.split(track_dirs[ind])[1])) else: shutil.copy(track_dirs[ind], os.path.join( args.test_dir, folder, os.path.split(track_dirs[ind])[1])) print(f"Created Test Directory at {args.test_dir}.")
[docs] def compress_dir(args): """Compresses a directory containing tracks evolved with MESA. Parameters (keys in `args`) --------------------------- verbose : bool Enable/Disable additional output. mesa_dir : string The directory where the MESA tracks are stored. """ def get_size(start_path="."): total_size = 0 remove_files = [] compress_files = [] n_runs = 0 n_remove_files = 0 n_compress_files = 0 for dirpath, _, filenames in os.walk(start_path): if "_grid_index_" in dirpath: # checking if directory is mesa run new_remove_files = [] new_compress_files = [] if "_grid_index_" in os.path.basename(dirpath): n_runs += 1 else: new_remove_files = None new_compress_files = None for filename in filenames: filepath = os.path.join(dirpath, filename) # skip if it is symbolic link if not os.path.islink(filepath): total_size += os.path.getsize(filepath) # check for files in mesa run, whether to remove or compress it if new_remove_files is not None: name, ext = os.path.splitext(filename) if name == "core": # remove core dump files new_remove_files.append(filename) elif ext in [".data", ".mod", ".txt"]: # compress .data, .mod, .txt files new_compress_files.append(filename) if ((new_remove_files is not None) and (len(new_remove_files)>0)): remove_files.append((dirpath, new_remove_files)) n_remove_files += len(new_remove_files) if ((new_compress_files is not None) and (len(new_compress_files)>0)): compress_files.append((dirpath, new_compress_files)) n_compress_files += len(new_compress_files) return total_size, remove_files, compress_files, n_runs,\ n_remove_files, n_compress_files if args.mesa_dir is None: raise NameError("mesa_dir needs to be specified for set_up_test") elif not os.path.isdir(args.mesa_dir): raise NotADirectoryError(f"Directory {args.mesa_dir} does not exist.") og_size, to_remove, to_compress, n_runs, n_remove_files, n_compress_files\ = get_size(args.mesa_dir) if args.verbose: print("remove", n_remove_files, "core dump files in", len(to_remove),\ "directories of", n_runs, "MESA runs") for folder, files in tqdm(to_remove): for remove_file in files: if os.path.isfile(os.path.join(folder, remove_file)): # print("remove:", os.path.join(folder, remove_file)) try: os.remove(os.path.join(folder, remove_file)) except: print("Could not remove:", remove_file, "in", folder) if args.verbose: print("compress", n_compress_files, "files in", len(to_compress),\ "directories of", n_runs, "MESA runs") for folder, files in tqdm(to_compress): for compress_file in files: if os.path.isfile(os.path.join(folder, compress_file)): # print(f"gzip -1 {os.path.join(folder, compress_file)}") os.system(f"gzip -1 {os.path.join(folder, compress_file)}") new_size, to_remove, to_compress, n_runs, n_remove_files, n_compress_files\ = get_size(args.mesa_dir) if args.verbose: print("") print("Compressed MESA tracks") print(f"Original size {textsize(og_size)} | "\ f"Compressed size {textsize(new_size)}") if len(to_remove)>0: Pwarn("Still files to remove: {}".format(to_remove), "IncompletenessWarning") if len(to_compress)>0: Pwarn("Still files to compress: {}".format(to_compress), "IncompletenessWarning")
if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("-f", "--function", type=str, help="The name of the "\ "function to be called, either set_up_test or "\ "compress_dir.", default='compress_dir') parser.add_argument("-td", "--test_dir", type=str, help="The path to "\ "where the testing directory should be set up.",\ default=None) parser.add_argument("-dsr", "--dsr", type=float, help="Downsampling rate "\ "when creating testing directory", default=0.01) parser.add_argument("-v", "--verbose", type=bool, help="Enable/Disable "\ "outputs", default=True) parser.add_argument("mesa_dir", type=str, help="The path to the "\ "directory containing MESA-generated data",\ default=None) functions = {"set_up_test": set_up_test, "compress_dir": compress_dir} arguments = parser.parse_args() functions[arguments.function](arguments)