# HG changeset patch # User Olivier Breuleux # Date 1228348354 18000 # Node ID d88c35e8f83a1b9bde72d9f216dc8494af197e14 # Parent a71971ccc1e4bda4814f6bf8c35e9e65e43e209b another checkpoint diff -r a71971ccc1e4 -r d88c35e8f83a pylearn/dbdict/newstuff.py --- a/pylearn/dbdict/newstuff.py Wed Dec 03 16:07:04 2008 -0500 +++ b/pylearn/dbdict/newstuff.py Wed Dec 03 18:52:34 2008 -0500 @@ -1,6 +1,11 @@ +from __future__ import with_statement + +import FileLock +import portalocker from collections import defaultdict import re, sys, inspect, os +import signal ################################################################################ ### resolve @@ -18,25 +23,26 @@ ################################################################################ def convert(obj): - if not isinstance(obj, str): - return obj - def kw(x): - x = x.lower() - return dict(true = True, - false = False, - none = None)[x] - for f in (kw, int, float): - try: - return f(obj) - except: - pass - return obj + return eval(obj) +# if not isinstance(obj, str): +# return obj +# def kw(x): +# x = x.lower() +# return dict(true = True, +# false = False, +# none = None)[x] +# for f in (kw, int, float): +# try: +# return f(obj) +# except: +# pass +# return obj def flatten(obj): d = {} def helper(d, prefix, obj): if isinstance(obj, (str, int, float)): - d[prefix] = convert(obj) + d[prefix] = obj #convert(obj) else: if isinstance(obj, dict): subd = obj @@ -61,7 +67,7 @@ current = struct for k2 in keys[:-1]: current = current[k2] - current[keys[-1]] = convert(v) + current[keys[-1]] = v #convert(v) return struct def realize(d): @@ -104,7 +110,7 @@ def format_d(d, sep = '\n', space = True): d = flatten(d) - pattern = "%s = %s" if space else "%s=%s" + pattern = "%s = %r" if space else "%s=%r" return sep.join(pattern % (k, v) for k, v in d.iteritems()) def format_help(topic): @@ -125,97 +131,186 @@ ### single channels ################################################################################ -try: - import greenlet -except: - try: - from py import greenlet - except: - print >>sys.stderr, 'the greenlet module is unavailable' - greenlet = None +# try: +# import greenlet +# except: +# try: +# from py import greenlet +# except: +# print >>sys.stderr, 'the greenlet module is unavailable' +# greenlet = None -class Complete(Exception): - pass -class Incomplete(Exception): - pass class Channel(object): + COMPLETE = None INCOMPLETE = True + + START = 0 + """dbdict.status == START means a experiment is ready to run""" + RUNNING = 1 + """dbdict.status == RUNNING means a experiment is running on dbdict_hostname""" + DONE = 2 + """dbdict.status == DONE means a experiment has completed (not necessarily successfully)""" + + # Methods to be used by the experiment to communicate with the channel + def save(self): + """ + Save the experiment's state to the various media supported by + the Channel. + """ raise NotImplementedError() - def switch(self, *message): + + def switch(self, message = None): + """ + Called from the experiment to give the control back to the channel. + The following return values are meaningful: + * 'stop' -> the experiment must stop as soon as possible. It may save what + it needs to save. This occurs when SIGTERM or SIGINT are sent (or in + user-defined circumstances). + switch() may give the control to the user. In this case, the user may + resume the experiment by calling switch() again. If an argument is given + by the user, it will be relayed to the experiment. + """ pass - def process_message(self, message): - pass + + __call__ = switch + + def save_and_switch(self): + self.save() + self.switch() + + # Methods to run the experiment + def setup(self): pass + + def __enter__(self): + pass + + def __exit__(self): + pass + def run(self): pass - def on_sigterm(signo, frame): - channel_rval[0] = 'stop' +class SingleChannel(Channel): - #install a SIGTERM handler that asks the run_state function to return - signal.signal(signal.SIGTERM, on_sigterm) - signal.signal(signal.SIGINT, on_sigterm) - - -class SingleChannel(Channel): def __init__(self, experiment, state): self.experiment = experiment self.state = state -# def switch(self, *message): -# if greenlet: -# if greenlet.getcurrent() is self.expg: -# self.feedback = message -# self.manager.switch(message) -# else: -# self.expg.switch(message) -# else: -# self.feedback = message - def run(self, interactive = False): - if interactive and not greenlet: - raise Exception('interactive mode requires the greenlet package to be installed (try easy_install greenlet or easy_install py)') - self.interactive = interactive + self.feedback = None + + def switch(self, message): + feedback = self.feedback + self.feedback = None + return feedback + + def run(self): + # install a SIGTERM handler that asks the experiment function to return + # the next time it will call switch() + def on_sigterm(signo, frame): + self.feedback = 'stop' + signal.signal(signal.SIGTERM, on_sigterm) + signal.signal(signal.SIGINT, on_sigterm) + self.setup() - self.state['job'].setdefault('complete', False) - if self.state['job']['complete']: - raise Complete('The job has already completed.') -# if greenlet: -# # self.manager = greenlet.getcurrent() -# # self.expg = greenlet.greenlet(self.experiment) -# # expg.switch(self, self.state) -# else: - v = self.experiment(self, self.state) - self.state['job']['complete'] = v is COMPLETE - self.save() + + status = self.state['dbdict'].get('status', self.START) + if status is self.DONE: + raise Exception('The job has already completed.') + elif status is self.RUNNING: + raise Exception('The job is already running.') + self.state['dbdict'].setdefault('status', self.START) + + with self: + v = self.experiment(self, self.state) + self.state['dbdict']['status'] = self.DONE if v is self.COMPLETE else self.START + return v + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.save() + + class StandardChannel(SingleChannel): - def __init__(self, root, experiment, state): + + def __init__(self, root, experiment, state, redirect_stdout = False, redirect_stderr = False): self.root = root self.experiment = experiment self.state = state self.dirname = format_d(self.state, sep=',', space=False) self.path = os.path.join(self.root, self.dirname) + self.lock = None + self.redirect_stdout = redirect_stdout + self.redirect_stderr = redirect_stderr + def save(self): + with open(os.path.join(self.path, 'current.conf'), 'w') as current: + current.write(format_d(self.state)) + current.write('\n') + + def __enter__(self): + ###assert self.lock is None + ##lockf = os.path.join(self.path, 'lock') + ##self.lock = open(lockf, 'r+') + ##portalocker.lock(self.lock, portalocker.LOCK_EX) + #self.lock = FileLock.FileLock(os.path.join(self.path, 'lock')) + #self.lock.lock() + self.old_cwd = os.getcwd() os.chdir(self.path) - current = open('current.conf', 'w') - current.write(format_d(self.state)) - current.write('\n') + if self.redirect_stdout: + self.old_stdout = sys.stdout + sys.stdout = open('stdout', 'a') + if self.redirect_stderr: + self.old_stderr = sys.stderr + sys.stderr = open('stderr', 'a') + return super(StandardChannel, self).__enter__() + + def __exit__(self, type, value, traceback): + ###assert self.lock is not None + if self.redirect_stdout: + sys.stdout.close() + sys.stdout = self.old_stdout + if self.redirect_stderr: + sys.stderr.close() + sys.stderr = self.old_stderr + os.chdir(self.old_cwd) + ##self.lock.close() + #self.lock.unlock() + ###self.lock = None + return super(StandardChannel, self).__exit__(type, value, traceback) + def setup(self): if not os.path.isdir(self.path): os.mkdir(self.path) - os.chdir(self.path) - if not os.path.isfile('orig.conf'): - orig = open('orig.conf', 'w') - orig.write(format_d(self.state)) - orig.write('\n') - if os.path.isfile('current.conf'): - self.state = expand(parse(*map(str.strip, open('current.conf', 'r').readlines()))) + with self: + origf = os.path.join(self.path, 'orig.conf') + if not os.path.isfile(origf): + with open(origf, 'w') as orig: + orig.write(format_d(self.state)) + orig.write('\n') + currentf = os.path.join(self.path, 'current.conf') + if os.path.isfile(currentf): + with open(currentf, 'r') as current: + self.state = expand(parse(*map(str.strip, current.readlines()))) + +# origf = os.path.join(self.path, 'orig.conf') +# if not os.path.isfile(origf): +# with open(os.path.isfile(origf), 'w') as orig: +# orig.write(format_d(self.state)) +# orig.write('\n') +# currentf = os.path.join(self.path, 'current.conf') +# if os.path.isfile(currentf): +# with open(currentf, 'r') as current: +# self.state = expand(parse(*map(str.strip, current.readlines()))) + class RSyncException(Exception): pass @@ -348,7 +443,8 @@ """ state = expand(parse(*strings)) experiment = resolve(experiment) - channel = RSyncChannel(os.getcwd(), os.path.realpath('yaddayadda'), experiment, state) + #channel = RSyncChannel(os.getcwd(), os.path.realpath('yaddayadda'), experiment, state) + channel = StandardChannel(os.getcwd(), experiment, state) channel.run() runner_registry['cmdline'] = cmdline