Mercurial > pylearn
changeset 1212:478bb1f8215c
plugin_JB - added SPAWN control element and demo program
author | James Bergstra <bergstrj@iro.umontreal.ca> |
---|---|
date | Wed, 22 Sep 2010 01:37:55 -0400 |
parents | e7ac87720fee |
children | 33513a46c41b |
files | doc/v2_planning/arch_src/plugin_JB.py doc/v2_planning/arch_src/plugin_JB_main.py doc/v2_planning/plugin_JB.py |
diffstat | 3 files changed, 576 insertions(+), 492 deletions(-) [+] |
line wrap: on
line diff
# File: doc/v2_planning/arch_src/plugin_JB.py (added by this changeset)
"""plugin_JB - draft of potential library architecture using iterators

This strategy makes use of a simple imperative language whose statements are python function
calls to create learning algorithms that can be manipulated and executed in several desirable
ways.

The training procedure for a PCA module is easy to express:

    # allocate the relevant modules
    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
    pca = PCA_Analysis()
    pca_batchsize=1000

    # define the control-flow of the algorithm
    train_pca = SEQ([
        BUFFER_REPEAT(pca_batchsize, CALL(dataset.next)),
        FILT(pca.analyze)])

    # run the program
    train_pca.run()

The CALL, SEQ, FILT, and BUFFER_REPEAT are control-flow elements. The control-flow elements I
defined so far are:

- CALL - a basic statement, just calls a python function
- FILT - like call, but passes the return value of the last CALL or FILT to the python function
- SEQ - a sequence of elements to run in order
- REPEAT - do something N times (and return the value of the last element that ran)
- BUFFER_REPEAT - do something N times and accumulate the return value from each iter
- LOOP - do something an infinite number of times
- CHOOSE - like a switch statement (should rename to SWITCH)
- WEAVE - interleave execution of multiple control-flow elements
- POPEN - launch a process and return its status when it's complete
- PRINT - a shortcut for CALL(print_obj)
- SPAWN - run a program in a child python process and merge its data dict back


We don't have many requirements per-se for the architecture, but I think this design respects
and realizes all of them.
The advantages of this approach are:

    - algorithms (including partially run ones) are COPYABLE, and SERIALIZABLE

    - algorithms can be executed without seizing control of the python process (the run()
      method does this, but if you look inside it you'll see it's a simple loop)

    - it is easy to execute an algorithm step by step in a main loop that also checks for
      network or filesystem events related to e.g. job management.

    - the library can provide learning algorithms via control-flow templates, and the user can
      edit them (with search/replace calls) to include HOOKS, and DIAGNOSTIC plug-in
      functionality

      e.g. prog.find(CALL(cd1_update, layer=layer1)).replace_with(
               SEQ([CALL(cd1_update, layer=layer1), CALL(my_debugfn)]))

    - user can print the 'program code' of an algorithm built from library pieces

    - program can be optimized automatically.

      - e.g. BUFFER(N, CALL(dataset.next)) could be replaced if dataset.next implements the
        right attribute/protocol for 'bufferable' or something.

      - e.g. SEQ([a,b,c,d]) could be compiled to a single CALL to a Theano-compiled function
        if a, b, c, and d are calls to callable objects that export something like a
        'theano_SEQ' interface


"""

__license__ = 'TODO'
__copyright__ = 'TODO'

# NOTE: copy, time and numpy are not used in this module, but plugin_JB_main does
# `from plugin_JB import *` and relies on them being re-exported from here.
import copy
import os
import subprocess
import sys
import time
import types

try:
    import cPickle  # Python 2
except ImportError:
    import pickle as cPickle  # Python 3: keep the historical name for importers

import numpy

####################################################
# CONTROL-FLOW CONSTRUCTS


class INCOMPLETE:
    """Sentinel returned by ELEMENT.step while an element still has work to do.

    The class object itself is the sentinel; it is compared by identity (`r is INCOMPLETE`).
    """


class ELEMENT(object):
    """
    Base class for control flow elements (e.g. CALL, REPEAT, etc.)

    The design is that every element has a driver, that is another element, or the iterator
    implementation in the ELEMENT class.

    the driver calls start when entering a new control element
       - this would be called once per e.g. outer loop iteration

    the driver calls step to advance the control element
       - which returns INCOMPLETE
       - which returns any other object to indicate completion
    """

    # subclasses should override these methods:
    def start(self, arg):
        pass

    def step(self):
        pass

    # subclasses should typically not override these:
    def run(self, arg=None, n_steps=float('inf')):
        """Start this element with `arg` and step it until it completes.

        At most `n_steps` calls to `step` are made.  Returns the completion value, or
        INCOMPLETE if the step budget ran out first.

        The original loop performed `n_steps + 1` steps (the off-by-one its TODO
        suspected); the budget is now exact.
        """
        self.start(arg)
        r = INCOMPLETE
        n_done = 0
        while r is INCOMPLETE and n_done < n_steps:
            r = self.step()
            n_done += 1
        return r


