view doc/v2_planning/plugin_JB.py @ 1200:acfd5e747a75

v2planning - a few changes to plugin proposals
author James Bergstra <bergstrj@iro.umontreal.ca>
date Mon, 20 Sep 2010 11:28:23 -0400
parents 98954d8cb92d
children 865936d8221b
line wrap: on
line source

"""plugin_JB - draft of potential library architecture using iterators

This strategy makes use of a simple imperative language whose statements are python function
calls to create learning algorithms that can be manipulated and executed in several desirable
ways.  

The training procedure for a PCA module is easy to express:

    # allocate the relevant modules
    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
    pca = PCA_Analysis()
    pca_batchsize=1000

    # define the control-flow of the algorithm
    train_pca = SEQ([
        BUFFER_REPEAT(pca_batchsize, CALL(dataset.next)), 
        FILT(pca.analyze)])

    # run the program
    VirtualMachine(train_pca).run()

The CALL, SEQ, FILT, and BUFFER_REPEAT are control-flow elements. The control-flow elements I
defined so far are:

- CALL - a basic statement, just calls a python function
- FILT - like call, but passes the return value of the last CALL or FILT to the python function
- SEQ - a sequence of elements to run in order
- REPEAT - do something N times (and return None or maybe the last CALL?)
- BUFFER_REPEAT - do something N times and accumulate the return value from each iter
- LOOP - do something an infinite number of times
- CHOOSE - like a switch statement (should rename to SWITCH)
- WEAVE - interleave execution of multiple control-flow elements


We don't have many requirements per-se for the architecture, but I think this design respects
and realizes all of them.
The advantages of this approach are:

    - algorithms (including partially run ones) are COPYABLE, and SERIALIZABLE

    - algorithms can be executed without seizing control of the python process (the VM is an
      iterator) so your main loop (aka alternate VM implementation) can be checking for network
      or filesystem events related to job management

    - the library can provide learning algorithms via control-flow templates, and the user can
      edit them (with search/replace calls) to include HOOKS, and DIAGNOSTIC plug-in
      functionality

      e.g. prog.find(CALL(cd1_update, layer=layer1)).replace_with(
          SEQ([CALL(cd1_update, layer=layer1), CALL(my_debugfn)]))

    - user can print the 'program code' of an algorithm built from library pieces

    - program can be optimized automatically.
      
      - e.g. BUFFER(N, CALL(dataset.next))  could be replaced if dataset.next implements the
        right attribute/protocol for 'bufferable' or something.

      - e.g. SEQ([a,b,c,d])  could be compiled to a single CALL to a Theano-compiled function
        if a, b, c, and d are calls to callable objects that export something like a
        'theano_SEQ' interface


"""

__license__ = 'TODO'
__copyright__ = 'TODO'

import copy, sys, cPickle
import numpy

###################################################
# Virtual Machine for executing programs

class VirtualMachine(object):
    def __init__(self, prog):
        self.prog = prog
        self.started = False
        self.finished=False
    def __iter__(self):
        assert not self.started
        self.prog.start(None)
        self.started = True
        return self
    def next(self):
        if self.finished:
            raise StopIteration()
        r = self.prog.step()
        if r is INCOMPLETE:
            return r
        else:
            self.finished=True
            return r
    def run(self,n_steps=float('inf')):
        i = 0
        for r in self:
            i += 1
            if i > n_steps:
                break
        return r


####################################################
# CONTROL-FLOW CONSTRUCTS

class INCOMPLETE: 
    """Return value for Element.step"""

class ELEMENT(object):
    """
    every execution block has a driver

    the driver calls start when entering a new control element
       - this would be called once per e.g. outer loop iteration

    the driver calls step to advance the control element
       - which returns INCOMPLETE
       - which returns any other object to indicate completion
    """

    def start(self, arg):
        pass
    def step(self):
        pass

