Mercurial > pylearn
view doc/v2_planning/plugin.py @ 1361:7548dc1b163c
Some question/suggestions to datalearn
author | Razvan Pascanu <r.pascanu@gmail.com> |
---|---|
date | Thu, 11 Nov 2010 22:40:01 -0500 |
parents | a1957faecc9b |
children |
line wrap: on
line source
import time from collections import defaultdict, deque from copy import copy inf = float('inf') ############# ### EVENT ### ############# class Event(object): def __init__(self, type, **attributes): self.type = type self.__dict__.update(attributes) self.attributes = dict(type = type, **attributes) def match(self, other): if isinstance(other, Matcher): return other(self) else: oattr = other.attributes for k, v in self.attributes.iteritems(): if k in oattr: v2 = oattr[k] if isinstance(v2, Matcher): if not v2(v): return False else: if v != v2: return False return True def __str__(self): return "Event(%s)" % ", ".join("%s=%s" % (k, v) for k, v in self.attributes.iteritems()) class Matcher(object): def __call__(self, object): raise NotImplementedError("Implement this!") class FnMatcher(Matcher): def __init__(self, function): self.function = function def __call__(self, object): return self.function(object) all_events = FnMatcher(lambda _: True) ################ ### SCHEDULE ### ################ class Schedule(Matcher): def __add__(self, i): return OffsetSchedule(self, i) def __or__(self, s): return UnionSchedule(self, to_schedule(s)) def __and__(self, s): return IntersectionSchedule(self, to_schedule(s)) def __sub__(self, i): return OffsetSchedule(self, -i) def __ror__(self, s): return UnionSchedule(to_schedule(s), self) def __rand__(self, s): return IntersectionSchedule(to_schedule(s), self) def __invert__(self): return NegatedSchedule(self) def to_schedule(x): if x in (None, False): return never if x is True: return always elif isinstance(x, (list, tuple)): return reduce(UnionSchedule, x) else: return x class ScheduleMix(Schedule): __n__ = None def __init__(self, *subschedules): assert (not self.__n__) or len(subschedules) == self.__n__ self.subschedules = map(to_schedule, subschedules) class UnionSchedule(ScheduleMix): def __call__(self, time): return any(s(time) for s in self.subschedules) class IntersectionSchedule(ScheduleMix): def __call__(self, time): return all(s(time) for s in self.subschedules) class DifferenceSchedule(ScheduleMix): __n__ = 2 def __call__(self, time): return self.subschedules[0](time) and not self.subschedules[1](time) class NegatedSchedule(ScheduleMix): __n__ = 1 def __call__(self, time): return not self.subschedules[0](time) class OffsetSchedule(Schedule): def __init__(self, schedule, offset): self.schedule = schedule self.offset = offset def __call__(self, time): if isinstance(time, int): return self.schedule(time - self.offset) else: t1, t2 = time return self.schedule((t1 - self.offset, t2 - self.offset)) class AlwaysSchedule(Schedule): def __call__(self, time): return True always = AlwaysSchedule() never = ~always class IntervalSchedule(Schedule): def __init__(self, step, repeat = inf): self.step = step self.upper_bound = step * (repeat - 1) def __call__(self, time): if isinstance(time, int): if time < 0 or time > self.upper_bound: return False return time % self.step == 0 else: t1, t2 = time if t2 < 0 or t1 > self.upper_bound: return False diff = t2 - t1 t1m = t1 % self.step t2m = t2 % self.step return (diff >= self.step or t1m == 0 or t2m == 0 or t1m > t2m) each = lambda step, repeat = inf: each0(step, repeat) + step each0 = IntervalSchedule class RangeSchedule(Schedule): def __init__(self, low = None, high = None): self.low = low or -inf self.high = high or inf def __call__(self, time): if isinstance(time, int): return self.low <= time <= self.high else: t1, t2 = time return self.low <= t1 <= self.high \ or self.low <= t2 <= self.high inrange = RangeSchedule class ListSchedule(Schedule): def __init__(self, *schedules): self.schedules = schedules def __call__(self, time): if isinstance(time, int): return time in self.schedules else: for t in self.schedules: if t1 <= t <= t2: return True return False at = ListSchedule ############## ### PLUGIN ### ############## class Plugin(object): def attach(self, scheduler): c = copy(self) c.scheduler = scheduler return c def __call__(self, event): raise NotImplementedError("Implement this!") def fire(self, type, **attributes): event = Event(type, issuer = self, **attributes) self.scheduler.queue(event) class FnPlugin(Plugin): def __init__(self, function): self.function = function def __call__(self, event): return self.function(self, event) class DispatchPlugin(Plugin): def __call__(self, event): getattr(self, "on_" + event.type, self.generic)(event) def generic(self, event): return ################# ### SCHEDULER ### ################# class Scheduler(object): def __init__(self): self.plugins = [] self.categorized = defaultdict(list) self.event_queue = deque() def __call__(self): i = 0 evq = self.event_queue self.queue(Event("begin", issuer = self)) while True: self.queue(Event("tick", issuer = self, time = i)) while evq: event = evq.popleft() candidates = self.categorized[event.type] + self.categorized[None] for event_template, plugin in candidates: if event.match(event_template): plugin(event) # note: the plugin might queue more events if event.type == "terminate": return i += 1 def schedule_plugin(self, event_template, plugin): plugin = plugin.attach(self) if isinstance(event_template, Matcher) or isinstance(event_template.type, Matcher): # These plugins may execute upon any event type self.categorized[None].append((event_template, plugin)) else: self.categorized[event_template.type].append((event_template, plugin)) self.plugins.append((event_template, plugin)) def queue(self, event): self.event_queue.append(event) @FnPlugin def printer(self, event): print event @FnPlugin def stopper(self, event): self.fire("terminate") @FnPlugin def byebye(self, event): print "bye bye!" @FnPlugin def waiter(self, event): time.sleep(0.1) # @FnPlugin # def timer(self, event): # if not hasattr(self, 'previous'): # self.beginning = time.time() # self.previous = 0 # now = time.time() - self.beginning # inow = int(now) # if inow > self.previous: # self.fire("second", time = inow) # self.previous = now class Timer(DispatchPlugin): def on_begin(self, event): self.beginning = time.time() self.previous = 0 def on_tick(self, event): now = time.time() - self.beginning inow = int(now) if inow > self.previous: self.fire("second", time = inow) self.previous = now sch = Scheduler() sch.schedule_plugin(all_events, Timer()) sch.schedule_plugin(Event("tick"), waiter) # this means: execute the waiter plugin (a delay) on every "tick" event. Is it confusing to use Event(...)? sch.schedule_plugin(Event("second"), printer) # sch.schedule_plugin(all_events, printer) sch.schedule_plugin(Event("tick", time = at(100)), stopper) sch.schedule_plugin(Event("terminate"), byebye) sch()