class BUFFER_REPEAT(ELEMENT):
    """
    Accumulate a number of return values into one list / array.

    The source of return values `src` is a control element that will be restarted repeatedly in
    order to fulfil the requirement of gathering N samples.

    TODO: support accumulating of tuples of arrays
    """
    def __init__(self, N, src, storage=None):
        """
        TODO: use preallocated `storage`
        """
        # Identity test: `storage != None` is ambiguous for array-like storage.
        # Checked first so that a rejected construction has no side effects.
        if storage is not None:
            raise NotImplementedError()
        self.N = N
        self.n = 0
        self.src = src
        self.storage = storage

    def start(self, arg):
        self.buf = [None] * self.N
        self.n = 0
        self.finished = False
        # (Re)start the source here rather than in __init__: construction stays free of
        # side effects, and a BUFFER_REPEAT abandoned part-way (e.g. inside a WEAVE)
        # resumes from a freshly started source on its next run.
        if self.N > 0:
            self.src.start(None)

    def step(self):
        assert not self.finished
        if self.n >= self.N:
            # N <= 0: nothing to gather (previously an IndexError on buf[0])
            self.finished = True
            return self.buf
        r = self.src.step()
        if r is INCOMPLETE:
            return r
        self.buf[self.n] = r
        self.n += 1
        if self.n == self.N:
            self.finished = True
            return self.buf
        self.src.start(None)  # restart our stream for the next sample
        return INCOMPLETE


class CALL(ELEMENT):
    """
    Control flow terminal - call a python function or method.

    Returns the return value of the call.

    The keyword `use_start_arg=True` (see FILT) makes the value given to `start` the first and
    only positional argument of the call; all other keywords are forwarded to `fn`.
    """
    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        self.use_start_arg = kwargs.pop('use_start_arg', False)
        self.kwargs = kwargs

    def start(self, arg):
        self.start_arg = arg
        self.finished = False
        return self

    def step(self):
        assert not self.finished
        self.finished = True
        if self.use_start_arg:
            if self.args:
                raise TypeError('cant get positional args both ways')
            return self.fn(self.start_arg, **self.kwargs)
        else:
            return self.fn(*self.args, **self.kwargs)

    def __getstate__(self):
        # Methods are stored as their parts under the key 'i fn' so that copying and
        # pickling go through the owning instance.
        rval = dict(self.__dict__)
        if isinstance(self.fn, types.MethodType):
            fn = rval.pop('fn')
            # __func__ / __self__ exist on Python 2.6+ and 3; im_class is Python 2 only.
            rval['i fn'] = fn.__func__, fn.__self__, getattr(fn, 'im_class', None)
        return rval

    def __setstate__(self, dct):
        dct = dict(dct)  # do not mutate the caller's state dict
        if 'i fn' in dct:
            parts = dct.pop('i fn')
            func, obj = parts[0], parts[1]
            cls = parts[2] if len(parts) > 2 else None
            if sys.version_info[0] >= 3:
                dct['fn'] = types.MethodType(func, obj)
            else:
                dct['fn'] = types.MethodType(func, obj, cls)
        self.__dict__.update(dct)


def FILT(fn, **kwargs):
    """
    Return a CALL object that uses the return value from the previous CALL as the first and
    only positional argument.
    """
    return CALL(fn, use_start_arg=True, **kwargs)


def CHOOSE(which, options):
    """
    Execute one out of a number of optional control flow paths
    """
    raise NotImplementedError()


def LOOP(elements):
    """
    Repeat `elements` (a single element or an iterable of elements) forever.

    An infinite repeat count is used, so the loop never finishes on its own; it is meant
    to be driven by something that stops early (e.g. WEAVE, or run(n_steps=...)).
    """
    try:
        iter(elements)
    except TypeError:
        elements = [elements]
    return REPEAT(float('inf'), elements)


