from __future__ import absolute_import

import argparse
import os
import sys
import warnings
from pprint import pprint

import numpy as np

if sys.version_info >= (3, 8):
    from typing import TypedDict  # pylint: disable=no-name-in-module
    from typing_extensions import TypedDict

from typing import Any, List, Optional, Set, Type, Union

from .file_utils import directory_tree_from_parameters
from .helper_utils import str2bool

# Seed for random generation -- default value
DEFAULT_TIMEOUT = -1  # no timeout

[docs]class ArgumentStruct: """ Class that converts a python dictionary into an object with named entries given by the dictionary keys. This structure simplifies the calling convention for accessing the dictionary values (corresponding to problem parameters). After the object instantiation both modes of access (dictionary or object entries) can be used. """ def __init__(self, **entries): self.__dict__.update(entries)
class ListOfListsAction(argparse.Action): """This class extends the argparse.Action class by instantiating an argparser that constructs a list-of-lists from an input (command-line option or argument) given as a string.""" def __init__(self, option_strings: str, dest, type: Any, **kwargs): """Initialize a ListOfListsAction object. If no type is specified, an integer is assumed by default as the type for the elements of the list- of-lists. Parameters ---------- option_strings : string String to parse dest : object Object to store the output (in this case the parsed list-of-lists). type : data type Data type to decode the elements of the lists. Defaults to np.int32. kwargs : object Python object containing other argparse.Action parameters. """ super(ListOfListsAction, self).__init__(option_strings, dest, **kwargs) self.dtype = type if self.dtype is None: self.dtype = np.int32 def __call__(self, parser, namespace, values, option_string=None): """This function overrides the __call__ method of the base argparse.Action class. This function implements the action of the ListOfListAction class by parsing an input string (command-line option or argument) and maping it into a list-of-lists. The resulting list-of-lists is added to the namespace of parsed arguments. The parsing assumes that the separator between lists is a colon ':' and the separator inside the list is a comma ','. The values of the list are casted to the type specified at the object initialization. Parameters ---------- parser : ArgumentParser object Object that contains this action namespace : Namespace object Namespace object that will be returned by the parse_args() function. values : string The associated command-line arguments converted to string type (i.e. input). option_string : string The option string that was used to invoke this action. (optional) """ decoded_list = [] removed1 = values.replace("[", "") removed2 = removed1.replace("]", "") out_list = removed2.split(":") for line in out_list: in_list = [] elem = line.split(",") for el in elem: in_list.append(self.dtype(el)) decoded_list.append(in_list) setattr(namespace, self.dest, decoded_list) class ParseDict(TypedDict): """Definition of the dictionary structure expected for the parsing of parameters.""" name: str # ABV = abbreviated form abv: Optional[str] action: Union[str, Type[ListOfListsAction]] type: Optional[Any] default: Any help: str choices: List[str] nargs: str class ConfigDict(TypedDict): """Definition of the dictionary structure expected for the configuration of parameters.""" config_file: str data_type: str rng_seed: float train_bool: bool eval_bool: bool timeout: int gpus: Union[List[int], int] profiling: bool save_path: str model_name: str home_dir: str train_data: str val_data: str test_data: str output_dir: str data_url: str experiment_id: str run_id: str verbose: bool logfile: str scaling: str shuffle: bool feature_subsample: int dense: Union[List[int], int] conv: Union[List[int], int] locally_connected: bool activation: str out_activation: str lstm_size: int recurrent_dropout: float dropout: float pool: int batch_normalization: bool loss: str optimizer: str metrics: str epochs: int batch_size: int learning_rate: float early_stop: bool momentum: float initialization: str val_split: float train_steps: int val_steps: int test_steps: int train_samples: int val_samples: int clr_flag: bool clr_mode: str clr_base_lr: float clr_max_lr: float clr_gamma: float ckpt_restart_mode: str ckpt_checksum: bool ckpt_skip_epochs: int ckpt_directory: str ckpt_save_best: bool ckpt_save_best_metric: str ckpt_save_weights_only: bool ckpt_save_interval: int ckpt_keep_mode: str ckpt_keep_limit: int basic_conf = [ { "name": "config_file", "type": str, "default": argparse.SUPPRESS, "help": "specify model configuration file", }, { "name": "data_type", "abv": "d", "type": str, "default": argparse.SUPPRESS, "choices": ["f16", "f32", "f64"], "help": "default floating point.", }, { "name": "rng_seed", "abv": "r", "type": float, "default": argparse.SUPPRESS, "help": "random number generator seed.", }, {"name": "train_bool", "type": str2bool, "default": True, "help": "train model."}, { "name": "eval_bool", "type": str2bool, "default": argparse.SUPPRESS, "help": "evaluate model (use it for inference).", }, { "name": "timeout", "action": "store", "type": int, "default": argparse.SUPPRESS, "help": "seconds allowed to train model (default: no timeout).", }, { "name": "gpus", "nargs": "+", "type": int, "default": argparse.SUPPRESS, "help": "set IDs of GPUs to use.", }, { "name": "profiling", "abv": "p", "type": str2bool, "default": False, "help": "Turn profiling on or off.", }, ] input_output_conf = [ { "name": "save_path", "abv": "s", "type": str, "default": argparse.SUPPRESS, "help": "file path to save model snapshots.", }, { "name": "model_name", "type": str, "default": argparse.SUPPRESS, "help": "specify model name to use when building filenames for saving.", }, { "name": "home_dir", "type": str, "default": argparse.SUPPRESS, "help": "set home directory.", }, { "name": "train_data", "action": "store", "type": str, "default": argparse.SUPPRESS, "help": "training data filename.", }, { "name": "val_data", "action": "store", "type": str, "default": argparse.SUPPRESS, "help": "validation data filename.", }, { "name": "test_data", "type": str, "action": "store", "default": argparse.SUPPRESS, "help": "testing data filename.", }, { "name": "output_dir", "type": str, "default": argparse.SUPPRESS, "help": "output directory.", }, { "name": "data_url", "type": str, "default": argparse.SUPPRESS, "help": "set data source url.", }, { "name": "experiment_id", "type": str, "default": "EXP000", "help": "set the experiment unique identifier.", }, { "name": "run_id", "type": str, "default": "RUN000", "help": "set the run unique identifier.", }, ] logging_conf = [ { "name": "verbose", "abv": "v", "type": str2bool, "default": False, "help": "increase output verbosity.", }, {"name": "logfile", "abv": "l", "type": str, "default": None, "help": "log file"}, ] data_preprocess_conf = [ { "name": "scaling", "type": str, "default": argparse.SUPPRESS, "choices": ["minabs", "minmax", "std", "none"], "help": "type of feature scaling; 'minabs': to [-1,1]; 'minmax': to [0,1], 'std': standard unit normalization; 'none': no normalization.", }, { "name": "shuffle", "type": str2bool, "default": False, "help": "randomly shuffle data set (produces different training and testing partitions each run depending on the seed)", }, { "name": "feature_subsample", "type": int, "default": argparse.SUPPRESS, "help": "number of features to randomly sample from each category (cellline expression, drug descriptors, etc), 0 means using all features", }, ] model_conf = [ { "name": "dense", "nargs": "+", "type": int, "help": "number of units in fully connected layers in an integer array.", }, { "name": "conv", "nargs": "+", "type": int, "default": argparse.SUPPRESS, "help": "integer array describing convolution layers: conv1_filters, conv1_filter_len, conv1_stride, conv2_filters, conv2_filter_len, conv2_stride ....", }, { "name": "locally_connected", "type": str2bool, "default": argparse.SUPPRESS, "help": "use locally connected layers instead of convolution layers.", }, { "name": "activation", "abv": "a", "type": str, "default": argparse.SUPPRESS, "help": "keras activation function to use in inner layers: relu, tanh, sigmoid...", }, { "name": "out_activation", "type": str, "default": argparse.SUPPRESS, "help": "keras activation function to use in out layer: softmax, linear, ...", }, { "name": "lstm_size", "nargs": "+", "type": int, "default": argparse.SUPPRESS, "help": "integer array describing size of LSTM internal state per layer.", }, { "name": "recurrent_dropout", "action": "store", "type": float, "default": argparse.SUPPRESS, "help": "ratio of recurrent dropout.", }, { "name": "dropout", "type": float, "default": argparse.SUPPRESS, "help": "ratio of dropout used in fully connected layers.", }, { "name": "pool", "type": int, "default": argparse.SUPPRESS, "help": "pooling layer length.", }, { "name": "batch_normalization", "type": str2bool, "default": argparse.SUPPRESS, "help": "use batch normalization.", }, { "name": "loss", "type": str, "default": argparse.SUPPRESS, "help": "keras loss function to use: mse, ...", }, { "name": "optimizer", "type": str, "default": argparse.SUPPRESS, "help": "keras optimizer to use: sgd, rmsprop, ...", }, { "name": "metrics", "type": str, "default": argparse.SUPPRESS, "help": "metrics to evaluate performance: accuracy, ...", }, ] training_conf = [ { "name": "epochs", "type": int, "abv": "e", "default": argparse.SUPPRESS, "help": "number of training epochs.", }, { "name": "batch_size", "type": int, "abv": "z", "default": argparse.SUPPRESS, "help": "batch size.", }, { "name": "learning_rate", "abv": "lr", "type": float, "default": argparse.SUPPRESS, "help": "overrides the learning rate for training.", }, { "name": "early_stop", "type": str2bool, "default": argparse.SUPPRESS, "help": "activates keras callback for early stopping of training in function of the monitored variable specified.", }, { "name": "momentum", "type": float, "default": argparse.SUPPRESS, "help": "overrides the momentum to use in the SGD optimizer when training.", }, { "name": "initialization", "type": str, "default": argparse.SUPPRESS, "choices": [ "constant", "uniform", "normal", "glorot_uniform", "glorot_normal", "lecun_uniform", "he_normal", ], "help": "type of weight initialization; 'constant': to 0; 'uniform': to [-0.05,0.05], 'normal': mean 0, stddev 0.05; 'glorot_uniform': [-lim,lim] with lim = sqrt(6/(fan_in+fan_out)); 'lecun_uniform' : [-lim,lim] with lim = sqrt(3/fan_in); 'he_normal' : mean 0, stddev sqrt(2/fan_in).", }, { "name": "val_split", "type": float, "default": argparse.SUPPRESS, "help": "fraction of data to use in validation.", }, { "name": "train_steps", "type": int, "default": argparse.SUPPRESS, "help": "overrides the number of training batches per epoch if set to nonzero.", }, { "name": "val_steps", "type": int, "default": argparse.SUPPRESS, "help": "overrides the number of validation batches per epoch if set to nonzero.", }, { "name": "test_steps", "type": int, "default": argparse.SUPPRESS, "help": "overrides the number of test batches per epoch if set to nonzero.", }, { "name": "train_samples", "type": int, "default": argparse.SUPPRESS, "help": "overrides the number of training samples if set to nonzero.", }, { "name": "val_samples", "type": int, "default": argparse.SUPPRESS, "help": "overrides the number of validation samples if set to nonzero.", }, ] cyclic_learning_conf = [ { "name": "clr_flag", "type": str2bool, "default": argparse.SUPPRESS, "help": "CLR flag (boolean).", }, { "name": "clr_mode", "type": str, "default": argparse.SUPPRESS, "choices": ["trng1", "trng2", "exp"], "help": "CLR mode (default: trng1).", }, { "name": "clr_base_lr", "type": float, "default": argparse.SUPPRESS, "help": "Base lr for cycle lr.", }, { "name": "clr_max_lr", "type": float, "default": argparse.SUPPRESS, "help": "Max lr for cycle lr.", }, { "name": "clr_gamma", "type": float, "default": argparse.SUPPRESS, "help": "Gamma parameter for learning cycle LR.", }, ] ckpt_conf = [ { "name": "ckpt_restart_mode", "type": str, "default": "auto", "choices": ["off", "auto", "required"], "help": "Mode to restart from a saved checkpoint file, choices are 'off', 'auto', 'required'.", }, { "name": "ckpt_checksum", "type": str2bool, "default": False, "help": "Checksum the restart file after read+write.", }, { "name": "ckpt_skip_epochs", "type": int, "default": 0, "help": "Number of epochs to skip before saving epochs.", }, { "name": "ckpt_directory", "type": str, "default": "./save", "help": "Base directory in which to save checkpoints.", }, { "name": "ckpt_save_best", "type": str2bool, "default": True, "help": "Toggle saving best model.", }, { "name": "ckpt_save_best_metric", "type": str, "default": "val_loss", "help": "Metric for determining when to save best model.", }, { "name": "ckpt_save_weights_only", "type": str2bool, "default": False, "help": "Toggle saving only weights (not optimizer) (NYI).", }, { "name": "ckpt_save_interval", "type": int, "default": 0, "help": "Interval to save checkpoints.", }, { "name": "ckpt_keep_mode", "type": str, "default": "linear", "choices": ["linear", "exponential"], "help": "Checkpoint saving mode, choices are 'linear' or 'exponential'.", }, { "name": "ckpt_keep_limit", "type": int, "default": 1000000, "help": "Limit checkpoints to keep.", }, ] registered_conf = [ basic_conf, input_output_conf, logging_conf, data_preprocess_conf, model_conf, training_conf, cyclic_learning_conf, ckpt_conf, ] def extract_keywords(lst_dict, kw): """Extract the value associated to a specific keyword in a list of dictionaries. Returns the list of values extracted from the keywords. Parameters ---------- lst_dict : python list of dictionaries list to extract keywords from kw : string keyword to extract from dictionary """ lst = [di[kw] for di in lst_dict] return lst # Extract list of parameters in registered configuration PARAMETERS_CANDLE = [ item for lst in registered_conf for item in extract_keywords(lst, "name") ] CONFLICT_LIST = [["clr_flag", "warmup_lr"], ["clr_flag", "reduce_lr"]]
[docs]def check_flag_conflicts(params: ConfigDict): """ Check if parameters that must be exclusive are used in conjunction. The check is made against CONFLICT_LIST, a global list that describes parameter pairs that should be exclusive. Raises an exception if pairs of parameters in CONFLICT_LIST are specified simulataneously. :param Dict params: list to extract keywords from """ key_set: Set[str] = set(params.keys()) # check for conflicts # conflict_flag = False # loop over each set of mutually exclusive flags # if any set conflicts exit program for flag_list in CONFLICT_LIST: flag_count = 0 for i in flag_list: if i in key_set: if params[i] is True: flag_count += 1 if flag_count > 1: raise Exception( "ERROR ! Conflict in flag specification. These flags should not be used together: " + str(sorted(flag_list)) + "... Exiting" )
def check_file_parameters_exists( params_parser: ConfigDict, params_benchmark: ConfigDict, params_file: ConfigDict ): """Functionality to verify that the parameters defined in the configuration file are recognizable by the command line parser (i.e. no uknown keywords are used in the configuration file). Parameters ---------- params_parser : python dictionary Includes parameters set via the command line. params_benchmark : python list Includes additional parameters defined in the benchmark. params_file : python dictionary Includes parameters read from the configuration file. Global: PARAMETERS_CANDLE : python list Includes all the core keywords that are specified in CANDLE. """ # Get keywords from arguments coming via command line (and CANDLE supervisor) args_dict = vars(params_parser) args_set = set(args_dict.keys()) # Get keywords from benchmark definition bmk_keys: List[str] = [] for item in params_benchmark: bmk_keys.append(item["name"]) bmk_set = set(bmk_keys) # Get core CANDLE keywords candle_set = set(PARAMETERS_CANDLE) # Consolidate keywords from CANDLE core, command line, CANDLE supervisor and benchmark candle_set = candle_set.union(args_set) candle_set = candle_set.union(bmk_set) # Get keywords used in config_file file_set = set(params_file.keys()) # Compute keywords that come from the config_file that are not in the CANDLE specs diff_set = file_set.difference(candle_set) if len(diff_set) > 0: message = ( "These keywords used in the configuration file are not defined in CANDLE: " + str(sorted(diff_set)) ) warnings.warn(message, RuntimeWarning)
[docs]def finalize_parameters(bmk): """ Utility to parse parameters in common as well as parameters particular to each benchmark. :param Benchmark bmk: Object that has benchmark filepaths and specifications :return: Dictionary with all the parameters necessary to run the benchmark.\ Command line overwrites config file specifications """ # Parse common and benchmark parameters bmk.parse_parameters() # print('Args:', args) # Get parameters from configuration file # Reads parameter subset, just checking if a config_file has been set # by comand line (the parse_known_args() function allows a partial # parsing) aux = bmk.parser.parse_known_args() try: # Try to get the 'config_file' option conffile_txt = aux[0].config_file except AttributeError: # The 'config_file' option was not set by command-line conffile = bmk.conffile # use default file else: # a 'config_file' has been set --> use this file if os.path.isabs(conffile_txt): conffile = conffile_txt elif conffile_txt.startswith( "./" ): # user is trying to use a builtin alternate model file conffile = os.path.join(bmk.file_path, conffile_txt) else: if os.environ["CANDLE_DATA_DIR"] is not None: conffile = os.path.join(os.environ["CANDLE_DATA_DIR"], conffile_txt) else: conffile = os.path.join(bmk.file_path, conffile_txt) if not os.path.isfile(conffile): raise Exception( "ERROR ! Specified configuration file " + conffile + " not found ... Exiting" ) # print("Configuration file: ", conffile) fileParameters = bmk.read_config_file( conffile ) # aux.config_file)#args.config_file) # Get command-line parameters args = bmk.parser.parse_args() # print ('Params:', fileParameters) # Check keywords from file against CANDLE common and module definitions bmk_dict = bmk.additional_definitions check_file_parameters_exists(args, bmk_dict, fileParameters) # Consolidate parameter set. Command-line parameters overwrite file configuration gParameters = args_overwrite_config(args, fileParameters) # Check that required set of parameters has been defined bmk.check_required_exists(gParameters) print("Params:") pprint(gParameters) # Check that no keywords conflict check_flag_conflicts(gParameters) return gParameters
def args_overwrite_config(args, config: ConfigDict): """Overwrite configuration parameters with parameters specified via command-line. Parameters ---------- args : ArgumentParser object Parameters specified via command-line config : python dictionary Parameters read from configuration file """ params = config args_dict = vars(args) for key in args_dict.keys(): # try casting here params[key] = args_dict[key] if "data_type" not in params: params["data_type"] = DEFAULT_DATATYPE else: if params["data_type"] in set(["f16", "f32", "f64"]): params["data_type"] = get_choice(params["data_type"]) if "output_dir" not in params: params["data_dir"], params["output_dir"] = directory_tree_from_parameters( params ) else: params["data_dir"], params["output_dir"] = directory_tree_from_parameters( params, params["output_dir"] ) if "rng_seed" not in params: params["rng_seed"] = DEFAULT_SEED if "timeout" not in params: params["timeout"] = DEFAULT_TIMEOUT return params def get_choice(name: str): """Maps name string to the right type of argument.""" mapping = {} # dtype mapping["f16"] = np.float16 mapping["f32"] = np.float32 mapping["f64"] = np.float64 mapped = mapping.get(name) if not mapped: raise Exception('No mapping found for "{}"'.format(name)) return mapped
[docs]def parse_from_dictlist(dictlist: List[ParseDict], parser): """ Functionality to parse options. :param List pardict: Specification of parameters :param ArgumentParser parser: Current parser :return: consolidated parameters :rtype: ArgumentParser """ for d in dictlist: if "type" not in d: d["type"] = None # print(d['name'], 'type is ', d['type']) if "default" not in d: d["default"] = argparse.SUPPRESS if "help" not in d: d["help"] = "" if "abv" not in d: d["abv"] = None if "action" in d: # Actions if ( d["action"] == "list-of-lists" ): # Non standard. Specific functionallity has been added d["action"] = ListOfListsAction if d["abv"] is None: parser.add_argument( "--" + d["name"], dest=d["name"], action=d["action"], type=d["type"], default=d["default"], help=d["help"], ) else: parser.add_argument( "-" + d["abv"], "--" + d["name"], dest=d["name"], action=d["action"], type=d["type"], default=d["default"], help=d["help"], ) elif (d["action"] == "store_true") or (d["action"] == "store_false"): raise Exception( "The usage of store_true or store_false cannot be undone in the command line. Use type=str2bool instead." ) else: if d["abv"] is None: parser.add_argument( "--" + d["name"], action=d["action"], default=d["default"], help=d["help"], type=d["type"], ) else: parser.add_argument( "-" + d["abv"], "--" + d["name"], action=d["action"], default=d["default"], help=d["help"], type=d["type"], ) else: # Non actions if "nargs" in d: # variable parameters if "choices" in d: # choices with variable parameters if d["abv"] is None: parser.add_argument( "--" + d["name"], nargs=d["nargs"], choices=d["choices"], default=d["default"], help=d["help"], ) else: parser.add_argument( "-" + d["abv"], "--" + d["name"], nargs=d["nargs"], choices=d["choices"], default=d["default"], help=d["help"], ) else: # Variable parameters (free, no limited choices) if d["abv"] is None: parser.add_argument( "--" + d["name"], nargs=d["nargs"], type=d["type"], default=d["default"], help=d["help"], ) else: parser.add_argument( "-" + d["abv"], "--" + d["name"], nargs=d["nargs"], type=d["type"], default=d["default"], help=d["help"], ) elif "choices" in d: # Select from choice (fixed number of parameters) if d["abv"] is None: parser.add_argument( "--" + d["name"], choices=d["choices"], default=d["default"], help=d["help"], ) else: parser.add_argument( "-" + d["abv"], "--" + d["name"], choices=d["choices"], default=d["default"], help=d["help"], ) else: # Non an action, one parameter, no choices # print('Adding ', d['name'], ' to parser') if d["abv"] is None: parser.add_argument( "--" + d["name"], type=d["type"], default=d["default"], help=d["help"], ) else: parser.add_argument( "-" + d["abv"], "--" + d["name"], type=d["type"], default=d["default"], help=d["help"], ) return parser
def parse_common(parser): """Functionality to parse options. Parameters ---------- parser : ArgumentParser object Current parser """ for lst in registered_conf: parser = parse_from_dictlist(lst, parser) return parser