Source code for candle.uq_keras_utils

from __future__ import absolute_import

from typing import List, Optional, Tuple, Type, Union

import numpy as np
from scipy.stats import cauchy, norm
from tensorflow.keras import backend as K
from tensorflow.keras import layers
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.layers import Dense
from tensorflow.keras.metrics import mean_absolute_error, mean_squared_error
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical

Array = Type[np.ndarray]

piSQ = np.pi**2

###################################################################

# For Abstention Model


[docs]def abstention_loss(alpha, mask: Array): """ Function to compute abstention loss. It is composed by two terms: (i) original loss of the multiclass classification problem, (ii) cost associated to the abstaining samples. :param alpha: Keras variable. Weight of abstention term in cost function :param ndarray mask: Numpy array to use as mask for abstention: \ it is 1 on the output associated to the abstention class and 0 otherwise """ def loss(y_true, y_pred): """ :param y_true: keras tensor. True values to predict :param y_pred: keras tensor. Prediction made by the model. \ It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ base_pred = (1 - mask) * y_pred + K.epsilon() base_true = y_true base_cost = K.categorical_crossentropy(base_true, base_pred) # abs_pred = K.mean(mask * y_pred, axis=-1) abs_pred = K.sum(mask * y_pred, axis=-1) # add some small value to prevent NaN when prediction is abstained abs_pred = K.clip(abs_pred, K.epsilon(), 1.0 - K.epsilon()) return (1.0 - abs_pred) * base_cost - alpha * K.log(1.0 - abs_pred) # return K.mean((1. - abs_pred) * base_cost - alpha * K.log(1. - abs_pred), axis = -1) loss.__name__ = "abs_crossentropy" return loss
[docs]def sparse_abstention_loss(alpha, mask: Array): """ Function to compute abstention loss. It is composed by two terms: (i) original loss of the multiclass classification problem, (ii) cost associated to the abstaining samples. Assumes y_true is not one-hot encoded. :param alpha: Keras variable. Weight of abstention term in cost function :param ndarray mask: Numpy array to use as mask for abstention: it is 1 on the output associated \ to the abstention class and 0 otherwise """ def loss(y_true, y_pred): """ :param y_true: keras tensor. True values to predict :param y_pred: keras tensor. Prediction made by the model. \ It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ base_pred = (1 - mask) * y_pred + K.epsilon() base_true = y_true base_cost = K.sparse_categorical_crossentropy(base_true, base_pred) # abs_pred = K.mean(mask * y_pred, axis=-1) abs_pred = K.sum(mask * y_pred, axis=-1) # add some small value to prevent NaN when prediction is abstained abs_pred = K.clip(abs_pred, K.epsilon(), 1.0 - K.epsilon()) return (1.0 - abs_pred) * base_cost - alpha * K.log(1.0 - abs_pred) # return K.mean((1. - abs_pred) * base_cost - alpha * K.log(1. - abs_pred), axis = -1) loss.__name__ = "sparse_abs_crossentropy" return loss
[docs]def abstention_acc_metric(nb_classes: Union[int, Array]): """ Abstained accuracy: Function to estimate accuracy over the predicted samples after removing the samples where the model is abstaining. :param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class """ def metric(y_true, y_pred): """ :param y_true: keras tensor. True values to predict :param y_pred: keras tensor. Prediction made by the model.\ It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # matching in original classes true_pred = K.sum( K.cast( K.equal(K.argmax(y_true, axis=-1), K.argmax(y_pred, axis=-1)), "int64" ) ) # total abstention total_abs = K.sum( K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), "int64") ) # total predicted in original classes total_pred = K.sum( K.cast( K.equal(K.argmax(y_pred, axis=-1), K.argmax(y_pred, axis=-1)), "int64" ) ) # guard against divide by zero condition = K.greater(total_pred, total_abs) abs_acc = K.switch( condition, true_pred / (total_pred - total_abs), total_pred / total_pred ) return abs_acc metric.__name__ = "abstention_acc" return metric
[docs]def sparse_abstention_acc_metric(nb_classes: Union[int, Array]): """ Abstained accuracy: Function to estimate accuracy over the predicted samples after removing the samples where the model is abstaining. Assumes y_true is not one-hot encoded. :param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class """ def metric(y_true, y_pred): """ :param y_true: keras tensor. True values to predict :param y_pred: keras tensor. Prediction made by the model. \ It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # matching in original classes y_pred_index = K.argmax(y_pred, axis=-1) y_true_index = K.cast(K.max(y_true, axis=-1), "int64") true_pred = K.sum(K.cast(K.equal(y_true_index, y_pred_index), "int64")) # total abstention total_abs = K.sum( K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), "int64") ) # total predicted in original classes total_pred = K.sum( K.cast( K.equal(K.argmax(y_pred, axis=-1), K.argmax(y_pred, axis=-1)), "int64" ) ) # guard against divide by zero condition = K.greater(total_pred, total_abs) abs_acc = K.switch( condition, true_pred / (total_pred - total_abs), total_pred / total_pred ) return abs_acc metric.__name__ = "sparse_abstention_acc" return metric
[docs]def abstention_metric(nb_classes: Union[int, Array]): """ Function to estimate fraction of the samples where the model is abstaining. :param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class """ def metric(y_true, y_pred): """ :param y_true: keras tensor. True values to predict :param y_pred: keras tensor. Prediction made by the model. \ It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # total abstention total_abs = K.sum( K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), "int64") ) # total predicted in original classes total_pred = K.sum( K.cast( K.equal(K.argmax(y_pred, axis=-1), K.argmax(y_pred, axis=-1)), "int64" ) ) return total_abs / total_pred metric.__name__ = "abstention" return metric
[docs]def acc_class_i_metric(class_i: int): """ Function to estimate accuracy over the ith class prediction. This estimation is global (i.e. abstaining samples are not removed) :param int class_i: Index of the class to estimate accuracy """ def metric(y_true, y_pred): """ :param y_true: keras tensor. True values to predict :param y_pred: keras tensor. Prediction made by the model. \ It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # Find locations in ground truth belonging to class i ytrueint = K.cast(K.equal(K.argmax(y_true, axis=-1), class_i), "int64") # Compute total number of ground truth samples in class i total_true_i = K.sum(ytrueint) # Find samples in prediction belonging to class i (not accounting for abstention) ypredint = K.cast(K.equal(K.argmax(y_pred[:, :-1], axis=-1), class_i), "int64") # Find correctly predicted class i samples true_i_pred = K.sum(ytrueint * ypredint) # Compute accuracy in class i acc = true_i_pred / total_true_i # Since there could be few samples in class i # it is possible that ground truth does not # have any sample in class i, leading to a divide # by zero and not valid accuracy # Therefore, for the accuracy to be valid # total_true_i should be greater than zero # otherwise, return 0. condition = K.greater(total_true_i, 0) return K.switch(condition, acc, K.zeros_like(acc, dtype=acc.dtype)) metric.__name__ = "acc_class_{}".format(class_i) return metric
[docs]def abstention_acc_class_i_metric(nb_classes: Union[int, Array], class_i: int): """ Function to estimate accuracy over the class i prediction after removing the samples where the model is abstaining. :param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class :param int class_i: Index of the class to estimate accuracy after removing abstention samples """ def metric(y_true, y_pred): """ :param y_true: keras tensor. True values to predict :param y_pred: keras tensor. Prediction made by the model. \ It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # Find locations in ground truth belonging to class i ytrueint = K.cast(K.equal(K.argmax(y_true, axis=-1), class_i), "int64") # Find locations that are predicted (not abstained) mask_pred = K.cast(K.not_equal(K.argmax(y_pred, axis=-1), nb_classes), "int64") # Compute total number of ground truth samples in class i filtering abstaining predictions total_true_i = K.sum(ytrueint * mask_pred) # matching in original class i after removing abstention true_i_pred = K.sum( mask_pred * ytrueint * K.cast( K.equal(K.argmax(y_true, axis=-1), K.argmax(y_pred, axis=-1)), "int64" ) ) # Compute accuracy in class i acc = true_i_pred / total_true_i # Since there could few samples in class i # it is possible that ground truth does not # have any sample in class i, leading to a divide # by zero and not valid accuracy # Therefore, for the accuracy to be valid # total_true_i should be greater than zero # otherwise, return 0. condition = K.greater(total_true_i, 0) return K.switch(condition, acc, K.zeros_like(acc, dtype=acc.dtype)) metric.__name__ = "abstention_acc_class_{}".format(class_i) return metric
[docs]def abstention_class_i_metric(nb_classes: Union[int, Array], class_i: int): """ Function to estimate fraction of the samples where the model is abstaining in class i. :param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class :param int class_i: Index of the class to estimate accuracy """ def metric(y_true, y_pred): """ :param y_true: keras tensor. True values to predict :param y_pred: keras tensor. Prediction made by the model. \ It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # Find locations in ground truth belonging to class i ytrue_i_int = K.cast(K.equal(K.argmax(y_true, axis=-1), class_i), "int64") # total in class i total_class_i = K.sum(ytrue_i_int) # Abstention samples y_abs = K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), "int64") # Abstention in class_i total_abs_i = K.sum(ytrue_i_int * y_abs) abs_i = total_abs_i / total_class_i condition = K.greater(total_class_i, 0) return K.switch(condition, abs_i, K.zeros_like(abs_i, dtype=abs_i.dtype)) metric.__name__ = "abstention_class_{}".format(class_i) return metric
[docs]class AbstentionAdapt_Callback(Callback): """ This callback is used to adapt the parameter alpha in the abstention loss. The parameter alpha (weight of the abstention term in the abstention loss) is increased or decreased adaptively during the training run. It is decreased if the current abstention accuracy is less than the minimum accuracy set or increased if the current abstention fraction is greater than the maximum fraction set. The abstention accuracy metric to use must be specified as the 'acc_monitor' argument in the initialization of the callback. It could be: the global abstention accuracy (abstention_acc), the abstention accuracy over the ith class (acc_class_i), etc. The abstention metric to use must be specified as the 'abs_monitor' argument in the initialization of the callback. It should be the metric that computes the fraction of samples for which the model is abstaining (abstention). The factor alpha is modified if the current abstention accuracy is less than the minimum accuracy set or if the current abstention fraction is greater than the maximum fraction set. Thresholds for minimum and maximum correction factors are computed and the correction over alpha is not allowed to be less or greater than them, respectively, to avoid huge swings in the abstention loss evolution. """ def __init__( self, acc_monitor, abs_monitor, alpha0: float, init_abs_epoch: int = 4, alpha_scale_factor: float = 0.8, min_abs_acc: float = 0.9, max_abs_frac: float = 0.4, acc_gain: float = 5.0, abs_gain: float = 1.0, ): """ Initializer of the AbstentionAdapt_Callback. :param keras.metric acc_monitor: Accuracy metric to monitor during the run and use \ as base to adapt the weight of the abstention term (i.e. alpha) in the abstention \ cost function. (Must be an accuracy metric that takes abstention into account). :param keras.metric abs_monitor: Abstention metric monitored during the run and \ used as the other factor to adapt the weight of the abstention term (i.e. alpha) in the asbstention loss function :param float alpha0: Initial weight of abstention term in cost function :param int init_abs_epoch: Value of the epochs to start adjusting the weight of the abstention \ term (i.e. alpha). Default: 4. :param float alpha_scale_factor: Factor to scale (increase by dividing or decrease by multiplying) \ the weight of the abstention term (i.e. alpha). Default: 0.8. :param float min_abs_acc: Minimum accuracy to target in the current training. Default: 0.9. :param float max_abs_frac: Maximum abstention fraction to tolerate in the current training. Default: 0.4. :param float acc_gain: Factor to adjust alpha scale. Default: 5.0. :param float abs_gain: Factor to adjust alpha scale. Default: 1.0. """ super(AbstentionAdapt_Callback, self).__init__() self.acc_monitor = ( acc_monitor # Keras metric to monitor (must be an accuracy with abstention) ) self.abs_monitor = abs_monitor # Keras metric momitoring abstention fraction self.alpha = K.variable(value=alpha0) # Weight of abstention term self.init_abs_epoch = init_abs_epoch # epoch to init abstention self.alpha_scale_factor = alpha_scale_factor # factor to scale alpha (weight for abstention term in cost function) self.min_abs_acc = min_abs_acc # minimum target accuracy (value specified as parameter of the run) self.max_abs_frac = max_abs_frac # maximum abstention fraction (value specified as parameter of the run) self.acc_gain = acc_gain # factor for adjusting alpha scale self.abs_gain = abs_gain # factor for adjusting alpha scale self.alphavalues: List[float] = [] # array to store alpha evolution
[docs] def on_epoch_end(self, epoch: int, logs=None): """ Updates the weight of abstention term on epoch end. :param int epoch: Current epoch in training. :param logs: Metrics stored during current keras training. """ new_alpha_val = K.get_value(self.alpha) if epoch > self.init_abs_epoch: if self.acc_monitor is None or self.abs_monitor is None: raise Exception( "ERROR! Abstention Adapt conditioned on metrics " + str(self.acc_monitor) + " and " + str(self.abs_monitor) + " which are not available. Available metrics are: " + ",".join(list(logs.keys())) + "... Exiting" ) else: # Current accuracy (with abstention) abs_acc = logs.get(self.acc_monitor) # Current abstention fraction abs_frac = logs.get(self.abs_monitor) if abs_acc is None or abs_frac is None: raise Exception( "ERROR! Abstention Adapt conditioned on metrics " + str(self.acc_monitor) + " and " + str(self.abs_monitor) + " which are not available. Available metrics are: " + ",".join(list(logs.keys())) + "... Exiting" ) # modify alpha as needed acc_error = abs_acc - self.min_abs_acc acc_error = min(acc_error, 0.0) abs_error = abs_frac - self.max_abs_frac abs_error = max(abs_error, 0.0) new_scale = 1.0 + self.acc_gain * acc_error + self.abs_gain * abs_error # threshold to avoid huge swings min_scale = self.alpha_scale_factor max_scale = 1.0 / self.alpha_scale_factor new_scale = min(new_scale, max_scale) new_scale = max(new_scale, min_scale) # print('Scaling factor: ', new_scale) new_alpha_val *= new_scale K.set_value(self.alpha, new_alpha_val) print("Scaling factor: ", new_scale, " new alpha, ", new_alpha_val) self.alphavalues.append(new_alpha_val)
[docs]def modify_labels( numclasses_out: int, ytrain: Array, ytest: Array, yval: Optional[Array] = None ) -> Tuple[Array, ...]: """ This function generates a categorical representation with a class added for indicating abstention. :param int numclasses_out: Original number of classes + 1 abstention class :param ndarray ytrain: Numpy array of the classes (labels) in the training set :param ndarray ytest: Numpy array of the classes (labels) in the testing set :param ndarray yval: Numpy array of the classes (labels) in the validation set """ classestrain = np.max(ytrain) + 1 classestest = np.max(ytest) + 1 if yval is not None: classesval = np.max(yval) + 1 assert classestrain == classestest if yval is not None: assert classesval == classestest assert ( classestrain + 1 ) == numclasses_out # In this case only one other slot for abstention is created labels_train = to_categorical(ytrain, numclasses_out) labels_test = to_categorical(ytest, numclasses_out) if yval is not None: labels_val = to_categorical(yval, numclasses_out) # For sanity check mask_vec = np.zeros(labels_train.shape) mask_vec[:, -1] = 1 i = np.random.choice(range(labels_train.shape[0])) sanity_check = mask_vec[i, :] * labels_train[i, :] print(sanity_check.shape) if ytrain.ndim > 1: ll = ytrain.shape[1] else: ll = 0 for i in range(ll): for j in range(numclasses_out): if sanity_check[i, j] == 1: print("Problem at ", i, j) if yval is not None: return labels_train, labels_test, labels_val return labels_train, labels_test
###################################################################
[docs]def add_model_output( modelIn, mode: Optional[str] = None, num_add: Optional[int] = None, activation: Optional[str] = None, ): """ This function modifies the last dense layer in the passed keras model. The modification includes adding units and optionally changing the activation function. :param modelIn: keras model. Keras model to be modified. :param string mode: Mode to modify the layer. It could be: - 'abstain' for adding an arbitrary number of units for the abstention optimization strategy. - 'qtl' for quantile regression which needs the outputs to be tripled. - 'het' for heteroscedastic regression which needs the outputs to be doubled. :param int num_add: Number of units to add. This only applies to the 'abstain' mode. :param string activation: String with keras specification of activation function (e.g. 'relu', 'sigomid', 'softmax', etc.) :return: Keras model after last dense layer has been modified as specified. \ If there is no mode specified it returns the same model. \ If the mode is not one of 'abstain', 'qtl' or 'het' an exception is raised. :rtype: Keras model """ if mode is None: return modelIn numlayers = len(modelIn.layers) # Find last dense layer i = -1 while "dense" not in (modelIn.layers[i].name) and ((i + numlayers) > 0): i -= 1 # Minimal verification about the validity of the layer found assert (i + numlayers) >= 0 assert "dense" in modelIn.layers[i].name # Compute new output size if mode == "abstain": assert num_add is not None new_output_size = modelIn.layers[i].output_shape[-1] + num_add elif mode == "qtl": # for quantile UQ new_output_size = 3 * modelIn.layers[i].output_shape[-1] elif mode == "het": # for heteroscedastic UQ new_output_size = 2 * modelIn.layers[i].output_shape[-1] else: raise Exception( "ERROR ! Type of mode specified for adding outputs to the model: " + mode + " not implemented... Exiting" ) # Recover current layer options config = modelIn.layers[i].get_config() # Update number of units config["units"] = new_output_size # Update activation function if requested if activation is not None: config["activation"] = activation # Bias initialization seems to help het and qtl if mode == "het" or mode == "qtl": config["bias_initializer"] = "ones" # Create new Dense layer reconstructed_layer = Dense.from_config(config) # Connect new Dense last layer to previous one-before-last layer additional = reconstructed_layer(modelIn.layers[i - 1].output) # If the layer to replace is not the last layer, add the remainder layers if i < -1: for j in range(i + 1, 0): config_j = modelIn.layers[j].get_config() aux_j = layers.deserialize( {"class_name": modelIn.layers[j].__class__.__name__, "config": config_j} ) reconstructed_layer = aux_j.from_config(config_j) additional = reconstructed_layer(additional) modelOut = Model(modelIn.input, additional) return modelOut
################################################################### # UQ regression - utilities
[docs]def r2_heteroscedastic_metric(nout: int): """ This function computes the r2 for the heteroscedastic model. The r2 is computed over the prediction of the mean and the standard deviation prediction is not taken into account. :param int nout: Number of outputs without uq augmentation """ def metric(y_true, y_pred): """ :param y_true: Keras tensor. Keras tensor including the ground truth :param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \ The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ if nout > 1: y_out = K.reshape(y_pred[:, 0::2], K.shape(y_true)) else: y_out = K.reshape(y_pred[:, 0], K.shape(y_true)) SS_res = K.sum(K.square(y_true - y_out)) SS_tot = K.sum(K.square(y_true - K.mean(y_true))) return 1.0 - SS_res / (SS_tot + K.epsilon()) metric.__name__ = "r2_heteroscedastic" return metric
[docs]def mae_heteroscedastic_metric(nout: int): """ This function computes the mean absolute error (mae) for the heteroscedastic model. The mae is computed over the prediction of the mean and the standard deviation prediction is not taken into account. :param int nout: Number of outputs without uq augmentation """ def metric(y_true, y_pred): """ :param y_true: Keras tensor. Keras tensor including the ground truth :param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \ The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ if nout > 1: y_out = K.reshape(y_pred[:, 0::2], K.shape(y_true)) else: y_out = K.reshape(y_pred[:, 0], K.shape(y_true)) return mean_absolute_error(y_true, y_out) metric.__name__ = "mae_heteroscedastic" return metric
[docs]def mse_heteroscedastic_metric(nout: int): """ This function computes the mean squared error (mse) for the heteroscedastic model. The mse is computed over the prediction of the mean and the standard deviation prediction is not taken into account. :param int nout: Number of outputs without uq augmentation """ def metric(y_true, y_pred): """ :param y_true: Keras tensor. Keras tensor including the ground truth :param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \ The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ if nout > 1: y_out = K.reshape(y_pred[:, 0::2], K.shape(y_true)) else: y_out = K.reshape(y_pred[:, 0], K.shape(y_true)) return mean_squared_error(y_true, y_out) metric.__name__ = "mse_heteroscedastic" return metric
[docs]def meanS_heteroscedastic_metric(nout: int): """ This function computes the mean log of the variance (log S) for the heteroscedastic model. The mean log is computed over the standard deviation prediction and the mean prediction is not taken into account. :param int nout: Number of outputs without uq augmentation """ def metric(y_true, y_pred): """ :param y_true: Keras tensor. Keras tensor including the ground truth :param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \ The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ if nout > 1: log_sig2 = y_pred[:, 1::2] else: log_sig2 = y_pred[:, 1] return K.mean(log_sig2) metric.__name__ = "meanS_heteroscedastic" return metric
[docs]def heteroscedastic_loss(nout: int): """ This function computes the heteroscedastic loss for the heteroscedastic model. Both mean and standard deviation predictions are taken into account. :param int nout: Number of outputs without uq augmentation """ def loss(y_true, y_pred): """ This function computes the heteroscedastic loss. :param y_true: Keras tensor. Keras tensor including the ground truth :param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \ The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ y_shape = K.shape(y_true) if nout > 1: y_out = K.reshape(y_pred[:, 0::2], y_shape) log_sig2 = K.reshape(y_pred[:, 1::nout], y_shape) else: y_out = K.reshape(y_pred[:, 0], y_shape) log_sig2 = K.reshape(y_pred[:, 1], y_shape) diff_sq = K.square(y_out - y_true) return K.mean(K.exp(-log_sig2) * diff_sq + log_sig2) # Return a function return loss
[docs]def quantile_loss(quantile: float, y_true, y_pred): """ This function computes the quantile loss for a given quantile fraction. :param quantile: float in (0, 1). Quantile fraction to compute the loss. :param y_true: Keras tensor. Keras tensor including the ground truth :param y_pred: Keras tensor. Keras tensor including the predictions of a quantile model. """ error = y_true - y_pred return K.mean(K.maximum(quantile * error, (quantile - 1) * error))
[docs]def triple_quantile_loss(nout: int, lowquantile: float, highquantile: float): """ This function computes the quantile loss for the median and low and high quantiles. The median is given twice the weight of the other components. :param int nout: Number of outputs without uq augmentation :param lowquantile: float in (0, 1). Fraction corresponding to the low quantile :param highquantile: float in (0, 1). Fraction corresponding to the high quantile """ def loss(y_true, y_pred): """ This function computes the quantile loss, considering the median and low and high quantiles. :param y_true: Keras tensor. Keras tensor including the ground truth :param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \ The predictions follow the order: (q50_0, qlow_0, qhigh_0, q50_1, qlow_1, qhigh_1, ...) \ with q50_i the median of the ith output and qlow_i and qhigh_i the low and high specified \ quantiles of the ith output. """ y_shape = K.shape(y_true) if nout > 1: y_qtl0 = K.reshape(y_pred[:, 0::3], y_shape) y_qtl1 = K.reshape(y_pred[:, 1::3], y_shape) y_qtl2 = K.reshape(y_pred[:, 2::3], y_shape) else: y_qtl0 = K.reshape(y_pred[:, 0], y_shape) y_qtl1 = K.reshape(y_pred[:, 1], y_shape) y_qtl2 = K.reshape(y_pred[:, 2], y_shape) return ( quantile_loss(lowquantile, y_true, y_qtl1) + quantile_loss(highquantile, y_true, y_qtl2) + 2.0 * quantile_loss(0.5, y_true, y_qtl0) ) return loss
[docs]def quantile_metric(nout: int, index: int, quantile: float): """ This function computes the quantile metric for a given quantile and corresponding output index. This is provided as a metric to track evolution while training. :param int nout: Number of outputs without uq augmentation :param int index: Index of output corresponding to the given quantile. :param quantile: float in (0, 1). Fraction corresponding to the quantile """ def metric(y_true, y_pred): """ :param y_true: Keras tensor. Keras tensor including the ground truth :param y_pred: Keras tensor. Keras tensor including the predictions of a quantile model. \ The predictions follow the order: (q50_0, qlow_0, qhigh_0, q50_1, qlow_1, qhigh_1, ...) \ with q50_i the median of the ith output and qlow_i and qhigh_i the low and high specified \ quantiles of the ith output. """ y_shape = K.shape(y_true) if nout > 1: y_qtl = K.reshape(y_pred[:, index::3], y_shape) else: y_qtl = K.reshape(y_pred[:, index], y_shape) return quantile_loss(quantile, y_true, y_qtl) metric.__name__ = "quantile_{}".format(quantile) return metric
################################################################### # For the Contamination Model
[docs]def add_index_to_output(y_train: Array) -> Array: """ This function adds a column to the training output to store the indices of the corresponding samples in the training set. :param ndarray y_train: Numpy array of the output in the training set """ # Add indices to y y_train_index = range(y_train.shape[0]) if y_train.ndim > 1: shp = (y_train.shape[0], 1) y_train_augmented = np.hstack([y_train, np.reshape(y_train_index, shp)]) else: y_train_augmented = np.vstack([y_train, y_train_index]).T return y_train_augmented
[docs]def contamination_loss(nout: int, T_k, a, sigmaSQ, gammaSQ): """ Function to compute contamination loss. It is composed by two terms: (i) the loss with respect to the normal distribution that models the distribution of the training data samples, (ii) the loss with respect to the Cauchy distribution that models the distribution of the outlier samples. Note that the evaluation of this contamination loss function does not make sense for any data different to the training set. This is because latent variables are only defined for samples in the training set. :param int nout: Number of outputs without uq augmentation (in the contamination model \ the augmentation corresponds to the data index in training). :param T_k: Keras tensor. Tensor containing latent variables (probability of membership \ to normal and Cauchy distributions) for each of the samples in the training set. \ (Validation data is usually augmented too to be able to run training with validation set, \ however loss in validation should not be used as a criterion for early stopping training \ since the latent variables are defined for the training only, and thus, are not valid \ when used in combination with data different from training). :param a: Keras variable. Probability of belonging to the normal distribution :param sigmaSQ: Keras variable. Variance estimated for the normal distribution :param gammaSQ: Keras variable. Scale estimated for the Cauchy distribution """ def loss(y_true, y_pred): """ :param y_true: keras tensor. True values to predict. It is assumed that this keras tensor \ includes extra columns to store the index of the data sample in the training set. :param y_pred: keras tensor. Prediction made by the model. """ y_shape = K.shape(y_pred) y_true_ = K.reshape(y_true[:, :-1], y_shape) if nout > 1: diff_sq = K.sum(K.square(y_true_ - y_pred), axis=-1) else: diff_sq = K.square(y_true_ - y_pred) term_normal = ( diff_sq / (2.0 * sigmaSQ) + 0.5 * K.log(sigmaSQ) + 0.5 * K.log(2.0 * np.pi) - K.log(a) ) term_cauchy = ( K.log(1.0 + diff_sq / gammaSQ) + 0.5 * K.log(piSQ * gammaSQ) - K.log(1.0 - a) ) batch_index = K.cast(y_true[:, -1], "int64") T_0_red = K.gather(T_k[:, 0], batch_index) T_1_red = K.gather(T_k[:, 1], batch_index) return K.sum(T_0_red * term_normal + T_1_red * term_cauchy) return loss
[docs]class Contamination_Callback(Callback): """ This callback is used to update the parameters of the contamination model. This functionality follows the EM algorithm: in the E-step latent variables are updated and in the M-step global variables are updated. The global variables correspond to 'a' (probability of membership to normal class), 'sigmaSQ' (variance of normal class) and 'gammaSQ' (scale of Cauchy class, modeling outliers). The latent variables correspond to 'T_k' (the first column corresponds to the probability of membership to the normal distribution, while the second column corresponds to the probability of membership to the Cauchy distribution i.e. outlier). """ def __init__(self, x, y, a_max=0.99): """Initializer of the Contamination_Callback. :param ndarray x: Array of samples (= input features) in training set. :param ndarray y: Array of sample outputs in training set. :param flaot a_max: Maximum value of a variable to allow """ super(Contamination_Callback, self).__init__() if y.ndim > 1: if y.shape[1] > 1: raise Exception( "ERROR ! Contamination model can be applied to one-output regression, but provided training data has: " + str(y.ndim) + "outpus... Exiting" ) self.x = x # Features of training set self.y = y # Output of training set self.a_max = a_max # Set maximum a value to allow self.sigmaSQ = K.variable( value=0.01 ) # Standard devation of normal distribution for error self.gammaSQ = K.variable(value=0.01) # Scale of Cauchy distribution for error # Parameter Initialization - Conditional distribution of the latent variables if isinstance(x, list): self.T = np.zeros((x[0].shape[0], 2)) else: self.T = np.zeros((self.x.shape[0], 2)) self.T[:, 0] = np.random.uniform(size=self.T.shape[0]) self.T[:, 1] = 1.0 - self.T[:, 0] self.T_k = K.variable(value=self.T) self.a = K.variable( value=np.mean(self.T[:, 0]) ) # Probability of membership to normal distribution self.avalues = [] # array to store a evolution self.sigmaSQvalues = [] # array to store sigmaSQ evolution self.gammaSQvalues = [] # array to store gammaSQ evolution
[docs] def on_epoch_end(self, epoch: int, logs={}): """ Updates the parameters of the distributions in the contamination model on epoch end. The parameters updated are: 'a' for the global weight of the membership to the normal distribution, 'sigmaSQ' for the variance of the normal distribution and 'gammaSQ' for the scale of the Cauchy distribution of outliers. The latent variables are updated as well: 'T_k' describing in the first column the probability of membership to normal distribution and in the second column probability of membership to the Cauchy distribution i.e. outlier. Stores evolution of global parameters (a, sigmaSQ and gammaSQ). :param int epoch: Current epoch in training. :param logs: keras logs. Metrics stored during current keras training. """ y_pred = self.model.predict(self.x) error = self.y.squeeze() - y_pred.squeeze() # Update parameters (M-Step) errorSQ = error**2 aux = np.mean(self.T[:, 0]) if aux > self.a_max: aux = self.a_max K.set_value(self.a, aux) K.set_value(self.sigmaSQ, np.sum(self.T[:, 0] * errorSQ) / np.sum(self.T[:, 0])) # Gradient descent gmSQ_eval = K.get_value(self.gammaSQ) grad_gmSQ = ( 0.5 * np.sum(self.T[:, 1]) - np.sum(self.T[:, 1] * errorSQ / (gmSQ_eval + errorSQ)) ) / gmSQ_eval # Guarantee positivity in update eta = K.get_value(self.model.optimizer.lr) new_gmSQ = gmSQ_eval - eta * grad_gmSQ while new_gmSQ < 0 or (new_gmSQ / gmSQ_eval) > 1000: eta /= 2 new_gmSQ = gmSQ_eval - eta * grad_gmSQ K.set_value(self.gammaSQ, new_gmSQ) # Update conditional distribution of latent variables (beginning of E-Step) a_eval = K.get_value(self.a) sigmaSQ_eval = K.get_value(self.sigmaSQ) gammaSQ_eval = K.get_value(self.gammaSQ) print("a: %f, sigmaSQ: %f, gammaSQ: %f" % (a_eval, sigmaSQ_eval, gammaSQ_eval)) norm_eval = norm.pdf(error, loc=0, scale=np.sqrt(sigmaSQ_eval)) cauchy_eval = cauchy.pdf(error, loc=0, scale=np.sqrt(gammaSQ_eval)) denominator = a_eval * norm_eval + (1.0 - a_eval) * cauchy_eval self.T[:, 0] = a_eval * norm_eval / denominator self.T[:, 1] = (1.0 - a_eval) * cauchy_eval / denominator K.set_value(self.T_k, self.T) # store evolution of global variables self.avalues.append(a_eval) self.sigmaSQvalues.append(sigmaSQ_eval) self.gammaSQvalues.append(gammaSQ_eval)
[docs]def mse_contamination_metric(nout: int): """This function computes the mean squared error (mse) for the contamination model. The mse is computed over the prediction. Therefore, the augmentation for the index variable is ignored. :param int nout: \ Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training). """ def metric(y_true, y_pred): """ :param y_true: Keras tensor. \ Keras tensor including the ground truth. Since the keras tensor includes an extra column to store the index of the data sample in the training set this column is ignored. :param y_pred: Keras tensor. \ Keras tensor with the predictions of the contamination model (no data index). """ return mean_squared_error(y_true[:, :nout], y_pred[:, :nout]) metric.__name__ = "mse_contamination" return metric
[docs]def mae_contamination_metric(nout: int): """ This function computes the mean absolute error (mae) for the contamination model. The mae is computed over the prediction. Therefore, the augmentation for the index variable is ignored. :param int nout: \ Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training). """ def metric(y_true, y_pred): """ :param y_true: Keras tensor. \ Keras tensor including the ground truth. Since the keras tensor includes an extra column to store the index of the data sample in the training set this column is ignored. :param y_pred: Keras tensor. \ Keras tensor with the predictions of the contamination model (no data index). """ return mean_absolute_error(y_true[:, :nout], y_pred[:, :nout]) metric.__name__ = "mae_contamination" return metric
[docs]def r2_contamination_metric(nout: int): """ This function computes the r2 for the contamination model. The r2 is computed over the prediction. Therefore, the augmentation for the index variable is ignored. :param int nout: \ Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training). """ def metric(y_true, y_pred): """ :param y_true: Keras tensor. \ Keras tensor including the ground truth. Since the keras tensor includes an extra column to store the index of the data sample in the training set this column is ignored. :param y_pred: Keras tensor. \ Keras tensor with the predictions of the contamination model (no data index). """ # if nout > 1: # y_true_ = K.reshape(y_true[:, :-1], K.shape(y_pred)) # else: # y_true_ = K.reshape(y_true[:, 0], K.shape(y_pred)) y_true_ = K.reshape(y_true[:, :-1], K.shape(y_pred)) SS_res = K.sum(K.square(y_true_ - y_pred)) SS_tot = K.sum(K.square(y_true_ - K.mean(y_true_))) return 1.0 - SS_res / (SS_tot + K.epsilon()) metric.__name__ = "r2_contamination" return metric