Source code for keras_utils

from __future__ import absolute_import


from tensorflow.keras import backend as K
from tensorflow.keras import optimizers
from tensorflow.keras import initializers

from tensorflow.keras.layers import Dropout
from tensorflow.keras.callbacks import Callback
from tensorflow.keras.utils import get_custom_objects
from tensorflow.keras.metrics import binary_crossentropy, mean_squared_error, mean_absolute_error

from scipy.stats import pearsonr

from default_utils import set_seed as set_seed_defaultUtils

import warnings
with warnings.catch_warnings():
    warnings.filterwarnings("ignore", category=DeprecationWarning)
    from sklearn.metrics import r2_score

import os


def set_parallelism_threads():
    """ Set the number of parallel threads according to the number available on the hardware """

    if K.backend() == 'tensorflow' and 'NUM_INTRA_THREADS' in os.environ and 'NUM_INTER_THREADS' in os.environ:
        import tensorflow as tf
        # print('Using Thread Parallelism: {} NUM_INTRA_THREADS, {} NUM_INTER_THREADS'.format(os.environ['NUM_INTRA_THREADS'], os.environ['NUM_INTER_THREADS']))
        # Configure intra-/inter-op thread pools through the TF1-compatible session API
        session_conf = tf.compat.v1.ConfigProto(
            inter_op_parallelism_threads=int(os.environ['NUM_INTER_THREADS']),
            intra_op_parallelism_threads=int(os.environ['NUM_INTRA_THREADS']))
        sess = tf.compat.v1.Session(graph=tf.compat.v1.get_default_graph(), config=session_conf)
        tf.compat.v1.keras.backend.set_session(sess)

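def _example_set_parallelism_threads():
    # Illustrative usage sketch, not part of the original module: the thread
    # counts below are assumed example values. set_parallelism_threads() only
    # takes effect when both environment variables are defined before the call.
    os.environ['NUM_INTRA_THREADS'] = '8'
    os.environ['NUM_INTER_THREADS'] = '2'
    set_parallelism_threads()
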
def set_seed(seed):
    """ Set the random number seed to the desired value

    Parameters
    ----------
    seed : integer
        Random number seed.
    """

    set_seed_defaultUtils(seed)

    if K.backend() == 'tensorflow':
        import tensorflow as tf
        if tf.__version__ < "2.0.0":
            tf.compat.v1.set_random_seed(seed)
        else:
            tf.random.set_seed(seed)

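def _example_set_seed():
    # Illustrative usage sketch, not part of the original module: the seed
    # value is an arbitrary example. Fixing the seed before model construction
    # makes weight initialization and data shuffling reproducible across runs.
    set_seed(2017)
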
def get_function(name):
    mapping = {}

    mapped = mapping.get(name)
    if not mapped:
        raise Exception('No keras function found for "{}"'.format(name))

    return mapped

def build_optimizer(type, lr, kerasDefaults):
    """ Set the optimizer to the appropriate Keras optimizer function
        based on the input string and learning rate. Other required values
        are set to the Keras default values.

    Parameters
    ----------
    type : string
        String to choose the optimizer
        Options recognized: 'sgd', 'rmsprop', 'adagrad', 'adadelta', 'adam'
        See the Keras documentation for a full description of the options
    lr : float
        Learning rate
    kerasDefaults : dict
        Dictionary of default parameter values to ensure consistency between frameworks

    Returns
    ----------
    The appropriate Keras optimizer function
    """

    if type == 'sgd':
        return optimizers.SGD(lr=lr, decay=kerasDefaults['decay_lr'],
                              momentum=kerasDefaults['momentum_sgd'],
                              nesterov=kerasDefaults['nesterov_sgd'])

    elif type == 'rmsprop':
        return optimizers.RMSprop(lr=lr, rho=kerasDefaults['rho'],
                                  epsilon=kerasDefaults['epsilon'],
                                  decay=kerasDefaults['decay_lr'])

    elif type == 'adagrad':
        return optimizers.Adagrad(lr=lr,
                                  epsilon=kerasDefaults['epsilon'],
                                  decay=kerasDefaults['decay_lr'])

    elif type == 'adadelta':
        return optimizers.Adadelta(lr=lr, rho=kerasDefaults['rho'],
                                   epsilon=kerasDefaults['epsilon'],
                                   decay=kerasDefaults['decay_lr'])

    elif type == 'adam':
        return optimizers.Adam(lr=lr, beta_1=kerasDefaults['beta_1'],
                               beta_2=kerasDefaults['beta_2'],
                               epsilon=kerasDefaults['epsilon'],
                               decay=kerasDefaults['decay_lr'])

    # Not generally available
    # elif type == 'adamax':
    #     return optimizers.Adamax(lr=lr, beta_1=kerasDefaults['beta_1'],
    #                              beta_2=kerasDefaults['beta_2'],
    #                              epsilon=kerasDefaults['epsilon'],
    #                              decay=kerasDefaults['decay_lr'])

    # elif type == 'nadam':
    #     return optimizers.Nadam(lr=lr, beta_1=kerasDefaults['beta_1'],
    #                             beta_2=kerasDefaults['beta_2'],
    #                             epsilon=kerasDefaults['epsilon'],
    #                             schedule_decay=kerasDefaults['decay_schedule_lr'])

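def _example_build_optimizer():
    # Illustrative usage sketch, not part of the original module: the
    # kerasDefaults values below are assumed example values (not the
    # framework's actual defaults) and are only meant to show the dictionary
    # keys that the 'adam' branch of build_optimizer reads.
    example_defaults = {'beta_1': 0.9, 'beta_2': 0.999,
                        'epsilon': 1e-7, 'decay_lr': 0.0}
    return build_optimizer('adam', 1e-3, example_defaults)
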
def build_initializer(type, kerasDefaults, seed=None, constant=0.):
    """ Set the initializer to the appropriate Keras initializer function
        based on the input string and learning rate. Other required values
        are set to the Keras default values.

    Parameters
    ----------
    type : string
        String to choose the initializer
        Options recognized: 'constant', 'uniform', 'normal',
        'glorot_normal', 'glorot_uniform', 'lecun_uniform', 'he_normal'
        See the Keras documentation for a full description of the options
    kerasDefaults : dict
        Dictionary of default parameter values to ensure consistency between frameworks
    seed : integer
        Random number seed
    constant : float
        Constant value (for the constant initializer only)

    Return
    ----------
    The appropriate Keras initializer function
    """

    if type == 'constant':
        return initializers.Constant(value=constant)

    elif type == 'uniform':
        return initializers.RandomUniform(minval=kerasDefaults['minval_uniform'],
                                          maxval=kerasDefaults['maxval_uniform'],
                                          seed=seed)

    elif type == 'normal':
        return initializers.RandomNormal(mean=kerasDefaults['mean_normal'],
                                         stddev=kerasDefaults['stddev_normal'],
                                         seed=seed)

    elif type == 'glorot_normal':
        # aka Xavier normal initializer. keras default
        return initializers.glorot_normal(seed=seed)

    elif type == 'glorot_uniform':
        return initializers.glorot_uniform(seed=seed)

    elif type == 'lecun_uniform':
        return initializers.lecun_uniform(seed=seed)

    elif type == 'he_normal':
        return initializers.he_normal(seed=seed)

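def _example_build_initializer():
    # Illustrative usage sketch, not part of the original module: the
    # kerasDefaults values below are assumed example values showing the keys
    # read by the 'uniform' branch of build_initializer; the seed is forwarded
    # so the initializer is reproducible.
    example_defaults = {'minval_uniform': -0.05, 'maxval_uniform': 0.05}
    return build_initializer('uniform', example_defaults, seed=2017)
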
def xent(y_true, y_pred):
    return binary_crossentropy(y_true, y_pred)

def r2(y_true, y_pred):
    SS_res = K.sum(K.square(y_true - y_pred))
    SS_tot = K.sum(K.square(y_true - K.mean(y_true)))
    return (1 - SS_res / (SS_tot + K.epsilon()))

def mae(y_true, y_pred):
    return mean_absolute_error(y_true, y_pred)

def mse(y_true, y_pred):
    return mean_squared_error(y_true, y_pred)

def covariance(x, y):
    return K.mean(x * y) - K.mean(x) * K.mean(y)

def corr(y_true, y_pred):
    cov = covariance(y_true, y_pred)
    var1 = covariance(y_true, y_true)
    var2 = covariance(y_pred, y_pred)
    return cov / (K.sqrt(var1 * var2) + K.epsilon())

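def _example_compile_with_custom_metrics(model):
    # Illustrative usage sketch, not part of the original module: the metric
    # functions above follow the Keras metric signature (y_true, y_pred), so
    # they can be passed directly to model.compile(). 'model' is assumed to be
    # an already constructed tf.keras model; loss and optimizer are example
    # choices.
    model.compile(loss='mse', optimizer='adam', metrics=[mae, r2, corr])
    return model
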
def evaluate_autoencoder(y_pred, y_test):
    mse = mean_squared_error(y_pred, y_test)
    r2 = r2_score(y_test, y_pred)
    corr, _ = pearsonr(y_pred.flatten(), y_test.flatten())
    # print('Mean squared error: {}%'.format(mse))
    return {'mse': mse, 'r2_score': r2, 'correlation': corr}

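def _example_evaluate_autoencoder():
    # Illustrative usage sketch, not part of the original module: random
    # arrays stand in for reconstructed and reference data just to show the
    # shape of the returned dictionary ('mse', 'r2_score', 'correlation').
    import numpy as np
    y_test = np.random.rand(100, 20)
    y_pred = y_test + 0.1 * np.random.rand(100, 20)
    return evaluate_autoencoder(y_pred, y_test)
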
class PermanentDropout(Dropout):
    """ Dropout layer that stays active at both training and inference time. """

    def __init__(self, rate, **kwargs):
        super(PermanentDropout, self).__init__(rate, **kwargs)
        self.uses_learning_phase = False

    def call(self, x, mask=None):
        if 0. < self.rate < 1.:
            noise_shape = self._get_noise_shape(x)
            x = K.dropout(x, self.rate, noise_shape)
        return x

def register_permanent_dropout():
    get_custom_objects()['PermanentDropout'] = PermanentDropout

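def _example_permanent_dropout_model():
    # Illustrative usage sketch, not part of the original module:
    # PermanentDropout keeps dropout active at inference time (e.g. for
    # Monte Carlo dropout style uncertainty estimates). Layer sizes are
    # arbitrary example values; register_permanent_dropout() makes the custom
    # layer visible to Keras model loading via get_custom_objects().
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense

    register_permanent_dropout()
    model = Sequential([
        Dense(64, activation='relu', input_shape=(20,)),
        PermanentDropout(0.2),
        Dense(1)
    ])
    return model
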
class LoggingCallback(Callback):
    def __init__(self, print_fcn=print):
        Callback.__init__(self)
        self.print_fcn = print_fcn

    def on_epoch_end(self, epoch, logs=None):
        logs = logs or {}
        msg = "[Epoch: %i] %s" % (epoch, ", ".join("%s: %f" % (k, v) for k, v in sorted(logs.items())))
        self.print_fcn(msg)

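def _example_logging_callback():
    # Illustrative usage sketch, not part of the original module: routing
    # per-epoch metrics through Python's logging module instead of stdout.
    # The logger name is an example choice.
    import logging
    logger = logging.getLogger(__name__)
    return LoggingCallback(print_fcn=logger.info)
    # Typical use (assuming 'model', 'x_train', 'y_train' already exist):
    # model.fit(x_train, y_train, epochs=10, callbacks=[_example_logging_callback()])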