diff doc/v2_planning/plugin_greenlet.py @ 1197:a60b3472c4ba

more progress on greenlets
author James Bergstra <bergstrj@iro.umontreal.ca>
date Sun, 19 Sep 2010 23:49:24 -0400
parents e9bb3340a870
children acfd5e747a75
line wrap: on
line diff
--- a/doc/v2_planning/plugin_greenlet.py	Sun Sep 19 13:06:16 2010 -0400
+++ b/doc/v2_planning/plugin_greenlet.py	Sun Sep 19 23:49:24 2010 -0400
@@ -1,5 +1,38 @@
 """plugin_greenlet - draft of library architecture using greenlets"""
 
+
+"""
+
+- PICKLABLE - algorithms are serializable at all points during execution
+
+- ITERATOR walks through algorithms with fine granularity
+
+- COMPONENTS - library provides components on which programs operate
+
+- ALGORITHMS - library provides algorithms in clean (no hooks) form
+
+- HOOKS - user can insert print / debug logic with search/replace type calls 
+  e.g. prog.find(CALL(cd1_update)).replace_with(SEQ([CALL(cd1_update), CALL(debugfn)]))
+
+- PRINTING - user can print the 'program code' of an algorithm built from library pieces
+
+- MODULAR EXPERIMENTS - an experiment object with one (or more?) programs and all of the objects referred to by
+  those programs.  It is the preferred type of object to be serialized.  The main components of
+  the algorithms should be top-level attributes of the package.  This object can be serialized
+  and loaded in another process to implement job migration.
+
+- OPTIMIZATION - program can be optimized automatically
+    e.g. BUFFER(N, CALL(dataset.next))   can be replaced if dataset.next implements the right
+    attribute/protocol for 'bufferable' or something.
+
+    e.g. SEQ([a,b,c,d])  can be compiled with Theano if sub-sequence is compatible
+
+- don't need greenlets to get efficiency, the implementations of control flow ops can manage a
+  stack or stack tree in the vm (like greenlets do I think) we don't really need
+  greenlets/stackless I don't think
+
+"""
+
 __license__ = None
 __copyright__ = None
 
@@ -33,23 +66,32 @@
     return incoming
 
 def vm_run(prog, *args, **kwargs):
-
+    #TODO: make this into a class with different ways to start the loop
+    #      for example, if the (gr, dest, a, kw) tuple is returned, 
+    #      then a program could be run for N steps and then paused,
+    #      saved, and restarted.
+    n_steps = kwargs.pop('n_steps', float('inf'))
     def vm_loop(gr, dest, a, kw):
-        while True:
+        loop_iter = 0
+        while loop_iter < n_steps:
             if gr == 'return':
-                return a, kw
+                break
             #print 'vm_loop gr=',gr,'args=',a, 'kwargs=', kw
             gr, dest, a, kw = gr.switch(vm, gr, dest, a, kw)
             #print 'gmain incoming', incoming
+            loop_iter += 1
+        # permit restarting
+        return gr, dest, a, kw
     vm = greenlet(vm_loop)
-
     return vm.switch(prog, 'return', args, kwargs)
 
+####################################################
+# CONTROL-FLOW CONSTRUCTS
 
-def seq(glets):
-    return repeat(1, glets)
+def SEQ(glets):
+    return REPEAT(1, glets)
 
-def repeat(N, glets):
+def REPEAT(N, glets):
     def repeat_task(vm, gself, dest, args, kwargs):
         while True:
             for i in xrange(N):
@@ -63,23 +105,77 @@
             vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs)
     return greenlet(repeat_task)
 
-def choose(which, options):
+def LOOP(seq):
+    #TODO: implement a true infinite loop
+    try:
+        iter(seq)
+        return REPEAT(sys.maxint, seq)
+    except TypeError:
+        return REPEAT(sys.maxint, [seq])
+
+def CHOOSE(which, options):
     raise NotImplementedError()
 
-def weave(threads): 
-    raise NotImplementedError()
+def WEAVE(threads): 
+    def weave_task(vm, gself, dest, args, kwargs):
+        # weave works by telling its threads that *it* is the vm
+        # and reporting back to the real vm indirectly
+        while True: # execution of weave is an iteration through this loop
+            # initially broadcast the args and kwargs to all threads
+            all_threads_live = True
+            thread_info = [(t, 'return', args, kwargs) for t in threads]
+            #print 'weave start -------------'
+            while all_threads_live:
+                #print 'weave iter'
+                for i in xrange(len(threads)):
+                    t_next, t_dest, t_args, t_kwargs = thread_info[i]
+                    #tell the vm we're up to something, but ask it to come right back
+                    #print 'weave 1'
+                    _ignore = vm.switch(gself, None, (), {})
 
