comparison doc/v2_planning/plugin_greenlet.py @ 1195:d3ee0d2d03e6

plugin_greenlet draft0
author James Bergstra <bergstrj@iro.umontreal.ca>
date Sun, 19 Sep 2010 13:05:48 -0400
parents
children e9bb3340a870
comparison
equal deleted inserted replaced
1194:25d324ab372f 1195:d3ee0d2d03e6
1 """plugin_greenlet - draft of library architecture using greenlets"""
2
3 __license__ = None
4 __copyright__ = None
5
6 import copy, sys
7
8 import numpy
9 from greenlet import greenlet
10
def vm_unpack(incoming):
    """Normalize a switch payload and split off its first two positional items.

    `incoming` is interpreted as follows:

    * ``None``                     -> no positional and no keyword arguments
    * a ``dict``                   -> keyword arguments only
    * a 2-tuple ``(tuple, dict)``  -> already a (positional, keyword) pair
    * any other ``tuple``          -> positional arguments only
    * anything else                -> one positional argument

    A kwargs-only switch cannot be reliably distinguished from a switch that
    carries a single dict argument; a dict is always taken as keywords.

    Returns a 4-tuple: first positional item, second positional item, the
    remaining positional items (a tuple), and the keyword dict.

    Raises IndexError when fewer than two positional items are available.
    """
    if incoming is None:
        positional, keywords = (), {}
    # `elif` (not `if`): otherwise the None case above falls through to the
    # final `else` and its assignment is silently overwritten.
    elif isinstance(incoming, dict):
        positional, keywords = (), incoming
    elif isinstance(incoming, tuple):
        already_paired = (len(incoming) == 2
                          and isinstance(incoming[0], tuple)
                          and isinstance(incoming[1], dict))
        if already_paired:
            positional, keywords = incoming
        else:
            positional, keywords = incoming, {}
    else:
        positional, keywords = (incoming,), {}
    return positional[0], positional[1], positional[2:], keywords
29
def unpack_from_vm(incoming):
    """Check that `incoming` is a tuple of exactly four items and return it.

    Raises AssertionError when `incoming` is not a tuple or does not have
    length 4.
    """
    is_tuple = isinstance(incoming, tuple)
    assert is_tuple
    has_four_items = len(incoming) == 4
    assert has_four_items
    return incoming
34
def vm_run(prog, *args, **kwargs):
    """Run the greenlet `prog` under a fresh dispatcher greenlet.

    A new dispatcher ("vm") greenlet is created for every call.  It switches
    into `prog` with ``(vm, prog, 'return', args, kwargs)`` and then keeps
    dispatching: whatever greenlet it switched to is expected to switch back
    with a 4-tuple ``(next_target, next_dest, args, kwargs)``, and the
    dispatcher switches into ``next_target`` in turn.

    Dispatching stops when the next target is the string ``'return'``; the
    pending ``(args, kwargs)`` pair is then the result of this call.
    """
    def vm_loop(gr, dest, a, kw):
        while True:
            if gr == 'return':
                return a, kw
            # Single-argument print: same text as the Python 2 print
            # statement, and also valid on Python 3.
            print('vm_loop gr= %s args= %s kwargs= %s' % (gr, a, kw))
            gr, dest, a, kw = gr.switch(vm, gr, dest, a, kw)

    vm = greenlet(vm_loop)

    return vm.switch(prog, 'return', args, kwargs)
47
48
def seq(glets):
    """Return the task built by ``repeat`` for a single pass over `glets`."""
    single_pass = 1
    return repeat(single_pass, glets)
51
def repeat(N, glets):
    """Return a greenlet that runs every task in `glets`, in order, `N` times.

    The greenlet follows the vm protocol: it is entered with
    ``(vm, gself, dest, args, kwargs)``.  For each task it asks `vm` to jump
    to that task with itself as the return destination, and feeds the
    ``args``/``kwargs`` it gets back into the next task.  After `N` passes it
    asks `vm` to jump to `dest` with the final ``args``/``kwargs``.

    If the greenlet is resumed after that, it starts another `N` passes using
    the vm, destination and arguments it was resumed with.
    """
    def repeat_task(vm, gself, dest, args, kwargs):
        while True:
            # `range` instead of `xrange` so the block also runs on Python 3;
            # the index itself is not needed.
            for _ in range(N):
                for glet in glets:
                    print('repeat_task_i dest=%(dest)s args=%(args)s, kw=%(kwargs)s'
                          % {'dest': dest, 'args': args, 'kwargs': kwargs})
                    # jump to task `glet`
                    # with instructions to report results back to this
                    # greenlet (`gself`)
                    _vm, _gself, _dest, args, kwargs = vm.switch(
                        glet, gself, args, kwargs)
                    assert _gself is gself
                    # instructions can't tell us where to jump
                    assert _dest is None
            # All passes done: hand the results to `dest`, and pick up a new
            # vm / destination / arguments if we are ever resumed.
            vm, gself, dest, args, kwargs = vm.switch(dest, None, args, kwargs)
    return greenlet(repeat_task)
