view doc/v2_planning/plugin_greenlet.py @ 1197:a60b3472c4ba

more progress on greenlets
author James Bergstra <bergstrj@iro.umontreal.ca>
date Sun, 19 Sep 2010 23:49:24 -0400
parents e9bb3340a870
children acfd5e747a75
line wrap: on
line source

"""plugin_greenlet - draft of library architecture using greenlets"""


"""

- PICKLABLE - algorithms are serializable at all points during execution

- ITERATOR walks through algorithms with fine granularity

- COMPONENTS - library provides components on which programs operate

- ALGORITHMS - library provides algorithms in clean (no hooks) form

- HOOKS - user can insert print / debug logic with search/replace type calls 
  e.g. prog.find(CALL(cd1_update)).replace_with(SEQ([CALL(cd1_update), CALL(debugfn)]))

- PRINTING - user can print the 'program code' of an algorithm built from library pieces

- MODULAR EXPERIMENTS - an experiment object with one (or more?) programs and all of the objects referred to by
  those programs.  It is the preferred type of object to be serialized.  The main components of
  the algorithms should be top-level attributes of the package.  This object can be serialized
  and loaded in another process to implement job migration.

- OPTIMIZATION - program can be optimized automatically
    e.g. BUFFER(N, CALL(dataset.next))   can be replaced if dataset.next implements the right
    attribute/protocol for 'bufferable' or something.

    e.g. SEQ([a,b,c,d])  can be compiled with Theano if sub-sequence is compatible

- don't need greenlets to get efficiency, the implementations of control flow ops can manage a
  stack or stack tree in the vm (like greenlets do I think) we don't really need
  greenlets/stackless I don't think

"""

__license__ = None
__copyright__ = None

import copy, sys

import numpy
from greenlet import greenlet

def vm_unpack(incoming):
    # can't reliably distinguish between a kwargs-only switch and a switch with one dict
    # argument
    if incoming is None:
        rval = (), {}
    if isinstance(incoming, dict):
        rval = (), incoming
    elif isinstance(incoming, tuple):
        if (len(incoming)==2 
                and isinstance(incoming[0], tuple)
                and isinstance(incoming[1], dict)):
            rval = incoming
        else:
            rval = incoming, {}
    else:
        rval = (incoming,), {}
    #print 'unpack', incoming, rval
    return rval[0][0], rval[0][1], rval[0][2:], rval[1]

def unpack_from_vm(incoming):
    assert isinstance(incoming, tuple)
    assert len(incoming)==4
    return incoming

def vm_run(prog, *args, **kwargs):
    #TODO: make this into a class with different ways to start the loop
    #      for example, if the (gr, dest, a, kw) tuple is returned, 
    #      then a program could be run for N steps and then paused,
    #      saved, and restarted.
    n_steps = kwargs.pop('n_steps', float('inf'))
    def vm_loop(gr, dest, a, kw):
        loop_iter = 0
        while loop_iter < n_steps:
            if gr == 'return':
                break
            #print 'vm_loop gr=',gr,'args=',a, 'kwargs=', kw
            gr, dest, a, kw = gr.switch(vm, gr, dest, a, kw)
            #print 'gmain incoming', incoming
            loop_iter += 1
        # permit restarting
        return gr, dest, a, kw
    vm = greenlet(vm_loop)
    return vm.switch(prog, 'return', args, kwargs)

####################################################
# CONTROL-FLOW CONSTRUCTS

def SEQ(glets):
    return REPEAT(1, glets)

def REPEAT(N, glets):
    def repeat_task(vm, gself, dest, args, kwargs):
        while True:
            for i in xrange(N):
                for glet in glets:
                    #print 'repeat_task_i dest=%(dest)s args=%(args)s, kw=%(kwargs)s'%locals()
                    # jump to task `glet`
                    # with instructions to report results back to this loop `g`
                    _vm, _gself, _dest, args, kwargs = vm.switch(glet, gself, args, kwargs)
                    assert _gself is gself
                    assert _dest is None # instructions can't tell us where to jump
            vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs)
    return greenlet(repeat_task)

def LOOP(seq):
    #TODO: implement a true infinite loop
    try:
        iter(seq)
        return REPEAT(sys.maxint, seq)
    except TypeError:
        return REPEAT(sys.maxint, [seq])

def CHOOSE(which, options):
    raise NotImplementedError()

def WEAVE(threads): 
    def weave_task(vm, gself, dest, args, kwargs):
        # weave works by telling its threads that *it* is the vm
        # and reporting back to the real vm indirectly
        while True: # execution of weave is an iteration through this loop
            # initially broadcast the args and kwargs to all threads
            all_threads_live = True
            thread_info = [(t, 'return', args, kwargs) for t in threads]
            #print 'weave start -------------'
            while all_threads_live:
                #print 'weave iter'
                for i in xrange(len(threads)):
                    t_next, t_dest, t_args, t_kwargs = thread_info[i]
                    #tell the vm we're up to something, but ask it to come right back
                    #print 'weave 1'
                    _ignore = vm.switch(gself, None, (), {})

                    # pretend we're the vm_loop and tell the
                    # thread to advance by one and report back to us
                    #print 'weave 2', thread_info[i]
                    thread_info[i] = t_next.switch(gself, t_next, t_dest, t_args, t_kwargs)
                    #print 'weave 3', thread_info[i]
                    if thread_info[i][0] is 'return':
                        #print 'thread has finished', i
                        all_threads_live = False
                        
            # some thread has died so we return control to parent
            #####print 'weave returning', dest, args, kwargs
            vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs)
            #####print 'weave continuing', dest, args, kwargs
    return greenlet(weave_task)

