Mercurial > pylearn
comparison doc/v2_planning/plugin_JB.py @ 1212:478bb1f8215c
plugin_JB - added SPAWN control element and demo program
author | James Bergstra <bergstrj@iro.umontreal.ca> |
---|---|
date | Wed, 22 Sep 2010 01:37:55 -0400 |
parents | e7ac87720fee |
children |
comparison
equal
deleted
inserted
replaced
1211:e7ac87720fee | 1212:478bb1f8215c |
---|---|
1 """plugin_JB - draft of potential library architecture using iterators | 1 print "Moved to ./arch_src/plugin_JB.py" |
2 | |
3 This strategy makes use of a simple imperative language whose statements are python function | |
4 calls to create learning algorithms that can be manipulated and executed in several desirable | |
5 ways. | |
6 | |
7 The training procedure for a PCA module is easy to express: | |
8 | |
9 # allocate the relevant modules | |
10 dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) | |
11 pca = PCA_Analysis() | |
12 pca_batchsize=1000 | |
13 | |
14 # define the control-flow of the algorithm | |
15 train_pca = SEQ([ | |
16 BUFFER_REPEAT(pca_batchsize, CALL(dataset.next)), | |
17 FILT(pca.analyze)]) | |
18 | |
19 # run the program | |
20 train_pca.run() | |
21 | |
22 The CALL, SEQ, FILT, and BUFFER_REPEAT are control-flow elements. The control-flow elements I | |
23 defined so far are: | |
24 | |
25 - CALL - a basic statement, just calls a python function | |
26 - FILT - like CALL, but passes the return value of the previous element (e.g. the last CALL or FILT) to the python function as its only positional argument | |
27 - SEQ - a sequence of elements to run in order | |
28 - REPEAT - do something N times and return the return value of the last element on the last iteration | |
29 - BUFFER_REPEAT - do something N times and accumulate the return value from each iter | |
30 - LOOP - do something an infinite number of times | |
31 - CHOOSE - like a switch statement (not implemented yet; should rename to SWITCH) | |
32 - WEAVE - interleave execution of multiple control-flow elements | |
33 - POPEN - launch a process and return its exit status when it's complete | |
34 - PRINT - a shortcut for CALL(print_obj) | |
35 | |
36 | |
37 We don't have many requirements per se for the architecture, but I think this design respects | |
38 and realizes all of them. | |
39 The advantages of this approach are: | |
40 | |
41 - algorithms (including partially run ones) are COPYABLE, and SERIALIZABLE | |
42 | |
43 - algorithms can be executed without seizing control of the python process (the run() | |
44 method does this, but if you look inside it you'll see it's a simple while loop) | |
45 | |
46 - it is easy to execute an algorithm step by step in a main loop that also checks for | |
47 network or filesystem events related to e.g. job management. | |
48 | |
49 - the library can provide learning algorithms via control-flow templates, and the user can | |
50 edit them (with search/replace calls) to include HOOKS, and DIAGNOSTIC plug-in | |
51 functionality | |
52 | |
53 e.g. prog.find(CALL(cd1_update, layer=layer1)).replace_with( | |
54 SEQ([CALL(cd1_update, layer=layer1), CALL(my_debugfn)])) | |
55 | |
56 - user can print the 'program code' of an algorithm built from library pieces | |
57 | |
58 - program can be optimized automatically. | |
59 | |
60 - e.g. BUFFER_REPEAT(N, CALL(dataset.next)) could be replaced if dataset.next implements the | |
61 right attribute/protocol for 'bufferable' or something. | |
62 | |
63 - e.g. SEQ([a,b,c,d]) could be compiled to a single CALL to a Theano-compiled function | |
64 if a, b, c, and d are calls to callable objects that export something like a | |
65 'theano_SEQ' interface | |
66 | |
67 | |
68 """ | |
69 | |
70 __license__ = 'TODO' | |
71 __copyright__ = 'TODO' | |
72 | |
73 import cPickle, copy, subprocess, sys, time | |
74 import numpy | |
75 | |
76 #################################################### | |
77 # CONTROL-FLOW CONSTRUCTS | |
78 | |
class INCOMPLETE:
    """
    Sentinel that ELEMENT.step() returns while an element still has work left.

    Drivers compare against the class itself (`r is INCOMPLETE`); this file
    never creates instances of it.
    """
