view doc/v2_planning/plugin_greenlet.py @ 1274:9d5905d6d879

hmc - changed updates to member fn from lambda for pickling
author James Bergstra <bergstrj@iro.umontreal.ca>
date Wed, 08 Sep 2010 13:17:45 -0400
parents acfd5e747a75
children
line wrap: on
line source

"""plugin_greenlet - draft of library architecture using greenlets

HISTORICAL - NOT ACTUALLY A PROPOSAL
====================================

This was the original approach for what I renamed to plugin_JB, until I realized that I could
get the end result without using greenlets at all.

Still, greenlets seem like they could be neat, and writing this program stretched my mind, so
I am keeping it.  Something goes wrong when you run it with the k-fold validation, but up to
that point I think it works.

"""

# Module metadata: both fields are explicitly set to None in this draft.
__license__ = None
__copyright__ = None

import copy, sys

import numpy
from greenlet import greenlet

def vm_unpack(incoming):
    """Split a value received from a greenlet switch into four fields.

    `incoming` is normalised to a `(positional, keywords)` pair:

    * `None`                    -> `((), {})`
    * a dict                    -> `((), incoming)`
    * a `(tuple, dict)` 2-tuple -> used as `(positional, keywords)` directly
    * any other tuple           -> `(incoming, {})`
    * anything else             -> `((incoming,), {})`

    Returns the 4-tuple
    `(positional[0], positional[1], positional[2:], keywords)`.

    Raises IndexError when fewer than two positional items are available
    (this includes the `None`, dict and single-value forms).
    """
    # can't reliably distinguish between a kwargs-only switch and a switch with one dict
    # argument
    if incoming is None:
        positional, keywords = (), {}
    elif isinstance(incoming, dict):
        # `elif`, not `if`: otherwise the None case above falls through to the
        # final `else` and is silently replaced by `((None,), {})`.
        positional, keywords = (), incoming
    elif isinstance(incoming, tuple):
        if (len(incoming) == 2
                and isinstance(incoming[0], tuple)
                and isinstance(incoming[1], dict)):
            positional, keywords = incoming
        else:
            positional, keywords = incoming, {}
    else:
        positional, keywords = (incoming,), {}

    # The first two positional items are mandatory; fail with the same exception
    # type plain indexing would raise, but with a message that names the problem.
    if len(positional) < 2:
        raise IndexError(
            'vm_unpack needs at least two positional items, got %d'
            % len(positional))
    return positional[0], positional[1], positional[2:], keywords

def unpack_from_vm(incoming):
    """Check that `incoming` is a 4-item tuple and hand it back unchanged.

    Raises AssertionError if `incoming` is not a tuple or does not have
    exactly four items.
    """
    is_vm_message = isinstance(incoming, tuple) and len(incoming) == 4
    assert is_vm_message
    return incoming

def vm_run(prog, *args, **kwargs):
    """Run the greenlet program `prog` under a small dispatch loop.

    `args` and `kwargs` are forwarded to `prog` as its initial arguments,
    except for the keyword `n_steps`, which is removed first and caps the
    number of dispatch iterations (default: unlimited).

    The dispatch loop repeatedly switches into the current greenlet `gr`,
    passing `(vm, gr, dest, a, kw)`, and expects a `(gr, dest, a, kw)`
    4-tuple back naming the next greenlet to run.  It stops when `gr`
    equals the string 'return' or when the step cap is reached.

    Returns whatever the initial switch into the loop greenlet yields; when
    the loop finishes normally that is its last `(gr, dest, a, kw)` tuple.
    """
    #TODO: make this into a class with different ways to start the loop
    #      for example, if the (gr, dest, a, kw) tuple is returned,
    #      then a program could be run for N steps and then paused,
    #      saved, and restarted.
    step_limit = kwargs.pop('n_steps', float('inf'))

    def vm_loop(gr, dest, a, kw):
        steps_done = 0
        # Stop on the step cap first, then on the 'return' sentinel.
        while steps_done < step_limit and not (gr == 'return'):
            gr, dest, a, kw = gr.switch(vm, gr, dest, a, kw)
            steps_done += 1
        # permit restarting
        return gr, dest, a, kw

    vm = greenlet(vm_loop)
    # 'return' is the initial destination, so the loop ends once `prog`
    # reports back to it.
    return vm.switch(prog, 'return', args, kwargs)