class REPEAT(ELEMENT):
    """
    Run a list of elements in order, N times over.

    Each element is started with the return value of the element that ran before it (the
    first one gets the value passed to `start`).  The completion value is the return value
    of the last element of the last pass.  With no elements, or N <= 0, nothing runs and the
    start value is returned unchanged.
    """
    def __init__(self, N, elements, pass_rvals=False):
        self.N = N
        self.elements = elements
        self.pass_rvals = pass_rvals  # currently unused

    def _has_work(self):
        # True when there is at least one element and at least one pass to run
        return len(self.elements) > 0 and self.N > 0

    #TODO: check for N being callable
    def start(self, arg):
        self.n = 0  # loop iteration
        self.idx = 0  # element idx
        self.finished = False
        self.start_arg = arg
        if self._has_work():
            self.elements[0].start(arg)

    def step(self):
        assert not self.finished
        if not self._has_work():
            # previously: IndexError for [] and one spurious pass for N <= 0
            self.finished = True
            return self.start_arg
        r = self.elements[self.idx].step()
        if r is INCOMPLETE:
            return INCOMPLETE
        self.idx += 1
        if self.idx < len(self.elements):
            self.elements[self.idx].start(r)
            return INCOMPLETE
        self.n += 1
        if self.n < self.N:
            self.idx = 0
            self.elements[self.idx].start(r)
            return INCOMPLETE
        else:
            self.finished = True
            return r


def SEQ(elements):
    """Run `elements` once, in order (a REPEAT with N=1)."""
    return REPEAT(1, elements)


class WEAVE(ELEMENT):
    """
    Interleave execution of a number of elements.

    The WEAVE completes (returning None) as soon as `n_required` of its elements have
    completed; `n_required=-1` means all of them.

    TODO: allow a schedule (at least relative frequency) of elements from each program
    """
    def __init__(self, n_required, elements):
        self.elements = elements
        if n_required == -1:
            self.n_required = len(elements)
        else:
            self.n_required = n_required

    def start(self, arg):
        for el in self.elements:
            el.start(arg)
        self.elem_finished = [False] * len(self.elements)
        self.idx = 0
        self.finished = False

    def _done(self):
        # Never wait for more completions than there are elements: otherwise the search
        # for the next unfinished element below could spin forever.
        required = min(self.n_required, len(self.elements))
        return sum(self.elem_finished) >= required

    def step(self):
        assert not self.finished  # if this is triggered, we have a broken driver

        # start with this check in case there were no elements
        # it's possible for the number of finished elements to exceed the threshold
        if self._done():
            self.finished = True
            return None

        # step the active element
        r = self.elements[self.idx].step()

        if r is not INCOMPLETE:
            self.elem_finished[self.idx] = True

            # check for completion
            if self._done():
                self.finished = True
                return None

        # advance to the next un-finished element (one exists because _done() is False)
        self.idx = (self.idx + 1) % len(self.elements)
        while self.elem_finished[self.idx]:
            self.idx = (self.idx + 1) % len(self.elements)

        return INCOMPLETE


class POPEN(ELEMENT):
    """Launch a subprocess on start; complete with its return code once it has exited."""
    def __init__(self, args):
        self.args = args

    def start(self, arg):
        self.p = subprocess.Popen(self.args)

    def step(self):
        r = self.p.poll()
        if r is None:
            return INCOMPLETE
        return r


def PRINT(obj):
    """Shortcut for CALL(print_obj, obj)."""
    return CALL(print_obj, obj)


class SPAWN(ELEMENT):
    """
    Run `prog` in a child python process and merge the child's `data` dict back.

    On start the (data, prog) pair is pickled and sent to the child over its stdin.  The
    child runs the program and writes its copy of `data` to a pipe.  Once the child has
    exited, `self.data` is updated in place from what the child sent.

    NOTE: the result is only read after the child has exited, so a result larger than the
    OS pipe buffer would block the child on write.  Keep `data` small.
    """
    SUCCESS = 0

    def __init__(self, data, prog):
        self.data = data
        self.prog = prog

    def start(self, arg):
        # pickle the (data, prog) pair together so shared references survive
        s = cPickle.dumps((self.data, self.prog))

        # call python with a stub function that
        # unpickles the data, prog pair and starts running the prog
        self.rpipe, wpipe = os.pipe()
        code = 'import sys, plugin_JB; sys.exit(plugin_JB.SPAWN._main(%i))' % wpipe
        popen_kwargs = {'stdin': subprocess.PIPE}
        if sys.version_info[0] >= 3:
            # Python 3 does not let children inherit descriptors unless asked to
            popen_kwargs['pass_fds'] = (wpipe,)
        try:
            # same interpreter as the parent, so both sides agree on the pickle format
            self.p = subprocess.Popen(
                [sys.executable or 'python', '-c', code],
                **popen_kwargs)
        except Exception:
            os.close(self.rpipe)
            raise
        finally:
            # The parent must not keep the write end open: otherwise reading the result
            # of a crashed child would block forever instead of hitting end-of-file.
            os.close(wpipe)
        # send the data and prog to the other process
        self.p.stdin.write(s)
        self.p.stdin.close()
        self.finished = False

        #TODO: send over tgz of the modules this code needs

        #TODO: When the client process is on a different machine, negotiate with the client
        # process to determine which modules it needs, and send over the code for pure python
        # ones.  Make sure versions match for non-pure python ones.

    def step(self):
        assert not self.finished
        r = self.p.poll()
        if r is None:
            return INCOMPLETE  # typical exit case
        self.finished = True
        if r != self.SUCCESS:
            print("UH OH %s" % (r,))  # TODO - ???
        rfile = os.fdopen(self.rpipe, 'rb')
        try:
            # recv the revised copy of the data dictionary
            data = cPickle.load(rfile)
        except EOFError:
            data = None  # the child exited without reporting anything
        finally:
            rfile.close()
        if data is not None:
            # modify the data dict in-place
            # for new values to be visible to other components
            self.data.update(data)
        #TODO: return something meaningful? like r?
        return None

    @staticmethod
    def _main(wpipe):
        """Child-side entry point: read (data, prog) from stdin, run it, report data."""
        #TODO: unpack and install tgz of the modules this code needs
        # NOTE: unpickling executes code chosen by the sender; stdin is assumed to be fed
        # by the parent SPAWN element only.
        stdin = getattr(sys.stdin, 'buffer', sys.stdin)  # binary stream on Python 3
        data, prog = cPickle.load(stdin)
        prog.run()
        wfile = os.fdopen(wpipe, 'wb')
        try:
            wfile.write(cPickle.dumps(data))
        finally:
            wfile.close()
        return SPAWN.SUCCESS


