comparison doc/v2_planning/plugin_JB.py @ 1212:478bb1f8215c

plugin_JB - added SPAWN control element and demo program
author James Bergstra <bergstrj@iro.umontreal.ca>
date Wed, 22 Sep 2010 01:37:55 -0400
parents e7ac87720fee
children
comparison
equal deleted inserted replaced
1211:e7ac87720fee 1212:478bb1f8215c
1 """plugin_JB - draft of potential library architecture using iterators 1 print "Moved to ./arch_src/plugin_JB.py"
2
3 This strategy makes use of a simple imperative language whose statements are python function
4 calls to create learning algorithms that can be manipulated and executed in several desirable
5 ways.
6
7 The training procedure for a PCA module is easy to express:
8
9 # allocate the relevant modules
10 dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
11 pca = PCA_Analysis()
12 pca_batchsize=1000
13
14 # define the control-flow of the algorithm
15 train_pca = SEQ([
16 BUFFER_REPEAT(pca_batchsize, CALL(dataset.next)),
17 FILT(pca.analyze)])
18
19 # run the program
20 train_pca.run()
21
22 The CALL, SEQ, FILT, and BUFFER_REPEAT are control-flow elements. The control-flow elements I
23 defined so far are:
24
25 - CALL - a basic statement, just calls a python function
26 - FILT - like call, but passes the return value of the last CALL or FILT to the python function
27 - SEQ - a sequence of elements to run in order
28 - REPEAT - do something N times (and return the value of the last element executed)
29 - BUFFER_REPEAT - do something N times and accumulate the return value from each iter
30 - LOOP - do something an infinite number of times
31 - CHOOSE - like a switch statement (should rename to SWITCH)
32 - WEAVE - interleave execution of multiple control-flow elements
33 - POPEN - launch a process and return its status when it's complete
34 - PRINT - a shortcut for CALL(print_obj)
35
36
37 We don't have many requirements per se for the architecture, but I think this design respects
38 and realizes all of them.
39 The advantages of this approach are:
40
41 - algorithms (including partially run ones) are COPYABLE, and SERIALIZABLE
42
43 - algorithms can be executed without seizing control of the python process (the run()
44 method does this, but if you look inside it you'll see it's a simple for loop)
45
46 - it is easy to execute an algorithm step by step in a main loop that also checks for
47 network or filesystem events related to e.g. job management.
48
49 - the library can provide learning algorithms via control-flow templates, and the user can
50 edit them (with search/replace calls) to include HOOKS, and DIAGNOSTIC plug-in
51 functionality
52
53 e.g. prog.find(CALL(cd1_update, layer=layer1)).replace_with(
54 SEQ([CALL(cd1_update, layer=layer1), CALL(my_debugfn)]))
55
56 - user can print the 'program code' of an algorithm built from library pieces
57
58 - program can be optimized automatically.
59
60 - e.g. BUFFER_REPEAT(N, CALL(dataset.next)) could be replaced if dataset.next implements the
61 right attribute/protocol for 'bufferable' or something.
62
63 - e.g. SEQ([a,b,c,d]) could be compiled to a single CALL to a Theano-compiled function
64 if a, b, c, and d are calls to callable objects that export something like a
65 'theano_SEQ' interface
66
67
68 """
69
70 __license__ = 'TODO'
71 __copyright__ = 'TODO'
72
73 import cPickle, copy, subprocess, sys, time
74 import numpy
75
76 ####################################################
77 # CONTROL-FLOW CONSTRUCTS
78
class INCOMPLETE:
    """Sentinel returned by ELEMENT.step to signal that more steps are needed."""