class ELEMENT(object):
    """
    Base class for control flow elements (e.g. CALL, REPEAT, etc.)

    The design is that every element has a driver, that is another element, or
    the run() method implemented in the ELEMENT class.

    The driver calls start(arg) when entering a new control element
     - this would be called once per e.g. outer loop iteration

    The driver calls step() to advance the control element, which returns
     - INCOMPLETE while the element still has work to do, or
     - any other object to indicate completion (that object is the result).
    """

    # subclasses should override these methods:
    def start(self, arg):
        pass
    def step(self):
        pass

    # subclasses should typically not override these:
    def run(self, arg=None, n_steps=float('inf')):
        """
        Start this element with `arg`, then step it until it completes.

        At most `n_steps` calls to step() are made (the previous version made
        n_steps + 1 calls, and always at least one). Returns the completion
        value, or INCOMPLETE if the step budget ran out first -- including
        when n_steps <= 0, in which case step() is never called.
        """
        self.start(arg)
        r = INCOMPLETE
        n_done = 0
        while r is INCOMPLETE and n_done < n_steps:
            r = self.step()
            n_done += 1
        return r
115 | |
class BUFFER_REPEAT(ELEMENT):
    """
    Accumulate a number of return values into one list.

    The source of return values `src` is a control element that is restarted
    repeatedly in order to fulfil the requirement of gathering N samples.
    Every src.start(None) is followed by a complete run of src, so src is
    started exactly N times per run of this element. The completion value is
    the list of the N gathered values.

    TODO: support accumulating of tuples of arrays
    """
    def __init__(self, N, src, storage=None):
        """
        N - number of values to gather per run
        src - control element that produces one value per (re)start
        storage - reserved for a preallocated buffer; not implemented yet, so
                  any value other than None raises NotImplementedError.

        TODO: use preallocated `storage`
        """
        # `is not None` rather than `!= None`: comparing a numpy array with
        # None is elementwise, and using the result in `if` raises ValueError.
        if storage is not None:
            raise NotImplementedError()
        self.N = N
        self.n = 0
        self.src = src
        self.storage = storage
        # src is no longer started here: construction has no side effects
        # (e.g. a POPEN src would have spawned its process immediately).

    def start(self, arg):
        # `arg` is ignored; src is always started with None.
        self.buf = [None] * self.N
        self.n = 0
        self.finished = False
        # (Re)start src here so that a run that was interrupted part-way
        # (e.g. by WEAVE) does not resume a half-finished src.
        if self.N > 0:
            self.src.start(None)

    def step(self):
        assert not self.finished
        if self.N <= 0:
            # nothing to gather
            self.finished = True
            return self.buf
        r = self.src.step()
        if r is INCOMPLETE:
            return r
        self.buf[self.n] = r
        self.n += 1
        if self.n == self.N:
            self.finished = True
            return self.buf
        # restart the stream only when another value is still needed
        self.src.start(None)
        return INCOMPLETE
154 | |
class CALL(ELEMENT):
    """
    Control flow terminal - call a python function or method.

    Completes in a single step and returns the return value of the call.

    Positional and keyword arguments are forwarded to `fn`, except the keyword
    `use_start_arg`: when it is True, the argument given to start() is passed
    as the sole positional argument (this is how FILT is built).

    Methods are pickled/deep-copied as (owner, method name) and looked up
    again on restore. Pickle stores plain functions by module-level name,
    which a function defined inside a class generally does not have.
    """
    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        # remove the control option first so it is never forwarded to fn
        self.use_start_arg = kwargs.pop('use_start_arg', False)
        self.kwargs = kwargs

    def start(self, arg):
        self.start_arg = arg
        self.finished = False
        return self

    def step(self):
        assert not self.finished
        self.finished = True
        if self.use_start_arg:
            if self.args:
                raise TypeError('cant get positional args both ways')
            return self.fn(self.start_arg, **self.kwargs)
        return self.fn(*self.args, **self.kwargs)

    def __getstate__(self):
        rval = dict(self.__dict__)
        fn = rval['fn']
        if type(fn) is type(self.step):  # a (bound or Python 2 unbound) method
            del rval['fn']
            owner = fn.__self__
            if owner is None:
                # Python 2 unbound method: look it up on its class instead
                owner = fn.im_class
            rval['i fn'] = (owner, fn.__name__)
        return rval

    def __setstate__(self, dct):
        dct = dict(dct)
        if 'i fn' in dct:
            owner, name = dct.pop('i fn')
            dct['fn'] = getattr(owner, name)
        self.__dict__.update(dct)
189 | |
def FILT(fn, **kwargs):
    """
    Build a CALL that feeds its start argument to `fn`.

    Inside a SEQ/REPEAT the start argument is the return value of the
    preceding element, so it becomes the first and only positional argument
    of `fn`. Any keyword arguments are forwarded to `fn` as well.
    """
    options = dict(use_start_arg=True, **kwargs)
    return CALL(fn, **options)