-def service(fn):
+                    # pretend we're the vm_loop and tell the
+                    # thread to advance by one and report back to us
+                    #print 'weave 2', thread_info[i]
+                    thread_info[i] = t_next.switch(gself, t_next, t_dest, t_args, t_kwargs)
+                    #print 'weave 3', thread_info[i]
+                    if thread_info[i][0] is 'return':
+                        #print 'thread has finished', i
+                        all_threads_live = False
+                        
+            # some thread has died so we return control to parent
+            #####print 'weave returning', dest, args, kwargs
+            vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs)
+            #####print 'weave continuing', dest, args, kwargs
+    return greenlet(weave_task)
+
+def BUFFER(N, glet):
+    def BUFFER_loop(vm, gself, dest, args, kwargs):
+        while True: #body runs once per execution
+            buf = []
+            for i in xrange(N):
+                # jump to task `glet`
+                # with instructions to report results back to this loop `g`
+                _vm, _gself, _dest, _args, _kwargs = vm.switch(glet, gself, args, kwargs)
+                buf.append(_args[0])
+                assert len(_args)==1
+                assert _kwargs=={}
+                assert _gself is gself
+                assert _dest is None # instructions can't tell us where to jump
+            buf = numpy.asarray(buf)
+            vm, gself, dest, args, kwargs = vm.switch(dest, None, (buf,), {})
+    return greenlet(BUFFER_loop)
+
+def CALL(fn):
     """
     Create a greenlet whose first argument is the return-jump location.
 
     fn must accept as the first positional argument this greenlet itself, which can be used as
     the return-jump location for internal greenlet switches (ideally using gswitch).
     """
-    def service_loop(vm, gself, dest, args, kwargs):
+    def CALL_loop(vm, gself, dest, args, kwargs):
         while True:
-            #print 'service calling', fn.__name__, args, kwargs
-            t = fn(vm, gself, *args, **kwargs)
+            #print 'CALL calling', fn.__name__, args, kwargs
+            t = fn(*args, **kwargs)
             #TODO consider a protocol for returning args, kwargs
             if t is None:
                 _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (), {})
@@ -87,32 +183,59 @@
                 _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (t,), {})
 
             assert gself is _gself
-    return greenlet(service_loop)
+    return greenlet(CALL_loop)
 
 ####################################################
+# Components involved in the learning process
 
 class Dataset(object):
     def __init__(self, data):
         self.pos = 0
         self.data = data
-    def next(self, vm, gself):
+    def next(self):
         rval = self.data[self.pos]
         self.pos += 1
         if self.pos == len(self.data):
             self.pos = 0
         return rval
+    def seek(self, pos):
+        self.pos = pos
+
+class KFold(object):
+    def __init__(self, data, K):
+        self.data = data
+        self.k = -1
+        self.scores = [None]*K
+        self.K = K
+    def next_fold(self):
+        self.k += 1
+        self.data.seek(0) # restart the stream
+    def next(self):
+        #TODO: skip the examples that are ommitted in this split
+        return self.data.next()
+    def init_test(self):
+        pass
+    def next_test(self):
+        return self.data.next()
+    def test_size(self):
+        return 5
+    def store_scores(self, scores):
+        self.scores[self.k] = scores
 
 class PCA_Analysis(object):
     def __init__(self):
+        self.clear()
+
+    def clear(self):
         self.mean = 0
         self.eigvecs=0
         self.eigvals=0
-    def analyze(self, me, X):
+    def analyze(self, X):
         self.mean = X.mean(axis=0)
         self.eigvecs=1
         self.eigvals=1
-    def filt(self,me, X):
-        return (self.X - self.mean) * self.eigvecs #TODO: divide by root eigvals?
+    def filt(self, X):
+        return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals?
     def pseudo_inverse(self, Y):
         return Y
 
@@ -121,101 +244,82 @@
         self.w = w
     def filt(self, x):
         return self.w*x
-
-def batches(src, N):
-    # src is a service
-    def rval(me):
-        print 'batches src=', src, 'me=', me
-        return numpy.asarray([gswitch(src, me)[0][0] for i in range(N)])
-    return rval
+    def clear(self):
+        self.w =0
 
 def print_obj(vm, gself, obj):
     print obj
 def no_op(*args, **kwargs):
     pass
 