class ELEMENT(object):
    """
    Base class for control flow elements (e.g. CALL, REPEAT, etc.)

    The design is that every element has a driver, that is another element, or the simple
    loop implemented by `ELEMENT.run`.

    The driver calls `start` when entering a control element
        - this would be called once per e.g. outer loop iteration

    The driver calls `step` to advance the control element
        - which returns INCOMPLETE while there is more work to do
        - which returns any other object to indicate completion
    """

    # subclasses should override these methods:
    def start(self, arg):
        """Prepare for a fresh execution; `arg` is the value handed in by the driver."""
        pass

    def step(self):
        """Advance by one step; return INCOMPLETE, or any other object when complete."""
        pass

    # subclasses should typically not override these:
    def run(self, arg=None, n_steps=float('inf')):
        """
        Start this element and step it until it completes or the step budget is used up.

        :param arg: value passed to `start`.
        :param n_steps: maximum number of calls to `step` (unbounded by default).
        :returns: the value returned by the completing `step`, or INCOMPLETE when
            `n_steps` steps were taken without reaching completion.
        """
        self.start(arg)
        r = INCOMPLETE
        i = 0
        # The budget is tested *before* each step so that at most `n_steps` steps are taken
        # (the previous loop stepped once unconditionally, i.e. n_steps + 1 times).
        while r is INCOMPLETE and i < n_steps:
            r = self.step()
            i += 1
        return r
115
class BUFFER_REPEAT(ELEMENT):
    """
    Accumulate a number of return values into one list.

    The source of return values `src` is a control element that will be restarted repeatedly
    in order to fulfil the requirement of gathering N samples.

    TODO: support accumulating of tuples of arrays
    """
    def __init__(self, N, src, storage=None):
        """
        :param N: number of results of `src` to gather per run.
        :param src: control element that yields one result each time it completes.
        :param storage: reserved for a preallocated output buffer; anything other than None
            raises NotImplementedError.

        TODO: use preallocated `storage`
        """
        # Identity test on purpose: `!=` would be evaluated element-wise on an ndarray.
        if storage is not None:
            raise NotImplementedError()
        self.N = N
        self.n = 0
        self.src = src
        self.storage = storage

    def start(self, arg):
        """Reset the buffer and (re)start the source; `arg` is ignored."""
        self.buf = [None] * self.N
        self.n = 0
        self.finished = False
        # The source is started here rather than in the constructor, so building a program
        # has no side effects and a partially-run source never leaks into the next run.
        self.src.start(None)

    def step(self):
        """Step the source once; return the filled buffer after the N-th result."""
        assert not self.finished
        r = self.src.step()
        if r is INCOMPLETE:
            return r
        self.buf[self.n] = r
        self.n += 1
        if self.n == self.N:
            self.finished = True
            return self.buf
        # Only restart the stream when another sample is actually needed.
        self.src.start(None)
        return INCOMPLETE
154
class CALL(ELEMENT):
    """
    Control flow terminal - a single invocation of a python function or method.

    Completes in one step, whose result is the return value of the call.
    """
    def __init__(self, fn, *args, **kwargs):
        # `use_start_arg` is an option of CALL itself, so it is stripped from the keywords
        # that get forwarded to `fn`.
        self.use_start_arg = kwargs.pop('use_start_arg', False)
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def start(self, arg):
        """Remember the driver's value and arm the call."""
        self.finished = False
        self.start_arg = arg
        return self

    def step(self):
        """Perform the call and return its result."""
        assert not self.finished
        self.finished = True
        if not self.use_start_arg:
            return self.fn(*self.args, **self.kwargs)
        # FILT-style call: the start argument is the one and only positional argument.
        if self.args:
            raise TypeError('cant get positional args both ways')
        return self.fn(self.start_arg, **self.kwargs)

    def __getstate__(self):
        """Return the instance dict, with a bound-method `fn` split into its parts."""
        state = dict(self.__dict__)
        if type(self.fn) is type(self.step):  # instancemethod
            method = state.pop('fn')
            state['i fn'] = (method.im_func, method.im_self, method.im_class)
        return state

    def __setstate__(self, dct):
        """Restore the instance dict, re-binding `fn` if it was stored as parts."""
        if 'i fn' in dct:
            func, instance, cls = dct.pop('i fn')
            dct['fn'] = type(self.step)(func, instance, cls)
        self.__dict__.update(dct)
