# HG changeset patch # User Pascal Lamblin # Date 1233697660 18000 # Node ID fa6c399dc292e0aa857924ef54ebe5ce34f67a98 # Parent d8af9c8a4bbbbfd67dd198d33175a8fcd4179984 Get rid of dbdict, it now has its own repository. diff -r d8af9c8a4bbb -r fa6c399dc292 bin/dbdict-query --- a/bin/dbdict-query Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,3 +0,0 @@ -#!/usr/bin/env python -from dbdict.tools import standalone_query -standalone_query() diff -r d8af9c8a4bbb -r fa6c399dc292 bin/dbdict-run --- a/bin/dbdict-run Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,3 +0,0 @@ -#!/usr/bin/env python -from pylearn.dbdict.newstuff import run_cmdline -run_cmdline() diff -r d8af9c8a4bbb -r fa6c399dc292 bin/dbdict-run-job --- a/bin/dbdict-run-job Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,3 +0,0 @@ -#!/usr/bin/env python -from dbdict.tools import standalone_run_job -standalone_run_job() diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/__init__.py --- a/pylearn/dbdict/__init__.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1 +0,0 @@ -from api0 import sqlite_file_db, sqlite_memory_db, postgres_db diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/api0.py --- a/pylearn/dbdict/api0.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,563 +0,0 @@ -from sqlalchemy import create_engine, desc -import sqlalchemy.pool -from sqlalchemy.orm import sessionmaker -from sqlalchemy import Table, Column, MetaData, ForeignKey, ForeignKeyConstraint -from sqlalchemy import Integer, String, Float, Boolean, DateTime, Text, Binary -from sqlalchemy.databases import postgres -from sqlalchemy.orm import mapper, relation, backref, eagerload -from sqlalchemy.sql import operators, select -from sql_commands import crazy_sql_command - -class Todo(Exception): """Replace this with some working code!""" - -class DbHandle (object): - """ - This class also provides filtering shortcuts that hide the names of the - DbHandle internal databases. - - Attributes: - dict_table - pair_table - - - - dict_table - - An SqlAlchemy-mapped class corresponding to database table with the - following schema: - - Column('id', Integer, primary_key=True) - Column('create', DateTime) - Column('write', DateTime) - Column('read', DateTime) - - #TODO: reconsider create/read/write - - pair_table - - An SqlAlchemy-mapped class corresponding to database table with the - following schema: - - Column('id', Integer, primary_key=True) - Column('name', String(128)) - Column('ntype', Boolean) - Column('fval', Double) - Column('sval', Text)) - Column('bval', Blob) - - #TODO: Consider difference between text and binary - #TODO: Consider adding a 'type' column - #TODO: Consider union? - #TODO: Are there stanard ways of doing this kind of thing? - - """ - - e_bad_table = 'incompatible columns in table' - - def __init__(h_self, Session, engine, dict_table, pair_table): - h_self._engine = engine; - h_self._dict_table = dict_table - h_self._pair_table = pair_table - - #TODO: replace this crude algorithm (ticket #17) - if ['id', 'create', 'write', 'read', 'status', 'priority','hash'] != [c.name for c in dict_table.c]: - raise ValueError(h_self.e_bad_table, dict_table) - if ['id', 'dict_id', 'name', 'ntype', 'fval', 'sval', 'bval'] != [c.name for c in pair_table.c]: - raise ValueError(h_self.e_bad_table, pair_table) - - h_self._session_fn = Session - - class KeyVal (object): - """KeyVal interfaces between python types and the database. - - It encapsulates heuristics for type conversion. - """ - def __init__(k_self, name, val): - k_self.name = name - k_self.val = val - def __repr__(k_self): - return "" % (k_self.id, k_self.name, repr(k_self.val)) - def __get_val(k_self): - val = None - if k_self.fval is not None: val = [int, float][k_self.ntype](k_self.fval) - if k_self.bval is not None: val = eval(str(k_self.bval)) - if k_self.sval is not None: val = k_self.sval - return val - def __set_val(k_self, val): - if isinstance(val, (str,unicode)): - k_self.fval = None - k_self.bval = None - k_self.sval = val - else: - k_self.sval = None - try: - f = float(val) - except (TypeError, ValueError): - f = None - if f is None: #binary data - k_self.bval = repr(val) - assert eval(k_self.bval) == val - k_self.fval = None - k_self.ntype = None - else: - k_self.bval = None - k_self.fval = f - k_self.ntype = isinstance(val,float) - val = property(__get_val, __set_val) - - mapper(KeyVal, pair_table) - - class Dict (object): - """ - Instances are dict-like objects with additional features for - communicating with an active database. - - This class will be mapped by SqlAlchemy to the dict_table. - - Attributes: - handle - reference to L{DbHandle} (creator) - - """ - def __init__(d_self, session=None): - if session is None: - s = h_self._session_fn() - s.add(d_self) #d_self transient -> pending - s.commit() #d_self -> persistent - s.close() #d_self -> detached - else: - s = session - s.save(d_self) - - _forbidden_keys = set(['session']) - - # - # dictionary interface - # - - def __contains__(d_self, key): - for a in d_self._attrs: - if a.name == key: - return True - return False - - def __eq__(self, other): - return dict(self) == dict(other) - def __neq__(self, other): - return dict(self) != dict(other) - - def __getitem__(d_self, key): - for a in d_self._attrs: - if a.name == key: - return a.val - raise KeyError(key) - - def __setitem__(d_self, key, val, session=None): - if session is None: - s = h_self._session_fn() - s.add(d_self) - d_self._set_in_session(key, val, s) - s.commit() - s.close() - else: - s = session - s.add(d_self) - d_self._set_in_session(key, val, s) - - def __delitem__(d_self, key, session=None): - if session is None: - s = h_self._session_fn() - commit_close = True - else: - s = session - commit_close = False - s.add(d_self) - - #find the item to delete in d_self._attrs - to_del = None - for i,a in enumerate(d_self._attrs): - if a.name == key: - assert to_del is None - to_del = (i,a) - if to_del is None: - raise KeyError(key) - else: - i, a = to_del - s.delete(a) - del d_self._attrs[i] - if commit_close: - s.commit() - s.close() - - def iteritems(d_self): - return d_self.items() - - def items(d_self): - return [(kv.name, kv.val) for kv in d_self._attrs] - - def keys(d_self): - return [kv.name for kv in d_self._attrs] - - def values(d_self): - return [kv.val for kv in d_self._attrs] - - def update(d_self, dct, session=None, **kwargs): - """Like dict.update(), set keys from kwargs""" - if session is None: - s = h_self._session_fn() - commit_close = True - else: - s = session - commit_close = False - s.add(d_self) - for k, v in dct.items(): - d_self._set_in_session(k, v, s) - for k, v in kwargs.items(): - d_self._set_in_session(k, v, s) - - if commit_close: - s.commit() - s.close() - - def get(d_self, key, default): - try: - return d_self[key] - except KeyError: - return default - - def __str__(self): - return 'Dict'+ str(dict(self)) - - # - # database stuff - # - - def refresh(d_self, session=None): - """Sync key-value pairs from database to self - - @param session: use the given session, and do not commit. - - """ - if session is None: - session = h_self._session_fn() - session.add(d_self) #so session knows about us - session.refresh(d_self) - session.commit() - session.close() - else: - session.add(d_self) #so session knows about us - session.refresh(self.dbrow) - - def delete(d_self, session=None): - """Delete this dictionary from the database - - @param session: use the given session, and do not commit. - """ - if session is None: - session = h_self._session_fn() - session.add(d_self) #so session knows about us - session.delete(d_self) #mark for deletion - session.commit() - session.close() - else: - session.add(d_self) #so session knows about us - session.delete(d_self) - - # helper routine by update() and __setitem__ - def _set_in_session(d_self, key, val, session): - """Modify an existing key or create a key to hold val""" - - #FIRST SOME MIRRORING HACKS - if key == 'dbdict.status': - ival = int(val) - d_self.status = ival - if key == 'dbdict.sql.priority': - fval = float(val) - d_self.priority = fval - if key == 'dbdict.hash': - ival = int(val) - d_self.hash = ival - - if key in d_self._forbidden_keys: - raise KeyError(key) - created = None - for i,a in enumerate(d_self._attrs): - if a.name == key: - assert created == None - created = h_self._KeyVal(key, val) - d_self._attrs[i] = created - if not created: - created = h_self._KeyVal(key, val) - d_self._attrs.append(created) - session.save(created) - - mapper(Dict, dict_table, - properties = { - '_attrs': relation(KeyVal, - cascade="all, delete-orphan") - }) - - class _Query (object): - """ - Attributes: - _query - SqlAlchemy.Query object - """ - - def __init__(q_self, query): - q_self._query = query - - def __iter__(q_self): - return q_self.all().__iter__() - - def __getitem__(q_self, item): - return q_self._query.__getitem__(item) - - def filter_eq(q_self, kw, arg): - """Return a Query object that restricts to dictionaries containing - the given kwargs""" - - #Note: when we add new types to the key columns, add them here - q = q_self._query - T = h_self._Dict - if isinstance(arg, (str,unicode)): - q = q.filter(T._attrs.any(name=kw, sval=arg)) - else: - try: - f = float(arg) - except (TypeError, ValueError): - f = None - if f is None: - q = q.filter(T._attrs.any(name=kw, bval=repr(arg))) - else: - q = q.filter(T._attrs.any(name=kw, fval=f)) - - return h_self._Query(q) - - def filter_eq_dct(q_self, dct): - rval = q_self - for key, val in dct.items(): - rval = rval.filter_eq(key,val) - return rval - - def all(q_self): - """Return an iterator over all matching dictionaries. - - See L{SqlAlchemy.Query} - """ - return q_self._query.all() - - def count(q_self): - """Return the number of matching dictionaries. - - See L{SqlAlchemy.Query} - """ - return q_self._query.count() - - def first(q_self): - """Return some matching dictionary, or None - See L{SqlAlchemy.Query} - """ - return q_self._query.first() - - def all_ordered_by(q_self, key, desc=False): - """Return query results, sorted. - - @type key: string or tuple of string or list of string - @param: keys by which to sort the results. - - @rtype: list of L{DbHandle._Dict} instances - @return: query results, sorted by given keys - """ - - # order_by is not easy to do in SQL based on the data structures we're - # using. Considering we support different data types, it may not be - # possible at all. - # - # It would be easy if 'pivot' or 'crosstab' were provided as part of the - # underlying API, but they are not. For example, read this: - # http://www.simple-talk.com/sql/t-sql-programming/creating-cross-tab-queries-and-pivot-tables-in-sql/ - - # load query results - results = list(q_self.all()) - - if isinstance(key, (tuple, list)): - val_results = [([d[k] for k in key], d) for d in results] - else: - val_results = [(d[key], d) for d in results] - - val_results.sort() #interesting: there is an optional key parameter - if desc: - val_results.reverse() - return [vr[-1] for vr in val_results] - - h_self._KeyVal = KeyVal - h_self._Dict = Dict - h_self._Query = _Query - - def __iter__(h_self): - s = h_self.session() - rval = list(h_self.query(s).__iter__()) - s.close() - return rval.__iter__() - - def insert_kwargs(h_self, session=None, **dct): - """ - @rtype: DbHandle with reference to self - @return: a DbHandle initialized as a copy of dct - - @type dct: dict-like instance whose keys are strings, and values are - either strings, integers, floats - - @param dct: dictionary to insert - - """ - return h_self.insert(dct, session=session) - - def insert(h_self, dct, session=None): - """ - @rtype: DbHandle with reference to self - @return: a DbHandle initialized as a copy of dct - - @type dct: dict-like instance whose keys are strings, and values are - either strings, integers, floats - - @param dct: dictionary to insert - - """ - if session is None: - s = h_self.session() - rval = h_self._Dict(s) - if dct: rval.update(dct, session=s) - s.commit() - s.close() - else: - rval = h_self._Dict(session) - if dct: rval.update(dct, session=session) - return rval - - def query(h_self, session): - """Construct an SqlAlchemy query, which can be subsequently filtered - using the instance methods of DbQuery""" - return h_self._Query(session.query(h_self._Dict)\ - .options(eagerload('_attrs'))) - - def createView(h_self, view): - - s = h_self.session() - cols = [] - - for col in view.columns: - if col.name is "id": - continue; - elif isinstance(col.type, (Integer,Float)): - cols.append([col.name,'fval']); - elif isinstance(col.type,String): - cols.append([col.name,'sval']); - elif isinstance(col.type,Binary): - cols.append([col.name,'bval']); - else: - assert "Error: wrong column type in view",view.name; - - # generate raw sql command string - viewsql = crazy_sql_command(view.name, cols, \ - h_self._dict_table.name, \ - h_self._pair_table.name) - - print 'Creating sql view with command:\n', viewsql; - h_self._engine.execute(viewsql); - s.commit(); - s.close() - - class MappedClass(object): - pass - - mapper(MappedClass, view) - - return MappedClass - - def session(h_self): - return h_self._session_fn() - - def get(h_self, id): - s = h_self.session() - rval = s.query(h_self._Dict).get(id) - if rval: - #eagerload hack - str(rval) - rval.id - s.close() - return rval - - - - -def db_from_engine(engine, - table_prefix='DbHandle_default_', - trial_suffix='trial', - keyval_suffix='keyval'): - """Create a DbHandle instance - - @type engine: sqlalchemy engine (e.g. from create_engine) - @param engine: connect to this database for transactions - - @type table_prefix: string - @type trial_suffix: string - @type keyval_suffix: string - - @rtype: DbHandle instance - - @note: The returned DbHandle will use three tables to implement the - many-to-many pattern that it needs: - - I{table_prefix + trial_suffix}, - - I{table_prefix + keyval_suffix} - - """ - Session = sessionmaker(autoflush=True, autocommit=False) - - metadata = MetaData() - - t_trial = Table(table_prefix+trial_suffix, metadata, - Column('id', Integer, primary_key=True), - Column('create', DateTime), - Column('write', DateTime), - Column('read', DateTime), - Column('status', Integer), - Column('priority', Float(53)), - Column('hash', postgres.PGBigInteger)) - - t_keyval = Table(table_prefix+keyval_suffix, metadata, - Column('id', Integer, primary_key=True), - Column('dict_id', Integer, index=True), - Column('name', String(128), index=True, nullable=False), #name of attribute - Column('ntype', Boolean), - Column('fval', Float(53)), - Column('sval', Text), - Column('bval', Binary), - ForeignKeyConstraint(['dict_id'], [table_prefix+trial_suffix+'.id'])) - - #, ForeignKey('%s.id' % t_trial)), - metadata.bind = engine - metadata.create_all() # no-op when tables already exist - #warning: tables can exist, but have incorrect schema - # see bug mentioned in DbHandle constructor - - return DbHandle(Session, engine, t_trial, t_keyval) - -def sqlite_memory_db(echo=False, **kwargs): - """Return a DbHandle backed by a memory-based database""" - engine = create_engine('sqlite:///:memory:', echo=False) - return db_from_engine(engine, **kwargs) - -def sqlite_file_db(filename, echo=False, **kwargs): - """Return a DbHandle backed by a file-based database""" - engine = create_engine('sqlite:///%s' % filename, echo=False) - return db_from_engine(engine, **kwargs) - -def postgres_db(user, password, host, database, echo=False, poolclass=sqlalchemy.pool.NullPool, **kwargs): - """Create an engine to access a postgres_dbhandle - """ - db_str ='postgres://%(user)s:%(password)s@%(host)s/%(database)s' % locals() - - engine = create_engine(db_str, echo=echo, poolclass=poolclass) - - return db_from_engine(engine, **kwargs) - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/crap.py --- a/pylearn/dbdict/crap.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,256 +0,0 @@ - -from __future__ import absolute_import - -import os, sys, socket, datetime, copy, tempfile, shutil -from .dconfig import Config, load -from .api0 import sqlite_memory_db -from .api0 import postgres_serial - - -class Job(object): - @classmethod - def is_a_default_option(cls, key, val): - if callable(val): - return False - if key.startswith('_'): - return False - if key in ('job_module', 'job_symbol'): - return False - return True - - @classmethod - def options(cls): - opts = [attr for attr in dir(cls) - if cls.is_a_default_option(attr, getattr(cls, attr))] - opts.sort() - return opts - - - def __init__(self, config): - for key, val in config.items(): - if self.is_a_default_option(key, val): - if hasattr(self, key): - setattr(self, key, val) - else: - raise AttributeError("Job has no such option:", key) - - @classmethod - def print_defaults(cls): - for attr in cls.options(): - print attr, '=', getattr(cls, attr) - - def print_config(self, outfile=sys.stdout): - for attr in self.options(): - print >> outfile, attr, '=', getattr(self, attr) - - def save_config(self, outfile=sys.stdout): - """Like print_config, but with parsable format rather than human-readable format""" - kwargs = dict([(attr,getattr(self, attr)) for attr in self.options()]) - Config(**kwargs).save_file(outfile) - -def run_job(fn=None, cwd=None, - - config_path = 'job_config.py', - result_path = 'job_result.py', - stdout_path = 'stdout', - stderr_path = 'stderr', - work_dir = 'workdir' - ): - cwd = os.getcwd() if cwd is None else cwd - -def _pop_cwd(): - try: - cwd = sys.argv.pop(0) - if not cwd.startswith('/'): - cwd = os.path.join(os.getcwd(), cwd) - except IndexError: - cwd = os.getcwd() - return cwd - - -class RunJob(object): - """ - """ - - path_perf = 'job_perf.py' - path_results = 'job_results.py' - path_config = 'job_config.py' - path_fullconfig = 'job_fullconfig.py' - path_stdout = 'stdout' - path_stderr = 'stderr' - path_workdir = 'workdir' - - def __init__(self, exename): - pass - - def _load_config(self, cwd = None): - cwd = _pop_cwd() if cwd is None else cwd - config = load(os.path.join(cwd, self.path_config)) - return config - - def _load_job_class(self, config=None): - config = self._load_config() if config is None else config - job_module_name = config.job_module - job_symbol = config.job_symbol - #level=0 -> absolute imports - #fromlist=[None] -> evaluate the rightmost element of the module name, when it has - # dots (e.g., "A.Z" will return module Z) - job_module = __import__(job_module_name, fromlist=[None], level=0) - try: - job_class = getattr(job_module, job_symbol) - except: - print >> sys.stderr, "failed to load job class:", job_module_name, job_symbol - raise - return job_class - - def print_config(self): - config = self._load_config() - job_class = self._load_job_class(config) - job = job_class(config) - job.print_config() - - def run(self): - cwd = _pop_cwd() - config = self._load_config(cwd) - job_class = self._load_job_class(config) - job = job_class(config) - - job.save_config(open(os.path.join(cwd, self.path_fullconfig),'w')) - - perf = Config() - perf.host_name = socket.gethostname() - perf.start_time = str(datetime.datetime.now()) - perf.save(os.path.join(cwd, self.path_perf)) - - stdout_orig = sys.stdout - stderr_orig = sys.stderr - - try: - sys.stdout = open(os.path.join(cwd, self.path_stdout), 'w') - sys.stderr = open(os.path.join(cwd, self.path_stderr), 'w') - - # - # mess around with the working directory - # - - wd = os.path.join(cwd, self.path_workdir) - try: - os.mkdir(wd) - except OSError, e: - print >> sys.stderr, "trouble making wordking directory:" - print >> sys.stderr, e - print >> sys.stderr, "ignoring error and proceeding anyway" - try: - os.chdir(wd) - except: - pass - - print >> sys.stderr, "cwd:", os.getcwd() - - # - # run the job... - # - - #TODO load the results file before running the job, to resume job - results = Config() - - job_rval = job.run(results) - - #TODO: use the return value to decide whether the job should be resumed or not, - # False means run is done - # True means the job has yielded, but should be continued - results.save(os.path.join(cwd, self.path_results)) - - finally: - #put back stderr and stdout - sys.stdout = stdout_orig - sys.stderr = stderr_orig - perf.end_time = str(datetime.datetime.now()) - perf.save(os.path.join(cwd, self.path_perf)) - - def defaults(self): - job_class = self._load_job_class() - job_class.print_defaults() - -def standalone_run_job(): - exe = RunJob(sys.argv.pop(0)) - try: - cmd = sys.argv.pop(0) - fn = getattr(exe, cmd) - except IndexError: - fn = getattr(exe, 'run') - except AttributeError: - print >> sys.stderr, "command not supported", cmd - - fn() - -def build_db(cwd, db=None): - """WRITEME""" - db = sqlite_memory_db() if db is None else db - for e in os.listdir(cwd): - e = os.path.join(cwd, e) - try: - e_config = open(os.path.join(e, 'job_config.py')) - except: - e_config = None - - try: - e_sentinel = open(os.path.join(e, '__jobdir__')) - except: - e_sentinel = None - - if not (e_config or e_sentinel): - continue #this is not a job dir - - if e_config: - e_config.close() - config = load(os.path.join(e, 'job_config.py')) - kwargs = copy.copy(config.__dict__) - try: - results = load(os.path.join(e, 'job_results.py')) - kwargs.update(results.__dict__) - except: - pass - try: - perf = load(os.path.join(e, 'job_perf.py')) - kwargs.update(perf.__dict__) - except: - pass - - #TODO: this is a backward-compatibility hack for AISTATS*09 - if 'perf_jobdir' not in kwargs: - kwargs['perf_jobdir'] = e - if 'perf_workdir' not in kwargs: - kwargs['perf_workdir'] = os.path.join(e, 'workdir') - - entry = db.insert(kwargs) - - if e_sentinel: - print >> sys.stderr, "NOT-IMPLEMENTED: RECURSION INTO SUBDIRECTORY", e - return db - - -class RunQuery(object): - - def __init__(self, exename): - pass - - def run(self): - cwd = _pop_cwd() - db = build_db(cwd) - for entry in db.query().all(): - print entry.items() - - -def standalone_query(): - exe = RunQuery(sys.argv.pop(0)) - try: - cmd = sys.argv.pop(0) - fn = getattr(exe, cmd) - except IndexError: - fn = getattr(exe, 'run') - except AttributeError: - print >> sys.stderr, "command not supported", cmd - - fn() - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/dbdict_run.py --- a/pylearn/dbdict/dbdict_run.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,78 +0,0 @@ -import sys, signal -from .tools import DictProxyState, load_state_fn, run_state - -# N.B. -# don't put exotic imports here -# see what I did with def sql() below... since we shouldn't try to import that stuff unless we -# need it. - -_experiment_usage = """Usage: - dbdict-experiment - -Commands: - - help Print help. (You're looking at it.) - - cmdline Obtain experiment configuration by evaluating the commandline - sql Obtain experiment configuration by querying an sql database - -Help on individual commands might be available by typing 'dbdict-experiment help' - -""" - #dbdict Obtain experiment configuration by loading a dbdict file - -def _dispatch_cmd(self, stack=sys.argv): - try: - cmd = stack.pop(0) - except IndexError: - cmd = 'help' - try: - fn = getattr(self, cmd) - except AttributeError: - print >> sys.stderr, "command not supported", cmd - fn = self.help - fn() - -class RunCmdline(object): - def __init__(self): - try: - dct = eval('dict(' + sys.argv.pop(0) + ')') - except Exception, e: - print >> sys.stderr, "Exception:", e - self.help() - return - - channel_rval = [None] - - def on_sigterm(signo, frame): - channel_rval[0] = 'stop' - - #install a SIGTERM handler that asks the run_state function to return - signal.signal(signal.SIGTERM, on_sigterm) - signal.signal(signal.SIGINT, on_sigterm) - - def channel(*args, **kwargs): - return channel_rval[0] - - run_state(dct, channel) - print dct - - def help(self): - print >> sys.stderr, "Usage: dbdict-experiment cmdline " - - -class RunExperiment(object): - """This class handles the behaviour of the dbdict-run script.""" - def __init__(self): - exe = sys.argv.pop(0) - #print 'ARGV', sys.argv - _dispatch_cmd(self) - - def sql(self): - from .dbdict_run_sql import run_sql - return run_sql() - - cmdline = RunCmdline - - def help(self): - print _experiment_usage # string not inlined so as not to fool VIM's folding mechanism diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/dbdict_run_sql.py --- a/pylearn/dbdict/dbdict_run_sql.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,446 +0,0 @@ -"""Code related to using the api0 dictionary table to run experiments. - -This module has been tested with postgres. - -""" - -import sys, os, socket, tempfile, shutil, copy, time -import numpy # for the random wait time in book_unstarted_trial - -import sqlalchemy -from sqlalchemy import create_engine, desc -from sqlalchemy.orm import eagerload -import psycopg2, psycopg2.extensions - -from .api0 import db_from_engine, postgres_db -from .tools import run_state, DictProxyState, COMPLETE, INCOMPLETE, SYMBOL, MODULE # tools -from .dconfig import save_items - -# _TEST CONCURRENCY -# To ensure that concurrency is handled properly by the consumer (book_dct) -# set this flag to True and (manually) run the following test. -# -# Launch two processes side by side, starting one of them a few seconds after the other. -# There is an extra sleep(10) that will delay each process's job dequeue. -# -# You should see that the process that started first gets the job, -# and the process that started second tries to get the same job, -# fails, and then gets another one instead. -_TEST_CONCURRENCY = False - -_help = """Usage: -dbdict-run sql postgres://:@// - - user - postgres username - pass - password - hostname - the network address of the host on which a postgres server is - running (on port ??) - database - a database served by the postgres server on - api0-table - the name (actually, table_prefix) associated with tables, - created by dbdict.api0. - - experiment-root - a local or network path. Network paths begin with ssh:// - E.g. /tmp/blah - ssh://mammouth:blah - ssh://foo@linux.org:/tmp/blah - -Experiment-root is used to store the file results of experiments. If a job with a given -creates file 'a.txt', and directory 'b' with file 'foo.py', then these will be rsync'ed to the -experiment-root when job has finished running. They will be found here: - - ////workdir/a.txt - ////workdir/b/foo.py - -Files 'stdout', 'stderr', and 'state.py' will be created. - - ////stdout - opened for append - ////stderr - opened for append - ////state.py - overwritten with database version - -If a job is restarted or resumed, then those files are rsync'ed back to the current working -directory, and stdout and stderr are re-opened for appending. When a resumed job stops for -the second (or more) time, the cwd is rsync'ed back to the experiment-root. In this way, the -experiment-root accumulates the results of experiments that run. - -""" - -STATUS = 'dbdict_sql_status' -PRIORITY = 'dbdict_sql_priority' -HOST = 'dbdict_sql_hostname' -HOST_WORKDIR = 'dbdict_sql_host_workdir' -PUSH_ERROR = 'dbdict_sql_push_error' - -START = 0 -"""dbdict_status == START means a experiment is ready to run""" - -RUNNING = 1 -"""dbdict_status == RUNNING means a experiment is running on dbdict_hostname""" - -DONE = 2 -"""dbdict_status == DONE means a experiment has completed (not necessarily successfully)""" - -RESTART_PRIORITY = 2.0 -"""Stopped experiments are marked with this priority""" - - -def postgres_serial(user, password, host, database, **kwargs): - """Return a DbHandle instance that communicates with a postgres database at transaction - isolation_level 'SERIALIZABLE'. - - :param user: a username in the database - :param password: the password for the username - :param host: the network address of the host on which the postgres server is running - :param database: a database served by the postgres server - - """ - this = postgres_serial - - if not hasattr(this,'engine'): - def connect(): - c = psycopg2.connect(user=user, password=password, database=database, host=host) - c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) - return c - pool_size = 0 - this.engine = create_engine('postgres://' - ,creator=connect - ,pool_size=0 # should force the app release connections - ) - - db = db_from_engine(this.engine, **kwargs) - db._is_serialized_session_db = True - return db - - -def add_experiments_to_db(exp_cls, jobs, db, verbose=0, add_dups=False, type_check=None): - """Add experiments paramatrized by exp_cls and jobs[i] to database db. - - Default behaviour is to ignore jobs which are already in the database. - - If type_check is a class (instead of None) then it will be used as a type declaration for - all the elements in each job dictionary. For each key,value pair in the dictionary, there - must exist an attribute,value pair in the class meeting the following criteria: - the attribute and the key are equal, and the types of the values are equal. - - :param exp_cls: The Experiment class to run these experiments. - :param jobs: The parameters of experiments to run. - :type jobs: an iterable object over dictionaries - :param verbose: print which jobs are added and which are skipped - :param add_dups: False will ignore a job if it matches (on all items()) with a db entry. - :type add_dups: Bool - - :returns: list of (Bool,job[i]) in which the flags mean the corresponding job actually was - inserted. - - """ - rval = [] - for job in jobs: - job = copy.copy(job) - do_insert = add_dups or (None is db.query(**job).first()) - - if do_insert: - if type_check: - for k,v in job.items(): - if type(v) != getattr(type_check, k): - raise TypeError('Experiment contains value with wrong type', - ((k,v), getattr(type_check, k))) - - job[STATUS] = START - job[SYMBOL] = exp_cls.__name__ - job[MODULE] = exp_cls.__module__ - job[PRIORITY] = 1.0 - if verbose: - print 'ADDING ', job - db.insert(job) - rval.append((True, job)) - else: - if verbose: - print 'SKIPPING', job - rval.append((False, job)) - - -def book_dct_postgres_serial(db, retry_max_sleep=10.0, verbose=0): - """Find a trial in the lisa_db with status START. - - A trial will be returned with dbdict_status=RUNNING. - - Returns None if no such trial exists in DB. - - This function uses a serial access to the lisadb to guarantee that no other - process will retrieve the same dct. It is designed to facilitate writing - a "consumer" for a Producer-Consumer pattern based on the database. - - """ - print >> sys.stderr, """#TODO: use the priority field, not the status.""" - print >> sys.stderr, """#TODO: ignore entries with key PUSH_ERROR.""" - - s = db._session - - # NB. we need the query and attribute update to be in the same transaction - assert s.autocommit == False - - dcts_seen = set([]) - keep_trying = True - - dct = None - while (dct is None) and keep_trying: - #build a query - q = s.query(db._Dict) - q = q.options(eagerload('_attrs')) #hard-coded in api0 - q = q.filter(db._Dict._attrs.any(name=STATUS, fval=START)) - - #try to reserve a dct - try: - #first() may raise psycopg2.ProgrammingError - dct = q.first() - - if dct is not None: - assert (dct not in dcts_seen) - if verbose: print 'book_unstarted_dct retrieved, ', dct - dct._set_in_session(STATUS, RUNNING, s) - if 1: - if _TEST_CONCURRENCY: - print >> sys.stderr, 'SLEEPING BEFORE BOOKING' - time.sleep(10) - - #commit() may raise psycopg2.ProgrammingError - s.commit() - else: - print >> sys.stderr, 'DEBUG MODE: NOT RESERVING JOB!', dct - #if we get this far, the job is ours! - else: - # no jobs are left - keep_trying = False - except (psycopg2.OperationalError, - sqlalchemy.exceptions.ProgrammingError), e: - #either the first() or the commit() raised - s.rollback() # docs say to do this (or close) after commit raises exception - if verbose: print 'caught exception', e - if dct: - # first() succeeded, commit() failed - dcts_seen.add(dct) - dct = None - wait = numpy.random.rand(1)*retry_max_sleep - if verbose: print 'another process stole our dct. Waiting %f secs' % wait - time.sleep(wait) - return dct - -def book_dct(db): - print >> sys.stderr, """#TODO: use the priority field, not the status.""" - print >> sys.stderr, """#TODO: ignore entries with key self.push_error.""" - - return db.query(dbdict_status=START).first() - -def parse_dbstring(dbstring): - postgres = 'postgres://' - assert dbstring.startswith(postgres) - dbstring = dbstring[len(postgres):] - - #username_and_password - colon_pos = dbstring.find('@') - username_and_password = dbstring[:colon_pos] - dbstring = dbstring[colon_pos+1:] - - colon_pos = username_and_password.find(':') - if -1 == colon_pos: - username = username_and_password - password = None - else: - username = username_and_password[:colon_pos] - password = username_and_password[colon_pos+1:] - - #hostname - colon_pos = dbstring.find('/') - hostname = dbstring[:colon_pos] - dbstring = dbstring[colon_pos+1:] - - #dbname - colon_pos = dbstring.find('/') - dbname = dbstring[:colon_pos] - dbstring = dbstring[colon_pos+1:] - - #tablename - tablename = dbstring - - if password is None: - password = open(os.getenv('HOME')+'/.dbdict_%s'%dbname).readline()[:-1] - if False: - print 'USERNAME', username - print 'PASS', password - print 'HOST', hostname - print 'DB', dbname - print 'TABLE', tablename - - return username, password, hostname, dbname, tablename - -class ExperimentLocation(object): - def __init__(self, root, dbname, tablename, id): - ssh_prefix='ssh://' - if root.startswith(ssh_prefix): - root = root[len(ssh_prefix):] - #at_pos = root.find('@') - colon_pos = root.find(':') - self.host, self.path = root[:colon_pos], root[colon_pos+1:] - else: - self.host, self.path = '', root - self.dbname = dbname - self.tablename = tablename - self.id = id - - def rsync(self, direction): - """The directory at which experiment-related files are stored. - - :returns: ":", of the sort used by ssh and rsync. - """ - if self.host: - path = os.path.join( - ':'.join([self.host, self.path]), - self.dbname, - self.tablename, - self.id) - else: - path = os.path.join(self.path, - self.dbname, self.tablename, self.id) - - if direction == 'push': - rsync_cmd = 'rsync -a * "%s/"' % path - elif direction == 'pull': - rsync_cmd = 'rsync -a "%s/" .' % path - else: - raise Exception('invalid direction', direction) - - rsync_rval = os.system(rsync_cmd) - if rsync_rval != 0: - raise Exception('rsync failure', (rsync_rval, rsync_cmd)) - - def pull(self): - return self.rsync('pull') - - def push(self): - return self.rsync('push') - - def touch(self): - path = os.path.join(self.path, self.dbname, self.tablename, self.id) - if self.host: - host = self.host - touch_cmd = ('ssh %(host)s "mkdir -p \'%(path)s\' && cd \'%(path)s\' ' - '&& touch stdout stderr && mkdir workdir"' % locals()) - else: - touch_cmd = ("mkdir -p '%(path)s' && cd '%(path)s' " - '&& touch stdout stderr && mkdir workdir' % locals()) - print "ECHO", touch_cmd - touch_rval = os.system(touch_cmd) - if 0 != touch_rval: - raise Exception('touch failure', (touch_rval, touch_cmd)) - - def delete(self): - #something like ssh %s 'rm -Rf %s' should work, but it's pretty scary... - raise NotImplementedError() - -def run_sql(): - try: - username, password, hostname, dbname, tablename = parse_dbstring(sys.argv.pop(0)) - except Exception, e: - print >> sys.stderr, e - print >> sys.stderr, _help - raise - - #set experiment_root - try: - exproot = sys.argv.pop(0) - except: - exproot = os.getcwd() - - if not exproot.startswith('/'): - exproot = os.path.join(os.getcwd(), exproot) - - #TODO: THIS IS A GOOD IDEA RIGHT? - # It makes module-lookup work based on cwd-relative paths - # But possibly has really annoying side effects? Is there a cleaner - # way to change the import path just for loading the experiment class? - sys.path.insert(0, os.getcwd()) - - #TODO: refactor this so that we can use any kind of database (not just postgres) - - #a serialized session is necessary for jobs not be double-booked by listeners running - #in parallel on the cluster. - db = postgres_serial(user=username, - password=password, - host=hostname, - database=dbname, - table_prefix=tablename) - - while True: - dct = book_dct_postgres_serial(db, verbose=1) - if dct is None: - break - - try: - # - # chdir to a temp folder - # - workdir = tempfile.mkdtemp() - print >> sys.stderr, "INFO RUNNING ID %i IN %s" % (dct.id, workdir) - os.chdir(workdir) - - # not sure where else to put this... - dct[HOST] = socket.gethostname() - dct[HOST_WORKDIR] = workdir - - exploc = ExperimentLocation(exproot, dbname, tablename, str(dct.id)) - - # - # pull cwd contents - # - exploc.touch() - exploc.pull() - - # - # run the experiment - # - old_stdout, old_stderr = sys.stdout, sys.stderr - sys.stdout, sys.stderr = open('stdout', 'a+'), open('stderr', 'a+') - assert RUNNING == dct[STATUS] #set by get_dct - os.chdir(os.path.join(workdir, 'workdir')) - try: - run_rval = run_state(DictProxyState(dct)) - except Exception, e: - run_rval = COMPLETE - print >> sys.stderr, 'Exception:', e - print >> old_stderr, '#TODO: print a bigger traceback to stderr' - sys.stdout, sys.stderr = old_stdout, old_stderr - - # - # push the results back to the experiment_root - # - try: - os.chdir(workdir) - #pickle the state #TODO: write it human-readable - #cPickle.dump(dict((k,v) for k,v in dct.items()), open('state.pickle','w')) - save_items(dct.items(), open('state.py', 'w')) - exploc.push() - except Exception, e: - dct[PUSH_ERROR] = str(e) - raise - - # Cleanup the tempdir - # TODO: put this in a 'finally' block? - # - shutil.rmtree(workdir, ignore_errors=True) - - except: - dct[STATUS] = DONE - dct[PRIORITY] = None - raise - - if run_rval is INCOMPLETE: - #mark the job as needing restarting - dct[STATUS] = START - dct[PRIORITY] = RESTART_PRIORITY - else: - #mark the job as being done - dct[STATUS] = DONE - dct[PRIORITY] = None - - break # don't actually loop - - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/dconfig.py --- a/pylearn/dbdict/dconfig.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,145 +0,0 @@ -import sys -# -# -# Utility -# -# - -class Config (object): - """A class to store experiment configurations. - - Configuration variables are stored in class instance __dict__. This class - ensures that keys are alphanumeric strings, and that values can be rebuilt - from their representations. - - It can be serialized to/from a python file. - - """ - def __init__(self, __dict__=None, **kwargs): - if __dict__: - Config.__checkkeys__(*__dict__.keys()) - self.__dict__.update(__dict__) - if kwargs: - Config.__checkkeys__(*kwargs.keys()) - self.__dict__.update(kwargs) - - def __setattr__(self, attr, value): - Config.__checkkeys__(attr) - self.__dict__[attr] = value - - def __hash__(self): - """Compute a hash string from a dict, based on items(). - - @type dct: dict of hashable keys and values. - @param dct: compute the hash of this dict - - @rtype: string - - """ - items = list(self.items()) - items.sort() - return hash(repr(items)) - - def keys(self): - return self.__dict__.keys() - - def items(self): - return self.__dict__.items() - - def update(self, dct): - Config.__checkkeys__(*dct.keys()) - self.__dict__.update(dct) - - def save_file(self, f=sys.stdout): - items = self.items() - items.sort() - save_dictlike(items, f) - - def save(self, filename): - """Write a python file as a way of serializing a dictionary - - @type filename: string - @param filename: filename to open (overwrite) to save dictionary contents - - @return None - - """ - f = open(filename, 'w') - self.save_file(f) - f.close() - - def update_fromfile(self, filename): - """Read the local variables from a python file - - @type filename: string, filename suitable for __import__() - @param filename: a file whose module variables will be returned as a - dictionary - - @rtype: dict - @return: the local variables of the imported filename - - @note: - This implementation silently ignores all module symbols that don't meet - the standards enforced by L{Config.__checkkeys__}. This is meant to - ignore module special variables. - - """ - #m = __import__(filename) #major security problem, or feature? - f = open(filename) - for line in f: - if line.startswith('#'): - continue - line = line[:-1] #trim the '\n' - tokens = line.split(' ') - try: - key = tokens[0] - assert '=' == tokens[1] - except: - raise Exception('trouble parsing line:', line) - repr_val = ' '.join(tokens[2:]) - val = eval(repr_val) - setattr(self, key, val) - - @staticmethod - def __checkkeys__(*args): - conf = Config() - for k in args: - #must be string - if type(k) != str: - raise KeyError(k) - #mustn't be part of Config class interface - if hasattr(conf, k): - raise KeyError(k) - #all alphanumeric - for c in k: - if c not in ('abcdefghijklmnopqrstuvwxyz' - 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' - '_0123456789'): - raise KeyError(k) - #no symbols that look like reserved symbols - if k[:2]=='__' and k[-2:]=='__' and len(k)>4: - raise KeyError(k) - -def load(*filenames): - rval = Config() - for filename in filenames: - rval.update_fromfile(filename) - return rval - -def save_items(items, f): - """Write a python file as a way of serializing a dictionary - - @type f: writable file-like object - @param f: the attributes of this object will be written here - - @return None - - """ - print >> f, "#File generated by dbdict.dconfig.save_items" - for key, val in items: - Config.__checkkeys__(key) - # should never raise... illegal keys are not - # supposed to ever get into the dictionary - repr_val = repr(val) - assert val == eval(repr_val) - print >> f, key, '=', repr_val diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/design.txt --- a/pylearn/dbdict/design.txt Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,316 +0,0 @@ - -TODO -==== - -- Class interface for jobs (SHAPING UP) - -- How exactly will state work (STILL A MYSTERY sql vs. file vs. script) - - How to insert jobs - - How to retrieve jobs - -- How will the driver work (rsync / file conventions) - - - -Components -========== - -DbDict ------- - -A DbDict is a restricted dictionary. - -It's keys are restricted to simple types, simple enough to be stored into a -database. Strings, integers, floating-points. - - -Experiment ----------- - -An Experiment is a python class with methods like start(), run(), stop(). A -user should make an Experiment class whose run() method does some computations -like training a neural network and measuring test errors, training an SVM, -doing some feature extraction, etc. - -An Experiment's arguments are stored in a DbDict. - -The results of an experiment can be stored in two places: in a DbDict (for -easy SQL searching later on) and in files. - - -ExperimentLauncher ------------------- - -Experiments are not standalone executables, they need to be run by some -additional code. There are several ways to launch an experiment: - -- running on your local machine vs. cluster vs. condor vs. mammouth - -- loading the arguments from the commandline vs. file vs. SQL query - -- running alone, or running in parallel with other experiments. - -The ExperimentLauncher is an executable python program that can run -experiments in these different ways. - - - -Workflow (How to run experiments) -================================= - -First, you'll need a Python program to compute what you want. Think about how -long it will take and whether you will want to be able to resume after being -stopped. - -If you want to run a simple or short experiment that is not restartable, -simply extend dbdict.Experiment like this: - -.. code-block:: python - - class MyExperiment(dbdict.Experiment): - - def __init__(self, dct): - self.dct = dct - - def run(self): - - # compute something - result = self.dct.a * self.dct.b - - # store it - self.dct.result = result - - # write some other results to a file - f = open('some_file', 'w') - print >> f, "hello from the job?" - - # return False or None to indicate the job is done - -If you want to run a more elaborate experiment, then extend dbdict.Experiment -like this: - -.. code-block:: python - - class MyRestartableExperiment(dbdict.Experiment): - - def __init__(self, state): - """Called once per lifetime of the class instance. Can be used to - create new jobs and save them to the database. This function will not - be called when a Job is retrieved from the database. - - Parent creates keys: dbdict_id, dbdict_module, dbdict_symbol, dbdict_state. - - dbdict_state = NEW (possibilities: NEW, RUNNING, DONE) - - """ - - def start(self): - """Called once per lifetime of the compute job. - - This is a good place to initialize internal variables. - - After this function returns, either stop() or run() will be called. - - dbdict_state -> RUNNING - - """ - - def resume(self): - """Called to resume computations on a previously stop()'ed job. The - os.getcwd() is just like it was after some previous stop() command. - - This is a good place to load internal variables from os.getcwd(). - - dbdict_state -> RUNNING - - """ - - def run(self, channel): - """Called after start() or resume(). - - channel() may return different things at different times. - None - run should continue. - 'stop' - the job should save state as soon as possible because - the process may soon be terminated - - When this function returns, dbdict_state -> DONE. - """ - - -Having written your Experiment subclass, you can now test it directly, or -using the ExperimentLauncher. - -.. code-block:: python - - if __name__ == '__main__': - - class Config(object): - a = 3 - b = 50 - - experiment = MyExperiment(Config()) - experiment.run() - -.. code-block:: bash - - dbdict-run my_file.MyExperiment 'dict(a=3, b=50)' - - -To run a batch of experiments on a cluster using 3 concurrent processes using -the SQL-based job list, you have to insert the jobs into a database table, and -then launch 10 instances of dbdict-run. - -.. code-block:: python - - from dbdict.tools import insert, view_table - - def add_jobs(): - experiment_module = 'my_repo.some_file' #match you module name - experiment_symbol = 'MyExperiment' #match your class name - experiment_name = 'showing some stuff' - for a in 3, 4, 5: - for b in 0.0, 0.1, 3.0: - insert(locals(), ignore_duplicates=False) - add_jobs() - - dbdict_run_table('my_launcher_view', experiment_name='showing_some_stuff') - - - -.. code-block:: bash - - dbidispatch --condor dbdict-run sql my_launcher_view - - - -Component Documentation -======================= - -DbDict ------- - - -Experiment ----------- - -class Experiment(object): - - new_stdout == 'job_stdout' - new_stderr == 'job_stderr' - - def remap_stdout(self): - """ - Called before start and resume. - - Parent-class behaviour is to replace sys.stdout with open(self.new_stdout, 'w+'). - - """ - - def remap_stderr(self): - """ - Called before start and resume. - - """ - - def tempdir(self): - """ - Return the recommended filesystem location for temporary files. - - The idea is that this will be a fast, local disk partition, suitable - for temporary storage. - - Files here will not generally be available at the time of resume(). - - The return value of this function may be controlled by one or more - environment variables. - - Will return $DBDICT_EXPERIMENT_TEMPDIR if present. - Failing that, will return $TMP/username-dbdict/hash(self) - Failing that, will return /tmp/username-dbdict/hash(self) - - .. note:: - Maybe we should use Python stdlib's tempdir mechanism. - - """ - - -Environment Variables -===================== - - -TMP - sometimes used by Experiment.tempdir() - -DBDICT_EXPERIMENT_TEMPDIR - used by Experiment.tempdir() - - -JOB DRIVER ----------- - -When a job is started, save: - -- workdir - -- dir - -- network (e.g. mammouth vs. diro... where are jobdir and workdir? - Alternatively, automatically rsync finished jobs' workdir and jobdir to diro.) - - -dbdict-run-job --------------- - -prepare job dir -~~~~~~~~~~~~~~~~ - -On iro, cd to fringant2/.../job_id/ - -when restarting on mammouth, cd to somewhere and rsync -r fringant2/.../job_id job_id - -after stopping on mammouth rsync -r job_id fringant2/.../job_id - -.. note: - What to do on error in rsync? Where to report problems? - -.. note: - What to do with stderr, stdout? - -job queue table -~~~~~~~~~~~~~~~ -New table: id job_id priority - -This is the table that dbdict-run-job will pop jobs from. - - -How to run experiments using DbDict ------------------------------------ - -- Inherit from tools.Job. This is where you put the logic that takes some - parameters, and computes some results associated with those parameters. - -- Write a loop that creates a bunch of Jobs, and then tests to see if they are - already in the db before inserting them. - -- Insert the new job ids and priority into the qu -- Save result files, and files needed for job resume to os.getcwd(). - -- Save non-result files to a TEMP directory, obtained by e.g. self.gettmpdir() - from inside a Job instance. - -- Load datasets through pylearn. Pylearn has internal configuration that allows - it to find files both on mammouth and DIRO networks. You just call - MNIST.head() or shapeset1.train_valid_test() and bingo. - -- Manage experiments in a postgres database. Which ones have been queued, which - ones are running, which are finished, simple results, etc. - -- To analyze results, select from PG and create a dedicated table in sqlite. - Make columns for all the dictionary keys from all the selected entries. - - - -When running classification experiments ---------------------------------------- - -Have convention of saving (train, valid, test) X (NLL, RATE). -Should the convention be enforced? How? - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/experiment.py --- a/pylearn/dbdict/experiment.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,122 +0,0 @@ -"""Helper code for implementing dbdict-compatible jobs""" -import inspect, sys, copy - -#State values should be instances of these types: -INT = type(0) -FLT = type(0.0) -STR = type('') - -COMPLETE = None #jobs can return this by returning nothing as well -INCOMPLETE = True #jobs can return this and be restarted - -def subdict(dct, prefix): - """Return the dictionary formed by keys in `dct` that start with the string `prefix`. - - In the returned dictionary, the `prefix` is removed from the keynames. - Updates to the sub-dict are reflected in the original dictionary. - - Example: - a = {'aa':0, 'ab':1, 'bb':2} - s = subdict(a, 'a') # returns dict-like object with keyvals {'a':0, 'b':1} - s['a'] = 5 - s['c'] = 9 - # a == {'aa':5, 'ab':1, 'ac':9, 'bb':2} - - """ - class SubDict(object): - def __copy__(s): - rval = {} - rval.update(s) - return rval - def __eq__(s, other): - if len(s) != len(other): - return False - for k in other: - if other[k] != s[k]: - return False - return True - def __len__(s): - return len(s.items()) - def __str__(s): - d = {} - d.update(s) - return str(d) - def keys(s): - return [k[len(prefix):] for k in dct if k.startswith(prefix)] - def values(s): - return [dct[k] for k in dct if k.startswith(prefix)] - def items(s): - return [(k[len(prefix):],dct[k]) for k in dct if k.startswith(prefix)] - def update(s, other): - for k,v in other.items(): - self[k] = v - def __getitem__(s, a): - return dct[prefix+a] - def __setitem__(s, a, v): - dct[prefix+a] = v - - return SubDict() - -def subdict_copy(dct, prefix): - return copy.copy(subdict(dct, prefix)) - -def call_with_kwargs_from_dict(fn, dct, logfile='stderr'): - """Call function `fn` with kwargs taken from dct. - - When fn has a '**' parameter, this function is equivalent to fn(**dct). - - When fn has no '**' parameter, this function removes keys from dct which are not parameter - names of `fn`. The keys which are ignored in this way are logged to the `logfile`. If - logfile is the string 'stdout' or 'stderr', then errors are logged to sys.stdout or - sys.stderr respectively. - - The reason this function exists is to make it easier to provide default arguments and - - :param fn: function to call - :param dct: dictionary from which to take arguments of fn - :param logfile: log ignored keys to this file - :type logfile: file-like object or string 'stdout' or string 'stderr' - - :returns: fn(**) - - """ - argspec = inspect.getargspec(fn) - argnames = argspec[0] - if argspec[2] == None: #if there is no room for a **args type-thing in fn... - kwargs = {} - for k,v in dct.items(): - if k in argnames: - kwargs[k] = v - else: - if not logfile: - pass - elif logfile == 'stderr': - print >> sys.stderr, "WARNING: DictProxyState.call_substate ignoring key-value pair:", k, v - elif logfile == 'stdout': - print >> sys.stdout, "WARNING: DictProxyState.call_substate ignoring key-value pair:", k, v - else: - print >> logfile, "WARNING: DictProxyState.call_substate ignoring key-value pair:", k, v - return fn(**kwargs) - else: - #there is a **args type thing in fn. Here we pass everything. - return fn(**dct) - -#MAKE YOUR OWN DBDICT-COMPATIBLE EXPERIMENTS IN THIS MODEL -def sample_experiment(state, channel): - - #read from the state to obtain parameters, configuration, etc. - print >> sys.stdout, state.items() - - import time - for i in xrange(100): - time.sleep(1) - # use the channel to know if the job should stop ASAP - if channel() == 'stop': - break - - # modify state to record results - state['answer'] = 42 - - #return either INCOMPLETE or COMPLETE to indicate that the job should be re-run or not. - return COMPLETE - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/newstuff.py --- a/pylearn/dbdict/newstuff.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,1013 +0,0 @@ - -from __future__ import with_statement - -from collections import defaultdict -import re, sys, inspect, os, signal, tempfile, shutil, socket -import traceback - -import sql - - -################################################################################ -### misc -################################################################################ - -class DD(dict): - def __getattr__(self, attr): - return self[attr] - def __setattr__(self, attr, value): - self[attr] = value - def __str__(self): - return 'DD%s' % dict(self) - def __repr__(self): - return str(self) - -################################################################################ -### resolve -################################################################################ - -def resolve(name, try_import=True): - """ - Resolve a string of the form X.Y...Z to a python object by repeatedly using getattr, and - __import__ to introspect objects (in this case X, then Y, etc. until finally Z is loaded). - - """ - symbols = name.split('.') - builder = __import__(symbols[0]) - try: - for sym in symbols[1:]: - try: - builder = getattr(builder, sym) - except AttributeError, e: - if try_import: - __import__(builder.__name__, fromlist=[sym]) - builder = getattr(builder, sym) - else: - raise e - except (AttributeError, ImportError), e: - raise type(e)('Failed to resolve compound symbol %s' % name, e) - return builder - -################################################################################ -### dictionary -################################################################################ - -def convert(obj): - try: - return eval(obj, {}, {}) - except (NameError, SyntaxError): - return obj - -def flatten(obj): - """nested dictionary -> flat dictionary with '.' notation """ - d = {} - def helper(d, prefix, obj): - if isinstance(obj, (str, int, float, list, tuple)): - d[prefix] = obj #convert(obj) - else: - if isinstance(obj, dict): - subd = obj - else: - subd = obj.state() - subd['__builder__'] = '%s.%s' % (obj.__module__, obj.__class__.__name__) - for k, v in subd.iteritems(): - pfx = '.'.join([prefix, k]) if prefix else k - helper(d, pfx, v) - helper(d, '', obj) - return d - -def expand(d): - """inverse of flatten()""" - #def dd(): - #return DD(dd) - struct = DD() - for k, v in d.iteritems(): - if k == '': - raise NotImplementedError() - else: - keys = k.split('.') - current = struct - for k2 in keys[:-1]: - current = current.setdefault(k2, DD()) - current[keys[-1]] = v #convert(v) - return struct - -def realize(d): - if not isinstance(d, dict): - return d - d = dict((k, realize(v)) for k, v in d.iteritems()) - if '__builder__' in d: - builder = resolve(d.pop('__builder__')) - return builder(**d) - return d - -def make(d): - return realize(expand(d)) - -################################################################################ -### errors -################################################################################ - -class UsageError(Exception): - pass - -################################################################################ -### parsing and formatting -################################################################################ - -def parse(*strings): - d = {} - for string in strings: - s1 = re.split(' *= *', string, 1) - s2 = re.split(' *:: *', string, 1) - if len(s1) == 1 and len(s2) == 1: - raise UsageError('Expected a keyword argument in place of "%s"' % s1[0]) - elif len(s1) == 2: - k, v = s1 - v = convert(v) - elif len(s2) == 2: - k, v = s2 - k += '.__builder__' - d[k] = v - return d - -def format_d(d, sep = '\n', space = True): - d = flatten(d) - pattern = "%s = %r" if space else "%s=%r" - return sep.join(pattern % (k, v) for k, v in d.iteritems()) - -def format_help(topic): - if topic is None: - return 'No help.' - elif isinstance(topic, str): - help = topic - elif hasattr(topic, 'help'): - help = topic.help() - else: - help = topic.__doc__ - if not help: - return 'No help.' - - ss = map(str.rstrip, help.split('\n')) - try: - baseline = min([len(line) - len(line.lstrip()) for line in ss if line]) - except: - return 'No help.' - s = '\n'.join([line[baseline:] for line in ss]) - s = re.sub(string = s, pattern = '\n{2,}', repl = '\n\n') - s = re.sub(string = s, pattern = '(^\n*)|(\n*$)', repl = '') - - return s - -################################################################################ -### single channels -################################################################################ - -# try: -# import greenlet -# except: -# try: -# from py import greenlet -# except: -# print >>sys.stderr, 'the greenlet module is unavailable' -# greenlet = None - - -class Channel(object): - - COMPLETE = property(lambda s:None, - doc=("Experiments should return this value to " - "indicate that they are done (if not done, return `Incomplete`")) - INCOMPLETE = property(lambda s:True, - doc=("Experiments should return this value to indicate that " - "they are not done (if done return `COMPLETE`)")) - - START = property(lambda s: 0, - doc="dbdict.status == START means a experiment is ready to run") - RUNNING = property(lambda s: 1, - doc="dbdict.status == RUNNING means a experiment is running on dbdict_hostname") - DONE = property(lambda s: 2, - doc="dbdict.status == DONE means a experiment has completed (not necessarily successfully)") - - # Methods to be used by the experiment to communicate with the channel - - def save(self): - """ - Save the experiment's state to the various media supported by - the Channel. - """ - raise NotImplementedError() - - def switch(self, message = None): - """ - Called from the experiment to give the control back to the channel. - The following return values are meaningful: - * 'stop' -> the experiment must stop as soon as possible. It may save what - it needs to save. This occurs when SIGTERM or SIGINT are sent (or in - user-defined circumstances). - switch() may give the control to the user. In this case, the user may - resume the experiment by calling switch() again. If an argument is given - by the user, it will be relayed to the experiment. - """ - pass - - def __call__(self, message = None): - return self.switch(message) - - def save_and_switch(self): - self.save() - self.switch() - - # Methods to run the experiment - - def setup(self): - pass - - def __enter__(self): - pass - - def __exit__(self): - pass - - def run(self): - pass - - - -class JobError(Exception): - RUNNING = 0 - DONE = 1 - NOJOB = 2 - - -class SingleChannel(Channel): - - def __init__(self, experiment, state): - self.experiment = experiment - self.state = state - self.feedback = None - - #TODO: make this a property and disallow changing it during a with block - self.catch_sigterm = True - self.catch_sigint = True - - def switch(self, message = None): - feedback = self.feedback - self.feedback = None - return feedback - - def run(self, force = False): - self.setup() - - status = self.state.dbdict.get('status', self.START) - if status is self.DONE and not force: - # If you want to disregard this, use the --force flag (not yet implemented) - raise JobError(JobError.RUNNING, - 'The job has already completed.') - elif status is self.RUNNING and not force: - raise JobError(JobError.DONE, - 'The job is already running.') - self.state.dbdict.status = self.RUNNING - - v = self.COMPLETE - with self: #calls __enter__ and then __exit__ - try: - v = self.experiment(self.state, self) - finally: - self.state.dbdict.status = self.DONE if v is self.COMPLETE else self.START - - return v - - def on_sigterm(self, signo, frame): - # SIGTERM handler. It is the experiment function's responsibility to - # call switch() often enough to get this feedback. - self.feedback = 'stop' - - def __enter__(self): - # install a SIGTERM handler that asks the experiment function to return - # the next time it will call switch() - if self.catch_sigterm: - self.prev_sigterm = signal.getsignal(signal.SIGTERM) - signal.signal(signal.SIGTERM, self.on_sigterm) - if self.catch_sigint: - self.prev_sigint = signal.getsignal(signal.SIGINT) - signal.signal(signal.SIGINT, self.on_sigterm) - return self - - def __exit__(self, type, value, tb_traceback, save = True): - if type: - try: - raise type, value, tb_traceback - except: - traceback.print_exc() - if self.catch_sigterm: - signal.signal(signal.SIGTERM, self.prev_sigterm) - self.prev_sigterm = None - if self.catch_sigint: - signal.signal(signal.SIGINT, self.prev_sigint) - self.prev_sigint = None - if save: - self.save() - return True - - -class StandardChannel(SingleChannel): - - def __init__(self, path, experiment, state, redirect_stdout = False, redirect_stderr = False): - super(StandardChannel, self).__init__(experiment, state) - self.path = os.path.realpath(path) - self.redirect_stdout = redirect_stdout - self.redirect_stderr = redirect_stderr - - def save(self): - with open(os.path.join(self.path, 'current.conf'), 'w') as current: - current.write(format_d(self.state)) - current.write('\n') - - def __enter__(self): - self.old_cwd = os.getcwd() - os.chdir(self.path) - if self.redirect_stdout: - self.old_stdout = sys.stdout - sys.stdout = open('stdout', 'a') - if self.redirect_stderr: - self.old_stderr = sys.stderr - sys.stderr = open('stderr', 'a') - return super(StandardChannel, self).__enter__() - - def __exit__(self, type, value, traceback): - rval = super(StandardChannel, self).__exit__(type, value, traceback, save = False) - if self.redirect_stdout: - sys.stdout.close() - sys.stdout = self.old_stdout - if self.redirect_stderr: - sys.stderr.close() - sys.stderr = self.old_stderr - os.chdir(self.old_cwd) - self.save() - return rval - - def setup(self): - if not os.path.isdir(self.path): - os.makedirs(self.path) - with self: - origf = os.path.join(self.path, 'orig.conf') - if not os.path.isfile(origf): - with open(origf, 'w') as orig: - orig.write(format_d(self.state)) - orig.write('\n') - currentf = os.path.join(self.path, 'current.conf') - if os.path.isfile(currentf): - with open(currentf, 'r') as current: - self.state = expand(parse(*map(str.strip, current.readlines()))) - - -class RSyncException(Exception): - pass - -class RSyncChannel(StandardChannel): - - def __init__(self, path, remote_path, experiment, state, redirect_stdout = False, redirect_stderr = False): - super(RSyncChannel, self).__init__(path, experiment, state, redirect_stdout, redirect_stderr) - - ssh_prefix='ssh://' - if remote_path.startswith(ssh_prefix): - remote_path = remote_path[len(ssh_prefix):] - colon_pos = remote_path.find(':') - self.host = remote_path[:colon_pos] - self.remote_path = remote_path[colon_pos+1:] - else: - self.host = '' - self.remote_path = os.path.realpath(remote_path) - - def rsync(self, direction): - """The directory at which experiment-related files are stored. - """ - - path = self.path - - remote_path = self.remote_path - if self.host: - remote_path = ':'.join([self.host, remote_path]) - - # TODO: use something more portable than os.system - if direction == 'push': - rsync_cmd = 'rsync -ac "%s/" "%s/"' % (path, remote_path) - elif direction == 'pull': - rsync_cmd = 'rsync -ac "%s/" "%s/"' % (remote_path, path) - else: - raise RSyncException('invalid direction', direction) - - rsync_rval = os.system(rsync_cmd) - if rsync_rval != 0: - raise RSyncException('rsync failure', (rsync_rval, rsync_cmd)) - - def touch(self): - if self.host: - host = self.host - touch_cmd = ('ssh %(host)s "mkdir -p \'%(path)s\'"' % dict(host = self.host, - path = self.remote_path)) - else: - touch_cmd = ("mkdir -p '%(path)s'" % dict(path = self.remote_path)) - # print "ECHO", touch_cmd - touch_rval = os.system(touch_cmd) - if 0 != touch_rval: - raise Exception('touch failure', (touch_rval, touch_cmd)) - - def pull(self): - return self.rsync('pull') - - def push(self): - return self.rsync('push') - - def save(self): - super(RSyncChannel, self).save() - self.push() - - def setup(self): - self.touch() - self.pull() - super(RSyncChannel, self).setup() - - -class DBRSyncChannel(RSyncChannel): - - RESTART_PRIORITY = 2.0 - - def __init__(self, username, password, hostname, dbname, tablename, path, remote_root, redirect_stdout = False, redirect_stderr = False): - self.username, self.password, self.hostname, self.dbname, self.tablename \ - = username, password, hostname, dbname, tablename - - self.db = sql.postgres_serial( - user = self.username, - password = self.password, - host = self.hostname, - database = self.dbname, - table_prefix = self.tablename) - - self.dbstate = sql.book_dct_postgres_serial(self.db) - if self.dbstate is None: - raise JobError(JobError.NOJOB, - 'No job was found to run.') - - try: - state = expand(self.dbstate) - experiment = resolve(state.dbdict.experiment) - remote_path = os.path.join(remote_root, self.dbname, self.tablename, str(self.dbstate.id)) - super(DBRSyncChannel, self).__init__(path, remote_path, experiment, state, redirect_stdout, redirect_stderr) - except: - self.dbstate['dbdict.status'] = self.DONE - raise - - def save(self): - super(DBRSyncChannel, self).save() - self.dbstate.update(flatten(self.state)) - - def setup(self): - # Extract a single experiment from the table that is not already running. - # set self.experiment and self.state - super(DBRSyncChannel, self).setup() - self.state.dbdict.sql.host_name = socket.gethostname() - self.state.dbdict.sql.host_workdir = self.path - self.dbstate.update(flatten(self.state)) - - def run(self): - # We pass the force flag as True because the status flag is - # already set to RUNNING by book_dct in __init__ - v = super(DBRSyncChannel, self).run(force = True) - if v is self.INCOMPLETE and self.state.dbdict.sql.priority != self.RESTART_PRIORITY: - self.state.dbdict.sql.priority = self.RESTART_PRIORITY - self.save() - return v - - - -################################################################################ -### running -################################################################################ - -import optparse -OptionParser = optparse.OptionParser -# class OptionParser(optparse.OptionParser): -# def error(self, message): -# pass - -def parse_and_run(command, arguments): - parser, runner = runner_registry.get(command, (None, None)) - if not runner: - raise UsageError('Unknown runner: "%s"' % command) - if parser: - options, arguments = parser.parse_args(arguments) - else: - options = optparse.Values() - run(runner, [options] + arguments) - -def run(runner, arguments): - argspec = inspect.getargspec(runner) - minargs = len(argspec[0])-(len(argspec[3]) if argspec[3] else 0) - maxargs = len(argspec[0]) - if minargs > len(arguments) or maxargs < len(arguments) and not argspec[1]: - s = format_help(runner) - raise UsageError(s) - runner(*arguments) - -runner_registry = dict() - - -parser_cmdline = OptionParser(usage = '%prog cmdline [options] ') -parser_cmdline.add_option('-f', '--force', action = 'store_true', dest = 'force', default = False, - help = 'force running the experiment even if it is already running or completed') -parser_cmdline.add_option('--redirect-stdout', action = 'store_true', dest = 'redirect_stdout', default = False, - help = 'redirect stdout to the workdir/stdout file') -parser_cmdline.add_option('--redirect-stderr', action = 'store_true', dest = 'redirect_stderr', default = False, - help = 'redirect stderr to the workdir/stdout file') -parser_cmdline.add_option('-r', '--redirect', action = 'store_true', dest = 'redirect', default = False, - help = 'redirect stdout and stderr to the workdir/stdout and workdir/stderr files') -parser_cmdline.add_option('-w', '--workdir', action = 'store', dest = 'workdir', default = None, - help = 'the working directory in which to run the experiment') -parser_cmdline.add_option('-n', '--dry-run', action = 'store_true', dest = 'dry_run', default = False, - help = 'use this option to run the whole experiment in a temporary working directory (cleaned after use)') -parser_cmdline.add_option('-2', '--sigint', action = 'store_true', dest = 'allow_sigint', default = False, - help = 'allow sigint (CTRL-C) to interrupt a process') - -def runner_cmdline(options, experiment, *strings): - """ - Start an experiment with parameters given on the command line. - - Usage: cmdline [options] - - Run an experiment with parameters provided on the command - line. See the help topics for experiment and parameters for - syntax information. - - Example use: - dbdict-run cmdline mymodule.my_experiment \\ - stopper::pylearn.stopper.nsteps \\ # use pylearn.stopper.nsteps - stopper.n=10000 \\ # the argument "n" of nsteps is 10000 - lr=0.03 - """ - state = expand(parse(*strings)) - state.setdefault('dbdict', DD()).experiment = experiment - experiment = resolve(experiment) - if options.workdir and options.dry_run: - raise UsageError('Please use only one of: --workdir, --dry-run.') - if options.workdir: - workdir = options.workdir - elif options.dry_run: - workdir = tempfile.mkdtemp() - else: - workdir = format_d(state, sep=',', space = False) - channel = StandardChannel(workdir, - experiment, state, - redirect_stdout = options.redirect or options.redirect_stdout, - redirect_stderr = options.redirect or options.redirect_stderr) - channel.catch_sigint = not options.allow_sigint - channel.run(force = options.force) - if options.dry_run: - shutil.rmtree(workdir, ignore_errors=True) - -runner_registry['cmdline'] = (parser_cmdline, runner_cmdline) - - - - -parser_filemerge = OptionParser(usage = '%prog filemerge [options] ...') -parser_filemerge.add_option('-f', '--force', action = 'store_true', dest = 'force', default = False, - help = 'force running the experiment even if it is already running or completed') -parser_filemerge.add_option('--redirect-stdout', action = 'store_true', dest = 'redirect_stdout', default = False, - help = 'redirect stdout to the workdir/stdout file') -parser_filemerge.add_option('--redirect-stderr', action = 'store_true', dest = 'redirect_stderr', default = False, - help = 'redirect stderr to the workdir/stdout file') -parser_filemerge.add_option('-r', '--redirect', action = 'store_true', dest = 'redirect', default = False, - help = 'redirect stdout and stderr to the workdir/stdout and workdir/stderr files') -parser_filemerge.add_option('-w', '--workdir', action = 'store', dest = 'workdir', default = None, - help = 'the working directory in which to run the experiment') -parser_filemerge.add_option('-n', '--dry-run', action = 'store_true', dest = 'dry_run', default = False, - help = 'use this option to run the whole experiment in a temporary working directory (cleaned after use)') - -def runner_filemerge(options, experiment, mainfile, *other_files): - """ - Start an experiment with parameters given in files. - - Usage: filemerge [options] ... - - Run an experiment with parameters provided in plain text files. - A single experiment will be run with the union of all the - parameters listed in the files. - - Example: - - text.first = "hello" - text.second = "world" - - - number = 12 - numbers.a = 55 - numbers.b = 56 - - Given these files, the following command using filemerge: - $ dbdict-run filemerge mymodule.my_experiment blah1.txt blah2.txt - - is equivalent to this one using cmdline: - $ dbdict-run cmdline mymodule.my_experiment \\ - text.first=hello text.second=world \\ - number=12 numbers.a=55 numbers.b=56 - """ - with open(mainfile) as f: - _state = parse(*map(str.strip, f.readlines())) - for file in other_files: - if '=' in file: - _state.update(parse(file)) - else: - with open(file) as f: - _state.update(parse(*map(str.strip, f.readlines()))) - state = expand(_state) - state.setdefault('dbdict', DD()).experiment = experiment - experiment = resolve(experiment) - if options.workdir and options.dry_run: - raise UsageError('Please use only one of: --workdir, --dry-run.') - if options.workdir: - workdir = options.workdir - elif options.dry_run: - workdir = tempfile.mkdtemp() - else: - workdir = format_d(state, sep=',', space = False) - channel = StandardChannel(workdir, - experiment, state, - redirect_stdout = options.redirect or options.redirect_stdout, - redirect_stderr = options.redirect or options.redirect_stderr) - channel.run(force = options.force) - if options.dry_run: - shutil.rmtree(workdir, ignore_errors=True) - -runner_registry['filemerge'] = (parser_filemerge, runner_filemerge) - - - - -parser_sqlschedule = OptionParser(usage = '%prog sqlschedule [options] ') -parser_sqlschedule.add_option('-f', '--force', action = 'store_true', dest = 'force', default = False, - help = 'force adding the experiment to the database even if it is already there') - -def runner_sqlschedule(options, dbdescr, experiment, *strings): - """ - Schedule a job to run using the sql command. - - Usage: sqlschedule - - See the experiment and parameters topics for more information about - these parameters. - - Assuming that a postgres database is running on `host`, contains a - database called `dbname` and that `user` has the permissions to - create, read and modify tables on that database, tablepath should - be of the following form: - postgres://user:pass@host/dbname/tablename - - If no table is named `tablename`, one will be created - automatically. The state corresponding to the experiment and - parameters specified in the command will be saved in the database, - but no experiment will be run. - - To run an experiment scheduled using sqlschedule, see the sql - command. - - Example use: - dbdict-run sqlschedule postgres://user:pass@host/dbname/tablename \\ - mymodule.my_experiment \\ - stopper::pylearn.stopper.nsteps \\ # use pylearn.stopper.nsteps - stopper.n=10000 \\ # the argument "n" of nsteps is 10000 - lr=0.03 - """ - - try: - username, password, hostname, dbname, tablename \ - = sql.parse_dbstring(dbdescr) - except: - raise UsageError('Wrong syntax for dbdescr') - - db = sql.postgres_serial( - user = username, - password = password, - host = hostname, - database = dbname, - table_prefix = tablename) - - state = parse(*strings) - resolve(experiment) # we try to load the function associated to the experiment - state['dbdict.experiment'] = experiment - sql.add_experiments_to_db([state], db, verbose = 1, add_dups = options.force) - -runner_registry['sqlschedule'] = (parser_sqlschedule, runner_sqlschedule) - - - -parser_sqlschedule_filemerge = OptionParser(usage = '%prog sqlschedule_filemerge [options] ') -parser_sqlschedule_filemerge.add_option('-f', '--force', action = 'store_true', dest = 'force', default = False, - help = 'force adding the experiment to the database even if it is already there') - -def runner_sqlschedule_filemerge(options, dbdescr, experiment, mainfile, *other_files): - """ - Schedule a job to run using the sql command using parameter files. - - This command is to sqlschedule what the filemerge command is to - cmdline. - """ - - try: - username, password, hostname, dbname, tablename \ - = sql.parse_dbstring(dbdescr) - except: - raise UsageError('Wrong syntax for dbdescr') - - db = sql.postgres_serial( - user = username, - password = password, - host = hostname, - database = dbname, - table_prefix = tablename) - - with open(mainfile) as f: - _state = parse(*map(str.strip, f.readlines())) - for file in other_files: - if '=' in file: - _state.update(parse(file)) - else: - with open(file) as f: - _state.update(parse(*map(str.strip, f.readlines()))) - state = _state - - resolve(experiment) # we try to load the function associated to the experiment - state['dbdict.experiment'] = experiment - sql.add_experiments_to_db([state], db, verbose = 1, add_dups = options.force) - -runner_registry['sqlschedule_filemerge'] = (parser_sqlschedule_filemerge, runner_sqlschedule_filemerge) - - - - -parser_sql = OptionParser(usage = '%prog sql [options] ') -parser_sql.add_option('-n', dest = 'n', type = 'int', default = 1, - help = 'Run N experiments sequentially (default 1) ' - '(if N is <= 0, runs as many experiments as possible).') - -def runner_sql(options, dbdescr, exproot): - """ - Run jobs from a sql table. - - Usage: sql - - The jobs should be scheduled first with the sqlschedule command. - - Assuming that a postgres database is running on `host`, contains a - database called `dbname` and that `user` has the permissions to - create, read and modify tables on that database, tablepath should - be of the following form: - postgres://user:pass@host/dbname/tablename - - exproot can be a local path or a remote path. Examples of exproots: - /some/local/path - ssh://some_host:/some/remote/path # relative to the filesystem root - ssh://some_host:other/remote/path # relative to the HOME on some_host - - The exproot will contain a subdirectory hierarchy corresponding to - the dbname, tablename and job id which is a unique integer. - - The sql runner will pick any job in the table which is not running - and is not done and will terminate when that job ends. You may call - the same command multiple times, sequentially or in parallel, to - run as many unfinished jobs as have been scheduled in that table - with sqlschedule. - - Example use: - dbdict-run sql \\ - postgres://user:pass@host/dbname/tablename \\ - ssh://central_host:myexperiments - """ - try: - username, password, hostname, dbname, tablename \ - = sql.parse_dbstring(dbdescr) - except: - raise UsageError('Wrong syntax for dbdescr') - - n = options.n if options.n else -1 - nrun = 0 - try: - while n != 0: - workdir = tempfile.mkdtemp() - #print 'wdir', workdir - channel = DBRSyncChannel(username, password, hostname, dbname, tablename, - workdir, - exproot, - redirect_stdout = True, - redirect_stderr = True) - channel.run() - shutil.rmtree(workdir, ignore_errors=True) - n -= 1 - nrun += 1 - except JobError, e: - if e.args[0] == JobError.NOJOB: - print 'No more jobs to run (run %i jobs)' % nrun - -runner_registry['sql'] = (parser_sql, runner_sql) - - - - - -def runner_help(options, topic = None): - """ - Get help for a topic. - - Usage: help - """ - def bold(x): - return '\033[1m%s\033[0m' % x - if topic is None: - print bold('Topics: (use help for more info)') - print 'example Example of defining and running an experiment.' - print 'experiment How to define an experiment.' - print 'parameters How to list the parameters for an experiment.' - print - print bold('Available commands: (use help for more info)') - for name, (parser, command) in sorted(runner_registry.iteritems()): - print name.ljust(20), format_help(command).split('\n')[0] - return - elif topic == 'experiment': - helptext = """ - - dbdict-run serves to run experiments. To define an experiment, you - only have to define a function respecting the following protocol in - a python file or module: - - def my_experiment(state, channel): - # experiment code goes here - - The return value of my_experiment may be channel.COMPLETE or - channel.INCOMPLETE. If the latter is returned, the experiment may - be resumed at a later point. Note that the return value `None` - is interpreted as channel.COMPLETE. - - If a command defined by dbdict-run has an parameter, - that parameter must be a string such that it could be used in a - python import statement to import the my_experiment function. For - example if you defined my_experiment in my_module.py, you can pass - 'my_module.my_experiment' as the experiment parameter. - - When entering my_experiment, the current working directory will be - set for you to a directory specially created for the experiment. - The location and name of that directory vary depending on which - dbdict-run command you run. You may create logs, save files, pictures, - results, etc. in it. - - state is an object containing the parameters given to the experiment. - For example, if you run the followinc command: - - dbdict-run cmdline my_module.my_experiment a.x=6 - - `state.a.x` will contain the integer 6, and so will `state['a']['x']`. - If the state is changed, it will be saved when the experiment ends - or when channel.save() is called. The next time the experiment is run - with the same working directory, the modified state will be provided. - - It is not recommended to store large amounts of data in the state. It - should be limited to scalar or string parameters. Results such as - weight matrices should be stored in files in the working directory. - - channel is an object with the following important methods: - - - channel.switch() (or channel()) will give the control back to the - user, if it is appropriate to do so. If a call to channel.switch() - returns the string 'stop', it typically means that the signal - SIGTERM (or SIGINT) was received. Therefore, the experiment may be - killed soon, so it should save and return True or - channel.INCOMPLETE so it can be resumed later. This should be - checked periodically or data loss may be incurred. - - - channel.save() will save the current state. It is automatically - called when the function returns, but it is a good idea to do it - periodically. - - - channel.save_and_switch() is an useful shortcut to do both operations - described above. - """ - - elif topic == 'parameters': - helptext = """ - If a command takes arguments, the arguments should each - take one of the following forms: - - key=value - - Set a parameter with name `key` to `value`. The value will be casted - to an appropriate type automatically and it will be accessible to - the experiment using `state.key`. - - If `key` is a dotted name, the value will be set in nested - dictionaries corresponding to each part. - - Examples: - a=1 state.a <- 1 - b=2.3 state.b <- 2.3 - c.d="hello" state.c.d <- "hello" - - key::builder - - This is equivalent to key.__builder__=builder. - - The builder should be a symbol that can be used with import or - __import__ and it should be callable. - - If a key has a builder defined, the experiment code may easily make - an object out of it using the `make` function. `obj = make(state.key)`. - This will call the builder on the substate corresponding to state.key, - as will be made clear in the example: - - Example: - regexp::re.compile - regexp.pattern='a.*c' - - from pylearn.dbdict.newstuff import make - def experiment(state, channel): - regexp = make(state.regexp) # regexp is now re.compile(pattern = 'a.*c') - print regexp.sub('blahblah', 'hello abbbbc there') - - If the above experiment was called with the state produced by the - parameters in the example, it would print 'hello blahblah there'. - """ - - elif topic == 'example': - helptext = """ - Example of an experiment that trains some model for 100000 iterations: - - # defined in: my_experiments.py - def experiment(state, channel): - try: - model = cPickle.load(open('model', 'r')) - except: - model = my_model(state.some_param, state.other_param) - state.n = 0 - dataset = my_dataset(skipto = state.n) - for i in xrange(100000 - state.n): - model.update(dataset.next()) - if i and i % 1000 == 0: - if channel.save_and_switch() == 'stop': - state.n += i + 1 - rval = channel.INCOMPLETE - break - else: - state.result = model.cost(some_test_set) - rval = channel.COMPLETE - cPickle.dump(model, open('model', 'w')) - return rval - - And then you could run it this way: - - dbdict-run cmdline my_experiments.experiment \\ - some_param=1 \\ - other_param=0.4 - - Or this way: - - dbdict-run sqlschedule postgres://user:pass@host/dbname/tablename \\ - my_experiments.experiment \\ - some_param=1 \\ - other_param=0.4 - - dbdict-run sql postgres://user:pass@host/dbname/tablename exproot - - You need to make sure that the module `my_experiments` is accessible - from python. You can check with the command - - $ python -m my_experiments - """ - else: - helptext = runner_registry.get(topic, None)[1] - print format_help(helptext) - -runner_registry['help'] = (None, runner_help) - -################################################################################ -### main -################################################################################ - -def run_cmdline(): - try: - if len(sys.argv) <= 1: - raise UsageError('Usage: %s [*]' % sys.argv[0]) - cmd = None - args = [] - for arg in sys.argv[1:]: - if cmd is not None or arg.startswith('-'): - args.append(arg) - else: - cmd = arg - parse_and_run(cmd, args) - except UsageError, e: - print 'Usage error:' - print e - -if __name__ == '__main__': - run_cmdline() - - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/sample_create_jobs.py --- a/pylearn/dbdict/sample_create_jobs.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,135 +0,0 @@ -import sys, subprocess - -from .dconfig import Config -from .tools import Job # tools.py - -# -# Models -# - -class OldSampleJob(Job): - """ Sample Job object. Cut and paste this for your own experiments. """ - - # default parameters for the job - # config file parameters must be from this list. - a = 0 - b = 0 - c = 'a' - d = 0.0 - - def run(self, results, dry_run=False): - # attributes that you set in 'results' will be saved to job_results.py, next to - # job_config.py when this function returns. - results.f = self.a * self.b - - # The current working directory (cwd) is writable and readable - # it will also be left alone after the job terminates - f = open('some_file', 'w') - print >> f, "hello from the job?" - - return True #restart this job - - - -class SampleJobState(): - - # default parameters for the job - # config file parameters must be from this list. - a = 0 - b = 0 - c = 'a' - d = 0.0 - - -sample_job_table = Table('my_table_for_testing', metadata, - Column('a', Integer), - Column('b', Float(53))) - -metadata.create_all() - -mapper(SampleJobState, sample_job_table) - -s = SampleJobState() -s.a = 5 -s.b = 8.2 - -if Session().query(SampleJobState).filter_by(a=5, b=8.2).any(): - break; -else: - Session().save(s).commit() - - -class SampleJob(Job): - """ Sample Job object. Cut and paste this for your own experiments. """ - - def __init__(self, state): - pass - - def start(self): - pass - - def resume(self): - pass - - def run(self, switch = lambda : 'continue'): - # attributes that you set in 'results' will be saved to job_results.py, next to - # job_config.py when this function returns. - params.f = params.a * params.b - - - # The current working directory (cwd) is writable and readable - # it will also be left alone after the job terminates - f = open('some_file', 'w') - print >> f, "hello from the job?" - while True: - time.sleep(5) - if switch() == 'stop': - break - - return True #continue running if possible... - - def stop(self): - pass - - - def run(self, state, dry_run=False): - -def some_jobs(a = 0, - b = 2): - job_module = 'dbdict.sample_create_jobs' - job_symbol = 'SampleJob' - for c in ('a', 'b', 'c'): - for d in [0.0, 9.9]: - yield locals() - -def create(generator): - """Create a set of job directories""" - jobs = [] - dispatch = sys.stdout #open('jobs','w') - - for config in generator: - #print ' config', config - configdir = 'job_%016x'% abs(hash(config)) - jobs.append(configdir) - create_dirs = True - dry_run = 0 - if not dry_run: - if create_dirs and subprocess.call(('mkdir', configdir)): - print >> sys.stderr, 'Error creating directory: ', configdir - else: - #no problem creating the directory - config.save(configdir + '/job_config.py') - print >> dispatch, "dbdict-run-job run", configdir - else: - #print configdir - pass - dispatch.close() - -if __name__ == '__main__': - create(some_jobs()) - - j = SampleJob(SampleJobState()) - j.start() - j.run() - - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/scratch.py --- a/pylearn/dbdict/scratch.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,451 +0,0 @@ -import threading, time, commands, os, sys, math, random, datetime - -import psycopg2, psycopg2.extensions -from sqlalchemy import create_engine, desc -from sqlalchemy.orm import sessionmaker -from sqlalchemy import Table, Column, MetaData, ForeignKey -from sqlalchemy import Integer, String, Float, DateTime -from sqlalchemy.orm import mapper, relation, backref, eagerload -from sqlalchemy.sql import operators, select - - -########## -# -# Connection stuff -# -# - -if 0: - _db_host = 'jais.iro.umontreal.ca' - - def _pwd(): - if not hasattr(_pwd,'rval'): - pw_cmd = 'ssh bergstrj@grieg.iro.umontreal.ca cat .lisa_db' - rval = commands.getoutput(pw_cmd) - return rval - - def engine(): - """Create an engine to access lisa_db on gershwin - - This function caches the return value between calls. - """ - if not hasattr(engine,'rval'): - pw = _pwd() - db_str ='postgres://bergstrj:%s@%s/lisa_db' % (pw,_db_host) - echo = False #spews pseudo-sql to stdout - engine.rval = create_engine(db_str - ,pool_size=1 # should force the app release extra connections - # releasing connections should let us schedule more jobs, since each one operates - # autonomously most of the time, just checking the db rarely. - # TODO: optimize this for large numbers of jobs - ,echo=echo - ) - return engine.rval - - def engine_serializable(): - """Create an engine to access lisa_db on gershwin, which uses serializable - transaction mode. - - This function caches the return value between calls. - """ - - this = engine_serializable - - if not hasattr(this,'rval'): - pw = _pwd() - def connect(): - c = psycopg2.connect(user='bergstrj', - password=pw, - database='lisa_db', - host='gershwin.iro.umontreal.ca') - c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) - return c - pool_size=0 - this.rval = create_engine('postgres://' - ,creator=connect - ,pool_size=0 # should force the app release connections - ) - return this.rval - - - Session = sessionmaker(bind=engine(), autoflush=True, transactional=True) - SessionSerial = sessionmaker(bind=engine_serializable(), - autoflush=True, transactional=True) - -else: - from sqlalchemy import create_engine - from sqlalchemy.orm import sessionmaker - engine = create_engine('sqlite:///:memory:', echo=False) - Session = sessionmaker(bind=engine, autoflush=True, transactional=True) - SessionSerial = Session - - -################ -# -# Database setup -# -table_prefix='bergstrj_scratch_test_' -metadata = MetaData() - -def table_with_id(name, *args): - return Table(name, metadata, - Column('id', Integer, primary_key=True), - *args) - -t_trial = table_with_id(table_prefix+'trial', - Column('desc', String(256)), #comment: why running this trial? - Column('priority', Float(53)), #aka Double - Column('start', DateTime), - Column('finish', DateTime), - Column('host', String(256))) - -t_keyval = table_with_id(table_prefix+'keyval', - Column('name', String(32), nullable=False), #name of attribute - Column('fval', Float(53)), #aka Double - Column('ival', Integer), - Column('sval', String(256))) #TODO: store text (strings of unbounded length) - -t_trial_keyval = table_with_id(table_prefix+'trial_keyval', - Column('trial_id', Integer, ForeignKey('%s.id' % t_trial)), - Column('keyval_id', Integer, ForeignKey('%s.id' % t_keyval))) - -class _KeyVal(object): - @staticmethod - def cache(name, val, session, create=True): - #TODO: consider using a local cache to remove the need to query db - # this takes advantage of common usage, which is to only add KeyVal - # pairs. - #check if it is in the DB - q = session.query(_KeyVal) - if isinstance(val, float): - q = q.filter_by(name=name, fval=val) - elif isinstance(val, int): - q = q.filter_by(name=name, ival=val) - elif isinstance(val, str): - q = q.filter_by(name=name, sval=val) - else: - raise TypeError(val) - rval = q.first() - if rval is None and create: - rval = _KeyVal(name, val) - session.save_or_update(rval) - return rval - - def __init__(self, name, val): - self.name = name - self.val = val - def __get_val(self): - val = None - if self.fval is not None: val = self.fval - if self.ival is not None: val = self.ival - if self.sval is not None: val = self.sval - return val - def __set_val(self, val): - if isinstance(val, float): - self.fval = val - self.ival = None - self.sval = None - elif isinstance(val, int): - self.fval = None - self.ival = val - self.sval = None - elif isinstance(val, str): - self.fval = None - self.ival = None - self.sval = val - else: - raise TypeError(val) - val = property(__get_val, __set_val) - def __repr__(self): - return "" % (self.id, self.name, repr(self.val)) -mapper(_KeyVal, t_keyval) - - -################### -# -# Job interface -# - -class Trial(object): - _native_cols = 'desc', 'priority', 'start', 'finish', 'host' - - #TODO: remove these forbidden keynames, and let all keynames work properly - _forbidden_keynames = set(['filter', 'desc', - 'priority', 'start', 'finish', 'host', - 'create', 'max_sleep', 'max_retry', 'session', 'c', - 'abort', 'complete']) - - #TODO: unittest cases to verify that having these kinds of keys is OK - - class ReserveError(Exception): """reserve failed""" - - @staticmethod - def filter(session=None, **kwargs): - """Construct a query for Trials. - - @param kwargs: each (kw,arg) pair in kwargs, will restrict the list of - Trials to those such that the 'kw' attr has been associated with the job, - and it has value 'arg' - - @return SqlAlchemy query object - - @note: will raise TypeError if any arg in kwargs has a type other than - float, int or string. - - """ - if session is None: - session = Session() - q = session.query(Trial) - for col in Trial._native_cols: - if col in kwargs: - q = q.filter_by(**{col:kwargs[col]}) - del kwargs[col] - for kw, arg in kwargs.items(): - if isinstance(arg, float): - q = q.filter(Trial._attrs.any(name=kw, fval=arg)) - elif isinstance(arg, int): - q = q.filter(Trial._attrs.any(name=kw, ival=arg)) - elif isinstance(arg, str): - q = q.filter(Trial._attrs.any(name=kw, sval=arg)) - else: - raise TypeError(arg) - return q - - @staticmethod - def reserve_unique(query_fn, max_retry=10, max_sleep=5.0): - """Reserve an un-reserved job. - - @param query_fn: build the query for the trial to reserve (see - L{reserve_unique_kw} for example usage). - - @param max_retry: try this many times to reserve a job before raising an exception - - @param max_sleep: L{time.sleep} up to this many seconds between retry attempts - - @return a trial which was reserved (uniquely) by this function call. If - no matching jobs remain, return None. - - """ - s = SessionSerial() - retry = max_retry - trial = None - - while (trial is None) and retry: - - q = query_fn(s.query(Trial)) - q = q.options(eagerload('_attrs')) #TODO is this a good idea? - - trial = q.first() - if trial is None: - return None # no jobs match the query - else: - try: - trial.reserve(session=s) - except Trial.ReserveError, e: - s.rollback() - waittime = random.random() * max_sleep - if debug: print 'another process stole our trial. Waiting %f secs' % wait - time.sleep(waittime) - retry -= 1 - if trial: - s.expunge(trial) - s.close() - return trial - - @staticmethod - def reserve_unique_kw(max_retry=10, max_sleep=5.0, **kwargs): - """Call reserve_unique with a query function that matches jobs with - attributes (and values) given by kwargs. Results are sorted by - priority. - - """ - assert 'start' not in kwargs - assert 'query_fn' not in kwargs - def query_fn(q): - q = q.filter_by(start=None, - **kwargs).order_by(desc(Trial.c.priority)) - return q - - - return Trial.reserve_unique(query_fn, max_retry, max_sleep) - - def __init__(self, desc=None, priority=None, start=None, finish=None, host=None, **kwargs): - self.desc = desc - self.priority = priority - self.start = start - self.finish = finish - self.host = host - self.attrs.update(kwargs) - def __repr__(self): - return "" \ - %(str(self.id), self.desc, self.priority, self.host, - self.start, self.finish) - - def _get_attrs(self): - #This bit of code makes it so that you can type something like: - # - # trial.attrs.score = 50 - # - # It will use the self._attrs list of _KeyVal instances as a backend, - # because these pseudo-attributes (here 'score') are not stored in any - # object's __dict__. - class AttrCatcher(object): - #TODO: make these methods faster with better data structures - def __getattr__(attr_self, attr): - attrlist = self._attrs - for i,a in enumerate(attrlist): - if a.name == attr: - return a.val - raise AttributeError(attr) - def __setattr__(attr_self, attr, val): - n = 0 - s = Session() - assert attr not in Trial._forbidden_keynames - for i,a in enumerate(self._attrs): - if a.name == attr: - attrlist[i] = _KeyVal.cache(attr,val, s) - n += 1 - assert n <= 1 - if n == 0: - self._attrs.append(_KeyVal.cache(attr,val, s)) - s.commit() - def __iter__(_self): - def gen(): - return self._attrs.__iter__() - return gen() - def update(attr_self, dct): - for k,v in dct.items(): - setattr(attr_self, k, v) - - #we can't add attributes to self, so just do this... - return AttrCatcher() #seriously, allocate a new one each time - attrs = property(_get_attrs, doc = ("Provide attribute-like access to the" - " key-value pairs associated with this trial")) - def reserve(self, session): #session should have serialized isolation mode - """Reserve the job for the current process, to the exclusion of all - other processes. In other words, lock it.""" - - serial_self = session.query(Trial).get(self.id) - if serial_self.start is not None: - raise Trial.ReserveError(self.host) - serial_self.start = datetime.datetime.now() - serial_self.host = 'asdf' #TODO: get hostname - try: - session.commit() - except Exception, e: - # different db backends raise different exceptions when a - # commit fails, so here we just treat all problems the same - #s.rollback() # doc says rollback or close after commit exception - session.close() - raise Trial.ReserveError(self.host) - - #Session().refresh(self) #load changes to serial_self into self - - def abort(self): - """Reset job to be available for reservation. - - @return None - - @note: Raises exception if job is finished - - """ - if self.finish is not None: - raise Exception('wtf?') - self.start = None - self.host = None - s = Session() - s.save_or_update(self) - s.commit() - - def complete(self, **kwargs): - """Mark job self as finished and update attrs with kwargs.""" - self.attrs.update(kwargs) - self.finish = datetime.datetime.now() - s = Session() - s.save_or_update(self) - s.commit() - -mapper(Trial, t_trial, - properties = { - '_attrs': relation(_KeyVal, - secondary=t_trial_keyval, - cascade="delete-orphan") - }) - -metadata.create_all(engine) # does nothing when tables already exist - - - - -############ -# -# Test Stuff -# -# - -if __name__ == '__main__': - s = Session() - - def add_some_jobs(): - dvalid, dtest = 'dvalid', 'dtest file' - desc = 'debugging' - - def blah(): - for lr in [0.001, 0.01]: - for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: - for rng_seed in [4, 5, 6]: - for priority in [None, 1, 2]: - yield dict(locals()) - - for kwargs in blah(): - t = Trial(desc=desc, dvalid=dvalid, dtest=dtest, **kwargs) - s.save(t) - - def work(jobtype): - try: - jid = reserve(jobtype) - except StopIteration: - return - - def blah(*args): - print 'blah: ', args - dct = get_next() - - add_some_jobs() - - if 0: - for t in s.query(Trial): - print 'saved:', t, [a.name for a in list(t.attrs)] - - def yield_unique_jobs(): - while True: - rval = Trial.reserve_unique_kw() - if rval is None: - break - else: - yield rval - - if 0: - for job in yield_unique_jobs(): - print 'yielded job', job - job.complete(score=random.random()) - - if 0: - for t in s.query(Trial): - print 'final:', t, [a.name for a in list(t.attrs)] - - q = s.query(Trial) - #q = q.order_by(Trial.attrs) - q = q.filter(Trial._attrs.any(name='rng_seed', ival=5)) - #q = q.filter(Trial._attrs.all(name='lr').order_by(_KeyVal.c.fval)) - - print dir(q) - - - print q - for t in q.all(): - print 'final:', t, [a.name for a in list(t.attrs)] - t.homebrew = 6 - - - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/sql.py --- a/pylearn/dbdict/sql.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,311 +0,0 @@ - -import sys, os, copy, time - -import numpy.random - -import sqlalchemy -import sqlalchemy.pool -from sqlalchemy import create_engine, desc -from sqlalchemy.orm import eagerload -import psycopg2, psycopg2.extensions - -from api0 import db_from_engine, postgres_db, DbHandle - - -EXPERIMENT = 'dbdict.experiment' -#using the dictionary to store these is too slow -STATUS = 'dbdict.status' -HASH = 'dbdict.hash' -PRIORITY = 'dbdict.sql.priority' - -HOST = 'dbdict.sql.hostname' -HOST_WORKDIR = 'dbdict.sql.host_workdir' -PUSH_ERROR = 'dbdict.sql.push_error' - -START = 0 -RUNNING = 1 -DONE = 2 -FUCKED_UP = 666 - -_TEST_CONCURRENCY = False - -def postgres_serial(user, password, host, database, poolclass=sqlalchemy.pool.NullPool, **kwargs): - """Return a DbHandle instance that communicates with a postgres database at transaction - isolation_level 'SERIALIZABLE'. - - :param user: a username in the database - :param password: the password for the username - :param host: the network address of the host on which the postgres server is running - :param database: a database served by the postgres server - - """ - this = postgres_serial - - if not hasattr(this,'engine'): - def connect(): - c = psycopg2.connect(user=user, password=password, database=database, host=host) - c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) - return c - this.engine = create_engine('postgres://' - ,creator=connect - ,poolclass=poolclass - ) - - db = db_from_engine(this.engine, **kwargs) - db._is_serialized_session_db = True - return db - -def book_dct_postgres_serial(db, retry_max_sleep=10.0, verbose=0): - """Find a trial in the lisa_db with status START. - - A trial will be returned with status=RUNNING. - - Returns None if no such trial exists in DB. - - This function uses a serial access to the lisadb to guarantee that no other - process will retrieve the same dct. It is designed to facilitate writing - a "consumer" for a Producer-Consumer pattern based on the database. - - """ - print >> sys.stderr, """#TODO: use the priority field, not the status.""" - print >> sys.stderr, """#TODO: ignore entries with key PUSH_ERROR.""" - - s = db.session() #open a new session - - # NB. we need the query and attribute update to be in the same transaction - assert s.autocommit == False - - dcts_seen = set([]) - keep_trying = True - - dct = None - while (dct is None) and keep_trying: - #build a query - q = s.query(db._Dict) - - #N.B. - # use dedicated column to retrieve jobs, not the dictionary keyval pair - # This should be much faster. - q = q.filter(db._Dict.status==START) - q = q.order_by(db._Dict.priority.desc()) - - # this doesn't seem to work, hene the string hack below - q = q.options(eagerload('_attrs')) #hard-coded in api0 - - #try to reserve a dct - try: - #first() may raise psycopg2.ProgrammingError - dct = q.first() - - if dct is not None: - assert (dct not in dcts_seen) - if verbose: print 'book_unstarted_dct retrieved, ', dct - dct._set_in_session(STATUS, RUNNING, s) - if 1: - if _TEST_CONCURRENCY: - print >> sys.stderr, 'SLEEPING BEFORE BOOKING' - time.sleep(10) - - #commit() may raise psycopg2.ProgrammingError - s.commit() - else: - print >> sys.stderr, 'DEBUG MODE: NOT RESERVING JOB!', dct - #if we get this far, the job is ours! - else: - # no jobs are left - keep_trying = False - except (psycopg2.OperationalError, - sqlalchemy.exceptions.ProgrammingError, - sqlalchemy.exc.DBAPIError), e: - #either the first() or the commit() raised - s.rollback() # docs say to do this (or close) after commit raises exception - if verbose: print 'caught exception', e - if dct: - # first() succeeded, commit() failed - dcts_seen.add(dct) - dct = None - wait = numpy.random.rand(1)*retry_max_sleep - if verbose: print 'another process stole our dct. Waiting %f secs' % wait - time.sleep(wait) - - if dct: - str(dct) # for loading of attrs in UGLY WAY!!! - s.close() - return dct - -def book_dct_non_postgres(db): - print >> sys.stderr, """#TODO: use the priority field, not the status.""" - print >> sys.stderr, """#TODO: ignore entries with key self.push_error.""" - - raise NotImplementedError() - - -########### -# Connect -########### - -def parse_dbstring(dbstring): - postgres = 'postgres://' - if not dbstring.startswith(postgres): - raise ValueError('For now, dbdict dbstrings must start with postgres://', dbstring) - dbstring = dbstring[len(postgres):] - - #username_and_password - colon_pos = dbstring.find('@') - username_and_password = dbstring[:colon_pos] - dbstring = dbstring[colon_pos+1:] - - colon_pos = username_and_password.find(':') - if -1 == colon_pos: - username = username_and_password - password = None - else: - username = username_and_password[:colon_pos] - password = username_and_password[colon_pos+1:] - - #hostname - colon_pos = dbstring.find('/') - hostname = dbstring[:colon_pos] - dbstring = dbstring[colon_pos+1:] - - #dbname - colon_pos = dbstring.find('/') - dbname = dbstring[:colon_pos] - dbstring = dbstring[colon_pos+1:] - - #tablename - tablename = dbstring - - if password is None: - password = get_password(hostname, dbname) - - if False: - print 'USERNAME', username - print 'PASS', password - print 'HOST', hostname - print 'DB', dbname - print 'TABLE', tablename - - return username, password, hostname, dbname, tablename - -def get_password(hostname, dbname): - """Return the current user's password for a given database - - :TODO: Replace this mechanism with a section in the pylearn configuration file - """ - password_path = os.getenv('HOME')+'/.dbdict_%s'%dbname - try: - password = open(password_path).readline()[:-1] #cut the trailing newline - except: - raise ValueError( 'Failed to read password for db "%s" from %s' % (dbname, password_path)) - return password - -def db(dbstring): - username, password, hostname, dbname, tablename = parse_dbstring(dbstring) - try: - return postgres_db(username, password, hostname, dbname, table_prefix=tablename) - except: - print 'Error connecting with password', password - raise - - -########### -# Queue -########### - -def insert_dict(jobdict, db, force_dup=False, session=None, priority=1.0): - """Insert a new `job` dictionary into database `db`. - - :param force_dup: forces insertion even if an identical dictionary is already in the db - - """ - # compute hash for the job, will be used to avoid duplicates - job = copy.copy(jobdict) - jobhash = hash(`job`) - - if session is None: - s = db.session() - else: - s = session - - do_insert = force_dup or (None is s.query(db._Dict).filter(db._Dict.hash==jobhash).filter(db._Dict.status!=FUCKED_UP).first()) - - rval = None - if do_insert: - if STATUS not in job: - job[STATUS] = START - if HASH not in job: - job[HASH] = jobhash - if PRIORITY not in job: - job[PRIORITY] = priority - rval = db.insert(job, session=s) - s.commit() - - if session is None: - s.close() - return rval - - -def insert_job(experiment_fn, state, db, force_dup=False, session=None, priority=1.0): - state = copy.copy(state) - experiment_name = experiment_fn.__module__ + '.' + experiment_fn.__name__ - if EXPERIMENT in state: - if state[EXPERIMENT] != experiment_name: - raise Exception('Inconsistency: state element %s does not match experiment %s' %(EXPERIMENT, experiment_name)) - else: - state[EXPERIMENT] = experiment_name - return insert_dict(state, db, force_dup=force_dup, session=session, priority=priority) - - -# TODO: FIXME: WARNING -# Should use insert_dict instead of db.insert. Need one entry point for adding jobs to -# database, so that hashing can be done consistently -def add_experiments_to_db(jobs, db, verbose=0, add_dups=False, type_check=None, session=None): - """Add experiments paramatrized by jobs[i] to database db. - - Default behaviour is to ignore jobs which are already in the database. - - If type_check is a class (instead of None) then it will be used as a type declaration for - all the elements in each job dictionary. For each key,value pair in the dictionary, there - must exist an attribute,value pair in the class meeting the following criteria: - the attribute and the key are equal, and the types of the values are equal. - - :param jobs: The parameters of experiments to run. - :type jobs: an iterable object over dictionaries - :param verbose: print which jobs are added and which are skipped - :param add_dups: False will ignore a job if it matches (on all items()) with a db entry. - :type add_dups: Bool - - :returns: list of (Bool,job[i]) in which the flags mean the corresponding job actually was - inserted. - - """ - rval = [] - for job in jobs: - job = copy.copy(job) - if session is None: - s = db.session() - do_insert = force_dup or (None is db.query(s).filter_eq_dct(job).first()) - s.close() - else: - do_insert = force_dup or (None is db.query(session).filter_eq_dct(job).first()) - - if do_insert: - if type_check: - for k,v in job.items(): - if type(v) != getattr(type_check, k): - raise TypeError('Experiment contains value with wrong type', - ((k,v), getattr(type_check, k))) - - job[STATUS] = START - job[PRIORITY] = 1.0 - if verbose: - print 'ADDING ', job - db.insert(job) - rval.append((True, job)) - else: - if verbose: - print 'SKIPPING', job - rval.append((False, job)) - - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/sql_commands.py --- a/pylearn/dbdict/sql_commands.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,28 +0,0 @@ - -def crazy_sql_command(viewname, cols, dicttab, keytab, id_col='id', dict_id='dict_id'): - - #create or replace view expview as select * from (select id as v1_id, value as nhid from - #test0 where name='nhid') nhid LEFT OUTER JOIN (select id as v2_id, value as lrate from - #test0 where name='lrate') lrate on nhid.v1_id = lrate.v2_id; - - col_queries = [] - colname0 = None - for i, (colname, table_col) in enumerate(cols): - safe_col = colname.replace('_','') # get rid of underscores - safe_col = safe_col.replace('.','_') # replace dots with underscores - - cols[i][0] = safe_col - - q = """LEFT OUTER JOIN - (select %(dict_id)s, %(table_col)s as %(safe_col)s from \"%(keytab)s\" - where name='%(colname)s') %(safe_col)s - on %(safe_col)s.dict_id = %(dicttab)s.%(id_col)s""" % locals() - - col_queries.append(q) - - header = "create or replace view %s as select %s.%s, %s from %s " \ - % (viewname, dicttab, id_col, (", ".join([c[0] for c in cols])), dicttab) - - rval = header + "\n".join(col_queries) - - return rval diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/test_api0.py --- a/pylearn/dbdict/test_api0.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,351 +0,0 @@ -from api0 import * -import threading, time, commands, os, sys, math, random, datetime - -import psycopg2, psycopg2.extensions -from sqlalchemy import create_engine, desc -from sqlalchemy.orm import sessionmaker -from sqlalchemy import Table, Column, MetaData, ForeignKey -from sqlalchemy import Integer, String, Float, DateTime, Text, Binary -from sqlalchemy.orm import mapper, relation, backref, eagerload -from sqlalchemy.sql import operators, select - -import unittest - -class T(unittest.TestCase): - - def test_bad_dict_table(self): - """Make sure our crude version of schema checking kinda works""" - engine = create_engine('sqlite:///:memory:', echo=False) - Session = sessionmaker(bind=engine, autoflush=True, transactional=True) - - table_prefix='bergstrj_scratch_test_' - metadata = MetaData() - t_trial = Table(table_prefix+'trial', metadata, - Column('id', Integer, primary_key=True), - Column('desc', String(256)), #comment: why running this trial? - Column('priority', Float(53)), #aka Double - Column('start', DateTime), - Column('finish', DateTime), - Column('host', String(256))) - metadata.create_all(engine) - - try: - h = DbHandle(None, t_trial, None, None) - except ValueError, e: - if e[0] == DbHandle.e_bad_table: - return - self.fail() - - - def go(self): - """Create tables and session_maker""" - engine = create_engine('sqlite:///:memory:', echo=False) - Session = sessionmaker(autoflush=True, transactional=True) - - table_prefix='bergstrj_scratch_test_' - metadata = MetaData() - - t_trial = Table(table_prefix+'trial', metadata, - Column('id', Integer, primary_key=True), - Column('create', DateTime), - Column('write', DateTime), - Column('read', DateTime)) - - t_keyval = Table(table_prefix+'keyval', metadata, - Column('id', Integer, primary_key=True), - Column('name', String(32), nullable=False), #name of attribute - Column('ntype', Integer), - Column('fval', Float(53)), #aka Double - Column('sval', Text), #aka Double - Column('bval', Binary)) #TODO: store text (strings of unbounded length) - - t_trial_keyval = Table(table_prefix+'trial_keyval', metadata, - Column('dict_id', Integer, ForeignKey('%s.id' % t_trial), - primary_key=True), - Column('pair_id', Integer, ForeignKey('%s.id' % t_keyval), - primary_key=True)) - - metadata.bind = engine - metadata.create_all() # does nothing when tables already exist - - self.engine = engine - return Session, t_trial, t_keyval, t_trial_keyval - - - def test_insert_save(self): - - Session, t_dict, t_pair, t_link = self.go() - - db = DbHandle(*self.go()) - - def jobs(): - dvalid, dtest = 'dvalid', 'dtest file' - desc = 'debugging' - for lr in [0.001]: - for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: - for rng_seed in [4, 5, 6]: - for priority in [None, 1]: - yield dict(locals()) - - jlist = list(jobs()) - assert len(jlist) == 1*4*3*2 - for i, dct in enumerate(jobs()): - t = db.insert(**dct) - - #make sure that they really got inserted into the db - orig_keycount = db._session.query(db._KeyVal).count() - self.failUnless(orig_keycount > 0, orig_keycount) - - orig_dctcount = Session().query(db._Dict).count() - self.failUnless(orig_dctcount ==len(jlist), orig_dctcount) - - orig_keycount = Session().query(db._KeyVal).count() - self.failUnless(orig_keycount > 0, orig_keycount) - - #queries - q0list = list(db.query().all()) - q1list = list(db.query()) - q2list = list(db) - - self.failUnless(q0list == q1list, (q0list,q1list)) - self.failUnless(q0list == q2list, (q0list,q1list)) - - self.failUnless(len(q0list) == len(jlist)) - - for i, (j, q) in enumerate(zip(jlist, q0list)): - jitems = list(j.items()) - qitems = list(q.items()) - jitems.sort() - qitems.sort() - if jitems != qitems: - print i - print jitems - print qitems - self.failUnless(jitems == qitems, (jitems, qitems)) - - def test_query_0(self): - Session, t_dict, t_pair, t_link = self.go() - - db = DbHandle(*self.go()) - - def jobs(): - dvalid, dtest = 'dvalid', 'dtest file' - desc = 'debugging' - for lr in [0.001]: - for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: - for rng_seed in [4, 5, 6]: - for priority in [None, 1]: - yield dict(locals()) - - jlist = list(jobs()) - assert len(jlist) == 1*4*3*2 - for i, dct in enumerate(jobs()): - t = db.insert(**dct) - - qlist = list(db.query(rng_seed=5)) - self.failUnless(len(qlist) == len(jlist)/3) - - jlist5 = [j for j in jlist if j['rng_seed'] == 5] - - for i, (j, q) in enumerate(zip(jlist5, qlist)): - jitems = list(j.items()) - qitems = list(q.items()) - jitems.sort() - qitems.sort() - if jitems != qitems: - print i - print jitems - print qitems - self.failUnless(jitems == qitems, (jitems, qitems)) - - def test_delete_keywise(self): - Session, t_dict, t_pair, t_link = self.go() - - db = DbHandle(*self.go()) - - def jobs(): - dvalid, dtest = 'dvalid', 'dtest file' - desc = 'debugging' - for lr in [0.001]: - for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: - for rng_seed in [4, 5, 6]: - for priority in [None, 1]: - yield dict(locals()) - - jlist = list(jobs()) - assert len(jlist) == 1*4*3*2 - for i, dct in enumerate(jobs()): - t = db.insert(**dct) - - orig_keycount = Session().query(db._KeyVal).count() - - del_count = Session().query(db._KeyVal).filter_by(name='rng_seed', - fval=5.0).count() - self.failUnless(del_count == 8, del_count) - - #delete all the rng_seed = 5 entries - qlist_before = list(db.query(rng_seed=5)) - for q in qlist_before: - del q['rng_seed'] - - #check that it's gone from our objects - for q in qlist_before: - self.failUnless('rng_seed' not in q) #via __contains__ - self.failUnless('rng_seed' not in q.keys()) #via keys() - exc=None - try: - r = q['rng_seed'] # via __getitem__ - print 'r,', r - except KeyError, e: - pass - - #check that it's gone from dictionaries in the database - qlist_after = list(db.query(rng_seed=5)) - self.failUnless(qlist_after == []) - - #check that exactly 8 keys were removed - new_keycount = Session().query(db._KeyVal).count() - self.failUnless(orig_keycount == new_keycount + 8, (orig_keycount, - new_keycount)) - - #check that no keys have rng_seed == 5 - gone_count = Session().query(db._KeyVal).filter_by(name='rng_seed', - fval=5.0).count() - self.failUnless(gone_count == 0, gone_count) - - - def test_delete_dictwise(self): - Session, t_dict, t_pair, t_link = self.go() - - db = DbHandle(*self.go()) - - def jobs(): - dvalid, dtest = 'dvalid', 'dtest file' - desc = 'debugging' - for lr in [0.001]: - for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: - for rng_seed in [4, 5, 6]: - for priority in [None, 1]: - yield dict(locals()) - - jlist = list(jobs()) - assert len(jlist) == 1*4*3*2 - for i, dct in enumerate(jobs()): - t = db.insert(**dct) - - orig_keycount = Session().query(db._KeyVal).count() - orig_dctcount = Session().query(db._Dict).count() - self.failUnless(orig_dctcount == len(jlist)) - - #delete all the rng_seed = 5 dictionaries - qlist_before = list(db.query(rng_seed=5)) - for q in qlist_before: - q.delete() - - #check that the right number has been removed - post_dctcount = Session().query(db._Dict).count() - self.failUnless(post_dctcount == len(jlist)-8) - - #check that the remaining ones are correct - for a, b, in zip( - [j for j in jlist if j['rng_seed'] != 5], - Session().query(db._Dict).all()): - self.failUnless(a == b) - - #check that the keys have all been removed - n_keys_per_dict = 8 - new_keycount = Session().query(db._KeyVal).count() - self.failUnless(orig_keycount - 8 * n_keys_per_dict == new_keycount, (orig_keycount, - new_keycount)) - - - def test_setitem_0(self): - Session, t_dict, t_pair, t_link = self.go() - - db = DbHandle(*self.go()) - - b0 = 6.0 - b1 = 9.0 - - job = dict(a=0, b=b0, c='hello') - - dbjob = db.insert(**job) - - dbjob['b'] = b1 - - #check that the change is in db - qjob = Session().query(db._Dict).filter(db._Dict._attrs.any(name='b', - fval=b1)).first() - self.failIf(qjob is dbjob) - self.failUnless(qjob == dbjob) - - #check that the b:b0 key is gone - count = Session().query(db._KeyVal).filter_by(name='b', fval=b0).count() - self.failUnless(count == 0, count) - - #check that the b:b1 key is there - count = Session().query(db._KeyVal).filter_by(name='b', fval=b1).count() - self.failUnless(count == 1, count) - - def test_setitem_1(self): - """replace with different sql type""" - Session, t_dict, t_pair, t_link = self.go() - - db = DbHandle(*self.go()) - - b0 = 6.0 - b1 = 'asdf' # a different dtype - - job = dict(a=0, b=b0, c='hello') - - dbjob = db.insert(**job) - - dbjob['b'] = b1 - - #check that the change is in db - qjob = Session().query(db._Dict).filter(db._Dict._attrs.any(name='b', - sval=b1)).first() - self.failIf(qjob is dbjob) - self.failUnless(qjob == dbjob) - - #check that the b:b0 key is gone - count = Session().query(db._KeyVal).filter_by(name='b', fval=b0).count() - self.failUnless(count == 0, count) - - #check that the b:b1 key is there - count = Session().query(db._KeyVal).filter_by(name='b', sval=b1, - fval=None).count() - self.failUnless(count == 1, count) - - def test_setitem_2(self): - """replace with different number type""" - Session, t_dict, t_pair, t_link = self.go() - - db = DbHandle(*self.go()) - - b0 = 6.0 - b1 = 7 - - job = dict(a=0, b=b0, c='hello') - - dbjob = db.insert(**job) - - dbjob['b'] = b1 - - #check that the change is in db - qjob = Session().query(db._Dict).filter(db._Dict._attrs.any(name='b', - fval=b1)).first() - self.failIf(qjob is dbjob) - self.failUnless(qjob == dbjob) - - #check that the b:b0 key is gone - count = Session().query(db._KeyVal).filter_by(name='b', fval=b0,ntype=1).count() - self.failUnless(count == 0, count) - - #check that the b:b1 key is there - count = Session().query(db._KeyVal).filter_by(name='b', fval=b1,ntype=0).count() - self.failUnless(count == 1, count) - - -if __name__ == '__main__': - unittest.main() diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/tests/test_experiment.py --- a/pylearn/dbdict/tests/test_experiment.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,66 +0,0 @@ -from pylearn.dbdict.experiment import * -from unittest import TestCase - -import StringIO - - -class T_subdict(TestCase): - - def test0(self): - a = {'aa':0, 'ab':1, 'bb':2} - s = subdict(a, 'a') # returns dict-like object with keyvals {'a':0, 'b':1} - s['a'] = 5 - s['c'] = 9 - - self.failUnless(s['c'] == 9) - self.failUnless(a['ac'] == 9) - - #check that the subview has the right stuff - sitems = s.items() - sitems.sort() - self.failUnless(sitems == [('a', 5), ('b', 1), ('c', 9)], str(sitems)) - self.failUnless(a['bb'] == 2) - - #add to the subview via the parent - a['az'] = -1 - self.failUnless(s['z'] == -1) - - def test1(self): - a = {'aa':0, 'ab':1, 'bb':2} - - s = subdict(a, 'a') - - r = {} - r.update(s) - - self.failUnless(len(r) == len(s)) - self.failUnless(r == s, (str(r), str(s))) - - -class T_call_with_kwargs_from_dict(TestCase): - - def test0(self): - - def f(a, c=5): - return a+c - - def g(a, **dct): - return a + dct['c'] - - - kwargs = dict(a=1, b=2, c=3) - - io = StringIO.StringIO() - - self.failUnless(call_with_kwargs_from_dict(f, kwargs, logfile=io) == 4) - self.failUnless(io.getvalue() == \ - "WARNING: DictProxyState.call_substate ignoring key-value pair: b 2\n") - self.failUnless(call_with_kwargs_from_dict(g, kwargs, logfile=io) == 4) - self.failUnless(io.getvalue() == \ - "WARNING: DictProxyState.call_substate ignoring key-value pair: b 2\n") - del kwargs['c'] - self.failUnless(call_with_kwargs_from_dict(f, kwargs, logfile=io) == 6) - self.failUnless(io.getvalue() == \ - ("WARNING: DictProxyState.call_substate ignoring key-value pair: b 2\n" - "WARNING: DictProxyState.call_substate ignoring key-value pair: b 2\n")) - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/dbdict/tools.py --- a/pylearn/dbdict/tools.py Mon Feb 02 17:41:54 2009 -0500 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,143 +0,0 @@ -"""Helper code for dbdict job drivers.""" - -import sys, inspect - -from .experiment import COMPLETE, INCOMPLETE, subdict - -MODULE = 'dbdict_module' -SYMBOL = 'dbdict_symbol' -PREIMPORT = 'dbdict_preimport' - -def dummy_channel(*args, **kwargs): - return None - -# -#this proxy object lets experiments use a dict like a state object -# -def DictProxyState(dct): - """Convenient dict -> object interface for the state parameters of dbdict jobs. - - In the dbdict job running protocol, the user provides a job as a function with two - arguments: - - def myjob(state, channel): - a = getattr(state, 'a', blah) - b = state.blah - ... - - In the case that the caller of myjob has the attributes of this `state` in a dictionary, - then this `DictProxyState` function returns an appropriate object, whose attributes are - backed by this dictionary. - - """ - defaults_obj = [None] - class Proxy(object): - def substate(s, prefix=''): - return DictProxyState(subdict(dct, prefix)) - - def use_defaults(s, obj): - """Use `obj` to retrieve values when they are not in the `dict`. - - :param obj: a dictionary of default values. - """ - defaults_obj[0] = obj - - def __getitem__(s,a): - """Returns key `a` from the underlying dict, or from the defaults. - - Raises `KeyError` on failure. - """ - try: - return dct[a] - except Exception, e: - try: - return getattr(defaults_obj[0], a) - except: - raise e - - def __setitem__(s,a,v): - """Sets key `a` equal to `v` in the underlying dict. """ - dct[a] = v - - def __getattr__(s,a): - """Returns value of key `a` from the underlying dict first, then from the defaults. - - Raises `AttributeError` on failure. - """ - try: - return dct[a] - except KeyError: - return getattr(defaults_obj[0], a) - def __setattr__(s,a,v): - dct[a] = v - return Proxy() - -def load_state_fn(state): - - # - # load the experiment class - # - dbdict_module_name = state[MODULE] - dbdict_symbol = state[SYMBOL] - - preimport_list = state.get(PREIMPORT, "").split() - for preimport in preimport_list: - __import__(preimport, fromlist=[None], level=0) - - try: - dbdict_module = __import__(dbdict_module_name, fromlist=[None], level=0) - dbdict_fn = getattr(dbdict_module, dbdict_symbol) - except: - print >> sys.stderr, "FAILED to load job symbol:", dbdict_module_name, dbdict_symbol - print >> sys.stderr, "PATH", sys.path - raise - - return dbdict_fn - - -def run_state(state, channel = dummy_channel): - fn = load_state_fn(state) - rval = fn(state, channel) - if rval not in (COMPLETE, INCOMPLETE): - print >> sys.stderr, "WARNING: INVALID job function return value" - return rval - -def call_with_kwargs_from_dict(fn, dct, logfile='stderr'): - """Call function `fn` with kwargs taken from dct. - - When fn has a '**' parameter, this function is equivalent to fn(**dct). - - When fn has no '**' parameter, this function removes keys from dct which are not parameter - names of `fn`. The keys which are ignored in this way are logged to the `logfile`. If - logfile is the string 'stdout' or 'stderr', then errors are logged to sys.stdout or - sys.stderr respectively. - - :param fn: function to call - :param dct: dictionary from which to take arguments of fn - :param logfile: log ignored keys to this file - :type logfile: file-like object or string 'stdout' or string 'stderr' - - :returns: fn(**) - - """ - argspec = inspect.getargspec(fn) - argnames = argspec[0] - if argspec[2] == None: #if there is no room for a **args type-thing in fn... - kwargs = {} - for k,v in dct.items(): - if k in argnames: - kwargs[k] = v - else: - if not logfile: - pass - elif logfile == 'stderr': - print >> sys.stderr, "WARNING: DictProxyState.call_substate ignoring key-value pair:", k, v - elif logfile == 'stdout': - print >> sys.stdout, "WARNING: DictProxyState.call_substate ignoring key-value pair:", k, v - else: - print >> logfile, "WARNING: DictProxyState.call_substate ignoring key-value pair:", k, v - return fn(**kwargs) - else: - #there is a **args type thing in fn. Here we pass everything. - return fn(**s.subdict(prefix)) - diff -r d8af9c8a4bbb -r fa6c399dc292 pylearn/external/wrap_libsvm.py --- a/pylearn/external/wrap_libsvm.py Mon Feb 02 17:41:54 2009 -0500 +++ b/pylearn/external/wrap_libsvm.py Tue Feb 03 16:47:40 2009 -0500 @@ -61,6 +61,7 @@ #libsvm needs stuff in int32 on a 32bit machine #TODO: test this on a 64bit machine + # -> Both int32 and int64 (default) seem to be OK train_y = numpy.asarray(dataset.train.y, dtype='int32') valid_y = numpy.asarray(dataset.valid.y, dtype='int32') test_y = numpy.asarray(dataset.test.y, dtype='int32')