#!/usr/bin/env python
import os
import sys
import shutil
import random
import argparse
from tqdm import tqdm
from posydon.utils.posydonwarning import Pwarn
def textsize(filesize, floatfmt=".3g", base=1024, threshold=1000):
"""Get a human-readable file size in string.
filesize : int or float
The size of the file, or directory, ... (in bytes)
floatfmt : str
The format for the float before the size descriptor (e.g., 2.34K).
base : int
The base of the units. Typically 1024 but 1000 can be used as well.
threshold : int or float
The threshold for using the next unit. For example, if base is 1024
and threshold is 1000, then 1000 bytes will be returned as 0.98K.
The filesize in string format using b, K, M, ...
if base not in [1000, 1024]:
raise ValueError(f"base={base} should be 1000 or 1024")
if threshold <= 0:
raise ValueError(f"threshold={threshold} should be larger than 0")
if base < threshold:
raise ValueError(f"threshold={threshold} should be smaller or equal "\
f"to base={base}")
if filesize < 0:
return "-" + textsize(-filesize, floatfmt=floatfmt, base=base,\
units = ["b", "K", "M", "G", "T", "P", "E", "Z", "Y"]
unit_values = [base**i for i in range(len(units))]
for unit, unit_value in zip(units, unit_values):
if filesize < unit_value * threshold:
quantity = filesize / unit_value
return f"{quantity:{floatfmt}}{unit}"
return f"{filesize:.3g} bytes"
def set_up_test(args):
"""Set up a testing directory in the requested directory.
Parameters (keys in `args`)
mesa_dir : string
The directory where the MESA tracks are stored.
test_dir : string
The directory where the test directory is to be set up.
dsr : float
Downsampling rate when creating testing directory.
if args.test_dir is None:
raise NameError("--test_dir needs to be specified for set_up_test")
elif not os.path.isdir(args.test_dir):
raise NotADirectoryError(f"Directory {args.test_dir} does not exist.")
if args.mesa_dir is None:
raise NameError("mesa_dir needs to be specified for set_up_test")
elif not os.path.isdir(args.mesa_dir):
raise NotADirectoryError(f"Directory {args.mesa_dir} does not exist.")
for folder in os.listdir(args.mesa_dir):
if os.path.isdir(os.path.join(args.mesa_dir, folder)):
is_mesa_run = False
sub_dir = os.listdir(os.path.join(args.mesa_dir, folder))
track_dirs = []
for _f in sub_dir:
if "_grid_index_" in _f:
is_mesa_run = True
track_dirs.append(os.path.join(args.mesa_dir, folder, _f))
if is_mesa_run: # checking if directory is a mesa run
os.mkdir(os.path.join(args.test_dir, folder))
# choosing which tracks to copy over
inds = random.sample(list(range(len(track_dirs))),
int(len(track_dirs) * args.dsr))
for ind in inds:
if os.path.isdir(track_dirs[ind]):
shutil.copytree(track_dirs[ind], os.path.join(
shutil.copy(track_dirs[ind], os.path.join(
print(f"Created Test Directory at {args.test_dir}.")
def compress_dir(args):
"""Compresses a directory containing tracks evolved with MESA.
Parameters (keys in `args`)
verbose : bool
Enable/Disable additional output.
mesa_dir : string
The directory where the MESA tracks are stored.
def get_size(start_path="."):
total_size = 0
remove_files = []
compress_files = []
n_runs = 0
n_remove_files = 0
n_compress_files = 0
for dirpath, _, filenames in os.walk(start_path):
if "_grid_index_" in dirpath: # checking if directory is mesa run
new_remove_files = []
new_compress_files = []
if "_grid_index_" in os.path.basename(dirpath):
n_runs += 1
new_remove_files = None
new_compress_files = None
for filename in filenames:
filepath = os.path.join(dirpath, filename)
# skip if it is symbolic link
if not os.path.islink(filepath):
total_size += os.path.getsize(filepath)
# check for files in mesa run, whether to remove or compress it
if new_remove_files is not None:
name, ext = os.path.splitext(filename)
if name == "core":
# remove core dump files
elif ext in [".data", ".mod", ".txt"]:
# compress .data, .mod, .txt files
if ((new_remove_files is not None) and
remove_files.append((dirpath, new_remove_files))
n_remove_files += len(new_remove_files)
if ((new_compress_files is not None) and
compress_files.append((dirpath, new_compress_files))
n_compress_files += len(new_compress_files)
return total_size, remove_files, compress_files, n_runs,\
n_remove_files, n_compress_files
if args.mesa_dir is None:
raise NameError("mesa_dir needs to be specified for set_up_test")
elif not os.path.isdir(args.mesa_dir):
raise NotADirectoryError(f"Directory {args.mesa_dir} does not exist.")
og_size, to_remove, to_compress, n_runs, n_remove_files, n_compress_files\
= get_size(args.mesa_dir)
if args.verbose:
print("remove", n_remove_files, "core dump files in", len(to_remove),\
"directories of", n_runs, "MESA runs")
for folder, files in tqdm(to_remove):
for remove_file in files:
if os.path.isfile(os.path.join(folder, remove_file)):
# print("remove:", os.path.join(folder, remove_file))
os.remove(os.path.join(folder, remove_file))
print("Could not remove:", remove_file, "in", folder)
if args.verbose:
print("compress", n_compress_files, "files in", len(to_compress),\
"directories of", n_runs, "MESA runs")
for folder, files in tqdm(to_compress):
for compress_file in files:
if os.path.isfile(os.path.join(folder, compress_file)):
# print(f"gzip -1 {os.path.join(folder, compress_file)}")
os.system(f"gzip -1 {os.path.join(folder, compress_file)}")
new_size, to_remove, to_compress, n_runs, n_remove_files, n_compress_files\
= get_size(args.mesa_dir)
if args.verbose:
print("Compressed MESA tracks")
print(f"Original size {textsize(og_size)} | "\
f"Compressed size {textsize(new_size)}")
if len(to_remove)>0:
Pwarn("Still files to remove: {}".format(to_remove),
if len(to_compress)>0:
Pwarn("Still files to compress: {}".format(to_compress),
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("-f", "--function", type=str, help="The name of the "\
"function to be called, either set_up_test or "\
"compress_dir.", default='compress_dir')
parser.add_argument("-td", "--test_dir", type=str, help="The path to "\
"where the testing directory should be set up.",\
parser.add_argument("-dsr", "--dsr", type=float, help="Downsampling rate "\
"when creating testing directory", default=0.01)
parser.add_argument("-v", "--verbose", type=bool, help="Enable/Disable "\
"outputs", default=True)
parser.add_argument("mesa_dir", type=str, help="The path to the "\
"directory containing MESA-generated data",\
functions = {"set_up_test": set_up_test, "compress_dir": compress_dir}
arguments = parser.parse_args()