changeset 1135:a1957faecc9b

revised plugin interface and implementation
author Olivier Breuleux <breuleuo@iro.umontreal.ca>
date Thu, 16 Sep 2010 02:58:24 -0400
parents 0653a85ff2e8
children 5f0c8ff2b3b6
files doc/v2_planning/plugin.py doc/v2_planning/plugin.txt
diffstat 2 files changed, 311 insertions(+), 212 deletions(-) [+]
line wrap: on
line diff
--- a/doc/v2_planning/plugin.py	Wed Sep 15 15:38:45 2010 -0400
+++ b/doc/v2_planning/plugin.py	Thu Sep 16 02:58:24 2010 -0400
@@ -1,14 +1,60 @@
 
 import time
-from collections import defaultdict
+from collections import defaultdict, deque
+from copy import copy
 
 inf = float('inf')
 
+#############
+### EVENT ###
+#############
+
+class Event(object):
+
+    def __init__(self, type, **attributes):
+        self.type = type
+        self.__dict__.update(attributes)
+        self.attributes = dict(type = type, **attributes)
+
+    def match(self, other):
+        if isinstance(other, Matcher):
+            return other(self)
+        else:
+            oattr = other.attributes
+            for k, v in self.attributes.iteritems():
+                if k in oattr:
+                    v2 = oattr[k]
+                    if isinstance(v2, Matcher):
+                        if not v2(v): return False
+                    else:
+                        if v != v2: return False
+            return True
+
+    def __str__(self):
+        return "Event(%s)" % ", ".join("%s=%s" % (k, v) for k, v in self.attributes.iteritems())
+
+class Matcher(object):
+
+    def __call__(self, object):
+        raise NotImplementedError("Implement this!")
+
+class FnMatcher(Matcher):
+
+    def __init__(self, function):
+        self.function = function
+
+    def __call__(self, object):
+        return self.function(object)
+
+all_events = FnMatcher(lambda _: True)
+
+
+
 ################
 ### SCHEDULE ###
 ################
 
-class Schedule(object):
+class Schedule(Matcher):
     def __add__(self, i):
         return OffsetSchedule(self, i)
     def __or__(self, s):
@@ -42,33 +88,37 @@
         self.subschedules = map(to_schedule, subschedules)
 
 class UnionSchedule(ScheduleMix):
-    def __call__(self, t1, t2):
-        return any(s(t1, t2) for s in self.subschedules)
+    def __call__(self, time):
+        return any(s(time) for s in self.subschedules)
 
 class IntersectionSchedule(ScheduleMix):
-    def __call__(self, t1, t2):
-        return all(s(t1, t2) for s in self.subschedules)
+    def __call__(self, time):
+        return all(s(time) for s in self.subschedules)
 
 class DifferenceSchedule(ScheduleMix):
     __n__ = 2
-    def __call__(self, t1, t2):
-        return self.subschedules[0](t1, t2) and not self.subschedules[1](t1, t2)
+    def __call__(self, time):
+        return self.subschedules[0](time) and not self.subschedules[1](time)
 
 class NegatedSchedule(ScheduleMix):
     __n__ = 1
-    def __call__(self, t1, t2):
-        return not self.subschedules[0](t1, t2)
+    def __call__(self, time):
+        return not self.subschedules[0](time)
 
 class OffsetSchedule(Schedule):
     def __init__(self, schedule, offset):
         self.schedule = schedule
         self.offset = offset
-    def __call__(self, t1, t2):
-        return self.schedule(t1 - self.offset, t2 - self.offset)
+    def __call__(self, time):
+        if isinstance(time, int):
+            return self.schedule(time - self.offset)
+        else:
+            t1, t2 = time
+            return self.schedule((t1 - self.offset, t2 - self.offset))
 
 
 class AlwaysSchedule(Schedule):
-    def __call__(self, t1, t2):
+    def __call__(self, time):
         return True
 
 always = AlwaysSchedule()
@@ -78,16 +128,22 @@
     def __init__(self, step, repeat = inf):
         self.step = step
         self.upper_bound = step * (repeat - 1)
