# HG changeset patch # User Olivier Breuleux # Date 1284620304 14400 # Node ID a1957faecc9b90452e9d38f628356e3eadf05d71 # Parent 0653a85ff2e88edc2d3ea7027e453fc249b8ceb4 revised plugin interface and implementation diff -r 0653a85ff2e8 -r a1957faecc9b doc/v2_planning/plugin.py --- a/doc/v2_planning/plugin.py Wed Sep 15 15:38:45 2010 -0400 +++ b/doc/v2_planning/plugin.py Thu Sep 16 02:58:24 2010 -0400 @@ -1,14 +1,60 @@ import time -from collections import defaultdict +from collections import defaultdict, deque +from copy import copy inf = float('inf') +############# +### EVENT ### +############# + +class Event(object): + + def __init__(self, type, **attributes): + self.type = type + self.__dict__.update(attributes) + self.attributes = dict(type = type, **attributes) + + def match(self, other): + if isinstance(other, Matcher): + return other(self) + else: + oattr = other.attributes + for k, v in self.attributes.iteritems(): + if k in oattr: + v2 = oattr[k] + if isinstance(v2, Matcher): + if not v2(v): return False + else: + if v != v2: return False + return True + + def __str__(self): + return "Event(%s)" % ", ".join("%s=%s" % (k, v) for k, v in self.attributes.iteritems()) + +class Matcher(object): + + def __call__(self, object): + raise NotImplementedError("Implement this!") + +class FnMatcher(Matcher): + + def __init__(self, function): + self.function = function + + def __call__(self, object): + return self.function(object) + +all_events = FnMatcher(lambda _: True) + + + ################ ### SCHEDULE ### ################ -class Schedule(object): +class Schedule(Matcher): def __add__(self, i): return OffsetSchedule(self, i) def __or__(self, s): @@ -42,33 +88,37 @@ self.subschedules = map(to_schedule, subschedules) class UnionSchedule(ScheduleMix): - def __call__(self, t1, t2): - return any(s(t1, t2) for s in self.subschedules) + def __call__(self, time): + return any(s(time) for s in self.subschedules) class IntersectionSchedule(ScheduleMix): - def __call__(self, 
t1, t2): - return all(s(t1, t2) for s in self.subschedules) + def __call__(self, time): + return all(s(time) for s in self.subschedules) class DifferenceSchedule(ScheduleMix): __n__ = 2 - def __call__(self, t1, t2): - return self.subschedules[0](t1, t2) and not self.subschedules[1](t1, t2) + def __call__(self, time): + return self.subschedules[0](time) and not self.subschedules[1](time) class NegatedSchedule(ScheduleMix): __n__ = 1 - def __call__(self, t1, t2): - return not self.subschedules[0](t1, t2) + def __call__(self, time): + return not self.subschedules[0](time) class OffsetSchedule(Schedule): def __init__(self, schedule, offset): self.schedule = schedule self.offset = offset - def __call__(self, t1, t2): - return self.schedule(t1 - self.offset, t2 - self.offset) + def __call__(self, time): + if isinstance(time, int): + return self.schedule(time - self.offset) + else: + t1, t2 = time + return self.schedule((t1 - self.offset, t2 - self.offset)) class AlwaysSchedule(Schedule): - def __call__(self, t1, t2): + def __call__(self, time): return True always = AlwaysSchedule() @@ -78,16 +128,22 @@ def __init__(self, step, repeat = inf): self.step = step self.upper_bound = step * (repeat - 1) - def __call__(self, t1, t2): - if t2 < 0 or t1 > self.upper_bound: - return False - diff = t2 - t1 - t1m = t1 % self.step - t2m = t2 % self.step - return (diff >= self.step - or t1m == 0 - or t2m == 0 - or t1m > t2m) + def __call__(self, time): + if isinstance(time, int): + if time < 0 or time > self.upper_bound: + return False + return time % self.step == 0 + else: + t1, t2 = time + if t2 < 0 or t1 > self.upper_bound: + return False + diff = t2 - t1 + t1m = t1 % self.step + t2m = t2 % self.step + return (diff >= self.step + or t1m == 0 + or t2m == 0 + or t1m > t2m) each = lambda step, repeat = inf: each0(step, repeat) + step each0 = IntervalSchedule @@ -97,9 +153,13 @@ def __init__(self, low = None, high = None): self.low = low or -inf self.high = high or inf - def 
__call__(self, t1, t2): - return self.low <= t1 <= self.high \ - or self.low <= t2 <= self.high + def __call__(self, time): + if isinstance(time, int): + return self.low <= time <= self.high + else: + t1, t2 = time + return self.low <= t1 <= self.high \ + or self.low <= t2 <= self.high inrange = RangeSchedule @@ -107,221 +167,148 @@ class ListSchedule(Schedule): def __init__(self, *schedules): self.schedules = schedules - def __call__(self, t1, t2): - for t in self.schedules: - if t1 <= t <= t2: - return True + def __call__(self, time): + if isinstance(time, int): + return time in self.schedules + else: + for t in self.schedules: + if t1 <= t <= t2: + return True return False at = ListSchedule -at_start = at(-inf) -at_end = at(inf) ############## -### RUNNER ### +### PLUGIN ### ############## -class scratchpad: - pass +class Plugin(object): + + def attach(self, scheduler): + c = copy(self) + c.scheduler = scheduler + return c -# # ORIGINAL RUNNER, NO TIMELINES -# def runner(master, plugins): -# """ -# master is a function which is in charge of the "this" object. It -# is in charge of updating the t1, t2 and done fields, It must -# take a single argument, this. + def __call__(self, event): + raise NotImplementedError("Implement this!") + + def fire(self, type, **attributes): + event = Event(type, issuer = self, **attributes) + self.scheduler.queue(event) + +class FnPlugin(Plugin): -# plugins is a list of (schedule, function) pairs. In-between each -# execution of the master function, as well as at the very -# beginning and at the very end, the schedule will be consulted -# for the time range [t1, t2], and if there is a match, the -# function will be called with this as the argument. The order -# in which the functions are provided is respected. 
+ def __init__(self, function): + self.function = function + + def __call__(self, event): + return self.function(self, event) + +class DispatchPlugin(Plugin): -# Note: the reason why we use t1 and t2 instead of just t is that it -# gives the master function the ability to run several iterations at -# once without consulting any plugins. In that situation, t1 and t2 -# represent a range, and the schedule must determine if there would -# have been an event in that range (we do not distinguish between a -# single event and multiple events). + def __call__(self, event): + getattr(self, "on_" + event.type, self.generic)(event) + + def generic(self, event): + return + + +################# +### SCHEDULER ### +################# -# For instance, if one is training using minibatches, one could set -# t1 and t2 to the index of the lower and higher examples, and the -# plugins' schedules would be given according to how many examples -# were seen rather than how many minibatches were processed. +class Scheduler(object): -# Another possibility is to use real time - t1 would be the time -# before the execution of the master function, t2 the time after -# (in, say, milliseconds). Then you can define plugins that run -# every second or every minute, but only in-between two training -# iterations. 
-# """ + def __init__(self): + self.plugins = [] + self.categorized = defaultdict(list) + self.event_queue = deque() -# this = scratchpad() -# this.t1 = -inf -# this.t2 = -inf -# this.started = False -# this.done = False -# while True: -# for schedule, function in plugins: -# if schedule(this.t1, this.t2): -# function(this) -# if this.done: -# break -# master(this) -# this.started = True -# if this.done: -# break -# this.t1 = inf -# this.t2 = inf -# for schedule, function in plugins: -# if schedule(this.t1, this.t2): -# function(this) + def __call__(self): + i = 0 + evq = self.event_queue + self.queue(Event("begin", issuer = self)) + while True: + self.queue(Event("tick", issuer = self, time = i)) + while evq: + event = evq.popleft() + candidates = self.categorized[event.type] + self.categorized[None] + for event_template, plugin in candidates: + if event.match(event_template): + plugin(event) # note: the plugin might queue more events + if event.type == "terminate": + return + i += 1 + + def schedule_plugin(self, event_template, plugin): + plugin = plugin.attach(self) + if isinstance(event_template, Matcher) or isinstance(event_template.type, Matcher): + # These plugins may execute upon any event type + self.categorized[None].append((event_template, plugin)) + else: + self.categorized[event_template.type].append((event_template, plugin)) + self.plugins.append((event_template, plugin)) + + def queue(self, event): + self.event_queue.append(event) -def runner(main, plugins): - """ - :param main: A function which must take a single argument, - ``this``. The ``this`` argument contains a settable ``done`` - flag indicating whether the iterations should keep going or - not, as well as a flag indicating whether this is the first - time runner() is calling main(). main() may store whatever it - wants in ``this``. It may also add one or more timelines in - ``this.timelines[timeline_name]``, which plugins can exploit. 
+@FnPlugin +def printer(self, event): + print event + +@FnPlugin +def stopper(self, event): + self.fire("terminate") - :param plugins: A list of (schedule, timeline, function) - tuples. In-between each execution of the main function, as - well as at the very beginning and at the very end, the - schedule will be consulted for the time range [t1, t2] from - the appropriate timeline, and if there is a match, the - function will be called with ``this`` as the argument. The - order in which the functions are provided is respected. +@FnPlugin +def byebye(self, event): + print "bye bye!" - For any plugin, the timeline can be - * 'iterations', where t1 == t2 == the iteration number - * 'real_time', where t1 and t2 mark the start of the last - loop and the start of the current loop, in seconds since - the beginning of training (includes time spent in plugins) - * 'algorithm_time', where t1 and t2 mark the start and end - of the last iteration of the main function (does not - include time spent in plugins) - * A main function specific timeline. - At the very beginning, the time for all timelines is - -infinity, at the very end it is +infinity. 
- """ - start_time = time.time() - - this = scratchpad() +@FnPlugin +def waiter(self, event): + time.sleep(0.1) - this.timelines = defaultdict(lambda: [-inf, -inf]) - realt = this.timelines['real_time'] - algot = this.timelines['algorithm_time'] - itert = this.timelines['iterations'] - - this.started = False - this.done = False - - while True: - - for schedule, timeline, function in plugins: - if schedule(*this.timelines[timeline]): - function(this) - if this.done: - break +# @FnPlugin +# def timer(self, event): +# if not hasattr(self, 'previous'): +# self.beginning = time.time() +# self.previous = 0 +# now = time.time() - self.beginning +# inow = int(now) +# if inow > self.previous: +# self.fire("second", time = inow) +# self.previous = now - t1 = time.time() - main(this) - t2 = time.time() +class Timer(DispatchPlugin): + + def on_begin(self, event): + self.beginning = time.time() + self.previous = 0 - if not this.started: - realt[:] = [0, 0] - algot[:] = [0, 0] - itert[:] = [-1, -1] - realt[:] = [realt[1], t2 - start_time] - algot[:] = [algot[1], algot[1] + (t2 - t1)] - itert[:] = [itert[0] + 1, itert[1] + 1] - - this.started = True - if this.done: - break - - this.timelines = defaultdict(lambda: [inf, inf]) - - for schedule, timeline, function in plugins: - if schedule(*this.timelines[timeline]): - function(this) + def on_tick(self, event): + now = time.time() - self.beginning + inow = int(now) + if inow > self.previous: + self.fire("second", time = inow) + self.previous = now - - -################ -### SHOWCASE ### -################ - -def main(this): - if not this.started: - this.error = 1.0 - # note: runner will automatically set this.started to true - else: - this.error /= 1.1 +sch = Scheduler() -def welcome(this): - print "Let's start!" 
- -def print_iter(this): - print "Now running iteration #%i" % this.timelines['iterations'][0] - -def print_error(this): - print "The error rate is %s" % this.error - -def maybe_stop(this): - thr = 0.01 - if this.error < thr: - print "Error is below the threshold: %s <= %s" % (this.error, thr) - this.done = True - -def wait_a_bit(this): - time.sleep(1./37) - -def printer(txt): - def f(this): - print txt - return f - -def stop_this_madness(this): - this.done = True +sch.schedule_plugin(all_events, Timer()) +sch.schedule_plugin(Event("tick"), waiter) # this means: execute the waiter plugin (a delay) on every "tick" event. Is it confusing to use Event(...)? +sch.schedule_plugin(Event("second"), printer) -def byebye(this): - print "Bye bye!" +# sch.schedule_plugin(all_events, printer) -runner(main = main, - plugins = [# At the very beginning, print a welcome message - (at_start, 'iterations', welcome), - # Each iteration from 1 to 10 inclusive, OR each multiple of 10 - # (except 0 - each() excludes 0, each0() includes it) - # print the error - (inrange(1, 10) | each(10), 'iterations', print_error), - # Each multiple of 10, check for stopping condition - (each(10), 'iterations', maybe_stop), - # At iteration 1000, if we ever get that far, just stop - (at(1000), 'iterations', stop_this_madness), - # Wait a bit - (each(1), 'iterations', wait_a_bit), - # Print bonk each second of real time - (each(1), 'real_time', printer('BONK')), - # Print thunk each second of time in main() (main() - # is too fast, so this does not happen for many - # iterations) - (each(1), 'algorithm_time', printer('THUNK')), - # Announce the next iteration - (each0(1), 'iterations', print_iter), - # At the very end, display a message - (at_end, 'iterations', byebye)]) +sch.schedule_plugin(Event("tick", time = at(100)), stopper) +sch.schedule_plugin(Event("terminate"), byebye) - +sch() diff -r 0653a85ff2e8 -r a1957faecc9b doc/v2_planning/plugin.txt --- a/doc/v2_planning/plugin.txt Wed Sep 15 15:38:45 
2010 -0400 +++ b/doc/v2_planning/plugin.txt Thu Sep 16 02:58:24 2010 -0400 @@ -67,3 +67,115 @@ I have implemented the feature in plugin.py, in this directory. Simply run python plugin.py to test it. + + +=============== +Revised version +=============== + +Taking into account ideas thrown around during the September 16 +meeting I (OB) have made the following modifications to my original +proposal: + +Event objects +============= + +In the revised framework, an Event is a generic object which can +contain any attributes you want, with one privileged attribute, the +'type' attribute, which is a string. I expect the following attributes +to be used widely: + +* type: this is a string describing the abstract semantics of this + event ("tick", "second", "millisecond", "batch", etc.) + +* issuer: a pointer to the plugin that issued this event. This allows + for fine grained filtering in the case where several plugins can + fire the same event type + +* time: an integer or float index on an abstract timeline. For + instance, the "tick" event would have a "time" field, which would be + increased by one every time the event is fired. Pretty much all + recurrent events should include this. + +* data: some data associated to the event. presumably it doesn't have + to be named "data", and more than one data field could be given. + +The basic idea is that it should be possible to say: "I want this +plugin to be executed every tenth time an event of this type is fired +by this plugin", or any subset of these conditions. + +Matching events +=============== + +When registering a plugin, you specify a sort of "abstract event" that +an event must "match" in order to be fed to the plugin. This can be +done by simply instantiating an event with the fields you want to +match. 
I think examples would best explain my idea +(sch.schedule_plugin = add a plugin to the scheduler): + +# Print the error on every parameter update (learner given in the event) +sch.schedule_plugin(Event("parameter_update"), PrintError()) +# Print the reconstruction error of daa0 whenever it does a parameter update +sch.schedule_plugin(Event("parameter_update", issuer = daa0), PrintReconstructionError()) +# Save the learner every 10 minutes +sch.schedule_plugin(Event("minute", time = each(10)), Save(learner)) + +The events given as the first argument to schedule_plugin are not real +events: they are "template events" meant to be *matched* against the +real events that will be fired. If the terminology is confusing, it +would not be a problem to use another class with a better name (for +example, On("minute", time = each(10)) could be clearer than +Event(...), I don't know). + +Note that fields in these Event objects can be a special kind of +object, a Matcher, which allows filtering events based on arbitrary +conditions. My Schedule objects (each, at, etc.) now inherit from +Matcher. You could easily have a matcher that allows you to match +issuers that are instances of a certain class, or matches every single +event (I have an example of the latter in plugin.py). + +Plugins +======= + +The plugin class would have the following methods: + +* attach(scheduler): tell the plugin that it is being scheduled by the + scheduler, store the scheduler in self. The method can return self, + or a copy of itself. + +* fire(type, **attributes): adds Event(type, issuer = self, **attributes) + to the event queue of self.scheduler + +Scheduler +========= + +A Scheduler would have a schedule_plugin(event_template, plugin) +method to add plugins, a queue(event) method to queue a new event, and +it would be callable. + +My current version proceeds as follows: + +* Fire Event("begin"). 
Somewhat equivalent to "tick" at time 0, but I + find it cleaner to have a special event to mark the beginning of the + event loop. +* Infinite loop + * Fire Event("tick", time = <iteration number>) + * Loop until the queue is empty + * Pop event, execute all plugins that respond to it + * Check if event.type == "terminate". If so, stop. + +Varia +===== + +I've made a very simple implementation of a DispatchPlugin which, upon +reception of an event, dispatches it to its "on_<event.type>" method +(or calls a fallback). It seems nice. However, in order for it to work +reliably, it has to be registered on all events, and I'm not sure it +can scale well to more complex problems where the source of events is +important. + +Implementation +============== + +See plugin.py. +