def BUFFER(N, glet):
    def BUFFER_loop(vm, gself, dest, args, kwargs):
        while True: #body runs once per execution
            buf = []
            for i in xrange(N):
                # jump to task `glet`
                # with instructions to report results back to this loop `g`
                _vm, _gself, _dest, _args, _kwargs = vm.switch(glet, gself, args, kwargs)
                buf.append(_args[0])
                assert len(_args)==1
                assert _kwargs=={}
                assert _gself is gself
                assert _dest is None # instructions can't tell us where to jump
            buf = numpy.asarray(buf)
            vm, gself, dest, args, kwargs = vm.switch(dest, None, (buf,), {})
    return greenlet(BUFFER_loop)

def CALL(fn):
    """
    Create a greenlet whose first argument is the return-jump location.

    fn must accept as the first positional argument this greenlet itself, which can be used as
    the return-jump location for internal greenlet switches (ideally using gswitch).
    """
    def CALL_loop(vm, gself, dest, args, kwargs):
        while True:
            #print 'CALL calling', fn.__name__, args, kwargs
            t = fn(*args, **kwargs)
            #TODO consider a protocol for returning args, kwargs
            if t is None:
                _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (), {})
            else:
                _vm,_gself,dest, args, kwargs = vm.switch(dest, None, (t,), {})

            assert gself is _gself
    return greenlet(CALL_loop)

####################################################
# Components involved in the learning process

class Dataset(object):
    def __init__(self, data):
        self.pos = 0
        self.data = data
    def next(self):
        rval = self.data[self.pos]
        self.pos += 1
        if self.pos == len(self.data):
            self.pos = 0
        return rval
    def seek(self, pos):
        self.pos = pos

class KFold(object):
    def __init__(self, data, K):
        self.data = data
        self.k = -1
        self.scores = [None]*K
        self.K = K
    def next_fold(self):
        self.k += 1
        self.data.seek(0) # restart the stream
    def next(self):
        #TODO: skip the examples that are ommitted in this split
        return self.data.next()
    def init_test(self):
        pass
    def next_test(self):
        return self.data.next()
    def test_size(self):
        return 5
    def store_scores(self, scores):
        self.scores[self.k] = scores

class PCA_Analysis(object):
    def __init__(self):
        self.clear()

    def clear(self):
        self.mean = 0
        self.eigvecs=0
        self.eigvals=0
    def analyze(self, X):
        self.mean = X.mean(axis=0)
        self.eigvecs=1
        self.eigvals=1
    def filt(self, X):
        return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals?
    def pseudo_inverse(self, Y):
        return Y

class Layer(object):
    def __init__(self, w):
        self.w = w
    def filt(self, x):
        return self.w*x
    def clear(self):
        self.w =0

def print_obj(vm, gself, obj):
    print obj
def no_op(*args, **kwargs):
    pass

class cd1_update(object):
    def __init__(self, layer, lr):
        self.layer = layer
        self.lr = lr

    def __call__(self, X):
        # update self.layer from observation X
        print 'cd1', X
        print X.mean()
        self.layer.w += X.mean() * self.lr #TODO: not exactly correct math

def main():
    # create components
    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
    pca = PCA_Analysis()
    layer1 = Layer(w=4)
    layer2 = Layer(w=3)
    kf = KFold(dataset, K=10)

    # create algorithm

    train_pca = SEQ([
        BUFFER(1000, CALL(kf.next)), 
        CALL(pca.analyze)])

    train_layer1 = REPEAT(10, [
        BUFFER(10, CALL(kf.next)),
        CALL(pca.filt), 
        CALL(cd1_update(layer1, lr=.01))])

    train_layer2 = REPEAT(10, [
        BUFFER(10, CALL(kf.next)),
        CALL(pca.filt), 
        CALL(layer1.filt),
        CALL(cd1_update(layer2, lr=.01))])

    def print_layer_w(*a,**kw):
        print layer1.w

    train_prog = SEQ([
        train_pca,
        WEAVE([
            train_layer1, 
            LOOP(CALL(print_layer_w))]),
        train_layer2,
        ])

    kfold_prog = REPEAT(10, [
        CALL(kf.next_fold),
        CALL(pca.clear),
        CALL(layer1.clear),
        CALL(layer2.clear),
        train_prog,
        CALL(kf.init_test),
        BUFFER(kf.test_size(),
            SEQ([
                CALL(kf.next_test),  
                CALL(pca.filt),       # may want to allow this SEQ to be 
                CALL(layer1.filt),    # optimized into a shorter one that
                CALL(layer2.filt),
                CALL(numpy.mean)])), # chains together theano graphs
        CALL(kf.store_scores),
        ])

    vm_run(kfold_prog, n_steps=500)
    print kf.scores


if __name__ == '__main__':
    sys.exit(main())