Source code for posydon.utils.data_download

"""Functions for bin/get-posydon-data to handle the download from Zenodo

"""

__authors__ = [
    "Jeff J Andrews <jeffrey.andrews@northwestern.edu>",
    "Simone Bavera <Simone.Bavera@unige.ch>",
    "Matthias Kruckow <Matthias.Kruckow@unige.ch>",
]

import argparse
import hashlib
import os
import progressbar
import tarfile
import textwrap
import urllib.request
from tqdm import tqdm
from posydon.config import PATH_TO_POSYDON_DATA
from posydon.utils.datasets import COMPLETE_SETS, ZENODO_COLLECTION
from posydon.utils.posydonwarning import Pwarn

def _parse_commandline():
    """Parse the arguments given on the command-line

        Returns
        -------
        Namespace
            All the passed arguments from the commoand line or their defaults.

    """
    defined_sets = list(COMPLETE_SETS.keys()) + list(ZENODO_COLLECTION.keys())
    parser = argparse.ArgumentParser(description="Downloading POSYDON data "
                                                 "from Zenodo")
    parser.add_argument('dataset',
                        help="Name of the dataset to download (default: v1)",
                        nargs='?',
                        default='v1')
    parser.add_argument('-l', '--listedsets',
                        help="list the datasets: 'complete' shows the full "
                             "dataset able to run POSYDON, 'individual' lists "
                             "the datasets on zenodo, which might need others "
                             "to run population synthesis (default: complete)",
                        nargs='?',
                        const='complete',
                        choices=['complete', 'individual'])
    parser.add_argument('-n', '--nomd5check',
                        help="do not confirm md5 checksum (default: False)",
                        default=False,
                        action='store_true')
    parser.add_argument('-v', '--verbose',
                        help="run in Verbose Mode (default: False)",
                        default=False,
                        action='store_true')
    args = parser.parse_args()
    if args.dataset not in defined_sets:
        raise parser.error("unknown dataset, use -l to show defined sets")
    return args