65
def choose(which, options):
    """Placeholder; not implemented yet.

    Always raises NotImplementedError.
    """
    raise NotImplementedError
68
def weave(threads):
    """Placeholder; not implemented yet.

    Always raises NotImplementedError.
    """
    raise NotImplementedError
71
def service(fn):
    """
    Create a greenlet that calls `fn` every time it is resumed.

    The greenlet follows the vm protocol: it is entered (and later resumed)
    with ``(vm, gself, dest, args, kwargs)``.  On each round it calls
    ``fn(vm, gself, *args, **kwargs)`` -- so `fn` must accept the vm greenlet
    as its first positional argument and this service greenlet as its second
    -- and then asks `vm` to jump to `dest`.  A result of None is forwarded
    as no arguments; any other result is forwarded as a single positional
    argument.
    """
    def service_loop(vm, gself, dest, args, kwargs):
        while True:
            print('service calling %s %s %s' % (fn.__name__, args, kwargs))
            t = fn(vm, gself, *args, **kwargs)
            #TODO consider a protocol for returning args, kwargs
            reply = () if t is None else (t,)
            # Rebind `vm` from the resume payload: the same service can be
            # driven by a different vm later (each vm_run call creates a new
            # one), and switching to the previous, finished vm would not
            # reach the current dispatcher.
            vm, _gself, dest, args, kwargs = vm.switch(dest, None, reply, {})

            assert gself is _gself
    return greenlet(service_loop)
91
92 ####################################################
93
class Dataset(object):
    """Cyclic reader over an indexable `data` container."""

    def __init__(self, data):
        self.data = data
        # index of the item the next call to `next` will return
        self.pos = 0

    def next(self, vm, gself):
        """Return the item at the current position and advance the cursor.

        The cursor wraps to 0 once it reaches ``len(self.data)``.  `vm` and
        `gself` are accepted but not used.
        """
        item = self.data[self.pos]
        following = self.pos + 1
        self.pos = 0 if following == len(self.data) else following
        return item
104
class PCA_Analysis(object):
    """Stub PCA module: only the mean is actually computed.

    `eigvecs` and `eigvals` are placeholder scalars (0 before `analyze`,
    1 after it), not real decompositions.
    """

    def __init__(self):
        self.mean = 0
        self.eigvecs = 0
        self.eigvals = 0

    def analyze(self, me, X):
        """Record the mean of `X` over axis 0 and set placeholder eigen-data.

        `me` is accepted but not used.
        """
        self.mean = X.mean(axis=0)
        self.eigvecs = 1
        self.eigvals = 1

    def filt(self, me, X):
        """Return `X` centred by the stored mean and scaled by `eigvecs`.

        `me` is accepted but not used.
        """
        # Use the `X` argument: `self.X` is never assigned anywhere in the
        # class, so reading it raised AttributeError.
        return (X - self.mean) * self.eigvecs #TODO: divide by root eigvals?

    def pseudo_inverse(self, Y):
        """Return `Y` unchanged (placeholder)."""
        return Y
118
class Layer(object):
    """Minimal layer that multiplies its input by a stored weight `w`."""

    def __init__(self, w):
        self.w = w

    def filt(self, x):
        """Return ``self.w * x``."""
        weight = self.w
        return weight * x
124
def batches(src, N):
    """Return a function that collects `N` items from `src` into an array.

    `src` is a service.  The returned function takes one argument, `me`, and
    on each call prints a trace line, performs ``gswitch(src, me)`` `N`
    times, takes element ``[0][0]`` of every result, and returns them as a
    numpy array.

    NOTE(review): `gswitch` is not defined in the visible part of this file;
    the shape of its return value is assumed from the ``[0][0]`` indexing --
    confirm.
    """
    def rval(me):
        # Single-argument print: same text as the Python 2 print statement,
        # and also valid on Python 3.
        print('batches src= %s me= %s' % (src, me))
        return numpy.asarray([gswitch(src, me)[0][0] for _ in range(N)])
    return rval
131
def print_obj(vm, gself, obj):
    """Print `obj`; `vm` and `gself` are accepted but not used.

    Returns None.
    """
    # Parenthesised single-argument form: identical output to the Python 2
    # print statement, and also valid on Python 3.
    print(obj)
def no_op(*args, **kwargs):
    """Accept any arguments, do nothing, and return None."""
    return None
