# HG changeset patch # User James Bergstra # Date 1284915948 14400 # Node ID d3ee0d2d03e67fcdbace10938e00f746c683d935 # Parent 25d324ab372fb48c9fa021405d1feb8f247403b5 plugin_greenlet draft0 diff -r 25d324ab372f -r d3ee0d2d03e6 doc/v2_planning/plugin_greenlet.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/doc/v2_planning/plugin_greenlet.py Sun Sep 19 13:05:48 2010 -0400 @@ -0,0 +1,221 @@ +"""plugin_greenlet - draft of library architecture using greenlets""" + +__license__ = None +__copyright__ = None + +import copy, sys + +import numpy +from greenlet import greenlet + +def vm_unpack(incoming): + # can't reliably distinguish between a kwargs-only switch and a switch with one dict + # argument + if incoming is None: + rval = (), {} + if isinstance(incoming, dict): + rval = (), incoming + elif isinstance(incoming, tuple): + if (len(incoming)==2 + and isinstance(incoming[0], tuple) + and isinstance(incoming[1], dict)): + rval = incoming + else: + rval = incoming, {} + else: + rval = (incoming,), {} + #print 'unpack', incoming, rval + return rval[0][0], rval[0][1], rval[0][2:], rval[1] + +def unpack_from_vm(incoming): + assert isinstance(incoming, tuple) + assert len(incoming)==4 + return incoming + +def vm_run(prog, *args, **kwargs): + + def vm_loop(gr, dest, a, kw): + while True: + if gr == 'return': + return a, kw + print 'vm_loop gr=',gr,'args=',a, 'kwargs=', kw + gr, dest, a, kw = gr.switch(vm, gr, dest, a, kw) + #print 'gmain incoming', incoming + vm = greenlet(vm_loop) + + return vm.switch(prog, 'return', args, kwargs) + + +def seq(glets): + return repeat(1, glets) + +def repeat(N, glets): + def repeat_task(vm, gself, dest, args, kwargs): + while True: + for i in xrange(N): + for glet in glets: + print 'repeat_task_i dest=%(dest)s args=%(args)s, kw=%(kwargs)s'%locals() + # jump to task `glet` + # with instructions to report results back to this loop `g` + _vm, _gself, _dest, args, kwargs = vm.switch(glet, gself, args, kwargs) + assert _gself is gself + assert _dest is None # instructions can't tell us where to jump + vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs) + return greenlet(repeat_task) + +def choose(which, options): + raise NotImplementedError() + +def weave(threads): + raise NotImplementedError() + +def service(fn): + """ + Create a greenlet whose first argument is the return-jump location. + + fn must accept as the first positional argument this greenlet itself, which can be used as + the return-jump location for internal greenlet switches (ideally using gswitch). + """ + def service_loop(vm, gself, dest, args, kwargs): + while True: + print 'service calling', fn.__name__, args, kwargs + t = fn(vm, gself, *args, **kwargs) + #TODO consider a protocol for returning args, kwargs + if t is None: + _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (), {}) + else: + _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (t,), {}) + + assert gself is _gself + return greenlet(service_loop) + +#################################################### + +class Dataset(object): + def __init__(self, data): + self.pos = 0 + self.data = data + def next(self, vm, gself): + rval = self.data[self.pos] + self.pos += 1 + if self.pos == len(self.data): + self.pos = 0 + return rval + +class PCA_Analysis(object): + def __init__(self): + self.mean = 0 + self.eigvecs=0 + self.eigvals=0 + def analyze(self, me, X): + self.mean = X.mean(axis=0) + self.eigvecs=1 + self.eigvals=1 + def filt(self,me, X): + return (self.X - self.mean) * self.eigvecs #TODO: divide by root eigvals? + def pseudo_inverse(self, Y): + return Y + +class Layer(object): + def __init__(self, w): + self.w = w + def filt(self, x): + return self.w*x + +def batches(src, N): + # src is a service + def rval(me): + print 'batches src=', src, 'me=', me + return numpy.asarray([gswitch(src, me)[0][0] for i in range(N)]) + return rval + +def print_obj(vm, gself, obj): + print obj +def no_op(*args, **kwargs): + pass + +def build_pca_trainer(data_src, pca_module, N): + return greenlet( + batches( + N=5, + src=inf_data, + dest=flow(pca_module.analyze, + dest=layer1_trainer))) + +def main(): + dataset = Dataset(numpy.random.RandomState(123).randn(10,2)) + + prog=repeat(3, [service(dataset.next),service(print_obj)]) + vm_run(prog) + vm_run(prog) + + +def main_arch(): + + # create components + dataset = Dataset(numpy.random.RandomState(123).randn(10,2)) + pca_module = PCA_Analysis() + layer1 = Layer(w=4) + layer2 = Layer(w=3) + kf = KFold(dataset, K=10) + + # create algorithm + + train_pca = seq([ np_batch(kf.next, 1000), pca.analyze]) + train_layer1 = repeat(100, [kf.next, pca.filt, cd1_update(layer1, lr=.01)]) + + algo = repeat(10, [ + KFold.step, + seq([train_pca, + train_layer1, + train_layer2, + train_classifier, + save_classifier, + test_classifier]), + KFold.set_score]) + + gswitch(algo) + + +def main1(): + dataset = Dataset(numpy.random.RandomState(123).randn(10,2)) + pca_module = PCA_Analysis() + + # pca + next_data = service(dataset.next) + b5 = service(batches(src=next_data, N=5)) + print_pca_analyze = flow(pca_module.analyze, dest=sink(print_obj)) + + # layer1_training + layer1_training = driver( + fn=cd1_trainer(layer1), + srcs=[], + ) + + gswitch(b5, print_pca_analyze) + +if __name__ == '__main__': + sys.exit(main()) + + + +def flow(fn, dest): + def rval(*args, **kwargs): + while True: + print 'flow calling', fn.__name__, args, kwargs + t = fn(g, *args, **kwargs) + args, kwargs = gswitch(dest, t) + g = greenlet(rval) + return g + +def sink(fn): + def rval(*args, **kwargs): + return fn(g, *args, **kwargs) + g = greenlet(rval) + return g + +def consumer(fn, src): + def rval(*args, **kwargs): + while True: + fn(gswitch(src, *args, **kwargs)) + return greenlet(rval)