Mercurial > pylearn
view doc/v2_planning/plugin_greenlet.py @ 1197:a60b3472c4ba
more progress on greenlets
author | James Bergstra <bergstrj@iro.umontreal.ca> |
---|---|
date | Sun, 19 Sep 2010 23:49:24 -0400 |
parents | e9bb3340a870 |
children | acfd5e747a75 |
line wrap: on
line source
"""plugin_greenlet - draft of library architecture using greenlets""" """ - PICKLABLE - algorithms are serializable at all points during execution - ITERATOR walks through algorithms with fine granularity - COMPONENTS - library provides components on which programs operate - ALGORITHMS - library provides algorithms in clean (no hooks) form - HOOKS - user can insert print / debug logic with search/replace type calls e.g. prog.find(CALL(cd1_update)).replace_with(SEQ([CALL(cd1_update), CALL(debugfn)])) - PRINTING - user can print the 'program code' of an algorithm built from library pieces - MODULAR EXPERIMENTS - an experiment object with one (or more?) programs and all of the objects referred to by those programs. It is the preferred type of object to be serialized. The main components of the algorithms should be top-level attributes of the package. This object can be serialized and loaded in another process to implement job migration. - OPTIMIZATION - program can be optimized automatically e.g. BUFFER(N, CALL(dataset.next)) can be replaced if dataset.next implements the right attribute/protocol for 'bufferable' or something. e.g. SEQ([a,b,c,d]) can be compiled with Theano if sub-sequence is compatible - don't need greenlets to get efficiency, the implementations of control flow ops can manage a stack or stack tree in the vm (like greenlets do I think) we don't really need greenlets/stackless I don't think """ __license__ = None __copyright__ = None import copy, sys import numpy from greenlet import greenlet def vm_unpack(incoming): # can't reliably distinguish between a kwargs-only switch and a switch with one dict # argument if incoming is None: rval = (), {} if isinstance(incoming, dict): rval = (), incoming elif isinstance(incoming, tuple): if (len(incoming)==2 and isinstance(incoming[0], tuple) and isinstance(incoming[1], dict)): rval = incoming else: rval = incoming, {} else: rval = (incoming,), {} #print 'unpack', incoming, rval return rval[0][0], rval[0][1], rval[0][2:], rval[1] def unpack_from_vm(incoming): assert isinstance(incoming, tuple) assert len(incoming)==4 return incoming def vm_run(prog, *args, **kwargs): #TODO: make this into a class with different ways to start the loop # for example, if the (gr, dest, a, kw) tuple is returned, # then a program could be run for N steps and then paused, # saved, and restarted. n_steps = kwargs.pop('n_steps', float('inf')) def vm_loop(gr, dest, a, kw): loop_iter = 0 while loop_iter < n_steps: if gr == 'return': break #print 'vm_loop gr=',gr,'args=',a, 'kwargs=', kw gr, dest, a, kw = gr.switch(vm, gr, dest, a, kw) #print 'gmain incoming', incoming loop_iter += 1 # permit restarting return gr, dest, a, kw vm = greenlet(vm_loop) return vm.switch(prog, 'return', args, kwargs) #################################################### # CONTROL-FLOW CONSTRUCTS def SEQ(glets): return REPEAT(1, glets) def REPEAT(N, glets): def repeat_task(vm, gself, dest, args, kwargs): while True: for i in xrange(N): for glet in glets: #print 'repeat_task_i dest=%(dest)s args=%(args)s, kw=%(kwargs)s'%locals() # jump to task `glet` # with instructions to report results back to this loop `g` _vm, _gself, _dest, args, kwargs = vm.switch(glet, gself, args, kwargs) assert _gself is gself assert _dest is None # instructions can't tell us where to jump vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs) return greenlet(repeat_task) def LOOP(seq): #TODO: implement a true infinite loop try: iter(seq) return REPEAT(sys.maxint, seq) except TypeError: return REPEAT(sys.maxint, [seq]) def CHOOSE(which, options): raise NotImplementedError() def WEAVE(threads): def weave_task(vm, gself, dest, args, kwargs): # weave works by telling its threads that *it* is the vm # and reporting back to the real vm indirectly while True: # execution of weave is an iteration through this loop # initially broadcast the args and kwargs to all threads all_threads_live = True thread_info = [(t, 'return', args, kwargs) for t in threads] #print 'weave start -------------' while all_threads_live: #print 'weave iter' for i in xrange(len(threads)): t_next, t_dest, t_args, t_kwargs = thread_info[i] #tell the vm we're up to something, but ask it to come right back #print 'weave 1' _ignore = vm.switch(gself, None, (), {}) # pretend we're the vm_loop and tell the # thread to advance by one and report back to us #print 'weave 2', thread_info[i] thread_info[i] = t_next.switch(gself, t_next, t_dest, t_args, t_kwargs) #print 'weave 3', thread_info[i] if thread_info[i][0] is 'return': #print 'thread has finished', i all_threads_live = False # some thread has died so we return control to parent #####print 'weave returning', dest, args, kwargs vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs) #####print 'weave continuing', dest, args, kwargs return greenlet(weave_task) def BUFFER(N, glet): def BUFFER_loop(vm, gself, dest, args, kwargs): while True: #body runs once per execution buf = [] for i in xrange(N): # jump to task `glet` # with instructions to report results back to this loop `g` _vm, _gself, _dest, _args, _kwargs = vm.switch(glet, gself, args, kwargs) buf.append(_args[0]) assert len(_args)==1 assert _kwargs=={} assert _gself is gself assert _dest is None # instructions can't tell us where to jump buf = numpy.asarray(buf) vm, gself, dest, args, kwargs = vm.switch(dest, None, (buf,), {}) return greenlet(BUFFER_loop) def CALL(fn): """ Create a greenlet whose first argument is the return-jump location. fn must accept as the first positional argument this greenlet itself, which can be used as the return-jump location for internal greenlet switches (ideally using gswitch). """ def CALL_loop(vm, gself, dest, args, kwargs): while True: #print 'CALL calling', fn.__name__, args, kwargs t = fn(*args, **kwargs) #TODO consider a protocol for returning args, kwargs if t is None: _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (), {}) else: _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (t,), {}) assert gself is _gself return greenlet(CALL_loop) #################################################### # Components involved in the learning process class Dataset(object): def __init__(self, data): self.pos = 0 self.data = data def next(self): rval = self.data[self.pos] self.pos += 1 if self.pos == len(self.data): self.pos = 0 return rval def seek(self, pos): self.pos = pos class KFold(object): def __init__(self, data, K): self.data = data self.k = -1 self.scores = [None]*K self.K = K def next_fold(self): self.k += 1 self.data.seek(0) # restart the stream def next(self): #TODO: skip the examples that are ommitted in this split return self.data.next() def init_test(self): pass def next_test(self): return self.data.next() def test_size(self): return 5 def store_scores(self, scores): self.scores[self.k] = scores class PCA_Analysis(object): def __init__(self): self.clear() def clear(self): self.mean = 0 self.eigvecs=0 self.eigvals=0 def analyze(self, X): self.mean = X.mean(axis=0) self.eigvecs=1 self.eigvals=1 def filt(self, X): return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals? def pseudo_inverse(self, Y): return Y class Layer(object): def __init__(self, w): self.w = w def filt(self, x): return self.w*x def clear(self): self.w =0 def print_obj(vm, gself, obj): print obj def no_op(*args, **kwargs): pass class cd1_update(object): def __init__(self, layer, lr): self.layer = layer self.lr = lr def __call__(self, X): # update self.layer from observation X print 'cd1', X print X.mean() self.layer.w += X.mean() * self.lr #TODO: not exactly correct math def main(): # create components dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) pca = PCA_Analysis() layer1 = Layer(w=4) layer2 = Layer(w=3) kf = KFold(dataset, K=10) # create algorithm train_pca = SEQ([ BUFFER(1000, CALL(kf.next)), CALL(pca.analyze)]) train_layer1 = REPEAT(10, [ BUFFER(10, CALL(kf.next)), CALL(pca.filt), CALL(cd1_update(layer1, lr=.01))]) train_layer2 = REPEAT(10, [ BUFFER(10, CALL(kf.next)), CALL(pca.filt), CALL(layer1.filt), CALL(cd1_update(layer2, lr=.01))]) def print_layer_w(*a,**kw): print layer1.w train_prog = SEQ([ train_pca, WEAVE([ train_layer1, LOOP(CALL(print_layer_w))]), train_layer2, ]) kfold_prog = REPEAT(10, [ CALL(kf.next_fold), CALL(pca.clear), CALL(layer1.clear), CALL(layer2.clear), train_prog, CALL(kf.init_test), BUFFER(kf.test_size(), SEQ([ CALL(kf.next_test), CALL(pca.filt), # may want to allow this SEQ to be CALL(layer1.filt), # optimized into a shorter one that CALL(layer2.filt), CALL(numpy.mean)])), # chains together theano graphs CALL(kf.store_scores), ]) vm_run(kfold_prog, n_steps=500) print kf.scores if __name__ == '__main__': sys.exit(main())