189
def FILT(fn, **kwargs):
    """
    Build a CALL whose single positional argument is the value handed to it at start time,
    i.e. the return value of the element that ran just before it.
    """
    filter_call = CALL(fn, use_start_arg=True, **kwargs)
    return filter_call
196
def CHOOSE(which, options):
    """
    Execute one out of a number of optional control flow paths.

    Not implemented yet: always raises NotImplementedError.
    """
    raise NotImplementedError()
202
def LOOP(elements):
    """
    Repeat `elements` sys.maxint times.

    `elements` may be an iterable of elements or a single (non-iterable) element, which is
    wrapped in a one-item list.
    """
    # TODO: implement a true infinite loop
    try:
        iter(elements)
    except TypeError:
        body = [elements]
    else:
        body = elements
    return REPEAT(sys.maxint, body)
210
class REPEAT(ELEMENT):
    """
    Run a list of elements in order, N times over.

    Each element is started with the result of the element that completed just before it;
    the overall result is the result of the very last element executed.
    """
    def __init__(self, N, elements, pass_rvals=False):
        self.N = N
        self.elements = elements
        self.pass_rvals = pass_rvals

    #TODO: check for N being callable
    def start(self, arg):
        """Rewind to the first element of the first iteration and start it with `arg`."""
        self.finished = False
        self.n = 0    # loop iteration
        self.idx = 0  # element idx
        self.elements[0].start(arg)

    def step(self):
        """Step the active element, moving on to the next one when it completes."""
        assert not self.finished
        result = self.elements[self.idx].step()
        if result is INCOMPLETE:
            return INCOMPLETE

        self.idx += 1
        if self.idx >= len(self.elements):
            # one full pass over the elements is done
            self.n += 1
            if self.n >= self.N:
                self.finished = True
                return result
            self.idx = 0

        # hand the last result to whichever element runs next
        self.elements[self.idx].start(result)
        return INCOMPLETE
240
def SEQ(elements):
    """Run `elements` once, in order: a REPEAT with a single iteration."""
    return REPEAT(N=1, elements=elements)
243
class WEAVE(ELEMENT):
    """
    Interleave execution of a number of elements.

    Elements are stepped round-robin, one step each, skipping the ones that have completed.
    The WEAVE itself completes (with result None) as soon as `n_required` of its elements
    have completed, or when all of them have if there are fewer than `n_required`.

    TODO: allow a schedule (at least relative frequency) of elements from each program
    """
    def __init__(self, n_required, elements):
        """
        :param n_required: number of elements that must complete; -1 means all of them.
        :param elements: list of control elements to interleave.
        """
        self.elements = elements
        if n_required == -1:
            self.n_required = len(elements)
        else:
            self.n_required = n_required

    def start(self, arg):
        """Start every element with the same `arg` and rewind the round-robin."""
        for el in self.elements:
            el.start(arg)
        self.elem_finished = [False] * len(self.elements)
        self.idx = 0
        self.finished = False

    def _enough_finished(self):
        """Return True when the completion threshold has been reached."""
        # The threshold is capped at the number of elements: without the cap, a WEAVE
        # asked for more completions than it has elements could never complete, and the
        # search for the next un-finished element in `step` would spin forever.
        needed = min(self.n_required, len(self.elements))
        return sum(self.elem_finished) >= needed

    def step(self):
        """Step the active element once and move on to the next un-finished one."""
        assert not self.finished # if this is triggered, we have a broken driver

        # start with this check in case there were no elements
        # it's possible for the number of finished elements to exceed the threshold
        if self._enough_finished():
            self.finished = True
            return None

        # step the active element
        r = self.elements[self.idx].step()

        if r is not INCOMPLETE:
            self.elem_finished[self.idx] = True

        # check for completion
        if self._enough_finished():
            self.finished = True
            return None

        # advance to the next un-finished element; at least one exists because the
        # completion check above just failed
        self.idx = (self.idx + 1) % len(self.elements)
        while self.elem_finished[self.idx]:
            self.idx = (self.idx + 1) % len(self.elements)

        return INCOMPLETE