####################################################
# CONTROL-FLOW CONSTRUCTS

def SEQ(glets):
    """Build a task that runs the greenlets in `glets` one after another, once.

    This is REPEAT with a repetition count of 1.
    """
    single_pass = 1
    return REPEAT(single_pass, glets)

def REPEAT(N, glets):
    """Build a greenlet that runs the sequence `glets` `N` times per execution.

    Each child is started by asking the vm to switch to it with this task as
    the place to report back to.  The args/kwargs a child reports become the
    args/kwargs handed to the next child.  After the last pass the final
    args/kwargs are forwarded to `dest`, and the task waits to be run again.
    """
    def repeat_task(vm, gself, dest, args, kwargs):
        while True:  # one trip through this body per execution of the task
            for _pass in xrange(N):
                for child in glets:
                    # jump to task `child`
                    # with instructions to report results back to this loop `gself`
                    reply = vm.switch(child, gself, args, kwargs)
                    _vm, reported_self, reported_dest, args, kwargs = reply
                    assert reported_self is gself
                    # instructions can't tell us where to jump
                    assert reported_dest is None
            # all passes done: report to `dest`, then block until re-run
            vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs)
    return greenlet(repeat_task)

def LOOP(seq):
    """Build a task that repeats `seq` `sys.maxint` times.

    `seq` may be an iterable of greenlets or a single (non-iterable) greenlet;
    a single greenlet is wrapped in a one-element list.
    """
    #TODO: implement a true infinite loop
    try:
        iter(seq)
    except TypeError:
        # not iterable: treat it as a single task
        body = [seq]
    else:
        body = seq
    return REPEAT(sys.maxint, body)

def CHOOSE(which, options):
    """Placeholder for a construct selecting among `options`; not implemented.

    Always raises NotImplementedError.
    """
    raise NotImplementedError

def WEAVE(threads):
    """Build a greenlet that interleaves the execution of `threads`.

    On each execution every thread is started with the args/kwargs the weave
    itself received and with the string 'return' as its destination.  The
    threads are then advanced round-robin, one step each per round.  Before
    each step the weave yields once to the real vm and asks to be resumed
    immediately.  The round in which any thread reports 'return' as its next
    greenlet is the last one; the remaining threads in that round still take
    their step.  Control then goes to `dest` with the args/kwargs the weave
    was originally given (whatever the threads produced is discarded).
    """
    def weave_task(vm, gself, dest, args, kwargs):
        # weave works by telling its threads that *it* is the vm
        # and reporting back to the real vm indirectly
        while True: # execution of weave is an iteration through this loop
            # initially broadcast the args and kwargs to all threads
            all_threads_live = True
            thread_info = [(t, 'return', args, kwargs) for t in threads]
            while all_threads_live:
                for i, (t_next, t_dest, t_args, t_kwargs) in enumerate(thread_info):
                    # tell the vm we're up to something, but ask it to come right back
                    vm.switch(gself, None, (), {})

                    # pretend we're the vm_loop and tell the
                    # thread to advance by one and report back to us
                    thread_info[i] = t_next.switch(gself, t_next, t_dest, t_args, t_kwargs)

                    # Compare by value, as vm_loop does: `is 'return'` only
                    # works when both strings happen to be interned.
                    if thread_info[i][0] == 'return':
                        all_threads_live = False

            # some thread has died so we return control to parent
            vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs)
    return greenlet(weave_task)