-    def __call__(self, t1, t2):
-        if t2 < 0 or t1 > self.upper_bound:
-            return False
-        diff = t2 - t1
-        t1m = t1 % self.step
-        t2m = t2 % self.step
-        return (diff >= self.step
-                or t1m == 0
-                or t2m == 0
-                or t1m > t2m)
+    def __call__(self, time):
+        if isinstance(time, int):
+            if time < 0 or time > self.upper_bound:
+                return False
+            return time % self.step == 0
+        else:
+            t1, t2 = time
+            if t2 < 0 or t1 > self.upper_bound:
+                return False
+            diff = t2 - t1
+            t1m = t1 % self.step
+            t2m = t2 % self.step
+            return (diff >= self.step
+                    or t1m == 0
+                    or t2m == 0
+                    or t1m > t2m)
 
 each = lambda step, repeat = inf: each0(step, repeat) + step
 each0 = IntervalSchedule
@@ -97,9 +153,13 @@
     def __init__(self, low = None, high = None):
         self.low = low or -inf
         self.high = high or inf
-    def __call__(self, t1, t2):
-        return self.low <= t1 <= self.high \
-            or self.low <= t2 <= self.high
+    def __call__(self, time):
+        if isinstance(time, int):
+            return self.low <= time <= self.high
+        else:
+            t1, t2 = time
+            return self.low <= t1 <= self.high \
+                or self.low <= t2 <= self.high
 
 inrange = RangeSchedule    
 
@@ -107,221 +167,148 @@
 class ListSchedule(Schedule):
     def __init__(self, *schedules):
         self.schedules = schedules
-    def __call__(self, t1, t2):
-        for t in self.schedules:
-            if t1 <= t <= t2:
-                return True
+    def __call__(self, time):
+        if isinstance(time, int):
+            return time in self.schedules
+        else:
+            t1, t2 = time  # a (t1, t2) range, unpacked as in the other schedules
+            for t in self.schedules:
+                if t1 <= t <= t2: return True
         return False
 
 at = ListSchedule
-at_start = at(-inf)
-at_end = at(inf)
 
 
 ##############
-### RUNNER ###
+### PLUGIN ###
 ##############
 
-class scratchpad:
-    pass
+class Plugin(object):
+
+    def attach(self, scheduler):
+        c = copy(self)
+        c.scheduler = scheduler
+        return c
 
-# # ORIGINAL RUNNER, NO TIMELINES
-# def runner(master, plugins):
-#     """
-#     master is a function which is in charge of the "this" object.  It
-#         is in charge of updating the t1, t2 and done fields, It must
-#         take a single argument, this.
+    def __call__(self, event):
+        raise NotImplementedError("Implement this!")
+
+    def fire(self, type, **attributes):
+        event = Event(type, issuer = self, **attributes)
+        self.scheduler.queue(event)
+
+class FnPlugin(Plugin):
 
-#     plugins is a list of (schedule, function) pairs. In-between each
-#         execution of the master function, as well as at the very
-#         beginning and at the very end, the schedule will be consulted
-#         for the time range [t1, t2], and if there is a match, the
-#         function will be called with this as the argument. The order
-#         in which the functions are provided is respected.
+    def __init__(self, function):
+        self.function = function
+
+    def __call__(self, event):
+        return self.function(self, event)
+
+class DispatchPlugin(Plugin):
 
-#     Note: the reason why we use t1 and t2 instead of just t is that it
-#     gives the master function the ability to run several iterations at
-#     once without consulting any plugins. In that situation, t1 and t2
-#     represent a range, and the schedule must determine if there would
-#     have been an event in that range (we do not distinguish between a
-#     single event and multiple events).
+    def __call__(self, event):
+        getattr(self, "on_" + event.type, self.generic)(event)
+
+    def generic(self, event):
+        return
+
+
+#################
+### SCHEDULER ###
+#################
 
-#     For instance, if one is training using minibatches, one could set
-#     t1 and t2 to the index of the lower and higher examples, and the
-#     plugins' schedules would be given according to how many examples
-#     were seen rather than how many minibatches were processed.
+class Scheduler(object):
 
