Mercurial > pylearn
comparison doc/v2_planning/arch_src/plugin_JB.py @ 1212:478bb1f8215c
plugin_JB - added SPAWN control element and demo program
author | James Bergstra <bergstrj@iro.umontreal.ca> |
---|---|
date | Wed, 22 Sep 2010 01:37:55 -0400 |
parents | |
children | 9fac28d80fb7 |
comparison
equal
deleted
inserted
replaced
1211:e7ac87720fee | 1212:478bb1f8215c |
---|---|
1 """plugin_JB - draft of potential library architecture using iterators | |
2 | |
3 This strategy makes use of a simple imperative language whose statements are python function | |
4 calls to create learning algorithms that can be manipulated and executed in several desirable | |
5 ways. | |
6 | |
7 The training procedure for a PCA module is easy to express: | |
8 | |
9 # allocate the relevant modules | |
10 dataset = Dataset(numpy.random.RandomState(123).randn(13,1)) | |
11 pca = PCA_Analysis() | |
12 pca_batchsize=1000 | |
13 | |
14 # define the control-flow of the algorithm | |
15 train_pca = SEQ([ | |
16 BUFFER_REPEAT(pca_batchsize, CALL(dataset.next)), | |
17 FILT(pca.analyze)]) | |
18 | |
19 # run the program | |
20 train_pca.run() | |
21 | |
22 The CALL, SEQ, FILT, and BUFFER_REPEAT are control-flow elements. The control-flow elements I | |
23 defined so far are: | |
24 | |
25 - CALL - a basic statement, just calls a python function | |
26 - FILT - like call, but passes the return value of the last CALL or FILT to the python function | |
27 - SEQ - a sequence of elements to run in order | |
28 - REPEAT - do something N times (returns the return value of the last element of the last iteration) | |
29 - BUFFER_REPEAT - do something N times and accumulate the return value from each iter | |
30 - LOOP - do something an infinite number of times | |
31 - CHOOSE - like a switch statement (should rename to SWITCH) | |
32 - WEAVE - interleave execution of multiple control-flow elements | |
33 - POPEN - launch a process and return its status when it's complete | |
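34 - PRINT - a shortcut for CALL(print_obj) | |
- SPAWN - run a program in a child python process and merge its modified data dict back into the parent's | |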
35 | |
36 | |
37 We don't have many requirements per-se for the architecture, but I think this design respects | |
38 and realizes all of them. | |
39 The advantages of this approach are: | |
40 | |
41 - algorithms (including partially run ones) are COPYABLE, and SERIALIZABLE | |
42 | |
43 - algorithms can be executed without seizing control of the python process (the run() | |
44 method does this, but if you look inside it you'll see it's a simple while loop) | |
45 | |
46 - it is easy to execute an algorithm step by step in a main loop that also checks for | |
47 network or filesystem events related to e.g. job management. | |
48 | |
49 - the library can provide learning algorithms via control-flow templates, and the user can | |
50 edit them (with search/replace calls) to include HOOKS, and DIAGNOSTIC plug-in | |
51 functionality | |
52 | |
53 e.g. prog.find(CALL(cd1_update, layer=layer1)).replace_with( | |
54 SEQ([CALL(cd1_update, layer=layer1), CALL(my_debugfn)])) | |
55 | |
56 - user can print the 'program code' of an algorithm built from library pieces | |
57 | |
58 - program can be optimized automatically. | |
59 | |
60 - e.g. BUFFER_REPEAT(N, CALL(dataset.next)) could be replaced if dataset.next implements the | |
61 right attribute/protocol for 'bufferable' or something. | |
62 | |
63 - e.g. SEQ([a,b,c,d]) could be compiled to a single CALL to a Theano-compiled function | |
64 if a, b, c, and d are calls to callable objects that export something like a | |
65 'theano_SEQ' interface | |
66 | |
67 | |
68 """ | |
69 | |
70 __license__ = 'TODO' | |
71 __copyright__ = 'TODO' | |
72 | |
73 import cPickle, copy, os, subprocess, sys, time | |
74 import numpy | |
75 | |
76 #################################################### | |
77 # CONTROL-FLOW CONSTRUCTS | |
78 | |
class INCOMPLETE:
    """Return value for Element.step

    Sentinel meaning "not finished yet"; drivers test for it by identity
    (`r is INCOMPLETE`).  The class object itself is the sentinel value.
    """


class ELEMENT(object):
    """
    Base class for control flow elements (e.g. CALL, REPEAT, etc.)

    The design is that every element has a driver, that is another element, or the `run` loop
    implemented in the ELEMENT class.

    the driver calls start when entering a new control element
    - this would be called once per e.g. outer loop iteration

    the driver calls step to advance the control element
    - which returns INCOMPLETE while there is more work to do
    - which returns any other object to indicate completion (that object is the element's
      result)
    """

    # subclasses should override these methods:
    def start(self, arg):
        pass

    def step(self):
        pass

    # subclasses should typically not override these:
    def run(self, arg=None, n_steps=float('inf')):
        """
        Start the element with `arg`, then call `step` until the element completes or
        `n_steps` calls to `step` have been made, whichever comes first.

        Returns the element's result, or INCOMPLETE if the step budget ran out before the
        element completed (with n_steps <= 0 the element is started but never stepped).
        """
        self.start(arg)
        r = INCOMPLETE
        n_taken = 0
        # at most n_steps calls to step(); the default (inf) runs to completion
        while r is INCOMPLETE and n_taken < n_steps:
            r = self.step()
            n_taken += 1
        return r
115 | |
class BUFFER_REPEAT(ELEMENT):
    """
    Accumulate a number of return values into one list / array.

    The source of return values `src` is a control element that will be restarted repeatedly in
    order to fulfil the requirement of gathering N samples.  On completion, `step` returns the
    list of the N collected values, in the order they were produced (an empty list if N <= 0).

    TODO: support accumulating of tuples of arrays
    """
    def __init__(self, N, src, storage=None):
        """
        N - number of values to collect each time this element is run
        src - control element whose completed return values are collected; it is always
              started with None as its argument
        storage - reserved for a preallocated buffer; not implemented, must be None

        TODO: use preallocated `storage`
        """
        # `is not None` rather than `!= None`: `storage` is meant to be an array, and comparing
        # a numpy array with None is elementwise, so its truth value would be ambiguous.
        if storage is not None:
            raise NotImplementedError()
        self.N = N
        self.n = 0
        self.src = src
        self.storage = storage

    def start(self, arg):
        # `arg` is not used: the source is always started with None.
        self.buf = [None] * self.N
        self.n = 0
        self.finished = False
        # (Re)start the source here rather than in __init__, so that constructing the element
        # has no side effects (e.g. a POPEN source would launch a process) and a restarted
        # BUFFER_REPEAT never resumes a half-finished source.
        if self.N > 0:
            self.src.start(None)

    def step(self):
        assert not self.finished
        if self.n >= self.N:
            # N <= 0: nothing to collect
            self.finished = True
            return self.buf
        r = self.src.step()
        if r is INCOMPLETE:
            return r
        self.buf[self.n] = r
        self.n += 1
        if self.n == self.N:
            self.finished = True
            return self.buf
        # restart our stream only when another value is still needed
        self.src.start(None)
        return INCOMPLETE
154 | |
class CALL(ELEMENT):
    """
    Control flow terminal - call a python function or method.

    Returns the return value of the call.

    If the keyword argument `use_start_arg=True` is given, the argument passed to `start` (in a
    SEQ, the return value of the previous element) is used as the only positional argument
    instead of `args`.  `use_start_arg` itself is not forwarded to `fn`.
    """
    def __init__(self, fn, *args, **kwargs):
        self.fn = fn
        self.args = args
        # removed from kwargs before storing so that it is not forwarded to fn
        self.use_start_arg = kwargs.pop('use_start_arg', False)
        self.kwargs = kwargs

    def start(self, arg):
        self.start_arg = arg
        self.finished = False
        return self

    def step(self):
        assert not self.finished
        self.finished = True
        if self.use_start_arg:
            if self.args:
                raise TypeError('cant get positional args both ways')
            return self.fn(self.start_arg, **self.kwargs)
        else:
            return self.fn(*self.args, **self.kwargs)

    def __getstate__(self):
        rval = dict(self.__dict__)
        if type(self.fn) is type(self.step):  # a (bound, or python-2 unbound) method
            fn = rval.pop('fn')
            # Store methods as (owner, method name).  The underlying function object cannot be
            # pickled directly: pickle serializes functions by looking them up as module-level
            # globals by __name__, which fails for functions defined in a class body.
            # NOTE: name-mangled (__private) methods cannot be recovered this way.
            owner = fn.__self__
            if owner is None:  # python 2 unbound method: look it up on the class
                owner = fn.im_class
            rval['m fn'] = (owner, fn.__func__.__name__)
        return rval

    def __setstate__(self, dct):
        dct = dict(dct)
        if 'm fn' in dct:
            owner, name = dct.pop('m fn')
            dct['fn'] = getattr(owner, name)
        elif 'i fn' in dct:
            # state produced by older versions: (im_func, im_self, im_class)
            dct['fn'] = type(self.step)(*dct.pop('i fn'))
        self.__dict__.update(dct)
189 | |
def FILT(fn, **kwargs):
    """
    Build a CALL whose first and only positional argument is the value the CALL is started
    with; inside a SEQ that is the return value of the preceding element.

    Any keyword arguments are forwarded to `fn` on every call.
    """
    filt = CALL(fn, use_start_arg=True, **kwargs)
    return filt
196 | |
def CHOOSE(which, options):
    """
    Execute one out of a number of optional control flow paths.

    Placeholder: not implemented yet, every call raises NotImplementedError.
    """
    raise NotImplementedError()
202 | |
def LOOP(elements):
    """
    Run `elements` over and over, forever.

    `elements` is either an indexable sequence of control-flow elements or a single element
    (anything that is not iterable is wrapped in a one-item list).

    Built on REPEAT with N=float('inf'): REPEAT continues while its iteration count is below N,
    so the loop never completes on its own.
    """
    try:
        iter(elements)
    except TypeError:
        # a single element rather than a sequence of them
        elements = [elements]
    # constructed outside the try so a TypeError raised by REPEAT is not mistaken for
    # "elements is not iterable"
    return REPEAT(float('inf'), elements)
210 | |
class REPEAT(ELEMENT):
    """
    Run the sequence `elements` N times.

    Return values are threaded through the sequence: the first element is started with the
    argument given to `start`, every later element is started with the return value of the one
    before it, and each new iteration starts the first element with the return value of the
    last element.  On completion, `step` returns the last element's return value.

    If there is nothing to run (N <= 0, or `elements` is empty), `step` completes immediately
    and returns the `start` argument unchanged.

    `elements` must support len() and indexing (e.g. a list).
    """
    def __init__(self, N, elements, pass_rvals=False):
        self.N = N
        self.elements = elements
        # NOTE(review): pass_rvals is stored but never read; return values are always passed on.
        self.pass_rvals = pass_rvals

    #TODO: check for N being callable
    def _is_empty(self):
        """True when there are no iterations or no elements to run."""
        return self.N <= 0 or len(self.elements) == 0

    def start(self, arg):
        self.n = 0  # loop iteration
        self.idx = 0  # element idx
        self.finished = False
        self.start_arg = arg
        if not self._is_empty():
            self.elements[0].start(arg)

    def step(self):
        assert not self.finished
        if self._is_empty():
            self.finished = True
            return self.start_arg
        r = self.elements[self.idx].step()
        if r is INCOMPLETE:
            return INCOMPLETE
        self.idx += 1
        if self.idx < len(self.elements):
            # continue this iteration: feed r to the next element
            self.elements[self.idx].start(r)
            return INCOMPLETE
        self.n += 1
        if self.n < self.N:
            # begin the next iteration: feed the last element's result to the first element
            self.idx = 0
            self.elements[self.idx].start(r)
            return INCOMPLETE
        self.finished = True
        return r
240 | |
def SEQ(elements):
    """Run `elements` in order exactly once (a REPEAT with a single iteration)."""
    n_iterations = 1
    return REPEAT(n_iterations, elements)
243 | |
class WEAVE(ELEMENT):
    """
    Interleave execution of a number of elements.

    Elements are stepped round-robin, one step per call to `step`, skipping elements that have
    already completed.  WEAVE completes (returning None) once `n_required` of them have
    completed; `n_required=-1` means all of them.  A requirement larger than the number of
    elements is treated as "all of them", since it could never be met otherwise.

    TODO: allow a schedule (at least relative frequency) of elements from each program
    """
    def __init__(self, n_required, elements):
        self.elements = elements
        if n_required == -1:
            self.n_required = len(elements)
        else:
            self.n_required = n_required

    def start(self, arg):
        # every element is started with the same argument
        for el in self.elements:
            el.start(arg)
        self.elem_finished = [False] * len(self.elements)
        self.idx = 0
        self.finished = False

    def _done(self):
        """True once enough elements have completed for the WEAVE itself to complete."""
        required = min(self.n_required, len(self.elements))
        return sum(self.elem_finished) >= required

    def step(self):
        assert not self.finished  # if this is triggered, we have a broken driver

        # start with this check in case there were no elements;
        # it's possible for the number of finished elements to exceed the threshold
        if self._done():
            self.finished = True
            return None

        # step the active element
        r = self.elements[self.idx].step()
        if r is not INCOMPLETE:
            self.elem_finished[self.idx] = True

        # check for completion
        if self._done():
            self.finished = True
            return None

        # advance to the next un-finished element; one must exist, because if every element
        # had finished, _done() would have returned True above
        self.idx = (self.idx + 1) % len(self.elements)
        while self.elem_finished[self.idx]:
            self.idx = (self.idx + 1) % len(self.elements)

        return INCOMPLETE
288 | |
class POPEN(ELEMENT):
    """
    Launch an external command when started and complete with its exit status.

    `args` is passed unchanged to subprocess.Popen.  While the process is still running,
    `step` returns INCOMPLETE; afterwards it returns the status reported by Popen.poll().
    """
    def __init__(self, args):
        self.args = args

    def start(self, arg):
        # the start argument is ignored; each start launches a fresh process
        self.p = subprocess.Popen(self.args)

    def step(self):
        status = self.p.poll()
        return INCOMPLETE if status is None else status
299 | |
def PRINT(obj):
    """Shortcut for CALL(print_obj, obj): an element that prints `obj` when stepped."""
    printer = CALL(print_obj, obj)
    return printer
302 | |
class SPAWN(ELEMENT):
    """
    Run the control-flow program `prog` in a child python process.

    `data` (a dict) and `prog` are pickled together and written to the child's stdin.  The
    child runs ``prog.run()`` and sends its copy of `data` back through a pipe.  When the child
    exits successfully, those values are merged into `self.data` in place, so other components
    holding a reference to the same dict see them.  Because `data` and `prog` are pickled as one
    tuple, references from `prog` to `data` still refer to the child's copy of that dict.

    The child executes ``import plugin_JB``, so this module must be importable there.  The
    result pipe is handed to the child as an inherited file-descriptor number, which relies on
    Popen leaving descriptors open (close_fds=False, the Python 2 default).

    step() returns INCOMPLETE while the child runs and None once its results are merged.  It
    raises subprocess.CalledProcessError if the child exits with a status other than SUCCESS.
    """
    SUCCESS = 0

    def __init__(self, data, prog):
        self.data = data
        self.prog = prog

    def start(self, arg):
        # pickle the (data, prog) pair
        s = cPickle.dumps((self.data, self.prog))

        # call python with a stub function that
        # unpickles the data, prog pair and starts running the prog
        self.rpipe, wpipe = os.pipe()
        code = 'import sys, plugin_JB; sys.exit(plugin_JB.SPAWN._main(%i))'%wpipe
        # run the same interpreter as the parent (fall back to 'python' if it is unknown)
        self.cmd = [sys.executable or 'python', '-c', code]
        self.p = subprocess.Popen(self.cmd, stdin=subprocess.PIPE)
        # The child has its own copy of the write end.  Close ours; otherwise reading rpipe
        # could never reach EOF.
        os.close(wpipe)
        # send the data and prog to the other process; closing stdin gives the child EOF
        # and releases the descriptor
        self.p.stdin.write(s)
        self.p.stdin.close()
        self._chunks = []  # pieces of the child's pickled result received so far
        self.finished = False

        #TODO: send over tgz of the modules this code needs

        #TODO: When the client process is on a different machine, negotiate with the client
        # process to determine which modules it needs, and send over the code for pure python
        # ones. Make sure versions match for non-pure python ones.

    def _read_available(self, block):
        """
        Append bytes from the result pipe to self._chunks.

        block=False - stop as soon as no more data is immediately available
        block=True - keep reading until EOF
        """
        import select
        while True:
            if not block and not select.select([self.rpipe], [], [], 0)[0]:
                return
            chunk = os.read(self.rpipe, 65536)
            if not chunk:
                return  # EOF
            self._chunks.append(chunk)

    def step(self):
        assert not self.finished
        # Collect whatever the child has written so far.  If the pickled result is larger than
        # the OS pipe buffer, the child blocks in its write until we read, so waiting for it to
        # exit before reading anything would deadlock.
        self._read_available(block=False)
        r = self.p.poll()
        if r is None:
            return INCOMPLETE  # typical exit case
        self.finished = True
        if r != self.SUCCESS:
            # don't try to unpickle a result the child may never have written
            os.close(self.rpipe)
            raise subprocess.CalledProcessError(r, self.cmd)
        # the child has exited (closing its write end), so reading to EOF terminates
        self._read_available(block=True)
        os.close(self.rpipe)
        # recv the revised version of the data dictionary
        data = cPickle.loads(''.join(self._chunks))
        # modify the data dict in-place
        # for new values to be visible to other components
        self.data.update(data)
        #TODO: return something meaningful? like r?
        return None

    @staticmethod
    def _main(wpipe):
        """
        Entry point of the child process launched by `start` (via ``python -c``).

        Reads the pickled (data, prog) pair from stdin, runs prog, writes the pickled data dict
        to the file descriptor `wpipe`, and returns SUCCESS (used as the exit status).
        """
        #TODO: unpack and install tgz of the modules this code needs
        data, prog = cPickle.load(sys.stdin)
        prog.run()
        wfile = os.fdopen(wpipe, 'wb')
        try:
            cPickle.dump(data, wfile)
        finally:
            wfile.close()
        return SPAWN.SUCCESS
355 | |
356 | |
def print_obj(obj):
    """Print `obj` (followed by a newline) to stdout."""
    print(obj)
def print_obj_attr(obj, attr):
    """Print the value of the attribute of `obj` named by the string `attr`."""
    value = getattr(obj, attr)
    print(value)
def no_op(*args, **kwargs):
    """Accept any positional and keyword arguments, do nothing, and return None."""
    return None
363 | |
def importable_fn(d):
    """
    Demo helper: store the size of dict `d`, measured before the assignment, under the key
    'new key' (mutates `d` in place, returns None).

    Presumably defined at module level so it can be pickled by reference (e.g. for SPAWN) -
    confirm against the demo program.
    """
    size_before = len(d)
    d['new key'] = size_before
366 |