from __future__ import absolute_import
from typing import List, Optional, Tuple, Type, Union
import numpy as np
from scipy.stats import cauchy, norm
from tensorflow.keras import backend as K
from tensorflow.keras import layers
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.layers import Dense
from tensorflow.keras.metrics import mean_absolute_error, mean_squared_error
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical
Array = Type[np.ndarray]
piSQ = np.pi**2
###################################################################
# For Abstention Model
[docs]def abstention_loss(alpha, mask: Array):
"""
Function to compute abstention loss. It is composed by two terms: (i)
original loss of the multiclass classification problem, (ii) cost
associated to the abstaining samples.
:param alpha: Keras variable. Weight of abstention term in cost function
:param ndarray mask: Numpy array to use as mask for abstention: \
it is 1 on the output associated to the abstention class and 0 otherwise
"""
def loss(y_true, y_pred):
"""
:param y_true: keras tensor. True values to predict
:param y_pred: keras tensor. Prediction made by the model. \
It is assumed that this keras tensor includes extra columns to store the abstaining classes.
"""
base_pred = (1 - mask) * y_pred + K.epsilon()
base_true = y_true
base_cost = K.categorical_crossentropy(base_true, base_pred)
# abs_pred = K.mean(mask * y_pred, axis=-1)
abs_pred = K.sum(mask * y_pred, axis=-1)
# add some small value to prevent NaN when prediction is abstained
abs_pred = K.clip(abs_pred, K.epsilon(), 1.0 - K.epsilon())
return (1.0 - abs_pred) * base_cost - alpha * K.log(1.0 - abs_pred)
# return K.mean((1. - abs_pred) * base_cost - alpha * K.log(1. - abs_pred), axis = -1)
loss.__name__ = "abs_crossentropy"
return loss
[docs]def sparse_abstention_loss(alpha, mask: Array):
"""
Function to compute abstention loss. It is composed by two terms: (i)
original loss of the multiclass classification problem, (ii) cost
associated to the abstaining samples. Assumes y_true is not one-hot
encoded.
:param alpha: Keras variable. Weight of abstention term in cost function
:param ndarray mask: Numpy array to use as mask for abstention: it is 1 on the output associated \
to the abstention class and 0 otherwise
"""
def loss(y_true, y_pred):
"""
:param y_true: keras tensor. True values to predict
:param y_pred: keras tensor. Prediction made by the model. \
It is assumed that this keras tensor includes extra columns to store the abstaining classes.
"""
base_pred = (1 - mask) * y_pred + K.epsilon()
base_true = y_true
base_cost = K.sparse_categorical_crossentropy(base_true, base_pred)
# abs_pred = K.mean(mask * y_pred, axis=-1)
abs_pred = K.sum(mask * y_pred, axis=-1)
# add some small value to prevent NaN when prediction is abstained
abs_pred = K.clip(abs_pred, K.epsilon(), 1.0 - K.epsilon())
return (1.0 - abs_pred) * base_cost - alpha * K.log(1.0 - abs_pred)
# return K.mean((1. - abs_pred) * base_cost - alpha * K.log(1. - abs_pred), axis = -1)
loss.__name__ = "sparse_abs_crossentropy"
return loss
[docs]def abstention_acc_metric(nb_classes: Union[int, Array]):
"""
Abstained accuracy: Function to estimate accuracy over the predicted
samples after removing the samples where the model is abstaining.
:param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class
"""
def metric(y_true, y_pred):
"""
:param y_true: keras tensor. True values to predict
:param y_pred: keras tensor. Prediction made by the model.\
It is assumed that this keras tensor includes extra columns to store the abstaining classes.
"""
# matching in original classes
true_pred = K.sum(
K.cast(
K.equal(K.argmax(y_true, axis=-1), K.argmax(y_pred, axis=-1)), "int64"
)
)
# total abstention
total_abs = K.sum(
K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), "int64")
)
# total predicted in original classes
total_pred = K.sum(
K.cast(
K.equal(K.argmax(y_pred, axis=-1), K.argmax(y_pred, axis=-1)), "int64"
)
)
# guard against divide by zero
condition = K.greater(total_pred, total_abs)
abs_acc = K.switch(
condition, true_pred / (total_pred - total_abs), total_pred / total_pred
)
return abs_acc
metric.__name__ = "abstention_acc"
return metric
[docs]def sparse_abstention_acc_metric(nb_classes: Union[int, Array]):
"""
Abstained accuracy: Function to estimate accuracy over the predicted
samples after removing the samples where the model is abstaining. Assumes
y_true is not one-hot encoded.
:param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class
"""
def metric(y_true, y_pred):
"""
:param y_true: keras tensor. True values to predict
:param y_pred: keras tensor. Prediction made by the model. \
It is assumed that this keras tensor includes extra columns to store the abstaining classes.
"""
# matching in original classes
y_pred_index = K.argmax(y_pred, axis=-1)
y_true_index = K.cast(K.max(y_true, axis=-1), "int64")
true_pred = K.sum(K.cast(K.equal(y_true_index, y_pred_index), "int64"))
# total abstention
total_abs = K.sum(
K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), "int64")
)
# total predicted in original classes
total_pred = K.sum(
K.cast(
K.equal(K.argmax(y_pred, axis=-1), K.argmax(y_pred, axis=-1)), "int64"
)
)
# guard against divide by zero
condition = K.greater(total_pred, total_abs)
abs_acc = K.switch(
condition, true_pred / (total_pred - total_abs), total_pred / total_pred
)
return abs_acc
metric.__name__ = "sparse_abstention_acc"
return metric
[docs]def abstention_metric(nb_classes: Union[int, Array]):
"""
Function to estimate fraction of the samples where the model is
abstaining.
:param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class
"""
def metric(y_true, y_pred):
"""
:param y_true: keras tensor. True values to predict
:param y_pred: keras tensor. Prediction made by the model. \
It is assumed that this keras tensor includes extra columns to store the abstaining classes.
"""
# total abstention
total_abs = K.sum(
K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), "int64")
)
# total predicted in original classes
total_pred = K.sum(
K.cast(
K.equal(K.argmax(y_pred, axis=-1), K.argmax(y_pred, axis=-1)), "int64"
)
)
return total_abs / total_pred
metric.__name__ = "abstention"
return metric
[docs]def acc_class_i_metric(class_i: int):
"""
Function to estimate accuracy over the ith class prediction. This
estimation is global (i.e. abstaining samples are not removed)
:param int class_i: Index of the class to estimate accuracy
"""
def metric(y_true, y_pred):
"""
:param y_true: keras tensor. True values to predict
:param y_pred: keras tensor. Prediction made by the model. \
It is assumed that this keras tensor includes extra columns to store the abstaining classes.
"""
# Find locations in ground truth belonging to class i
ytrueint = K.cast(K.equal(K.argmax(y_true, axis=-1), class_i), "int64")
# Compute total number of ground truth samples in class i
total_true_i = K.sum(ytrueint)
# Find samples in prediction belonging to class i (not accounting for abstention)
ypredint = K.cast(K.equal(K.argmax(y_pred[:, :-1], axis=-1), class_i), "int64")
# Find correctly predicted class i samples
true_i_pred = K.sum(ytrueint * ypredint)
# Compute accuracy in class i
acc = true_i_pred / total_true_i
# Since there could be few samples in class i
# it is possible that ground truth does not
# have any sample in class i, leading to a divide
# by zero and not valid accuracy
# Therefore, for the accuracy to be valid
# total_true_i should be greater than zero
# otherwise, return 0.
condition = K.greater(total_true_i, 0)
return K.switch(condition, acc, K.zeros_like(acc, dtype=acc.dtype))
metric.__name__ = "acc_class_{}".format(class_i)
return metric
[docs]def abstention_acc_class_i_metric(nb_classes: Union[int, Array], class_i: int):
"""
Function to estimate accuracy over the class i prediction after removing
the samples where the model is abstaining.
:param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class
:param int class_i: Index of the class to estimate accuracy after removing abstention samples
"""
def metric(y_true, y_pred):
"""
:param y_true: keras tensor. True values to predict
:param y_pred: keras tensor. Prediction made by the model. \
It is assumed that this keras tensor includes extra columns to store the abstaining classes.
"""
# Find locations in ground truth belonging to class i
ytrueint = K.cast(K.equal(K.argmax(y_true, axis=-1), class_i), "int64")
# Find locations that are predicted (not abstained)
mask_pred = K.cast(K.not_equal(K.argmax(y_pred, axis=-1), nb_classes), "int64")
# Compute total number of ground truth samples in class i filtering abstaining predictions
total_true_i = K.sum(ytrueint * mask_pred)
# matching in original class i after removing abstention
true_i_pred = K.sum(
mask_pred
* ytrueint
* K.cast(
K.equal(K.argmax(y_true, axis=-1), K.argmax(y_pred, axis=-1)), "int64"
)
)
# Compute accuracy in class i
acc = true_i_pred / total_true_i
# Since there could few samples in class i
# it is possible that ground truth does not
# have any sample in class i, leading to a divide
# by zero and not valid accuracy
# Therefore, for the accuracy to be valid
# total_true_i should be greater than zero
# otherwise, return 0.
condition = K.greater(total_true_i, 0)
return K.switch(condition, acc, K.zeros_like(acc, dtype=acc.dtype))
metric.__name__ = "abstention_acc_class_{}".format(class_i)
return metric
[docs]def abstention_class_i_metric(nb_classes: Union[int, Array], class_i: int):
"""
Function to estimate fraction of the samples where the model is
abstaining in class i.
:param int or ndarray nb_classes: Integer or numpy array defining indices of the abstention class
:param int class_i: Index of the class to estimate accuracy
"""
def metric(y_true, y_pred):
"""
:param y_true: keras tensor. True values to predict
:param y_pred: keras tensor. Prediction made by the model. \
It is assumed that this keras tensor includes extra columns to store the abstaining classes.
"""
# Find locations in ground truth belonging to class i
ytrue_i_int = K.cast(K.equal(K.argmax(y_true, axis=-1), class_i), "int64")
# total in class i
total_class_i = K.sum(ytrue_i_int)
# Abstention samples
y_abs = K.cast(K.equal(K.argmax(y_pred, axis=-1), nb_classes), "int64")
# Abstention in class_i
total_abs_i = K.sum(ytrue_i_int * y_abs)
abs_i = total_abs_i / total_class_i
condition = K.greater(total_class_i, 0)
return K.switch(condition, abs_i, K.zeros_like(abs_i, dtype=abs_i.dtype))
metric.__name__ = "abstention_class_{}".format(class_i)
return metric
[docs]class AbstentionAdapt_Callback(Callback):
"""
This callback is used to adapt the parameter alpha in the abstention
loss.
The parameter alpha (weight of the abstention term in the abstention
loss) is increased or decreased adaptively during the training run.
It is decreased if the current abstention accuracy is less than the
minimum accuracy set or increased if the current abstention fraction
is greater than the maximum fraction set. The abstention accuracy
metric to use must be specified as the 'acc_monitor' argument in the
initialization of the callback. It could be: the global abstention
accuracy (abstention_acc), the abstention accuracy over the ith
class (acc_class_i), etc. The abstention metric to use must be
specified as the 'abs_monitor' argument in the initialization of the
callback. It should be the metric that computes the fraction of
samples for which the model is abstaining (abstention). The factor
alpha is modified if the current abstention accuracy is less than
the minimum accuracy set or if the current abstention fraction is
greater than the maximum fraction set. Thresholds for minimum and
maximum correction factors are computed and the correction over
alpha is not allowed to be less or greater than them, respectively,
to avoid huge swings in the abstention loss evolution.
"""
def __init__(
self,
acc_monitor,
abs_monitor,
alpha0: float,
init_abs_epoch: int = 4,
alpha_scale_factor: float = 0.8,
min_abs_acc: float = 0.9,
max_abs_frac: float = 0.4,
acc_gain: float = 5.0,
abs_gain: float = 1.0,
):
"""
Initializer of the AbstentionAdapt_Callback.
:param keras.metric acc_monitor: Accuracy metric to monitor during the run and use \
as base to adapt the weight of the abstention term (i.e. alpha) in the abstention \
cost function. (Must be an accuracy metric that takes abstention into account).
:param keras.metric abs_monitor: Abstention metric monitored during the run and \
used as the other factor to adapt the weight of the abstention term (i.e. alpha) in the asbstention loss function
:param float alpha0: Initial weight of abstention term in cost function
:param int init_abs_epoch: Value of the epochs to start adjusting the weight of the abstention \
term (i.e. alpha). Default: 4.
:param float alpha_scale_factor: Factor to scale (increase by dividing or decrease by multiplying) \
the weight of the abstention term (i.e. alpha). Default: 0.8.
:param float min_abs_acc: Minimum accuracy to target in the current training. Default: 0.9.
:param float max_abs_frac: Maximum abstention fraction to tolerate in the current training. Default: 0.4.
:param float acc_gain: Factor to adjust alpha scale. Default: 5.0.
:param float abs_gain: Factor to adjust alpha scale. Default: 1.0.
"""
super(AbstentionAdapt_Callback, self).__init__()
self.acc_monitor = (
acc_monitor # Keras metric to monitor (must be an accuracy with abstention)
)
self.abs_monitor = abs_monitor # Keras metric momitoring abstention fraction
self.alpha = K.variable(value=alpha0) # Weight of abstention term
self.init_abs_epoch = init_abs_epoch # epoch to init abstention
self.alpha_scale_factor = alpha_scale_factor # factor to scale alpha (weight for abstention term in cost function)
self.min_abs_acc = min_abs_acc # minimum target accuracy (value specified as parameter of the run)
self.max_abs_frac = max_abs_frac # maximum abstention fraction (value specified as parameter of the run)
self.acc_gain = acc_gain # factor for adjusting alpha scale
self.abs_gain = abs_gain # factor for adjusting alpha scale
self.alphavalues: List[float] = [] # array to store alpha evolution
[docs] def on_epoch_end(self, epoch: int, logs=None):
"""
Updates the weight of abstention term on epoch end.
:param int epoch: Current epoch in training.
:param logs: Metrics stored during current keras training.
"""
new_alpha_val = K.get_value(self.alpha)
if epoch > self.init_abs_epoch:
if self.acc_monitor is None or self.abs_monitor is None:
raise Exception(
"ERROR! Abstention Adapt conditioned on metrics "
+ str(self.acc_monitor)
+ " and "
+ str(self.abs_monitor)
+ " which are not available. Available metrics are: "
+ ",".join(list(logs.keys()))
+ "... Exiting"
)
else:
# Current accuracy (with abstention)
abs_acc = logs.get(self.acc_monitor)
# Current abstention fraction
abs_frac = logs.get(self.abs_monitor)
if abs_acc is None or abs_frac is None:
raise Exception(
"ERROR! Abstention Adapt conditioned on metrics "
+ str(self.acc_monitor)
+ " and "
+ str(self.abs_monitor)
+ " which are not available. Available metrics are: "
+ ",".join(list(logs.keys()))
+ "... Exiting"
)
# modify alpha as needed
acc_error = abs_acc - self.min_abs_acc
acc_error = min(acc_error, 0.0)
abs_error = abs_frac - self.max_abs_frac
abs_error = max(abs_error, 0.0)
new_scale = 1.0 + self.acc_gain * acc_error + self.abs_gain * abs_error
# threshold to avoid huge swings
min_scale = self.alpha_scale_factor
max_scale = 1.0 / self.alpha_scale_factor
new_scale = min(new_scale, max_scale)
new_scale = max(new_scale, min_scale)
# print('Scaling factor: ', new_scale)
new_alpha_val *= new_scale
K.set_value(self.alpha, new_alpha_val)
print("Scaling factor: ", new_scale, " new alpha, ", new_alpha_val)
self.alphavalues.append(new_alpha_val)
[docs]def modify_labels(
numclasses_out: int, ytrain: Array, ytest: Array, yval: Optional[Array] = None
) -> Tuple[Array, ...]:
"""
This function generates a categorical representation with a class added
for indicating abstention.
:param int numclasses_out: Original number of classes + 1 abstention class
:param ndarray ytrain: Numpy array of the classes (labels) in the training set
:param ndarray ytest: Numpy array of the classes (labels) in the testing set
:param ndarray yval: Numpy array of the classes (labels) in the validation set
"""
classestrain = np.max(ytrain) + 1
classestest = np.max(ytest) + 1
if yval is not None:
classesval = np.max(yval) + 1
assert classestrain == classestest
if yval is not None:
assert classesval == classestest
assert (
classestrain + 1
) == numclasses_out # In this case only one other slot for abstention is created
labels_train = to_categorical(ytrain, numclasses_out)
labels_test = to_categorical(ytest, numclasses_out)
if yval is not None:
labels_val = to_categorical(yval, numclasses_out)
# For sanity check
mask_vec = np.zeros(labels_train.shape)
mask_vec[:, -1] = 1
i = np.random.choice(range(labels_train.shape[0]))
sanity_check = mask_vec[i, :] * labels_train[i, :]
print(sanity_check.shape)
if ytrain.ndim > 1:
ll = ytrain.shape[1]
else:
ll = 0
for i in range(ll):
for j in range(numclasses_out):
if sanity_check[i, j] == 1:
print("Problem at ", i, j)
if yval is not None:
return labels_train, labels_test, labels_val
return labels_train, labels_test
###################################################################
[docs]def add_model_output(
modelIn,
mode: Optional[str] = None,
num_add: Optional[int] = None,
activation: Optional[str] = None,
):
"""
This function modifies the last dense layer in the passed keras model.
The modification includes adding units and optionally changing the
activation function.
:param modelIn: keras model. Keras model to be modified.
:param string mode: Mode to modify the layer. It could be:
- 'abstain' for adding an arbitrary number of units for the abstention optimization strategy.
- 'qtl' for quantile regression which needs the outputs to be tripled.
- 'het' for heteroscedastic regression which needs the outputs to be doubled.
:param int num_add: Number of units to add. This only applies to the 'abstain' mode.
:param string activation: String with keras specification of activation function (e.g. 'relu', 'sigomid', 'softmax', etc.)
:return: Keras model after last dense layer has been modified as specified. \
If there is no mode specified it returns the same model. \
If the mode is not one of 'abstain', 'qtl' or 'het' an exception is raised.
:rtype: Keras model
"""
if mode is None:
return modelIn
numlayers = len(modelIn.layers)
# Find last dense layer
i = -1
while "dense" not in (modelIn.layers[i].name) and ((i + numlayers) > 0):
i -= 1
# Minimal verification about the validity of the layer found
assert (i + numlayers) >= 0
assert "dense" in modelIn.layers[i].name
# Compute new output size
if mode == "abstain":
assert num_add is not None
new_output_size = modelIn.layers[i].output_shape[-1] + num_add
elif mode == "qtl": # for quantile UQ
new_output_size = 3 * modelIn.layers[i].output_shape[-1]
elif mode == "het": # for heteroscedastic UQ
new_output_size = 2 * modelIn.layers[i].output_shape[-1]
else:
raise Exception(
"ERROR ! Type of mode specified for adding outputs to the model: "
+ mode
+ " not implemented... Exiting"
)
# Recover current layer options
config = modelIn.layers[i].get_config()
# Update number of units
config["units"] = new_output_size
# Update activation function if requested
if activation is not None:
config["activation"] = activation
# Bias initialization seems to help het and qtl
if mode == "het" or mode == "qtl":
config["bias_initializer"] = "ones"
# Create new Dense layer
reconstructed_layer = Dense.from_config(config)
# Connect new Dense last layer to previous one-before-last layer
additional = reconstructed_layer(modelIn.layers[i - 1].output)
# If the layer to replace is not the last layer, add the remainder layers
if i < -1:
for j in range(i + 1, 0):
config_j = modelIn.layers[j].get_config()
aux_j = layers.deserialize(
{"class_name": modelIn.layers[j].__class__.__name__, "config": config_j}
)
reconstructed_layer = aux_j.from_config(config_j)
additional = reconstructed_layer(additional)
modelOut = Model(modelIn.input, additional)
return modelOut
###################################################################
# UQ regression - utilities
[docs]def r2_heteroscedastic_metric(nout: int):
"""
This function computes the r2 for the heteroscedastic model. The r2 is
computed over the prediction of the mean and the standard deviation
prediction is not taken into account.
:param int nout: Number of outputs without uq augmentation
"""
def metric(y_true, y_pred):
"""
:param y_true: Keras tensor. Keras tensor including the ground truth
:param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \
The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output.
"""
if nout > 1:
y_out = K.reshape(y_pred[:, 0::2], K.shape(y_true))
else:
y_out = K.reshape(y_pred[:, 0], K.shape(y_true))
SS_res = K.sum(K.square(y_true - y_out))
SS_tot = K.sum(K.square(y_true - K.mean(y_true)))
return 1.0 - SS_res / (SS_tot + K.epsilon())
metric.__name__ = "r2_heteroscedastic"
return metric
[docs]def mae_heteroscedastic_metric(nout: int):
"""
This function computes the mean absolute error (mae) for the
heteroscedastic model. The mae is computed over the prediction of the mean
and the standard deviation prediction is not taken into account.
:param int nout: Number of outputs without uq augmentation
"""
def metric(y_true, y_pred):
"""
:param y_true: Keras tensor. Keras tensor including the ground truth
:param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \
The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output.
"""
if nout > 1:
y_out = K.reshape(y_pred[:, 0::2], K.shape(y_true))
else:
y_out = K.reshape(y_pred[:, 0], K.shape(y_true))
return mean_absolute_error(y_true, y_out)
metric.__name__ = "mae_heteroscedastic"
return metric
[docs]def mse_heteroscedastic_metric(nout: int):
"""
This function computes the mean squared error (mse) for the
heteroscedastic model. The mse is computed over the prediction of the mean
and the standard deviation prediction is not taken into account.
:param int nout: Number of outputs without uq augmentation
"""
def metric(y_true, y_pred):
"""
:param y_true: Keras tensor. Keras tensor including the ground truth
:param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \
The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output.
"""
if nout > 1:
y_out = K.reshape(y_pred[:, 0::2], K.shape(y_true))
else:
y_out = K.reshape(y_pred[:, 0], K.shape(y_true))
return mean_squared_error(y_true, y_out)
metric.__name__ = "mse_heteroscedastic"
return metric
[docs]def meanS_heteroscedastic_metric(nout: int):
"""
This function computes the mean log of the variance (log S) for the
heteroscedastic model. The mean log is computed over the standard deviation
prediction and the mean prediction is not taken into account.
:param int nout: Number of outputs without uq augmentation
"""
def metric(y_true, y_pred):
"""
:param y_true: Keras tensor. Keras tensor including the ground truth
:param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \
The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output.
"""
if nout > 1:
log_sig2 = y_pred[:, 1::2]
else:
log_sig2 = y_pred[:, 1]
return K.mean(log_sig2)
metric.__name__ = "meanS_heteroscedastic"
return metric
[docs]def heteroscedastic_loss(nout: int):
"""
This function computes the heteroscedastic loss for the heteroscedastic
model. Both mean and standard deviation predictions are taken into account.
:param int nout: Number of outputs without uq augmentation
"""
def loss(y_true, y_pred):
"""
This function computes the heteroscedastic loss.
:param y_true: Keras tensor. Keras tensor including the ground truth
:param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \
The predictions follow the order: (mean_0, S_0, mean_1, S_1, ...) with S_i the log of the variance for the ith output.
"""
y_shape = K.shape(y_true)
if nout > 1:
y_out = K.reshape(y_pred[:, 0::2], y_shape)
log_sig2 = K.reshape(y_pred[:, 1::nout], y_shape)
else:
y_out = K.reshape(y_pred[:, 0], y_shape)
log_sig2 = K.reshape(y_pred[:, 1], y_shape)
diff_sq = K.square(y_out - y_true)
return K.mean(K.exp(-log_sig2) * diff_sq + log_sig2)
# Return a function
return loss
[docs]def quantile_loss(quantile: float, y_true, y_pred):
"""
This function computes the quantile loss for a given quantile fraction.
:param quantile: float in (0, 1). Quantile fraction to compute the loss.
:param y_true: Keras tensor. Keras tensor including the ground truth
:param y_pred: Keras tensor. Keras tensor including the predictions of a quantile model.
"""
error = y_true - y_pred
return K.mean(K.maximum(quantile * error, (quantile - 1) * error))
[docs]def triple_quantile_loss(nout: int, lowquantile: float, highquantile: float):
"""
This function computes the quantile loss for the median and low and high
quantiles. The median is given twice the weight of the other components.
:param int nout: Number of outputs without uq augmentation
:param lowquantile: float in (0, 1). Fraction corresponding to the low quantile
:param highquantile: float in (0, 1). Fraction corresponding to the high quantile
"""
def loss(y_true, y_pred):
"""
This function computes the quantile loss, considering the median and
low and high quantiles.
:param y_true: Keras tensor. Keras tensor including the ground truth
:param y_pred: Keras tensor. Keras tensor including the predictions of a heteroscedastic model. \
The predictions follow the order: (q50_0, qlow_0, qhigh_0, q50_1, qlow_1, qhigh_1, ...) \
with q50_i the median of the ith output and qlow_i and qhigh_i the low and high specified \
quantiles of the ith output.
"""
y_shape = K.shape(y_true)
if nout > 1:
y_qtl0 = K.reshape(y_pred[:, 0::3], y_shape)
y_qtl1 = K.reshape(y_pred[:, 1::3], y_shape)
y_qtl2 = K.reshape(y_pred[:, 2::3], y_shape)
else:
y_qtl0 = K.reshape(y_pred[:, 0], y_shape)
y_qtl1 = K.reshape(y_pred[:, 1], y_shape)
y_qtl2 = K.reshape(y_pred[:, 2], y_shape)
return (
quantile_loss(lowquantile, y_true, y_qtl1)
+ quantile_loss(highquantile, y_true, y_qtl2)
+ 2.0 * quantile_loss(0.5, y_true, y_qtl0)
)
return loss
[docs]def quantile_metric(nout: int, index: int, quantile: float):
"""
This function computes the quantile metric for a given quantile and
corresponding output index. This is provided as a metric to track evolution
while training.
:param int nout: Number of outputs without uq augmentation
:param int index: Index of output corresponding to the given quantile.
:param quantile: float in (0, 1). Fraction corresponding to the quantile
"""
def metric(y_true, y_pred):
"""
:param y_true: Keras tensor. Keras tensor including the ground truth
:param y_pred: Keras tensor. Keras tensor including the predictions of a quantile model. \
The predictions follow the order: (q50_0, qlow_0, qhigh_0, q50_1, qlow_1, qhigh_1, ...) \
with q50_i the median of the ith output and qlow_i and qhigh_i the low and high specified \
quantiles of the ith output.
"""
y_shape = K.shape(y_true)
if nout > 1:
y_qtl = K.reshape(y_pred[:, index::3], y_shape)
else:
y_qtl = K.reshape(y_pred[:, index], y_shape)
return quantile_loss(quantile, y_true, y_qtl)
metric.__name__ = "quantile_{}".format(quantile)
return metric
###################################################################
# For the Contamination Model
[docs]def add_index_to_output(y_train: Array) -> Array:
"""
This function adds a column to the training output to store the indices
of the corresponding samples in the training set.
:param ndarray y_train: Numpy array of the output in the training set
"""
# Add indices to y
y_train_index = range(y_train.shape[0])
if y_train.ndim > 1:
shp = (y_train.shape[0], 1)
y_train_augmented = np.hstack([y_train, np.reshape(y_train_index, shp)])
else:
y_train_augmented = np.vstack([y_train, y_train_index]).T
return y_train_augmented
[docs]def contamination_loss(nout: int, T_k, a, sigmaSQ, gammaSQ):
"""
Function to compute contamination loss. It is composed by two terms: (i)
the loss with respect to the normal distribution that models the
distribution of the training data samples, (ii) the loss with respect to
the Cauchy distribution that models the distribution of the outlier
samples. Note that the evaluation of this contamination loss function does
not make sense for any data different to the training set. This is because
latent variables are only defined for samples in the training set.
:param int nout: Number of outputs without uq augmentation (in the contamination model \
the augmentation corresponds to the data index in training).
:param T_k: Keras tensor. Tensor containing latent variables (probability of membership \
to normal and Cauchy distributions) for each of the samples in the training set. \
(Validation data is usually augmented too to be able to run training with validation set, \
however loss in validation should not be used as a criterion for early stopping training \
since the latent variables are defined for the training only, and thus, are not valid \
when used in combination with data different from training).
:param a: Keras variable. Probability of belonging to the normal distribution
:param sigmaSQ: Keras variable. Variance estimated for the normal distribution
:param gammaSQ: Keras variable. Scale estimated for the Cauchy distribution
"""
def loss(y_true, y_pred):
"""
:param y_true: keras tensor. True values to predict. It is assumed that this keras tensor \
includes extra columns to store the index of the data sample in the training set.
:param y_pred: keras tensor. Prediction made by the model.
"""
y_shape = K.shape(y_pred)
y_true_ = K.reshape(y_true[:, :-1], y_shape)
if nout > 1:
diff_sq = K.sum(K.square(y_true_ - y_pred), axis=-1)
else:
diff_sq = K.square(y_true_ - y_pred)
term_normal = (
diff_sq / (2.0 * sigmaSQ)
+ 0.5 * K.log(sigmaSQ)
+ 0.5 * K.log(2.0 * np.pi)
- K.log(a)
)
term_cauchy = (
K.log(1.0 + diff_sq / gammaSQ)
+ 0.5 * K.log(piSQ * gammaSQ)
- K.log(1.0 - a)
)
batch_index = K.cast(y_true[:, -1], "int64")
T_0_red = K.gather(T_k[:, 0], batch_index)
T_1_red = K.gather(T_k[:, 1], batch_index)
return K.sum(T_0_red * term_normal + T_1_red * term_cauchy)
return loss
[docs]class Contamination_Callback(Callback):
"""
This callback is used to update the parameters of the contamination
model.
This functionality follows the EM algorithm: in the E-step latent
variables are updated and in the M-step global variables are
updated. The global variables correspond to 'a' (probability of
membership to normal class), 'sigmaSQ' (variance of normal class)
and 'gammaSQ' (scale of Cauchy class, modeling outliers). The latent
variables correspond to 'T_k' (the first column corresponds to the
probability of membership to the normal distribution, while the
second column corresponds to the probability of membership to the
Cauchy distribution i.e. outlier).
"""
def __init__(self, x, y, a_max=0.99):
"""Initializer of the Contamination_Callback.
:param ndarray x: Array of samples (= input features) in training set.
:param ndarray y: Array of sample outputs in training set.
:param flaot a_max: Maximum value of a variable to allow
"""
super(Contamination_Callback, self).__init__()
if y.ndim > 1:
if y.shape[1] > 1:
raise Exception(
"ERROR ! Contamination model can be applied to one-output regression, but provided training data has: "
+ str(y.ndim)
+ "outpus... Exiting"
)
self.x = x # Features of training set
self.y = y # Output of training set
self.a_max = a_max # Set maximum a value to allow
self.sigmaSQ = K.variable(
value=0.01
) # Standard devation of normal distribution for error
self.gammaSQ = K.variable(value=0.01) # Scale of Cauchy distribution for error
# Parameter Initialization - Conditional distribution of the latent variables
if isinstance(x, list):
self.T = np.zeros((x[0].shape[0], 2))
else:
self.T = np.zeros((self.x.shape[0], 2))
self.T[:, 0] = np.random.uniform(size=self.T.shape[0])
self.T[:, 1] = 1.0 - self.T[:, 0]
self.T_k = K.variable(value=self.T)
self.a = K.variable(
value=np.mean(self.T[:, 0])
) # Probability of membership to normal distribution
self.avalues = [] # array to store a evolution
self.sigmaSQvalues = [] # array to store sigmaSQ evolution
self.gammaSQvalues = [] # array to store gammaSQ evolution
[docs] def on_epoch_end(self, epoch: int, logs={}):
"""
Updates the parameters of the distributions in the contamination
model on epoch end. The parameters updated are: 'a' for the global
weight of the membership to the normal distribution, 'sigmaSQ' for the
variance of the normal distribution and 'gammaSQ' for the scale of the
Cauchy distribution of outliers. The latent variables are updated as
well: 'T_k' describing in the first column the probability of
membership to normal distribution and in the second column probability
of membership to the Cauchy distribution i.e. outlier. Stores evolution
of global parameters (a, sigmaSQ and gammaSQ).
:param int epoch: Current epoch in training.
:param logs: keras logs. Metrics stored during current keras training.
"""
y_pred = self.model.predict(self.x)
error = self.y.squeeze() - y_pred.squeeze()
# Update parameters (M-Step)
errorSQ = error**2
aux = np.mean(self.T[:, 0])
if aux > self.a_max:
aux = self.a_max
K.set_value(self.a, aux)
K.set_value(self.sigmaSQ, np.sum(self.T[:, 0] * errorSQ) / np.sum(self.T[:, 0]))
# Gradient descent
gmSQ_eval = K.get_value(self.gammaSQ)
grad_gmSQ = (
0.5 * np.sum(self.T[:, 1])
- np.sum(self.T[:, 1] * errorSQ / (gmSQ_eval + errorSQ))
) / gmSQ_eval
# Guarantee positivity in update
eta = K.get_value(self.model.optimizer.lr)
new_gmSQ = gmSQ_eval - eta * grad_gmSQ
while new_gmSQ < 0 or (new_gmSQ / gmSQ_eval) > 1000:
eta /= 2
new_gmSQ = gmSQ_eval - eta * grad_gmSQ
K.set_value(self.gammaSQ, new_gmSQ)
# Update conditional distribution of latent variables (beginning of E-Step)
a_eval = K.get_value(self.a)
sigmaSQ_eval = K.get_value(self.sigmaSQ)
gammaSQ_eval = K.get_value(self.gammaSQ)
print("a: %f, sigmaSQ: %f, gammaSQ: %f" % (a_eval, sigmaSQ_eval, gammaSQ_eval))
norm_eval = norm.pdf(error, loc=0, scale=np.sqrt(sigmaSQ_eval))
cauchy_eval = cauchy.pdf(error, loc=0, scale=np.sqrt(gammaSQ_eval))
denominator = a_eval * norm_eval + (1.0 - a_eval) * cauchy_eval
self.T[:, 0] = a_eval * norm_eval / denominator
self.T[:, 1] = (1.0 - a_eval) * cauchy_eval / denominator
K.set_value(self.T_k, self.T)
# store evolution of global variables
self.avalues.append(a_eval)
self.sigmaSQvalues.append(sigmaSQ_eval)
self.gammaSQvalues.append(gammaSQ_eval)
[docs]def mse_contamination_metric(nout: int):
"""This function computes the mean squared error (mse) for the
contamination model. The mse is computed over the prediction. Therefore,
the augmentation for the index variable is ignored.
:param int nout: \
Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training).
"""
def metric(y_true, y_pred):
"""
:param y_true: Keras tensor. \
Keras tensor including the ground truth. Since the keras tensor includes an extra column to store the index of the data sample in the training set this column is ignored.
:param y_pred: Keras tensor. \
Keras tensor with the predictions of the contamination model (no data index).
"""
return mean_squared_error(y_true[:, :nout], y_pred[:, :nout])
metric.__name__ = "mse_contamination"
return metric
[docs]def mae_contamination_metric(nout: int):
"""
This function computes the mean absolute error (mae) for the
contamination model. The mae is computed over the prediction. Therefore,
the augmentation for the index variable is ignored.
:param int nout: \
Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training).
"""
def metric(y_true, y_pred):
"""
:param y_true: Keras tensor. \
Keras tensor including the ground truth. Since the keras tensor includes an extra column to store the index of the data sample in the training set this column is ignored.
:param y_pred: Keras tensor. \
Keras tensor with the predictions of the contamination model (no data index).
"""
return mean_absolute_error(y_true[:, :nout], y_pred[:, :nout])
metric.__name__ = "mae_contamination"
return metric
[docs]def r2_contamination_metric(nout: int):
"""
This function computes the r2 for the contamination model. The r2 is
computed over the prediction. Therefore, the augmentation for the index
variable is ignored.
:param int nout: \
Number of outputs without uq augmentation (in the contamination model the augmentation corresponds to the data index in training).
"""
def metric(y_true, y_pred):
"""
:param y_true: Keras tensor. \
Keras tensor including the ground truth. Since the keras tensor includes an extra column to store the index of the data sample in the training set this column is ignored.
:param y_pred: Keras tensor. \
Keras tensor with the predictions of the contamination model (no data index).
"""
# if nout > 1:
# y_true_ = K.reshape(y_true[:, :-1], K.shape(y_pred))
# else:
# y_true_ = K.reshape(y_true[:, 0], K.shape(y_pred))
y_true_ = K.reshape(y_true[:, :-1], K.shape(y_pred))
SS_res = K.sum(K.square(y_true_ - y_pred))
SS_tot = K.sum(K.square(y_true_ - K.mean(y_true_)))
return 1.0 - SS_res / (SS_tot + K.epsilon())
metric.__name__ = "r2_contamination"
return metric