class BUFFER_REPEAT(ELEMENT):
    """
    Accumulate a number of return values into one list / array.

    The source of return values `src` is a control element that will be restarted repeatedly in
    order to fulfil the requiement of gathering N samples.

    TODO: support accumulating of tuples of arrays
    """
    def __init__(self, N, src, storage=None):
        """
        TODO: use preallocated `storage`
        """
        self.N = N
        self.n = 0
        self.src = src
        self.storage = storage
        self.src.start(None)
        if self.storage != None:
            raise NotImplementedError()
    def start(self, arg):
        self.buf = [None] * self.N
        self.n = 0
        self.finished = False
    def step(self):
        assert not self.finished
        r = self.src.step()
        if r is INCOMPLETE:
            return r
        self.src.start(None) # restart our stream
        self.buf[self.n] = r
        self.n += 1
        if self.n == self.N:
            self.finished = True
            return self.buf
        else:
            return INCOMPLETE
        assert 0

class CALL(ELEMENT):
    """
    Control flow terminal - call a python function or method.

    Returns the return value of the call.
    """
    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs=kwargs
        self.use_start_arg = kwargs.pop('use_start_arg', False)
    def start(self, arg):
        self.start_arg = arg
        self.finished = False
        return self
    def step(self):
        assert not self.finished
        self.finished = True
        if self.use_start_arg:
            if self.args:
                raise TypeError('cant get positional args both ways')
            return self.fn(self.start_arg, **self.kwargs)
        else:
            return self.fn(*self.args, **self.kwargs)
    def __getstate__(self):
        rval = dict(self.__dict__)
        if type(self.fn) is type(self.step): #instancemethod
            fn = rval.pop('fn')
            rval['i fn'] = fn.im_func, fn.im_self, fn.im_class
        return rval
    def __setstate__(self, dct):
        if 'i fn' in dct:
            dct['fn'] = type(self.step)(*dct.pop('i fn'))
        self.__dict__.update(dct)

def FILT(fn, **kwargs):
    """
    Return a CALL object that uses the return value from the previous CALL as the first and
    only positional argument.
    """
    return CALL(fn, use_start_arg=True, **kwargs)

def CHOOSE(which, options):
    """
    Execute one out of a number of optional control flow paths
    """
    raise NotImplementedError()

def LOOP(elements):
    #TODO: implement a true infinite loop
    try:
        iter(elements)
        return REPEAT(sys.maxint, elements)
    except TypeError:
        return REPEAT(sys.maxint, [elements])

class REPEAT(ELEMENT):
    def __init__(self, N, elements, pass_rvals=False):
        self.N = N
        self.elements = elements
        self.pass_rvals = pass_rvals
    #TODO: check for N being callable
    def start(self, arg):
        self.n = 0   #loop iteration
        self.idx = 0 #element idx
        self.finished = False
        self.elements[0].start(arg)
    def step(self):
        assert not self.finished
        r = self.elements[self.idx].step()
        if r is INCOMPLETE:
            return INCOMPLETE
        self.idx += 1
        if self.idx < len(self.elements):
            self.elements[self.idx].start(r)
            return INCOMPLETE
        self.n += 1
        if self.n < self.N:
            self.idx = 0
            self.elements[self.idx].start(r)
            return INCOMPLETE
        else:
            self.finished = True
            return r

def SEQ(elements):
    return REPEAT(1, elements)

class WEAVE(ELEMENT):
    """
    Interleave execution of a number of elements.

    TODO: allow a schedule (at least relative frequency) of elements from each program
    """
    def __init__(self, elements):
        self.elements = elements
    def start(self, arg):
        for el in self.elements:
            el.start(arg)
        self.idx = 0
        self.any_is_finished = False
        self.finished= False 
    def step(self):
        assert not self.finished # if this is triggered, we have a broken driver
        self.idx = self.idx % len(self.elements)
        r = self.elements[self.idx].step()
        if r is not INCOMPLETE:
            self.any_is_finished = True
        self.idx += 1
        if self.idx == len(self.elements) and self.any_is_finished:
            self.finished = True
            return None # dummy completion value
        else:
            return INCOMPLETE


####################################################
# [Dummy] Components involved in learning algorithms

class Dataset(object):
    def __init__(self, data):
        self.pos = 0
        self.data = data
    def next(self):
        rval = self.data[self.pos]
        self.pos += 1
        if self.pos == len(self.data):
            self.pos = 0
        return rval
    def seek(self, pos):
        self.pos = pos

