# HG changeset patch # User James Bergstra # Date 1284964463 14400 # Node ID 1387771296a8ff813e00e7f065584e3debce96fa # Parent a60b3472c4baa1b4849ed5d42c34c8fab1cf0cd7 v2planning adding plugin_JB diff -r a60b3472c4ba -r 1387771296a8 doc/v2_planning/plugin_JB.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/doc/v2_planning/plugin_JB.py Mon Sep 20 02:34:23 2010 -0400 @@ -0,0 +1,400 @@ +"""plugin_JB - draft of library architecture using iterators""" + + +""" + +- PICKLABLE - algorithms are serializable at all points during execution + +- ITERATOR walks through algorithms with fine granularity + +- COMPONENTS - library provides components on which programs operate + +- ALGORITHMS - library provides algorithms in clean (no hooks) form + +- HOOKS - user can insert print / debug logic with search/replace type calls + e.g. prog.find(CALL(cd1_update)).replace_with(SEQ([CALL(cd1_update), CALL(debugfn)])) + +- PRINTING - user can print the 'program code' of an algorithm built from library pieces + +- MODULAR EXPERIMENTS - an experiment object with one (or more?) programs and all of the objects referred to by + those programs. It is the preferred type of object to be serialized. The main components of + the algorithms should be top-level attributes of the package. This object can be serialized + and loaded in another process to implement job migration. + +- OPTIMIZATION - program can be optimized automatically + e.g. BUFFER(N, CALL(dataset.next)) can be replaced if dataset.next implements the right + attribute/protocol for 'bufferable' or something. + + e.g. SEQ([a,b,c,d]) can be compiled with Theano if sub-sequence is compatible + +- don't need greenlets to get efficiency, the implementations of control flow ops can manage a + stack or stack tree in the vm (like greenlets do I think) we don't really need + greenlets/stackless I don't think + +""" + +__license__ = None +__copyright__ = None + +import copy, sys, cPickle + +import numpy + + +################################################### +# Virtual Machine for executing programs + +class VirtualMachine(object): + def __init__(self, prog): + self.prog = prog + self.started = False + self.finished=False + def __iter__(self): + assert not self.started + self.prog.start(None) + self.started = True + return self + def next(self): + if self.finished: + raise StopIteration() + r = self.prog.step() + if r is INCOMPLETE: + return r + else: + self.finished=True + return r + def run(self,n_steps=float('inf')): + i = 0 + for r in self: + i += 1 + if i > n_steps: + break + return r + + +#################################################### +# CONTROL-FLOW CONSTRUCTS + +class INCOMPLETE: + """Return value for Element.step""" + +class ELEMENT(object): + """ + every execution block has a driver + + the driver calls start when entering a new control element + - this would be called once per e.g. outer loop iteration + + the driver calls step to advance the control element + - which returns INCOMPLETE + - which returns any other object to indicate completion + """ + + def start(self, arg): + pass + def step(self): + pass + + +class BUFFER_REPEAT(ELEMENT): + """ + Accumulate a number of return values into one list / array. + + The source of return values `src` is a control element that will be restarted repeatedly in + order to fulfil the requiement of gathering N samples. + + TODO: support accumulating of tuples of arrays + """ + def __init__(self, N, src, storage=None): + """ + TODO: use preallocated `storage` + """ + self.N = N + self.n = 0 + self.src = src + self.storage = storage + self.src.start(None) + if self.storage != None: + raise NotImplementedError() + def start(self, arg): + self.buf = [None] * self.N + self.n = 0 + self.finished = False + def step(self): + assert not self.finished + r = self.src.step() + if r is INCOMPLETE: + return r + self.src.start(None) # restart our stream + self.buf[self.n] = r + self.n += 1 + if self.n == self.N: + self.finished = True + return self.buf + else: + return INCOMPLETE + assert 0 + +class CALL(ELEMENT): + """ + Control flow terminal - call a python function or method. + + Returns the return value of the call. + """ + def __init__(self, fn, *args, **kwargs): + self.fn = fn + self.args = args + self.kwargs=kwargs + self.use_start_arg = kwargs.pop('use_start_arg', False) + def start(self, arg): + self.start_arg = arg + self.finished = False + return self + def step(self): + assert not self.finished + self.finished = True + if self.use_start_arg: + if self.args: + raise TypeError('cant get positional args both ways') + return self.fn(self.start_arg, **self.kwargs) + else: + return self.fn(*self.args, **self.kwargs) + def __getstate__(self): + rval = self.__dict__ + if type(self.fn) is type(self.step): #instancemethod + fn = rval.pop('fn') + rval['i fn'] = fn.im_func, fn.im_self, fn.im_class + return rval + def __setstate__(self, dct): + if 'i fn' in dct: + dct['fn'] = type(self.step)(*dct.pop('i fn')) + self.__dict__.update(dct) + +def FILT(*args, **kwargs): + return CALL(use_start_arg=True, *args, **kwargs) + +def CHOOSE(which, options): + """ + Execute one out of a number of optional control flow paths + """ + raise NotImplementedError() + +def LOOP(elements): + #TODO: implement a true infinite loop + try: + iter(elements) + return REPEAT(sys.maxint, elements) + except TypeError: + return REPEAT(sys.maxint, [elements]) + +class REPEAT(ELEMENT): + def __init__(self, N, elements, pass_rvals=False): + self.N = N + self.elements = elements + self.pass_rvals = pass_rvals + #TODO: check for N being callable + def start(self, arg): + self.n = 0 #loop iteration + self.idx = 0 #element idx + self.finished = False + self.elements[0].start(arg) + def step(self): + assert not self.finished + r = self.elements[self.idx].step() + if r is INCOMPLETE: + return INCOMPLETE + self.idx += 1 + if self.idx < len(self.elements): + self.elements[self.idx].start(r) + return INCOMPLETE + self.n += 1 + if self.n < self.N: + self.idx = 0 + self.elements[self.idx].start(r) + return INCOMPLETE + else: + self.finished = True + return r + +def SEQ(elements): + return REPEAT(1, elements) + +class WEAVE(ELEMENT): + """ + Interleave execution of a number of elements. + + TODO: allow a schedule (at least relative frequency) of elements from each program + """ + def __init__(self, elements): + self.elements = elements + def start(self, arg): + for el in self.elements: + el.start(arg) + self.idx = 0 + self.any_is_finished = False + self.finished= False + def step(self): + assert not self.finished # if this is triggered, we have a broken driver + self.idx = self.idx % len(self.elements) + r = self.elements[self.idx].step() + if r is not INCOMPLETE: + self.any_is_finished = True + self.idx += 1 + if self.idx == len(self.elements) and self.any_is_finished: + self.finished = True + return None # dummy completion value + else: + return INCOMPLETE + + +#################################################### +# [Dummy] Components involved in learning algorithms + +class Dataset(object): + def __init__(self, data): + self.pos = 0 + self.data = data + def next(self): + rval = self.data[self.pos] + self.pos += 1 + if self.pos == len(self.data): + self.pos = 0 + return rval + def seek(self, pos): + self.pos = pos + +class KFold(object): + def __init__(self, data, K): + self.data = data + self.k = -1 + self.scores = [None]*K + self.K = K + def next_fold(self): + self.k += 1 + self.data.seek(0) # restart the stream + def next(self): + #TODO: skip the examples that are ommitted in this split + return self.data.next() + def init_test(self): + pass + def next_test(self): + return self.data.next() + def test_size(self): + return 5 + def store_scores(self, scores): + self.scores[self.k] = scores + +class PCA_Analysis(object): + def __init__(self): + self.clear() + + def clear(self): + self.mean = 0 + self.eigvecs=0 + self.eigvals=0 + def analyze(self, X): + self.mean = numpy.mean(X, axis=0) + self.eigvecs=1 + self.eigvals=1 + def filt(self, X): + return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals? + def pseudo_inverse(self, Y): + return Y + +class Layer(object): + def __init__(self, w): + self.w = w + def filt(self, x): + return self.w*x + def clear(self): + self.w =0 + +def print_obj(obj): + print obj +def print_obj_attr(obj, attr): + print getattr(obj, attr) +def no_op(*args, **kwargs): + pass + +class cd1_update(object): + def __init__(self, layer, lr): + self.layer = layer + self.lr = lr + + def __call__(self, X): + # update self.layer from observation X + self.layer.w += X.mean() * self.lr #TODO: not exactly correct math + +def simple_main(): + + l = [0] + def f(a): + print l + l[0] += a + return l[0] + + print VirtualMachine(WEAVE([ + BUFFER_REPEAT(3,CALL(f,1)), + BUFFER_REPEAT(5,CALL(f,1)), + ])).run() + +def main(): + # create components + dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) + pca = PCA_Analysis() + layer1 = Layer(w=4) + layer2 = Layer(w=3) + kf = KFold(dataset, K=10) + + # create algorithm + + train_pca = SEQ([ + BUFFER_REPEAT(1000, CALL(kf.next)), + FILT(pca.analyze)]) + + train_layer1 = REPEAT(10, [ + BUFFER_REPEAT(10, CALL(kf.next)), + FILT(pca.filt), + FILT(cd1_update(layer1, lr=.01))]) + + train_layer2 = REPEAT(10, [ + BUFFER_REPEAT(10, CALL(kf.next)), + FILT(pca.filt), + FILT(layer1.filt), + FILT(cd1_update(layer2, lr=.01))]) + + train_prog = SEQ([ + train_pca, + WEAVE([ + train_layer1, + LOOP(CALL(print_obj_attr, layer1, 'w'))]), + train_layer2, + ]) + + kfold_prog = REPEAT(10, [ + CALL(kf.next_fold), + CALL(pca.clear), + CALL(layer1.clear), + CALL(layer2.clear), + train_prog, + CALL(kf.init_test), + BUFFER_REPEAT(kf.test_size(), + SEQ([ + CALL(kf.next_test), + FILT(pca.filt), # may want to allow this SEQ to be + FILT(layer1.filt), # optimized into a shorter one that + FILT(layer2.filt), + FILT(numpy.mean)])), # chains together theano graphs + FILT(kf.store_scores), + ]) + + vm = VirtualMachine(kfold_prog) + + #vm2 = copy.deepcopy(vm) + vm.run(n_steps=200000) + print kf.scores + + +if __name__ == '__main__': + sys.exit(main()) +