# Mercurial > pylearn
# view doc/v2_planning/plugin_JB.py @ 1200:acfd5e747a75
# v2planning - a few changes to plugin proposals
# author: James Bergstra <bergstrj@iro.umontreal.ca>
# date: Mon, 20 Sep 2010 11:28:23 -0400
# parents: 98954d8cb92d
# children: 865936d8221b
# line wrap: on
# line source
"""plugin_JB - draft of potential library architecture using iterators

This strategy makes use of a simple imperative language whose statements are
python function calls to create learning algorithms that can be manipulated
and executed in several desirable ways.

The training procedure for a PCA module is easy to express:

    # allocate the relevant modules
    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
    pca = PCA_Analysis()
    pca_batchsize=1000

    # define the control-flow of the algorithm
    train_pca = SEQ([
        BUFFER_REPEAT(pca_batchsize, CALL(dataset.next)),
        FILT(pca.analyze)])

    # run the program
    VirtualMachine(train_pca).run()

The CALL, SEQ, FILT, and BUFFER_REPEAT are control-flow elements. The
control-flow elements I defined so far are:

- CALL - a basic statement, just calls a python function
- FILT - like call, but passes the return value of the last CALL or FILT to
  the python function
- SEQ - a sequence of elements to run in order
- REPEAT - do something N times (and return None or maybe the last CALL?)
- BUFFER_REPEAT - do something N times and accumulate the return value from
  each iter
- LOOP - do something an infinite number of times
- CHOOSE - like a switch statement (should rename to SWITCH)
- WEAVE - interleave execution of multiple control-flow elements

We don't have many requirements per-se for the architecture, but I think this
design respects and realizes all of them.
The advantages of this approach are:

- algorithms (including partially run ones) are COPYABLE, and SERIALIZABLE

- algorithms can be executed without seizing control of the python process
  (the VM is an iterator) so your main loop (aka alternate VM implementation)
  can be checking for network or filesystem events related to job management

- the library can provide learning algorithms via control-flow templates, and
  the user can edit them (with search/replace calls) to include HOOKS, and
  DIAGNOSTIC plug-in functionality

  e.g. prog.find(CALL(cd1_update, layer=layer1)).replace_with(
           SEQ([CALL(cd1_update, layer=layer1), CALL(my_debugfn)]))

- user can print the 'program code' of an algorithm built from library pieces

- program can be optimized automatically.

  - e.g. BUFFER(N, CALL(dataset.next)) could be replaced if dataset.next
    implements the right attribute/protocol for 'bufferable' or something.

  - e.g. SEQ([a,b,c,d]) could be compiled to a single CALL to a
    Theano-compiled function if a, b, c, and d are calls to callable objects
    that export something like a 'theano_SEQ' interface

"""

__license__ = 'TODO'
__copyright__ = 'TODO'

import copy
import inspect
import sys

try:
    import cPickle
except ImportError:  # Python 3 has no cPickle; keep the module-level name.
    import pickle as cPickle

import numpy


###################################################
# Virtual Machine for executing programs

class VirtualMachine(object):
    """Iterator that drives a control-flow program one step at a time.

    Each iteration calls `prog.step()` once and yields its return value:
    `INCOMPLETE` while the program is still running, and the program's final
    value on the last step, after which iteration stops.
    """

    def __init__(self, prog):
        """
        :param prog: the root control-flow element (anything with `start(arg)`
            and `step()` methods).
        """
        self.prog = prog
        self.started = False
        self.finished = False
        self.result = None  # final value of `prog`; meaningful once `finished`

    def __iter__(self):
        """Start the program on first use and return self.

        Calling `iter()` again does NOT restart the program, so a partially
        executed VM can be resumed (by `run` or by another `for` loop).
        """
        if not self.started:
            self.prog.start(None)
            self.started = True
        return self

    def next(self):
        """Advance the program by one step and return that step's value.

        :raises StopIteration: if the program already completed.
        """
        if self.finished:
            raise StopIteration()
        r = self.prog.step()
        if r is not INCOMPLETE:
            self.finished = True
            self.result = r
        return r

    __next__ = next  # Python 3 iterator protocol

    def run(self, n_steps=float('inf')):
        """Execute at most `n_steps` steps of the program.

        :param n_steps: upper bound on the number of `step()` calls made by
            this invocation (default: run to completion).
        :returns: the program's final value if it has completed, otherwise
            `INCOMPLETE`.  `run` may be called again to resume.
        """
        result = self.result if self.finished else INCOMPLETE
        stepper = iter(self)
        steps_taken = 0
        # Check the budget BEFORE stepping so exactly n_steps steps are taken.
        while steps_taken < n_steps:
            try:
                result = next(stepper)
            except StopIteration:
                break
            steps_taken += 1
        return result