-#     Another possibility is to use real time - t1 would be the time
-#     before the execution of the master function, t2 the time after
-#     (in, say, milliseconds). Then you can define plugins that run
-#     every second or every minute, but only in-between two training
-#     iterations.
-#     """
+    def __init__(self):
+        self.plugins = []
+        self.categorized = defaultdict(list)
+        self.event_queue = deque()
 
-#     this = scratchpad()
-#     this.t1 = -inf
-#     this.t2 = -inf
-#     this.started = False
-#     this.done = False
-#     while True:
-#         for schedule, function in plugins:
-#             if schedule(this.t1, this.t2):
-#                 function(this)
-#                 if this.done:
-#                     break
-#         master(this)
-#         this.started = True
-#         if this.done:
-#             break
-#     this.t1 = inf
-#     this.t2 = inf
-#     for schedule, function in plugins:
-#         if schedule(this.t1, this.t2):
-#             function(this)
+    def __call__(self):
+        i = 0
+        evq = self.event_queue
+        self.queue(Event("begin", issuer = self))
+        while True:
+            self.queue(Event("tick", issuer = self, time = i))
+            while evq:
+                event = evq.popleft()
+                candidates = self.categorized[event.type] + self.categorized[None]
+                for event_template, plugin in candidates:
+                    if event.match(event_template):
+                        plugin(event) # note: the plugin might queue more events
+                if event.type == "terminate":
+                    return
+            i += 1
+
+    def schedule_plugin(self, event_template, plugin):
+        plugin = plugin.attach(self)
+        if isinstance(event_template, Matcher) or isinstance(event_template.type, Matcher):
+            # These plugins may execute upon any event type
+            self.categorized[None].append((event_template, plugin))
+        else:
+            self.categorized[event_template.type].append((event_template, plugin))
+        self.plugins.append((event_template, plugin))
+
+    def queue(self, event):
+        self.event_queue.append(event)
 
 
 
 
-def runner(main, plugins):
-    """
-    :param main: A function which must take a single argument,
-        ``this``. The ``this`` argument contains a settable ``done``
-        flag indicating whether the iterations should keep going or
-        not, as well as a flag indicating whether this is the first
-        time runner() is calling main(). main() may store whatever it
-        wants in ``this``. It may also add one or more timelines in
-        ``this.timelines[timeline_name]``, which plugins can exploit.
+@FnPlugin
+def printer(self, event):
+    print event
+
+@FnPlugin
+def stopper(self, event):
+    self.fire("terminate")
 
-    :param plugins: A list of (schedule, timeline, function)
-        tuples. In-between each execution of the main function, as
-        well as at the very beginning and at the very end, the
-        schedule will be consulted for the time range [t1, t2] from
-        the appropriate timeline, and if there is a match, the
-        function will be called with ``this`` as the argument. The
-        order in which the functions are provided is respected.
+@FnPlugin
+def byebye(self, event):
+    print "bye bye!"
 
-        For any plugin, the timeline can be
-        * 'iterations', where t1 == t2 == the iteration number
-        * 'real_time', where t1 and t2 mark the start of the last
-          loop and the start of the current loop, in seconds since
-          the beginning of training (includes time spent in plugins)
-        * 'algorithm_time', where t1 and t2 mark the start and end
-          of the last iteration of the main function (does not
-          include time spent in plugins)
-        * A main function specific timeline.
 
-        At the very beginning, the time for all timelines is
-        -infinity, at the very end it is +infinity.
-    """
-    start_time = time.time()
-
-    this = scratchpad()
+@FnPlugin
+def waiter(self, event):
+    time.sleep(0.1)
 
