view doc/v2_planning/plugin_JB.py @ 1198:1387771296a8

v2planning adding plugin_JB
author James Bergstra <bergstrj@iro.umontreal.ca>
date Mon, 20 Sep 2010 02:34:23 -0400
parents
children 98954d8cb92d
line wrap: on
line source

"""plugin_JB - draft of library architecture using iterators"""


"""

- PICKLABLE - algorithms are serializable at all points during execution

- ITERATOR walks through algorithms with fine granularity

- COMPONENTS - library provides components on which programs operate

- ALGORITHMS - library provides algorithms in clean (no hooks) form

- HOOKS - user can insert print / debug logic with search/replace type calls 
  e.g. prog.find(CALL(cd1_update)).replace_with(SEQ([CALL(cd1_update), CALL(debugfn)]))

- PRINTING - user can print the 'program code' of an algorithm built from library pieces

- MODULAR EXPERIMENTS - an experiment object with one (or more?) programs and all of the objects referred to by
  those programs.  It is the preferred type of object to be serialized.  The main components of
  the algorithms should be top-level attributes of the package.  This object can be serialized
  and loaded in another process to implement job migration.

- OPTIMIZATION - program can be optimized automatically
    e.g. BUFFER(N, CALL(dataset.next))   can be replaced if dataset.next implements the right
    attribute/protocol for 'bufferable' or something.

    e.g. SEQ([a,b,c,d])  can be compiled with Theano if sub-sequence is compatible

- don't need greenlets to get efficiency, the implementations of control flow ops can manage a
  stack or stack tree in the vm (like greenlets do I think) we don't really need
  greenlets/stackless I don't think

"""

__license__ = None
__copyright__ = None

import copy, sys, cPickle

import numpy


###################################################
# Virtual Machine for executing programs

class VirtualMachine(object):
    def __init__(self, prog):
        self.prog = prog
        self.started = False
        self.finished=False
    def __iter__(self):
        assert not self.started
        self.prog.start(None)
        self.started = True
        return self
    def next(self):
        if self.finished:
            raise StopIteration()
        r = self.prog.step()
        if r is INCOMPLETE:
            return r
        else:
            self.finished=True
            return r
    def run(self,n_steps=float('inf')):
        i = 0
        for r in self:
            i += 1
            if i > n_steps:
                break
        return r


####################################################
# CONTROL-FLOW CONSTRUCTS

class INCOMPLETE: 
    """Return value for Element.step"""

class ELEMENT(object):
    """
    every execution block has a driver

    the driver calls start when entering a new control element
       - this would be called once per e.g. outer loop iteration

    the driver calls step to advance the control element
       - which returns INCOMPLETE
       - which returns any other object to indicate completion
    """

    def start(self, arg):
        pass
    def step(self):
        pass


class BUFFER_REPEAT(ELEMENT):
    """
    Accumulate a number of return values into one list / array.

    The source of return values `src` is a control element that will be restarted repeatedly in
    order to fulfil the requiement of gathering N samples.

    TODO: support accumulating of tuples of arrays
    """
    def __init__(self, N, src, storage=None):
        """
        TODO: use preallocated `storage`
        """
        self.N = N
        self.n = 0
        self.src = src
        self.storage = storage
        self.src.start(None)
        if self.storage != None:
            raise NotImplementedError()
    def start(self, arg):
        self.buf = [None] * self.N
        self.n = 0
        self.finished = False
    def step(self):
        assert not self.finished
        r = self.src.step()
        if r is INCOMPLETE:
            return r
        self.src.start(None) # restart our stream
        self.buf[self.n] = r
        self.n += 1
        if self.n == self.N:
            self.finished = True
            return self.buf
        else:
            return INCOMPLETE
        assert 0

class CALL(ELEMENT):
    """
    Control flow terminal - call a python function or method.

    Returns the return value of the call.
    """
    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.kwargs=kwargs
        self.use_start_arg = kwargs.pop('use_start_arg', False)
    def start(self, arg):
        self.start_arg = arg
        self.finished = False
        return self
    def step(self):
        assert not self.finished
        self.finished = True
        if self.use_start_arg:
            if self.args:
                raise TypeError('cant get positional args both ways')
            return self.fn(self.start_arg, **self.kwargs)
        else:
            return self.fn(*self.args, **self.kwargs)
    def __getstate__(self):
        rval = self.__dict__
        if type(self.fn) is type(self.step): #instancemethod
            fn = rval.pop('fn')
            rval['i fn'] = fn.im_func, fn.im_self, fn.im_class
        return rval
    def __setstate__(self, dct):
        if 'i fn' in dct:
            dct['fn'] = type(self.step)(*dct.pop('i fn'))
        self.__dict__.update(dct)

def FILT(*args, **kwargs):
    return CALL(use_start_arg=True, *args, **kwargs)

def CHOOSE(which, options):
    """
    Execute one out of a number of optional control flow paths
    """
    raise NotImplementedError()

def LOOP(elements):
    #TODO: implement a true infinite loop
    try:
        iter(elements)
        return REPEAT(sys.maxint, elements)
    except TypeError:
        return REPEAT(sys.maxint, [elements])