196 | |
def CHOOSE(which, options):
    """
    Execute one out of a number of optional control flow paths.

    Placeholder only: calling it always raises NotImplementedError.
    """
    raise NotImplementedError
202 | |
def LOOP(elements):
    """
    Repeat `elements` indefinitely and return the corresponding REPEAT.

    `elements` may be a single control element or an iterable of them. For a
    non-empty list of elements the loop never completes on its own; place it
    e.g. inside a WEAVE that stops once its other elements have finished.
    """
    # REPEAT compares its iteration counter against N with `<`, so an infinite
    # N gives a true infinite loop. sys.maxint was only a finite stand-in and
    # does not exist on Python 3.
    try:
        iter(elements)
    except TypeError:
        # a single element (ELEMENT defines neither __iter__ nor __getitem__)
        elements = [elements]
    return REPEAT(float('inf'), elements)
210 | |
class REPEAT(ELEMENT):
    """
    Run a list of control elements in order, N times over.

    Each element is started with the return value of the element before it.
    The first element receives the argument given to start() on the first
    iteration, and the last element's return value on every later iteration.
    The completion value is the last element's return value on the last
    iteration. If there is nothing to run (N <= 0, or `elements` is empty),
    no element is started or stepped and the start argument is returned
    unchanged.
    """
    def __init__(self, N, elements, pass_rvals=False):
        self.N = N
        self.elements = elements
        # NOTE(review): pass_rvals is stored but never read by this class.
        self.pass_rvals = pass_rvals

    #TODO: check for N being callable
    def start(self, arg):
        self.n = 0 #loop iteration
        self.idx = 0 #element idx
        self.finished = False
        # Previously REPEAT(0, ...) still ran the body once, and an empty
        # element list raised IndexError here.
        self.empty = self.N <= 0 or len(self.elements) == 0
        # keep the argument only when it is the result we will return
        self.start_arg = arg if self.empty else None
        if not self.empty:
            self.elements[0].start(arg)

    def step(self):
        assert not self.finished
        if self.empty:
            self.finished = True
            return self.start_arg
        r = self.elements[self.idx].step()
        if r is INCOMPLETE:
            return INCOMPLETE
        self.idx += 1
        if self.idx < len(self.elements):
            # feed this element's result into the next element
            self.elements[self.idx].start(r)
            return INCOMPLETE
        self.n += 1
        if self.n < self.N:
            # begin the next iteration with the last element's result
            self.idx = 0
            self.elements[self.idx].start(r)
            return INCOMPLETE
        self.finished = True
        return r
240 | |
def SEQ(elements):
    """Run `elements` once, in order: shorthand for a single-iteration REPEAT."""
    return REPEAT(N=1, elements=elements)
243 | |
class WEAVE(ELEMENT):
    """
    Interleave execution of a number of elements.

    Each step() advances one element; unfinished elements are visited
    round-robin. The WEAVE completes (returning None) as soon as `n_required`
    of its elements have completed; n_required == -1 means all of them.
    Elements still running at that point are simply not stepped any further.

    Raises ValueError if n_required exceeds the number of elements.

    TODO: allow a schedule (at least relative frequency) of elements from each program
    """
    def __init__(self, n_required, elements):
        self.elements = elements
        if n_required == -1:
            self.n_required = len(elements)
        else:
            self.n_required = n_required
        # A threshold above len(elements) can never be reached. Once every
        # element had finished, the round-robin search in step() would loop
        # forever, so reject it up front.
        if self.n_required > len(elements):
            raise ValueError('WEAVE: n_required=%r exceeds the number of elements (%d)'
                             % (n_required, len(elements)))

    def start(self, arg):
        # every element receives the same start argument
        for el in self.elements:
            el.start(arg)
        self.elem_finished = [False] * len(self.elements)
        self.idx = 0
        self.finished = False

    def step(self):
        assert not self.finished # if this is triggered, we have a broken driver

        # start with this check in case there were no elements (or n_required is 0);
        # it's possible for the number of finished elements to exceed the threshold
        if sum(self.elem_finished) >= self.n_required:
            self.finished = True
            return None

        # step the active element
        r = self.elements[self.idx].step()
        if r is not INCOMPLETE:
            self.elem_finished[self.idx] = True
            # check for completion
            if sum(self.elem_finished) >= self.n_required:
                self.finished = True
                return None

        # Advance to the next unfinished element. One exists, because the
        # threshold is not yet met and n_required <= len(elements).
        self.idx = (self.idx + 1) % len(self.elements)
        while self.elem_finished[self.idx]:
            self.idx = (self.idx + 1) % len(self.elements)
        return INCOMPLETE
