view doc/v2_planning/plugin.py @ 1531:88f361283a19 tip

Fix url/name to pylearn2.
author Frederic Bastien <nouiz@nouiz.org>
date Mon, 09 Sep 2013 10:08:05 -0400
parents a1957faecc9b
children
line wrap: on
line source


import time
from collections import defaultdict, deque
from copy import copy

# Positive infinity; used in this file as the "no limit" value for schedule
# bounds and repeat counts.
inf = float('inf')

#############
### EVENT ###
#############

class Event(object):
    """A typed message carrying arbitrary keyword attributes.

    Every keyword attribute is exposed both as an instance attribute and as
    an entry of ``self.attributes`` (which also contains ``type``).  An Event
    can also serve as a template that other events are matched against; in
    that role its attribute values may be ``Matcher`` instances.
    """

    def __init__(self, type, **attributes):
        """Store ``type`` and every keyword attribute on the instance."""
        self.type = type
        self.__dict__.update(attributes)
        self.attributes = dict(type = type, **attributes)

    def match(self, other):
        """Return whether this event satisfies the template ``other``.

        If ``other`` is a ``Matcher`` it is called with the whole event and
        its result is returned.  Otherwise ``other`` must expose an
        ``attributes`` dict: each attribute of this event that the template
        also names must either equal the template value or, when the template
        value is a ``Matcher``, be accepted by it.  Attributes present on only
        one side are ignored.
        """
        if isinstance(other, Matcher):
            return other(self)
        template = other.attributes
        # items() rather than iteritems(): works on both Python 2 and 3.
        for key, value in self.attributes.items():
            if key not in template:
                continue
            expected = template[key]
            if isinstance(expected, Matcher):
                if not expected(value):
                    return False
            elif value != expected:
                return False
        return True

    def __str__(self):
        """Render as ``Event(key=value, ...)`` over all attributes."""
        return "Event(%s)" % ", ".join(
            "%s=%s" % (k, v) for k, v in self.attributes.items())

class Matcher(object):
    """Abstract predicate: calling an instance decides whether a value matches.

    Subclasses override ``__call__``; the base implementation always raises.
    """

    def __call__(self, object):
        # No default behaviour: concrete matchers must supply their own test.
        message = "Implement this!"
        raise NotImplementedError(message)

class FnMatcher(Matcher):
    """Matcher that delegates the decision to a plain callable."""

    def __init__(self, function):
        """Remember ``function``; it is invoked on every call."""
        self.function = function

    def __call__(self, object):
        """Return whatever the wrapped function returns for ``object``."""
        predicate = self.function
        return predicate(object)

# Matcher that accepts anything: the wrapped lambda ignores its argument and
# always returns True.
all_events = FnMatcher(lambda _: True)



################
### SCHEDULE ###
################

class Schedule(Matcher):
    """Matcher over time values that can be combined with operators.

    ``+`` / ``-`` build an OffsetSchedule, ``|`` a UnionSchedule, ``&`` an
    IntersectionSchedule and ``~`` a NegatedSchedule.  Operands of ``|`` and
    ``&`` are passed through ``to_schedule`` first.
    """

    # -- offsets ---------------------------------------------------------

    def __add__(self, i):
        return OffsetSchedule(self, i)

    def __sub__(self, i):
        negated = -i
        return OffsetSchedule(self, negated)

    # -- union -----------------------------------------------------------

    def __or__(self, s):
        right = to_schedule(s)
        return UnionSchedule(self, right)

    def __ror__(self, s):
        left = to_schedule(s)
        return UnionSchedule(left, self)

    # -- intersection ----------------------------------------------------

    def __and__(self, s):
        right = to_schedule(s)
        return IntersectionSchedule(self, right)

    def __rand__(self, s):
        left = to_schedule(s)
        return IntersectionSchedule(left, self)

    # -- negation --------------------------------------------------------

    def __invert__(self):
        return NegatedSchedule(self)

