Mercurial > pylearn
view doc/v2_planning/plugin_JB.py @ 1211:e7ac87720fee
v2planning plugin_JB - added PRINT and POPEN to demonstrate parallel async. control flows
author | James Bergstra <bergstrj@iro.umontreal.ca> |
---|---|
date | Wed, 22 Sep 2010 00:23:07 -0400 |
parents | cbe1fb32686c |
children | 478bb1f8215c |
line wrap: on
line source
"""plugin_JB - draft of potential library architecture using iterators This strategy makes use of a simple imperative language whose statements are python function calls to create learning algorithms that can be manipulated and executed in several desirable ways. The training procedure for a PCA module is easy to express: # allocate the relevant modules dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) pca = PCA_Analysis() pca_batchsize=1000 # define the control-flow of the algorithm train_pca = SEQ([ BUFFER_REPEAT(pca_batchsize, CALL(dataset.next)), FILT(pca.analyze)]) # run the program train_pca.run() The CALL, SEQ, FILT, and BUFFER_REPEAT are control-flow elements. The control-flow elements I defined so far are: - CALL - a basic statement, just calls a python function - FILT - like call, but passes the return value of the last CALL or FILT to the python function - SEQ - a sequence of elements to run in order - REPEAT - do something N times (and return None or maybe the last CALL?) - BUFFER_REPEAT - do something N times and accumulate the return value from each iter - LOOP - do something an infinite number of times - CHOOSE - like a switch statement (should rename to SWITCH) - WEAVE - interleave execution of multiple control-flow elements - POPEN - launch a process and return its status when it's complete - PRINT - a shortcut for CALL(print_obj) We don't have many requirements per-se for the architecture, but I think this design respects and realizes all of them. The advantages of this approach are: - algorithms (including partially run ones) are COPYABLE, and SERIALIZABLE - algorithms can be executed without seizing control of the python process (the run() method does this, but if you look inside it you'll see it's a simple for loop) - it is easy to execute an algorithm step by step in a main loop that also checks for network or filesystem events related to e.g. job management. - the library can provide learning algorithms via control-flow templates, and the user can edit them (with search/replace calls) to include HOOKS, and DIAGNOSTIC plug-in functionality e.g. prog.find(CALL(cd1_update, layer=layer1)).replace_with( SEQ([CALL(cd1_update, layer=layer1), CALL(my_debugfn)])) - user can print the 'program code' of an algorithm built from library pieces - program can be optimized automatically. - e.g. BUFFER(N, CALL(dataset.next)) could be replaced if dataset.next implements the right attribute/protocol for 'bufferable' or something. - e.g. SEQ([a,b,c,d]) could be compiled to a single CALL to a Theano-compiled function if a, b, c, and d are calls to callable objects that export something like a 'theano_SEQ' interface """ __license__ = 'TODO' __copyright__ = 'TODO' import cPickle, copy, subprocess, sys, time import numpy #################################################### # CONTROL-FLOW CONSTRUCTS class INCOMPLETE: """Return value for Element.step""" class ELEMENT(object): """ Base class for control flow elements (e.g. CALL, REPEAT, etc.) The design is that every element has a driver, that is another element, or the iterator implementation in the ELEMENT class. the driver calls start when entering a new control element - this would be called once per e.g. outer loop iteration the driver calls step to advance the control element - which returns INCOMPLETE - which returns any other object to indicate completion """ # subclasses should override these methods: def start(self, arg): pass def step(self): pass # subclasses should typically not override these: def run(self, arg=None, n_steps=float('inf')): self.start(arg) i = 0 r = self.step() while r is INCOMPLETE: i += 1 #TODO make sure there is not an off-by-one error if i > n_steps: break r = self.step() return r class BUFFER_REPEAT(ELEMENT): """ Accumulate a number of return values into one list / array. The source of return values `src` is a control element that will be restarted repeatedly in order to fulfil the requiement of gathering N samples. TODO: support accumulating of tuples of arrays """ def __init__(self, N, src, storage=None): """ TODO: use preallocated `storage` """ self.N = N self.n = 0 self.src = src self.storage = storage self.src.start(None) if self.storage != None: raise NotImplementedError() def start(self, arg): self.buf = [None] * self.N self.n = 0 self.finished = False def step(self): assert not self.finished r = self.src.step() if r is INCOMPLETE: return r self.src.start(None) # restart our stream self.buf[self.n] = r self.n += 1 if self.n == self.N: self.finished = True return self.buf else: return INCOMPLETE assert 0 class CALL(ELEMENT): """ Control flow terminal - call a python function or method. Returns the return value of the call. """ def __init__(self, fn, *args, **kwargs): self.fn = fn self.args = args self.kwargs=kwargs self.use_start_arg = kwargs.pop('use_start_arg', False) def start(self, arg): self.start_arg = arg self.finished = False return self def step(self): assert not self.finished self.finished = True if self.use_start_arg: if self.args: raise TypeError('cant get positional args both ways') return self.fn(self.start_arg, **self.kwargs) else: return self.fn(*self.args, **self.kwargs) def __getstate__(self): rval = dict(self.__dict__) if type(self.fn) is type(self.step): #instancemethod fn = rval.pop('fn') rval['i fn'] = fn.im_func, fn.im_self, fn.im_class return rval def __setstate__(self, dct): if 'i fn' in dct: dct['fn'] = type(self.step)(*dct.pop('i fn')) self.__dict__.update(dct) def FILT(fn, **kwargs): """ Return a CALL object that uses the return value from the previous CALL as the first and only positional argument. """ return CALL(fn, use_start_arg=True, **kwargs) def CHOOSE(which, options): """ Execute one out of a number of optional control flow paths """ raise NotImplementedError() def LOOP(elements): #TODO: implement a true infinite loop try: iter(elements) return REPEAT(sys.maxint, elements) except TypeError: return REPEAT(sys.maxint, [elements]) class REPEAT(ELEMENT): def __init__(self, N, elements, pass_rvals=False): self.N = N self.elements = elements self.pass_rvals = pass_rvals #TODO: check for N being callable def start(self, arg): self.n = 0 #loop iteration self.idx = 0 #element idx self.finished = False self.elements[0].start(arg) def step(self): assert not self.finished r = self.elements[self.idx].step() if r is INCOMPLETE: return INCOMPLETE self.idx += 1 if self.idx < len(self.elements): self.elements[self.idx].start(r) return INCOMPLETE self.n += 1 if self.n < self.N: self.idx = 0 self.elements[self.idx].start(r) return INCOMPLETE else: self.finished = True return r def SEQ(elements): return REPEAT(1, elements) class WEAVE(ELEMENT): """ Interleave execution of a number of elements. TODO: allow a schedule (at least relative frequency) of elements from each program """ def __init__(self, n_required, elements): self.elements = elements if n_required == -1: self.n_required = len(elements) else: self.n_required = n_required def start(self, arg): for el in self.elements: el.start(arg) self.elem_finished = [0] * len(self.elements) self.idx = 0 self.finished= False def step(self): assert not self.finished # if this is triggered, we have a broken driver #start with this check in case there were no elements # it's possible for the number of finished elements to exceed the threshold if sum(self.elem_finished) >= self.n_required: self.finished = True return None # step the active element r = self.elements[self.idx].step() if r is not INCOMPLETE: self.elem_finished[self.idx] = True # check for completion if sum(self.elem_finished) >= self.n_required: self.finished = True return None # advance to the next un-finished element self.idx = (self.idx+1) % len(self.elements) while self.elem_finished[self.idx]: self.idx = (self.idx+1) % len(self.elements) return INCOMPLETE class POPEN(ELEMENT): def __init__(self, args): self.args = args def start(self, arg): self.p = subprocess.Popen(self.args) def step(self): r = self.p.poll() if r is None: return INCOMPLETE return r def PRINT(obj): return CALL(print_obj, obj) #################################################### # [Dummy] Components involved in learning algorithms class Dataset(object): def __init__(self, data): self.pos = 0 self.data = data def next(self): rval = self.data[self.pos] self.pos += 1 if self.pos == len(self.data): self.pos = 0 return rval def seek(self, pos): self.pos = pos class KFold(object): def __init__(self, data, K): self.data = data self.k = -1 self.scores = [None]*K self.K = K def next_fold(self): self.k += 1 self.data.seek(0) # restart the stream def next(self): #TODO: skip the examples that are ommitted in this split return self.data.next() def init_test(self): pass def next_test(self): return self.data.next() def test_size(self): return 5 def store_scores(self, scores): self.scores[self.k] = scores def prog(self, clear, train, test): return REPEAT(self.K, [ CALL(self.next_fold), clear, train, CALL(self.init_test), BUFFER_REPEAT(self.test_size(), SEQ([ CALL(self.next_test), test])), FILT(self.store_scores) ]) class PCA_Analysis(object): def __init__(self): self.clear() def clear(self): self.mean = 0 self.eigvecs=0 self.eigvals=0 def analyze(self, X): self.mean = numpy.mean(X, axis=0) self.eigvecs=1 self.eigvals=1 def filt(self, X): return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals? def pseudo_inverse(self, Y): return Y class Layer(object): def __init__(self, w): self.w = w def filt(self, x): return self.w*x def clear(self): self.w =0 def print_obj(obj): print obj def print_obj_attr(obj, attr): print getattr(obj, attr) def no_op(*args, **kwargs): pass def cd1_update(X, layer, lr): # update self.layer from observation X layer.w += X.mean() * lr #TODO: not exactly correct math! ############################################################### # Example algorithms written in this control flow mini-language def main_weave(): # Uses weave to demonstrate the interleaving of two bufferings of a single stream l = [0] def f(a): print l l[0] += a return l[0] print WEAVE(1, [ BUFFER_REPEAT(3,CALL(f,1)), BUFFER_REPEAT(5,CALL(f,1)), ]).run() def main_weave_popen(): # Uses weave and Popen to demonstrate the control of a program with some asynchronous # parallelism p = WEAVE(2,[ SEQ([POPEN(['sleep', '5']), PRINT('done 1')]), SEQ([POPEN(['sleep', '10']), PRINT('done 2')]), LOOP([ CALL(print_obj, 'polling...'), CALL(time.sleep, 1)])]) # The LOOP would forever if the WEAVE were not configured to stop after 2 of its elements # complete. p.run() # Note that the program can be run multiple times... p.run() main = main_weave_popen def main_kfold_dbn(): # Uses many of the control-flow elements to define the k-fold evaluation of a dbn # The algorithm is not quite right, but the example shows off all of the required # control-flow elements I think. # create components dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) pca = PCA_Analysis() layer1 = Layer(w=4) layer2 = Layer(w=3) kf = KFold(dataset, K=10) pca_batchsize=1000 cd_batchsize = 5 n_cd_updates_layer1 = 10 n_cd_updates_layer2 = 10 # create algorithm train_pca = SEQ([ BUFFER_REPEAT(pca_batchsize, CALL(kf.next)), FILT(pca.analyze)]) train_layer1 = REPEAT(n_cd_updates_layer1, [ BUFFER_REPEAT(cd_batchsize, CALL(kf.next)), FILT(pca.filt), FILT(cd1_update, layer=layer1, lr=.01)]) train_layer2 = REPEAT(n_cd_updates_layer2, [ BUFFER_REPEAT(cd_batchsize, CALL(kf.next)), FILT(pca.filt), FILT(layer1.filt), FILT(cd1_update, layer=layer2, lr=.01)]) kfold_prog = kf.prog( clear = SEQ([ # FRAGMENT 1: this bit is the reset/clear stage CALL(pca.clear), CALL(layer1.clear), CALL(layer2.clear), ]), train = SEQ([ train_pca, WEAVE(1, [ # Silly example of how to do debugging / loggin with WEAVE train_layer1, LOOP(CALL(print_obj_attr, layer1, 'w'))]), train_layer2, ]), test=SEQ([ FILT(pca.filt), # may want to allow this SEQ to be FILT(layer1.filt), # optimized into a shorter one that FILT(layer2.filt), # compiles these calls together with FILT(numpy.mean)])) # Theano pkg1 = dict(prog=kfold_prog, kf=kf) pkg2 = copy.deepcopy(pkg1) # programs can be copied try: pkg3 = cPickle.loads(cPickle.dumps(pkg1)) except: print >> sys.stderr, "pickling doesnt work, but it can be fixed I think" pkg = pkg2 # running a program updates the variables in its package, but not the other package pkg['prog'].run() print pkg['kf'].scores if __name__ == '__main__': sys.exit(main())