def BUFFER(N, glet):
    """Build a greenlet that runs `glet` `N` times and batches its outputs.

    Every run of `glet` receives the args/kwargs the buffer itself was given
    and must report exactly one positional value and no keyword values.  The
    `N` collected values are converted with `numpy.asarray` and forwarded to
    `dest` as a single positional argument.
    """
    def BUFFER_loop(vm, gself, dest, args, kwargs):
        while True: #body runs once per execution
            collected = []
            for _round in xrange(N):
                # jump to task `glet`
                # with instructions to report results back to this loop `gself`
                reply = vm.switch(glet, gself, args, kwargs)
                _vm, reported_self, reported_dest, out_args, out_kwargs = reply
                collected.append(out_args[0])
                assert len(out_args)==1
                assert out_kwargs=={}
                assert reported_self is gself
                # instructions can't tell us where to jump
                assert reported_dest is None
            batch = numpy.asarray(collected)
            vm, gself, dest, args, kwargs = vm.switch(dest, None, (batch,), {})
    return greenlet(BUFFER_loop)

def CALL(fn):
    """
    Create a greenlet that wraps the plain callable `fn` as a task.

    On each execution `fn` is called with the incoming args and kwargs (and
    nothing else).  A return value of None is forwarded to `dest` as no
    arguments; any other value is forwarded as a single positional argument.
    """
    def CALL_loop(vm, gself, dest, args, kwargs):
        while True:
            #print 'CALL calling', fn.__name__, args, kwargs
            result = fn(*args, **kwargs)
            #TODO consider a protocol for returning args, kwargs
            outgoing = () if result is None else (result,)
            _vm, _gself, dest, args, kwargs = vm.switch(dest, None, outgoing, {})

            assert gself is _gself
    return greenlet(CALL_loop)

####################################################
# Components involved in the learning process

class Dataset(object):
    """Endless cursor over an indexable, sized collection `data`."""

    def __init__(self, data):
        self.data = data
        self.pos = 0

    def next(self):
        """Return the item under the cursor, then advance.

        The cursor goes back to 0 right after the last item is read.
        """
        item = self.data[self.pos]
        advanced = self.pos + 1
        self.pos = 0 if advanced == len(self.data) else advanced
        return item

    def seek(self, pos):
        """Move the cursor to `pos` (no validation is done)."""
        self.pos = pos

class KFold(object):
    """Skeleton of K-fold bookkeeping around a seekable data stream.

    `data` must offer `seek(pos)` and `next()`.  `k` is the index of the
    current fold (-1 before the first call to `next_fold`) and `scores`
    holds one slot per fold.
    """

    def __init__(self, data, K):
        self.K = K
        self.k = -1
        self.data = data
        self.scores = [None] * K

    def next_fold(self):
        """Advance to the next fold and rewind the stream."""
        self.k += 1
        self.data.seek(0) # restart the stream

    def next(self):
        """Return the next training example."""
        #TODO: skip the examples that are omitted in this split
        example = self.data.next()
        return example

    def init_test(self):
        """Hook called before testing; currently does nothing."""
        return None

    def next_test(self):
        """Return the next test example (read from the same stream)."""
        example = self.data.next()
        return example

    def test_size(self):
        """Return the number of test examples per fold (fixed at 5)."""
        return 5

    def store_scores(self, scores):
        """Record `scores` in the slot of the current fold."""
        self.scores[self.k] = scores

class PCA_Analysis(object):
    """Stand-in for a PCA component: only the mean is actually estimated."""

    def __init__(self):
        self.clear()

    def clear(self):
        """Reset mean, eigvecs and eigvals to 0."""
        self.mean = self.eigvecs = self.eigvals = 0

    def analyze(self, X):
        """Store the mean of `X` along axis 0; eigvecs and eigvals become 1."""
        self.mean = X.mean(axis=0)
        self.eigvecs = self.eigvals = 1

    def filt(self, X):
        """Return `X` with the stored mean removed, times the stored eigvecs."""
        centered = X - self.mean
        return centered * self.eigvecs #TODO: divide by root eigvals?

    def pseudo_inverse(self, Y):
        """Return `Y` unchanged."""
        return Y

class Layer(object):
    """Toy layer holding a single multiplicative weight `w`."""

    def __init__(self, w):
        self.w = w

    def clear(self):
        """Reset the weight to 0."""
        self.w = 0

    def filt(self, x):
        """Return the weight multiplied by `x`."""
        scaled = self.w * x
        return scaled