def print_obj(obj):
    print(obj)


def print_obj_attr(obj, attr):
    print(getattr(obj, attr))


def no_op(*args, **kwargs):
    pass


def importable_fn(d):
    # module-level (hence picklable) function used by the SPAWN demo
    d['new key'] = len(d)
# File: doc/v2_planning/arch_src/plugin_JB_main.py (added by this changeset)
"""plugin_JB_main - main functions illustrating control flow library"""

import copy
import sys
import time

try:
    import cPickle  # Python 2
except ImportError:
    import pickle as cPickle  # Python 3

import numpy

# Explicit names instead of `from plugin_JB import *`, so this file no longer depends on
# plugin_JB re-exporting numpy/copy/sys/time/cPickle.
from plugin_JB import (
    BUFFER_REPEAT,
    CALL,
    FILT,
    LOOP,
    POPEN,
    PRINT,
    REPEAT,
    SEQ,
    SPAWN,
    WEAVE,
    importable_fn,
    print_obj,
    print_obj_attr,
)


####################################################
# [Dummy] Components involved in learning algorithms

class Dataset(object):
    """Cyclic stream over `data`: `next` returns successive items and wraps at the end."""
    def __init__(self, data):
        self.pos = 0
        self.data = data

    def next(self):
        rval = self.data[self.pos]
        self.pos += 1
        if self.pos == len(self.data):
            self.pos = 0
        return rval

    def seek(self, pos):
        self.pos = pos


class KFold(object):
    """Dummy K-fold driver: rewinds the data stream per fold and records one score per fold."""
    def __init__(self, data, K):
        self.data = data
        self.k = -1  # index of the current fold; -1 before the first next_fold()
        self.scores = [None] * K
        self.K = K

    def next_fold(self):
        self.k += 1
        self.data.seek(0)  # restart the stream

    def next(self):
        #TODO: skip the examples that are omitted in this split
        return self.data.next()

    def init_test(self):
        pass

    def next_test(self):
        return self.data.next()

    def test_size(self):
        return 5

    def store_scores(self, scores):
        self.scores[self.k] = scores

    def prog(self, clear, train, test):
        """Build the K-fold program from the `clear`, `train` and `test` sub-programs.

        For each fold: advance the fold, run `clear`, run `train`, then feed test_size()
        test examples through `test` and store the list of results as the fold's score.
        """
        return REPEAT(self.K, [
            CALL(self.next_fold),
            clear,
            train,
            CALL(self.init_test),
            BUFFER_REPEAT(self.test_size(),
                SEQ([CALL(self.next_test), test])),
            FILT(self.store_scores)])


class PCA_Analysis(object):
    """Placeholder PCA: `analyze` only estimates the mean; eigvecs/eigvals are dummies."""
    def __init__(self):
        self.clear()

    def clear(self):
        self.mean = 0
        self.eigvecs = 0
        self.eigvals = 0

    def analyze(self, X):
        self.mean = numpy.mean(X, axis=0)
        self.eigvecs = 1
        self.eigvals = 1

    def filt(self, X):
        return (X - self.mean) * self.eigvecs  #TODO: divide by root eigvals?

    def pseudo_inverse(self, Y):
        return Y


class Layer(object):
    """Placeholder layer with a single scalar weight `w`."""
    def __init__(self, w):
        self.w = w

    def filt(self, x):
        return self.w * x

    def clear(self):
        self.w = 0