288
class POPEN(ELEMENT):
    """Launch a child process on start; complete with its exit status once it has ended."""
    def __init__(self, args):
        self.args = args

    def start(self, arg):
        """Spawn the process; `arg` is ignored."""
        self.p = subprocess.Popen(self.args)

    def step(self):
        """Poll the process without blocking."""
        status = self.p.poll()
        return INCOMPLETE if status is None else status
299
def PRINT(obj):
    """Shortcut for a CALL that prints `obj` via print_obj."""
    printer = CALL(print_obj, obj)
    return printer
302
303 ####################################################
304 # [Dummy] Components involved in learning algorithms
305
class Dataset(object):
    """Endless stream over an indexable `data` container, wrapping around at the end."""
    def __init__(self, data):
        self.data = data
        self.pos = 0

    def next(self):
        """Return the item at the current position and advance, wrapping to 0 at the end."""
        item = self.data[self.pos]
        following = self.pos + 1
        self.pos = 0 if following == len(self.data) else following
        return item

    def seek(self, pos):
        """Move the read position to `pos`."""
        self.pos = pos
318
class KFold(object):
    """Book-keeping for a K-fold evaluation over a seekable data stream."""
    def __init__(self, data, K):
        self.K = K
        self.k = -1  # index of the current fold; incremented by next_fold
        self.data = data
        self.scores = [None] * K

    def next_fold(self):
        """Move on to the next fold and rewind the data stream."""
        self.k += 1
        self.data.seek(0)

    def next(self):
        """Return the next training example."""
        #TODO: skip the examples that are omitted in this split
        return self.data.next()

    def init_test(self):
        """Hook called before the test phase of a fold; currently does nothing."""
        pass

    def next_test(self):
        """Return the next test example."""
        return self.data.next()

    def test_size(self):
        """Number of test examples scored per fold."""
        return 5

    def store_scores(self, scores):
        """Record `scores` as the result of the current fold."""
        self.scores[self.k] = scores

    def prog(self, clear, train, test):
        """
        Build the K-fold control-flow program.

        Each of the K iterations advances the fold, runs `clear` and `train`, then gathers
        the result of `test` on test_size() examples and stores the list as the fold's
        scores.
        """
        fold_body = [
            CALL(self.next_fold),
            clear,
            train,
            CALL(self.init_test),
            BUFFER_REPEAT(
                self.test_size(),
                SEQ([CALL(self.next_test), test])),
            FILT(self.store_scores),
        ]
        return REPEAT(self.K, fold_body)
349
class PCA_Analysis(object):
    """Dummy PCA component: only the mean is actually estimated."""
    def __init__(self):
        self.clear()

    def clear(self):
        """Reset all statistics to 0."""
        self.mean = self.eigvecs = self.eigvals = 0

    def analyze(self, X):
        """Estimate the mean of `X` along axis 0; eigvecs and eigvals are set to 1."""
        self.mean = numpy.mean(X, axis=0)
        self.eigvecs = self.eigvals = 1

    def filt(self, X):
        """Center `X` and multiply it by `eigvecs`."""
        centered = X - self.mean
        return centered * self.eigvecs  #TODO: divide by root eigvals?

    def pseudo_inverse(self, Y):
        """Return `Y` unchanged."""
        return Y
366
class Layer(object):
    """Dummy layer holding a single weight `w`."""
    def __init__(self, w):
        self.w = w

    def filt(self, x):
        """Return `x` scaled by the weight."""
        scaled = self.w * x
        return scaled

    def clear(self):
        """Reset the weight to 0."""
        self.w = 0
374
def print_obj(obj):
    """Print `obj` to standard output."""
    print(obj)
def print_obj_attr(obj, attr):
    """Print the attribute of `obj` named `attr` to standard output."""
    value = getattr(obj, attr)
    print(value)
def no_op(*args, **kwargs):
    """Accept any arguments and do nothing."""
    return None
381
def cd1_update(X, layer, lr):
    """Update `layer.w` in place from observation `X`, scaled by learning rate `lr`."""
    step = X.mean() * lr  #TODO: not exactly correct math!
    layer.w += step
