Overview
========

The "central authority" (CA) is the glue which takes care of interfacing plugins
with one another. It has three basic roles:
* it maintains a list of "registered" or "active" plugins
* it receives and queues the various messages sent by the plugins
* it dispatches the messages to their recipients, based on various "events"

Events can take different forms:
* the CA can trigger various events based on running time
* events can be linked to messages emitted by the various plugins, and
  triggered based on the frequency of such messages

Once an event is triggered, it is relayed to the appropriate "recipient
plugin(s)".

It is the responsibility of each plugin to inform the CA which "events" it
cares about.


Generic Pseudo-code
===================

I'll try to write this in pseudo-python as best I can. I'll do this in
traditional OOP, as this is what I'm more comfortable with. I'll leave it up to
James and OB to python-ize this :)


class MessageX(Message):
    """
    A message is basically a data container. This could very well be replaced by
    a generic Python object.
    """

class Plugin(object):
    """
    The base plugin object doesn't do much. It contains a reference to the CA
    (set when the plugin is registered with the CA), provides boilerplate code
    for storing which "events" this plugin is susceptible to, as well as code
    for registering callback functions for the various messages.
    """

    def __init__(self):
        # per-instance state, so that plugins do not share these dictionaries
        self.ca = None        # to be initialized upon plugin registration
        self.active_msg = {}  # dictionary of messages this plugin is susceptible to
        self.callbacks = {}   # mapping of message class --> callback function

    def listen(self, msg_class, interval):
        """
        :param msg_class: reference to the "message" class we are interested in.
                          These messages will be forwarded to this plugin, when
                          the trigger condition is met.
        :param interval: integer. Forward the message to this plugin every
                         'interval' such messages.
        """
        self.active_msg[msg_class] = interval

    def check_trigger(self, msg_class, count):
        """
        Checks whether or not the "trigger" condition associated with messages of
        class 'msg_class' is satisfied. This could be the default behavior, and
        be overridden by the various plugins.

        :param count: number of 'msg_class' messages the CA has seen so far
        """
        return count % self.active_msg[msg_class] == 0

    def handler(self, msg_class, callback):
        """
        Decorator which registers a callback function for the given message
        type.

        NOTE: I don't think what I wrote would work as a Python decorator. I am
        not sure how to handle decorators with multiple parameters (one
        explicit, and the other as the reference to the function). I'm pretty
        sure James or OB could figure it out though!

        :param msg_class: reference to the message class for which we are
                          registering a callback function
        :param callback: reference to the function to call for a given message
        """
        self.callbacks[msg_class] = callback

    def execute(self, message):
        """
        Boilerplate code which executes the right callback function for the
        given message type.
        """
        callback = self.callbacks.get(type(message))
        if callback is not None:
            callback(message)


class ProducerPlugin(Plugin):

    def dostuff(self):
        """
        A typical "producer" plugin. It basically performs an arbitrary action
        and asks the CA to forward the results (in the form of a message) to
        other plugins.
        """

        # iteratively do stuff and relay messages to other plugins
        while condition:

            msga = ...          # do something which yields a message
            self.ca.send(msga)  # ask CA to forward to other plugins


class ConsumerPlugin(Plugin):

    @handler(MessageA)
    def func(self, msga):
        """
        A consumer or "passive plugin" (e.g. logger, etc.). This function is
        registered as the callback function for MessageA objects.
        """
        # do something with message A


class ConsumerProducerPlugin(Plugin):

    @handler(MessageA)
    def func(self, msga):
        """
        Example of a consumer / producer plugin. It receives MessageA messages,
        processes the data, then asks the CA to send a new message (MessageB) as
        the result of its computation. The CA will automatically forward it to
        all interested parties.

        :param msga: MessageA instance
        """

        data = dostuff(msga)   # process message
        msgb = MessageB(data)  # generate new message for other plugins
        self.ca.send(msgb)     # ask CA to forward to other plugins



from time import sleep


class CentralAuthority(object):

    def __init__(self):
        self.active_plugins = []  # contains a list of registered plugins

        self.mailman = {}         # dictionary which contains, for each message
                                  # class, a list of plugins interested in it

        self.event_count = {}     # dictionary of "event" counts for various messages

    def register(self, plugin):
        """
        Registers the plugin and adds it as a listener for the various messages
        it is interested in.

        :param plugin: plugin instance which we want to "activate"
        """

        # each plugin must have a reference to the CA
        plugin.ca = self

        # maintain list of active plugins
        self.active_plugins.append(plugin)

        # remember which messages this plugin cares about
        for msg_class in plugin.active_msg:
            self.mailman.setdefault(msg_class, []).append(plugin)
            self.event_count.setdefault(msg_class, 0)

    def send(self, msg):
        """
        This function relays the message to the appropriate plugins, based on
        their "trigger" condition. It also keeps track of the number of times
        this event was raised.

        :param msg: message instance
        """
        msg_class = type(msg)
        self.event_count[msg_class] = self.event_count.get(msg_class, 0) + 1

        # for all plugins interested in this message ...
        for plugin in self.mailman.get(msg_class, []):

            # check if trigger condition is met
            if plugin.check_trigger(msg_class, self.event_count[msg_class]):

                # have the plugin execute the message
                plugin.execute(msg)

    def run(self):
        """
        This would be the main loop of the program. I won't go into details
        because it's still somewhat blurry in my head :) But basically, the CA
        could be configured to send out its own messages, independently from all
        other plugins.

        These could be "synchronous" messages such as: "5 seconds have passed",
        or others such as "save state we are about to get killed".

        NOTE: seems like this would almost have to live in its own thread ...
        """

        # the following would be parametrized obviously
        while True:
            msg = ElapsedTimeMessage(5)
            self.send(msg)
            sleep(5)

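On the NOTE in `run` about living in its own thread: one way this could look, as
a minimal sketch only, is a daemon thread paced by a `threading.Event`. The
`start_ticker` helper and the `ElapsedTimeMessage` stand-in below are
hypothetical names made up for this example, and `send` is any callable playing
the role of the CA's `send`.

```python
import threading
import time


class ElapsedTimeMessage:
    """Hypothetical stand-in for the time-based message used in run()."""

    def __init__(self, seconds):
        self.seconds = seconds


def start_ticker(send, interval, stop_event):
    """Call send(ElapsedTimeMessage(interval)) every `interval` seconds,
    from a background thread, until stop_event is set."""

    def loop():
        # Event.wait() returns False on timeout and True once the event is
        # set, so this line both sleeps and checks for shutdown.
        while not stop_event.wait(interval):
            send(ElapsedTimeMessage(interval))

    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread


received = []                      # stands in for the CA's send()
stop = threading.Event()
ticker = start_ticker(received.append, 0.01, stop)
time.sleep(0.2)                    # let a few ticks go by
stop.set()                         # ask the ticker to stop
ticker.join()
```

Handlers would then run on the ticker thread, so any plugin sharing state with
the main loop would need a lock or a queue in between.
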
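On the NOTE in `handler`: a decorator which takes an argument is usually written
as a function that returns the actual decorator. Since `self` does not exist yet
when the class body is executed, one option (a sketch, certainly not the only
way) is to have the decorator merely tag the function, and let `Plugin.__init__`
collect the tagged methods. The `_handles` attribute, the `seen` list and the
`on_message_a` method are made up for this example.

```python
def handler(msg_class):
    """Decorator factory: tag a method as the callback for msg_class."""
    def decorator(func):
        func._handles = msg_class   # hypothetical tag, read back in __init__
        return func
    return decorator


class Plugin:
    def __init__(self):
        # per-instance mapping: message class --> bound callback
        self.callbacks = {}
        for name in dir(self):
            attr = getattr(self, name)
            msg_class = getattr(attr, "_handles", None)
            if msg_class is not None:
                self.callbacks[msg_class] = attr

    def execute(self, message):
        callback = self.callbacks.get(type(message))
        if callback is not None:
            callback(message)


class MessageA:
    def __init__(self, data):
        self.data = data


class ConsumerPlugin(Plugin):
    def __init__(self):
        super().__init__()
        self.seen = []

    @handler(MessageA)
    def on_message_a(self, msg):
        self.seen.append(msg.data)


consumer = ConsumerPlugin()
consumer.execute(MessageA(42))   # dispatched to on_message_a
consumer.execute(object())       # no handler registered: silently ignored
```

After these two calls `consumer.seen` holds only the payload of the MessageA.
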
Putting it all together
=======================


def main():

    ca = CentralAuthority()

    producer = ProducerPlugin()
    ca.register(producer)

    # both plugins below register their handler for MessageA, so that is the
    # message class they listen for
    consumer = ConsumerPlugin()
    consumer.listen(MessageA, 1)
    ca.register(consumer)

    other = ConsumerProducerPlugin()
    other.listen(MessageA, 10)
    ca.register(other)

    # this is the function call which gets the ball rolling
    producer.dostuff()


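To see the interval-based routing in action, here is a condensed, runnable
sketch. It is a simplification rather than the design above: plugins are
flattened into plain callbacks, `listen` lives on the CA, and the `every` and
`tenth` lists are made up purely to record what each listener receives.

```python
class MessageA:
    def __init__(self, data):
        self.data = data


class CentralAuthority:
    def __init__(self):
        self.mailman = {}      # message class --> list of (callback, interval)
        self.event_count = {}  # message class --> number of messages seen

    def listen(self, msg_class, callback, interval=1):
        self.mailman.setdefault(msg_class, []).append((callback, interval))

    def send(self, msg):
        msg_class = type(msg)
        count = self.event_count.get(msg_class, 0) + 1
        self.event_count[msg_class] = count
        for callback, interval in self.mailman.get(msg_class, []):
            if count % interval == 0:
                callback(msg)  # blocking: returns once the callback is done


ca = CentralAuthority()
every, tenth = [], []
ca.listen(MessageA, lambda m: every.append(m.data), interval=1)
ca.listen(MessageA, lambda m: tenth.append(m.data), interval=10)

for step in range(1, 31):      # stands in for the producer's loop
    ca.send(MessageA(step))

print(len(every), tenth)       # 30 [10, 20, 30]
```

The first listener sees every message, the second only every tenth one.
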
DISCUSSION: blocking vs. non-blocking
=====================================

In the above example, I used "blocking" sends. However, it is not clear that
this is the best option.

In the example, the producer basically acts as the main loop. It relinquishes
control of the main loop when the CA decides to forward the message to other
plugins. Control is only returned once the cascade of sends/receives
initiated with MessageA is complete (all sub-plugins have processed MessageA and
any messages sent as a side effect have also been processed).

This definitely imposes constraints on what the plugins can do, and how they do
it. For the type of single-processor / linear jobs we tend to run, this might be
enough (??).

The good news is that going forward, the above plugin architecture can also
scale to distributed systems, by changing the sends to be non-blocking. Plugins
could then live on different machines and process data as they see fit.
Synchronization would be enforced by way of messages. In the above example, the
"main producer" would thus become a consumer/producer which listens for "done
processing MessageA" messages and produces a new MessageA as a result.

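As a single-process illustration of what a non-blocking send could mean, here is
a minimal sketch in which `send` only enqueues the message and a separate loop
does the dispatching. `QueueingAuthority`, `subscribe` and `run_until_idle` are
hypothetical names, and a distributed version would need a real transport
instead of an in-memory queue.

```python
from collections import deque


class MessageA:
    pass


class MessageB:
    pass


class QueueingAuthority:
    def __init__(self):
        self.listeners = {}     # message class --> list of callbacks
        self.pending = deque()  # messages waiting to be dispatched

    def subscribe(self, msg_class, callback):
        self.listeners.setdefault(msg_class, []).append(callback)

    def send(self, msg):
        """Non-blocking: enqueue the message and return immediately."""
        self.pending.append(msg)

    def run_until_idle(self):
        """Dispatch queued messages, including any sent by the callbacks."""
        while self.pending:
            msg = self.pending.popleft()
            for callback in self.listeners.get(type(msg), []):
                callback(msg)


log = []
ca = QueueingAuthority()


def on_a(msg):
    log.append("A handled")
    ca.send(MessageB())           # returns at once, B is handled later
    log.append("A handler done")


ca.subscribe(MessageA, on_a)
ca.subscribe(MessageB, lambda msg: log.append("B handled"))

ca.send(MessageA())
log.append("send returned")       # nothing has been processed yet
ca.run_until_idle()
```

With blocking sends, "send returned" would come last and "B handled" would land
before "A handler done"; here the sender regains control first and the cascade
is processed afterwards, in queue order.
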
On single-processor systems, however, the synchronization overhead might be too
costly. That is something we would have to investigate. On the plus side, our
plugins would be "future proof" and lend themselves well to the type of
"massively parallel jobs" we wish to run (e.g. meta-learners, etc.).



Logistic Regression
===================


TO COME SOON (?)