def cd1_update(X, layer, lr):
    # update layer.w in place from observation X
    layer.w += X.mean() * lr  #TODO: not exactly correct math!


###############################################################
# Example algorithms written in this control flow mini-language

def main_weave():
    # Uses weave to demonstrate the interleaving of two bufferings of a single stream

    total = [0]

    def f(a):
        print(total)
        total[0] += a
        return total[0]

    print(WEAVE(1, [
        BUFFER_REPEAT(3, CALL(f, 1)),
        BUFFER_REPEAT(5, CALL(f, 1)),
        ]).run())


def main_weave_popen():
    # Uses weave and Popen to demonstrate the control of a program with some asynchronous
    # parallelism

    p = WEAVE(2, [
        SEQ([POPEN(['sleep', '5']), PRINT('done 1')]),
        SEQ([POPEN(['sleep', '10']), PRINT('done 2')]),
        LOOP([
            CALL(print_obj, 'polling...'),
            CALL(time.sleep, 1)])])
    # The LOOP would run forever if the WEAVE were not configured to stop after 2 of its
    # elements complete.

    p.run()
    # Note that the program can be run multiple times...
    p.run()


def main_spawn():
    # illustrate the use of SPAWN to drive a set of control programs
    # in other processes
    data1 = {0: "blah data1"}
    data2 = {1: "foo data2"}
    p = WEAVE(2, [
        SPAWN(data1, REPEAT(3, [
            CALL(importable_fn, data1),
            PRINT("hello from 1")])),
        SPAWN(data2, REPEAT(1, [
            CALL(importable_fn, data2),
            PRINT("hello from 2")])),
        LOOP([
            CALL(print_obj, 'polling...'),
            CALL(time.sleep, 0.5)])])
    print('BEFORE')
    print(data1)
    print(data2)
    p.run()
    print('AFTER')
    print(data1)
    print(data2)


def main_kfold_dbn():
    # Uses many of the control-flow elements to define the k-fold evaluation of a dbn
    # The algorithm is not quite right, but the example shows off all of the required
    # control-flow elements I think.

    # create components
    dataset = Dataset(numpy.random.RandomState(123).randn(13, 1))
    pca = PCA_Analysis()
    layer1 = Layer(w=4)
    layer2 = Layer(w=3)
    kf = KFold(dataset, K=10)

    pca_batchsize = 1000
    cd_batchsize = 5
    n_cd_updates_layer1 = 10
    n_cd_updates_layer2 = 10

    # create algorithm

    train_pca = SEQ([
        BUFFER_REPEAT(pca_batchsize, CALL(kf.next)),
        FILT(pca.analyze)])

    train_layer1 = REPEAT(n_cd_updates_layer1, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt),
        FILT(cd1_update, layer=layer1, lr=.01)])

    train_layer2 = REPEAT(n_cd_updates_layer2, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt),
        FILT(layer1.filt),
        FILT(cd1_update, layer=layer2, lr=.01)])

    kfold_prog = kf.prog(
        clear=SEQ([  # FRAGMENT 1: this bit is the reset/clear stage
            CALL(pca.clear),
            CALL(layer1.clear),
            CALL(layer2.clear),
            ]),
        train=SEQ([
            train_pca,
            WEAVE(1, [  # Silly example of how to do debugging / logging with WEAVE
                train_layer1,
                LOOP(CALL(print_obj_attr, layer1, 'w'))]),
            train_layer2,
            ]),
        test=SEQ([
            FILT(pca.filt),      # may want to allow this SEQ to be
            FILT(layer1.filt),   # optimized into a shorter one that
            FILT(layer2.filt),   # compiles these calls together with
            FILT(numpy.mean)]))  # Theano

    pkg1 = dict(prog=kfold_prog, kf=kf)
    pkg2 = copy.deepcopy(pkg1)  # programs can be copied

    try:
        # round-trip only to show whether the program can be pickled
        cPickle.loads(cPickle.dumps(pkg1))
    except Exception:
        # was a bare `except:`, which also swallowed KeyboardInterrupt / SystemExit
        sys.stderr.write("pickling doesnt work, but it can be fixed I think\n")

    pkg = pkg2

    # running a program updates the variables in its package, but not the other package
    pkg['prog'].run()
    print(pkg['kf'].scores)


# Demos selectable from the command line.
_DEMO_NAMES = ('main_weave', 'main_weave_popen', 'main_spawn', 'main_kfold_dbn')


def main(argv):
    """Run the demo named by argv[1]; return a process exit status.

    The name may be given bare (`main_spawn`) or in call form (`main_spawn()`).  The
    original entry point passed argv[1] to eval(): that raised IndexError when no
    argument was given, silently did nothing useful for a bare name, and executed
    arbitrary expressions.  Only the known demos can be selected now.
    """
    name = argv[1].strip() if len(argv) == 2 else ''
    if name.endswith('()'):
        name = name[:-2]
    if name not in _DEMO_NAMES:
        sys.stderr.write('usage: %s {%s}\n' % (argv[0], '|'.join(_DEMO_NAMES)))
        return 2
    return globals()[name]()