-    this.timelines = defaultdict(lambda: [-inf, -inf])
-    realt = this.timelines['real_time']
-    algot = this.timelines['algorithm_time']
-    itert = this.timelines['iterations']
-
-    this.started = False
-    this.done = False
-
-    while True:
-
-        for schedule, timeline, function in plugins:
-            if schedule(*this.timelines[timeline]):
-                function(this)
-                if this.done:
-                    break
+# @FnPlugin
+# def timer(self, event):
+#     if not hasattr(self, 'previous'):
+#         self.beginning = time.time()
+#         self.previous = 0
+#     now = time.time() - self.beginning
+#     inow = int(now)
+#     if inow > self.previous:
+#         self.fire("second", time = inow)
+#     self.previous = now
 
-        t1 = time.time()
-        main(this)
-        t2 = time.time()
+class Timer(DispatchPlugin):
+
+    def on_begin(self, event):
+        self.beginning = time.time()
+        self.previous = 0
 
-        if not this.started:
-            realt[:] = [0, 0]
-            algot[:] = [0, 0]
-            itert[:] = [-1, -1]
-        realt[:] = [realt[1], t2 - start_time]
-        algot[:] = [algot[1], algot[1] + (t2 - t1)]
-        itert[:] = [itert[0] + 1, itert[1] + 1]
-
-        this.started = True
-        if this.done:
-            break
-
-    this.timelines = defaultdict(lambda: [inf, inf])
-
-    for schedule, timeline, function in plugins:
-        if schedule(*this.timelines[timeline]):
-            function(this)
+    def on_tick(self, event):
+        now = time.time() - self.beginning
+        inow = int(now)
+        if inow > self.previous:
+            self.fire("second", time = inow)
+        self.previous = now
 
 
 
-
-
-################
-### SHOWCASE ###
-################
-
-def main(this):
-    if not this.started:
-        this.error = 1.0
-        # note: runner will automatically set this.started to true
-    else:
-        this.error /= 1.1
+sch = Scheduler()
 
 
-def welcome(this):
-    print "Let's start!"
-
-def print_iter(this):
-    print "Now running iteration #%i" % this.timelines['iterations'][0]
-
-def print_error(this):
-    print "The error rate is %s" % this.error
-
-def maybe_stop(this):
-    thr = 0.01
-    if this.error < thr:
-        print "Error is below the threshold: %s <= %s" % (this.error, thr)
-        this.done = True
-
-def wait_a_bit(this):
-    time.sleep(1./37)
-
-def printer(txt):
-    def f(this):
-        print txt
-    return f
-
-def stop_this_madness(this):
-    this.done = True
+sch.schedule_plugin(all_events, Timer())
+sch.schedule_plugin(Event("tick"), waiter) # this means: execute the waiter plugin (a delay) on every "tick" event. Is it confusing to use Event(...)?
+sch.schedule_plugin(Event("second"), printer)
 
-def byebye(this):
-    print "Bye bye!"
+# sch.schedule_plugin(all_events, printer)
 
-runner(main = main,
-       plugins = [# At the very beginning, print a welcome message
-                  (at_start, 'iterations', welcome),
-                  # Each iteration from 1 to 10 inclusive, OR each multiple of 10
-                  # (except 0 - each() excludes 0, each0() includes it)
-                  # print the error
-                  (inrange(1, 10) | each(10), 'iterations',  print_error),
-                  # Each multiple of 10, check for stopping condition
-                  (each(10), 'iterations',  maybe_stop),
-                  # At iteration 1000, if we ever get that far, just stop
-                  (at(1000), 'iterations',  stop_this_madness),
-                  # Wait a bit
-                  (each(1), 'iterations',  wait_a_bit),
-                  # Print bonk each second of real time
-                  (each(1), 'real_time',  printer('BONK')),
-                  # Print thunk each second of time in main() (main()
-                  # is too fast, so this does not happen for many
-                  # iterations)
-                  (each(1), 'algorithm_time',  printer('THUNK')),
-                  # Announce the next iteration
-                  (each0(1), 'iterations',  print_iter),
-                  # At the very end, display a message
-                  (at_end, 'iterations',  byebye)])
+sch.schedule_plugin(Event("tick", time = at(100)), stopper)
+sch.schedule_plugin(Event("terminate"), byebye)
 