def to_schedule(x):
    """Coerce ``x`` into something usable as a schedule.

    * ``None`` / ``False`` -> ``never``
    * ``True`` -> ``always``
    * list or tuple -> a ``UnionSchedule`` of its items (each item is itself
      coerced by ``UnionSchedule``); an empty sequence yields a union that
      matches nothing
    * anything else is returned unchanged
    """
    if x in (None, False):
        return never
    if x is True:
        return always
    if isinstance(x, (list, tuple)):
        # Build the union directly instead of reduce(UnionSchedule, x):
        # reduce fails on an empty sequence, returns a single element
        # uncoerced, and is not a builtin on Python 3.
        return UnionSchedule(*x)
    return x


class ScheduleMix(Schedule):
    """Base class for schedules that combine other schedules.

    Subclasses may set ``__n__`` to the exact number of sub-schedules they
    require; a falsy ``__n__`` (the default) means any number is accepted.
    """

    __n__ = None

    def __init__(self, *subschedules):
        """Coerce every argument with ``to_schedule`` and store the results."""
        assert (not self.__n__) or len(subschedules) == self.__n__
        # A real list (not a lazy map object) so the sub-schedules can be
        # indexed and iterated on every call, on Python 2 and 3 alike.
        self.subschedules = [to_schedule(s) for s in subschedules]

class UnionSchedule(ScheduleMix):
    """Matches when at least one sub-schedule matches."""

    def __call__(self, time):
        # Stop at the first sub-schedule that accepts the time.
        for sub in self.subschedules:
            if sub(time):
                return True
        return False

class IntersectionSchedule(ScheduleMix):
    """Matches only when every sub-schedule matches."""

    def __call__(self, time):
        # Stop at the first sub-schedule that rejects the time.
        for sub in self.subschedules:
            if not sub(time):
                return False
        return True

class DifferenceSchedule(ScheduleMix):
    """Matches when the first sub-schedule matches and the second does not."""

    __n__ = 2

    def __call__(self, time):
        included = self.subschedules[0](time)
        if not included:
            # Pass the first schedule's falsy result through unchanged; the
            # second schedule is not consulted in this case.
            return included
        return not self.subschedules[1](time)

class NegatedSchedule(ScheduleMix):
    """Matches exactly when its single sub-schedule does not."""

    __n__ = 1

    def __call__(self, time):
        inner = self.subschedules[0]
        return not inner(time)

class OffsetSchedule(Schedule):
    """Wraps a schedule so that it is evaluated ``offset`` units later.

    The wrapped schedule is asked about ``time - offset``; a non-int ``time``
    is unpacked as a pair and both members are shifted.
    """

    def __init__(self, schedule, offset):
        self.schedule, self.offset = schedule, offset

    def __call__(self, time):
        shift = self.offset
        if not isinstance(time, int):
            # Anything that is not an int is treated as a (t1, t2) pair.
            start, end = time
            return self.schedule((start - shift, end - shift))
        return self.schedule(time - shift)


class AlwaysSchedule(Schedule):
    """Schedule that matches every time value."""

    def __call__(self, time):
        # The argument is ignored on purpose: every time matches.
        return True

# Shared schedule that matches every time.
always = AlwaysSchedule()
# Its negation (built through Schedule.__invert__): matches no time.
never = ~always

class IntervalSchedule(Schedule):
    """Matches the multiples of ``step`` from 0 up to ``step * (repeat - 1)``.

    Called with an int, it tests that single time.  Called with anything
    else, the argument is unpacked as a ``(t1, t2)`` pair and the schedule
    reports whether the pair brackets a matching time.
    """

    def __init__(self, step, repeat = inf):
        self.step = step
        # Largest matching time; infinite when repeat is left at its default.
        self.upper_bound = step * (repeat - 1)

    def __call__(self, time):
        step = self.step
        last = self.upper_bound

        if isinstance(time, int):
            out_of_bounds = time < 0 or time > last
            if out_of_bounds:
                return False
            return time % step == 0

        start, end = time
        if end < 0 or start > last:
            return False
        span = end - start
        # Both remainders are computed up front, before any of the checks.
        start_rem = start % step
        end_rem = end % step
        if span >= step:
            # A window at least one step wide always contains a multiple.
            return True
        if start_rem == 0 or end_rem == 0:
            # One of the endpoints is itself a multiple.
            return True
        # The remainder wrapped around, so a multiple lies inside the window.
        return start_rem > end_rem

