view pylearn/dbdict/newstuff.py @ 576:ef424abb7458
bugfixes and a lot of documentation
author   | Olivier Breuleux <breuleuo@iro.umontreal.ca>
date     | Thu, 04 Dec 2008 16:51:46 -0500
parents  | 9f5891cd4048
children | a027c4cedf98
from __future__ import with_statement

from collections import defaultdict
import re, sys, inspect, os, signal, tempfile, shutil, socket

import sql

################################################################################
### misc
################################################################################

class DD(defaultdict):
    def __getattr__(self, attr):
        return self[attr]
    def __setattr__(self, attr, value):
        self[attr] = value
    def __str__(self):
        return 'DD%s' % dict(self)
    def __repr__(self):
        return str(self)

################################################################################
### resolve
################################################################################

def resolve(name):
    symbols = name.split('.')
    builder = __import__(symbols[0])
    for sym in symbols[1:]:
        builder = getattr(builder, sym)
    return builder

################################################################################
### dictionary
################################################################################

def convert(obj):
    try:
        return eval(obj, {}, {})
    except (NameError, SyntaxError):
        return obj

def flatten(obj):
    d = {}
    def helper(d, prefix, obj):
        if isinstance(obj, (str, int, float)):
            d[prefix] = obj #convert(obj)
        else:
            if isinstance(obj, dict):
                subd = obj
            else:
                subd = obj.state()
                subd['__builder__'] = '%s.%s' % (obj.__module__, obj.__class__.__name__)
            for k, v in subd.iteritems():
                pfx = '.'.join([prefix, k]) if prefix else k
                helper(d, pfx, v)
    helper(d, '', obj)
    return d

def expand(d):
    def dd():
        return DD(dd)
    struct = dd()
    for k, v in d.iteritems():
        if k == '':
            raise NotImplementedError()
        else:
            keys = k.split('.')
        current = struct
        for k2 in keys[:-1]:
            current = current[k2]
        current[keys[-1]] = v #convert(v)
    return struct

def realize(d):
    if not isinstance(d, dict):
        return d
    d = dict((k, realize(v)) for k, v in d.iteritems())
    if '__builder__' in d:
        builder = resolve(d.pop('__builder__'))
        return builder(**d)
    return d

def make(d):
    return realize(expand(d))

################################################################################
### errors
################################################################################

class UsageError(Exception):
    pass

################################################################################
### parsing and formatting
################################################################################

def parse(*strings):
    d = {}
    for string in strings:
        s1 = re.split(' *= *', string, 1)
        s2 = re.split(' *:: *', string, 1)
        if len(s1) == 1 and len(s2) == 1:
            raise UsageError('Expected a keyword argument in place of "%s"' % s1[0])
        elif len(s1) == 2:
            k, v = s1
            v = convert(v)
        elif len(s2) == 2:
            k, v = s2
            k += '.__builder__'
        d[k] = v
    return d

def format_d(d, sep = '\n', space = True):
    d = flatten(d)
    pattern = "%s = %r" if space else "%s=%r"
    return sep.join(pattern % (k, v) for k, v in d.iteritems())

def format_help(topic):
    if topic is None:
        return 'No help.'
    elif isinstance(topic, str):
        help = topic
    elif hasattr(topic, 'help'):
        help = topic.help()
    else:
        help = topic.__doc__
    if not help:
        return 'No help.'
    ss = map(str.rstrip, help.split('\n'))
    try:
        baseline = min([len(line) - len(line.lstrip()) for line in ss if line])
    except:
        return 'No help.'
    s = '\n'.join([line[baseline:] for line in ss])
    s = re.sub(string = s, pattern = '\n{2,}', repl = '\n\n')
    s = re.sub(string = s, pattern = '(^\n*)|(\n*$)', repl = '')
    return s
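
# An illustrative sketch of how the helpers above compose. The keys and
# values below are invented for the example (mirroring the ones used in the
# help topics further down); only re.compile is a real builder.
#
# >>> d = parse('a=1', 'c.d=hello', 'regexp::re.compile', 'regexp.pattern=a.*c')
# >>> d['a'], d['c.d'], d['regexp.__builder__']
# (1, 'hello', 're.compile')
# >>> state = expand(d)            # flat dotted keys become nested DD objects
# >>> state.c.d                    # attribute access and item access are aliased
# 'hello'
# >>> regexp = make(state.regexp)  # resolves '__builder__', calls re.compile(pattern='a.*c')
# >>> regexp.sub('blahblah', 'hello abbbbc there')
# 'hello blahblah there'
# >>> flatten(state)['c.d']        # flatten is the inverse of expand
# 'hello'
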
################################################################################
### single channels
################################################################################

# try:
#     import greenlet
# except:
#     try:
#         from py import greenlet
#     except:
#         print >>sys.stderr, 'the greenlet module is unavailable'
#         greenlet = None

class Channel(object):

    COMPLETE = None
    INCOMPLETE = True

    START = 0
    """dbdict.status == START means an experiment is ready to run"""

    RUNNING = 1
    """dbdict.status == RUNNING means an experiment is running on dbdict_hostname"""

    DONE = 2
    """dbdict.status == DONE means an experiment has completed (not necessarily successfully)"""

    # Methods to be used by the experiment to communicate with the channel

    def save(self):
        """
        Save the experiment's state to the various media supported by
        the Channel.
        """
        raise NotImplementedError()

    def switch(self, message = None):
        """
        Called from the experiment to give the control back to the channel.

        The following return values are meaningful:
        * 'stop' -> the experiment must stop as soon as possible. It may
          save what it needs to save. This occurs when SIGTERM or SIGINT
          are sent (or in user-defined circumstances).

        switch() may give the control to the user. In this case, the user
        may resume the experiment by calling switch() again. If an argument
        is given by the user, it will be relayed to the experiment.
        """
        pass

    def __call__(self, message = None):
        return self.switch(message)

    def save_and_switch(self):
        self.save()
        self.switch()

    # Methods to run the experiment

    def setup(self):
        pass

    def __enter__(self):
        pass

    def __exit__(self, type, value, traceback):
        pass

    def run(self):
        pass


class JobError(Exception):
    RUNNING = 0
    DONE = 1
    NOJOB = 2


class SingleChannel(Channel):

    def __init__(self, experiment, state):
        self.experiment = experiment
        self.state = state
        self.feedback = None

    def switch(self, message = None):
        feedback = self.feedback
        self.feedback = None
        return feedback

    def run(self, force = False):
        self.setup()

        status = self.state.dbdict.get('status', self.START)
        if status is self.DONE and not force:
            # If you want to disregard this, use the --force flag (not yet implemented)
            raise JobError(JobError.DONE,
                           'The job has already completed.')
        elif status is self.RUNNING and not force:
            raise JobError(JobError.RUNNING,
                           'The job is already running.')
        self.state.dbdict.status = self.RUNNING

        v = self.COMPLETE
        with self:
            try:
                v = self.experiment(self.state, self)
            finally:
                self.state.dbdict.status = self.DONE if v is self.COMPLETE else self.START

        return v

    def on_sigterm(self, signo, frame):
        # SIGTERM handler. It is the experiment function's responsibility to
        # call switch() often enough to get this feedback.
        self.feedback = 'stop'

    def __enter__(self):
        # install a SIGTERM handler that asks the experiment function to return
        # the next time it will call switch()
        self.prev_sigterm = signal.getsignal(signal.SIGTERM)
        self.prev_sigint = signal.getsignal(signal.SIGINT)
        signal.signal(signal.SIGTERM, self.on_sigterm)
        signal.signal(signal.SIGINT, self.on_sigterm)
        return self

    def __exit__(self, type, value, traceback):
        signal.signal(signal.SIGTERM, self.prev_sigterm)
        signal.signal(signal.SIGINT, self.prev_sigint)
        self.prev_sigterm = None
        self.prev_sigint = None
        self.save()


class StandardChannel(SingleChannel):

    def __init__(self, path, experiment, state, redirect_stdout = False, redirect_stderr = False):
        super(StandardChannel, self).__init__(experiment, state)
        self.path = os.path.realpath(path)
        self.redirect_stdout = redirect_stdout
        self.redirect_stderr = redirect_stderr

    def save(self):
        with open(os.path.join(self.path, 'current.conf'), 'w') as current:
            current.write(format_d(self.state))
            current.write('\n')

    def __enter__(self):
        self.old_cwd = os.getcwd()
        os.chdir(self.path)
        if self.redirect_stdout:
            self.old_stdout = sys.stdout
            sys.stdout = open('stdout', 'a')
        if self.redirect_stderr:
            self.old_stderr = sys.stderr
            sys.stderr = open('stderr', 'a')
        return super(StandardChannel, self).__enter__()

    def __exit__(self, type, value, traceback):
        if self.redirect_stdout:
            sys.stdout.close()
            sys.stdout = self.old_stdout
        if self.redirect_stderr:
            sys.stderr.close()
            sys.stderr = self.old_stderr
        os.chdir(self.old_cwd)
        return super(StandardChannel, self).__exit__(type, value, traceback)

    def setup(self):
        if not os.path.isdir(self.path):
            os.makedirs(self.path)
        with self:
            origf = os.path.join(self.path, 'orig.conf')
            if not os.path.isfile(origf):
                with open(origf, 'w') as orig:
                    orig.write(format_d(self.state))
                    orig.write('\n')
            currentf = os.path.join(self.path, 'current.conf')
            if os.path.isfile(currentf):
                with open(currentf, 'r') as current:
                    self.state = expand(parse(*map(str.strip, current.readlines())))


class RSyncException(Exception):
    pass


class RSyncChannel(StandardChannel):

    def __init__(self, path, remote_path, experiment, state, redirect_stdout = False, redirect_stderr = False):
        super(RSyncChannel, self).__init__(path, experiment, state, redirect_stdout, redirect_stderr)

        ssh_prefix = 'ssh://'
        if remote_path.startswith(ssh_prefix):
            remote_path = remote_path[len(ssh_prefix):]
            colon_pos = remote_path.find(':')
            self.host = remote_path[:colon_pos]
            self.remote_path = remote_path[colon_pos+1:]
        else:
            self.host = ''
            self.remote_path = os.path.realpath(remote_path)

    def rsync(self, direction):
        """Push or pull the local experiment directory to or from the remote path.
        """
        path = self.path
        remote_path = self.remote_path
        if self.host:
            remote_path = ':'.join([self.host, remote_path])

        # TODO: use something more portable than os.system
        if direction == 'push':
            rsync_cmd = 'rsync -ac "%s/" "%s/"' % (path, remote_path)
        elif direction == 'pull':
            rsync_cmd = 'rsync -ac "%s/" "%s/"' % (remote_path, path)
        else:
            raise RSyncException('invalid direction', direction)

        rsync_rval = os.system(rsync_cmd)
        if rsync_rval != 0:
            raise RSyncException('rsync failure', (rsync_rval, rsync_cmd))

    def touch(self):
        if self.host:
            host = self.host
            touch_cmd = ('ssh %(host)s "mkdir -p \'%(path)s\'"'
                         % dict(host = self.host,
                                path = self.remote_path))
        else:
            touch_cmd = ("mkdir -p '%(path)s'"
                         % dict(path = self.remote_path))
        # print "ECHO", touch_cmd
        touch_rval = os.system(touch_cmd)
        if 0 != touch_rval:
            raise Exception('touch failure', (touch_rval, touch_cmd))

    def pull(self):
        return self.rsync('pull')

    def push(self):
        return self.rsync('push')

    def save(self):
        super(RSyncChannel, self).save()
        self.push()

    def setup(self):
        self.touch()
        self.pull()
        super(RSyncChannel, self).setup()


class DBRSyncChannel(RSyncChannel):

    RESTART_PRIORITY = 2.0

    def __init__(self, username, password, hostname, dbname, tablename, path, remote_root, redirect_stdout = False, redirect_stderr = False):
        self.username, self.password, self.hostname, self.dbname, self.tablename \
            = username, password, hostname, dbname, tablename

        self.db = sql.postgres_serial(
            user = self.username,
            password = self.password,
            host = self.hostname,
            database = self.dbname,
            table_prefix = self.tablename)

        self.dbstate = sql.book_dct_postgres_serial(self.db)
        if self.dbstate is None:
            raise JobError(JobError.NOJOB,
                           'No job was found to run.')

        try:
            state = expand(self.dbstate)
            experiment = resolve(state.dbdict.experiment)
            remote_path = os.path.join(remote_root, self.dbname, self.tablename, str(self.dbstate.id))
            super(DBRSyncChannel, self).__init__(path, remote_path, experiment, state, redirect_stdout, redirect_stderr)
        except:
            self.dbstate['dbdict.status'] = self.DONE
            raise

    def save(self):
        super(DBRSyncChannel, self).save()
        self.dbstate.update(flatten(self.state))

    def setup(self):
        # Extract a single experiment from the table that is not already running.
        # set self.experiment and self.state
        super(DBRSyncChannel, self).setup()
        self.state.dbdict.sql.host_name = socket.gethostname()
        self.state.dbdict.sql.host_workdir = self.path
        self.dbstate.update(flatten(self.state))

    def run(self):
        # We pass the force flag as True because the status flag is
        # already set to RUNNING by book_dct in __init__
        v = super(DBRSyncChannel, self).run(force = True)
        if v is self.INCOMPLETE and self.state.dbdict.sql.priority != self.RESTART_PRIORITY:
            self.state.dbdict.sql.priority = self.RESTART_PRIORITY
            self.save()
        return v

################################################################################
### running
################################################################################

def run(type, arguments):
    runner = runner_registry.get(type, None)
    if not runner:
        raise UsageError('Unknown runner: "%s"' % type)
    argspec = inspect.getargspec(runner)
    minargs = len(argspec[0]) - (len(argspec[3]) if argspec[3] else 0)
    maxargs = len(argspec[0])
    if minargs > len(arguments) or maxargs < len(arguments) and not argspec[1]:
        s = format_help(runner)
        raise UsageError(s)
    runner(*arguments)

runner_registry = dict()
def runner_cmdline(experiment, *strings):
    """
    Start an experiment with parameters given on the command line.

    Usage: cmdline <experiment> <prop1::type> <parameters>

    Run an experiment with parameters provided on the command line.
    See the help topics for experiment and parameters for syntax
    information.

    Example use:
        dbdict-run cmdline mymodule.my_experiment \\
            stopper::pylearn.stopper.nsteps \\ # use pylearn.stopper.nsteps
            stopper.n=10000 \\ # the argument "n" of nsteps is 10000
            lr=0.03
    """
    state = expand(parse(*strings))
    state.dbdict.experiment = experiment
    experiment = resolve(experiment)
    #channel = RSyncChannel('.', 'yaddayadda', experiment, state)
    channel = StandardChannel(format_d(state, sep=',', space = False), experiment, state)
    channel.run()

runner_registry['cmdline'] = runner_cmdline


def runner_sqlschedule(dbdescr, experiment, *strings):
    """
    Schedule a job to run using the sql command.

    Usage: sqlschedule <tablepath> <experiment> <parameters>

    See the experiment and parameters topics for more information about
    these parameters.

    Assuming that a postgres database is running on `host`, contains a
    database called `dbname` and that `user` has the permissions to create,
    read and modify tables on that database, tablepath should be of the
    following form:
        postgres://user:pass@host/dbname/tablename

    If no table is named `tablename`, one will be created automatically.

    The state corresponding to the experiment and parameters specified in
    the command will be saved in the database, but no experiment will be
    run. To run an experiment scheduled using sqlschedule, see the sql
    command.

    Example use:
        dbdict-run sqlschedule postgres://user:pass@host/dbname/tablename \\
            mymodule.my_experiment \\
            stopper::pylearn.stopper.nsteps \\ # use pylearn.stopper.nsteps
            stopper.n=10000 \\ # the argument "n" of nsteps is 10000
            lr=0.03
    """
    try:
        username, password, hostname, dbname, tablename \
            = sql.parse_dbstring(dbdescr)
    except:
        raise UsageError('Wrong syntax for dbdescr')

    db = sql.postgres_serial(
        user = username,
        password = password,
        host = hostname,
        database = dbname,
        table_prefix = tablename)

    state = parse(*strings)
    try:
        resolve(experiment)
    except:
        raise UsageError('The first parameter to sqlschedule must be a valid, importable symbol.')

    state['dbdict.experiment'] = experiment
    sql.add_experiments_to_db([state], db, verbose = 1)

runner_registry['sqlschedule'] = runner_sqlschedule


def runner_sql(dbdescr, exproot):
    """
    Run jobs from a sql table.

    Usage: sql <tablepath> <exproot>

    The jobs should be scheduled first with the sqlschedule command.

    Assuming that a postgres database is running on `host`, contains a
    database called `dbname` and that `user` has the permissions to create,
    read and modify tables on that database, tablepath should be of the
    following form:
        postgres://user:pass@host/dbname/tablename

    exproot can be a local path or a remote path. Examples of exproots:
        /some/local/path
        ssh://some_host:/some/remote/path # relative to the filesystem root
        ssh://some_host:other/remote/path # relative to the HOME on some_host

    The exproot will contain a subdirectory hierarchy corresponding to the
    dbname, tablename and job id, which is a unique integer.

    The sql runner will pick any job in the table which is not running and
    is not done and will terminate when that job ends. You may call the
    same command multiple times, sequentially or in parallel, to run as
    many unfinished jobs as have been scheduled in that table with
    sqlschedule.
    Example use:
        dbdict-run sql \\
            postgres://user:pass@host/dbname/tablename \\
            ssh://central_host:myexperiments
    """
    try:
        username, password, hostname, dbname, tablename \
            = sql.parse_dbstring(dbdescr)
    except:
        raise UsageError('Wrong syntax for dbdescr')

    workdir = tempfile.mkdtemp()
    #print 'wdir', workdir
    channel = DBRSyncChannel(username, password, hostname, dbname, tablename,
                             workdir,
                             exproot,
                             redirect_stdout = True,
                             redirect_stderr = True)
    channel.run()
    shutil.rmtree(workdir, ignore_errors=True)

runner_registry['sql'] = runner_sql


def help(topic = None):
    """
    Get help for a topic.

    Usage: help <topic>
    """
    def bold(x):
        return '\033[1m%s\033[0m' % x
    if topic is None:
        print bold('Topics: (use help <topic> for more info)')
        print 'example     Example of defining and running an experiment.'
        print 'experiment  How to define an experiment.'
        print 'parameters  How to list the parameters for an experiment.'
        print
        print bold('Available commands: (use help <command> for more info)')
        for name, command in sorted(runner_registry.iteritems()):
            print name.ljust(20), format_help(command).split('\n')[0]
        return
    elif topic == 'experiment':
        helptext = """
dbdict-run serves to run experiments. To define an experiment, you only
have to define a function respecting the following protocol in a python
file or module:

    def my_experiment(state, channel):
        # experiment code goes here

The return value of my_experiment may be channel.COMPLETE or
channel.INCOMPLETE. If the latter is returned, the experiment may be
resumed at a later point. Note that the return value `None` is
interpreted as channel.COMPLETE.

If a command defined by dbdict-run has an <experiment> parameter, that
parameter must be a string such that it could be used in a python import
statement to import the my_experiment function. For example if you
defined my_experiment in my_module.py, you can pass
'my_module.my_experiment' as the experiment parameter.

When entering my_experiment, the current working directory will be set
for you to a directory specially created for the experiment. The
location and name of that directory vary depending on which dbdict-run
command you run. You may create logs, save files, pictures, results,
etc. in it.

state is an object containing the parameters given to the experiment.
For example, if you run the following command:

    dbdict-run cmdline my_module.my_experiment a.x=6

`state.a.x` will contain the integer 6, and so will `state['a']['x']`.
If the state is changed, it will be saved when the experiment ends or
when channel.save() is called. The next time the experiment is run with
the same working directory, the modified state will be provided.

It is not recommended to store large amounts of data in the state. It
should be limited to scalar or string parameters. Results such as weight
matrices should be stored in files in the working directory.

channel is an object with the following important methods:

 - channel.switch() (or channel()) will give the control back to the
    user, if it is appropriate to do so. If a call to channel.switch()
    returns the string 'stop', it typically means that the signal
    SIGTERM (or SIGINT) was received. Therefore, the experiment may be
    killed soon, so it should save and return True or
    channel.INCOMPLETE so it can be resumed later. This should be
    checked periodically or data loss may be incurred.

 - channel.save() will save the current state. It is automatically
    called when the function returns, but it is a good idea to do it
    periodically.
 - channel.save_and_switch() is a useful shortcut to do both operations
    described above.
"""
    elif topic == 'parameters':
        helptext = """
If a command takes <parameters> arguments, the arguments should each
take one of the following forms:

key=value

    Set a parameter with name `key` to `value`. The value will be cast
    to an appropriate type automatically and it will be accessible to
    the experiment using `state.key`.

    If `key` is a dotted name, the value will be set in nested
    dictionaries corresponding to each part.

    Examples:
        a=1           state.a <- 1
        b=2.3         state.b <- 2.3
        c.d="hello"   state.c.d <- "hello"

key::builder

    This is equivalent to key.__builder__=builder.

    The builder should be a symbol that can be used with import or
    __import__ and it should be callable.

    If a key has a builder defined, the experiment code may easily make
    an object out of it using the `make` function. `obj = make(state.key)`.
    This will call the builder on the substate corresponding to state.key,
    as will be made clear in the example:

    Example:
        regexp::re.compile
        regexp.pattern='a.*c'

        from pylearn.dbdict.newstuff import make
        def experiment(state, channel):
            regexp = make(state.regexp) # regexp is now re.compile(pattern = 'a.*c')
            print regexp.sub('blahblah', 'hello abbbbc there')

    If the above experiment was called with the state produced by the
    parameters in the example, it would print 'hello blahblah there'.
"""
    elif topic == 'example':
        helptext = """
Example of an experiment that trains some model for 100000 iterations:

    # defined in: my_experiments.py
    def experiment(state, channel):
        try:
            model = cPickle.load(open('model', 'r'))
        except:
            model = my_model(state.some_param, state.other_param)
            state.n = 0
        dataset = my_dataset(skipto = state.n)

        for i in xrange(100000 - state.n):
            model.update(dataset.next())
            if i and i % 1000 == 0:
                if channel.save_and_switch() == 'stop':
                    state.n += i + 1
                    rval = channel.INCOMPLETE
                    break
        else:
            state.result = model.cost(some_test_set)
            rval = channel.COMPLETE

        cPickle.dump(model, open('model', 'w'))
        return rval

And then you could run it this way:

    dbdict-run cmdline my_experiments.experiment \\
        some_param=1 \\
        other_param=0.4

Or this way:

    dbdict-run sqlschedule postgres://user:pass@host/dbname/tablename \\
        my_experiments.experiment \\
        some_param=1 \\
        other_param=0.4

    dbdict-run sql postgres://user:pass@host/dbname/tablename workdir

You need to make sure that the module `my_experiments` is accessible
from python.
You can check with the command

    $ python -m my_experiments
"""
    else:
        helptext = runner_registry.get(topic, None)
    print format_help(helptext)

runner_registry['help'] = help

################################################################################
### main
################################################################################

def run_cmdline():
    try:
        if len(sys.argv) <= 1:
            raise UsageError('Usage: %s <run_type> [<arguments>*]' % sys.argv[0])
        run(sys.argv[1], sys.argv[2:])
    except UsageError, e:
        print 'Usage error:'
        print e

if __name__ == '__main__':
    run_cmdline()


# fuck this shit

# ################################################################################
# ### multiple channels
# ################################################################################

# class MultipleChannel(Channel):
#     def switch(self, job, *message):
#         raise NotImplementedError('This Channel does not allow switching between jobs.')
#     def broadcast(self, *message):
#         raise NotImplementedError()

# class SpawnChannel(MultipleChannel):
#     # spawns one process for each task
#     pass

# class GreenletChannel(MultipleChannel):
#     # uses a single process for all tasks, using greenlets to switch between them
#     pass


# Hello,

# I have changed/improved the dbdict interface a bit. All the new features
# are in pylearn.dbdict.newstuff and I copied and modified some existing
# features in another new file, pylearn.dbdict.sql.

# I'm not completely sure how to organize the help right now (will do
# tomorrow) but in a nutshell you can use it in several ways already:

# command line:
# python newstuff.py cmdline mymodule.some_experiment a=2 b=3 c.x=4 c.y="hello"

# register an experiment in sql:
# python newstuff.py sqlschedule postgres://user:pass@host/dbname/tablename mymodule.some_experiment a=2 b=3 c.x=4 c.y="hello"

# run experiments found in an sql table (and ideally registered using sqlschedule):
# python newstuff.py sql postgres://user:pass@host/dbname/tablename ssh://host:remote_directory

# I verified that SIGINT and SIGTERM are caught correctly. They are. I am
# missing handling for push_error.

# Options that I am planning to add:
# cmdline: --force to run a job that's already running or already done
# sql: -n<n> to run n jobs instead of just one
# sql: --retry-failed to re-run a job that raised an exception (if your
#      mistake was at the end of training, it's nice to have) (this would
#      require a special status in the db)
# sql: --display to display stdout and stderr in addition to redirecting
#      them to files
# sqlschedule: --force to reset the status of the job described to START,
#      if it already exists (useful if a bug leaves the status to RUNNING)

# The experiment is a function like:
# def run(state, channel):

# The state is a sort of nested dictionary where getattr and getitem,
# setattr and setitem are aliased.

# Olivier