Source code for candle.generic_utils

from __future__ import absolute_import, print_function

import marshal
import sys
import time
import types as python_types

import numpy as np


def get_from_module(
    identifier, module_params, module_name, instantiate=False, kwargs=None
):
    if isinstance(identifier, str):
        res = module_params.get(identifier)
        if not res:
            raise Exception("Invalid " + str(module_name) + ": " + str(identifier))
        if instantiate and not kwargs:
            return res()
        elif instantiate and kwargs:
            return res(**kwargs)
        else:
            return res
    elif type(identifier) is dict:
        name = identifier.pop("name")
        res = module_params.get(name)
        if res:
            return res(**identifier)
        else:
            raise Exception("Invalid " + str(module_name) + ": " + str(identifier))
    return identifier


def make_tuple(*args):
    return args


def func_dump(func):
    """Serialize user defined function."""
    code = marshal.dumps(func.__code__).decode("raw_unicode_escape")
    defaults = func.__defaults__
    if func.__closure__:
        closure = tuple(c.cell_contents for c in func.__closure__)
    else:
        closure = None
    return code, defaults, closure


def func_load(code, defaults=None, closure=None, globs=None):
    """Deserialize user defined function."""
    if isinstance(code, (tuple, list)):  # unpack previous dump
        code, defaults, closure = code
    code = marshal.loads(code.encode("raw_unicode_escape"))
    if closure is not None:
        closure = func_reconstruct_closure(closure)
    if globs is None:
        globs = globals()
    return python_types.FunctionType(
        code, globs, name=code.co_name, argdefs=defaults, closure=closure
    )


def func_reconstruct_closure(values):
    """Deserialization helper that reconstructs a closure."""
    nums = range(len(values))
    src = ["def func(arg):"]
    src += ["  _%d = arg[%d]" % (n, n) for n in nums]
    src += ["  return lambda:(%s)" % ",".join(["_%d" % n for n in nums]), ""]
    src = "\n".join(src)
    try:
        exec(src, globals())
    except SyntaxError:
        func = None
        raise SyntaxError(src)
    return func(values).__closure__


[docs]class Progbar(object): """ Progress bar """ def __init__(self, target, width=30, verbose=1, interval=0.01): """ :param int target: total number of steps expected :param float interval: minimum visual progress update interval (in seconds) """ self.width = width self.target = target self.sum_values = {} self.unique_values = [] self.start = time.time() self.last_update = 0 self.interval = interval self.total_width = 0 self.seen_so_far = 0 self.verbose = verbose
[docs] def update(self, current, values=[], force=False): """ :param int current: index of current step :param List values: list of tuples (name, value_for_last_step). \ The progress bar will display averages for these values. :param bool force: force visual progress update """ for k, v in values: if k not in self.sum_values: self.sum_values[k] = [ v * (current - self.seen_so_far), current - self.seen_so_far, ] self.unique_values.append(k) else: self.sum_values[k][0] += v * (current - self.seen_so_far) self.sum_values[k][1] += current - self.seen_so_far self.seen_so_far = current now = time.time() if self.verbose == 1: if not force and (now - self.last_update) < self.interval: return prev_total_width = self.total_width sys.stdout.write("\b" * prev_total_width) sys.stdout.write("\r") numdigits = int(np.floor(np.log10(self.target))) + 1 barstr = "%%%dd/%%%dd [" % (numdigits, numdigits) bar = barstr % (current, self.target) prog = float(current) / self.target prog_width = int(self.width * prog) if prog_width > 0: bar += "=" * (prog_width - 1) if current < self.target: bar += ">" else: bar += "=" bar += "." * (self.width - prog_width) bar += "]" sys.stdout.write(bar) self.total_width = len(bar) if current: time_per_unit = (now - self.start) / current else: time_per_unit = 0 eta = time_per_unit * (self.target - current) info = "" if current < self.target: info += " - ETA: %ds" % eta else: info += " - %ds" % (now - self.start) for k in self.unique_values: info += " - %s:" % k if type(self.sum_values[k]) is list: avg = self.sum_values[k][0] / max(1, self.sum_values[k][1]) if abs(avg) > 1e-3: info += " %.4f" % avg else: info += " %.4e" % avg else: info += " %s" % self.sum_values[k] self.total_width += len(info) if prev_total_width > self.total_width: info += (prev_total_width - self.total_width) * " " sys.stdout.write(info) sys.stdout.flush() if current >= self.target: sys.stdout.write("\n") if self.verbose == 2: if current >= self.target: info = "%ds" % (now - self.start) for k in self.unique_values: info += " - %s:" % k avg = self.sum_values[k][0] / max(1, self.sum_values[k][1]) if avg > 1e-3: info += " %.4f" % avg else: info += " %.4e" % avg sys.stdout.write(info + "\n") self.last_update = now
def add(self, n, values=[]): self.update(self.seen_so_far + n, values)
def display_table(rows, positions): def display_row(objects, positions): line = "" for i in range(len(objects)): line += str(objects[i]) line = line[: positions[i]] line += " " * (positions[i] - len(line)) print(line) for objects in rows: display_row(objects, positions)