# each(step, repeat): an IntervalSchedule shifted forward by `step` (via
# Schedule.__add__), so the first matching time is `step` rather than 0.
each = lambda step, repeat = inf: each0(step, repeat) + step
# each0(step, repeat): the unshifted IntervalSchedule, which also matches 0.
each0 = IntervalSchedule


class RangeSchedule(Schedule):
    """Matches times inside the closed range ``[low, high]``.

    ``None`` for either bound means "unbounded" on that side.  An explicit
    ``0`` is honoured as a real bound.
    """

    def __init__(self, low = None, high = None):
        # Test against None explicitly: `low or -inf` would silently turn a
        # legitimate bound of 0 into "unbounded".
        self.low = -inf if low is None else low
        self.high = inf if high is None else high

    def __call__(self, time):
        """Return whether ``time`` falls in, or overlaps, the range.

        ``time`` is either an int or a ``(t1, t2)`` pair; the pair is assumed
        to satisfy ``t1 <= t2``.
        """
        if isinstance(time, int):
            return self.low <= time <= self.high
        t1, t2 = time
        # Overlap test: also true when the window strictly contains the
        # range, which checking only the two endpoints would miss.
        return t1 <= self.high and t2 >= self.low

# Short alias: inrange(low, high) constructs a RangeSchedule.
inrange = RangeSchedule


class ListSchedule(Schedule):
    """Matches an explicit collection of times."""

    def __init__(self, *schedules):
        """Store the listed times (despite the name, these are time values)."""
        self.schedules = schedules

    def __call__(self, time):
        """Return whether ``time`` hits one of the listed times.

        An int must be one of the listed times.  Anything else is unpacked as
        a ``(t1, t2)`` pair and matches when some listed time lies within
        ``[t1, t2]``.
        """
        if isinstance(time, int):
            return time in self.schedules
        # Unpack the window; without this the bounds below are undefined
        # names and the interval branch raises NameError.
        t1, t2 = time
        return any(t1 <= t <= t2 for t in self.schedules)

# Short alias: at(t1, t2, ...) constructs a ListSchedule of those times.
at = ListSchedule


##############
### PLUGIN ###
##############

class Plugin(object):
    """Base class for callables that react to events.

    A plugin is bound to a scheduler through ``attach``, which returns a
    shallow copy carrying the scheduler, leaving the original untouched.
    """

    def attach(self, scheduler):
        """Return a shallow copy of this plugin bound to ``scheduler``."""
        bound = copy(self)
        bound.scheduler = scheduler
        return bound

    def __call__(self, event):
        """Handle ``event``; subclasses must override."""
        message = "Implement this!"
        raise NotImplementedError(message)

    def fire(self, type, **attributes):
        """Queue a new event on the attached scheduler, with this plugin as issuer."""
        self.scheduler.queue(Event(type, issuer=self, **attributes))

class FnPlugin(Plugin):
    """Plugin whose behaviour is a plain function taking ``(plugin, event)``.

    Usable as a decorator on such a function.
    """

    def __init__(self, function):
        """Remember the handler function."""
        self.function = function

    def __call__(self, event):
        """Invoke the handler with this plugin and ``event``; return its result."""
        handler = self.function
        return handler(self, event)

class DispatchPlugin(Plugin):
    """Plugin that routes each event to a method named ``on_<event.type>``.

    Events with no matching method go to ``generic``.
    """

    def __call__(self, event):
        """Look up the handler for ``event.type`` and call it."""
        handler = getattr(self, "on_" + event.type, self.generic)
        handler(event)

    def generic(self, event):
        """Fallback handler: does nothing."""
        return None


#################
### SCHEDULER ###
#################