136
def build_pca_trainer(data_src, pca_module, N):
    """Sketch: wrap a batching step that feeds `pca_module.analyze` in a greenlet.

    NOTE(review): as written this cannot run -- `inf_data` and
    `layer1_trainer` are not defined in the visible file, `batches` does not
    accept a `dest` keyword, and the `data_src` and `N` parameters are never
    used (the batch size is hard-coded to 5).  Presumably `data_src` and `N`
    were meant to replace `inf_data` and `5`; confirm before changing.
    """
    source = inf_data
    downstream = flow(pca_module.analyze, dest=layer1_trainer)
    batch_fn = batches(N=5, src=source, dest=downstream)
    return greenlet(batch_fn)
144
def main():
    """Demo: print items of a random 10x2 dataset via the vm, three at a time.

    Builds a program that alternates a dataset-reading service with a
    printing service three times, then runs that same program twice.
    """
    rng = numpy.random.RandomState(123)
    dataset = Dataset(rng.randn(10, 2))

    fetch = service(dataset.next)
    show = service(print_obj)
    prog = repeat(3, [fetch, show])

    for _ in (0, 1):
        vm_run(prog)
151
152
def main_arch():
    """Architecture sketch of a K-fold training pipeline built from tasks.

    NOTE(review): this is a design draft, not runnable code.  `KFold`,
    `np_batch`, `pca`, `cd1_update`, `train_layer2`, `train_classifier`,
    `save_classifier`, `test_classifier` and `gswitch` are not defined in the
    visible file.
    """
    # create components
    rng = numpy.random.RandomState(123)
    dataset = Dataset(rng.randn(10, 2))
    pca_module = PCA_Analysis()
    layer1 = Layer(w=4)
    layer2 = Layer(w=3)
    kf = KFold(dataset, K=10)

    # create algorithm
    pca_steps = [np_batch(kf.next, 1000), pca.analyze]
    train_pca = seq(pca_steps)

    layer1_steps = [kf.next, pca.filt, cd1_update(layer1, lr=0.01)]
    train_layer1 = repeat(100, layer1_steps)

    fold_start = KFold.step
    fold_body = seq([
        train_pca,
        train_layer1,
        train_layer2,
        train_classifier,
        save_classifier,
        test_classifier,
    ])
    fold_end = KFold.set_score
    algo = repeat(10, [fold_start, fold_body, fold_end])

    gswitch(algo)
178
179
def main1():
    """Sketch: batch five dataset items and send them through PCA analysis.

    NOTE(review): design draft -- `driver`, `cd1_trainer`, `layer1` and
    `gswitch` are not defined in the visible file, so this fails with
    NameError when it reaches the layer-1 section.
    """
    rng = numpy.random.RandomState(123)
    dataset = Dataset(rng.randn(10, 2))
    pca_module = PCA_Analysis()

    # pca
    next_data = service(dataset.next)
    five_at_a_time = batches(src=next_data, N=5)
    b5 = service(five_at_a_time)
    printer = sink(print_obj)
    print_pca_analyze = flow(pca_module.analyze, dest=printer)

    # layer1_training
    layer1_training = driver(
        fn=cd1_trainer(layer1),
        srcs=[],
    )

    gswitch(b5, print_pca_analyze)
196
# Script entry point: run the demo and use its return value as exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
199
200
201
def flow(fn, dest):
    """Return a greenlet that repeatedly applies `fn` and forwards to `dest`.

    When run, the greenlet loops forever: it prints a trace line, calls
    ``fn(g, *args, **kwargs)`` where ``g`` is the greenlet itself, passes the
    result to ``gswitch(dest, t)``, and uses the ``(args, kwargs)`` pair that
    comes back for the next round.

    NOTE(review): `gswitch` is not defined in the visible part of this file.
    """
    def rval(*args, **kwargs):
        while True:
            # Single-argument print: same text as the Python 2 print
            # statement, and also valid on Python 3.
            print('flow calling %s %s %s' % (fn.__name__, args, kwargs))
            t = fn(g, *args, **kwargs)
            args, kwargs = gswitch(dest, t)
    g = greenlet(rval)
    return g
210
def sink(fn):
    """Return a greenlet that calls `fn` once with itself as first argument.

    When run with ``*args, **kwargs`` the greenlet evaluates
    ``fn(glet, *args, **kwargs)`` (``glet`` being the greenlet itself) and
    finishes with that result.
    """
    def run_once(*args, **kwargs):
        result = fn(glet, *args, **kwargs)
        return result
    glet = greenlet(run_once)
    return glet
216
def consumer(fn, src):
    """Return a greenlet that keeps pulling from `src` and passing to `fn`.

    When run, the greenlet loops forever: it calls
    ``gswitch(src, *args, **kwargs)`` with the arguments it was started with
    and passes the result to `fn`.

    NOTE(review): `gswitch` is not defined in the visible part of this file.
    """
    def pump(*args, **kwargs):
        while True:
            received = gswitch(src, *args, **kwargs)
            fn(received)
    return greenlet(pump)