from __future__ import absolute_import
from __future__ import print_function
import numpy as np
import time
import sys
import six
import marshal
import types as python_types
[docs]def get_from_module(identifier, module_params, module_name,
instantiate=False, kwargs=None):
if isinstance(identifier, six.string_types):
res = module_params.get(identifier)
if not res:
raise Exception('Invalid ' + str(module_name) + ': ' + str(identifier))
if instantiate and not kwargs:
return res()
elif instantiate and kwargs:
return res(**kwargs)
else:
return res
elif type(identifier) is dict:
name = identifier.pop('name')
res = module_params.get(name)
if res:
return res(**identifier)
else:
raise Exception('Invalid ' + str(module_name) + ': ' + str(identifier))
return identifier
[docs]def make_tuple(*args):
return args
[docs]def func_dump(func):
""" Serialize user defined function. """
code = marshal.dumps(func.__code__).decode('raw_unicode_escape')
defaults = func.__defaults__
if func.__closure__:
closure = tuple(c.cell_contents for c in func.__closure__)
else:
closure = None
return code, defaults, closure
[docs]def func_load(code, defaults=None, closure=None, globs=None):
""" Deserialize user defined function. """
if isinstance(code, (tuple, list)): # unpack previous dump
code, defaults, closure = code
code = marshal.loads(code.encode('raw_unicode_escape'))
if closure is not None:
closure = func_reconstruct_closure(closure)
if globs is None:
globs = globals()
return python_types.FunctionType(code, globs, name=code.co_name, argdefs=defaults, closure=closure)
[docs]def func_reconstruct_closure(values):
""" Deserialization helper that reconstructs a closure. """
nums = range(len(values))
src = ["def func(arg):"]
src += [" _%d = arg[%d]" % (n, n) for n in nums]
src += [" return lambda:(%s)" % ','.join(["_%d" % n for n in nums]), ""]
src = '\n'.join(src)
try:
exec(src, globals())
except SyntaxError:
func = None
raise SyntaxError(src)
return func(values).__closure__
[docs]class Progbar(object):
def __init__(self, target, width=30, verbose=1, interval=0.01):
"""
Parameters
------------
target: int
total number of steps expected
interval: float
minimum visual progress update interval (in seconds)
"""
self.width = width
self.target = target
self.sum_values = {}
self.unique_values = []
self.start = time.time()
self.last_update = 0
self.interval = interval
self.total_width = 0
self.seen_so_far = 0
self.verbose = verbose
[docs] def update(self, current, values=[], force=False):
"""
Parameters
------------
current : int
index of current step
values : list of tuples (name, value_for_last_step).
The progress bar will display averages for these values.
force : boolean
force visual progress update
"""
for k, v in values:
if k not in self.sum_values:
self.sum_values[k] = [v * (current - self.seen_so_far), current - self.seen_so_far]
self.unique_values.append(k)
else:
self.sum_values[k][0] += v * (current - self.seen_so_far)
self.sum_values[k][1] += (current - self.seen_so_far)
self.seen_so_far = current
now = time.time()
if self.verbose == 1:
if not force and (now - self.last_update) < self.interval:
return
prev_total_width = self.total_width
sys.stdout.write("\b" * prev_total_width)
sys.stdout.write("\r")
numdigits = int(np.floor(np.log10(self.target))) + 1
barstr = '%%%dd/%%%dd [' % (numdigits, numdigits)
bar = barstr % (current, self.target)
prog = float(current) / self.target
prog_width = int(self.width * prog)
if prog_width > 0:
bar += ('=' * (prog_width - 1))
if current < self.target:
bar += '>'
else:
bar += '='
bar += ('.' * (self.width - prog_width))
bar += ']'
sys.stdout.write(bar)
self.total_width = len(bar)
if current:
time_per_unit = (now - self.start) / current
else:
time_per_unit = 0
eta = time_per_unit * (self.target - current)
info = ''
if current < self.target:
info += ' - ETA: %ds' % eta
else:
info += ' - %ds' % (now - self.start)
for k in self.unique_values:
info += ' - %s:' % k
if type(self.sum_values[k]) is list:
avg = self.sum_values[k][0] / max(1, self.sum_values[k][1])
if abs(avg) > 1e-3:
info += ' %.4f' % avg
else:
info += ' %.4e' % avg
else:
info += ' %s' % self.sum_values[k]
self.total_width += len(info)
if prev_total_width > self.total_width:
info += ((prev_total_width - self.total_width) * " ")
sys.stdout.write(info)
sys.stdout.flush()
if current >= self.target:
sys.stdout.write("\n")
if self.verbose == 2:
if current >= self.target:
info = '%ds' % (now - self.start)
for k in self.unique_values:
info += ' - %s:' % k
avg = self.sum_values[k][0] / max(1, self.sum_values[k][1])
if avg > 1e-3:
info += ' %.4f' % avg
else:
info += ' %.4e' % avg
sys.stdout.write(info + "\n")
self.last_update = now
[docs] def add(self, n, values=[]):
self.update(self.seen_so_far + n, values)
[docs]def display_table(rows, positions):
def display_row(objects, positions):
line = ''
for i in range(len(objects)):
line += str(objects[i])
line = line[:positions[i]]
line += ' ' * (positions[i] - len(line))
print(line)
for objects in rows:
display_row(objects, positions)