Mercurial > pylearn
view doc/v2_planning/plugin_greenlet.py @ 1367:9474fb4ad109
Refactored datalearn committee file to be easier to read
author | Olivier Delalleau <delallea@iro> |
---|---|
date | Mon, 15 Nov 2010 11:51:33 -0500 |
parents | acfd5e747a75 |
children |
line wrap: on
line source
"""plugin_greenlet - draft of library architecture using greenlets HISTORICAL - NOT ACTUALLY A PROPOSAL ==================================== This was the original approach for what I renamed to plugin_JB, until I realized that I could get the end result without using greenlets at all. Still, greenlets seem like they could be neat and making this program stretched my mind so I keep it. There's something wrong when you run with the kfold validation, but until that point I think it works. """ __license__ = None __copyright__ = None import copy, sys import numpy from greenlet import greenlet def vm_unpack(incoming): # can't reliably distinguish between a kwargs-only switch and a switch with one dict # argument if incoming is None: rval = (), {} if isinstance(incoming, dict): rval = (), incoming elif isinstance(incoming, tuple): if (len(incoming)==2 and isinstance(incoming[0], tuple) and isinstance(incoming[1], dict)): rval = incoming else: rval = incoming, {} else: rval = (incoming,), {} #print 'unpack', incoming, rval return rval[0][0], rval[0][1], rval[0][2:], rval[1] def unpack_from_vm(incoming): assert isinstance(incoming, tuple) assert len(incoming)==4 return incoming def vm_run(prog, *args, **kwargs): #TODO: make this into a class with different ways to start the loop # for example, if the (gr, dest, a, kw) tuple is returned, # then a program could be run for N steps and then paused, # saved, and restarted. n_steps = kwargs.pop('n_steps', float('inf')) def vm_loop(gr, dest, a, kw): loop_iter = 0 while loop_iter < n_steps: if gr == 'return': break #print 'vm_loop gr=',gr,'args=',a, 'kwargs=', kw gr, dest, a, kw = gr.switch(vm, gr, dest, a, kw) #print 'gmain incoming', incoming loop_iter += 1 # permit restarting return gr, dest, a, kw vm = greenlet(vm_loop) return vm.switch(prog, 'return', args, kwargs) #################################################### # CONTROL-FLOW CONSTRUCTS def SEQ(glets): return REPEAT(1, glets) def REPEAT(N, glets): def repeat_task(vm, gself, dest, args, kwargs): while True: for i in xrange(N): for glet in glets: #print 'repeat_task_i dest=%(dest)s args=%(args)s, kw=%(kwargs)s'%locals() # jump to task `glet` # with instructions to report results back to this loop `g` _vm, _gself, _dest, args, kwargs = vm.switch(glet, gself, args, kwargs) assert _gself is gself assert _dest is None # instructions can't tell us where to jump vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs) return greenlet(repeat_task) def LOOP(seq): #TODO: implement a true infinite loop try: iter(seq) return REPEAT(sys.maxint, seq) except TypeError: return REPEAT(sys.maxint, [seq]) def CHOOSE(which, options): raise NotImplementedError() def WEAVE(threads): def weave_task(vm, gself, dest, args, kwargs): # weave works by telling its threads that *it* is the vm # and reporting back to the real vm indirectly while True: # execution of weave is an iteration through this loop # initially broadcast the args and kwargs to all threads all_threads_live = True thread_info = [(t, 'return', args, kwargs) for t in threads] #print 'weave start -------------' while all_threads_live: #print 'weave iter' for i in xrange(len(threads)): t_next, t_dest, t_args, t_kwargs = thread_info[i] #tell the vm we're up to something, but ask it to come right back #print 'weave 1' _ignore = vm.switch(gself, None, (), {}) # pretend we're the vm_loop and tell the # thread to advance by one and report back to us #print 'weave 2', thread_info[i] thread_info[i] = t_next.switch(gself, t_next, t_dest, t_args, t_kwargs) #print 'weave 3', thread_info[i] if thread_info[i][0] is 'return': #print 'thread has finished', i all_threads_live = False # some thread has died so we return control to parent #####print 'weave returning', dest, args, kwargs vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs) #####print 'weave continuing', dest, args, kwargs return greenlet(weave_task) def BUFFER(N, glet): def BUFFER_loop(vm, gself, dest, args, kwargs): while True: #body runs once per execution buf = [] for i in xrange(N): # jump to task `glet` # with instructions to report results back to this loop `g` _vm, _gself, _dest, _args, _kwargs = vm.switch(glet, gself, args, kwargs) buf.append(_args[0]) assert len(_args)==1 assert _kwargs=={} assert _gself is gself assert _dest is None # instructions can't tell us where to jump buf = numpy.asarray(buf) vm, gself, dest, args, kwargs = vm.switch(dest, None, (buf,), {}) return greenlet(BUFFER_loop) def CALL(fn): """ Create a greenlet whose first argument is the return-jump location. fn must accept as the first positional argument this greenlet itself, which can be used as the return-jump location for internal greenlet switches (ideally using gswitch). """ def CALL_loop(vm, gself, dest, args, kwargs): while True: #print 'CALL calling', fn.__name__, args, kwargs t = fn(*args, **kwargs) #TODO consider a protocol for returning args, kwargs if t is None: _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (), {}) else: _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (t,), {}) assert gself is _gself return greenlet(CALL_loop) #################################################### # Components involved in the learning process class Dataset(object): def __init__(self, data): self.pos = 0 self.data = data def next(self): rval = self.data[self.pos] self.pos += 1 if self.pos == len(self.data): self.pos = 0 return rval def seek(self, pos): self.pos = pos class KFold(object): def __init__(self, data, K): self.data = data self.k = -1 self.scores = [None]*K self.K = K def next_fold(self): self.k += 1 self.data.seek(0) # restart the stream def next(self): #TODO: skip the examples that are ommitted in this split return self.data.next() def init_test(self): pass def next_test(self): return self.data.next() def test_size(self): return 5 def store_scores(self, scores): self.scores[self.k] = scores class PCA_Analysis(object): def __init__(self): self.clear() def clear(self): self.mean = 0 self.eigvecs=0 self.eigvals=0 def analyze(self, X): self.mean = X.mean(axis=0) self.eigvecs=1 self.eigvals=1 def filt(self, X): return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals? def pseudo_inverse(self, Y): return Y class Layer(object): def __init__(self, w): self.w = w def filt(self, x): return self.w*x def clear(self): self.w =0 def print_obj(vm, gself, obj): print obj def no_op(*args, **kwargs): pass class cd1_update(object): def __init__(self, layer, lr): self.layer = layer self.lr = lr def __call__(self, X): # update self.layer from observation X print 'cd1', X print X.mean() self.layer.w += X.mean() * self.lr #TODO: not exactly correct math def main(): # create components dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) pca = PCA_Analysis() layer1 = Layer(w=4) layer2 = Layer(w=3) kf = KFold(dataset, K=10) # create algorithm train_pca = SEQ([ BUFFER(1000, CALL(kf.next)), CALL(pca.analyze)]) train_layer1 = REPEAT(10, [ BUFFER(10, CALL(kf.next)), CALL(pca.filt), CALL(cd1_update(layer1, lr=.01))]) train_layer2 = REPEAT(10, [ BUFFER(10, CALL(kf.next)), CALL(pca.filt), CALL(layer1.filt), CALL(cd1_update(layer2, lr=.01))]) def print_layer_w(*a,**kw): print layer1.w train_prog = SEQ([ train_pca, WEAVE([ train_layer1, LOOP(CALL(print_layer_w))]), train_layer2, ]) kfold_prog = REPEAT(10, [ CALL(kf.next_fold), CALL(pca.clear), CALL(layer1.clear), CALL(layer2.clear), train_prog, CALL(kf.init_test), BUFFER(kf.test_size(), SEQ([ CALL(kf.next_test), CALL(pca.filt), # may want to allow this SEQ to be CALL(layer1.filt), # optimized into a shorter one that CALL(layer2.filt), CALL(numpy.mean)])), # chains together theano graphs CALL(kf.store_scores), ]) vm_run(kfold_prog, n_steps=500) print kf.scores if __name__ == '__main__': sys.exit(main())