385
386
387 ###############################################################
388 # Example algorithms written in this control flow mini-language
389
def main_weave():
    """Demo: WEAVE interleaving two bufferings of a single stream."""
    counter = [0]  # shared by both bufferings

    def f(a):
        print(counter)
        counter[0] += a
        return counter[0]

    short_buffer = BUFFER_REPEAT(3, CALL(f, 1))
    long_buffer = BUFFER_REPEAT(5, CALL(f, 1))
    # stop as soon as one of the two bufferings is complete
    interleaved = WEAVE(1, [short_buffer, long_buffer])
    print(interleaved.run())
403
def main_weave_popen():
    """Demo: WEAVE and POPEN controlling a program with some asynchronous parallelism."""
    short_job = SEQ([POPEN(['sleep', '5']), PRINT('done 1')])
    long_job = SEQ([POPEN(['sleep', '10']), PRINT('done 2')])
    poller = LOOP([
        CALL(print_obj, 'polling...'),
        CALL(time.sleep, 1)])

    # The LOOP would run forever if the WEAVE were not configured to stop after 2 of its
    # elements complete.
    p = WEAVE(2, [short_job, long_job, poller])

    p.run()
    # Note that the program can be run multiple times...
    p.run()
420
# Demo that the `__main__` guard at the bottom of the file runs.
main = main_weave_popen
def main_kfold_dbn():
    """
    Demo: k-fold evaluation of a (dummy) DBN, printing the per-fold scores.

    Uses many of the control-flow elements.  The algorithm is not quite right, but the
    example shows off all of the required control-flow elements I think.
    """
    # create components
    dataset = Dataset(numpy.random.RandomState(123).randn(13,1))
    pca = PCA_Analysis()
    layer1 = Layer(w=4)
    layer2 = Layer(w=3)
    kf = KFold(dataset, K=10)

    pca_batchsize=1000
    cd_batchsize = 5
    n_cd_updates_layer1 = 10
    n_cd_updates_layer2 = 10

    # create algorithm

    train_pca = SEQ([
        BUFFER_REPEAT(pca_batchsize, CALL(kf.next)),
        FILT(pca.analyze)])

    train_layer1 = REPEAT(n_cd_updates_layer1, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt),
        FILT(cd1_update, layer=layer1, lr=.01)])

    train_layer2 = REPEAT(n_cd_updates_layer2, [
        BUFFER_REPEAT(cd_batchsize, CALL(kf.next)),
        FILT(pca.filt),
        FILT(layer1.filt),
        FILT(cd1_update, layer=layer2, lr=.01)])

    kfold_prog = kf.prog(
            clear = SEQ([   # FRAGMENT 1: this bit is the reset/clear stage
                CALL(pca.clear),
                CALL(layer1.clear),
                CALL(layer2.clear),
                ]),
            train = SEQ([
                train_pca,
                WEAVE(1, [    # Silly example of how to do debugging / logging with WEAVE
                    train_layer1,
                    LOOP(CALL(print_obj_attr, layer1, 'w'))]),
                train_layer2,
                ]),
            test=SEQ([
                FILT(pca.filt),       # may want to allow this SEQ to be
                FILT(layer1.filt),    # optimized into a shorter one that
                FILT(layer2.filt),    # compiles these calls together with
                FILT(numpy.mean)]))   # Theano

    pkg1 = dict(prog=kfold_prog, kf=kf)
    pkg2 = copy.deepcopy(pkg1)       # programs can be copied

    # Pickling is attempted on a best-effort basis.  Only ordinary errors are reported and
    # ignored: a bare `except:` would also swallow KeyboardInterrupt and SystemExit.
    try:
        pkg3 = cPickle.loads(cPickle.dumps(pkg1))
    except Exception:
        sys.stderr.write("pickling doesnt work, but it can be fixed I think\n")

    pkg = pkg2

    # running a program updates the variables in its package, but not the other package
    pkg['prog'].run()
    print(pkg['kf'].scores)
488
489
# When executed as a script, run the demo bound to `main` and use its return value as the
# process exit status.
if __name__ == '__main__':
    sys.exit(main())
492