Source code for posydon.interpolation.IF_interpolation

"""Module for performing initial-final inteprolation.

We showcase the initial-final interpolator, which plays a critical role in
evolving binary populations. To use the initial-final interpolator, we first
import the IFInterpolator class from the POSYDON library.


  # importing interpolator
  from posydon.interpolation.IF_interpolation import IFInterpolator

I. Loading a Pretrained Interpolator
------------------------------------

To load a pretrained interpolator, we construct an IFInterpolator
instance and call its load method, which takes the path to a .pkl
file from which the pretrained interpolator is loaded. POSYDON
provides various pretrained models whose corresponding .pkl files
can be found in the data directory of the POSYDON repository.


  model = IFInterpolator()

  model.load("path/to/file.pkl")


II. Training the Interpolator
-----------------------------

The interpolator can be trained using an instance of the PSyGrid
class, which can be constructed by running one's own simulations or
by loading a simulation from an h5 file stored in the data directory
of the POSYDON repository. For more details on the PSyGrid class,
please visit the PSyGrid documentation. The IFInterpolator
class relies on the BaseIFInterpolator class to perform the interpolation,
so the parameters needed to construct the BaseIFInterpolator instances are
also required to construct the IFInterpolator. These parameters are:

1. interp_method: the interpolation method to be used, either
linear, 1NN, or a list specifying which interpolation method is to
be used for each type of track. If interp_classes is specified and this
parameter is not a list, then the interpolator will use the specified method
for all classes.

2. interp_classes: a list of classes that the simulation tracks can
fall into, usually specified as the mass-transfer type. This only needs to
be specified if interp_method is a list. Note that when using class-wise
normalization, only the classes in interp_classes are normalized; this holds
for interpolation normalization but not for classification normalization.

3. class_method: the classification method to be used, either kNN or
1NN.

4. in_keys: the keys to be used as the input to the interpolator; by default
these are star_1_mass, star_2_mass, and period_days.

5. out_keys: the keys for which the interpolator is supposed to provide
values; by default all keys are used.

6. in_scaling: the scalings for the input keys; by default these scalings are
optimized through Monte Carlo Cross Validation.

7. out_scaling: the scalings for the output keys; by default these scalings
are optimized through Monte Carlo Cross Validation.

8. c_keys: a list of strings specifying which classifiers are to be trained.

9. c_key: a string specifying by which class the interpolator should
interpolate binaries; only to be specified in the MC_Interpolator case.

For most applications, specifying only the first four parameters is
recommended, as in the minimal sketch below; a full class-wise configuration
follows it.
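
A minimal construction, relying on the defaults for the remaining parameters,
might look as follows (a sketch; it assumes a PSyGrid instance already loaded
as grid, as done in the full example below):


    interp = IFInterpolator(grid=grid, interpolators=[
        {
            "interp_method": "linear", # one method for all track classes
            "class_method": "kNN" # defaults used for in/out keys and c_keys
        }
    ])

    interp.train() # training interpolator

A full configuration with class-wise (MC) interpolation and several
classifiers looks like the following: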


    from posydon.grids.psygrid import PSyGrid
    from posydon.interpolation.IF_interpolation import IFInterpolator

    grid = PSyGrid("path/to/h5/file.h5") # loading grid from h5 file

    interp = IFInterpolator(grid = grid, interpolators = [
        {
            "interp_method": ["linear", "linear", "linear", "linear"],
            "interp_classes": ["no_MT", "stable_MT", "unstable_MT", "stable_reverse_MT"], # classes not in this array will not be normalized in class-wise normalization
            "out_keys": first,
            "class_method": "kNN",
            "c_keys": ["interpolation_class"],
            "c_key": "interpolation_class"
        },
        {
            "interp_method": ["linear", "linear", "linear", "linear"],
            "interp_classes": ["None", "BH", "WD", "NS"], # classes not in this array will not be normalized in class-wise normalization
            "out_keys": second,
            "class_method": "kNN",
            "c_keys": ['S1_direct_state'],
            "c_key": 'S1_direct_state'
        },
        {
            "interp_method": ["linear", "linear", "linear", "linear"],
            "interp_classes": ["None", "BH", "WD", "NS"], # classes not in this array will not be normalized in class-wise normalization
            "out_keys": third,
            "class_method": "kNN",
            "c_keys": ['S1_Fryer+12-rapid_state'],
            "c_key": 'S1_Fryer+12-rapid_state'
        },
        {
            "interp_method": ["linear", "linear", "linear", "linear"],
            "interp_classes": ["None", "BH", "WD", "NS"], # classes not in this array will not be normalized in class-wise normalization
            "out_keys": fourth,
            "class_method": "kNN",
            "c_keys": ['S1_Fryer+12-delayed_state'],
            "c_key": 'S1_Fryer+12-delayed_state'
        },
        {
            "interp_method": ["linear", "linear", "linear", "linear"],
            "interp_classes": ["None", "BH", "WD", "NS"], # classes not in this array will not be normalized in class-wise normalization
            "out_keys": fifth,
            "class_method": "kNN",
            "c_keys": ['S1_Sukhbold+16-engineN20_state'],
            "c_key": 'S1_Sukhbold+16-engineN20_state'
        },
        {
            "interp_method": ["linear", "linear", "linear", "linear"],
            "interp_classes": ["None", "BH", "WD", "NS"], # classes not in this array will not be normalized in class-wise normalization
            "out_keys": sixth,
            "class_method": "kNN",
            "c_keys": ['S1_Patton&Sukhbold20-engineN20_state'],
            "c_key": 'S1_Patton&Sukhbold20-engineN20_state'
        }
    ]) # constructing IFInterpolator

    interp.train() # training interpolator


III. Using the Interpolator
---------------------------

Once the interpolator has been trained or loaded from a .pkl file, it can be
used to classify a track given an input vector and/or to approximate a final
vector given an input vector.


    from posydon.binary_evol.binarystar import BinaryStar
    from posydon.binary_evol.singlestar import SingleStar


    # creating binary, refer to BinaryStar documentation
    binary = BinaryStar(**binary_params,
                        star_1=SingleStar(**star1_params),
                        star_2=SingleStar(**star2_params))

    # evaluating returns a tuple of dictionaries
    interpolation, classification = interp.evaluate(binary)
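
The trained interpolator can also be applied to many binaries at once through
its matrix interface, avoiding the per-binary overhead. A sketch, assuming
the default in_keys (star_1_mass, star_2_mass, period_days) and placeholder
input values:


    import numpy as np

    # one row per binary: [star_1_mass, star_2_mass, period_days]
    initial_values = np.array([[30.0, 20.0, 10.0],
                               [15.0, 10.0, 100.0]])

    final_values = interp.test_interpolator(initial_values)
    classes = interp.test_classifiers(initial_values) # dict of classifications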

Finally, a trained interpolator can be saved by specifying the path to a
.pkl file where the interpolator will be stored.


   model.save("path/to/file.pkl") # saving interpolator



"""


__authors__ = [
    "Juanga Serra Perez <jgserra@northwestern.edu>",
    "Konstantinos Kovlakas <Konstantinos.Kovlakas@unige.ch>",
    "Simone Bavera <Simone.Bavera@unige.ch>",
    "Philipp Moura Srivastava <philipp.msrivastava@northwestern.edu>",
    "Jeffrey Andrews <jeffrey.andrews@northwestern.edu>",
]


import os
import pickle
from datetime import date
# POSYDON
from posydon.grids.psygrid import PSyGrid
from posydon.interpolation.data_scaling import DataScaler
from posydon.utils.posydonwarning import Pwarn
# Maths
import numpy as np
# Machine Learning
from scipy.interpolate import LinearNDInterpolator
from sklearn.neighbors import KNeighborsRegressor
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import balanced_accuracy_score
from sklearn.metrics import confusion_matrix
# Plotting
import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
from matplotlib.patches import Patch
from matplotlib.colors import ListedColormap
from posydon.visualization.plot_defaults import DEFAULT_LABELS
from posydon.interpolation.constraints import (
    find_constraints_to_apply, sanitize_interpolated_quantities)


# INITIAL-FINAL INTERPOLATOR
class IFInterpolator:
    """Class handling initial-final interpolation.

    Class handling initial-final interpolation with support for
    interpolating certain keys by differing classes.
    """

    def __init__(self, grid=None, interpolators=None):
        """Initialize the IFInterpolator class.

        Parameters
        ----------
        grid : PSyGrid
            The training grid.
        interpolators : list of dictionaries
            Contain parameters for the BaseIFInterpolators to be constructed.

        """
        self.interpolators = []
        self.interpolator_parameters = interpolators
        self.grid = grid

        if self.interpolator_parameters is not None:
            out_keys = []

            for params in self.interpolator_parameters:
                if ("out_keys" not in params
                        and len(self.interpolator_parameters) > 1):
                    raise ValueError("Overlapping out keys between different "
                                     "interpolators are not permitted!")
                elif "out_keys" not in params:
                    continue

                for key in params["out_keys"]:
                    if key in out_keys:
                        raise ValueError(
                            f"Overlapping out keys between different "
                            f"interpolators are not permitted! ({key} in "
                            f"more than one set of out_keys)")
                    else:
                        out_keys.append(key)

    def train(self):
        """Train the interpolator(s) on the PSyGrid used for construction."""
        for interpolator in self.interpolator_parameters:
            self.interpolators.append(
                BaseIFInterpolator(grid=self.grid, **interpolator))

        self.interp_in_q = self.interpolators[0].interp_in_q

    def evaluate(self, binary, sanitization_verbose=False):
        """Get the output vector approximation.

        Get the output vector approximation as well as all of the
        classifications, given a BinaryStar.

        Parameters
        ----------
        binary : BinaryStar
            The binary star providing the input vector to be classified and
            for which an output vector will be approximated.

        Returns
        -------
        Output space approximation as a tuple containing two dictionaries

        """
        ynums = {}
        ycats = {}

        for interpolator in self.interpolators:
            ynum, ycat = interpolator.evaluate(binary, sanitization_verbose)
            ynums = {**ynums, **ynum}
            ycats = {**ycats, **ycat}

        return ynums, ycats

    def test_interpolator(self, initial_values, sanitization_verbose=False):
        """Take a 2-D numpy array for more efficient use of the interpolator.

        Parameters
        ----------
        initial_values : numpy array
            A numpy array containing the in-key values of the binaries to
            be evolved.

        Returns
        -------
        Interpolated values

        """
        new_values = np.array(initial_values, copy=True)

        if self.interp_in_q:
            new_values.T[1] = new_values.T[1] / new_values.T[0]

        final_values = []

        for interpolator in self.interpolators:
            final_values.append(interpolator.test_interpolator(new_values).T)

        return np.concatenate(final_values).T

    def test_classifiers(self, initial_values):
        """Take a 2-D numpy array for more efficient use of the classifiers.

        Parameters
        ----------
        initial_values : numpy array
            A numpy array containing the in-key values of the binaries to
            be classified.

        Returns
        -------
        Classified values

        """
        new_values = np.array(initial_values, copy=True)

        if self.interp_in_q:
            new_values.T[1] = new_values.T[1] / new_values.T[0]

        classes = {}

        for interpolator in self.interpolators:
            classes = {**classes, **interpolator.test_classifiers(new_values)}

        return classes

    def load(self, filename):
        """Load a saved IFInterpolator from a .pkl file.

        Parameters
        ----------
        filename : string
            Path to the .pkl file.

        """
        with open(filename, 'rb') as f:
            self.interpolators = pickle.load(f)

        self.interp_in_q = self.interpolators[0].interp_in_q

    def save(self, filename):
        """Save the interpolator to a .pkl file.

        Parameters
        ----------
        filename : string
            Path where the .pkl file will be saved.

        """
        with open(filename, "wb") as f:
            pickle.dump(self.interpolators, f)


class BaseIFInterpolator:
    """Class handling the initial-final interpolation for POSYDON."""

    def __init__(self, grid=None, in_keys=None, out_keys=None,
                 out_nan_keys=None, in_scaling=None, out_scaling=None,
                 filename=None, interp_method="linear", interp_classes=None,
                 class_method="kNN", c_keys=None, c_key=None):
        """Initialize the BaseIFInterpolator.

        Initialize the BaseIFInterpolator and, if 'filename' is provided,
        load a pretrained interpolator. If no filename is provided, the
        interpolator(s) and classifier(s) are trained upon initialization.

        Parameters
        ----------
        grid : PSyGrid
            An instance of the PSyGrid class.
        in_keys : list of strings
            The keys to be used as the input to the interpolator; by default
            these are star_1_mass, star_2_mass, and period_days.
        out_keys : list of strings
            The keys for which the interpolator is supposed to provide
            values; by default all keys are used.
        out_nan_keys : list of strings
            The keys for which the interpolator is supposed to provide
            values, but are all nan in the grid.
        in_scaling : list of strings
            The scalings for the input keys; by default these scalings are
            optimized through Monte Carlo Cross Validation.
        out_scaling : list of strings
            The scalings for the output keys; by default these scalings are
            optimized through Monte Carlo Cross Validation.
        filename : string
            A path to a .pkl file containing a saved interpolator.
        interp_method : string or list of strings
            The interpolation method to be used, either linear, 1NN, or a
            list specifying which interpolation method is to be used for
            each type of track. If interp_classes is specified and this
            parameter is not a list, the interpolator will use the specified
            method for all classes.
        interp_classes : list of strings
            A list of classes that the simulation tracks can fall into,
            usually specified as the mass-transfer type. This only needs to
            be specified if interp_method is a list.
        class_method : string or list of strings
            The classification method to be used, either kNN or 1NN.
        c_keys : list of strings
            The keys for which a classifier should be trained. At a minimum,
            should contain the keys needed for multi-class interpolation if
            it is requested.
        c_key : string
            The class key by which the interpolator interpolates binaries;
            only used in the MC_Interpolator case.

        """
        self.in_keys, self.out_keys = in_keys, out_keys
        self.out_nan_keys = out_nan_keys
        self.in_scaling, self.out_scaling = in_scaling, out_scaling
        self.n_in, self.n_out = 0, 0
        self.in_scalers, self.out_scalers = [], []
        self.interp_in_q = False
        self.N = []
        self.valid = None
        self.c_key = c_key if c_key is not None else "interpolation_class"

        self.interp_method = interp_method
        self.interpolator = None

        ignored_classes = ['not_converged', 'ignored_no_BH',
                           'ignored_no_RLO', 'initial_MT']

        # A list of mass-transfer classes that are kept for interpolation
        # (excluding e.g. the 'not_converged' class).
        self.valid_classes = np.unique(
            grid.final_values["interpolation_class"]).tolist()

        for i in ignored_classes:
            while i in self.valid_classes:
                self.valid_classes.remove(i)

        if interp_classes is not None:
            if not isinstance(interp_classes, list):
                raise ValueError("interp_classes must be a list of "
                                 "valid interpolation classes.")
            if isinstance(self.interp_method, list):
                if len(self.interp_method) != len(interp_classes):
                    raise ValueError("Number of interpolation methods must "
                                     "match number of interpolation classes.")
            else:
                self.interp_method = ([self.interp_method]
                                      * len(interp_classes))
            self.interp_classes = interp_classes

        self.class_method = class_method
        self.classifiers = None

        if filename is not None:
            self.load(filename)
        else:
            if grid is None:
                raise RuntimeError(
                    "grid must be specified to create an IF interpolator."
                    " Alternatively, load an existing model specifying"
                    " filename.")
            if self.in_keys is None:
                self.in_keys = ['star_1_mass', 'star_2_mass', 'period_days']
            if self.out_keys is None:
                self.out_keys = [
                    key for key in grid.final_values.dtype.names
                    if key != "model_number"
                    and (type(grid.final_values[key][0]) != np.str_)
                    and any(~np.isnan(grid.final_values[key]))
                ]
            if self.out_nan_keys is None:
                self.out_nan_keys = [
                    key for key in grid.final_values.dtype.names
                    if type(grid.final_values[key][0]) != np.str_
                    and all(np.isnan(grid.final_values[key]))
                ]

            self.constraints = find_constraints_to_apply(self.out_keys)

            if c_keys is None:
                c_keys = [key for key in grid.final_values.dtype.names
                          if type(grid.final_values[key][0]) == np.str_]
            self.classifiers = {key: None for key in c_keys}

            self.n_in, self.n_out = len(self.in_keys), len(self.out_keys)
            self.interp_in_q = self._interpIn_q(grid)
            self.XT, self.YT = self._grid2array(grid)
            self.valid = self._setValid(
                grid.final_values["interpolation_class"], self.XT)
            self.N = np.sum(self.valid >= 0)

            analize_nans(self.out_keys, self.YT, self.valid)
            analize_nones(self.classifiers, self.valid, grid)

            if (self.interp_method == 'linear'
                    or isinstance(self.interp_method, list)):
                print("\nFilling missing values (nans) with 1NN")
                self._fillNans(grid.final_values[self.c_key])
            elif self.interp_method == '1NN':
                self.YT[np.isnan(self.YT)] = -100

            if (self.in_scaling is None) or (self.out_scaling is None):
                if self.interp_method == '1NN':
                    self.in_scaling, self.out_scaling = (
                        self._bestInScaling(grid.final_values[self.c_key]),
                        ['none'] * self.n_out)
                else:
                    self.in_scaling, self.out_scaling = self._bestScaling(
                        grid.final_values[self.c_key])

            self.X_scaler = Scaler(
                self.in_scaling, self.XT[self.valid >= 0, :],
                grid.final_values[self.c_key][self.valid > 0])
            self.Y_scaler = Scaler(
                self.out_scaling, self.YT[self.valid > 0, :],
                grid.final_values[self.c_key][self.valid > 0])

            if self.class_method == "kNN":
                options = {'nfolds': 3, 'p_test': 0.05, 'nmax': 10}
            else:
                options = {}

            self.train_classifiers(grid, method=self.class_method, **options)
            self.train_interpolator(ic=grid.final_values[self.c_key])
[docs] def save(self, filename): """Save complete interpolation model. Parameters ---------- filename : str path/name of '.pkl' file where the model will be saved. """ SAVE_ATTRS = self.__dict__ DONT_SAVE = [] myattrs = {key: SAVE_ATTRS[key] for key in SAVE_ATTRS if key not in DONT_SAVE} with open(filename, 'wb') as f: pickle.dump(myattrs, f)
[docs] def load(self, filename): """Load interpolation model, which can only be used for predictions. Parameters ---------- filename : str path/name of pickle file to be loaded. """ with open(filename, 'rb') as f: myattrs = pickle.load(f) for key in myattrs: setattr(self, key, myattrs[key])
def _grid2array(self, grid): """Convert a PSyGrid to a numpy array. Parameters ---------- grid : PSyGrid a loaded PSyGrid object Returns ------- Two numpy arrays with the first being the input space and the second being the output space """ self.n_in, self.n_out = len(self.in_keys), len(self.out_keys) self.N = len(grid.initial_values[self.in_keys[0]]) X = np.empty((self.N, self.n_in)) Y = np.empty((self.N, self.n_out)) for i in range(self.n_in): X[:, i] = grid.initial_values[self.in_keys[i]] for i in range(self.n_out): Y[:, i] = grid.final_values[self.out_keys[i]] if self.interp_in_q: i_m1 = self.in_keys.index('star_1_mass') i_m2 = self.in_keys.index('star_2_mass') X[:, i_m2] = X[:, i_m2] / X[:, i_m1] return X, Y def _binary2array(self, binary): """Convert a binary instance to a numpy array. Parameters ---------- binary : BinaryStar An initialized instance of the Binary star class Returns ------- A binary as a numpy array """ var2 = binary.star_2.mass if self.interp_in_q: var2 = binary.star_2.mass / binary.star_1.mass Xt = np.array([[binary.star_1.mass, var2, binary.orbital_period]]) return Xt def _setValid(self, ic, X): """Set binary tracks as (in)valid depending on termination flags.""" valid = np.zeros(len(ic), dtype=int) for flag in ['not_converged', 'ignored_no_RLO', 'ignored_no_binary_history']: which = (ic == flag) valid[which] = -1 print(f"Discarded {np.sum(which)} binaries with " f"interpolation_class = [{flag}]") which = np.isnan(np.sum(X, axis=1)) valid[which] = -1 print(f"Discarded {np.sum(which)} binaries with nans in input values.") for i, flag in enumerate(self.valid_classes): valid[ic == flag] = i + 1 if(self.interp_in_q): # if HMS-HMS grid, take out q = 1 for i, iv in enumerate(X): if(iv[1] == 1): valid[i] = -1 return valid
[docs] def train_interpolator(self, ic=None): """Train the interpolator. Parameters ---------- ic : numpy array Contains the interpolation class for each track in the grid used for training the interpolator. """ ic = ic[self.valid > 0] if isinstance(self.interp_method, list) else None XTn = self.X_scaler.normalize(self.XT[self.valid > 0, :], ic) YTn = self.Y_scaler.normalize(self.YT[self.valid > 0, :], ic) if self.interp_method == "linear": self.interpolator = LinInterpolator() self.interpolator.train(XTn, YTn) elif self.interp_method == "1NN": self.interpolator = NNInterpolator() self.interpolator.train(XTn, YTn) elif isinstance(self.interp_method, list): self.interpolator = MC_Interpolator( self.classifiers[self.c_key], self.interp_classes, self.interp_method) self.interpolator.train(XTn, YTn, ic)
[docs] def test_interpolator(self, Xt): """Use the interpolator to approximate output vector. Parameters ---------- Xt : numpy array A list of input vectors for which the output vectors are to be approximated. Returns ------- Output space approximation as numpy array """ classes = self.test_classifier(self.c_key, Xt) if isinstance(self.interp_method, list): Xtn = self.X_scaler.normalize(Xt, classes) Ypredn = self.interpolator.predict(Xtn, classes, self.X_scaler) else: Xtn = self.X_scaler.normalize(Xt) Ypredn = self.interpolator.predict(Xtn) Ypredn = np.array([ list(sanitize_interpolated_quantities( dict(zip(self.out_keys, track)), self.constraints, verbose=False).values()) for track in self.Y_scaler.denormalize(Ypredn, classes) ]) return Ypredn

    def train_classifiers(self, grid, method='kNN', **options):
        """Train the classifiers.

        Parameters
        ----------
        grid : PSyGrid
            The training grid.
        method : string
            Specifies the classification method, default is kNN.

        """
        for key in self.classifiers:
            self.train_classifier(grid, key, method, **options)
[docs] def test_classifiers(self, Xt): """Use the classifiers. Parameters ---------- Xt : numpy array A list of input vectors which are to be classified. """ return {key: self.test_classifier(key, Xt) if self.classifiers[key] is not None else None for key in self.classifiers}

    def train_classifier(self, grid, key, method='kNN', **options):
        """Train a specific classifier.

        Parameters
        ----------
        grid : PSyGrid
            The training grid.
        key : string
            The name of the classifier to be trained.
        method : string
            Specifies the classification method, default is kNN.

        """
        v = self.valid >= 0
        wnn = grid.final_values[key] != 'None'

        if any(wnn[v]):
            if (method == 'kNN') or (method == '1NN'):
                self.classifiers[key] = KNNClassifier()
            else:
                raise ValueError("Wrong method name.")

            # If we want to exclude Nones as a class:
            # XTn = self.X_scaler.normalize(self.XT[v & wnn, :])
            # y = grid.final_values[key][v & wnn]
            XTn = self.X_scaler.normalize(self.XT[v, :])
            y = grid.final_values[key][v]
            uy = np.unique(y)
            print(f"\nTraining {method} classifier for {key} "
                  f"[nclasses = {len(uy)}]...")
            for c in uy:
                print(f"  - [{np.sum(y == c)}] {c}")

            if method == '1NN':
                self.classifiers[key].train(XTn, y, K=1)
            elif method == 'kNN':
                self.classifiers[key].xtrain(XTn, y)
[docs] def test_classifier(self, key, Xt): """Use just one specific classifier. Parameters ---------- key : string Name of the classifier Xt : numpy array A list of input vectors which are to be classified. Returns ------- Output space approximation as numpy array """ Xn = self.X_scaler.normalize(Xt) return self.classifiers[key].predict(Xn)
[docs] def test_classifier_prob(self, key, Xt): """Test the classifier. Use just one specific classifier to get the probabilities of the input being in a class Parameters ---------- key : string Name of the classifier Xt : numpy array A list of input vectors which are to be classified. Returns ------- Output space approximation as numpy array """ Xn = self.X_scaler.normalize(Xt) return self.classifiers[key].predict_prob(Xn)
[docs] def evaluate_mat(self, Xt): """Get the output vector approximation. Get the output vector approximation as well as all of the classifications given an input vector. Parameters ---------- Xt : numpy array A list of input vectors which are to be classified and for which output vectors are to be approximated. Returns ------- Output space approximations as numpy arrays """ if len(Xt.shape) == 1: Xt = Xt.reshape((1, -1)) if Xt.shape[1] != self.n_in: raise ValueError("Wrong dimensions. Xt should have as many " "columns as it was trained with.") # if binary classified as 'initial_MT', set numerical quantities to nan ynum, ycat = self.test_interpolator(Xt), self.test_classifiers(Xt) if self.class_method != '1NN': ynum[ycat[self.c_key] == 'initial_MT', :] = np.nan if self.interp_method == '1NN': ynum[ynum == - 100] = np.nan return ynum, ycat

    def evaluate(self, binary, sanitization_verbose=False):
        """Get the output vector approximation.

        Get the output vector approximation as well as all of the
        classifications, given a BinaryStar.

        Parameters
        ----------
        binary : BinaryStar
            The binary star providing the input vector to be classified and
            for which an output vector will be approximated.

        Returns
        -------
        Output space approximation as a tuple containing two dictionaries

        """
        ynum, ycat = self.evaluate_mat(self._binary2array(binary))
        for key in ycat:
            if ycat[key] is not None:
                ycat[key] = ycat[key][0]
        yt = dict(zip(self.out_keys, ynum.flatten()))  # for a single binary
        yt.update(dict(zip(self.out_nan_keys,
                           [np.nan] * len(self.out_nan_keys))))
        # yt = sanitize_interpolated_quantities(yt,
        #                                       verbose=sanitization_verbose)
        # yt = np.append(ynum, np.array([np.nan] * len(self.out_nan_keys)))
        return yt, ycat
    def _interpIn_q(self, grid):
        """Find if the (regular) grid was uniformly sampled in M_2 or q.

        Parameters
        ----------
        grid : posydon.grids.psygrid.PSyGrid
            Grid of binaries to convert to data matrix.

        Returns
        -------
        bool
            True if the grid was sampled in q.

        """
        m1 = grid.initial_values['star_1_mass'].copy()
        m2 = grid.initial_values['star_2_mass'].copy()
        # Make sure nans do not affect the result
        wnan = np.isnan(m1) | np.isnan(m2)
        m1, m2 = m1[~wnan], m2[~wnan]
        tol = 1
        return (m2[m1 > 0.95 * m1.max()].min()
                / m2[m1 < 1.05 * m1.min()].min() > 1 + tol)

    def _bestInScaling(self, ic):
        """Find the best scaling for the input space."""
        def in_scale_one(klass=None):
            in_scaling = []
            # inputs are assumed to be positive-valued
            for i in range(self.n_in):
                dim = (self.XT[:, i] if klass is None
                       else self.XT[np.where(ic == klass)[0], i])
                if (np.abs(np.mean(dim) - np.median(dim)) / np.std(dim)
                        < np.abs(np.mean(np.log10(dim))
                                 - np.median(np.log10(dim)))
                        / np.std(np.log10(dim))):
                    in_scaling.append('min_max')
                else:
                    in_scaling.append('log_min_max')
            return in_scaling

        cin_scaling = in_scale_one()

        if isinstance(self.interp_method, list):
            in_scaling = {}
            for c in self.interp_classes:
                in_scaling[c] = in_scale_one(c)
        else:
            in_scaling = cin_scaling

        return (in_scaling, cin_scaling)

    def _bestScaling(self, ic, unique_in=True, nfolds=5, p_test=0.15):
        """Find the best scaling for both input and output space."""
        in_scaling = self._bestInScaling(ic)

        def scale_one(in_scaling, klass=None):
            # if False, the scaling for the inputs will be output-dependent
            if unique_in:
                r = 1
            else:
                r = 2 ** self.n_in

            inds = np.where(self.valid > 0 if klass is None
                            else (ic == klass) & (self.valid > 0))[0]

            err = np.nan * np.ones((r * 2, self.n_out, nfolds))
            which_abs = np.abs(self.YT[inds, :]).min(axis=0) == 0
            out_scalings = [['min_max'] * self.n_out,
                            ['log_min_max'] * self.n_out]

            for i, key in enumerate(self.out_keys):
                y = self.YT[inds, i]
                if np.nanmin(y) == np.nanmax(y):
                    out_scalings[0][i] = 'none'
                    out_scalings[1][i] = 'none'
                elif np.nanmax(y) < 0:
                    out_scalings[1][i] = 'neg_log_min_max'
                elif np.nanmin(y) <= 0:
                    out_scalings[1][i] = 'min_max'

            XT = self.XT[inds, :]
            YT = self.YT[inds, :]

            for j in range(2):  # normal, and log when possible
                for i in range(r):  # valid also with metallicity included
                    if r > 1:
                        in_scaling = [
                            'min_max' if i % (2 ** (j + 1)) < 2 ** j
                            else 'log_min_max' for j in range(self.n_in)
                        ]
                    # using class-cumulative scaling (not per class)
                    # to get the optimum
                    xs, ys = (MatrixScaler(in_scaling[1], XT),
                              MatrixScaler(out_scalings[j], YT))
                    XTn, YTn = xs.normalize(XT), ys.normalize(YT)
                    np.random.seed(0)
                    for fold in range(nfolds):
                        iTrain, itest = xval_indices(inds.shape[0],
                                                     percent_test=p_test)
                        X_T, Y_T = XTn[iTrain, :], YTn[iTrain, :]
                        X_t, Y_t = XT[itest, :], YT[itest, :]
                        interp = LinearNDInterpolator(X_T, Y_T)
                        ypred_i = ys.denormalize(interp(xs.normalize(X_t)))
                        err[i + r * j, ~which_abs, fold] = np.nanpercentile(
                            np.abs((ypred_i[:, ~which_abs]
                                    - Y_t[:, ~which_abs])
                                   / Y_t[:, ~which_abs]), 90, axis=0)
                        err[i + r * j, which_abs, fold] = np.nanpercentile(
                            np.abs(ypred_i[:, which_abs]
                                   - Y_t[:, which_abs]), 90, axis=0)

            no_nan_slices = np.isfinite(np.nanmean(err, axis=2)).all(axis=0)
            where_min = np.full(shape=(self.n_out,), fill_value=np.nan)
            where_min[no_nan_slices] = np.nanargmin(
                np.nanmean(err, axis=2)[:, no_nan_slices], axis=0)

            out_scaling = []
            for i in range(self.n_out):
                if where_min[i] < r or np.isnan(where_min[i]):
                    out_scaling.append(out_scalings[0][i])
                else:
                    out_scaling.append(out_scalings[1][i])
                    where_min[i] -= r
            return out_scaling

        cout_scaling = scale_one(in_scaling)

        if isinstance(self.interp_method, list):
            out_scaling = {}
            for c in self.interp_classes:
                out_scaling[c] = scale_one(in_scaling, c)
        else:
            out_scaling = cout_scaling

        return in_scaling, (out_scaling, cout_scaling)

    def _fillNans(self, ic):
        """Fill nan values in numerical magnitudes with 1NN."""
        for i in range(self.n_out):
            wnan = np.isnan(self.YT[:, i]) | np.isinf(self.YT[:, i])
            if any(np.isinf(self.YT[:, i])):
                print(f"inftys: {np.sum(np.isinf(self.YT[:, i]))}, "
                      f"{self.out_keys[i]}")
            if any(wnan[self.valid > 0]):
                k1r = KNeighborsRegressor(n_neighbors=1)
                wT = (~wnan) & (self.valid > 0)
                if wT.any():
                    xs = MatrixScaler(self._bestInScaling(ic)[1],
                                      self.XT[self.valid >= 0, :])
                    k1r.fit(xs.normalize(self.XT[wT, :]), self.YT[wT, i])
                    wt = wnan & (self.valid > 0)
                    self.YT[wt, i] = k1r.predict(
                        xs.normalize(self.XT[wt, :]))
                else:
                    wt = wnan & (self.valid > 0)
                    self.YT[wt, i] = 0.0
                    self.out_nan_keys.append(self.out_keys[i])
# BASE INTERPOLATORS
[docs] class Interpolator: """Class used as a shell parent class for all Interpolators.""" def __init__(self): """Initialize the Interpolator.""" self.interpolator = None self.D = None
[docs] def train(self, XT, YT): """Train the interpolator. Parameters ---------- XT : numpy array List of input vectors YT : numpy array List of corresponding output vectors """ self.D = XT.shape[1]
[docs] def predict(self, Xt): """Interpolate and approximate output vectors given input vectors. Parameters ---------- XT : numpy array List of input vectors """ if self.interpolator is None: raise RuntimeError("Train Interpolator first.") if Xt.shape[1] != self.D: raise ValueError("Wrong input dimension.")
[docs] def train_error(self, XT, YT): """Calculate approximation error given testing data. Parameters ---------- XT : numpy array List of input vectors YT : numpy array List of corresponding output vectors """ error = np.abs(YT - self.predict(XT)) print("\nMean Abs. Train Error:") print(error.mean(axis=0))
[docs] class NNInterpolator(Interpolator): """Class implementing Nearest Neighbor interpolation."""
[docs] def train(self, XT, YT): """Train the interpolator. Parameters ---------- XT : numpy array List of input vectors YT : numpy array List of corresponding output vectors """ super().train(XT, YT) self.interpolator = KNeighborsRegressor(n_neighbors=1) self.interpolator.fit(XT, YT)
# self.train_error(XT, YT)
[docs] def predict(self, Xt, scaler = None, klass = None): """Interpolate and approximate output vectors given input vectors. Parameters ---------- XT : numpy array List of input vectors scaler : #TODO klass : #TODO Returns ------- Output space approximation as numpy array """ super().predict(Xt) return self.interpolator.predict(Xt)
[docs] class LinInterpolator(Interpolator): """Class implementing Linear Interpolation."""
[docs] def train(self, XT, YT): """Train the interpolator. Parameters ---------- XT : numpy array List of input vectors YT : numpy array List of corresponding output vectors """ super().train(XT, YT) self.interpolator = [LinearNDInterpolator(XT, YT)] OoH = KNeighborsRegressor(n_neighbors=1) OoH.fit(XT, YT) self.interpolator.append(OoH)
# self.train_error(XT, YT)

    def predict(self, Xt, scaler=None, klass=None):
        """Interpolate and approximate output vectors given input vectors.

        Parameters
        ----------
        Xt : numpy array
            List of input vectors.
        scaler : Scaler
            Scaler used to denormalize inputs when reporting the
            out-of-hull warning.
        klass : string or list of strings
            Unused here; accepted for interface compatibility with
            class-wise interpolation.

        Returns
        -------
        Output space approximation as numpy array

        """
        super().predict(Xt)
        Ypred = self.interpolator[0](Xt)
        wnan = np.isnan(Ypred[:, 0])

        # 1NN interpolation for binaries out of the convex hull
        if np.any(wnan):
            Ypred[wnan, :] = self.interpolator[1].predict(Xt[wnan, :])

            dists, _ = self.interpolator[1].kneighbors(Xt[wnan, :])
            # only finding information for the furthest nearest neighbor
            max_distance = dists.argmax()
            # unnormalizing
            neighbors = scaler.denormalize(
                self.interpolator[1]._tree.data.base)
            max_distance_point = scaler.denormalize(Xt[wnan][[max_distance]])

            nearest_neighbor = np.sum(
                np.square(neighbors - max_distance_point), axis=1)
            nearest_neighbor = nearest_neighbor.argsort()[0]

            Pwarn(f"1NN interpolation used for {np.sum(wnan)} "
                  f"binaries out of hull. Parameter-wise distance "
                  f"(unnormalized) for the point with maximum out-of-hull "
                  f"Euclidean distance (normalized): "
                  f"{np.abs(neighbors[nearest_neighbor] - max_distance_point[0])}",
                  "InterpolationWarning")

        return Ypred
[docs] class MC_Interpolator: """Class implementing class-wise interpolation.""" def __init__(self, classifier, classes, methods): """Initialize the class-wise interpolation. classifier : KNNClassifier The classifier that is used to classify input vectors classes : List of strings The classes in question methods : List of strings The methods to be used to interpolate between each class of tracks """ self.interpolators = [None] * len(classes) self.D = classifier.D self.classifier = classifier self.classes = [] for c in classes: if isinstance(c, list): self.classes.append(c) else: self.classes.append([c]) self.M = None if not isinstance(methods, list): self.methods = [methods] * len(self.classes) else: if len(methods) != len(self.classes): raise ValueError("Number of interpolators must match " "number of classes.") self.methods = methods for i, method in enumerate(self.methods): if method == "linear": self.interpolators[i] = LinInterpolator() elif method == "1NN": self.interpolators[i] = NNInterpolator() else: raise AttributeError( "Valid interpolation methods are 'linear' and '1NN'.")
[docs] def train(self, XT, YT, z): """Train the interpolator. Parameters ---------- XT : numpy array List of input vectors YT : numpy array List of corresponding output vectors z : #TODO """ self.M = YT.shape[1] for i in range(len(self.classes)): which = np.zeros_like(z, dtype=bool) for j in range(len(self.classes[i])): which += z == self.classes[i][j] self.interpolators[i].train(XT[which, :], YT[which, :])
[docs] def classifier(self, Xt): return self.classifier.predict(Xt)
[docs] def predict(self, Xt, zpred, scaler = None): """Interpolate and approximate output vectors given input vectors. Parameters ---------- XT : numpy array List of input vectors zpred : #TODO scaler : #TODO Returns ------- Output space approximation as numpy array """ Ypred = np.ones((Xt.shape[0], self.M)) * np.nan for i in range(len(self.classes)): which = np.zeros_like(zpred, dtype=bool) for j in range(len(self.classes[i])): which += zpred == self.classes[i][j] if which.any(): Ypred[which, :] = self.interpolators[i].predict(Xt[which, :], scaler = scaler, klass = self.classes[i]) return Ypred
# BASE CLASSIFIERS
[docs] class Classifier: """Class used as a shell parent class for all classifiers.""" def __init__(self): """Initialize the Classifier.""" self.classifier = None self.D = None self.labels = None self.method = None
[docs] def train(self, XT, yT): """Train the classifier. Parameters ---------- XT : numpy array List of input vectors YT : numpy array List of corresponding classes """ assert XT.shape[0] == len(yT) self.D = XT.shape[1]
[docs] def predict(self, Xt): """Classify and approximate classes given input vectors. Parameters ---------- XT : numpy array List of input vectors Returns ------- Output space approximation as numpy array """ if self.classifier is None: raise RuntimeError("Train Classifier first.") if Xt.shape[1] != self.D: raise ValueError("Wrong input dimension.")
[docs] def predict_prob(self, Xt): """Classify and get probability of input vector belonging to any class. Parameters ---------- XT : numpy array List of input vectors Returns ------- Output space approximation as numpy array """ if self.classifier is None: raise RuntimeError("Train Classifier first.") if Xt.shape[1] != self.D: raise ValueError("Wrong input dimension.")
[docs] def train_error(self, XT, yT): """Calculate approximation error given testing data. Parameters ---------- XT : numpy array List of input vectors YT : numpy array List of corresponding classes """ error = np.sum(yT == self.predict(XT)) print("\nOverall Train Accuracy:") print(error / XT.shape[0])
[docs] class KNNClassifier(Classifier): """Class implementing K - Nearest Neighbors classifier."""
[docs] def train(self, XT, yT, K=5): """Train the classifier. Parameters ---------- XT : numpy array List of input vectors YT : numpy array List of corresponding classes K : #TODO """ super().train(XT, yT) self.classifier = KNeighborsClassifier(n_neighbors=K, weights='distance') self.classifier.fit(XT, yT) self.labels = self.classifier.classes_ self.method = str(K)+'-NN'
# self.train_error(XT, yT)
[docs] def predict(self, Xt): """Classify and approximate classes given input vectors. Parameters ---------- XT : numpy array List of input vectors Returns ------- Output space approximation as numpy array """ super().predict(Xt) return self.classifier.predict(Xt)
[docs] def predict_prob(self, Xt): """Classify and get probability of input vector belonging to any class. Parameters ---------- XT : numpy array List of input vectors Returns ------- Output space approximation as numpy array """ super().predict_prob(Xt) return self.classifier.predict_proba(Xt)
[docs] def xtrain(self, XT, yT, **opts): """Perform cross validation to find optimal k to use in classification. Parameters ---------- XT : numpy array List of input vectors YT : numpy array List of corresponding classes """ nfolds = opts.get('nfolds', 10) p_test = opts.get('p_test', 0.1) nmax = opts.get('nmax', 20) if len(np.unique(yT)) > 5: print("Too many classes for xval training. Set K=2.") n_opt = 2 else: acc = np.zeros(nmax) for n in range(1, nmax + 1): for i in range(nfolds): iTrain, itest = xval_indices( XT.shape[0], percent_test=p_test, labels=yT) Xi, yi = XT[iTrain, :], yT[iTrain] ki = KNeighborsClassifier(n_neighbors=n, weights='distance') ki.fit(Xi, yi) acc[n - 1] += balanced_accuracy_score( yT[itest], ki.predict(XT[itest, :])) / nfolds n_opt = np.argmax(acc) + 1 if n_opt < 3: # k must be at least 3 n_opt = 3 self.train(XT, yT, K=n_opt)
# MATRIX SCALING
[docs] class Scaler: def __init__(self, norms, XT, ic): if norms[0] == norms[1]: self.scaler = { None: MatrixScaler(norms[0], XT) } else: self.scaler = {} for klass, n in norms[0].items(): self.scaler[klass] = MatrixScaler(n, XT[np.where(ic == klass)[0]]) # need to only use relevant classes in XT self.scaler[None] = MatrixScaler(norms[1], XT)
[docs] def normalize(self, X, klass = None): if klass is None: return self.scaler[klass].normalize(X) else: normalized = X.copy() classes = np.unique(klass) for c in classes: inds = np.where(klass == c)[0] c = None if c == "None" else c if c not in self.scaler.keys(): Pwarn(f"normalization was skipped during interpolation: c={c}, inds={inds}", "InterpolationWarning") continue normalized[inds] = self.scaler[c].normalize(X[inds]) return normalized
[docs] def denormalize(self, Xn, klass = None): if klass is None: return self.scaler[klass].denormalize(Xn) else: normalized = Xn classes = np.unique(klass) for c in classes: inds = np.where(klass == c)[0] c = None if c == "None" else c if c not in self.scaler.keys(): Pwarn(f"de-normalization was skipped during interpolation: c={c}, inds={inds}", "InterpolationWarning") continue normalized[inds] = self.scaler[c].denormalize(Xn[inds]) return normalized
[docs] class MatrixScaler: """Class used to scale input and output space.""" def __init__(self, norms, XT): """Initialize the Scaler with desired scalings.""" if len(norms) != XT.shape[1]: raise ValueError("The number of columns in XT must be equal " "to the length of norms.") self.N = XT.shape[1] self.scalers = [] for i in range(self.N): self.scalers.append(DataScaler()) which = ~np.isnan(XT[:, i]) self.scalers[i].fit(XT[which, i], method=norms[i])
[docs] def normalize(self, X): """Scale input X.""" assert X.shape[1] == self.N Xn = np.empty_like(X) for i in range(self.N): Xn[:, i] = self.scalers[i].transform(X[:, i]) return Xn
[docs] def denormalize(self, Xn): """Unscale input X.""" assert Xn.shape[1] == self.N X = np.empty_like(Xn) for i in range(self.N): X[:, i] = self.scalers[i].inv_transform(Xn[:, i]) return X
# MISCELLANEOUS FUNCTIONS
[docs] def xval_indices(N, percent_test=0.15, labels=None): """Perform Monte Carlo Cross Validation; stratified if labels are provided. Parameters ---------- N : int number of samples in the data set. percent_test : float Percentage of samples to be in the test sets. (0 < percent_test < 0.5) labels : #TODO Returns ------- (np.ndarray, np.ndarray) 1D int vectors containing the indices for train and test splits, respectively. """ if labels is None: indices = np.random.permutation(N) NT = int(np.round(N * (1 - percent_test))) iT, it = indices[:NT], indices[NT:] else: assert len(labels) == N indices = np.arange(N) iT, it = np.empty(0, dtype=int), np.empty(0, dtype=int) for label in np.unique(labels): ind_l = indices[labels == label] NT_l = int(np.round(len(ind_l) * (1 - percent_test))) iT = np.hstack((iT, ind_l[:NT_l])) it = np.hstack((it, ind_l[NT_l:])) np.random.shuffle(iT), np.random.shuffle(it) return iT, it
# PERFORMANCE ASSESSMENT
def train_and_assess(grid_train, grid_test, ux2, path, classes):
    """Train interpolators and classifiers and create assessment plots.

    Parameters
    ----------
    grid_train : string
        Path to training grid.
    grid_test : string
        Path to testing grid.
    ux2 : list of floats
        List containing the slices at which plots are to be generated.
        Slices are in the 3rd variable of the input space, that is, the
        mass of the second star for the CO-HeMS and CO-HMS_RLO grids and
        the mass ratio for the HMS-HMS grid.
    path : string
        The path where assessment plots will be saved.
    classes : list of strings
        List containing the classes to be used in the MC-Interpolator.

    """
    # check folders exist
    path2obj = os.path.join(path, 'interpolation_objects')
    if not os.path.isdir(path2obj):
        os.mkdir(path2obj)
    path2figs = os.path.join(path, "plots")
    if not os.path.isdir(path2figs):
        os.mkdir(path2figs)
        os.mkdir(os.path.join(path2figs, 'classif'))
        os.mkdir(os.path.join(path2figs, 'interp'))
    else:
        if not os.path.isdir(os.path.join(path2figs, 'classif')):
            os.mkdir(os.path.join(path2figs, 'classif'))
        if not os.path.isdir(os.path.join(path2figs, 'interp')):
            os.mkdir(os.path.join(path2figs, 'interp'))

    grid_T = PSyGrid()
    grid_T.load(grid_train)
    grid_t = PSyGrid()
    grid_t.load(grid_test)

    dat = date.today().strftime("%y%m%d")

    interp_method, class_method = "linear", "kNN"
    mlin = IFInterpolator(grid=grid_T, interp_method=interp_method,
                          class_method=class_method)
    name = "IF_" + dat + '_' + interp_method + "_" + class_method + ".pkl"
    mlin.save(os.path.join(path2obj, name))

    interp_method, class_method = "1NN", "1NN"
    m1NN = IFInterpolator(grid=grid_T, interp_method=interp_method,
                          class_method=class_method)
    name = "IF_" + dat + '_' + interp_method + "_" + class_method + ".pkl"
    m1NN.save(os.path.join(path2obj, name))

    interp_method, class_method = "linear", "kNN"
    # classes = ['no_MT', 'stable_MT', 'unstable_MT', 'stable_reverse_MT']
    mMC = IFInterpolator(grid=grid_T, interp_method=interp_method,
                         class_method=class_method, interp_classes=classes)
    interp_method = "linear3c"
    name = "IF_" + dat + "_" + interp_method + "_" + class_method + ".pkl"
    mMC.save(os.path.join(path2obj, name))

    assess_models([m1NN, mlin, mMC], grid_T, grid_t, ux2, path=path2figs)
    return


def assess_models(models, grid_T, grid_t, ux2, path='./'):
    """Assess models using a grid.

    Parameters
    ----------
    models : psi.Interpolator or list of psi.Interpolator
        Models to be evaluated.
    grid_T : psg.PSyGrid
        Train grid.
    grid_t : psg.PSyGrid
        Test grid.
    ux2 : list of floats
        Slices of the 3rd input-space variable at which plots are generated.
    path : string
        Path where the assessment plots will be saved.

    """
    if not isinstance(models, list):
        models = [models]

    path2prmaps = os.path.join(path, 'classif')
    path2interp = os.path.join(path, 'interp')

    # Classes on which the interpolator was trained
    uic = np.unique(
        grid_T.final_values['interpolation_class'][models[0].valid >= 0])

    # Test binaries
    Xt, Yt = models[0]._grid2array(grid_t)
    validt = models[0]._setValid(grid_t.final_values['interpolation_class'],
                                 Xt)

    Ytpred, Ztpred = [], []
    interp_methods, class_methods = [], []
    for m in models:
        ynum, ycat = m.evaluate_mat(Xt[validt >= 0, :])
        Ytpred.append(ynum)
        Ztpred.append(ycat)
        class_methods.append(m.class_method)
        if isinstance(m.interp_method, str):
            interp_methods.append(m.interp_method)
        else:
            interp_methods.append('-'.join(m.interp_method))

    # Assess classification results
    print("\n\n#### Classification ######################################")
    for key in models[0].classifiers:
        print(f"\n Classification for [{key}]:")
        if models[0].classifiers[key] is not None:
            for c in uic:
                f = grid_T.final_values['interpolation_class'] == c
                print(
                    f"\t{c}: [{np.sum(grid_T.final_values[key][f] == 'None')}"
                    f"/{np.sum(f)}] NaNs")
            # Ground Truth
            zt_gt = grid_t.final_values[key][validt >= 0]
            # Confusion Matrices
            for i, m in enumerate(models):
                print(f"\n\t {m.classifiers[key].method}")
                bas = balanced_accuracy_score(zt_gt, Ztpred[i][key])
                print(f"\t Balanced Accuracy Score: {100 * bas:.3f}")
                oa = np.sum(Ztpred[i][key] == zt_gt) / len(zt_gt)
                print(f"\t Overall Accuracy: {100 * oa:.3f}")
                CM = confusion_matrix(zt_gt, Ztpred[i][key],
                                      m.classifiers[key].labels)
                savename = os.path.join(
                    path2prmaps, key + '_' + class_methods[i] + '.png')
                fig, ax = plot_conf_matrix(
                    100 * CM, m.classifiers[key].method, key,
                    m.classifiers[key].labels, savename=savename)
                plt.close(fig)
                ZT = grid_T.final_values[key][m.valid >= 0]
                zt_gt = grid_t.final_values[key][validt >= 0]
                fig, ax = plot_mc_classifier(
                    m, key, m.XT[m.valid >= 0, :], ZT, ux2,
                    Xt=Xt[validt >= 0], zt=zt_gt, zt_pred=Ztpred[i][key],
                    path=path2prmaps)
        else:
            print("\tNaN")

    # Assess interpolation results
    print("\n\n#### Interpolation ######################################")
    for m in models:
        plot_interpolation(m, ['star_1_mass', 'period_days'], ux2[2:4], ux2,
                           scales=['log', 'log'], path=path2interp)

    Ygt = Yt[validt >= 0, :]
    err_r, err_a = [], []
    for i in range(len(models)):
        err_a.append(np.abs(Ytpred[i] - Ygt))
        err_r.append(err_a[i] / (np.abs(Ygt)))

    # Analyze regression errors
    # Nt = np.sum(validt >= 0)
    ic_gt = grid_t.final_values['interpolation_class'][validt >= 0]
    w_interp = ic_gt != 'initial_MT'
    YT = models[0].YT[models[0].valid > 0, :]
    keys = models[0].out_keys
    for ind in range(len(keys)):
        print("\n==================================================")
        print(f"{ind}: {keys[ind]}")
        print("==================================================\n")
        print("                      e_r                       "
              "                      e_a")
        print("     ============================================"
              "     ============================================")
        w_sMT, w_uMT, w_nMT = [], [], []
        for i, m in enumerate(models):
            normi = m.out_scaling[ind]
            ic_t = Ztpred[i]['interpolation_class']
            correct = w_interp == (ic_t != 'initial_MT')
            w_sMT.append(((ic_gt == 'stable_MT') & (ic_t == 'stable_MT')))
            w_uMT.append(((ic_gt == 'unstable_MT') & (ic_t == 'unstable_MT')))
            # TODO: add interpolation class "stable_reverse_MT"
            w_nMT.append(((ic_gt == 'no_MT') & (ic_t == 'no_MT')))
            w = [w_sMT, w_uMT, w_nMT, correct]
            percent = [50, 90]
            tab = np.zeros((len(percent), 2 * len(w)))
            for k in range(4):
                for l in range(len(percent)):
                    tab[l, k] = np.nanpercentile(err_r[i][w[k][i], ind],
                                                 percent[l])
                    tab[l, k + 4] = np.nanpercentile(err_a[i][w[k][i], ind],
                                                     percent[l])
            print(f"classif.: [{class_methods[i]}] --- "
                  f"interp: [{interp_methods[i]}] --- scaling: [{normi}]")
            print("       stable MT   unstable MT      no MT       total"
                  "         stable MT   unstable MT      no MT       total")
            for l in range(len(percent)):
                print(f"  p{str(percent[l])}\t{tab[l, 0]:.2e}\t{tab[l, 1]:.2e}"
                      f"\t{tab[l, 2]:.2e}\t{tab[l, 3]:.2e}"
                      f"\t\t{tab[l, 4]:.2e}\t{tab[l, 5]:.2e}\t{tab[l, 6]:.2e}"
                      f"\t{tab[l, 7]:.2e}")
        print(f"range (training samples): [{YT[:, ind].min()}, "
              f"{YT[:, ind].max()}]")

    plot_interp_error(Ygt, Ytpred, w[0:3], interp_methods, keys,
                      path=path2interp)
    return
[docs] def analize_nans(out_keys, YT, valid): """Find nans in input numerical variables.""" print("\nNans in numerical variables:") for i, key in enumerate(out_keys): n = [] for v in range(4): n.append(np.sum(np.isnan(YT[valid == v, i]))) if np.sum(np.array(n)) > 0: print(f"[i_MT] = {n[0]:5d}, [no_MT] = {n[1]:5d}, " f"[st_MT] = {n[2]:5d}, [u_MT] = {n[3]:5d} : {i:3d} {key}") n = [] for v in range(4): n.append(np.sum(valid == v)) print(f"\n[i_MT] = {n[0]:5d}, [no_MT] = {n[1]:5d}, [st_MT] = {n[2]:5d}, " f"[u_MT] = {n[3]:5d}")
[docs] def analize_nones(classifiers, valid, grid): """Find None values in input categorical variables.""" print("\nNones in categorical variables:") for key in classifiers: n = [] y = grid.final_values[key] for v in range(4): n.append(np.sum(y[valid == v] == 'None')) if np.sum(np.array(n)) > 0: print(f"[i_MT] = {n[0]:5d}, [no_MT] = {n[1]:5d}, " f"[st_MT] = {n[2]:5d}, [u_MT] = {n[3]:5d} : {key}") n = [] for v in range(4): n.append(np.sum(valid == v)) print(f"\n[i_MT] = {n[0]:5d}, [no_MT] = {n[1]:5d}, " f"[st_MT] = {n[2]:5d}, [u_MT] = {n[3]:5d}")
# PLOTTING FUNCTIONS
[docs] def heatmap(CM, ax, **heat_kwargs): """Plot confusion matrix as heatmap.""" lw = heat_kwargs.pop('linewidth', 1.5) ax.matshow(CM, **heat_kwargs) mi, ma = CM.flatten().min(), CM.flatten().max() for i in range(CM.shape[0]): for j in range(CM.shape[1]): col = 'w' if CM[i, j] > (0.7 * (ma - mi) + mi): col = 'k' ax.text(x=j, y=i, s=str(round(CM[i, j], 2)), va='center', ha='center', size='large', c=col) for i in range(CM.shape[0] - 1): for j in range(CM.shape[1] - 1): ax.plot([i + 0.5] * 2, [-0.5, CM.shape[1] - 0.5], 'w', linewidth=lw) ax.plot([-0.5, CM.shape[0] - 0.5], [i + 0.5] * 2, 'w', linewidth=lw)
CONFMAT_KWARGS = { 'cmap': 'cividis', 'linewidth': 1.5, }
[docs] def plot_conf_matrix(CM, method, varname, labels, savename=None, **kwargs): """Plot confusion matrix for classification assessment.""" heat_kwargs = {} for arg in CONFMAT_KWARGS: heat_kwargs[arg] = kwargs.get(arg, CONFMAT_KWARGS[arg]) fig, ax = plt.subplots(1, 1, figsize=(len(labels),) * 2) s = np.sum(CM, axis=1)[:, np.newaxis] s[s == 0] = 1 heatmap(100 * CM / s, ax, **heat_kwargs) lmax = 0 for lab in labels: llab = len(lab) if llab > lmax: lmax = llab labels_short = [lab if len(lab) < 12 else lab[:11] for lab in labels] alpha = 0 if lmax > 9: alpha = 20 ax.xaxis.tick_top() plt.xticks(np.arange(len(labels)), labels_short, rotation=alpha) plt.yticks(np.arange(len(labels)), labels_short, rotation=90 - alpha, va='center') ax.set(title='predicted class', xlabel=method + ' -- ' + varname, ylabel='actual class') if lmax > 11: handles = [Line2D([0], [0], marker=r'$\mathcal{C}_' + str(i) + '$', color='w', label=str(s[i]) + ' ' + labels[i], markerfacecolor='k', markeredgecolor='None', markersize=11) for i in range(len(labels))] plt.legend(handles, labels, bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.) if savename is not None: plt.savefig(savename, bbox_inches='tight') return fig, ax
PLT_CLASS_KWARGS = { 'N': 150, 'figsize': (7, 5), 'tight_layout': True, 'dpi': 150, 's': 10, # marker size 'linewidths': 0.05 }
[docs] def plot_mc_classifier(m, key, XT, zT, ux2, Xt=None, zt=None, zt_pred=None, path=None, **pltargs): """Plot slices illustrating decision boundaries and classification prob.""" N = pltargs.pop('N', PLT_CLASS_KWARGS['N']) marker_size = pltargs.pop('s', PLT_CLASS_KWARGS['s']) fig_kwargs = {} sct_kwargs = {} for arg in PLT_CLASS_KWARGS: if arg in ['figsize', 'dpi', 'tight_layout']: fig_kwargs[arg] = pltargs.get(arg, PLT_CLASS_KWARGS[arg]) elif arg in ['s', 'linewidths']: sct_kwargs[arg] = pltargs.get(arg, PLT_CLASS_KWARGS[arg]) classes = m.classifiers[key].labels mycolors = ['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple', 'tab:brown', 'tab:pink', 'tab:olive', 'tab:cyan', 'tab:gray', 'lime', 'm', 'aqua', 'bisque', 'salmon', 'gold', 'palegreen', 'mistyrose', 'yellow', 'darkviolet'] MAP = ListedColormap(mycolors[:len(classes)]) for i in range(len(ux2)): fig, ax = plt.subplots(**fig_kwargs) # Which training and test binaries belong to the ith slice whichT = is_in_ball(XT[:, 1], ux2[i], ux2, m.in_scaling[1].startswith('log')) xx, yy, X = test_mesh(XT[whichT, :], N) proba = (m.test_classifier_prob(key, X).max(axis=1)).reshape(xx.shape) zpred = m.test_classifier(key, X).reshape(xx.shape) for k, c in enumerate(classes): zpred[zpred == c] = k _ = ax.contourf(xx, yy, zpred.astype(float), alpha=.6, cmap=MAP) contf = ax.contourf(xx, yy, proba, alpha=0.3, cmap='gist_gray') fig.colorbar(contf) for k, c in enumerate(classes): color = [MAP(k)] wk = zT[whichT] == c ax.scatter(XT[whichT, 0][wk], XT[whichT, 2][wk], c=color, marker='.', **sct_kwargs) if Xt is not None: whicht = is_in_ball(Xt[:, 1], ux2[i], ux2, m.in_scaling[1].startswith('log')) for k, c in enumerate(classes): color = [MAP(k)] wk = zt[whicht] == c ax.scatter(Xt[whicht, 0][wk], Xt[whicht, 2][wk], c=color, marker='*', s=1.5 * marker_size, linewidths=0.4) which_miss = whicht & (zt != zt_pred) ax.scatter(Xt[which_miss, 0], Xt[which_miss, 2], marker='*', s=1.5 * marker_size, label='missclassified', facecolor='None', edgecolor='k', linewidths=0.4) ax.set_xscale('log') ax.set_yscale('log') if m.interp_in_q: var2 = DEFAULT_LABELS['mass_ratio'][0] else: var2 = DEFAULT_LABELS['star_2_mass'][0] ax.set(xlabel=DEFAULT_LABELS['star_1_mass'][1], ylabel=DEFAULT_LABELS['period_days'][1]) ax.set(title=key + ' (' + var2 + ' = ' + str(round(ux2[i], 2)) + ')') legend_elements = [ Line2D([0], [0], marker='.', color='None', label='train binary', markerfacecolor='k', markeredgecolor='k', markersize=0.5 * marker_size, mew=0.4), Line2D([0], [0], marker='*', color='None', label='test binary', markerfacecolor='gray', markeredgecolor='None', markersize=0.75 * marker_size, mew=0.4), Line2D([0], [0], marker='*', color='None', label='misclassified', markerfacecolor='gray', markeredgecolor='k', markersize=0.75 * marker_size, mew=0.4)] leg1 = plt.legend(handles=legend_elements, loc='lower right', ncol=3, fontsize='small') legend_el_class = [ Patch(facecolor=mycolors[j], edgecolor='k', label=classes[j]) for j in range(len(classes)) ] plt.legend(handles=legend_el_class, loc='upper right', fontsize='small') ax.add_artist(leg1) if path is not None: savename = os.path.join(path, key + '_' + m.classifiers[key].method + '_' + str(round(ux2[i], 2)) + '.png') fig.savefig(savename) plt.close(fig) return fig, ax

def is_in_ball(x, x0, ux, log):
    """Helper function for plot_mc_classifier."""
    if log:
        d = np.diff(np.log10(ux))[0] / 2
        w = ((np.log10(x) > np.log10(x0) - d)
             & (np.log10(x) < np.log10(x0) + d))
    else:
        d = np.diff(ux)[0] / 2
        w = (x > x0 - d) & (x < x0 + d)
    return w
[docs] def test_mesh(XT, N): """Helper function for plot_mc_classifier creating mesh for test data.""" a, b = np.log10(XT[:, 0].min()), np.log10(XT[:, 0].max()) delta = (b - a) / 50 x_min, x_max = 10 ** (a - delta), 10 ** (b + delta) a, b = np.log10(XT[:, 2].min()), np.log10(XT[:, 2].max()) delta = (b - a) / 50 # 50 is around 2 times the # of points in the axis y_min, y_max = 10 ** (a - delta), 10 ** (b + delta) xx, yy = np.meshgrid(np.logspace(np.log10(x_min), np.log10(x_max), N), np.logspace(np.log10(y_min), np.log10(y_max), N)) X = np.vstack((xx.ravel(), XT[0, 1] * np.ones(N ** 2), yy.ravel())).T return xx, yy, X
# Default arguments for matplotlib PLT_INTERP_KWARGS = { 'N': 300, 'figsize': (7, 5), 'tight_layout': True, 'dpi': 150, 's': 10, # marker size 'linewidths': 0.05 }
[docs] def plot_interpolation(m, keys, v2, ux2, scales=None, path=None, **pltargs): """Plot the interpolation errors by key. Parameters ---------- m : IFInterpolator A trained instance of the IFInterpolator keys : List of strings Variables for which interpolation errors will be plotted v2 : #TODO ux2 : #TODO scales : #TODO path : #TODO """ N = pltargs.pop('N', PLT_INTERP_KWARGS['N']) # marker_size = pltargs.pop('s', PLT_INTERP_KWARGS['s']) fig_kwargs = {} sct_kwargs = {} for arg in PLT_INTERP_KWARGS: if arg in ['figsize', 'dpi', 'tight_layout']: fig_kwargs[arg] = pltargs.get(arg, PLT_INTERP_KWARGS[arg]) elif arg in ['s', 'linewidths']: sct_kwargs[arg] = pltargs.get(arg, PLT_INTERP_KWARGS[arg]) keys = list(keys) XT = m.XT[m.valid >= 0, :] for v in v2: # Which training and test binaries belong to the ith slice whichT = is_in_ball(XT[:, 1], v, ux2, m.in_scaling[1].startswith('log')) xx, yy, X = test_mesh(XT[whichT, :], N) ynum, ycat = m.evaluate_mat(X) for i, key in enumerate(keys): fig, ax = plt.subplots(**fig_kwargs) Ct = ynum[:, m.out_keys.index(key)].reshape(xx.shape) clab = DEFAULT_LABELS[key][0] if scales is not None: if scales[i] == 'log': Ct = np.log10(Ct) clab = DEFAULT_LABELS[key][1] c = ax.pcolormesh(xx, yy, Ct, cmap='viridis') cbar = fig.colorbar(c, ax=ax) cbar.set_label(clab) ax.set_xscale('log') ax.set_yscale('log') if m.interp_in_q: var2 = DEFAULT_LABELS['mass_ratio'][0] else: var2 = DEFAULT_LABELS['star_2_mass'][0] ax.set(xlabel=DEFAULT_LABELS['star_1_mass'][1], ylabel=DEFAULT_LABELS['period_days'][1]) ax.set(title=var2 + ' = ' + str(round(v, 2))) if path is not None: if not isinstance(m.interp_method, list): method = m.interp_method else: method = '-'.join(m.interp_method) name = (key + '_' + m.classifiers['interpolation_class'].method + '_' + method + '_' + str(round(v, 2)) + '.png') name = os.path.join(path, name) fig.savefig(name) plt.close(fig)
[docs] def plot_interp_error(Ygt, Ys, ind_MT, methods, keys, path='.'): """Make violin plots for the error distributions of each of the variables. Parameters ---------- Ygt : numpy array Ground truth of the final values of testing tracks Ys : numpy array Model's approximation of the final values of testing tracks ind_MT : #TODO methods : List of strings List that indicates the interpolation methods keys : List of strings List indicating for which variables error distributions will be plotted path : #TODO """ labels = ['stable MT', 'unstable MT', 'no MT'] col_lab = ['tab:purple', 'tab:olive', 'tab:pink'] colors = ['tab:blue', 'tab:orange', 'tab:green', 'tab:red', 'tab:purple'] errors = ['$e_d (e_a/range)$', '$e_r$', '$e_a$'] for ind in range(len(keys)): for er in errors: fig, axs = plt.subplots(1, 2, figsize=(12, 4)) for j, m in enumerate(methods): absdif = np.abs(Ygt[:, ind] - Ys[j][:, ind]) if er == '$e_a/range$': e_d = absdif / (np.nanmax(Ygt[:, ind]) - np.nanmin(Ygt[:, ind])) elif er == '$e_r$': e_d = absdif / (np.abs(Ygt[:, ind]) + 1e-16) else: e_d = absdif col = colors[j] for k in range(len(ind_MT)): if np.sum(ind_MT[k][j]) > 0: parts = axs[0].violinplot( np.log10(e_d[ind_MT[k][j]]), positions=[k], showextrema=True) for pc in parts['bodies']: pc.set_facecolor(col) # pc.set_alpha(0.5) parts['cmins'].set_edgecolor(col) parts['cmaxes'].set_edgecolor(col) parts['cbars'].set_edgecolor(col) axs[0].scatter(k, np.log10( np.nanmedian(e_d[ind_MT[k][j]])), color=col, marker='o') axs[0].scatter(k, np.log10( np.nanpercentile(e_d[ind_MT[k][j]], 90)), color=col, marker='x') if j == 2: xx = Ygt[:, ind][ind_MT[k][j]] yy = e_d[ind_MT[k][j]] axs[1].scatter(xx, yy, c=col_lab[k], label=labels[k], alpha=0.4) axs[1].plot([np.nanmin(xx), np.nanmax(xx)], [np.nanpercentile(yy, 90)] * 2, c=col_lab[k]) axs[1].set_title(keys[ind]) axs[1].set_xlabel('ground truth') axs[1].set_ylabel('error') axs[0].set_xticks(np.arange(0, len(labels))) axs[0].set_xticklabels(labels) axs[0].set_xlim(-0.75, len(labels) - 0.25) axs[0].set_title(keys[ind]) axs[0].set_ylabel(er) legend_m = [ Patch(facecolor=colors[i], edgecolor='k', label=methods[i]) for i in range(len(methods))] axs[0].legend(handles=legend_m, loc='upper right', fontsize='small') axs[1].legend(loc='upper right', fontsize='small') plt.savefig(os.path.join( path, str(ind) + '_' + keys[ind] + '_' + er[1:4] + '.png')) plt.close() absdif = np.abs(Ygt - Ys[j]) e_r = absdif / (np.abs(Ygt) + 1e-16) order = np.argsort(np.nanpercentile(e_r[ind_MT[0][j], :], 90, axis=0)) ncol = 10 nrows = len(order) // ncol fig, axs = plt.subplots(nrows, 1, figsize=(12, 60)) axs[0].set_title('purple: st. MT, green: unst. MT, pink no MT') for k in range(len(ind_MT)): if np.sum(ind_MT[k][j]) > 0: for i in range(nrows): indices = order[i * ncol:np.min([(i + 1) * ncol, len(order)])] parts = axs[i].violinplot( np.log10(e_r[ind_MT[k][j], :][:, indices]), showextrema=True, showmedians=True) for pc in parts['bodies']: pc.set_facecolor(col_lab[k]) parts['cmins'].set_edgecolor(col_lab[k]) parts['cmaxes'].set_edgecolor(col_lab[k]) parts['cbars'].set_edgecolor(col_lab[k]) parts['cmedians'].set_edgecolor(col_lab[k]) plt.sca(axs[i]) plt.xticks(np.arange(1, 11), [str(i) + ' ' + keys[i] for i in indices], rotation=20) axs[i].set_ylim(-7, 1) axs[i].set_ylabel(errors[1]) for i in range(nrows): axs[i].grid(axis='y') fig.tight_layout() plt.savefig(os.path.join(path, '0_' + 'errors.png'))