288 | |
class POPEN(ELEMENT):
    """
    Run an external process without blocking the driver.

    `args` is handed unchanged to subprocess.Popen. start() spawns the
    process. Each step() polls it once: it returns INCOMPLETE while the
    process is running and its return code once it has exited.
    """
    def __init__(self, args):
        self.args = args

    def start(self, arg):
        # the start argument is not used
        self.p = subprocess.Popen(self.args)

    def step(self):
        returncode = self.p.poll()
        return INCOMPLETE if returncode is None else returncode
299 | |
def PRINT(obj):
    """Return a CALL element that prints `obj` (through print_obj) when stepped."""
    call_args = (obj,)
    return CALL(print_obj, *call_args)
302 | |
303 #################################################### | |
304 # [Dummy] Components involved in learning algorithms | |
305 | |
class Dataset(object):
    """
    [Dummy] endless stream over the entries of `data`.

    next() returns data[pos] and advances pos, wrapping back to 0 after the
    last entry. seek(pos) sets the position of the next read.
    """
    def __init__(self, data):
        self.data = data
        self.pos = 0

    def next(self):
        item = self.data[self.pos]
        following = self.pos + 1
        self.pos = 0 if following == len(self.data) else following
        return item

    def seek(self, pos):
        self.pos = pos
318 | |
class KFold(object):
    """
    [Dummy] K-fold cross-validation driver over a seekable stream.

    `data` must provide next() and seek(pos) (e.g. Dataset). The scores stored
    while fold k is active end up in self.scores[k]. Held-out examples are not
    actually skipped yet (see the TODO in next()).
    """
    def __init__(self, data, K):
        self.data = data
        self.k = -1 # current fold; the first next_fold() moves to fold 0
        self.scores = [None]*K
        self.K = K
    def next_fold(self):
        # Wrap around after the last fold. Without the wrap, k kept growing:
        # running the program from prog() a second time (or calling next_fold
        # more than K times) made store_scores index past the end of scores.
        self.k = (self.k + 1) % self.K
        self.data.seek(0) # restart the stream
    def next(self):
        #TODO: skip the examples that are omitted in this split
        return self.data.next()
    def init_test(self):
        pass
    def next_test(self):
        return self.data.next()
    def test_size(self):
        return 5
    def store_scores(self, scores):
        self.scores[self.k] = scores

    def prog(self, clear, train, test):
        """
        Build the k-fold program.

        K times over: move to the next fold, run `clear` and `train`, gather
        test_size() results of running `test` on successive test examples,
        and store that list as the fold's scores. test_size() is evaluated
        once, when the program is built.
        """
        return REPEAT(self.K, [
            CALL(self.next_fold),
            clear,
            train,
            CALL(self.init_test),
            BUFFER_REPEAT(self.test_size(),
                SEQ([ CALL(self.next_test), test])),
            FILT(self.store_scores) ])
349 | |
class PCA_Analysis(object):
    """
    [Dummy] stand-in for a PCA module.

    analyze(X) only records the column means of X (numpy.mean over axis 0).
    The eigenvector/eigenvalue attributes are placeholders: 0 after clear()
    and 1 after analyze().
    """
    def __init__(self):
        self.clear()

    def clear(self):
        self.mean = self.eigvecs = self.eigvals = 0

    def analyze(self, X):
        self.mean = numpy.mean(X, axis=0)
        self.eigvecs = self.eigvals = 1

    def filt(self, X):
        #TODO: divide by root eigvals?
        centered = X - self.mean
        return centered * self.eigvecs

    def pseudo_inverse(self, Y):
        return Y
366 | |
class Layer(object):
    """[Dummy] layer with a single weight `w`: filt(x) returns w*x, clear() zeroes w."""
    def __init__(self, w):
        self.w = w

    def filt(self, x):
        weight = self.w
        return weight * x

    def clear(self):
        self.w = 0
374 | |
def print_obj(obj):
    """Write `obj` to stdout followed by a newline."""
    print(obj)
def print_obj_attr(obj, attr):
    """Write the value of attribute `attr` of `obj` to stdout."""
    value = getattr(obj, attr)
    print(value)
def no_op(*args, **kwargs):
    """Accept any arguments, do nothing, and return None."""
    return None
381 | |
def cd1_update(X, layer, lr):
    """
    [Dummy] update of `layer.w` from the observations X.

    Adds X.mean() * lr to layer.w using in-place addition.
    """
    #TODO: not exactly correct math!
    increment = X.mean() * lr
    layer.w += increment