class KFold(object):
    def __init__(self, data, K):
        self.data = data
        self.k = -1
        self.scores = [None]*K
        self.K = K
    def next_fold(self):
        self.k += 1
        self.data.seek(0) # restart the stream
    def next(self):
        #TODO: skip the examples that are ommitted in this split
        return self.data.next()
    def init_test(self):
        pass
    def next_test(self):
        return self.data.next()
    def test_size(self):
        return 5
    def store_scores(self, scores):
        self.scores[self.k] = scores

    def prog(self, clear, train, test):
        return REPEAT(self.K, [
            CALL(self.next_fold),
            clear,
            train,
            CALL(self.init_test),
            BUFFER_REPEAT(self.test_size(),
                SEQ([ CALL(self.next_test), test])),
            FILT(self.store_scores) ])

class PCA_Analysis(object):
    def __init__(self):
        self.clear()

    def clear(self):
        self.mean = 0
        self.eigvecs=0
        self.eigvals=0
    def analyze(self, X):
        self.mean = numpy.mean(X, axis=0)
        self.eigvecs=1
        self.eigvals=1
    def filt(self, X):
        return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals?
    def pseudo_inverse(self, Y):
        return Y

class Layer(object):
    def __init__(self, w):
        self.w = w
    def filt(self, x):
        return self.w*x
    def clear(self):
        self.w =0

def print_obj(obj):
    print obj
def print_obj_attr(obj, attr):
    print getattr(obj, attr)
def no_op(*args, **kwargs):
    pass

def cd1_update(X, layer, lr):
    # update self.layer from observation X
    layer.w += X.mean() * lr #TODO: not exactly correct math!

def simple_main():

    l = [0]
    def f(a):
        print l
        l[0] += a
        return l[0]

    print VirtualMachine(WEAVE([
        BUFFER_REPEAT(3,CALL(f,1)),
        BUFFER_REPEAT(5,CALL(f,1)),
        ])).run()

def main():
    # create components
    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
    pca = PCA_Analysis()
    layer1 = Layer(w=4)
    layer2 = Layer(w=3)
    kf = KFold(dataset, K=10)

    pca_batchsize=1000
    cd_batchsize = 5
    n_cd_updates_layer1 = 10
    n_cd_updates_layer2 = 10

    # create algorithm

    train_pca = SEQ([
        BUFFER_REPEAT(pca_batchsize, CALL(kf.next)), 
        FILT(pca.analyze)])

    train_layer1 = REPEAT(n_cd_updates_layer1, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt), 
        FILT(cd1_update, layer=layer1, lr=.01)])

    train_layer2 = REPEAT(n_cd_updates_layer2, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt), 
        FILT(layer1.filt),
        FILT(cd1_update, layer=layer2, lr=.01)])

    kfold_prog = kf.prog(
            clear = SEQ([   # FRAGMENT 1: this bit is the reset/clear stage
                CALL(pca.clear),
                CALL(layer1.clear),
                CALL(layer2.clear),
                ]),
            train = SEQ([
                train_pca,
                WEAVE([    # Silly example of how to do debugging / loggin with WEAVE
                    train_layer1, 
                    LOOP(CALL(print_obj_attr, layer1, 'w'))]),
                train_layer2,
                ]),
            test=SEQ([
                FILT(pca.filt),       # may want to allow this SEQ to be 
                FILT(layer1.filt),    # optimized into a shorter one that
                FILT(layer2.filt),    # compiles these calls together with 
                FILT(numpy.mean)]))   # Theano

    pkg1 = dict(prog=kfold_prog, kf=kf)
    pkg2 = copy.deepcopy(pkg1)       # programs can be copied

    try:
        pkg3 = cPickle.loads(cPickle.dumps(pkg1)) 
    except:
        print >> sys.stderr, "pickling doesnt work, but it can be fixed I think"

    pkg = pkg2

    # running a program updates the variables in its package, but not the other package
    VirtualMachine(pkg['prog']).run()
    print pkg['kf'].scores


if __name__ == '__main__':
    sys.exit(main())