[docs] class ProgressBar(): def __init__(self): self.pbar = None self.widgets = [progressbar.Bar(marker="#",left="[",right="]"), progressbar.Percentage(), " | ", progressbar.FileTransferSpeed(), " | ", progressbar.DataSize(), " / ", progressbar.DataSize(variable="max_value"), " | ", progressbar.ETA()] def __call__(self, block_num, block_size, total_size): if not self.pbar: self.pbar=progressbar.ProgressBar(widgets=self.widgets, max_value=total_size) self.pbar.start() downloaded = block_num * block_size if downloaded < total_size: self.pbar.update(downloaded) else: self.pbar.finish()
[docs] def list_datasets(individual_sets=False, verbose=False): """Print a list of available datasets Parameters ---------- individual_sets : boolean (default: False) Show the individual sets or only the complete sets. verbose : boolean (default: False) Enables verbose output. """ if individual_sets: print("Defined individual sets are:") for dataset in ZENODO_COLLECTION: prefix = f" - '{dataset}': " indent = " "*len(prefix) wrapper = textwrap.TextWrapper(initial_indent=prefix, width=80, subsequent_indent=indent) print(wrapper.fill(ZENODO_COLLECTION[dataset]['title'])) if verbose: wrapper = textwrap.TextWrapper(initial_indent=indent, width=80, subsequent_indent=indent) print(wrapper.fill(ZENODO_COLLECTION[dataset]['description'])) print(wrapper.fill("more information at " +ZENODO_COLLECTION[dataset]['url'])) else: print("Defined complete sets are:") for set_name,complete_set in COMPLETE_SETS.items(): print(f" - '{set_name}' consisting of:") for dataset in complete_set: prefix = f" - '{dataset}': " indent = " "*len(prefix) wrapper = textwrap.TextWrapper(initial_indent=prefix, width=80, subsequent_indent=indent) print(wrapper.fill(ZENODO_COLLECTION[dataset]['title'])) if verbose: wrapper = textwrap.TextWrapper(initial_indent=indent, width=80, subsequent_indent=indent) print(wrapper.fill(ZENODO_COLLECTION[dataset]['description'])) print(wrapper.fill("more information at " +ZENODO_COLLECTION[dataset]['url']))
[docs] def download_one_dataset(dataset='v1_for_v2.0.0-pre1', MD5_check=True, verbose=False): """Download a data set from Zenodo if they do not exist. Parameters ---------- dataset : string (default: 'v1_for_v2.0.0-pre1') Name of the data set to be in COMPLETE_SETS or ZENODO_COLLECTION. MD5_check : boolean (default: True) Use the MD5 check to make sure data is not corrupted. verbose : boolean (default: False) Enables verbose output. """ if not isinstance(dataset, str): raise TypeError("'dataset' should be a string.") if dataset not in ZENODO_COLLECTION: raise KeyError(f"The dataset '{dataset}' is not defined.") # First, generate filename and make sure the path does not exist data_url = ZENODO_COLLECTION[dataset]['data'] if data_url is None: raise ValueError(f"The dataset '{dataset}' has no publication yet.") original_md5 = ZENODO_COLLECTION[dataset]['md5'] if original_md5 is None: MD5_check = False Pwarn("MD5 undefined, skip MD5 check.", "ReplaceValueWarning") filename = os.path.basename(data_url) directory = os.path.dirname(PATH_TO_POSYDON_DATA) filepath = os.path.join(directory, filename) if not os.path.isdir(os.path.dirname(filepath)): raise NotADirectoryError("PATH_TO_POSYDON_DATA does not refer to a " "valid directory.") if os.path.exists(filepath): raise FileExistsError(f"POSYDON data already exists at {filepath}.") # Download the data print(f"Downloading POSYDON data '{dataset}' from Zenodo to {directory}") urllib.request.urlretrieve(data_url, filepath, ProgressBar()) # Compare original MD5 with freshly calculated if MD5_check: try: with open(filepath, "rb") as file_to_check: # read contents of the file data = file_to_check.read() # pipe contents of the file through md5_returned = hashlib.md5(data).hexdigest() if original_md5 == md5_returned: if verbose: print("MD5 verified.") else: # Delete file - we cannot rely upon that data os.remove(filepath) # Raise value error raise ValueError("MD5 verification failed!.") except: print('Failed to read the tar.gz file for MD5 verification, ' 'cannot guarantee file integrity (this error seems to ' 'happen only on macOS).') # extract each file print(f"Extracting POSYDON data '{dataset}' from tar file...") with tarfile.open(filepath) as tar: for member in tqdm(iterable=tar.getmembers(), total=len(tar.getmembers())): tar.extract(member=member, path=directory) # remove tar files after extracted if os.path.exists(filepath): if verbose: print('Removed downloaded tar file.') os.remove(filepath)
[docs] def data_download(set_name='v1', MD5_check=True, verbose=False): """Download data files from Zenodo if they do not exist. Parameters ---------- set_name : string (default: 'v1') Name of the data set to be in COMPLETE_SETS or ZENODO_COLLECTION. MD5_check : boolean (default: True) Use the MD5 check to make sure data is not corrupted. verbose : boolean (default: False) Enables verbose output. """ if not isinstance(set_name, str): raise TypeError("'set_name' should be a string.") # Check whether the set is in the complete sets or just a single dataset. if set_name in COMPLETE_SETS: for dataset in COMPLETE_SETS[set_name]: download_one_dataset(dataset=dataset, MD5_check=MD5_check, verbose=verbose) elif set_name in ZENODO_COLLECTION: if verbose: print("You are downloading a single data set, which might not " "contain all the data needed.") download_one_dataset(dataset=set_name, MD5_check=MD5_check, verbose=verbose) else: raise KeyError(f"The dataset '{set_name}' is not defined.")
def _get_posydon_data(): """Run the data download or list the datasets """ args = _parse_commandline() if args.listedsets == 'complete': list_datasets(individual_sets=False, verbose=args.verbose) elif args.listedsets == 'individual': list_datasets(individual_sets=True, verbose=args.verbose) else: data_download(set_name=args.dataset, MD5_check=not args.nomd5check, verbose=args.verbose)