385 | |
386 | |
387 ############################################################### | |
388 # Example algorithms written in this control flow mini-language | |
389 | |
def main_weave():
    """
    Demo: WEAVE interleaves two bufferings of a single shared counter stream.

    The two BUFFER_REPEATs (3 and 5 values) pull from the same counter in
    turn. The WEAVE stops as soon as one of them has finished (n_required=1),
    and its completion value is printed.
    """
    counter = [0]  # a list, so the nested function can update it in place

    def bump(amount):
        print(counter)
        counter[0] += amount
        return counter[0]

    woven = WEAVE(1, [
        BUFFER_REPEAT(3, CALL(bump, 1)),
        BUFFER_REPEAT(5, CALL(bump, 1)),
    ])
    print(woven.run())
403 | |
def main_weave_popen():
    """
    Demo: WEAVE and POPEN controlling a program with some asynchronous parallelism.

    Two child processes (`sleep 5` and `sleep 10`) run while a LOOP alternates
    between printing 'polling...' and sleeping for 1 second. The LOOP would
    run forever on its own; the WEAVE stops once 2 of its 3 elements have
    completed. The program is run twice to show that it can be restarted.
    """
    waiters = [
        SEQ([POPEN(['sleep', '5']), PRINT('done 1')]),
        SEQ([POPEN(['sleep', '10']), PRINT('done 2')]),
    ]
    poller = LOOP([
        CALL(print_obj, 'polling...'),
        CALL(time.sleep, 1),
    ])
    program = WEAVE(2, waiters + [poller])
    for _ in range(2):
        program.run()

# the demo that runs when this file is executed as a script
main = main_weave_popen
def main_kfold_dbn():
    """
    Demo: k-fold evaluation of a (dummy) two-layer DBN with PCA preprocessing.

    Uses many of the control-flow elements to define the k-fold evaluation.
    The algorithm is not quite right, but the example shows off all of the
    required control-flow elements. It also deep-copies the program, checks
    whether it survives a pickle round trip, runs the copy, and prints the
    per-fold scores.
    """
    # create components
    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
    pca = PCA_Analysis()
    layer1 = Layer(w=4)
    layer2 = Layer(w=3)
    kf = KFold(dataset, K=10)

    pca_batchsize=1000
    cd_batchsize = 5
    n_cd_updates_layer1 = 10
    n_cd_updates_layer2 = 10

    # create algorithm

    train_pca = SEQ([
        BUFFER_REPEAT(pca_batchsize, CALL(kf.next)),
        FILT(pca.analyze)])

    train_layer1 = REPEAT(n_cd_updates_layer1, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt),
        FILT(cd1_update, layer=layer1, lr=.01)])

    train_layer2 = REPEAT(n_cd_updates_layer2, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt),
        FILT(layer1.filt),
        FILT(cd1_update, layer=layer2, lr=.01)])

    kfold_prog = kf.prog(
        clear = SEQ([ # FRAGMENT 1: this bit is the reset/clear stage
            CALL(pca.clear),
            CALL(layer1.clear),
            CALL(layer2.clear),
            ]),
        train = SEQ([
            train_pca,
            WEAVE(1, [ # Silly example of how to do debugging / logging with WEAVE
                train_layer1,
                LOOP(CALL(print_obj_attr, layer1, 'w'))]),
            train_layer2,
            ]),
        test=SEQ([
            FILT(pca.filt), # may want to allow this SEQ to be
            FILT(layer1.filt), # optimized into a shorter one that
            FILT(layer2.filt), # compiles these calls together with
            FILT(numpy.mean)])) # Theano

    pkg1 = dict(prog=kfold_prog, kf=kf)
    pkg2 = copy.deepcopy(pkg1) # programs can be copied

    # Round-trip check only; the unpickled copy is not used. Catch Exception
    # rather than using a bare except, so KeyboardInterrupt/SystemExit still
    # propagate, and report which error occurred instead of hiding it.
    try:
        cPickle.loads(cPickle.dumps(pkg1))
    except Exception as e:
        sys.stderr.write("pickling doesnt work, but it can be fixed I think (%s: %s)\n"
                         % (type(e).__name__, e))

    pkg = pkg2

    # running a program updates the variables in its package, but not the other package
    pkg['prog'].run()
    print(pkg['kf'].scores)
488 | |
489 | |
if __name__ == '__main__':
    # run the selected demo; its return value becomes the process exit status
    raise SystemExit(main())
492 |