# HG changeset patch # User James Bergstra # Date 1284954564 14400 # Node ID a60b3472c4baa1b4849ed5d42c34c8fab1cf0cd7 # Parent e9bb3340a8702fdd5dfd089f7092f491d08d5ffe more progress on greenlets diff -r e9bb3340a870 -r a60b3472c4ba doc/v2_planning/plugin_greenlet.py --- a/doc/v2_planning/plugin_greenlet.py Sun Sep 19 13:06:16 2010 -0400 +++ b/doc/v2_planning/plugin_greenlet.py Sun Sep 19 23:49:24 2010 -0400 @@ -1,5 +1,38 @@ """plugin_greenlet - draft of library architecture using greenlets""" + +""" + +- PICKLABLE - algorithms are serializable at all points during execution + +- ITERATOR walks through algorithms with fine granularity + +- COMPONENTS - library provides components on which programs operate + +- ALGORITHMS - library provides algorithms in clean (no hooks) form + +- HOOKS - user can insert print / debug logic with search/replace type calls + e.g. prog.find(CALL(cd1_update)).replace_with(SEQ([CALL(cd1_update), CALL(debugfn)])) + +- PRINTING - user can print the 'program code' of an algorithm built from library pieces + +- MODULAR EXPERIMENTS - an experiment object with one (or more?) programs and all of the objects referred to by + those programs. It is the preferred type of object to be serialized. The main components of + the algorithms should be top-level attributes of the package. This object can be serialized + and loaded in another process to implement job migration. + +- OPTIMIZATION - program can be optimized automatically + e.g. BUFFER(N, CALL(dataset.next)) can be replaced if dataset.next implements the right + attribute/protocol for 'bufferable' or something. + + e.g. SEQ([a,b,c,d]) can be compiled with Theano if sub-sequence is compatible + +- don't need greenlets to get efficiency, the implementations of control flow ops can manage a + stack or stack tree in the vm (like greenlets do I think) we don't really need + greenlets/stackless I don't think + +""" + __license__ = None __copyright__ = None @@ -33,23 +66,32 @@ return incoming def vm_run(prog, *args, **kwargs): - + #TODO: make this into a class with different ways to start the loop + # for example, if the (gr, dest, a, kw) tuple is returned, + # then a program could be run for N steps and then paused, + # saved, and restarted. + n_steps = kwargs.pop('n_steps', float('inf')) def vm_loop(gr, dest, a, kw): - while True: + loop_iter = 0 + while loop_iter < n_steps: if gr == 'return': - return a, kw + break #print 'vm_loop gr=',gr,'args=',a, 'kwargs=', kw gr, dest, a, kw = gr.switch(vm, gr, dest, a, kw) #print 'gmain incoming', incoming + loop_iter += 1 + # permit restarting + return gr, dest, a, kw vm = greenlet(vm_loop) - return vm.switch(prog, 'return', args, kwargs) +#################################################### +# CONTROL-FLOW CONSTRUCTS -def seq(glets): - return repeat(1, glets) +def SEQ(glets): + return REPEAT(1, glets) -def repeat(N, glets): +def REPEAT(N, glets): def repeat_task(vm, gself, dest, args, kwargs): while True: for i in xrange(N): @@ -63,23 +105,77 @@ vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs) return greenlet(repeat_task) -def choose(which, options): +def LOOP(seq): + #TODO: implement a true infinite loop + try: + iter(seq) + return REPEAT(sys.maxint, seq) + except TypeError: + return REPEAT(sys.maxint, [seq]) + +def CHOOSE(which, options): raise NotImplementedError() -def weave(threads): - raise NotImplementedError() +def WEAVE(threads): + def weave_task(vm, gself, dest, args, kwargs): + # weave works by telling its threads that *it* is the vm + # and reporting back to the real vm indirectly + while True: # execution of weave is an iteration through this loop + # initially broadcast the args and kwargs to all threads + all_threads_live = True + thread_info = [(t, 'return', args, kwargs) for t in threads] + #print 'weave start -------------' + while all_threads_live: + #print 'weave iter' + for i in xrange(len(threads)): + t_next, t_dest, t_args, t_kwargs = thread_info[i] + #tell the vm we're up to something, but ask it to come right back + #print 'weave 1' + _ignore = vm.switch(gself, None, (), {}) -def service(fn): + # pretend we're the vm_loop and tell the + # thread to advance by one and report back to us + #print 'weave 2', thread_info[i] + thread_info[i] = t_next.switch(gself, t_next, t_dest, t_args, t_kwargs) + #print 'weave 3', thread_info[i] + if thread_info[i][0] is 'return': + #print 'thread has finished', i + all_threads_live = False + + # some thread has died so we return control to parent + #####print 'weave returning', dest, args, kwargs + vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs) + #####print 'weave continuing', dest, args, kwargs + return greenlet(weave_task) + +def BUFFER(N, glet): + def BUFFER_loop(vm, gself, dest, args, kwargs): + while True: #body runs once per execution + buf = [] + for i in xrange(N): + # jump to task `glet` + # with instructions to report results back to this loop `g` + _vm, _gself, _dest, _args, _kwargs = vm.switch(glet, gself, args, kwargs) + buf.append(_args[0]) + assert len(_args)==1 + assert _kwargs=={} + assert _gself is gself + assert _dest is None # instructions can't tell us where to jump + buf = numpy.asarray(buf) + vm, gself, dest, args, kwargs = vm.switch(dest, None, (buf,), {}) + return greenlet(BUFFER_loop) + +def CALL(fn): """ Create a greenlet whose first argument is the return-jump location. fn must accept as the first positional argument this greenlet itself, which can be used as the return-jump location for internal greenlet switches (ideally using gswitch). """ - def service_loop(vm, gself, dest, args, kwargs): + def CALL_loop(vm, gself, dest, args, kwargs): while True: - #print 'service calling', fn.__name__, args, kwargs - t = fn(vm, gself, *args, **kwargs) + #print 'CALL calling', fn.__name__, args, kwargs + t = fn(*args, **kwargs) #TODO consider a protocol for returning args, kwargs if t is None: _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (), {}) @@ -87,32 +183,59 @@ _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (t,), {}) assert gself is _gself - return greenlet(service_loop) + return greenlet(CALL_loop) #################################################### +# Components involved in the learning process class Dataset(object): def __init__(self, data): self.pos = 0 self.data = data - def next(self, vm, gself): + def next(self): rval = self.data[self.pos] self.pos += 1 if self.pos == len(self.data): self.pos = 0 return rval + def seek(self, pos): + self.pos = pos + +class KFold(object): + def __init__(self, data, K): + self.data = data + self.k = -1 + self.scores = [None]*K + self.K = K + def next_fold(self): + self.k += 1 + self.data.seek(0) # restart the stream + def next(self): + #TODO: skip the examples that are ommitted in this split + return self.data.next() + def init_test(self): + pass + def next_test(self): + return self.data.next() + def test_size(self): + return 5 + def store_scores(self, scores): + self.scores[self.k] = scores class PCA_Analysis(object): def __init__(self): + self.clear() + + def clear(self): self.mean = 0 self.eigvecs=0 self.eigvals=0 - def analyze(self, me, X): + def analyze(self, X): self.mean = X.mean(axis=0) self.eigvecs=1 self.eigvals=1 - def filt(self,me, X): - return (self.X - self.mean) * self.eigvecs #TODO: divide by root eigvals? + def filt(self, X): + return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals? def pseudo_inverse(self, Y): return Y @@ -121,101 +244,82 @@ self.w = w def filt(self, x): return self.w*x - -def batches(src, N): - # src is a service - def rval(me): - print 'batches src=', src, 'me=', me - return numpy.asarray([gswitch(src, me)[0][0] for i in range(N)]) - return rval + def clear(self): + self.w =0 def print_obj(vm, gself, obj): print obj def no_op(*args, **kwargs): pass -def build_pca_trainer(data_src, pca_module, N): - return greenlet( - batches( - N=5, - src=inf_data, - dest=flow(pca_module.analyze, - dest=layer1_trainer))) +class cd1_update(object): + def __init__(self, layer, lr): + self.layer = layer + self.lr = lr + + def __call__(self, X): + # update self.layer from observation X + print 'cd1', X + print X.mean() + self.layer.w += X.mean() * self.lr #TODO: not exactly correct math def main(): - dataset = Dataset(numpy.random.RandomState(123).randn(10,2)) - - prog=repeat(3, [service(dataset.next),service(print_obj)]) - vm_run(prog) - vm_run(prog) - - -def main_arch(): - # create components - dataset = Dataset(numpy.random.RandomState(123).randn(10,2)) - pca_module = PCA_Analysis() + dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) + pca = PCA_Analysis() layer1 = Layer(w=4) layer2 = Layer(w=3) kf = KFold(dataset, K=10) # create algorithm - train_pca = seq([ np_batch(kf.next, 1000), pca.analyze]) - train_layer1 = repeat(100, [kf.next, pca.filt, cd1_update(layer1, lr=.01)]) + train_pca = SEQ([ + BUFFER(1000, CALL(kf.next)), + CALL(pca.analyze)]) + + train_layer1 = REPEAT(10, [ + BUFFER(10, CALL(kf.next)), + CALL(pca.filt), + CALL(cd1_update(layer1, lr=.01))]) + + train_layer2 = REPEAT(10, [ + BUFFER(10, CALL(kf.next)), + CALL(pca.filt), + CALL(layer1.filt), + CALL(cd1_update(layer2, lr=.01))]) + + def print_layer_w(*a,**kw): + print layer1.w - algo = repeat(10, [ - KFold.step, - seq([train_pca, - train_layer1, - train_layer2, - train_classifier, - save_classifier, - test_classifier]), - KFold.set_score]) + train_prog = SEQ([ + train_pca, + WEAVE([ + train_layer1, + LOOP(CALL(print_layer_w))]), + train_layer2, + ]) - gswitch(algo) + kfold_prog = REPEAT(10, [ + CALL(kf.next_fold), + CALL(pca.clear), + CALL(layer1.clear), + CALL(layer2.clear), + train_prog, + CALL(kf.init_test), + BUFFER(kf.test_size(), + SEQ([ + CALL(kf.next_test), + CALL(pca.filt), # may want to allow this SEQ to be + CALL(layer1.filt), # optimized into a shorter one that + CALL(layer2.filt), + CALL(numpy.mean)])), # chains together theano graphs + CALL(kf.store_scores), + ]) + + vm_run(kfold_prog, n_steps=500) + print kf.scores -def main1(): - dataset = Dataset(numpy.random.RandomState(123).randn(10,2)) - pca_module = PCA_Analysis() - - # pca - next_data = service(dataset.next) - b5 = service(batches(src=next_data, N=5)) - print_pca_analyze = flow(pca_module.analyze, dest=sink(print_obj)) - - # layer1_training - layer1_training = driver( - fn=cd1_trainer(layer1), - srcs=[], - ) - - gswitch(b5, print_pca_analyze) - if __name__ == '__main__': sys.exit(main()) - - -def flow(fn, dest): - def rval(*args, **kwargs): - while True: - print 'flow calling', fn.__name__, args, kwargs - t = fn(g, *args, **kwargs) - args, kwargs = gswitch(dest, t) - g = greenlet(rval) - return g - -def sink(fn): - def rval(*args, **kwargs): - return fn(g, *args, **kwargs) - g = greenlet(rval) - return g - -def consumer(fn, src): - def rval(*args, **kwargs): - while True: - fn(gswitch(src, *args, **kwargs)) - return greenlet(rval)