-
+sch()
--- a/doc/v2_planning/plugin.txt	Wed Sep 15 15:38:45 2010 -0400
+++ b/doc/v2_planning/plugin.txt	Thu Sep 16 02:58:24 2010 -0400
@@ -67,3 +67,115 @@
 I have implemented the feature in plugin.py, in this directory. Simply
 run python plugin.py to test it.
 
+
+
+===============
+Revised version
+===============
+
+Taking into account ideas thrown around during the September 16
+meeting, I (OB) have made the following modifications to my original
+proposal:
+
+Event objects
+=============
+
+In the revised framework, an Event is a generic object which can
+contain any attributes you want, with one privileged attribute, the
+'type' attribute, which is a string. I expect the following attributes
+to be used widely:
+
+* type: this is a string describing the abstract semantics of this
+  event ("tick", "second", "millisecond", "batch", etc.)
+
+* issuer: a pointer to the plugin that issued this event. This allows
+  for fine grained filtering in the case where several plugins can
+  fire the same event type
+
+* time: an integer or float index on an abstract timeline. For
+  instance, the "tick" event would have a "time" field, which would be
+  increased by one every time the event is fired. Pretty much all
+  recurrent events should include this.
+
+* data: some data associated with the event. Presumably it doesn't have
+  to be named "data", and more than one data field could be given.
+
+The basic idea is that it should be possible to say: "I want this
+plugin to be executed every tenth time an event of this type is fired
+by this plugin", or any subset of these conditions.
+
+Matching events
+===============
+
+When registering a plugin, you specify a sort of "abstract event" that
+an event must "match" in order to be fed to the plugin. This can be
+done by simply instantiating an event with the fields you want to
+match. I think examples would best explain my idea
+(sch.schedule_plugin = add a plugin to the scheduler):
+
+# Print the error on every parameter update (learner given in the event)
+sch.schedule_plugin(Event("parameter_update"), PrintError())
+# Print the reconstruction error of daa0 whenever it does a parameter update
+sch.schedule_plugin(Event("parameter_update", issuer = daa0), PrintReconstructionError())
+# Save the learner every 10 minutes
+sch.schedule_plugin(Event("minute", time = each(10)), Save(learner))
+
+The events given as first argument to schedule_plugin are not real
+events: they are "template events" meant to be *matched* against the
+real events that will be fired. If the terminology is confusing, it
+would not be a problem to use another class with a better name (for
+example, On("minute", time = each(10)) could be clearer than
+Event(...), I don't know).
+
+Note that fields in these Event objects can be a special kind of
+object, a Matcher, which allows filtering events based on arbitrary
+conditions. My Schedule objects (each, at, etc.) now inherit from
+Matcher. You could easily have a matcher that allows you to match
+issuers that are instances of a certain class, or one that matches every
+single event (I have an example of the latter in plugin.py).
+
+Plugins
+=======
+
+The plugin class would have the following methods:
+
+* attach(scheduler): tell the plugin that it is being scheduled by the
+  scheduler, store the scheduler in self. The method can return self,
+  or a copy of itself.
+
+* fire(type, **attributes): adds Event(type, issuer = self, **attributes)
+  to the event queue of self.scheduler
+
+Scheduler
+=========
+
+A Scheduler would have a schedule_plugin(event_template, plugin)
+method to add plugins, a queue(event) method to queue a new event, and
+it would be callable.
+
+My current version proceeds as follows:
+
+* Fire Event("begin"). Somewhat equivalent to "tick" at time 0, but I
+  find it cleaner to have a special event to mark the beginning of the
+  event loop.
+* Infinite loop
+  * Fire Event("tick", time = <iteration#>)
+  * Loop until the queue is empty
+    * Pop event, execute all plugins that respond to it
+    * Check if event.type == "terminate". If so, stop.
+
+Varia
+=====
+
+I've made a very simple implementation of a DispatchPlugin which, upon
+reception of an event, dispatches it to its "on_<event.type>" method
+(or calls a fallback). It seems nice. However, in order for it to work
+reliably, it has to be registered on all events, and I'm not sure it
+can scale well to more complex problems where the source of events is
+important.
+
+Implementation
+==============
+
+See plugin.py.
+