####################################################
# CONTROL-FLOW CONSTRUCTS

class INCOMPLETE(object):
    """Return value for Element.step

    The class object itself is the sentinel; it is compared by identity.
    """


class ELEMENT(object):
    """
    every execution block has a driver

    the driver calls start when entering a new control element
       - this would be called once per e.g. outer loop iteration

    the driver calls step to advance the control element
       - which returns INCOMPLETE
       - which returns any other object to indicate completion
    """

    def start(self, arg):
        pass

    def step(self):
        pass


class BUFFER_REPEAT(ELEMENT):
    """
    Accumulate a number of return values into one list / array.

    The source of return values `src` is a control element that will be
    restarted repeatedly in order to fulfil the requirement of gathering N
    samples.

    TODO: support accumulating of tuples of arrays
    """

    def __init__(self, N, src, storage=None):
        """
        :param N: number of values to gather before completing.
        :param src: control element producing one value per completed run.
        :param storage: reserved; anything but None raises
            NotImplementedError.

        TODO: use preallocated `storage`
        """
        self.N = N
        self.n = 0
        self.src = src
        self.storage = storage
        self.src.start(None)
        # Identity test: `!= None` is evaluated elementwise by numpy arrays
        # and would raise an ambiguous-truth-value error instead.
        if storage is not None:
            raise NotImplementedError()

    def start(self, arg):
        """Reset the buffer and the source; `arg` is ignored."""
        self.buf = [None] * self.N
        self.n = 0
        self.finished = False
        # Restart the source too, so a half-finished multi-step `src` from a
        # previous (interrupted) run cannot leak into this one.
        self.src.start(None)

    def step(self):
        """Step `src` once; return the filled buffer after its N-th completion.

        :returns: INCOMPLETE until N values were gathered, then the list of
            gathered values.
        """
        assert not self.finished
        r = self.src.step()
        if r is INCOMPLETE:
            return r
        self.src.start(None)  # restart our stream
        self.buf[self.n] = r
        self.n += 1
        if self.n == self.N:
            self.finished = True
            return self.buf
        return INCOMPLETE


class CALL(ELEMENT):
    """
    Control flow terminal - call a python function or method.

    Returns the return value of the call.
    """

    # State keys used to serialize a bound method.  The space makes them
    # impossible to clash with a real attribute name.
    _BOUND_FN_KEY = 'bound fn'
    _LEGACY_FN_KEY = 'i fn'  # (im_func, im_self, im_class), older format

    def __init__(self, fn, *args, **kwargs):
        """
        :param fn: the callable to invoke on `step`.
        :param args: positional arguments for `fn`.
        :param kwargs: keyword arguments for `fn`.  The special keyword
            `use_start_arg` (default False) is consumed here: when true, the
            argument given to `start` is passed as the sole positional
            argument instead of `args`.
        """
        self.use_start_arg = kwargs.pop('use_start_arg', False)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def start(self, arg):
        """Remember `arg` (used only with `use_start_arg`) and re-arm."""
        self.start_arg = arg
        self.finished = False
        return self

    def step(self):
        """Call the function once and return its result (always completes).

        :raises TypeError: if `use_start_arg` is set and positional args were
            also given to the constructor.
        """
        assert not self.finished
        self.finished = True
        if self.use_start_arg:
            if self.args:
                raise TypeError('cant get positional args both ways')
            return self.fn(self.start_arg, **self.kwargs)
        return self.fn(*self.args, **self.kwargs)

    def __getstate__(self):
        """Return a copy/pickle-friendly state dict.

        A bound method is stored as (instance, method name) so that
        deepcopy/pickle re-bind it to the copied instance, and so that
        nothing un-picklable (the raw function of a method) is stored.
        """
        state = dict(self.__dict__)
        fn = state.get('fn')
        owner = getattr(fn, '__self__', None)
        if inspect.ismethod(fn) and owner is not None:
            name = fn.__func__.__name__
            looked_up = getattr(owner, name, None)
            # Only use the by-name form when the lookup round-trips to the
            # very same function; otherwise leave `fn` untouched.
            if getattr(looked_up, '__func__', None) is fn.__func__:
                del state['fn']
                state[self._BOUND_FN_KEY] = (owner, name)
        return state

    def __setstate__(self, dct):
        """Restore state produced by `__getstate__` (current or legacy)."""
        state = dict(dct)  # do not mutate the caller's dict
        if self._BOUND_FN_KEY in state:
            owner, name = state.pop(self._BOUND_FN_KEY)
            state['fn'] = getattr(owner, name)
        elif self._LEGACY_FN_KEY in state:
            func, owner, cls = state.pop(self._LEGACY_FN_KEY)
            state['fn'] = func.__get__(owner, cls)
        self.__dict__.update(state)


