Mercurial > pylearn
view doc/v2_planning/plugin_JB.py @ 1199:98954d8cb92d
v2planning - modifs to plugin_JB
author | James Bergstra <bergstrj@iro.umontreal.ca> |
---|---|
date | Mon, 20 Sep 2010 02:56:11 -0400 |
parents | 1387771296a8 |
children | acfd5e747a75 |
line wrap: on
line source
"""plugin_JB - draft of library architecture using iterators""" """ - PICKLABLE - algorithms are serializable at all points during execution - ITERATOR walks through algorithms with fine granularity - COMPONENTS - library provides components on which programs operate - ALGORITHMS - library provides algorithms in clean (no hooks) form - HOOKS - user can insert print / debug logic with search/replace type calls e.g. prog.find(CALL(cd1_update)).replace_with(SEQ([CALL(cd1_update), CALL(debugfn)])) - PRINTING - user can print the 'program code' of an algorithm built from library pieces - MODULAR EXPERIMENTS - an experiment object with one (or more?) programs and all of the objects referred to by those programs. It is the preferred type of object to be serialized. The main components of the algorithms should be top-level attributes of the package. This object can be serialized and loaded in another process to implement job migration. - OPTIMIZATION - program can be optimized automatically e.g. BUFFER(N, CALL(dataset.next)) can be replaced if dataset.next implements the right attribute/protocol for 'bufferable' or something. e.g. SEQ([a,b,c,d]) can be compiled with Theano if sub-sequence is compatible - don't need greenlets to get efficiency, the implementations of control flow ops can manage a stack or stack tree in the vm (like greenlets do I think) we don't really need greenlets/stackless I don't think """ __license__ = None __copyright__ = None import copy, sys, cPickle import numpy ################################################### # Virtual Machine for executing programs class VirtualMachine(object): def __init__(self, prog): self.prog = prog self.started = False self.finished=False def __iter__(self): assert not self.started self.prog.start(None) self.started = True return self def next(self): if self.finished: raise StopIteration() r = self.prog.step() if r is INCOMPLETE: return r else: self.finished=True return r def run(self,n_steps=float('inf')): i = 0 for r in self: i += 1 if i > n_steps: break return r #################################################### # CONTROL-FLOW CONSTRUCTS class INCOMPLETE: """Return value for Element.step""" class ELEMENT(object): """ every execution block has a driver the driver calls start when entering a new control element - this would be called once per e.g. outer loop iteration the driver calls step to advance the control element - which returns INCOMPLETE - which returns any other object to indicate completion """ def start(self, arg): pass def step(self): pass class BUFFER_REPEAT(ELEMENT): """ Accumulate a number of return values into one list / array. The source of return values `src` is a control element that will be restarted repeatedly in order to fulfil the requiement of gathering N samples. TODO: support accumulating of tuples of arrays """ def __init__(self, N, src, storage=None): """ TODO: use preallocated `storage` """ self.N = N self.n = 0 self.src = src self.storage = storage self.src.start(None) if self.storage != None: raise NotImplementedError() def start(self, arg): self.buf = [None] * self.N self.n = 0 self.finished = False def step(self): assert not self.finished r = self.src.step() if r is INCOMPLETE: return r self.src.start(None) # restart our stream self.buf[self.n] = r self.n += 1 if self.n == self.N: self.finished = True return self.buf else: return INCOMPLETE assert 0 class CALL(ELEMENT): """ Control flow terminal - call a python function or method. Returns the return value of the call. """ def __init__(self, fn, *args, **kwargs): self.fn = fn self.args = args self.kwargs=kwargs self.use_start_arg = kwargs.pop('use_start_arg', False) def start(self, arg): self.start_arg = arg self.finished = False return self def step(self): assert not self.finished self.finished = True if self.use_start_arg: if self.args: raise TypeError('cant get positional args both ways') return self.fn(self.start_arg, **self.kwargs) else: return self.fn(*self.args, **self.kwargs) def __getstate__(self): rval = dict(self.__dict__) if type(self.fn) is type(self.step): #instancemethod fn = rval.pop('fn') rval['i fn'] = fn.im_func, fn.im_self, fn.im_class return rval def __setstate__(self, dct): if 'i fn' in dct: dct['fn'] = type(self.step)(*dct.pop('i fn')) self.__dict__.update(dct) def FILT(fn, **kwargs): """ Return a CALL object that uses the return value from the previous CALL as the first and only positional argument. """ return CALL(fn, use_start_arg=True, **kwargs) def CHOOSE(which, options): """ Execute one out of a number of optional control flow paths """ raise NotImplementedError() def LOOP(elements): #TODO: implement a true infinite loop try: iter(elements) return REPEAT(sys.maxint, elements) except TypeError: return REPEAT(sys.maxint, [elements]) class REPEAT(ELEMENT): def __init__(self, N, elements, pass_rvals=False): self.N = N self.elements = elements self.pass_rvals = pass_rvals #TODO: check for N being callable def start(self, arg): self.n = 0 #loop iteration self.idx = 0 #element idx self.finished = False self.elements[0].start(arg) def step(self): assert not self.finished r = self.elements[self.idx].step() if r is INCOMPLETE: return INCOMPLETE self.idx += 1 if self.idx < len(self.elements): self.elements[self.idx].start(r) return INCOMPLETE self.n += 1 if self.n < self.N: self.idx = 0 self.elements[self.idx].start(r) return INCOMPLETE else: self.finished = True return r def SEQ(elements): return REPEAT(1, elements) class WEAVE(ELEMENT): """ Interleave execution of a number of elements. TODO: allow a schedule (at least relative frequency) of elements from each program """ def __init__(self, elements): self.elements = elements def start(self, arg): for el in self.elements: el.start(arg) self.idx = 0 self.any_is_finished = False self.finished= False def step(self): assert not self.finished # if this is triggered, we have a broken driver self.idx = self.idx % len(self.elements) r = self.elements[self.idx].step() if r is not INCOMPLETE: self.any_is_finished = True self.idx += 1 if self.idx == len(self.elements) and self.any_is_finished: self.finished = True return None # dummy completion value else: return INCOMPLETE #################################################### # [Dummy] Components involved in learning algorithms class Dataset(object): def __init__(self, data): self.pos = 0 self.data = data def next(self): rval = self.data[self.pos] self.pos += 1 if self.pos == len(self.data): self.pos = 0 return rval def seek(self, pos): self.pos = pos class KFold(object): def __init__(self, data, K): self.data = data self.k = -1 self.scores = [None]*K self.K = K def next_fold(self): self.k += 1 self.data.seek(0) # restart the stream def next(self): #TODO: skip the examples that are ommitted in this split return self.data.next() def init_test(self): pass def next_test(self): return self.data.next() def test_size(self): return 5 def store_scores(self, scores): self.scores[self.k] = scores def prog(self, clear, train, test): return REPEAT(self.K, [ CALL(self.next_fold), clear, train, CALL(self.init_test), BUFFER_REPEAT(self.test_size(), SEQ([ CALL(self.next_test), test])), FILT(self.store_scores) ]) class PCA_Analysis(object): def __init__(self): self.clear() def clear(self): self.mean = 0 self.eigvecs=0 self.eigvals=0 def analyze(self, X): self.mean = numpy.mean(X, axis=0) self.eigvecs=1 self.eigvals=1 def filt(self, X): return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals? def pseudo_inverse(self, Y): return Y class Layer(object): def __init__(self, w): self.w = w def filt(self, x): return self.w*x def clear(self): self.w =0 def print_obj(obj): print obj def print_obj_attr(obj, attr): print getattr(obj, attr) def no_op(*args, **kwargs): pass def cd1_update(X, layer, lr): # update self.layer from observation X layer.w += X.mean() * lr #TODO: not exactly correct math! def simple_main(): l = [0] def f(a): print l l[0] += a return l[0] print VirtualMachine(WEAVE([ BUFFER_REPEAT(3,CALL(f,1)), BUFFER_REPEAT(5,CALL(f,1)), ])).run() def main(): # create components dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) pca = PCA_Analysis() layer1 = Layer(w=4) layer2 = Layer(w=3) kf = KFold(dataset, K=10) pca_batchsize=1000 cd_batchsize = 5 n_cd_updates_layer1 = 10 n_cd_updates_layer2 = 10 # create algorithm train_pca = SEQ([ BUFFER_REPEAT(pca_batchsize, CALL(kf.next)), FILT(pca.analyze)]) train_layer1 = REPEAT(n_cd_updates_layer1, [ BUFFER_REPEAT(cd_batchsize, CALL(kf.next)), FILT(pca.filt), FILT(cd1_update, layer=layer1, lr=.01)]) train_layer2 = REPEAT(n_cd_updates_layer2, [ BUFFER_REPEAT(cd_batchsize, CALL(kf.next)), FILT(pca.filt), FILT(layer1.filt), FILT(cd1_update, layer=layer2, lr=.01)]) kfold_prog = kf.prog( clear = SEQ([ # FRAGMENT 1: this bit is the reset/clear stage CALL(pca.clear), CALL(layer1.clear), CALL(layer2.clear), ]), train = SEQ([ train_pca, WEAVE([ # Silly example of how to do debugging / loggin with WEAVE train_layer1, LOOP(CALL(print_obj_attr, layer1, 'w'))]), train_layer2, ]), test=SEQ([ FILT(pca.filt), # may want to allow this SEQ to be FILT(layer1.filt), # optimized into a shorter one that FILT(layer2.filt), # compiles these calls together with FILT(numpy.mean)])) # Theano pkg1 = dict(prog=kfold_prog, kf=kf) pkg2 = copy.deepcopy(pkg1) # programs can be copied try: pkg3 = cPickle.loads(cPickle.dumps(pkg1)) except: print >> sys.stderr, "pickling doesnt work, but it can be fixed I think" pkg = pkg2 # running a program updates the variables in its package, but not the other package VirtualMachine(pkg['prog']).run() print pkg['kf'].scores if __name__ == '__main__': sys.exit(main())