
"""The definition of the `Sampler` class in PSY-CRIS."""


__authors__ = [
    "Kyle Akira Rocha <kylerocha2024@u.northwestern.edu>",
    "Scott Coughlin <scottcoughlin2014@u.northwestern.edu>",
]


import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import time
import sys
from collections import OrderedDict

import scipy.stats
from scipy.spatial.distance import pdist


class Sampler:
    """Class implementing PTMCMC and MCMC for the PSY-CRIS algorithm.

    Modular implementation of PTMCMC and MCMC designed to implement the
    PSY-CRIS algorithm of sampling points in a target distribution
    constructed with a Classifier and Regressor. After a posterior is
    generated, methods in this class are also used to downsample.
    """

    def __init__(self, classifier=None, regressor=None):
        """Initialize the sampler.

        Parameters
        ----------
        classifier : instance of <class, Classifier>
            A trained classifier object.
        regressor : instance of <class, Regressor>, optional
            A trained regressor object.
        """
        self._Classifier_ = classifier
        self._Regressor_ = regressor

        if (self._Classifier_ is not None) or (self._Regressor_ is not None):
            if self._Classifier_ is not None:
                self._TableData_ = self._Classifier_._TableData_
            else:
                self._TableData_ = self._Regressor_._TableData_

            # Find the bounds of the walker - should be a TableData attribute
            self._max_vals_ = self._TableData_._max_input_vals
            self._min_vals_ = self._TableData_._min_input_vals

        # You can save chains_history in here
        self._chain_step_hist_holder_ = OrderedDict()
        # Not fully implemented yet
        self._MAX_APC_str_ = []
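
    # Usage sketch (illustrative only, not part of the class API): a Sampler
    # is typically built from already trained objects; `my_classifier` and
    # `my_regressor` below are hypothetical, pre-trained instances.
    # >>> sampler = Sampler(classifier=my_classifier, regressor=my_regressor)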

    def TD_2d_analytic(self, name, args, **kwargs):
        r"""2-dimensional analytic target distribution for testing MCMC/PTMCMC.

        The function:
        $\frac{16}{3\pi} \left( \exp\left[-\mu^2 - (9 + 4\mu^2 + 8\nu)^2\right]
        + \frac{1}{2} \exp\left[-8\mu^2 - 8(\nu - 2)^2\right] \right)$

        Parameters
        ----------
        name : str
            Name of algorithm to use. For this method, None.
        args : array
            2D location to get the value of the function.
        **kwargs
            Kwargs for more complex target distributions.

        Returns
        -------
        array or float
        """
        mu, nu = args
        arg1 = -(mu ** 2) - (9 + 4 * mu ** 2 + 8 * nu) ** 2
        arg2 = -8 * mu ** 2 - 8 * (nu - 2) ** 2
        return (16) / (3 * np.pi) * (np.exp(arg1) + 0.5 * np.exp(arg2))
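
    # Example (illustrative only): the analytic target distribution needs no
    # trained interpolator, so the `name` argument can simply be None.
    # >>> Sampler().TD_2d_analytic(None, (0.0, -1.0))  # approximately 0.62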

    def get_TD_classification_data(self, *args, **kwargs):
        """Get target-distribution classification data.

        Calculate terms relevant for creating target distributions
        with classification terms.

        Parameters
        ----------
        classifier_name : str
            Trained classifier name to use for predictions.
        position : array
            Position in parameter space to evaluate.
        **kwargs
            TD_verbose : bool
                Print useful output.

        Returns
        -------
        max_probs : array
            Maximum probabilities at each query point.
        position : array
            Position in parameter space being queried.
        cls_key : array
            Classification key predicted for each query position.
        """
        classifier_name, position = args
        position = np.array(position)
        if position.ndim == 1:
            position = position[np.newaxis, :]
        TD_verbose = kwargs.get("TD_verbose", False)

        normalized_probs, where_not_nan = self._Classifier_.return_probs(
            classifier_name, position, verbose=TD_verbose
        )
        where_nan = np.array(
            [i for i in range(len(position)) if i not in where_not_nan]
        )

        max_probs = np.max(normalized_probs, axis=1)
        cls_key = np.argmax(normalized_probs, axis=1)
        position = np.array(position)

        if len(where_nan) > 0:
            max_probs = np.zeros(position.shape[0]) + 1e-16
            cls_key = [None] * position.shape[0]
            if TD_verbose:
                print('\t', max_probs, position, cls_key)
            return max_probs, position, cls_key
        else:
            max_probs = np.where(max_probs == 1, 1 - 1e-16, max_probs)
            if TD_verbose:
                print('\t', max_probs, position, cls_key)
            return max_probs, position, cls_key

    def TD_classification(self, classifier_name, position, **kwargs):
        r"""Target distribution using classification.

        $f(x) = 1 - \max[P_{\rm class}(x)]$

        Parameters
        ----------
        classifier_name : str
            String to specify the trained classification algorithm to use.
        position : array
            Single location in parameter space for the target distribution to
            be evaluated at.
        **kwargs
            TD_BETA : float
                Exponent of the target distribution - $f(x)^{\rm TD\_BETA}$.
                Used for smoothing or sharpening.
            TD_verbose : bool
                Extra print output every method call.

        Returns
        -------
        array
            If the classification probability is NaN: f(x) = 1E-16.
        """
        # TD_verbose = kwargs.get("TD_verbose", False)
        TD_BETA = kwargs.get("TD_BETA", 1.0)
        max_probs, pos, cls_keys = self.get_TD_classification_data(
            classifier_name, position, **kwargs
        )
        theoretical_max_TD_cls_term = 1 - 1 / self._TableData_.num_classes
        return ((1 - max_probs) * 1 / theoretical_max_TD_cls_term) ** (TD_BETA)
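
    # Usage sketch (illustrative only): evaluate the classification-based
    # target distribution at a single point. The interpolator name "rbf" and
    # the position are assumptions that depend on the trained Classifier.
    # >>> f_val = sampler.TD_classification("rbf", [0.5, 0.5], TD_BETA=2.0)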

    def TD_classification_regression(self, names, args, **kwargs):
        r"""Target distribution using both classification & regression.

        Classification term: $1 - \max[P_{\rm class}(x)]$
        Regression term: $A_0 \log\left( A_1 \, | \max[{\rm APC}_n(x)] | + 1 \right)$

        Parameters
        ----------
        names : list-like
            Iterable containing the two strings specifying the classification
            and regression algorithm to use.
        args : array
            Position in parameter space to evaluate the target distribution
            at.
        **kwargs
            TD_A1 : float, optional
                Scaling factor inside the log regression error term.
                (Default = 0.5)
            TD_TAU : float, optional
                Relative weight of classification to regression term.
                (Default = 0.5)
            TD_BETA : float, optional
                Exponent of the entire target distribution. Used for
                smoothing or sharpening the distribution. Default is 1.
            TD_verbose : bool, optional
                Print more diagnostic information.

        Returns
        -------
        array
        """
        normalized_probs, where_not_nan = self._Classifier_.return_probs(
            names[0], args, verbose=False
        )
        max_probs = np.max(normalized_probs, axis=1)
        pred_class_ids = np.argmax(normalized_probs, axis=1)
        cls_key = [self._Classifier_.class_id_mapping[i]
                   for i in pred_class_ids]

        theoretical_max_TD_cls_term = (
            1 - 1 / self._Classifier_._TableData_.num_classes)

        if max_probs == 1:
            max_probs = 1 - 1e-16
        classification_term = (1 - max_probs) * 1 / theoretical_max_TD_cls_term

        if len(where_not_nan) != len(normalized_probs):
            return 1e-16
        else:
            if isinstance(
                self._Regressor_.regr_dfs_per_class[cls_key[0]], pd.DataFrame
            ):
                max_APC, which_col_max = self._Regressor_.get_max_APC_val(
                    names[1], cls_key[0], args
                )
                self._MAX_APC_str_.append(which_col_max)

                if self._Regressor_.abs_max_APC is None:
                    raise ValueError("No max APC value found in TableData...")

                A1 = kwargs.get("TD_A1", 0.5)
                scaling_log_func = lambda A1, x: np.log10(A1 * np.abs(x) + 1)
                A0 = 1 / scaling_log_func(A1, self._Regressor_.abs_max_APC)

                regression_term = A0 * scaling_log_func(A1, max_APC)
            else:
                regression_term = 1e-16

        TD_TAU = kwargs.get("TD_TAU", 0.5)
        TD_BETA = kwargs.get("TD_BETA", 1.0)
        if kwargs.get("TD_verbose", False):
            print("TD_TAU: {0} | TD_BETA: {1}".format(TD_TAU, TD_BETA))

        return (TD_TAU * classification_term
                + (1 - TD_TAU) * regression_term) ** TD_BETA
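
    # Usage sketch (illustrative only): combined classification + regression
    # target distribution. The algorithm names ("rbf", "rbf") and the weights
    # are assumptions; TD_TAU balances the two terms, TD_BETA sharpens or
    # smooths the full distribution.
    # >>> f_val = sampler.TD_classification_regression(
    # ...     ("rbf", "rbf"), [0.5, 0.5], TD_TAU=0.5, TD_BETA=1.0)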

    def save_chain_step_history(self, key, chain_step_history,
                                overwrite=False):
        """Save PTMCMC output chain_step_history inside the sampler object."""
        if key in self._chain_step_hist_holder_.keys() and not overwrite:
            raise Exception(
                "\nYou are about to overwrite an existing element in '{0}'\n\n"
                "\tUse the option 'overwrite=True' to reassign.".format(key))

        self._chain_step_hist_holder_[key] = chain_step_history
        print("Saved chain to '{0}'.".format(key))

    def get_saved_chain_step_history(self, key, return_all=False):
        """Return the saved chain step history."""
        if return_all:
            return self._chain_step_hist_holder_
        else:
            return self._chain_step_hist_holder_[key]

    def run_PTMCMC(self, T_max, N_tot, target_dist, classifier_name,
                   init_pos=None, N_draws_per_swap=3, c_spacing=1.2,
                   alpha=None, upper_limit_reject=1e5, verbose=False,
                   trace_plots=False, **TD_kwargs):
        """Run a Parallel Tempered MCMC with a user-specified target distribution.

        Calls the method `run_MCMC`.

        Parameters
        ----------
        T_max : float
            Sets the maximum temperature MCMC in the chain.
        N_tot : int
            The total number of iterations for the PTMCMC.
        target_dist : callable
            The target distribution to sample.
            Must take arguments (method_name, location_to_eval).
            (A 2D analytic function is provided - TD_2d_analytic.)
        classifier_name : str, list
            A single string or list of strings specifying the interpolator to
            use for classification or classification & regression,
            respectively.
        init_pos : array
            Initial position of walkers in each axis. Default is the median
            of the input data in TableData.
        N_draws_per_swap : int, optional
            Number of draws to perform for each MCMC before swap proposals.
        c_spacing : float, optional
            Sets the spacing of temperatures in each chain.
            T_{i+1} = T_{i}^{1/c}, range: [T_max, T=1].
        alpha : float, optional
            Sets the standard deviation of steps taken by the walkers.
            Default is 1/5 the range of training data from TableData.
        upper_limit_reject : float, optional
            Sets the upper limit of rejected points.
        verbose : bool, optional
            Useful print statements during execution.
        trace_plots : bool, optional
            Make trace plots of the hottest and coldest chains when finished.

        Returns
        -------
        chain_step_history : dict
            Holds the step history for every chain. Keys are integers that
            range from 0 (max T) to the total number of chains - 1 (min T).
        T_list : array
            Array filled with the temperatures of each chain from max to min.

        Notes
        -----
        There is a zero prior on the PTMCMC outside the range of training
        data.
        """
        # create list of temperatures: T_{i+1} = T_{i}^{1/c},
        # range: [T_max, T=1]
        T_list = [T_max]
        while T_list[-1] > 1.3:
            T_list.append(T_list[-1] ** (1 / c_spacing))
        T_list.append(1)
        T_list = np.array(T_list)

        num_chains = len(T_list)
        if verbose:
            print("Num chains: {0}\nTemperatures: {1}\n".format(num_chains,
                                                                T_list))

        # data storage
        chain_holder = OrderedDict()
        for i in range(len(T_list)):
            chain_holder[i] = []

        N_loops = int(N_tot / N_draws_per_swap)

        # Initial conditions for all links in chain
        # ADD: init_pos could be given as a unitless position in the range of
        # the axes of the data. This change should also be applied to alpha -
        # the user would just give a number in (0, 1].
        if init_pos is None:
            init_pos = np.median(self._TableData_._input_.values, axis=0)
        if not isinstance(init_pos, np.ndarray):
            init_pos = np.array(init_pos)
        if init_pos.ndim > 1:
            raise ValueError(
                "init_pos has {0} dimensions, must be one dimensional.".format(
                    init_pos.ndim
                )
            )

        if alpha is None:
            alpha = [abs(max_val - min_val) / 5
                     for min_val, max_val in zip(self._min_vals_,
                                                 self._max_vals_)]

        # start chains in the same position
        this_iter_step_loc = [init_pos] * num_chains

        # Accept ratio tracker
        total_acc = np.zeros(num_chains)
        total_rej = np.zeros(num_chains)
        acc_ratio_holder = np.zeros(num_chains)

        start_time = time.time()
        for counter in range(N_loops):
            # Number of draws before swap
            N_draws = N_draws_per_swap

            last_step_holder = []
            for i in range(num_chains):
                # Run MCMC as f(T) N_draw times
                step_history = [this_iter_step_loc[i]]
                steps, acc, rej = self.run_MCMC(
                    N_draws, alpha, step_history, target_dist,
                    classifier_name, T=T_list[i],
                    upper_limit_reject=upper_limit_reject, **TD_kwargs
                )
                # save 'current' params for each T
                last_step_holder.append(steps[-1])
                total_acc[i] += acc
                total_rej[i] += rej
                acc_ratio_holder[i] = total_acc[i] / (total_acc[i]
                                                      + total_rej[i])
                # data storage
                chain_holder[i].append(np.array(steps))

            if verbose:
                # useful output during the PTMCMC
                num_bars = 20
                how_close = int((counter / (N_loops - 1)) * num_bars)
                progress_bar = (
                    "|" + how_close * "=" + ">"
                    + abs(num_bars - how_close) * " " + "|"
                    + "{0:.1f}%".format(counter / (N_loops - 1) * 100)
                )
                b = ("num_acc/total: Tmax={0:.4f} Tmin={1:.4f} loop #{2}, {3}".
                     format(acc_ratio_holder[0], acc_ratio_holder[-1],
                            counter, progress_bar))
                sys.stdout.write("\r" + b)

            # Calc H to see if chains SWAP
            accept = 0
            reject = 0
            for i in range(len(T_list) - 1):
                args_i = last_step_holder[i]
                args_i_1 = last_step_holder[i + 1]

                top = (target_dist(classifier_name, args_i_1, **TD_kwargs))**(
                    1 / T_list[i]
                ) * (target_dist(classifier_name, args_i, **TD_kwargs)) ** (
                    1 / T_list[i + 1]
                )
                bot = (target_dist(classifier_name, args_i, **TD_kwargs)) ** (
                    1 / T_list[i]
                ) * (target_dist(classifier_name, args_i_1, **TD_kwargs)) ** (
                    1 / T_list[i + 1]
                )

                # can get div 0 errors when using linear because of nans
                if bot == 0:
                    ratio = 0
                else:
                    ratio = top / bot

                # inter-chain transition probability
                H = min(1, ratio)

                chance = np.random.uniform(low=0, high=1)
                if chance <= H:
                    accept += 1
                    # SWAP the params between the two chains
                    last_step_holder[i] = args_i_1
                    last_step_holder[i + 1] = args_i
                else:
                    reject += 1

            # Update current params (could be swapped)
            # to read into MCMC on next iteration
            this_iter_step_loc = last_step_holder

        chain_step_history = OrderedDict()
        for pos, steps in chain_holder.items():
            chain_step_history[pos] = np.concatenate(steps)

        if verbose:
            print("\nLength of chains: \n{0}".format(
                np.array([len(chain_step_history[i])
                          for i in range(num_chains)])))

        fin_time_s = time.time() - start_time
        print(
            "Finished in {0:.2f} seconds, {1:.2f} minutes.".format(
                fin_time_s, fin_time_s / 60
            )
        )

        if trace_plots:
            self.make_trace_plot(chain_step_history, T_list, 0,
                                 save_fig=False)
            self.make_trace_plot(chain_step_history, T_list, num_chains - 1,
                                 save_fig=False)

        return chain_step_history, T_list
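
    # Usage sketch (illustrative only): run a short PTMCMC over the
    # classification target distribution and keep the coldest (T=1) chain as
    # the posterior. The interpolator name "rbf" and the settings below are
    # assumptions.
    # >>> chain_hist, T_list = sampler.run_PTMCMC(
    # ...     T_max=50, N_tot=1000, target_dist=sampler.TD_classification,
    # ...     classifier_name="rbf", N_draws_per_swap=3, verbose=True)
    # >>> posterior = chain_hist[len(T_list) - 1]  # lowest-temperature chain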

    def run_MCMC(self, N_trials, alpha, step_history, target_dist,
                 classifier_name, T=1, upper_limit_reject=1e4, **TD_kwargs):
        """Run a Markov chain Monte Carlo given a target distribution.

        Parameters
        ----------
        N_trials : int
            Number of proposals or trial steps to take before stopping.
        alpha : float
            Related to the step size of the MCMC walker. Defines the standard
            deviation of a zero mean normal from which the step is randomly
            drawn.
        step_history : list
            Initial starting location in parameter space. Can contain an
            arbitrary number of previous steps, but a walker will start at
            the last step in the list.
        target_dist : callable
            The target distribution to sample.
            Must take arguments (method_name, element_of_step_history).
            (A 2D analytic function is provided - TD_2d_analytic.)
        classifier_name : str
            Name of interpolation technique used in the target_dist.
        T : float, optional
            Temperature of the MCMC.
        upper_limit_reject : int, optional
            Sets the maximum number of rejected steps before the MCMC stops
            walking. Avoids a slowly converging walk with few accepted
            points.

        Returns
        -------
        step_history : array
            An array containing all accepted steps of the MCMC.
        accept : int
            Total number of accepted steps.
        reject : int
            Total number of rejected steps.

        Notes
        -----
        Assumes uniform priors and a symmetric jump proposal (gaussian).
        """
        if not isinstance(step_history, np.ndarray):
            step_history = np.array(step_history)
        if step_history.ndim == 1:
            step_history = np.array([step_history])
        # We will be appending to the list
        step_history = list(step_history)

        accept = 0
        reject = 0

        while (accept + reject < N_trials
               and reject < abs(int(upper_limit_reject))):
            current_step = step_history[-1]

            # f(θ)
            val = target_dist(classifier_name, current_step, **TD_kwargs)

            # θ+Δθ
            trial_step = current_step + np.random.normal(
                0, alpha, size=len(current_step)
            )
            # f(θ+Δθ)
            trial_val = target_dist(classifier_name, trial_step, **TD_kwargs)

            # check if the trial step is in the range of data
            for i, step_in_axis in enumerate(trial_step):
                if (
                    step_in_axis <= self._max_vals_[i]
                    and step_in_axis >= self._min_vals_[i]
                ):
                    pass
                else:
                    trial_val = 0  # essentially reject points outside range

            if val == 0:  # avoid div 0 errors
                ratio = 0
            else:
                ratio = trial_val / val

            accept_prob = min(1, (ratio) ** (1 / T))

            chance = np.random.uniform(low=0, high=1)
            if chance <= accept_prob:
                accept += 1
                step_history.append(trial_step)
            else:
                reject += 1

        return np.array(step_history), accept, reject
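
    # Usage sketch (illustrative only): a single fixed-temperature chain on
    # the built-in 2D analytic target distribution. Assumes `sampler` was
    # constructed with a trained classifier so the training-data bounds
    # exist; alpha and the starting location are assumptions.
    # >>> steps, acc, rej = sampler.run_MCMC(
    # ...     N_trials=500, alpha=0.2, step_history=[[0.0, 0.0]],
    # ...     target_dist=sampler.TD_2d_analytic, classifier_name=None, T=1)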

    def normalize_step_history(self, step_history):
        """Take steps and normalize [0,1] according to min/max in each axis.

        The max and min are taken from the original data set from TableData.
        """
        normed_steps = np.copy(step_history)
        for j, steps_in_axis in enumerate(step_history.T):
            normed_steps.T[j] = (steps_in_axis - self._min_vals_[j]) / (
                self._max_vals_[j] - self._min_vals_[j]
            )
        return normed_steps

    def undo_normalize_step_history(self, normed_steps):
        """Rescale step history.

        Take normed steps from [0,1] and return their value in the original
        range of the axes based off the range in TableData.
        """
        mapped_steps = np.copy(normed_steps)
        for j, steps_in_axis in enumerate(normed_steps.T):
            mapped_steps.T[j] = (
                steps_in_axis * (self._max_vals_[j] - self._min_vals_[j])
                + self._min_vals_[j]
            )
        return mapped_steps

    def do_simple_density_logic(self, step_history, N_points, Kappa,
                                var_mult=None, add_mvns_together=False,
                                include_training_data=True, verbose=False):
        """Perform multivariate normal density logic on a given step history.

        This is a simplified version of the method `do_density_logic`. It
        assumes that every accepted point will have the same exact MVN.

        Each proposal distribution starts with the training set from
        TableData, which keeps training data from being proposed again.

        Parameters
        ----------
        step_history : ndarray
            List of points from a PTMCMC or MCMC (the posterior).
        N_points : int
            Number of points desired to be drawn from the posterior, which
            may not actually be the number of points accepted. Contributes to
            the length scale of the MVN distribution of accepted points
            (along with Kappa).
        Kappa : float
            Scaling factor that sets the initial size of the MVN for accepted
            points. This should be proportional to the filling factor of the
            region of interest described by the target distribution used to
            create the posterior.
        var_mult : float, ndarray, optional
            Variance multiplier for the MVN of accepted points.
        add_mvns_together : bool, optional
            Add MVNs together when creating the accepted point distribution.
        include_training_data : bool, optional
            Include the training data in the target distribution before
            sampling.
        verbose : bool, optional
            Print useful diagnostic information.

        Returns
        -------
        accepted_points : ndarray
            Accepted points from the posterior to be labeled by the user
            (query points).
        rejected_points : ndarray
            Rejected points from the posterior.

        Notes
        -----
        The accepted/rejected language here refers to query points for the
        oracle to label in an active learning scheme, not the accept/reject
        step normally used in MCMC.
        """
        if include_training_data:
            if self._TableData_ is None:
                raise ValueError("No TableData found in sampler. "
                                 "Set `include_training_data` to False.")
            original_training_data = self._TableData_.get_data("input")
        else:
            original_training_data = []
        how_many_training_data = len(original_training_data)
        if how_many_training_data > 0:
            n_dim = len(original_training_data[0])
        else:
            n_dim = len(step_history[0])

        # approximate scaling of the variance given a set of N proposal
        # points; the filling factor Kappa is not generally known a priori
        if var_mult is None:
            var_mult = 1
        sigma = Kappa * 0.5 * (N_points) ** (-1.0 / n_dim) * var_mult
        Covariance = sigma * np.identity(n_dim)
        if verbose:
            print("Covariance: \n{0}".format(Covariance))

        single_MVN = scipy.stats.multivariate_normal(np.zeros(n_dim),
                                                     Covariance)
        max_val = 1 / np.sqrt((2 * np.pi) ** (n_dim)
                              * np.linalg.det(Covariance))

        # treat the training data as already accepted points
        accepted_points = list(original_training_data)
        rejected_points = []
        for step in step_history:
            if len(accepted_points) < 1:
                accepted_points.append(step)
                continue

            # distance: how far is this point from all the accepted_points
            dist_to_acc_pts = step - np.array(accepted_points)
            pdf_val_at_point = single_MVN.pdf(dist_to_acc_pts)
            if isinstance(pdf_val_at_point, float):
                pdf_val_at_point = [pdf_val_at_point]

            if add_mvns_together:
                pdf_val_at_point = [np.sum(pdf_val_at_point)]

            random_chances = np.random.uniform(
                low=0, high=max_val, size=len(pdf_val_at_point)
            )
            chance_above_distr = random_chances > pdf_val_at_point

            if chance_above_distr.all():
                accepted_points.append(step)
            else:
                rejected_points.append(step)

        # remove training data from accepted points
        only_new_accepted_points = accepted_points[how_many_training_data:]
        return np.array(only_new_accepted_points), np.array(rejected_points)
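
    # Usage sketch (illustrative only): downsample a posterior into query
    # points using the simple density logic. `posterior` is assumed to be the
    # lowest-temperature chain from run_PTMCMC; N_points and Kappa are
    # assumptions.
    # >>> new_pts, rejected = sampler.do_simple_density_logic(
    # ...     posterior, N_points=15, Kappa=1.5, add_mvns_together=False)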

    def do_density_logic(self, step_history, N_points, Kappa, shuffle=False,
                         norm_steps=False, var_mult=None,
                         add_mvns_together=False, pre_acc_points=None,
                         verbose=False):
        """Do density logic with a normal Gaussian kernel at each point.

        This method automatically takes out the first 5% of steps of the MCMC
        so that the initial starting points are not chosen automatically (if
        you start in a non-ideal region). Wait for the burn in.

        Parameters
        ----------
        step_history : ndarray
        N_points : int
        Kappa : float
        shuffle : bool, optional
        norm_steps : bool, optional
        var_mult : float, optional
        add_mvns_together : bool, optional
        pre_acc_points : ndarray, optional
        verbose : bool, optional

        Returns
        -------
        accepted_points : ndarray
        rejected_points : ndarray
        accepted_sigmas : ndarray
        """
        if shuffle:  # shuffle the order of the steps
            if verbose:
                print("Shuffling steps....")
            np.random.shuffle(step_history)  # shuffles in place, returns None

        if norm_steps:  # normalize steps
            if verbose:
                print("Normalizing steps....")
            step_history = self.normalize_step_history(step_history)

        # Set the default average length scale
        num_dim = len(self._Classifier_.input_data[0])
        sigma = Kappa * 0.5 * (N_points) ** (-1.0 / num_dim)

        # We assume the covariance is the identity - later we may pass the
        # entire array, but for now we just assume you pass a variance
        # multiplier (var_mult)
        if var_mult is None:
            var_mult = np.array([1] * num_dim)
        else:
            var_mult = np.array(var_mult)
            if len(var_mult) != num_dim:
                raise ValueError(
                    "var_mult must be the same dimensionality as input data."
                )

        if verbose:
            print("Num dims: {0}".format(num_dim))
            print("length scale sigma: {0}".format(sigma))
            print("var_mult: {0}".format(var_mult))
            print("Kappa: {0}".format(Kappa))

        # Force a few key points to always be accepted, for example
        if isinstance(pre_acc_points, np.ndarray):
            accepted_points = list(pre_acc_points)
        else:
            accepted_points = []

        accepted_sigmas = []
        max_val_holder = []
        accepted_mvn_holder = []
        rejected_points = []

        # skip first 5% of steps to get into a good region
        skip_steps = int(len(step_history) * 0.05)
        good_steps = step_history[skip_steps:]

        for i in range(len(good_steps)):
            proposed_step = good_steps[i]

            accept = False
            if len(accepted_points) == 0:
                accept = True
            else:
                # If you enter here you must have accepted at least one point
                Sigma = accepted_sigmas[-1]
                k = len(Sigma)
                max_val = 1 / np.sqrt((2 * np.pi) ** k * np.linalg.det(Sigma))
                max_val_holder.append(max_val)
                # choose the chance from [0, highest point in distribution]
                rnd_chance = np.random.uniform(
                    low=0, high=np.max(max_val_holder), size=1
                )

                distr_holder = []
                for point in accepted_mvn_holder:
                    eval_mvn_at_new_step = point.pdf(proposed_step)
                    distr_holder.append(eval_mvn_at_new_step)

                if add_mvns_together:
                    # instead of checking each individual point and keeping
                    # all MVNs separate, we add them together and get an
                    # upper bound. IF we do this we need to change the MVN to
                    # not be normalized.
                    # THE UNNORMALIZED MVN IS NOT IMPLEMENTED
                    total_chance_above_distr = (
                        rnd_chance > np.sum(distr_holder))
                else:
                    total_chance_above_distr = np.sum(
                        rnd_chance > distr_holder)

                if len(accepted_points) == total_chance_above_distr:
                    accept = True
                else:
                    pass  # REJECT

            if accept:
                # https://stackoverflow.com/questions/619335
                Sigma = (sigma * np.identity(len(var_mult))
                         * np.array([var_mult]))
                mvn = scipy.stats.multivariate_normal(proposed_step, Sigma)

                accepted_mvn_holder.append(mvn)
                accepted_sigmas.append(Sigma)
                accepted_points.append(proposed_step)
            else:
                rejected_points.append(proposed_step)

        if verbose:
            print("Num accepted: {0}".format(len(accepted_points)))
            print("Num rejected: {0}".format(len(rejected_points)))

        if norm_steps:
            if verbose:
                print("Un-normalizing steps....")
            accepted_points = self.undo_normalize_step_history(accepted_points)
            rejected_points = self.undo_normalize_step_history(rejected_points)

        return (
            np.array(accepted_points),
            np.array(rejected_points),
            np.array(accepted_sigmas),
        )

    def get_proposed_points(self, step_history, N_points, Kappa,
                            shuffle=False, norm_steps=False,
                            add_mvns_together=False,
                            include_training_data=True, var_mult=None,
                            seed=None, n_repeats=1, max_iters=1e3,
                            verbose=False, **kwargs):
        """Get proposed points in parameter space given a MCMC step history.

        The density logic is not deterministic, so multiple iterations may be
        needed to converge on a desired number of proposed points. This
        method performs multiple calls to do_simple_density_logic while
        changing Kappa in order to return the desired number of points. After
        n_repeats instances of the correct number of N_points, the
        distribution with the largest average distance is chosen.

        Warning: this algorithm has not been tested for large N data sets and
        may struggle to converge.

        Parameters
        ----------
        step_history : ndarray
            Posterior from which to sample new query points.
        N_points : int
            N query points to converge to.
        Kappa : float
            Multiplies the length scale of MVNs and is changed such that the
            desired number of query points is found.
        shuffle : bool, optional
            Shuffle points in posterior in place before sampling.
        norm_steps : bool, optional
            Normalize steps before sampling.
        add_mvns_together : bool, optional
            Add MVNs of accepted point distribution together.
        include_training_data : bool, optional
            Include training data in the accepted point distribution before
            sampling.
        var_mult : ndarray, optional
            Variance multiplier.
        seed : float, optional
            Random seed to use for random sampling.
        n_repeats : int, optional
            Number of times to converge to the correct number of points. Each
            iteration may be a different realization of the posterior.
        max_iters : int, optional
            Maximum number of iterations before automatically exiting.
        verbose : bool, optional
            Print useful information.
        **kwargs
            show_plots : bool, optional
                Show 2D plot of proposed points with step history & training
                data.

        Returns
        -------
        acc_pts : ndarray
            Array of proposed points to be used as initial conditions in new
            simulations.
        Kappa : float
            Scaling factor which reproduced the desired number of accepted
            points.

        Notes
        -----
        Will automatically exit if it goes through max_iters iterations
        without converging on the desired number of points.
        """
        if seed is not None:
            np.random.seed(seed=seed)
            print("Setting seed: {0}".format(seed))

        if verbose:
            print(
                "Converging to {0} points, {1} times.".format(N_points,
                                                              int(n_repeats))
            )

        enough_good_pts = False
        how_many_good_pts = int(n_repeats)
        good_n_points = []
        avg_distances = []
        good_kappas = []

        iters = 1
        start_time = time.time()
        while not (enough_good_pts) and iters < max_iters:
            acc_pts, rej_pts = self.do_simple_density_logic(
                step_history, N_points, Kappa, var_mult=var_mult,
                add_mvns_together=add_mvns_together,
                include_training_data=include_training_data,
                verbose=False,
            )
            if len(acc_pts) == 0:
                Kappa = Kappa * 0.5
                iters += 1
                continue

            if len(acc_pts) == N_points:
                average_dist_between_acc_points = np.mean(pdist(acc_pts))
                good_n_points.append(acc_pts)
                avg_distances.append(average_dist_between_acc_points)
                good_kappas.append(Kappa)
                if len(good_n_points) >= how_many_good_pts:
                    enough_good_pts = True

            if verbose:
                if len(acc_pts) == N_points:
                    print_str = " *{0}* {1:2.2f}s".format(
                        len(good_n_points), abs(start_time - time.time())
                    )
                    ending = "\n"
                else:
                    print_str = ""
                    ending = "\r"
                print(
                    "\t acc_pts: {0}, Kappa = {1:.3f}".format(len(acc_pts),
                                                              Kappa)
                    + print_str,
                    end=ending,
                )

            diff = abs(len(acc_pts) - N_points)
            change_factor = 0.01 / (max(1, np.log10(iters)))

            if len(acc_pts) > N_points:
                Kappa = Kappa * (1 + change_factor * diff)  # increase Kappa
            elif len(acc_pts) < N_points:
                if (1 - change_factor * diff) < 0:
                    Kappa = Kappa * 0.1
                else:
                    Kappa = Kappa * (1 - change_factor * diff)  # decrease
            iters += 1

        if iters == max_iters:
            print("Reached max iters before converging!")
        if verbose:
            print("\nFinal Kappa = {0}\nConverged in {1} iters.".format(Kappa,
                                                                        iters))

        # we want 1/r dependence to penalize closely spaced points
        where_best_distribution = np.argmax(avg_distances)
        best_acc_pts = good_n_points[where_best_distribution]
        best_Kappa = good_kappas[where_best_distribution]

        if verbose:
            print("Average Distances: \n{0}".format(np.array(avg_distances)))
            print("Kappas: \n{0}".format(np.array(good_kappas)))
            print("loc: {0}".format(where_best_distribution))

        if kwargs.get("show_plots", False):
            self.make_prop_points_plots(step_history, best_acc_pts)

        return best_acc_pts, best_Kappa

    def make_prop_points_plots(self, step_hist, prop_points, axes=(0, 1),
                               show_fig=True, save_fig=False):
        """Plot the proposed / accepted points over the step history."""
        axis1, axis2 = axes
        fig, sub = plt.subplots(nrows=1, ncols=1, figsize=(4, 4), dpi=90)

        training_data = self._Classifier_.input_data
        sub.scatter(
            training_data.T[axis1],
            training_data.T[axis2],
            color="k",
            marker="x",
            label="training data",
        )
        sub.scatter(
            step_hist.T[axis1],
            step_hist.T[axis2],
            color="C0",
            alpha=0.5,
            label="step history",
        )
        sub.scatter(
            prop_points.T[axis1],
            prop_points.T[axis2],
            color="pink",
            label="new proposed points",
        )
        sub.set_xlim(self._min_vals_[axis1], self._max_vals_[axis1])
        sub.set_ylim(self._min_vals_[axis2], self._max_vals_[axis2])
        plt.legend(loc="best", bbox_to_anchor=[1, 0, 0.22, 1])
        return fig, sub

    def make_trace_plot(self, chain_holder, T_list, Temp, save_fig=False,
                        show_fig=True):
        """Make a `step number` vs. `position of a sampler in an axis` plot.

        This function makes titles assuming you are using the data from the
        classifier.
        """
        if not show_fig and not save_fig:
            return

        which_temp = Temp  # index of the chain / temperature to plot

        n_axis = len(chain_holder[which_temp].T)
        steps = chain_holder[which_temp].T
        axis_names = self._Classifier_._TableData_.get_data(
            what_data="input", return_df=True
        ).keys()

        fig, axs = plt.subplots(
            nrows=n_axis, ncols=2,
            figsize=(10, 2.5 * n_axis), dpi=100,
            gridspec_kw={"width_ratios": [1.8, 1]},
        )

        for num, ax in enumerate(axs):
            ax[0].plot(steps[num], "-", linewidth=0.5, color="C4")
            ax[0].set_title("Input: {0}".format(axis_names[num]))
            ax[0].set_ylabel("Axis {0}".format(num) + " , T={0:.2f}".
                             format(T_list[which_temp]), fontsize=12)
            ax[0].set_xlabel("N steps", fontsize=12)

            ax[1].hist(steps[num], bins=50, histtype="step", density=True,
                       color="C1")
            ax[1].set_xlabel("Axis {0}".format(num), fontsize=12)
            ax[1].set_ylabel("Posterior", fontsize=12)

        fig.subplots_adjust(hspace=0.45)
        if save_fig:
            plt.savefig("trace_plot_T{0:.0f}.pdf".format(T_list[which_temp]))
        if show_fig:
            plt.show()
        return None