Source code for uq_keras_utils

from __future__ import absolute_import

from tensorflow.keras import backend as K

from tensorflow.keras.callbacks import Callback

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense
from tensorflow.keras import layers

from tensorflow.keras.utils import to_categorical
from tensorflow.keras.metrics import mean_squared_error, mean_absolute_error

import numpy as np

from scipy.stats import norm, cauchy

piSQ = np.pi**2

###################################################################

# For Abstention Model


[docs]def abstention_loss(alpha, mask): """ Function to compute abstention loss. It is composed by two terms: (i) original loss of the multiclass classification problem, (ii) cost associated to the abstaining samples. Parameters ---------- alpha : Keras variable Weight of abstention term in cost function mask : ndarray Numpy array to use as mask for abstention: it is 1 on the output associated to the abstention class and 0 otherwise """ def loss(y_true, y_pred): """ Parameters ---------- y_true : keras tensor True values to predict y_pred : keras tensor Prediction made by the model. It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ base_pred = (1 - mask) * y_pred + K.epsilon() base_true = y_true base_cost = K.categorical_crossentropy(base_true, base_pred) # abs_pred = K.mean(mask * y_pred, axis=-1) abs_pred = K.sum(mask * y_pred, axis=-1) # add some small value to prevent NaN when prediction is abstained abs_pred = K.clip(abs_pred, K.epsilon(), 1. - K.epsilon()) return ((1. - abs_pred) * base_cost - alpha * K.log(1. - abs_pred)) # return K.mean((1. - abs_pred) * base_cost - alpha * K.log(1. - abs_pred), axis = -1) loss.__name__ = 'abs_crossentropy' return loss
[docs]def sparse_abstention_loss(alpha, mask): """ Function to compute abstention loss. It is composed by two terms: (i) original loss of the multiclass classification problem, (ii) cost associated to the abstaining samples. Assumes y_true is not one-hot encoded. Parameters ---------- alpha : Keras variable Weight of abstention term in cost function mask : ndarray Numpy array to use as mask for abstention: it is 1 on the output associated to the abstention class and 0 otherwise """ def loss(y_true, y_pred): """ Parameters ---------- y_true : keras tensor True values to predict y_pred : keras tensor Prediction made by the model. It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ base_pred = (1 - mask) * y_pred + K.epsilon() base_true = y_true base_cost = K.sparse_categorical_crossentropy(base_true, base_pred) # abs_pred = K.mean(mask * y_pred, axis=-1) abs_pred = K.sum(mask * y_pred, axis=-1) # add some small value to prevent NaN when prediction is abstained abs_pred = K.clip(abs_pred, K.epsilon(), 1. - K.epsilon()) return ((1. - abs_pred) * base_cost - alpha * K.log(1. - abs_pred)) # return K.mean((1. - abs_pred) * base_cost - alpha * K.log(1. - abs_pred), axis = -1) loss.__name__ = 'sparse_abs_crossentropy' return loss
[docs]def abstention_acc_metric(nb_classes): """ Abstained accuracy: Function to estimate accuracy over the predicted samples after removing the samples where the model is abstaining. Parameters ---------- nb_classes : int or ndarray Integer or numpy array defining indices of the abstention class """ def metric(y_true, y_pred): """ Parameters ---------- y_true : keras tensor True values to predict y_pred : keras tensor Prediction made by the model. It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # matching in original classes true_pred = K.sum(K.cast(K.equal(K.argmax(y_true, axis=-1), K.argmax(y_pred, axis=-1)), 'int64')) # total abstention total_abs = K.sum(K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), 'int64')) # total predicted in original classes total_pred = K.sum(K.cast(K.equal(K.argmax(y_pred, axis=-1), K.argmax(y_pred, axis=-1)), 'int64')) # guard against divide by zero condition = K.greater(total_pred, total_abs) abs_acc = K.switch(condition, true_pred / (total_pred - total_abs), total_pred / total_pred) return abs_acc metric.__name__ = 'abstention_acc' return metric
[docs]def sparse_abstention_acc_metric(nb_classes): """ Abstained accuracy: Function to estimate accuracy over the predicted samples after removing the samples where the model is abstaining. Assumes y_true is not one-hot encoded. Parameters ---------- nb_classes : int or ndarray Integer or numpy array defining indices of the abstention class """ def metric(y_true, y_pred): """ Parameters ---------- y_true : keras tensor True values to predict y_pred : keras tensor Prediction made by the model. It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # matching in original classes y_pred_index = K.argmax(y_pred, axis=-1) y_true_index = K.cast(K.max(y_true, axis=-1), 'int64') true_pred = K.sum(K.cast(K.equal(y_true_index, y_pred_index), 'int64')) # total abstention total_abs = K.sum(K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), 'int64')) # total predicted in original classes total_pred = K.sum(K.cast(K.equal(K.argmax(y_pred, axis=-1), K.argmax(y_pred, axis=-1)), 'int64')) # guard against divide by zero condition = K.greater(total_pred, total_abs) abs_acc = K.switch(condition, true_pred / (total_pred - total_abs), total_pred / total_pred) return abs_acc metric.__name__ = 'sparse_abstention_acc' return metric
[docs]def abstention_metric(nb_classes): """ Function to estimate fraction of the samples where the model is abstaining. Parameters ---------- nb_classes : int or ndarray Integer or numpy array defining indices of the abstention class """ def metric(y_true, y_pred): """ Parameters ---------- y_true : keras tensor True values to predict y_pred : keras tensor Prediction made by the model. It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # total abstention total_abs = K.sum(K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), 'int64')) # total predicted in original classes total_pred = K.sum(K.cast(K.equal(K.argmax(y_pred, axis=-1), K.argmax(y_pred, axis=-1)), 'int64')) return total_abs / total_pred metric.__name__ = 'abstention' return metric
[docs]def acc_class_i_metric(class_i): """ Function to estimate accuracy over the ith class prediction. This estimation is global (i.e. abstaining samples are not removed) Parameters ---------- class_i : int Index of the class to estimate accuracy """ def metric(y_true, y_pred): """ Parameters ---------- y_true : keras tensor True values to predict y_pred : keras tensor Prediction made by the model. It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # Find locations in ground truth belonging to class i ytrueint = K.cast(K.equal(K.argmax(y_true, axis=-1), class_i), 'int64') # Compute total number of ground truth samples in class i total_true_i = K.sum(ytrueint) # Find samples in prediction belonging to class i (not accounting for abstention) ypredint = K.cast(K.equal(K.argmax(y_pred[:, :-1], axis=-1), class_i), 'int64') # Find correctly predicted class i samples true_i_pred = K.sum(ytrueint * ypredint) # Compute accuracy in class i acc = true_i_pred / total_true_i # Since there could be few samples in class i # it is possible that ground truth does not # have any sample in class i, leading to a divide # by zero and not valid accuracy # Therefore, for the accuracy to be valid # total_true_i should be greater than zero # otherwise, return 0. condition = K.greater(total_true_i, 0) return K.switch(condition, acc, K.zeros_like(acc, dtype=acc.dtype)) metric.__name__ = 'acc_class_{}'.format(class_i) return metric
[docs]def abstention_acc_class_i_metric(nb_classes, class_i): """ Function to estimate accuracy over the class i prediction after removing the samples where the model is abstaining. Parameters ---------- nb_classes : int or ndarray Integer or numpy array defining indices of the abstention class class_i : int Index of the class to estimate accuracy after removing abstention samples """ def metric(y_true, y_pred): """ Parameters ---------- y_true : keras tensor True values to predict y_pred : keras tensor Prediction made by the model. It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # Find locations in ground truth belonging to class i ytrueint = K.cast(K.equal(K.argmax(y_true, axis=-1), class_i), 'int64') # Find locations that are predicted (not abstained) mask_pred = K.cast(K.not_equal(K.argmax(y_pred, axis=-1), nb_classes), 'int64') # Compute total number of ground truth samples in class i filtering abstaining predictions total_true_i = K.sum(ytrueint * mask_pred) # matching in original class i after removing abstention true_i_pred = K.sum(mask_pred * ytrueint * K.cast(K.equal(K.argmax(y_true, axis=-1), K.argmax(y_pred, axis=-1)), 'int64')) # Compute accuracy in class i acc = true_i_pred / total_true_i # Since there could few samples in class i # it is possible that ground truth does not # have any sample in class i, leading to a divide # by zero and not valid accuracy # Therefore, for the accuracy to be valid # total_true_i should be greater than zero # otherwise, return 0. condition = K.greater(total_true_i, 0) return K.switch(condition, acc, K.zeros_like(acc, dtype=acc.dtype)) metric.__name__ = 'abstention_acc_class_{}'.format(class_i) return metric
[docs]def abstention_class_i_metric(nb_classes, class_i): """ Function to estimate fraction of the samples where the model is abstaining in class i. Parameters ---------- nb_classes : int or ndarray Integer or numpy array defining indices of the abstention class class_i : int Index of the class to estimate accuracy """ def metric(y_true, y_pred): """ Parameters ---------- y_true : keras tensor True values to predict y_pred : keras tensor Prediction made by the model. It is assumed that this keras tensor includes extra columns to store the abstaining classes. """ # Find locations in ground truth belonging to class i ytrue_i_int = K.cast(K.equal(K.argmax(y_true, axis=-1), class_i), 'int64') # total in class i total_class_i = K.sum(ytrue_i_int) # Abstention samples y_abs = K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), 'int64') # Abstention in class_i total_abs_i = K.sum(ytrue_i_int * y_abs) abs_i = total_abs_i / total_class_i condition = K.greater(total_class_i, 0) return K.switch(condition, abs_i, K.zeros_like(abs_i, dtype=abs_i.dtype)) metric.__name__ = 'abstention_class_{}'.format(class_i) return metric
[docs]class AbstentionAdapt_Callback(Callback): """ This callback is used to adapt the parameter alpha in the abstention loss. The parameter alpha (weight of the abstention term in the abstention loss) is increased or decreased adaptively during the training run. It is decreased if the current abstention accuracy is less than the minimum accuracy set or increased if the current abstention fraction is greater than the maximum fraction set. The abstention accuracy metric to use must be specified as the 'acc_monitor' argument in the initialization of the callback. It could be: the global abstention accuracy (abstention_acc), the abstention accuracy over the ith class (acc_class_i), etc. The abstention metric to use must be specified as the 'abs_monitor' argument in the initialization of the callback. It should be the metric that computes the fraction of samples for which the model is abstaining (abstention). The factor alpha is modified if the current abstention accuracy is less than the minimum accuracy set or if the current abstention fraction is greater than the maximum fraction set. Thresholds for minimum and maximum correction factors are computed and the correction over alpha is not allowed to be less or greater than them, respectively, to avoid huge swings in the abstention loss evolution. """ def __init__(self, acc_monitor, abs_monitor, alpha0, init_abs_epoch=4, alpha_scale_factor=0.8, min_abs_acc=0.9, max_abs_frac=0.4, acc_gain=5.0, abs_gain=1.0): """ Initializer of the AbstentionAdapt_Callback. Parameters ---------- acc_monitor : keras metric Accuracy metric to monitor during the run and use as base to adapt the weight of the abstention term (i.e. alpha) in the abstention cost function. (Must be an accuracy metric that takes abstention into account). abs_monitor : keras metric Abstention metric monitored during the run and used as the other factor to adapt the weight of the abstention term (i.e. alpha) in the asbstention loss function alpha0 : float Initial weight of abstention term in cost function init_abs_epoch : integer Value of the epochs to start adjusting the weight of the abstention term (i.e. alpha). Default: 4. alpha_scale_factor: float Factor to scale (increase by dividing or decrease by multiplying) the weight of the abstention term (i.e. alpha). Default: 0.8. min_abs_acc: float Minimum accuracy to target in the current training. Default: 0.9. max_abs_frac: float Maximum abstention fraction to tolerate in the current training. Default: 0.4. acc_gain: float Factor to adjust alpha scale. Default: 5.0. abs_gain: float Factor to adjust alpha scale. Default: 1.0. """ super(AbstentionAdapt_Callback, self).__init__() self.acc_monitor = acc_monitor # Keras metric to monitor (must be an accuracy with abstention) self.abs_monitor = abs_monitor # Keras metric momitoring abstention fraction self.alpha = K.variable(value=alpha0) # Weight of abstention term self.init_abs_epoch = init_abs_epoch # epoch to init abstention self.alpha_scale_factor = alpha_scale_factor # factor to scale alpha (weight for abstention term in cost function) self.min_abs_acc = min_abs_acc # minimum target accuracy (value specified as parameter of the run) self.max_abs_frac = max_abs_frac # maximum abstention fraction (value specified as parameter of the run) self.acc_gain = acc_gain # factor for adjusting alpha scale self.abs_gain = abs_gain # factor for adjusting alpha scale self.alphavalues = [] # array to store alpha evolution
[docs] def on_epoch_end(self, epoch, logs=None): """ Updates the weight of abstention term on epoch end. Parameters ---------- epoch : integer Current epoch in training. logs : keras logs Metrics stored during current keras training. """ new_alpha_val = K.get_value(self.alpha) if epoch > self.init_abs_epoch: if self.acc_monitor is None or self.abs_monitor is None: raise Exception('ERROR! Abstention Adapt conditioned on metrics ' + str(self.acc_monitor) + ' and ' + str(self.abs_monitor) + ' which are not available. Available metrics are: ' + ','.join(list(logs.keys())) + '... Exiting') else: # Current accuracy (with abstention) abs_acc = logs.get(self.acc_monitor) # Current abstention fraction abs_frac = logs.get(self.abs_monitor) if abs_acc is None or abs_frac is None: raise Exception('ERROR! Abstention Adapt conditioned on metrics ' + str(self.acc_monitor) + ' and ' + str(self.abs_monitor) + ' which are not available. Available metrics are: ' + ','.join(list(logs.keys())) + '... Exiting') # modify alpha as needed acc_error = abs_acc - self.min_abs_acc acc_error = min(acc_error, 0.0) abs_error = abs_frac - self.max_abs_frac abs_error = max(abs_error, 0.0) new_scale = 1.0 + self.acc_gain * acc_error + self.abs_gain * abs_error # threshold to avoid huge swings min_scale = self.alpha_scale_factor max_scale = 1. / self.alpha_scale_factor new_scale = min(new_scale, max_scale) new_scale = max(new_scale, min_scale) # print('Scaling factor: ', new_scale) new_alpha_val *= new_scale K.set_value(self.alpha, new_alpha_val) print('Scaling factor: ', new_scale, ' new alpha, ', new_alpha_val) self.alphavalues.append(new_alpha_val)
[docs]def modify_labels(numclasses_out, ytrain, ytest, yval=None): """ This function generates a categorical representation with a class added for indicating abstention. Parameters ---------- numclasses_out : integer Original number of classes + 1 abstention class ytrain : ndarray Numpy array of the classes (labels) in the training set ytest : ndarray Numpy array of the classes (labels) in the testing set yval : ndarray Numpy array of the classes (labels) in the validation set """ classestrain = np.max(ytrain) + 1 classestest = np.max(ytest) + 1 if yval is not None: classesval = np.max(yval) + 1 assert(classestrain == classestest) if yval is not None: assert(classesval == classestest) assert((classestrain + 1) == numclasses_out) # In this case only one other slot for abstention is created labels_train = to_categorical(ytrain, numclasses_out) labels_test = to_categorical(ytest, numclasses_out) if yval is not None: labels_val = to_categorical(yval, numclasses_out) # For sanity check mask_vec = np.zeros(labels_train.shape) mask_vec[:, -1] = 1 i = np.random.choice(range(labels_train.shape[0])) sanity_check = mask_vec[i, :] * labels_train[i, :] print(sanity_check.shape) if ytrain.ndim > 1: ll = ytrain.shape[1] else: ll = 0 for i in range(ll): for j in range(numclasses_out): if sanity_check[i, j] == 1: print('Problem at ', i, j) if yval is not None: return labels_train, labels_test, labels_val return labels_train, labels_test
###################################################################
[docs]def add_model_output(modelIn, mode=None, num_add=None, activation=None): """ This function modifies the last dense layer in the passed keras model. The modification includes adding units and optionally changing the activation function. Parameters ---------- modelIn : keras model Keras model to be modified. mode : string Mode to modify the layer. It could be: 'abstain' for adding an arbitrary number of units for the abstention optimization strategy. 'qtl' for quantile regression which needs the outputs to be tripled. 'het' for heteroscedastic regression which needs the outputs to be doubled. num_add : integer Number of units to add. This only applies to the 'abstain' mode. activation : string String with keras specification of activation function (e.g. 'relu', 'sigomid', 'softmax', etc.) Return ---------- modelOut : keras model Keras model after last dense layer has been modified as specified. If there is no mode specified it returns the same model. If the mode is not one of 'abstain', 'qtl' or 'het' an exception is raised. """ if mode is None: return modelIn numlayers = len(modelIn.layers) # Find last dense layer i = -1 while 'dense' not in (modelIn.layers[i].name) and ((i + numlayers) > 0): i -= 1 # Minimal verification about the validity of the layer found assert ((i + numlayers) >= 0) assert ('dense' in modelIn.layers[i].name) # Compute new output size if mode == 'abstain': assert num_add is not None new_output_size = modelIn.layers[i].output_shape[-1] + num_add elif mode == 'qtl': # for quantile UQ new_output_size = 3 * modelIn.layers[i].output_shape[-1] elif mode == 'het': # for heteroscedastic UQ new_output_size = 2 * modelIn.layers[i].output_shape[-1] else: raise Exception('ERROR ! Type of mode specified for adding outputs to the model: ' + mode + ' not implemented... Exiting') # Recover current layer options config = modelIn.layers[i].get_config() # Update number of units config['units'] = new_output_size # Update activation function if requested if activation is not None: config['activation'] = activation # Bias initialization seems to help het and qtl if mode == 'het' or mode == 'qtl': config['bias_initializer'] = 'ones' # Create new Dense layer reconstructed_layer = Dense.from_config(config) # Connect new Dense last layer to previous one-before-last layer additional = reconstructed_layer(modelIn.layers[i - 1].output) # If the layer to replace is not the last layer, add the remainder layers if i < -1: for j in range(i + 1, 0): config_j = modelIn.layers[j].get_config() aux_j = layers.deserialize({'class_name': modelIn.layers[j].__class__.__name__, 'config': config_j}) reconstructed_layer = aux_j.from_config(config_j) additional = reconstructed_layer(additional) modelOut = Model(modelIn.input, additional) return modelOut
################################################################### # UQ regression - utilities
[docs]def r2_heteroscedastic_metric(nout): """This function computes the r2 for the heteroscedastic model. The r2 is computed over the prediction of the mean and the standard deviation prediction is not taken into account. Parameters ---------- nout : int Number of outputs without uq augmentation """ def metric(y_true, y_pred): """ Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth y_pred : Keras tensor Keras tensor including the predictions of a heteroscedastic model. The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ if nout > 1: y_out = K.reshape(y_pred[:, 0::nout], K.shape(y_true)) else: y_out = K.reshape(y_pred[:, 0], K.shape(y_true)) SS_res = K.sum(K.square(y_true - y_out)) SS_tot = K.sum(K.square(y_true - K.mean(y_true))) return (1. - SS_res / (SS_tot + K.epsilon())) metric.__name__ = 'r2_heteroscedastic' return metric
[docs]def mae_heteroscedastic_metric(nout): """This function computes the mean absolute error (mae) for the heteroscedastic model. The mae is computed over the prediction of the mean and the standard deviation prediction is not taken into account. Parameters ---------- nout : int Number of outputs without uq augmentation """ def metric(y_true, y_pred): """ Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth y_pred : Keras tensor Keras tensor including the predictions of a heteroscedastic model. The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ if nout > 1: y_out = K.reshape(y_pred[:, 0::nout], K.shape(y_true)) else: y_out = K.reshape(y_pred[:, 0], K.shape(y_true)) return mean_absolute_error(y_true, y_out) metric.__name__ = 'mae_heteroscedastic' return metric
[docs]def mse_heteroscedastic_metric(nout): """This function computes the mean squared error (mse) for the heteroscedastic model. The mse is computed over the prediction of the mean and the standard deviation prediction is not taken into account. Parameters ---------- nout : int Number of outputs without uq augmentation """ def metric(y_true, y_pred): """ Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth y_pred : Keras tensor Keras tensor including the predictions of a heteroscedastic model. The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ if nout > 1: y_out = K.reshape(y_pred[:, 0::nout], K.shape(y_true)) else: y_out = K.reshape(y_pred[:, 0], K.shape(y_true)) return mean_squared_error(y_true, y_out) metric.__name__ = 'mse_heteroscedastic' return metric
[docs]def meanS_heteroscedastic_metric(nout): """This function computes the mean log of the variance (log S) for the heteroscedastic model. The mean log is computed over the standard deviation prediction and the mean prediction is not taken into account. Parameters ---------- nout : int Number of outputs without uq augmentation """ def metric(y_true, y_pred): """ Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth y_pred : Keras tensor Keras tensor including the predictions of a heteroscedastic model. The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ if nout > 1: log_sig2 = y_pred[:, 1::nout] else: log_sig2 = y_pred[:, 1] return K.mean(log_sig2) metric.__name__ = 'meanS_heteroscedastic' return metric
[docs]def heteroscedastic_loss(nout): """This function computes the heteroscedastic loss for the heteroscedastic model. Both mean and standard deviation predictions are taken into account. Parameters ---------- nout : int Number of outputs without uq augmentation """ def loss(y_true, y_pred): """This function computes the heteroscedastic loss. Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth y_pred : Keras tensor Keras tensor including the predictions of a heteroscedastic model. The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output. """ y_shape = K.shape(y_true) if nout > 1: y_out = K.reshape(y_pred[:, 0::nout], y_shape) log_sig2 = K.reshape(y_pred[:, 1::nout], y_shape) else: y_out = K.reshape(y_pred[:, 0], y_shape) log_sig2 = K.reshape(y_pred[:, 1], y_shape) diff_sq = K.square(y_out - y_true) return K.mean(K.exp(-log_sig2) * diff_sq + log_sig2) # Return a function return loss
[docs]def quantile_loss(quantile, y_true, y_pred): """This function computes the quantile loss for a given quantile fraction. Parameters ---------- quantile : float in (0, 1) Quantile fraction to compute the loss. y_true : Keras tensor Keras tensor including the ground truth y_pred : Keras tensor Keras tensor including the predictions of a quantile model. """ error = (y_true - y_pred) return K.mean(K.maximum(quantile * error, (quantile - 1) * error))
[docs]def triple_quantile_loss(nout, lowquantile, highquantile): """This function computes the quantile loss for the median and low and high quantiles. The median is given twice the weight of the other components. Parameters ---------- nout : int Number of outputs without uq augmentation lowquantile: float in (0, 1) Fraction corresponding to the low quantile highquantile: float in (0, 1) Fraction corresponding to the high quantile """ def loss(y_true, y_pred): """This function computes the quantile loss, considering the median and low and high quantiles. Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth y_pred : Keras tensor Keras tensor including the predictions of a heteroscedastic model. The predictions follow the order: (q50_0, qlow_0, qhigh_0, q50_1, qlow_1, qhigh_1, ...) with q50_i the median of the ith output and qlow_i and qhigh_i the low and high specified quantiles of the ith output. """ y_shape = K.shape(y_true) if nout > 1: y_out0 = K.reshape(y_pred[:, 0::nout], y_shape) y_out1 = K.reshape(y_pred[:, 1::nout], y_shape) y_out2 = K.reshape(y_pred[:, 2::nout], y_shape) else: y_out0 = K.reshape(y_pred[:, 0], y_shape) y_out1 = K.reshape(y_pred[:, 1], y_shape) y_out2 = K.reshape(y_pred[:, 2], y_shape) return quantile_loss(lowquantile, y_true, y_out1) + quantile_loss(highquantile, y_true, y_out2) + 2. * quantile_loss(0.5, y_true, y_out0) return loss
[docs]def quantile_metric(nout, index, quantile): """This function computes the quantile metric for a given quantile and corresponding output index. This is provided as a metric to track evolution while training. Parameters ---------- nout : int Number of outputs without uq augmentation index : int Index of output corresponding to the given quantile. quantile: float in (0, 1) Fraction corresponding to the quantile """ def metric(y_true, y_pred): """ Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth y_pred : Keras tensor Keras tensor including the predictions of a quantile model. The predictions follow the order: (q50_0, qlow_0, qhigh_0, q50_1, qlow_1, qhigh_1, ...) with q50_i the median of the ith output and qlow_i and qhigh_i the low and high specified quantiles of the ith output. """ y_shape = K.shape(y_true) if nout > 1: y_out = K.reshape(y_pred[:, index::nout], y_shape) else: y_out = K.reshape(y_pred[:, index], y_shape) return quantile_loss(quantile, y_true, y_out) metric.__name__ = 'quantile_{}'.format(quantile) return metric
################################################################### # For the Contamination Model
[docs]def add_index_to_output(y_train): """ This function adds a column to the training output to store the indices of the corresponding samples in the training set. Parameters ---------- y_train : ndarray Numpy array of the output in the training set """ # Add indices to y y_train_index = range(y_train.shape[0]) y_train_augmented = np.vstack([y_train, y_train_index]).T return y_train_augmented
[docs]def contamination_loss(nout, T_k, a, sigmaSQ, gammaSQ): """ Function to compute contamination loss. It is composed by two terms: (i) the loss with respect to the normal distribution that models the distribution of the training data samples, (ii) the loss with respect to the Cauchy distribution that models the distribution of the outlier samples. Note that the evaluation of this contamination loss function does not make sense for any data different to the training set. This is because latent variables are only defined for samples in the training set. Parameters ---------- nout : int Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training). T_k : Keras tensor Tensor containing latent variables (probability of membership to normal and Cauchy distributions) for each of the samples in the training set. (Validation data is usually augmented too to be able to run training with validation set, however loss in validation should not be used as a criterion for early stopping training since the latent variables are defined for the training only, and thus, are not valid when used in combination with data different from training). a : Keras variable Probability of belonging to the normal distribution sigmaSQ : Keras variable Variance estimated for the normal distribution gammaSQ : Keras variable Scale estimated for the Cauchy distribution """ def loss(y_true, y_pred): """ Parameters ---------- y_true : keras tensor True values to predict. It is assumed that this keras tensor includes extra columns to store the index of the data sample in the training set. y_pred : keras tensor Prediction made by the model. """ y_shape = K.shape(y_pred) y_true_ = K.reshape(y_true[:, :-1], y_shape) if nout > 1: diff_sq = K.sum(K.square(y_true_ - y_pred), axis=-1) else: diff_sq = K.square(y_true_ - y_pred) term_normal = diff_sq / (2. * sigmaSQ) + 0.5 * K.log(sigmaSQ) + 0.5 * K.log(2. * np.pi) - K.log(a) term_cauchy = K.log(1. + diff_sq / gammaSQ) + 0.5 * K.log(piSQ * gammaSQ) - K.log(1. - a) batch_index = K.cast(y_true[:, -1], 'int64') T_0_red = K.gather(T_k[:, 0], batch_index) T_1_red = K.gather(T_k[:, 1], batch_index) return K.sum(T_0_red * term_normal + T_1_red * term_cauchy) return loss
[docs]class Contamination_Callback(Callback): """ This callback is used to update the parameters of the contamination model. This functionality follows the EM algorithm: in the E-step latent variables are updated and in the M-step global variables are updated. The global variables correspond to 'a' (probability of membership to normal class), 'sigmaSQ' (variance of normal class) and 'gammaSQ' (scale of Cauchy class, modeling outliers). The latent variables correspond to 'T_k' (the first column corresponds to the probability of membership to the normal distribution, while the second column corresponds to the probability of membership to the Cauchy distribution i.e. outlier). """ def __init__(self, x, y, a_max=0.99): """ Initializer of the Contamination_Callback. Parameters ---------- x : ndarray Array of samples (= input features) in training set. y : ndarray Array of sample outputs in training set. a_max : float Maximum value of a variable to allow """ super(Contamination_Callback, self).__init__() self.x = x # Features of training set self.y = y # Output of training set self.a_max = a_max # Set maximum a value to allow self.sigmaSQ = K.variable(value=0.01) # Standard devation of normal distribution for error self.gammaSQ = K.variable(value=0.01) # Scale of Cauchy distribution for error # Parameter Initialization - Conditional distribution of the latent variables if isinstance(x, list): self.T = np.zeros((x[0].shape[0], 2)) else: self.T = np.zeros((self.x.shape[0], 2)) self.T[:, 0] = np.random.uniform(size=self.T.shape[0]) self.T[:, 1] = 1. - self.T[:, 0] self.T_k = K.variable(value=self.T) self.a = K.variable(value=np.mean(self.T[:, 0])) # Probability of membership to normal distribution self.avalues = [] # array to store a evolution self.sigmaSQvalues = [] # array to store sigmaSQ evolution self.gammaSQvalues = [] # array to store gammaSQ evolution
[docs] def on_epoch_end(self, epoch, logs={}): """ Updates the parameters of the distributions in the contamination model on epoch end. The parameters updated are: 'a' for the global weight of the membership to the normal distribution, 'sigmaSQ' for the variance of the normal distribution and 'gammaSQ' for the scale of the Cauchy distribution of outliers. The latent variables are updated as well: 'T_k' describing in the first column the probability of membership to normal distribution and in the second column probability of membership to the Cauchy distribution i.e. outlier. Stores evolution of global parameters (a, sigmaSQ and gammaSQ). Parameters ---------- epoch : integer Current epoch in training. logs : keras logs Metrics stored during current keras training. """ y_pred = self.model.predict(self.x) error = self.y.squeeze() - y_pred.squeeze() # Update parameters (M-Step) errorSQ = error**2 aux = np.mean(self.T[:, 0]) if aux > self.a_max: aux = self.a_max K.set_value(self.a, aux) K.set_value(self.sigmaSQ, np.sum(self.T[:, 0] * errorSQ) / np.sum(self.T[:, 0])) # Gradient descent gmSQ_eval = K.get_value(self.gammaSQ) grad_gmSQ = (0.5 * np.sum(self.T[:, 1]) - np.sum(self.T[:, 1] * errorSQ / (gmSQ_eval + errorSQ))) / gmSQ_eval # Guarantee positivity in update eta = K.get_value(self.model.optimizer.lr) new_gmSQ = gmSQ_eval - eta * grad_gmSQ while new_gmSQ < 0 or (new_gmSQ / gmSQ_eval) > 1000: eta /= 2 new_gmSQ = gmSQ_eval - eta * grad_gmSQ K.set_value(self.gammaSQ, new_gmSQ) # Update conditional distribution of latent variables (beginning of E-Step) a_eval = K.get_value(self.a) sigmaSQ_eval = K.get_value(self.sigmaSQ) gammaSQ_eval = K.get_value(self.gammaSQ) print('a: %f, sigmaSQ: %f, gammaSQ: %f' % (a_eval, sigmaSQ_eval, gammaSQ_eval)) norm_eval = norm.pdf(error, loc=0, scale=np.sqrt(sigmaSQ_eval)) cauchy_eval = cauchy.pdf(error, loc=0, scale=np.sqrt(gammaSQ_eval)) denominator = a_eval * norm_eval + (1. - a_eval) * cauchy_eval self.T[:, 0] = a_eval * norm_eval / denominator self.T[:, 1] = (1. - a_eval) * cauchy_eval / denominator K.set_value(self.T_k, self.T) # store evolution of global variables self.avalues.append(a_eval) self.sigmaSQvalues.append(sigmaSQ_eval) self.gammaSQvalues.append(gammaSQ_eval)
[docs]def mse_contamination_metric(nout): """This function computes the mean squared error (mse) for the contamination model. The mse is computed over the prediction. Therefore, the augmentation for the index variable is ignored. Parameters ---------- nout : int Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training). """ def metric(y_true, y_pred): """ Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth. Since the keras tensor includes an extra column to store the index of the data sample in the training set this column is ignored. y_pred : Keras tensor Keras tensor with the predictions of the contamination model (no data index). """ return mean_squared_error(y_true[:, :nout], y_pred[:, :nout]) metric.__name__ = 'mse_contamination' return metric
[docs]def mae_contamination_metric(nout): """This function computes the mean absolute error (mae) for the contamination model. The mae is computed over the prediction. Therefore, the augmentation for the index variable is ignored. Parameters ---------- nout : int Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training). """ def metric(y_true, y_pred): """ Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth. Since the keras tensor includes an extra column to store the index of the data sample in the training set this column is ignored. y_pred : Keras tensor Keras tensor with the predictions of the contamination model (no data index). """ return mean_absolute_error(y_true[:, :nout], y_pred[:, :nout]) metric.__name__ = 'mae_contamination' return metric
[docs]def r2_contamination_metric(nout): """This function computes the r2 for the contamination model. The r2 is computed over the prediction. Therefore, the augmentation for the index variable is ignored. Parameters ---------- nout : int Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training). """ def metric(y_true, y_pred): """ Parameters ---------- y_true : Keras tensor Keras tensor including the ground truth. Since the keras tensor includes an extra column to store the index of the data sample in the training set this column is ignored. y_pred : Keras tensor Keras tensor with the predictions of the contamination model (no data index). """ # if nout > 1: # y_true_ = K.reshape(y_true[:, :-1], K.shape(y_pred)) # else: # y_true_ = K.reshape(y_true[:, 0], K.shape(y_pred)) y_true_ = K.reshape(y_true[:, :-1], K.shape(y_pred)) SS_res = K.sum(K.square(y_true_ - y_pred)) SS_tot = K.sum(K.square(y_true_ - K.mean(y_true_))) return (1. - SS_res / (SS_tot + K.epsilon())) metric.__name__ = 'r2_contamination' return metric