import json
import warnings
from datetime import datetime
from typing import Dict, Optional
import numpy as np
from scipy.stats import pearsonr
from tensorflow.keras import backend as K
from tensorflow.keras import initializers, optimizers
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.layers import Dropout
from tensorflow.keras.metrics import (
binary_crossentropy,
mean_absolute_error,
mean_squared_error,
)
from tensorflow.keras.utils import get_custom_objects
from .helper_utils import set_seed as set_seed_defaultUtils
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
from sklearn.metrics import r2_score
import os
def set_parallelism_threads():
    """
    Set the number of intra- and inter-op parallelism threads from the
    NUM_INTRA_THREADS and NUM_INTER_THREADS environment variables
    (TensorFlow backend only).
    """
if (
K.backend() == "tensorflow"
and "NUM_INTRA_THREADS" in os.environ
and "NUM_INTER_THREADS" in os.environ
):
import tensorflow as tf
# print('Using Thread Parallelism: {} NUM_INTRA_THREADS, {} NUM_INTER_THREADS'.format(os.environ['NUM_INTRA_THREADS'], os.environ['NUM_INTER_THREADS']))
        # ConfigProto/Session are TF1 APIs; use the compat.v1 aliases so this
        # also works when TensorFlow 2.x is installed.
        session_conf = tf.compat.v1.ConfigProto(
            inter_op_parallelism_threads=int(os.environ["NUM_INTER_THREADS"]),
            intra_op_parallelism_threads=int(os.environ["NUM_INTRA_THREADS"]),
        )
        sess = tf.compat.v1.Session(
            graph=tf.compat.v1.get_default_graph(), config=session_conf
        )
        tf.compat.v1.keras.backend.set_session(sess)
def set_seed(seed: int):
"""
Set the random number seed to the desired value.
:param int seed: Random number seed.
"""
set_seed_defaultUtils(seed)
if K.backend() == "tensorflow":
import tensorflow as tf
if tf.__version__ < "2.0.0":
tf.compat.v1.set_random_seed(seed)
else:
tf.random.set_seed(seed)
def get_function(name: str):
    mapping = {}  # no custom Keras function mappings are currently registered
mapped = mapping.get(name)
if not mapped:
raise Exception('No keras function found for "{}"'.format(name))
return mapped
def build_optimizer(optimizer, lr, kerasDefaults):
    """
    Set the optimizer to the appropriate Keras optimizer function based on
    the input string and learning rate. Other required values are set to the
    Keras default values.
    :param string optimizer: String to choose the optimizer \
        Options recognized: 'sgd', 'rmsprop', 'adagrad', 'adadelta', 'adam' \
        See the Keras documentation for a full description of the options
    :param float lr: Learning rate
    :param Dict kerasDefaults: Dictionary of default parameter values to ensure consistency between frameworks
    :return: The appropriate Keras optimizer function
    """
if optimizer == "sgd":
return optimizers.SGD(
lr=lr,
decay=kerasDefaults["decay_lr"],
momentum=kerasDefaults["momentum_sgd"],
nesterov=kerasDefaults["nesterov_sgd"],
)
elif optimizer == "rmsprop":
return optimizers.RMSprop(
lr=lr,
rho=kerasDefaults["rho"],
epsilon=kerasDefaults["epsilon"],
decay=kerasDefaults["decay_lr"],
)
elif optimizer == "adagrad":
return optimizers.Adagrad(
lr=lr, epsilon=kerasDefaults["epsilon"], decay=kerasDefaults["decay_lr"]
)
elif optimizer == "adadelta":
return optimizers.Adadelta(
lr=lr,
rho=kerasDefaults["rho"],
epsilon=kerasDefaults["epsilon"],
decay=kerasDefaults["decay_lr"],
)
    elif optimizer == "adam":
        return optimizers.Adam(
            lr=lr,
            beta_1=kerasDefaults["beta_1"],
            beta_2=kerasDefaults["beta_2"],
            epsilon=kerasDefaults["epsilon"],
            decay=kerasDefaults["decay_lr"],
        )
    else:
        raise ValueError("Unrecognized optimizer: '{}'".format(optimizer))
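# Hedged usage sketch (not part of the CANDLE API): build an SGD optimizer
# from a minimal kerasDefaults dictionary. The key names match the lookups
# above; the values are illustrative placeholders, not the framework defaults.
def _example_build_optimizer():
    kerasDefaults = {
        "decay_lr": 0.0,
        "momentum_sgd": 0.9,
        "nesterov_sgd": False,
    }
    return build_optimizer("sgd", lr=1e-3, kerasDefaults=kerasDefaults)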
def build_initializer(
    initializer: str, kerasDefaults: Dict, seed: Optional[int] = None, constant: float = 0.0
):
    """
    Set the initializer to the appropriate Keras initializer function based
    on the input string. Other required values are set to the Keras default
    values.
    :param string initializer: String to choose the initializer \
        Options recognized: 'constant', 'uniform', 'normal', 'glorot_normal', \
        'glorot_uniform', 'lecun_uniform', 'he_normal' \
        See the Keras documentation for a full description of the options
    :param Dict kerasDefaults: Dictionary of default parameter values to ensure consistency between frameworks
    :param int seed: Random number seed
    :param float constant: Constant value (for the constant initializer only)
    :return: The appropriate Keras initializer function
    """
if initializer == "constant":
return initializers.Constant(value=constant)
elif initializer == "uniform":
return initializers.RandomUniform(
minval=kerasDefaults["minval_uniform"],
maxval=kerasDefaults["maxval_uniform"],
seed=seed,
)
elif initializer == "normal":
return initializers.RandomNormal(
mean=kerasDefaults["mean_normal"],
stddev=kerasDefaults["stddev_normal"],
seed=seed,
)
elif initializer == "glorot_normal":
# aka Xavier normal initializer. keras default
return initializers.glorot_normal(seed=seed)
elif initializer == "glorot_uniform":
return initializers.glorot_uniform(seed=seed)
elif initializer == "lecun_uniform":
return initializers.lecun_uniform(seed=seed)
    elif initializer == "he_normal":
        return initializers.he_normal(seed=seed)
    else:
        raise ValueError("Unrecognized initializer: '{}'".format(initializer))
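# Hedged usage sketch: pick an initializer by name. For 'uniform' the
# kerasDefaults entries below are required; the bounds shown are illustrative
# placeholders, not the CANDLE defaults.
def _example_build_initializer():
    kerasDefaults = {"minval_uniform": -0.05, "maxval_uniform": 0.05}
    return build_initializer("uniform", kerasDefaults, seed=2021)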
def xent(y_true, y_pred):
return binary_crossentropy(y_true, y_pred)
def r2(y_true, y_pred):
    """Coefficient of determination (R^2), computed with Keras backend ops."""
    SS_res = K.sum(K.square(y_true - y_pred))
    SS_tot = K.sum(K.square(y_true - K.mean(y_true)))
    return 1 - SS_res / (SS_tot + K.epsilon())
def mae(y_true, y_pred):
return mean_absolute_error(y_true, y_pred)
def mse(y_true, y_pred):
return mean_squared_error(y_true, y_pred)
def covariance(x, y):
    """Covariance of two tensors, computed with Keras backend ops."""
    return K.mean(x * y) - K.mean(x) * K.mean(y)
def corr(y_true, y_pred):
    """Pearson correlation between y_true and y_pred, computed with Keras backend ops."""
    cov = covariance(y_true, y_pred)
    var1 = covariance(y_true, y_true)
    var2 = covariance(y_pred, y_pred)
    return cov / (K.sqrt(var1 * var2) + K.epsilon())
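# Hedged usage sketch: the metric functions above (mae, r2, corr) can be passed
# directly to model.compile. The two-layer model here is an arbitrary
# placeholder, not a CANDLE benchmark model.
def _example_compile_with_candle_metrics():
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.models import Sequential
    model = Sequential([Dense(16, activation="relu", input_shape=(8,)), Dense(1)])
    model.compile(optimizer="adam", loss="mse", metrics=[mae, r2, corr])
    return model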
def evaluate_autoencoder(y_pred, y_test):
    """Return reconstruction quality metrics (MSE, R^2, Pearson correlation)."""
    mse = mean_squared_error(y_pred, y_test)
    r2 = r2_score(y_test, y_pred)
    corr, _ = pearsonr(y_pred.flatten(), y_test.flatten())
    # print('Mean squared error: {}%'.format(mse))
    return {"mse": mse, "r2_score": r2, "correlation": corr}
class PermanentDropout(Dropout):
    """Dropout layer whose dropout is applied at inference as well as training time (the Keras learning phase is ignored)."""
    def __init__(self, rate, **kwargs):
        super(PermanentDropout, self).__init__(rate, **kwargs)
        self.uses_learning_phase = False
    def call(self, x, mask=None):
        if 0.0 < self.rate < 1.0:
            noise_shape = self._get_noise_shape(x)
            x = K.dropout(x, self.rate, noise_shape)
        return x
def register_permanent_dropout():
    """Register PermanentDropout as a Keras custom object so saved models that use it can be reloaded."""
    get_custom_objects()["PermanentDropout"] = PermanentDropout
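# Hedged usage sketch: PermanentDropout keeps dropout active at inference time;
# register_permanent_dropout() lets load_model resolve the layer afterwards.
# The file name and architecture below are illustrative only.
def _example_permanent_dropout():
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.models import Sequential, load_model
    register_permanent_dropout()
    model = Sequential(
        [Dense(32, activation="relu", input_shape=(10,)), PermanentDropout(0.2), Dense(1)]
    )
    model.compile(optimizer="adam", loss="mse")
    model.save("pdrop_model.h5")  # hypothetical path
    return load_model("pdrop_model.h5")  # custom layer resolved via the registry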
class LoggingCallback(Callback):
    """Keras callback that forwards end-of-epoch metrics to a print or logging function."""
    def __init__(self, print_fcn=print):
        Callback.__init__(self)
        self.print_fcn = print_fcn
    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        msg = "[Epoch: %i] %s" % (
            epoch,
            ", ".join("%s: %f" % (k, v) for k, v in sorted(logs.items())),
        )
        self.print_fcn(msg)
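# Hedged usage sketch: route per-epoch metrics through the standard logging
# module instead of stdout. The logger name is a placeholder.
def _example_logging_callback():
    import logging
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("candle_example")
    # pass the returned callback via model.fit(..., callbacks=[...])
    return LoggingCallback(print_fcn=logger.info)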
def compute_trainable_params(model):
    """
    Extract the number of parameters from the given Keras model.
    :param model: Keras model
    :return: python dictionary that contains trainable_params, non_trainable_params and total_params
    """
if str(type(model)).startswith("<class 'keras."):
from keras import backend as K
else:
import tensorflow.keras.backend as K
trainable_count = int(np.sum([K.count_params(w) for w in model.trainable_weights]))
non_trainable_count = int(
np.sum([K.count_params(w) for w in model.non_trainable_weights])
)
return {
"trainable_params": trainable_count,
"non_trainable_params": non_trainable_count,
"total_params": (trainable_count + non_trainable_count),
}
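# Hedged usage sketch (illustrative, not part of the CANDLE API): count the
# parameters of a tiny throwaway model. The architecture below is arbitrary.
def _example_compute_trainable_params():
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.models import Sequential
    model = Sequential([Dense(8, activation="relu", input_shape=(4,)), Dense(1)])
    # expected: {'trainable_params': 49, 'non_trainable_params': 0, 'total_params': 49}
    return compute_trainable_params(model)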
class TerminateOnTimeOut(Callback):
    """
    This class implements a timeout on model training.
    When the run reaches the timeout, it sets model.stop_training = True.
    """
def __init__(self, timeout_in_sec=10):
"""
Initialize TerminateOnTimeOut class.
:param int timeout_in_sec: seconds to timeout
"""
super(TerminateOnTimeOut, self).__init__()
self.run_timestamp = None
self.timeout_in_sec = timeout_in_sec
    def on_train_begin(self, logs=None):
        """Start clock to calculate timeout."""
        self.run_timestamp = datetime.now()
    def on_epoch_end(self, epoch, logs=None):
        """On every epoch end, check whether it exceeded timeout and terminate
        training if necessary."""
        run_end = datetime.now()
        run_duration = run_end - self.run_timestamp
        run_in_sec = run_duration.total_seconds()
        print("Elapsed time ....%2.3f s" % run_in_sec)
if self.timeout_in_sec != -1:
if run_in_sec >= self.timeout_in_sec:
print(
"Timeout==>Runtime: %2.3fs, Maxtime: %2.3fs"
% (run_in_sec, self.timeout_in_sec)
)
self.model.stop_training = True
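# Hedged usage sketch: stop training once the wall clock exceeds the budget.
# The one-hour limit and the fit arguments are illustrative placeholders.
def _example_terminate_on_timeout(model, x_train, y_train):
    timeout_cb = TerminateOnTimeOut(timeout_in_sec=3600)
    return model.fit(x_train, y_train, epochs=100, callbacks=[timeout_cb])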
class CandleRemoteMonitor(Callback):
"""
Capture Run level output and store/send for monitoring.
"""
def __init__(self, params=None):
super(CandleRemoteMonitor, self).__init__()
self.global_params = params
# init
self.experiment_id = None
self.run_id = None
self.run_timestamp = None
self.epoch_timestamp = None
self.log_messages = []
    def on_train_begin(self, logs=None):
logs = logs or {}
self.run_timestamp = datetime.now()
self.experiment_id = (
self.global_params["experiment_id"]
if "experiment_id" in self.global_params
else "EXP_default"
)
self.run_id = (
self.global_params["run_id"]
if "run_id" in self.global_params
else "RUN_default"
)
run_params = []
for key, val in self.global_params.items():
run_params.append("{}: {}".format(key, val))
send = {
"experiment_id": self.experiment_id,
"run_id": self.run_id,
"parameters": run_params,
"start_time": str(self.run_timestamp),
"status": "Started",
}
# print("on_train_begin", send)
self.log_messages.append(send)
    def on_epoch_begin(self, epoch, logs=None):
self.epoch_timestamp = datetime.now()
    def on_epoch_end(self, epoch, logs=None):
logs = logs or {}
loss = logs.get("loss")
val_loss = logs.get("val_loss")
epoch_total = self.global_params["epochs"]
epoch_duration = datetime.now() - self.epoch_timestamp
epoch_in_sec = epoch_duration.total_seconds()
epoch_line = "epoch: {}/{}, duration: {}s, loss: {}, val_loss: {}".format(
(epoch + 1), epoch_total, epoch_in_sec, loss, val_loss
)
send = {
"run_id": self.run_id,
"status": {"set": "Running"},
"training_loss": {"set": loss},
"validation_loss": {"set": val_loss},
"run_progress": {"add": [epoch_line]},
}
# print("on_epoch_end", send)
self.log_messages.append(send)
    def on_train_end(self, logs=None):
logs = logs or {}
run_end = datetime.now()
run_duration = run_end - self.run_timestamp
run_in_hour = run_duration.total_seconds() / (60 * 60)
send = {
"run_id": self.run_id,
"runtime_hours": {"set": run_in_hour},
"end_time": {"set": str(run_end)},
"status": {"set": "Finished"},
"date_modified": {"set": "NOW"},
}
# print("on_train_end", send)
self.log_messages.append(send)
# save to file when finished
self.save()
    def save(self):
"""Save log_messages to file."""
# path = os.getenv('TURBINE_OUTPUT') if 'TURBINE_OUTPUT' in os.environ else '.'
path = (
self.global_params["output_dir"]
if "output_dir" in self.global_params
else "."
)
if not os.path.exists(path):
os.makedirs(path)
        filename = "run.{}.json".format(self.run_id)
        with open(os.path.join(path, filename), "a") as file_run_json:
file_run_json.write(
json.dumps(self.log_messages, indent=4, separators=(",", ": "))
)
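# Hedged usage sketch: attach CandleRemoteMonitor to a run. The params
# dictionary mirrors the keys read above (experiment_id, run_id, epochs,
# output_dir); the values are placeholders, not CANDLE defaults.
def _example_candle_remote_monitor(model, x_train, y_train):
    params = {
        "experiment_id": "EXP000",
        "run_id": "RUN000",
        "epochs": 10,
        "output_dir": "./save",
    }
    monitor = CandleRemoteMonitor(params=params)
    history = model.fit(
        x_train, y_train, epochs=params["epochs"], callbacks=[monitor]
    )
    # on_train_end() writes ./save/run.RUN000.json with the collected log messages
    return history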