def FILT(fn, **kwargs):
    """
    Return a CALL object that uses the return value from the previous CALL as
    the first and only positional argument.
    """
    return CALL(fn, use_start_arg=True, **kwargs)


def CHOOSE(which, options):
    """
    Execute one out of a number of optional control flow paths
    """
    raise NotImplementedError()


def LOOP(elements):
    """Repeat `elements` forever.

    :param elements: either a single control element or an indexable sequence
        of them.
    :returns: a REPEAT whose iteration count is infinite.
    """
    try:
        iter(elements)
    except TypeError:
        # A single (non-iterable) element: wrap it.
        elements = [elements]
    # REPEAT only ever evaluates `n < N`, so an infinite N never terminates.
    return REPEAT(float('inf'), elements)


class REPEAT(ELEMENT):
    """Run a sequence of elements in order, N times over.

    The return value of each element is handed to `start` of the next one
    (and the last element's value to the first element when looping).  The
    sequence is always executed at least once, even if N < 1.
    """

    def __init__(self, N, elements, pass_rvals=False):
        """
        :param N: number of passes over `elements`.
        :param elements: indexable, non-empty sequence of control elements.
        :param pass_rvals: stored but currently unused.
        """
        self.N = N
        self.elements = elements
        self.pass_rvals = pass_rvals
        #TODO: check for N being callable

    def start(self, arg):
        """Reset counters and start the first element with `arg`."""
        self.n = 0    # loop iteration
        self.idx = 0  # element idx
        self.finished = False
        self.elements[0].start(arg)

    def step(self):
        """Step the current element, advancing to the next on completion.

        :returns: INCOMPLETE until the last element of the last pass
            completes, then that element's return value.
        """
        assert not self.finished
        r = self.elements[self.idx].step()
        if r is INCOMPLETE:
            return INCOMPLETE
        self.idx += 1
        if self.idx < len(self.elements):
            # More elements remain in this pass.
            self.elements[self.idx].start(r)
            return INCOMPLETE
        self.n += 1
        if self.n < self.N:
            # Begin the next pass from the first element.
            self.idx = 0
            self.elements[self.idx].start(r)
            return INCOMPLETE
        self.finished = True
        return r


def SEQ(elements):
    """Run `elements` once, in order (a REPEAT with a single pass)."""
    return REPEAT(1, elements)


class WEAVE(ELEMENT):
    """
    Interleave execution of a number of elements.

    Elements are stepped round-robin.  The WEAVE completes (returning None)
    at the end of the first full round in which at least one element has
    completed.

    TODO: allow a schedule (at least relative frequency) of elements from
    each program
    """

    def __init__(self, elements):
        self.elements = elements

    def start(self, arg):
        """Start every element with the same `arg`."""
        for el in self.elements:
            el.start(arg)
        self.idx = 0
        self.any_is_finished = False
        self.finished = False

    def step(self):
        """Step the next element in round-robin order."""
        # if this is triggered, we have a broken driver
        assert not self.finished
        self.idx = self.idx % len(self.elements)
        r = self.elements[self.idx].step()
        if r is not INCOMPLETE:
            self.any_is_finished = True
        self.idx += 1
        if self.idx == len(self.elements) and self.any_is_finished:
            self.finished = True
            return None  # dummy completion value
        return INCOMPLETE


####################################################
# [Dummy] Components involved in learning algorithms

class Dataset(object):
    """Cyclic stream over an indexable collection of examples."""

    def __init__(self, data):
        self.pos = 0
        self.data = data

    def next(self):
        """Return the current example and advance, wrapping at the end."""
        rval = self.data[self.pos]
        self.pos += 1
        if self.pos == len(self.data):
            self.pos = 0
        return rval

    def seek(self, pos):
        """Move the read position to `pos` (not validated)."""
        self.pos = pos


class KFold(object):
    """Dummy K-fold cross-validation driver around a Dataset-like stream."""

    def __init__(self, data, K):
        self.data = data
        self.k = -1  # index of the current fold; -1 before the first fold
        self.scores = [None] * K
        self.K = K

    def next_fold(self):
        """Advance to the next fold and rewind the data stream."""
        self.k += 1
        self.data.seek(0)  # restart the stream

    def next(self):
        """Return the next training example."""
        #TODO: skip the examples that are omitted in this split
        return self.data.next()

    def init_test(self):
        pass

    def next_test(self):
        """Return the next test example."""
        return self.data.next()

    def test_size(self):
        """Number of test examples per fold (hard-coded dummy value)."""
        return 5

    def store_scores(self, scores):
        """Record `scores` for the current fold."""
        self.scores[self.k] = scores

    def prog(self, clear, train, test):
        """Build the K-fold program from three user-supplied elements.

        :param clear: element that resets the model.
        :param train: element that trains the model.
        :param test: element applied to each test example (started with the
            example as its argument).
        :returns: a REPEAT running, K times: next fold, clear, train, then
            gather `test_size()` test results and store them as the fold's
            scores.
        """
        return REPEAT(self.K, [
            CALL(self.next_fold),
            clear,
            train,
            CALL(self.init_test),
            BUFFER_REPEAT(self.test_size(),
                          SEQ([CALL(self.next_test), test])),
            FILT(self.store_scores)])