if __name__ == '__main__':
    sys.exit(main(sys.argv))
--- a/doc/v2_planning/plugin_JB.py Wed Sep 22 00:23:07 2010 -0400 +++ b/doc/v2_planning/plugin_JB.py Wed Sep 22 01:37:55 2010 -0400 @@ -1,492 +1,1 @@ -"""plugin_JB - draft of potential library architecture using iterators - -This strategy makes use of a simple imperative language whose statements are python function -calls to create learning algorithms that can be manipulated and executed in several desirable -ways. - -The training procedure for a PCA module is easy to express: - - # allocate the relevant modules - dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) - pca = PCA_Analysis() - pca_batchsize=1000 - - # define the control-flow of the algorithm - train_pca = SEQ([ - BUFFER_REPEAT(pca_batchsize, CALL(dataset.next)), - FILT(pca.analyze)]) - - # run the program - train_pca.run() - -The CALL, SEQ, FILT, and BUFFER_REPEAT are control-flow elements. The control-flow elements I -defined so far are: - -- CALL - a basic statement, just calls a python function -- FILT - like call, but passes the return value of the last CALL or FILT to the python function -- SEQ - a sequence of elements to run in order -- REPEAT - do something N times (and return None or maybe the last CALL?) -- BUFFER_REPEAT - do something N times and accumulate the return value from each iter -- LOOP - do something an infinite number of times -- CHOOSE - like a switch statement (should rename to SWITCH) -- WEAVE - interleave execution of multiple control-flow elements -- POPEN - launch a process and return its status when it's complete -- PRINT - a shortcut for CALL(print_obj) - - -We don't have many requirements per-se for the architecture, but I think this design respects -and realizes all of them. 
-The advantages of this approach are: - - - algorithms (including partially run ones) are COPYABLE, and SERIALIZABLE - - - algorithms can be executed without seizing control of the python process (the run() - method does this, but if you look inside it you'll see it's a simple for loop) - - - it is easy to execute an algorithm step by step in a main loop that also checks for - network or filesystem events related to e.g. job management. - - - the library can provide learning algorithms via control-flow templates, and the user can - edit them (with search/replace calls) to include HOOKS, and DIAGNOSTIC plug-in - functionality - - e.g. prog.find(CALL(cd1_update, layer=layer1)).replace_with( - SEQ([CALL(cd1_update, layer=layer1), CALL(my_debugfn)])) - - - user can print the 'program code' of an algorithm built from library pieces - - - program can be optimized automatically. - - - e.g. BUFFER(N, CALL(dataset.next)) could be replaced if dataset.next implements the - right attribute/protocol for 'bufferable' or something. - - - e.g. SEQ([a,b,c,d]) could be compiled to a single CALL to a Theano-compiled function - if a, b, c, and d are calls to callable objects that export something like a - 'theano_SEQ' interface - - -""" - -__license__ = 'TODO' -__copyright__ = 'TODO' - -import cPickle, copy, subprocess, sys, time -import numpy - -#################################################### -# CONTROL-FLOW CONSTRUCTS - -class INCOMPLETE: - """Return value for Element.step""" - -class ELEMENT(object): - """ - Base class for control flow elements (e.g. CALL, REPEAT, etc.) - - The design is that every element has a driver, that is another element, or the iterator - implementation in the ELEMENT class. - - the driver calls start when entering a new control element - - this would be called once per e.g. 
outer loop iteration - - the driver calls step to advance the control element - - which returns INCOMPLETE - - which returns any other object to indicate completion - """ - - # subclasses should override these methods: - def start(self, arg): - pass - def step(self): - pass - - # subclasses should typically not override these: - def run(self, arg=None, n_steps=float('inf')): - self.start(arg) - i = 0 - r = self.step() - while r is INCOMPLETE: - i += 1 - #TODO make sure there is not an off-by-one error - if i > n_steps: - break - r = self.step() - return r - -class BUFFER_REPEAT(ELEMENT): - """ - Accumulate a number of return values into one list / array. - - The source of return values `src` is a control element that will be restarted repeatedly in - order to fulfil the requiement of gathering N samples. - - TODO: support accumulating of tuples of arrays - """ - def __init__(self, N, src, storage=None): - """ - TODO: use preallocated `storage` - """ - self.N = N - self.n = 0 - self.src = src - self.storage = storage - self.src.start(None) - if self.storage != None: - raise NotImplementedError() - def start(self, arg): - self.buf = [None] * self.N - self.n = 0 - self.finished = False - def step(self): - assert not self.finished - r = self.src.step() - if r is INCOMPLETE: - return r - self.src.start(None) # restart our stream - self.buf[self.n] = r - self.n += 1 - if self.n == self.N: - self.finished = True - return self.buf - else: - return INCOMPLETE - assert 0 - -class CALL(ELEMENT): - """ - Control flow terminal - call a python function or method. - - Returns the return value of the call. 
- """ - def __init__(self, fn, *args, **kwargs): - self.fn = fn - self.args = args - self.kwargs=kwargs - self.use_start_arg = kwargs.pop('use_start_arg', False) - def start(self, arg): - self.start_arg = arg - self.finished = False - return self - def step(self): - assert not self.finished - self.finished = True - if self.use_start_arg: - if self.args: - raise TypeError('cant get positional args both ways') - return self.fn(self.start_arg, **self.kwargs) - else: - return self.fn(*self.args, **self.kwargs) - def __getstate__(self): - rval = dict(self.__dict__) - if type(self.fn) is type(self.step): #instancemethod - fn = rval.pop('fn') - rval['i fn'] = fn.im_func, fn.im_self, fn.im_class - return rval - def __setstate__(self, dct): - if 'i fn' in dct: - dct['fn'] = type(self.step)(*dct.pop('i fn')) - self.__dict__.update(dct) - -def FILT(fn, **kwargs): - """ - Return a CALL object that uses the return value from the previous CALL as the first and - only positional argument. - """ - return CALL(fn, use_start_arg=True, **kwargs) - -def CHOOSE(which, options): - """ - Execute one out of a number of optional control flow paths - """ - raise NotImplementedError() - -def LOOP(elements): - #TODO: implement a true infinite loop - try: - iter(elements) - return REPEAT(sys.maxint, elements) - except TypeError: - return REPEAT(sys.maxint, [elements]) - -class REPEAT(ELEMENT): - def __init__(self, N, elements, pass_rvals=False): - self.N = N - self.elements = elements - self.pass_rvals = pass_rvals - - #TODO: check for N being callable - def start(self, arg): - self.n = 0 #loop iteration - self.idx = 0 #element idx - self.finished = False - self.elements[0].start(arg) - def step(self): - assert not self.finished - r = self.elements[self.idx].step() - if r is INCOMPLETE: - return INCOMPLETE - self.idx += 1 - if self.idx < len(self.elements): - self.elements[self.idx].start(r) - return INCOMPLETE - self.n += 1 - if self.n < self.N: - self.idx = 0 - 
self.elements[self.idx].start(r) - return INCOMPLETE - else: - self.finished = True - return r - -def SEQ(elements): - return REPEAT(1, elements) - -class WEAVE(ELEMENT): - """ - Interleave execution of a number of elements. - - TODO: allow a schedule (at least relative frequency) of elements from each program - """ - def __init__(self, n_required, elements): - self.elements = elements - if n_required == -1: - self.n_required = len(elements) - else: - self.n_required = n_required - def start(self, arg): - for el in self.elements: - el.start(arg) - self.elem_finished = [0] * len(self.elements) - self.idx = 0 - self.finished= False - def step(self): - assert not self.finished # if this is triggered, we have a broken driver - - #start with this check in case there were no elements - # it's possible for the number of finished elements to exceed the threshold - if sum(self.elem_finished) >= self.n_required: - self.finished = True - return None - - # step the active element - r = self.elements[self.idx].step() - - if r is not INCOMPLETE: - self.elem_finished[self.idx] = True - - # check for completion - if sum(self.elem_finished) >= self.n_required: - self.finished = True - return None - - # advance to the next un-finished element - self.idx = (self.idx+1) % len(self.elements) - while self.elem_finished[self.idx]: - self.idx = (self.idx+1) % len(self.elements) - - return INCOMPLETE - -class POPEN(ELEMENT): - def __init__(self, args): - self.args = args - def start(self, arg): - self.p = subprocess.Popen(self.args) - def step(self): - r = self.p.poll() - if r is None: - return INCOMPLETE - return r - -def PRINT(obj): - return CALL(print_obj, obj) - -#################################################### -# [Dummy] Components involved in learning algorithms - -class Dataset(object): - def __init__(self, data): - self.pos = 0 - self.data = data - def next(self): - rval = self.data[self.pos] - self.pos += 1 - if self.pos == len(self.data): - self.pos = 0 - return rval - def 
seek(self, pos): - self.pos = pos - -class KFold(object): - def __init__(self, data, K): - self.data = data - self.k = -1 - self.scores = [None]*K - self.K = K - def next_fold(self): - self.k += 1 - self.data.seek(0) # restart the stream - def next(self): - #TODO: skip the examples that are ommitted in this split - return self.data.next() - def init_test(self): - pass - def next_test(self): - return self.data.next() - def test_size(self): - return 5 - def store_scores(self, scores): - self.scores[self.k] = scores - - def prog(self, clear, train, test): - return REPEAT(self.K, [ - CALL(self.next_fold), - clear, - train, - CALL(self.init_test), - BUFFER_REPEAT(self.test_size(), - SEQ([ CALL(self.next_test), test])), - FILT(self.store_scores) ]) - -class PCA_Analysis(object): - def __init__(self): - self.clear() - - def clear(self): - self.mean = 0 - self.eigvecs=0 - self.eigvals=0 - def analyze(self, X): - self.mean = numpy.mean(X, axis=0) - self.eigvecs=1 - self.eigvals=1 - def filt(self, X): - return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals? - def pseudo_inverse(self, Y): - return Y - -class Layer(object): - def __init__(self, w): - self.w = w - def filt(self, x): - return self.w*x - def clear(self): - self.w =0 - -def print_obj(obj): - print obj -def print_obj_attr(obj, attr): - print getattr(obj, attr) -def no_op(*args, **kwargs): - pass - -def cd1_update(X, layer, lr): - # update self.layer from observation X - layer.w += X.mean() * lr #TODO: not exactly correct math! 
- - -############################################################### -# Example algorithms written in this control flow mini-language - -def main_weave(): - # Uses weave to demonstrate the interleaving of two bufferings of a single stream - - l = [0] - def f(a): - print l - l[0] += a - return l[0] - - print WEAVE(1, [ - BUFFER_REPEAT(3,CALL(f,1)), - BUFFER_REPEAT(5,CALL(f,1)), - ]).run() - -def main_weave_popen(): - # Uses weave and Popen to demonstrate the control of a program with some asynchronous - # parallelism - - p = WEAVE(2,[ - SEQ([POPEN(['sleep', '5']), PRINT('done 1')]), - SEQ([POPEN(['sleep', '10']), PRINT('done 2')]), - LOOP([ - CALL(print_obj, 'polling...'), - CALL(time.sleep, 1)])]) - # The LOOP would forever if the WEAVE were not configured to stop after 2 of its elements - # complete. - - p.run() - # Note that the program can be run multiple times... - p.run() - -main = main_weave_popen -def main_kfold_dbn(): - # Uses many of the control-flow elements to define the k-fold evaluation of a dbn - # The algorithm is not quite right, but the example shows off all of the required - # control-flow elements I think. 
- - # create components - dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) - pca = PCA_Analysis() - layer1 = Layer(w=4) - layer2 = Layer(w=3) - kf = KFold(dataset, K=10) - - pca_batchsize=1000 - cd_batchsize = 5 - n_cd_updates_layer1 = 10 - n_cd_updates_layer2 = 10 - - # create algorithm - - train_pca = SEQ([ - BUFFER_REPEAT(pca_batchsize, CALL(kf.next)), - FILT(pca.analyze)]) - - train_layer1 = REPEAT(n_cd_updates_layer1, [ - BUFFER_REPEAT(cd_batchsize, CALL(kf.next)), - FILT(pca.filt), - FILT(cd1_update, layer=layer1, lr=.01)]) - - train_layer2 = REPEAT(n_cd_updates_layer2, [ - BUFFER_REPEAT(cd_batchsize, CALL(kf.next)), - FILT(pca.filt), - FILT(layer1.filt), - FILT(cd1_update, layer=layer2, lr=.01)]) - - kfold_prog = kf.prog( - clear = SEQ([ # FRAGMENT 1: this bit is the reset/clear stage - CALL(pca.clear), - CALL(layer1.clear), - CALL(layer2.clear), - ]), - train = SEQ([ - train_pca, - WEAVE(1, [ # Silly example of how to do debugging / loggin with WEAVE - train_layer1, - LOOP(CALL(print_obj_attr, layer1, 'w'))]), - train_layer2, - ]), - test=SEQ([ - FILT(pca.filt), # may want to allow this SEQ to be - FILT(layer1.filt), # optimized into a shorter one that - FILT(layer2.filt), # compiles these calls together with - FILT(numpy.mean)])) # Theano - - pkg1 = dict(prog=kfold_prog, kf=kf) - pkg2 = copy.deepcopy(pkg1) # programs can be copied - - try: - pkg3 = cPickle.loads(cPickle.dumps(pkg1)) - except: - print >> sys.stderr, "pickling doesnt work, but it can be fixed I think" - - pkg = pkg2 - - # running a program updates the variables in its package, but not the other package - pkg['prog'].run() - print pkg['kf'].scores - - -if __name__ == '__main__': - sys.exit(main()) - +print "Moved to ./arch_src/plugin_JB.py"