class Scheduler(object):
    """Event loop that dispatches queued events to the registered plugins.

    Plugins are registered together with an event template.  They are indexed
    by the template's type, or under ``None`` when the template can match any
    event type.
    """

    def __init__(self):
        """Start with no plugins and an empty event queue."""
        self.plugins = []
        self.categorized = defaultdict(list)
        self.event_queue = deque()

    def __call__(self):
        """Run the loop until a "terminate" event has been dispatched.

        A "begin" event is queued once, then one "tick" event (with an
        increasing ``time``) per iteration; the queue is drained completely
        between ticks.
        """
        pending = self.event_queue
        self.queue(Event("begin", issuer=self))
        tick = 0
        while True:
            self.queue(Event("tick", issuer=self, time=tick))
            while pending:
                current = pending.popleft()
                typed = self.categorized[current.type]
                untyped = self.categorized[None]
                for template, plugin in typed + untyped:
                    if current.match(template):
                        # The plugin may queue further events here.
                        plugin(current)
                if current.type == "terminate":
                    return
            tick += 1

    def schedule_plugin(self, event_template, plugin):
        """Attach ``plugin`` to this scheduler and register it for ``event_template``."""
        attached = plugin.attach(self)
        entry = (event_template, attached)
        if isinstance(event_template, Matcher) or isinstance(event_template.type, Matcher):
            # Such templates may accept any event type, so file them under None.
            bucket = None
        else:
            bucket = event_template.type
        self.categorized[bucket].append(entry)
        self.plugins.append(entry)

    def queue(self, event):
        """Append ``event`` to the end of the event queue."""
        self.event_queue.append(event)




@FnPlugin
def printer(self, event):
    """Print the event it receives."""
    # Parenthesised single-argument form: same output under the Python 2
    # print statement, and valid as the Python 3 print function.
    print(event)

@FnPlugin
def stopper(self, event):
    """Fire a "terminate" event; the received event itself is not inspected."""
    event_type = "terminate"
    self.fire(event_type)

@FnPlugin
def byebye(self, event):
    """Print a farewell message; the received event is not inspected."""
    # Parenthesised single-argument form: same output under the Python 2
    # print statement, and valid as the Python 3 print function.
    print("bye bye!")


@FnPlugin
def waiter(self, event):
    """Block for a tenth of a second each time it is invoked."""
    delay = 0.1
    time.sleep(delay)

# @FnPlugin
# def timer(self, event):
#     if not hasattr(self, 'previous'):
#         self.beginning = time.time()
#         self.previous = 0
#     now = time.time() - self.beginning
#     inow = int(now)
#     if inow > self.previous:
#         self.fire("second", time = inow)
#     self.previous = now

class Timer(DispatchPlugin):
    """Fires a "second" event when the elapsed wall-clock time passes a new whole second."""

    def on_begin(self, event):
        """Record the start time and reset the last-seen elapsed time."""
        self.beginning = time.time()
        self.previous = 0

    def on_tick(self, event):
        """Fire "second" if a whole-second boundary was crossed since the last tick."""
        elapsed = time.time() - self.beginning
        whole_seconds = int(elapsed)
        if whole_seconds > self.previous:
            self.fire("second", time=whole_seconds)
        # Keep the fractional elapsed time: the comparison above then succeeds
        # only once per integer boundary.
        self.previous = elapsed



# Demo: build a scheduler, register the plugins defined in this file, and run
# it. All of this executes at module level.
sch = Scheduler()


# The Timer is registered with the catch-all matcher, so it sees every event.
sch.schedule_plugin(all_events, Timer())
sch.schedule_plugin(Event("tick"), waiter) # this means: execute the waiter plugin (a delay) on every "tick" event. Is it confusing to use Event(...)?
# Print each "second" event.
sch.schedule_plugin(Event("second"), printer)

# sch.schedule_plugin(all_events, printer)

# Run stopper on the "tick" event whose time is 100; stopper fires "terminate".
sch.schedule_plugin(Event("tick", time = at(100)), stopper)
# Say goodbye when the "terminate" event is dispatched.
sch.schedule_plugin(Event("terminate"), byebye)

# Run the event loop; it returns once a "terminate" event has been dispatched.
sch()