class PCA_Analysis(object):
    """Dummy PCA: only the mean is actually estimated."""

    def __init__(self):
        self.clear()

    def clear(self):
        self.mean = 0
        self.eigvecs = 0
        self.eigvals = 0

    def analyze(self, X):
        """Estimate the mean of `X` along axis 0 (eigen-terms are stubs)."""
        self.mean = numpy.mean(X, axis=0)
        self.eigvecs = 1
        self.eigvals = 1

    def filt(self, X):
        """Center `X` and scale it by the (stub) eigenvectors."""
        return (X - self.mean) * self.eigvecs  #TODO: divide by root eigvals?

    def pseudo_inverse(self, Y):
        return Y


class Layer(object):
    """Dummy layer holding a single multiplicative weight `w`."""

    def __init__(self, w):
        self.w = w

    def filt(self, x):
        return self.w * x

    def clear(self):
        self.w = 0


def print_obj(obj):
    """Print `obj` to stdout."""
    print(obj)


def print_obj_attr(obj, attr):
    """Print the attribute named `attr` of `obj` to stdout."""
    print(getattr(obj, attr))


def no_op(*args, **kwargs):
    """Accept anything, do nothing."""
    pass


def cd1_update(X, layer, lr):
    """Dummy CD-1 update: nudge `layer.w` by the mean of `X` times `lr`.

    `X` must provide a `.mean()` method (e.g. a numpy array).
    """
    # update layer from observation X
    layer.w += X.mean() * lr  #TODO: not exactly correct math!


def simple_main():
    """Tiny demo: weave two BUFFER_REPEATs that share one counter."""
    l = [0]

    def f(a):
        print(l)
        l[0] += a
        return l[0]

    print(VirtualMachine(WEAVE([
        BUFFER_REPEAT(3, CALL(f, 1)),
        BUFFER_REPEAT(5, CALL(f, 1)),
        ])).run())


def main():
    """Demo: K-fold evaluation of a PCA + two-layer pipeline."""
    # create components
    dataset = Dataset(numpy.random.RandomState(123).randn(13, 1))
    pca = PCA_Analysis()
    layer1 = Layer(w=4)
    layer2 = Layer(w=3)
    kf = KFold(dataset, K=10)

    pca_batchsize = 1000
    cd_batchsize = 5
    n_cd_updates_layer1 = 10
    n_cd_updates_layer2 = 10

    # create algorithm

    train_pca = SEQ([
        BUFFER_REPEAT(pca_batchsize, CALL(kf.next)),
        FILT(pca.analyze)])

    train_layer1 = REPEAT(n_cd_updates_layer1, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt),
        FILT(cd1_update, layer=layer1, lr=.01)])

    train_layer2 = REPEAT(n_cd_updates_layer2, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt),
        FILT(layer1.filt),
        FILT(cd1_update, layer=layer2, lr=.01)])

    kfold_prog = kf.prog(
        clear=SEQ([   # FRAGMENT 1: this bit is the reset/clear stage
            CALL(pca.clear),
            CALL(layer1.clear),
            CALL(layer2.clear),
            ]),
        train=SEQ([
            train_pca,
            WEAVE([   # Silly example of how to do debugging / logging with WEAVE
                train_layer1,
                LOOP(CALL(print_obj_attr, layer1, 'w'))]),
            train_layer2,
            ]),
        test=SEQ([
            FILT(pca.filt),      # may want to allow this SEQ to be
            FILT(layer1.filt),   # optimized into a shorter one that
            FILT(layer2.filt),   # compiles these calls together with
            FILT(numpy.mean)]))  # Theano

    pkg1 = dict(prog=kfold_prog, kf=kf)
    pkg2 = copy.deepcopy(pkg1)       # programs can be copied

    try:
        pkg3 = cPickle.loads(cPickle.dumps(pkg1))
    except Exception:
        # Best-effort demonstration only: report and carry on.
        sys.stderr.write(
            "pickling doesnt work, but it can be fixed I think\n")

    pkg = pkg2

    # running a program updates the variables in its package, but not the
    # other package
    VirtualMachine(pkg['prog']).run()
    print(pkg['kf'].scores)


if __name__ == '__main__':
    sys.exit(main())