class REPEAT(ELEMENT):
    def __init__(self, N, elements, pass_rvals=False):
        self.N = N
        self.elements = elements
        self.pass_rvals = pass_rvals
    #TODO: check for N being callable
    def start(self, arg):
        self.n = 0   #loop iteration
        self.idx = 0 #element idx
        self.finished = False
        self.elements[0].start(arg)
    def step(self):
        assert not self.finished
        r = self.elements[self.idx].step()
        if r is INCOMPLETE:
            return INCOMPLETE
        self.idx += 1
        if self.idx < len(self.elements):
            self.elements[self.idx].start(r)
            return INCOMPLETE
        self.n += 1
        if self.n < self.N:
            self.idx = 0
            self.elements[self.idx].start(r)
            return INCOMPLETE
        else:
            self.finished = True
            return r

def SEQ(elements):
    return REPEAT(1, elements)

class WEAVE(ELEMENT):
    """
    Interleave execution of a number of elements.

    TODO: allow a schedule (at least relative frequency) of elements from each program
    """
    def __init__(self, elements):
        self.elements = elements
    def start(self, arg):
        for el in self.elements:
            el.start(arg)
        self.idx = 0
        self.any_is_finished = False
        self.finished= False 
    def step(self):
        assert not self.finished # if this is triggered, we have a broken driver
        self.idx = self.idx % len(self.elements)
        r = self.elements[self.idx].step()
        if r is not INCOMPLETE:
            self.any_is_finished = True
        self.idx += 1
        if self.idx == len(self.elements) and self.any_is_finished:
            self.finished = True
            return None # dummy completion value
        else:
            return INCOMPLETE


####################################################
# [Dummy] Components involved in learning algorithms

class Dataset(object):
    def __init__(self, data):
        self.pos = 0
        self.data = data
    def next(self):
        rval = self.data[self.pos]
        self.pos += 1
        if self.pos == len(self.data):
            self.pos = 0
        return rval
    def seek(self, pos):
        self.pos = pos

class KFold(object):
    def __init__(self, data, K):
        self.data = data
        self.k = -1
        self.scores = [None]*K
        self.K = K
    def next_fold(self):
        self.k += 1
        self.data.seek(0) # restart the stream
    def next(self):
        #TODO: skip the examples that are ommitted in this split
        return self.data.next()
    def init_test(self):
        pass
    def next_test(self):
        return self.data.next()
    def test_size(self):
        return 5
    def store_scores(self, scores):
        self.scores[self.k] = scores

class PCA_Analysis(object):
    def __init__(self):
        self.clear()

    def clear(self):
        self.mean = 0
        self.eigvecs=0
        self.eigvals=0
    def analyze(self, X):
        self.mean = numpy.mean(X, axis=0)
        self.eigvecs=1
        self.eigvals=1
    def filt(self, X):
        return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals?
    def pseudo_inverse(self, Y):
        return Y

class Layer(object):
    def __init__(self, w):
        self.w = w
    def filt(self, x):
        return self.w*x
    def clear(self):
        self.w =0

def print_obj(obj):
    print obj
def print_obj_attr(obj, attr):
    print getattr(obj, attr)
def no_op(*args, **kwargs):
    pass

class cd1_update(object):
    def __init__(self, layer, lr):
        self.layer = layer
        self.lr = lr

    def __call__(self, X):
        # update self.layer from observation X
        self.layer.w += X.mean() * self.lr #TODO: not exactly correct math

def simple_main():

    l = [0]
    def f(a):
        print l
        l[0] += a
        return l[0]

    print VirtualMachine(WEAVE([
        BUFFER_REPEAT(3,CALL(f,1)),
        BUFFER_REPEAT(5,CALL(f,1)),
        ])).run()

def main():
    # create components
    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
    pca = PCA_Analysis()
    layer1 = Layer(w=4)
    layer2 = Layer(w=3)
    kf = KFold(dataset, K=10)

    # create algorithm

    train_pca = SEQ([
        BUFFER_REPEAT(1000, CALL(kf.next)), 
        FILT(pca.analyze)])

    train_layer1 = REPEAT(10, [
        BUFFER_REPEAT(10, CALL(kf.next)),
        FILT(pca.filt), 
        FILT(cd1_update(layer1, lr=.01))])

    train_layer2 = REPEAT(10, [
        BUFFER_REPEAT(10, CALL(kf.next)),
        FILT(pca.filt), 
        FILT(layer1.filt),
        FILT(cd1_update(layer2, lr=.01))])

    train_prog = SEQ([
        train_pca,
        WEAVE([
            train_layer1, 
            LOOP(CALL(print_obj_attr, layer1, 'w'))]),
        train_layer2,
        ])

    kfold_prog = REPEAT(10, [
        CALL(kf.next_fold),
        CALL(pca.clear),
        CALL(layer1.clear),
        CALL(layer2.clear),
        train_prog,
        CALL(kf.init_test),
        BUFFER_REPEAT(kf.test_size(),
            SEQ([
                CALL(kf.next_test),  
                FILT(pca.filt),       # may want to allow this SEQ to be 
                FILT(layer1.filt),    # optimized into a shorter one that
                FILT(layer2.filt),
                FILT(numpy.mean)])), # chains together theano graphs
        FILT(kf.store_scores),
        ])

    vm = VirtualMachine(kfold_prog)

    #vm2 = copy.deepcopy(vm)
    vm.run(n_steps=200000)
    print kf.scores


if __name__ == '__main__':
    sys.exit(main())