diff doc/v2_planning/plugin_JB.py @ 1198:1387771296a8

v2planning adding plugin_JB
author James Bergstra <bergstrj@iro.umontreal.ca>
date Mon, 20 Sep 2010 02:34:23 -0400
parents
children 98954d8cb92d
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/doc/v2_planning/plugin_JB.py	Mon Sep 20 02:34:23 2010 -0400
@@ -0,0 +1,400 @@
+"""plugin_JB - draft of library architecture using iterators"""
+
+
+"""
+
+- PICKLABLE - algorithms are serializable at all points during execution
+
+- ITERATOR walks through algorithms with fine granularity
+
+- COMPONENTS - library provides components on which programs operate
+
+- ALGORITHMS - library provides algorithms in clean (no hooks) form
+
+- HOOKS - user can insert print / debug logic with search/replace type calls 
+  e.g. prog.find(CALL(cd1_update)).replace_with(SEQ([CALL(cd1_update), CALL(debugfn)]))
+
+- PRINTING - user can print the 'program code' of an algorithm built from library pieces
+
+- MODULAR EXPERIMENTS - an experiment object with one (or more?) programs and all of the objects referred to by
+  those programs.  It is the preferred type of object to be serialized.  The main components of
+  the algorithms should be top-level attributes of the package.  This object can be serialized
+  and loaded in another process to implement job migration.
+
+- OPTIMIZATION - program can be optimized automatically
+    e.g. BUFFER(N, CALL(dataset.next))   can be replaced if dataset.next implements the right
+    attribute/protocol for 'bufferable' or something.
+
+    e.g. SEQ([a,b,c,d])  can be compiled with Theano if sub-sequence is compatible
+
+- don't need greenlets to get efficiency, the implementations of control flow ops can manage a
+  stack or stack tree in the vm (like greenlets do I think) we don't really need
+  greenlets/stackless I don't think
+
+"""
+
+__license__ = None
+__copyright__ = None
+
+import copy, sys, cPickle
+
+import numpy
+
+
+###################################################
+# Virtual Machine for executing programs
+
+class VirtualMachine(object):
+    def __init__(self, prog):
+        self.prog = prog
+        self.started = False
+        self.finished=False
+    def __iter__(self):
+        assert not self.started
+        self.prog.start(None)
+        self.started = True
+        return self
+    def next(self):
+        if self.finished:
+            raise StopIteration()
+        r = self.prog.step()
+        if r is INCOMPLETE:
+            return r
+        else:
+            self.finished=True
+            return r
+    def run(self,n_steps=float('inf')):
+        i = 0
+        for r in self:
+            i += 1
+            if i > n_steps:
+                break
+        return r
+
+
+####################################################
+# CONTROL-FLOW CONSTRUCTS
+
+class INCOMPLETE: 
+    """Return value for Element.step"""
+
+class ELEMENT(object):
+    """
+    every execution block has a driver
+
+    the driver calls start when entering a new control element
+       - this would be called once per e.g. outer loop iteration
+
+    the driver calls step to advance the control element
+       - which returns INCOMPLETE
+       - which returns any other object to indicate completion
+    """
+
+    def start(self, arg):
+        pass
+    def step(self):
+        pass
+
+
+class BUFFER_REPEAT(ELEMENT):
+    """
+    Accumulate a number of return values into one list / array.
+
+    The source of return values `src` is a control element that will be restarted repeatedly in
+    order to fulfil the requiement of gathering N samples.
+
+    TODO: support accumulating of tuples of arrays
+    """
+    def __init__(self, N, src, storage=None):
+        """
+        TODO: use preallocated `storage`
+        """
+        self.N = N
+        self.n = 0
+        self.src = src
+        self.storage = storage
+        self.src.start(None)
+        if self.storage != None:
+            raise NotImplementedError()
+    def start(self, arg):
+        self.buf = [None] * self.N
+        self.n = 0
+        self.finished = False
+    def step(self):
+        assert not self.finished
+        r = self.src.step()
+        if r is INCOMPLETE:
+            return r
+        self.src.start(None) # restart our stream
+        self.buf[self.n] = r
+        self.n += 1
+        if self.n == self.N:
+            self.finished = True
+            return self.buf
+        else:
+            return INCOMPLETE
+        assert 0
+
+class CALL(ELEMENT):
+    """
+    Control flow terminal - call a python function or method.
+
+    Returns the return value of the call.
+    """
+    def __init__(self, fn, *args, **kwargs):
+        self.fn = fn
+        self.args = args
+        self.kwargs=kwargs
+        self.use_start_arg = kwargs.pop('use_start_arg', False)
+    def start(self, arg):
+        self.start_arg = arg
+        self.finished = False
+        return self
+    def step(self):
+        assert not self.finished
+        self.finished = True
+        if self.use_start_arg:
+            if self.args:
+                raise TypeError('cant get positional args both ways')
+            return self.fn(self.start_arg, **self.kwargs)
+        else:
+            return self.fn(*self.args, **self.kwargs)
+    def __getstate__(self):
+        rval = self.__dict__
+        if type(self.fn) is type(self.step): #instancemethod
+            fn = rval.pop('fn')
+            rval['i fn'] = fn.im_func, fn.im_self, fn.im_class
+        return rval
+    def __setstate__(self, dct):
+        if 'i fn' in dct:
+            dct['fn'] = type(self.step)(*dct.pop('i fn'))
+        self.__dict__.update(dct)
+
+def FILT(*args, **kwargs):
+    return CALL(use_start_arg=True, *args, **kwargs)
+
+def CHOOSE(which, options):
+    """
+    Execute one out of a number of optional control flow paths
+    """
+    raise NotImplementedError()
+
+def LOOP(elements):
+    #TODO: implement a true infinite loop
+    try:
+        iter(elements)
+        return REPEAT(sys.maxint, elements)
+    except TypeError:
+        return REPEAT(sys.maxint, [elements])
+
+class REPEAT(ELEMENT):
+    def __init__(self, N, elements, pass_rvals=False):
+        self.N = N
+        self.elements = elements
+        self.pass_rvals = pass_rvals
+    #TODO: check for N being callable
+    def start(self, arg):
+        self.n = 0   #loop iteration
+        self.idx = 0 #element idx
+        self.finished = False
+        self.elements[0].start(arg)
+    def step(self):
+        assert not self.finished
+        r = self.elements[self.idx].step()
+        if r is INCOMPLETE:
+            return INCOMPLETE
+        self.idx += 1
+        if self.idx < len(self.elements):
+            self.elements[self.idx].start(r)
+            return INCOMPLETE
+        self.n += 1
+        if self.n < self.N:
+            self.idx = 0
+            self.elements[self.idx].start(r)
+            return INCOMPLETE
+        else:
+            self.finished = True
+            return r
+
+def SEQ(elements):
+    return REPEAT(1, elements)
+
+class WEAVE(ELEMENT):
+    """
+    Interleave execution of a number of elements.
+
+    TODO: allow a schedule (at least relative frequency) of elements from each program
+    """
+    def __init__(self, elements):
+        self.elements = elements
+    def start(self, arg):
+        for el in self.elements:
+            el.start(arg)
+        self.idx = 0
+        self.any_is_finished = False
+        self.finished= False 
+    def step(self):
+        assert not self.finished # if this is triggered, we have a broken driver
+        self.idx = self.idx % len(self.elements)
+        r = self.elements[self.idx].step()
+        if r is not INCOMPLETE:
+            self.any_is_finished = True
+        self.idx += 1
+        if self.idx == len(self.elements) and self.any_is_finished:
+            self.finished = True
+            return None # dummy completion value
+        else:
+            return INCOMPLETE
+
+
+####################################################
+# [Dummy] Components involved in learning algorithms
+
+class Dataset(object):
+    def __init__(self, data):
+        self.pos = 0
+        self.data = data
+    def next(self):
+        rval = self.data[self.pos]
+        self.pos += 1
+        if self.pos == len(self.data):
+            self.pos = 0
+        return rval
+    def seek(self, pos):
+        self.pos = pos
+
+class KFold(object):
+    def __init__(self, data, K):
+        self.data = data
+        self.k = -1
+        self.scores = [None]*K
+        self.K = K
+    def next_fold(self):
+        self.k += 1
+        self.data.seek(0) # restart the stream
+    def next(self):
+        #TODO: skip the examples that are ommitted in this split
+        return self.data.next()
+    def init_test(self):
+        pass
+    def next_test(self):
+        return self.data.next()
+    def test_size(self):
+        return 5
+    def store_scores(self, scores):
+        self.scores[self.k] = scores
+
+class PCA_Analysis(object):
+    def __init__(self):
+        self.clear()
+
+    def clear(self):
+        self.mean = 0
+        self.eigvecs=0
+        self.eigvals=0
+    def analyze(self, X):
+        self.mean = numpy.mean(X, axis=0)
+        self.eigvecs=1
+        self.eigvals=1
+    def filt(self, X):
+        return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals?
+    def pseudo_inverse(self, Y):
+        return Y
+
+class Layer(object):
+    def __init__(self, w):
+        self.w = w
+    def filt(self, x):
+        return self.w*x
+    def clear(self):
+        self.w =0
+
+def print_obj(obj):
+    print obj
+def print_obj_attr(obj, attr):
+    print getattr(obj, attr)
+def no_op(*args, **kwargs):
+    pass
+
+class cd1_update(object):
+    def __init__(self, layer, lr):
+        self.layer = layer
+        self.lr = lr
+
+    def __call__(self, X):
+        # update self.layer from observation X
+        self.layer.w += X.mean() * self.lr #TODO: not exactly correct math
+
+def simple_main():
+
+    l = [0]
+    def f(a):
+        print l
+        l[0] += a
+        return l[0]
+
+    print VirtualMachine(WEAVE([
+        BUFFER_REPEAT(3,CALL(f,1)),
+        BUFFER_REPEAT(5,CALL(f,1)),
+        ])).run()
+
+def main():
+    # create components
+    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
+    pca = PCA_Analysis()
+    layer1 = Layer(w=4)
+    layer2 = Layer(w=3)
+    kf = KFold(dataset, K=10)
+
+    # create algorithm
+
+    train_pca = SEQ([
+        BUFFER_REPEAT(1000, CALL(kf.next)), 
+        FILT(pca.analyze)])
+
+    train_layer1 = REPEAT(10, [
+        BUFFER_REPEAT(10, CALL(kf.next)),
+        FILT(pca.filt), 
+        FILT(cd1_update(layer1, lr=.01))])
+
+    train_layer2 = REPEAT(10, [
+        BUFFER_REPEAT(10, CALL(kf.next)),
+        FILT(pca.filt), 
+        FILT(layer1.filt),
+        FILT(cd1_update(layer2, lr=.01))])
+
+    train_prog = SEQ([
+        train_pca,
+        WEAVE([
+            train_layer1, 
+            LOOP(CALL(print_obj_attr, layer1, 'w'))]),
+        train_layer2,
+        ])
+
+    kfold_prog = REPEAT(10, [
+        CALL(kf.next_fold),
+        CALL(pca.clear),
+        CALL(layer1.clear),
+        CALL(layer2.clear),
+        train_prog,
+        CALL(kf.init_test),
+        BUFFER_REPEAT(kf.test_size(),
+            SEQ([
+                CALL(kf.next_test),  
+                FILT(pca.filt),       # may want to allow this SEQ to be 
+                FILT(layer1.filt),    # optimized into a shorter one that
+                FILT(layer2.filt),
+                FILT(numpy.mean)])), # chains together theano graphs
+        FILT(kf.store_scores),
+        ])
+
+    vm = VirtualMachine(kfold_prog)
+
+    #vm2 = copy.deepcopy(vm)
+    vm.run(n_steps=200000)
+    print kf.scores
+
+
+if __name__ == '__main__':
+    sys.exit(main())
+