def print_obj(vm, gself, obj):
    """Print `obj` to standard output; `vm` and `gself` are accepted but unused."""
    # Parenthesised single-argument form prints the same text as `print obj`.
    print(obj)
def no_op(*args, **kwargs):
    """Accept any arguments, do nothing, and return None."""
    return None

class cd1_update(object):
    """Callable that nudges a layer's weight using a batch of observations.

    A class rather than a closure; each call prints the batch and its mean,
    then adds `mean * lr` to `layer.w`.
    NOTE(review): "cd1" presumably stands for one-step contrastive
    divergence, but the update below is only a placeholder -- confirm.
    """
    def __init__(self, layer, lr):
        # layer: object whose `w` attribute is updated on every call
        # lr: factor applied to the batch mean before it is added to `layer.w`
        self.layer = layer
        self.lr = lr

    def __call__(self, X):
        # update self.layer from observation X
        # (debug output: the batch itself, then its mean)
        print 'cd1', X
        print X.mean()
        self.layer.w += X.mean() * self.lr #TODO: not exactly correct math

def main():
    """Assemble the demo learning program and run it for at most 500 vm steps.

    Builds the toy components, composes them into a k-fold training/testing
    program with the control-flow constructs, runs it through `vm_run`, and
    prints the per-fold scores held by the KFold object.  Returns None.

    NOTE(review): the module docstring says something goes wrong when the
    k-fold validation runs; the cause is not identified here.
    """
    # create components
    # 13 x 1 array of normally distributed samples from a fixed seed (123)
    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
    pca = PCA_Analysis()
    layer1 = Layer(w=4)
    layer2 = Layer(w=3)
    kf = KFold(dataset, K=10)

    # create algorithm

    # gather 1000 examples (the 13-row dataset wraps around), then fit the pca
    train_pca = SEQ([
        BUFFER(1000, CALL(kf.next)), 
        CALL(pca.analyze)])

    # 10 updates of layer1, each on a pca-filtered minibatch of 10 examples
    train_layer1 = REPEAT(10, [
        BUFFER(10, CALL(kf.next)),
        CALL(pca.filt), 
        CALL(cd1_update(layer1, lr=.01))])

    # same for layer2, with the minibatch additionally passed through layer1
    train_layer2 = REPEAT(10, [
        BUFFER(10, CALL(kf.next)),
        CALL(pca.filt), 
        CALL(layer1.filt),
        CALL(cd1_update(layer2, lr=.01))])

    # monitoring task: ignores its arguments and prints layer1's current weight
    def print_layer_w(*a,**kw):
        print layer1.w

    # train the pca, then layer1 interleaved with the weight printer, then layer2
    train_prog = SEQ([
        train_pca,
        WEAVE([
            train_layer1, 
            LOOP(CALL(print_layer_w))]),
        train_layer2,
        ])

    # one pass per fold: reset everything, train, then score test examples
    # NOTE: kf.test_size() is evaluated once, here, while the program is built
    kfold_prog = REPEAT(10, [
        CALL(kf.next_fold),
        CALL(pca.clear),
        CALL(layer1.clear),
        CALL(layer2.clear),
        train_prog,
        CALL(kf.init_test),
        BUFFER(kf.test_size(),
            SEQ([
                CALL(kf.next_test),  
                CALL(pca.filt),       # may want to allow this SEQ to be 
                CALL(layer1.filt),    # optimized into a shorter one that
                CALL(layer2.filt),
                CALL(numpy.mean)])), # chains together theano graphs
        CALL(kf.store_scores),
        ])

    # n_steps caps the number of vm dispatch iterations, so the program may be
    # stopped before every fold has stored its scores
    vm_run(kfold_prog, n_steps=500)
    print kf.scores


# Run the demo only when executed as a script; main() returns None, which
# sys.exit treats as a successful (zero) exit status.
if __name__ == '__main__':
    sys.exit(main())