-def build_pca_trainer(data_src, pca_module, N):
-    return greenlet(
-                batches(
-                    N=5,
-                    src=inf_data,
-                    dest=flow(pca_module.analyze,
-                        dest=layer1_trainer)))
+class cd1_update(object):
+    def __init__(self, layer, lr):
+        self.layer = layer
+        self.lr = lr
+
+    def __call__(self, X):
+        # update self.layer from observation X
+        print 'cd1', X
+        print X.mean()
+        self.layer.w += X.mean() * self.lr #TODO: not exactly correct math
 
 def main():
-    dataset = Dataset(numpy.random.RandomState(123).randn(10,2))
-
-    prog=repeat(3, [service(dataset.next),service(print_obj)])
-    vm_run(prog)
-    vm_run(prog)
-
-
-def main_arch():
-
     # create components
-    dataset = Dataset(numpy.random.RandomState(123).randn(10,2))
-    pca_module = PCA_Analysis()
+    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
+    pca = PCA_Analysis()
     layer1 = Layer(w=4)
     layer2 = Layer(w=3)
     kf = KFold(dataset, K=10)
 
     # create algorithm
 
-    train_pca = seq([ np_batch(kf.next, 1000), pca.analyze])
-    train_layer1 = repeat(100, [kf.next, pca.filt, cd1_update(layer1, lr=.01)])
+    train_pca = SEQ([
+        BUFFER(1000, CALL(kf.next)), 
+        CALL(pca.analyze)])
+
+    train_layer1 = REPEAT(10, [
+        BUFFER(10, CALL(kf.next)),
+        CALL(pca.filt), 
+        CALL(cd1_update(layer1, lr=.01))])
+
+    train_layer2 = REPEAT(10, [
+        BUFFER(10, CALL(kf.next)),
+        CALL(pca.filt), 
+        CALL(layer1.filt),
+        CALL(cd1_update(layer2, lr=.01))])
+
+    def print_layer_w(*a,**kw):
+        print layer1.w
 
-    algo = repeat(10, [
-        KFold.step,
-        seq([train_pca,
-            train_layer1,
-            train_layer2,
-            train_classifier,
-            save_classifier,
-            test_classifier]),
-        KFold.set_score])
+    train_prog = SEQ([
+        train_pca,
+        WEAVE([
+            train_layer1, 
+            LOOP(CALL(print_layer_w))]),
+        train_layer2,
+        ])
 
-    gswitch(algo)
+    kfold_prog = REPEAT(10, [
+        CALL(kf.next_fold),
+        CALL(pca.clear),
+        CALL(layer1.clear),
+        CALL(layer2.clear),
+        train_prog,
+        CALL(kf.init_test),
+        BUFFER(kf.test_size(),
+            SEQ([
+                CALL(kf.next_test),  
+                CALL(pca.filt),       # may want to allow this SEQ to be 
+                CALL(layer1.filt),    # optimized into a shorter one that
+                CALL(layer2.filt),
+                CALL(numpy.mean)])), # chains together theano graphs
+        CALL(kf.store_scores),
+        ])
+
+    vm_run(kfold_prog, n_steps=500)
+    print kf.scores
 
 
-def main1():
-    dataset = Dataset(numpy.random.RandomState(123).randn(10,2))
-    pca_module = PCA_Analysis()
-
-    # pca
-    next_data = service(dataset.next)
-    b5 = service(batches(src=next_data, N=5))
-    print_pca_analyze = flow(pca_module.analyze, dest=sink(print_obj))
-
-    # layer1_training
-    layer1_training = driver(
-            fn=cd1_trainer(layer1),
-            srcs=[],
-            )
-
-    gswitch(b5, print_pca_analyze)
-    
 if __name__ == '__main__':
     sys.exit(main())
 
-
-
-def flow(fn, dest):
-    def rval(*args, **kwargs):
-        while True:
-            print 'flow calling', fn.__name__, args, kwargs
-            t = fn(g, *args, **kwargs)
-            args, kwargs = gswitch(dest, t)
-    g = greenlet(rval)
-    return g
-
-def sink(fn):
-    def rval(*args, **kwargs):
-        return fn(g, *args, **kwargs)
-    g = greenlet(rval)
-    return g
-
-def consumer(fn, src):
-    def rval(*args, **kwargs):
-        while True:
-            fn(gswitch(src, *args, **kwargs))
-    return greenlet(rval)