Mercurial > lcfOS
view python/ppci/tasks.py @ 347:742588fb8cd6 devel
Merge into devel branch
author | Windel Bouwman |
---|---|
date | Fri, 07 Mar 2014 17:10:21 +0100 |
parents | 3bb7dcfe5529 |
children | 9667d78ba79e |
line wrap: on
line source
""" This module defines tasks and a runner for these tasks. Tasks can have dependencies and it can be determined if they need to be run. """ import logging class TaskError(Exception): def __init__(self, msg): self.msg = msg class Task: """ Task that can run, and depend on other tasks """ def __init__(self, name): self.name = name self.completed = False self.dependencies = set() self.duration = 1 def run(self): raise NotImplementedError("Implement this abstract method!") def fire(self): """ Wrapper around run that marks the task as done """ assert all(t.completed for t in self.dependencies) self.run() self.completed = True def add_dependency(self, task): """ Add another task as a dependency for this task """ if task is self: raise TaskError('Can not add dependency on task itself!') if self in task.down_stream_tasks: raise TaskError('Can not introduce circular task') self.dependencies.add(task) return task @property def down_stream_tasks(self): """ Return a set of all tasks that follow this task """ # TODO: is this upstream or downstream??? cdst = list(dep.down_stream_tasks for dep in self.dependencies) cdst.append(self.dependencies) return set.union(*cdst) def __gt__(self, other): return other in self.down_stream_tasks def __repr__(self): return 'Task "{}"'.format(self.name) class EmptyTask(Task): """ Basic task that does nothing """ def run(self): pass class TaskRunner: """ Basic task runner that can run some tasks in sequence """ def __init__(self): self.logger = logging.getLogger('taskrunner') self.task_list = [] def add_task(self, task): self.task_list.append(task) @property def total_duration(self): return sum(t.duration for t in self.task_list) def run_tasks(self): # First sort tasks: self.task_list.sort() # Run tasks: passed_time = 0.0 total_time = self.total_duration try: for t in self.task_list: self.report_progress(passed_time / total_time, t.name) t.fire() passed_time += t.duration except TaskError as e: self.logger.error(str(e.msg)) return 1 self.report_progress(1, 'OK') return 0 def display(self): """ Display task how they would be run """ for task in self.task_list: print(task) def report_progress(self, percentage, text): self.logger.info('[{:3.1%}] {}'.format(percentage, text))