Mercurial > pylearn
changeset 538:798607a058bd
added missing files
author | James Bergstra <bergstrj@iro.umontreal.ca> |
---|---|
date | Wed, 12 Nov 2008 22:00:20 -0500 |
parents | b054271b2504 |
children | e3f84d260023 |
files | LICENSE README.txt bin/dbdict-query bin/dbdict-run bin/dbdict-run-job pylearn/algorithms/minimizer.py pylearn/datasets/embeddings/percent.py pylearn/dbdict/__init__.py pylearn/dbdict/api0.py pylearn/dbdict/crap.py pylearn/dbdict/dbdict_run.py pylearn/dbdict/dbdict_run_sql.py pylearn/dbdict/dconfig.py pylearn/dbdict/design.txt pylearn/dbdict/experiment.py pylearn/dbdict/sample_create_jobs.py pylearn/dbdict/scratch.py pylearn/dbdict/sql_commands.py pylearn/dbdict/test_api0.py pylearn/dbdict/tools.py pylearn/external/__init__.py pylearn/io/__init__.py setup.py |
diffstat | 20 files changed, 2975 insertions(+), 0 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/LICENSE Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,24 @@ +Copyright (c) 2008, Theano Development Team +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. + * Neither the name of Theano nor the names of its contributors may be + used to endorse or promote products derived from this software without + specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY +DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bin/dbdict-query Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,3 @@ +#!/usr/bin/env python +from dbdict.tools import standalone_query +standalone_query()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bin/dbdict-run Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,3 @@ +#!/usr/bin/env python +from pylearn.dbdict.dbdict_run import RunExperiment +RunExperiment()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/bin/dbdict-run-job Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,3 @@ +#!/usr/bin/env python +from dbdict.tools import standalone_run_job +standalone_run_job()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/algorithms/minimizer.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,43 @@ +"""Define the interface and factory for gradient-based minimizers. +""" +from theano.compile import module + +_minimizers = {} + +class DummyMinimizer(module.FancyModule): + """ The idea of a minimizer is that it provides an `step` function that will + eventually converge toward (maybe realize?) the minimum of a cost function. + + The step_cost function takes a step and returns the cost associated with either + the current or previous parameter values (return whichever is easiest to compute, it's + meant for user feedback.) + + """ + def __init__(self, args, cost, parameters, gradients=None): + super(DummyMinimizer, self).__init__() + #gradients = T.grad(cost, parameters) if gradients is None else gradients + #self.step = module.Method(args, None) + #self.step_cost = module.Method(args, cost) + def _instance_step(self, obj, *args): + pass + def _instance_step_cost(self, obj, *args): + pass + +def minimizer_factory(algo): + def decorator(fn): + if algo in _minimizers: + raise Exception('algo in use', algo) + else: + _minimizers[algo] = fn + return fn + return decorator + +@minimizer_factory('dummy') +def dummy_minimizer(): + def m(args, cost, parameters, gradients=None): + return DummyMinimizer(args, cost, parameters, gradients) + return m + +def make_minimizer(algo, **kwargs): + return _minimizers[algo](**kwargs) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/datasets/embeddings/percent.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,9 @@ +def percent(a, b): + """ + Return percentage string of a and b, e.g.: + "1 of 10 (10%)" + """ + assert a <= b + assert a >= 0 + assert b > 0 + return "%s of %s (%.2f%%)" % (a, b, 100.*a/b)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/__init__.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,1 @@ +from api0 import sqlite_file_db, sqlite_memory_db
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/api0.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,511 @@ +from sqlalchemy import create_engine, desc +from sqlalchemy.orm import sessionmaker +from sqlalchemy import Table, Column, MetaData, ForeignKey +from sqlalchemy import Integer, String, Float, Boolean, DateTime, Text, Binary +from sqlalchemy.orm import mapper, relation, backref, eagerload +from sqlalchemy.sql import operators, select +from sql_commands import crazy_sql_command + +class Todo(Exception): """Replace this with some working code!""" + +class DbHandle (object): + """ + This class also provides filtering shortcuts that hide the names of the + DbHandle internal databases. + + Attributes: + dict_table + pair_table + link_table + + + + dict_table + + An SqlAlchemy-mapped class corresponding to database table with the + following schema: + + Column('id', Integer, primary_key=True) + Column('create', DateTime) + Column('write', DateTime) + Column('read', DateTime) + + #TODO: reconsider create/read/write + + pair_table + + An SqlAlchemy-mapped class corresponding to database table with the + following schema: + + Column('id', Integer, primary_key=True) + Column('name', String(128)) + Column('ntype', Boolean) + Column('fval', Double) + Column('sval', Text)) + Column('bval', Blob) + + #TODO: Consider difference between text and binary + #TODO: Consider adding a 'type' column + #TODO: Consider union? + #TODO: Are there stanard ways of doing this kind of thing? + + link_table + + An SqlAlchemy-mapped class corresponding to database table with the + following schema: + + Column('dict_id', Integer, ForeignKey('%s.id' % t_trial), primary_key=True), + Column('keyval_id', Integer, ForeignKey('%s.id' % t_keyval), primary_key=True)) + + """ + + e_bad_table = 'incompatible columns in table' + + def __init__(h_self, Session, engine, dict_table, pair_table, link_table): + h_self._engine = engine; + h_self._dict_table = dict_table + h_self._pair_table = pair_table + h_self._link_table = link_table + + #TODO: replace this crude algorithm (ticket #17) + if ['id', 'create', 'write', 'read'] != [c.name for c in dict_table.c]: + raise ValueError(h_self.e_bad_table, dict_table) + if ['id', 'name', 'ntype', 'fval', 'sval', 'bval'] != [c.name for c in pair_table.c]: + raise ValueError(h_self.e_bad_table, pair_table) + if ['dict_id', 'pair_id'] != [c.name for c in link_table.c]: + raise ValueError(h_self.e_bad_table, pair_table) + + h_self._session_fn = Session + h_self._session = Session() + + class KeyVal (object): + """KeyVal interfaces between python types and the database. + + It encapsulates heuristics for type conversion. + """ + def __init__(k_self, name, val): + k_self.name = name + k_self.val = val + def __repr__(k_self): + return "<Param(%s,'%s', %s)>" % (k_self.id, k_self.name, repr(k_self.val)) + def __get_val(k_self): + val = None + if k_self.fval is not None: val = [int, float][k_self.ntype](k_self.fval) + if k_self.bval is not None: val = eval(str(k_self.bval)) + if k_self.sval is not None: val = k_self.sval + return val + def __set_val(k_self, val): + if isinstance(val, (str,unicode)): + k_self.fval = None + k_self.bval = None + k_self.sval = val + else: + k_self.sval = None + try: + f = float(val) + except (TypeError, ValueError): + f = None + if f is None: #binary data + k_self.bval = repr(val) + assert eval(k_self.bval) == val + k_self.fval = None + k_self.ntype = None + else: + k_self.bval = None + k_self.fval = f + k_self.ntype = isinstance(val,float) + val = property(__get_val, __set_val) + + mapper(KeyVal, pair_table) + + class Dict (object): + """ + Instances are dict-like objects with additional features for + communicating with an active database. + + This class will be mapped by SqlAlchemy to the dict_table. + + Attributes: + handle - reference to L{DbHandle} (creator) + + """ + def __init__(d_self): + s = h_self._session + s.save(d_self) + s.commit() + + _forbidden_keys = set(['session']) + + # + # dictionary interface + # + + def __contains__(d_self, key): + for a in d_self._attrs: + if a.name == key: + return True + return False + + def __eq__(self, other): + return dict(self) == dict(other) + def __neq__(self, other): + return dict(self) != dict(other) + + def __getitem__(d_self, key): + for a in d_self._attrs: + if a.name == key: + return a.val + raise KeyError(key) + + def __setitem__(d_self, key, val): + s = h_self._session + d_self._set_in_session(key, val, s) + s.update(d_self) #session update, not dict-like update + s.commit() + + def __delitem__(d_self, key): + s = h_self._session + #find the item to delete in d_self._attrs + to_del = None + for i,a in enumerate(d_self._attrs): + if a.name == key: + assert to_del is None + to_del = (i,a) + if to_del is None: + raise KeyError(key) + else: + i, a = to_del + s.delete(a) + del d_self._attrs[i] + s.commit() + s.update(d_self) + + def items(d_self): + return [(kv.name, kv.val) for kv in d_self._attrs] + + def keys(d_self): + return [kv.name for kv in d_self._attrs] + + def values(d_self): + return [kv.val for kv in d_self._attrs] + + def update(d_self, dct, **kwargs): + """Like dict.update(), set keys from kwargs""" + s = h_self._session + for k, v in dct.items(): + d_self._set_in_session(k, v, s) + for k, v in kwargs.items(): + d_self._set_in_session(k, v, s) + s.update(d_self) + s.commit() + + def get(d_self, key, default): + try: + return d_self[key] + except KeyError: + return default + + # + # database stuff + # + + def refresh(d_self, session=None): + """Sync key-value pairs from database to self + + @param session: use the given session, and do not commit. + + """ + if session is None: + session = h_self._session + session.refresh(d_self) + session.commit() + else: + session.refresh(self.dbrow) + + def delete(d_self, session=None): + """Delete this dictionary from the database + + @param session: use the given session, and do not commit. + """ + if session is None: + session = h_self._session + session.delete(d_self) + session.commit() + else: + session.delete(d_self) + + # helper routine by update() and __setitem__ + def _set_in_session(d_self, key, val, session): + """Modify an existing key or create a key to hold val""" + if key in d_self._forbidden_keys: + raise KeyError(key) + created = None + for i,a in enumerate(d_self._attrs): + if a.name == key: + assert created == None + created = h_self._KeyVal(key, val) + d_self._attrs[i] = created + if not created: + created = h_self._KeyVal(key, val) + d_self._attrs.append(created) + session.save(created) + + mapper(Dict, dict_table, + properties = { + '_attrs': relation(KeyVal, + secondary=link_table, + cascade="all, delete-orphan") + }) + + class _Query (object): + """ + Attributes: + _query - SqlAlchemy.Query object + """ + + def __init__(q_self, query): + q_self._query = query + + def __iter__(q_self): + return q_self.all().__iter__() + + def __getitem__(q_self, item): + return q_self._query.__getitem__(item) + + def filter_by(q_self, **kwargs): + """Return a Query object that restricts to dictionaries containing + the given kwargs""" + + #Note: when we add new types to the key columns, add them here + q = q_self._query + T = h_self._Dict + for kw, arg in kwargs.items(): + if isinstance(arg, (str,unicode)): + q = q.filter(T._attrs.any(name=kw, sval=arg)) + else: + try: + f = float(arg) + except (TypeError, ValueError): + f = None + if f is None: + q = q.filter(T._attrs.any(name=kw, bval=repr(arg))) + else: + q = q.filter(T._attrs.any(name=kw, fval=f)) + + return h_self._Query(q) + + def all(q_self): + """Return an iterator over all matching dictionaries. + + See L{SqlAlchemy.Query} + """ + return q_self._query.all() + + def count(q_self): + """Return the number of matching dictionaries. + + See L{SqlAlchemy.Query} + """ + return q_self._query.count() + + def first(q_self): + """Return some matching dictionary, or None + See L{SqlAlchemy.Query} + """ + return q_self._query.first() + + def all_ordered_by(q_self, key, desc=False): + """Return query results, sorted. + + @type key: string or tuple of string or list of string + @param: keys by which to sort the results. + + @rtype: list of L{DbHandle._Dict} instances + @return: query results, sorted by given keys + """ + + # order_by is not easy to do in SQL based on the data structures we're + # using. Considering we support different data types, it may not be + # possible at all. + # + # It would be easy if 'pivot' or 'crosstab' were provided as part of the + # underlying API, but they are not. For example, read this: + # http://www.simple-talk.com/sql/t-sql-programming/creating-cross-tab-queries-and-pivot-tables-in-sql/ + + # load query results + results = list(q_self.all()) + + if isinstance(key, (tuple, list)): + val_results = [([d[k] for k in key], d) for d in results] + else: + val_results = [(d[key], d) for d in results] + + val_results.sort() #interesting: there is an optional key parameter + if desc: + val_results.reverse() + return [vr[-1] for vr in val_results] + + h_self._KeyVal = KeyVal + h_self._Dict = Dict + h_self._Query = _Query + + def __iter__(h_self): + return h_self.query().__iter__() + + def insert_kwargs(h_self, **dct): + """ + @rtype: DbHandle with reference to self + @return: a DbHandle initialized as a copy of dct + + @type dct: dict-like instance whose keys are strings, and values are + either strings, integers, floats + + @param dct: dictionary to insert + + """ + rval = h_self._Dict() + if dct: rval.update(dct) + return rval + def insert(h_self, dct): + """ + @rtype: DbHandle with reference to self + @return: a DbHandle initialized as a copy of dct + + @type dct: dict-like instance whose keys are strings, and values are + either strings, integers, floats + + @param dct: dictionary to insert + + """ + rval = h_self._Dict() + if dct: rval.update(dct) + return rval + + def query(h_self, **kwargs): + """Construct an SqlAlchemy query, which can be subsequently filtered + using the instance methods of DbQuery""" + + return h_self._Query(h_self._session.query(h_self._Dict)\ + .options(eagerload('_attrs')))\ + .filter_by(**kwargs) + + def createView(h_self, view): + + s = h_self._session; + cols = []; + + for col in view.columns: + if col.name is "id": + continue; + elif isinstance(col.type, (Integer,Float)): + cols.append([col.name,'fval']); + elif isinstance(col.type,String): + cols.append([col.name,'sval']); + elif isinstance(col.type,Binary): + cols.append([col.name,'bval']); + else: + assert "Error: wrong column type in view",view.name; + + # generate raw sql command string + viewsql = crazy_sql_command(view.name, cols, \ + h_self._pair_table.name, \ + h_self._link_table.name); + + #print 'Creating sql view with command:\n', viewsql; + + h_self._engine.execute(viewsql); + s.commit(); + + class MappedClass(object): + pass + + mapper(MappedClass, view) + + return MappedClass + + def session(h_self): + return h_self._session_fn() + + +def db_from_engine(engine, + table_prefix='DbHandle_default_', + trial_suffix='trial', + keyval_suffix='keyval', + link_suffix='link'): + """Create a DbHandle instance + + @type engine: sqlalchemy engine (e.g. from create_engine) + @param engine: connect to this database for transactions + + @type table_prefix: string + @type trial_suffix: string + @type keyval_suffix: string + @type link_suffix: string + + @rtype: DbHandle instance + + @note: The returned DbHandle will use three tables to implement the + many-to-many pattern that it needs: + - I{table_prefix + trial_suffix}, + - I{table_prefix + keyval_suffix} + - I{table_prefix + link_suffix} + + """ + Session = sessionmaker(autoflush=True, autocommit=False) + + metadata = MetaData() + + t_trial = Table(table_prefix+trial_suffix, metadata, + Column('id', Integer, primary_key=True), + Column('create', DateTime), + Column('write', DateTime), + Column('read', DateTime)) + + t_keyval = Table(table_prefix+keyval_suffix, metadata, + Column('id', Integer, primary_key=True), + Column('name', String(128), nullable=False), #name of attribute + Column('ntype', Boolean), + Column('fval', Float(53)), + Column('sval', Text), + Column('bval', Binary)) + + t_link = Table(table_prefix+link_suffix, metadata, + Column('dict_id', Integer, ForeignKey('%s.id' % t_trial), + primary_key=True), + Column('pair_id', Integer, ForeignKey('%s.id' % t_keyval), + primary_key=True)) + + metadata.bind = engine + metadata.create_all() # no-op when tables already exist + #warning: tables can exist, but have incorrect schema + # see bug mentioned in DbHandle constructor + + return DbHandle(Session, engine, t_trial, t_keyval, t_link) + +def sqlite_memory_db(echo=False, **kwargs): + """Return a DbHandle backed by a memory-based database""" + engine = create_engine('sqlite:///:memory:', echo=False) + return db_from_engine(engine, **kwargs) + +def sqlite_file_db(filename, echo=False, **kwargs): + """Return a DbHandle backed by a file-based database""" + engine = create_engine('sqlite:///%s' % filename, echo=False) + return db_from_engine(engine, **kwargs) + +_db_host = 'jais.iro.umontreal.ca' +_pwd='potatomasher'; + +def postgres_db(user, password, host, database, echo=False, **kwargs): + """Create an engine to access a postgres_dbhandle + """ + db_str ='postgres://%(user)s:%(password)s@%(host)s/%(database)s' % locals() + + # should force the app release extra connections releasing + # connections should let us schedule more jobs, since each one + # operates autonomously most of the time, just checking the db + # rarely. TODO: optimize this for large numbers of jobs + pool_size = 0; + engine = create_engine(db_str, pool_size=pool_size, echo=echo) + + return db_from_engine(engine, **kwargs) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/crap.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,256 @@ + +from __future__ import absolute_import + +import os, sys, socket, datetime, copy, tempfile, shutil +from .dconfig import Config, load +from .api0 import sqlite_memory_db +from .api0 import postgres_serial + + +class Job(object): + @classmethod + def is_a_default_option(cls, key, val): + if callable(val): + return False + if key.startswith('_'): + return False + if key in ('job_module', 'job_symbol'): + return False + return True + + @classmethod + def options(cls): + opts = [attr for attr in dir(cls) + if cls.is_a_default_option(attr, getattr(cls, attr))] + opts.sort() + return opts + + + def __init__(self, config): + for key, val in config.items(): + if self.is_a_default_option(key, val): + if hasattr(self, key): + setattr(self, key, val) + else: + raise AttributeError("Job has no such option:", key) + + @classmethod + def print_defaults(cls): + for attr in cls.options(): + print attr, '=', getattr(cls, attr) + + def print_config(self, outfile=sys.stdout): + for attr in self.options(): + print >> outfile, attr, '=', getattr(self, attr) + + def save_config(self, outfile=sys.stdout): + """Like print_config, but with parsable format rather than human-readable format""" + kwargs = dict([(attr,getattr(self, attr)) for attr in self.options()]) + Config(**kwargs).save_file(outfile) + +def run_job(fn=None, cwd=None, + + config_path = 'job_config.py', + result_path = 'job_result.py', + stdout_path = 'stdout', + stderr_path = 'stderr', + work_dir = 'workdir' + ): + cwd = os.getcwd() if cwd is None else cwd + +def _pop_cwd(): + try: + cwd = sys.argv.pop(0) + if not cwd.startswith('/'): + cwd = os.path.join(os.getcwd(), cwd) + except IndexError: + cwd = os.getcwd() + return cwd + + +class RunJob(object): + """ + """ + + path_perf = 'job_perf.py' + path_results = 'job_results.py' + path_config = 'job_config.py' + path_fullconfig = 'job_fullconfig.py' + path_stdout = 'stdout' + path_stderr = 'stderr' + path_workdir = 'workdir' + + def __init__(self, exename): + pass + + def _load_config(self, cwd = None): + cwd = _pop_cwd() if cwd is None else cwd + config = load(os.path.join(cwd, self.path_config)) + return config + + def _load_job_class(self, config=None): + config = self._load_config() if config is None else config + job_module_name = config.job_module + job_symbol = config.job_symbol + #level=0 -> absolute imports + #fromlist=[None] -> evaluate the rightmost element of the module name, when it has + # dots (e.g., "A.Z" will return module Z) + job_module = __import__(job_module_name, fromlist=[None], level=0) + try: + job_class = getattr(job_module, job_symbol) + except: + print >> sys.stderr, "failed to load job class:", job_module_name, job_symbol + raise + return job_class + + def print_config(self): + config = self._load_config() + job_class = self._load_job_class(config) + job = job_class(config) + job.print_config() + + def run(self): + cwd = _pop_cwd() + config = self._load_config(cwd) + job_class = self._load_job_class(config) + job = job_class(config) + + job.save_config(open(os.path.join(cwd, self.path_fullconfig),'w')) + + perf = Config() + perf.host_name = socket.gethostname() + perf.start_time = str(datetime.datetime.now()) + perf.save(os.path.join(cwd, self.path_perf)) + + stdout_orig = sys.stdout + stderr_orig = sys.stderr + + try: + sys.stdout = open(os.path.join(cwd, self.path_stdout), 'w') + sys.stderr = open(os.path.join(cwd, self.path_stderr), 'w') + + # + # mess around with the working directory + # + + wd = os.path.join(cwd, self.path_workdir) + try: + os.mkdir(wd) + except OSError, e: + print >> sys.stderr, "trouble making wordking directory:" + print >> sys.stderr, e + print >> sys.stderr, "ignoring error and proceeding anyway" + try: + os.chdir(wd) + except: + pass + + print >> sys.stderr, "cwd:", os.getcwd() + + # + # run the job... + # + + #TODO load the results file before running the job, to resume job + results = Config() + + job_rval = job.run(results) + + #TODO: use the return value to decide whether the job should be resumed or not, + # False means run is done + # True means the job has yielded, but should be continued + results.save(os.path.join(cwd, self.path_results)) + + finally: + #put back stderr and stdout + sys.stdout = stdout_orig + sys.stderr = stderr_orig + perf.end_time = str(datetime.datetime.now()) + perf.save(os.path.join(cwd, self.path_perf)) + + def defaults(self): + job_class = self._load_job_class() + job_class.print_defaults() + +def standalone_run_job(): + exe = RunJob(sys.argv.pop(0)) + try: + cmd = sys.argv.pop(0) + fn = getattr(exe, cmd) + except IndexError: + fn = getattr(exe, 'run') + except AttributeError: + print >> sys.stderr, "command not supported", cmd + + fn() + +def build_db(cwd, db=None): + """WRITEME""" + db = sqlite_memory_db() if db is None else db + for e in os.listdir(cwd): + e = os.path.join(cwd, e) + try: + e_config = open(os.path.join(e, 'job_config.py')) + except: + e_config = None + + try: + e_sentinel = open(os.path.join(e, '__jobdir__')) + except: + e_sentinel = None + + if not (e_config or e_sentinel): + continue #this is not a job dir + + if e_config: + e_config.close() + config = load(os.path.join(e, 'job_config.py')) + kwargs = copy.copy(config.__dict__) + try: + results = load(os.path.join(e, 'job_results.py')) + kwargs.update(results.__dict__) + except: + pass + try: + perf = load(os.path.join(e, 'job_perf.py')) + kwargs.update(perf.__dict__) + except: + pass + + #TODO: this is a backward-compatibility hack for AISTATS*09 + if 'perf_jobdir' not in kwargs: + kwargs['perf_jobdir'] = e + if 'perf_workdir' not in kwargs: + kwargs['perf_workdir'] = os.path.join(e, 'workdir') + + entry = db.insert(kwargs) + + if e_sentinel: + print >> sys.stderr, "NOT-IMPLEMENTED: RECURSION INTO SUBDIRECTORY", e + return db + + +class RunQuery(object): + + def __init__(self, exename): + pass + + def run(self): + cwd = _pop_cwd() + db = build_db(cwd) + for entry in db.query().all(): + print entry.items() + + +def standalone_query(): + exe = RunQuery(sys.argv.pop(0)) + try: + cmd = sys.argv.pop(0) + fn = getattr(exe, cmd) + except IndexError: + fn = getattr(exe, 'run') + except AttributeError: + print >> sys.stderr, "command not supported", cmd + + fn() +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/dbdict_run.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,66 @@ +import sys +from .tools import DictProxyState, load_state_fn, run_state + +# N.B. +# don't put exotic imports here +# see what I did with def sql() below... since we shouldn't try to import that stuff unless we +# need it. + +_experiment_usage = """Usage: + dbdict-experiment <cmd> + +Commands: + + help Print help. (You're looking at it.) + + cmdline Obtain experiment configuration by evaluating the commandline + sql Obtain experiment configuration by querying an sql database + +Help on individual commands might be available by typing 'dbdict-experiment <cmd> help' + +""" + #dbdict Obtain experiment configuration by loading a dbdict file + +def _dispatch_cmd(self, stack=sys.argv): + try: + cmd = stack.pop(0) + except IndexError: + cmd = 'help' + try: + fn = getattr(self, cmd) + except AttributeError: + print >> sys.stderr, "command not supported", cmd + fn = self.help + fn() + +class RunCmdline(object): + def __init__(self): + try: + dct = eval('dict(' + sys.argv.pop(0) + ')') + except Exception, e: + print >> sys.stderr, "Exception:", e + self.help() + return + + run_state(DictProxyState(dct)) + print dct + + def help(self): + print >> sys.stderr, "Usage: dbdict-experiment cmdline <config>" + + +class RunExperiment(object): + """This class handles the behaviour of the dbdict-run script.""" + def __init__(self): + exe = sys.argv.pop(0) + #print 'ARGV', sys.argv + _dispatch_cmd(self) + + def sql(self): + from .dbdict_run_sql import run_sql + return run_sql() + + cmdline = RunCmdline + + def help(self): + print _experiment_usage # string not inlined so as not to fool VIM's folding mechanism
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/dbdict_run_sql.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,434 @@ +"""Code related to using the api0 dictionary table to run experiments. + +This module has been tested with postgres. + +""" + +import sys, os, socket, tempfile, shutil, copy, time +import numpy # for the random wait time in book_unstarted_trial + +import sqlalchemy +from sqlalchemy import create_engine, desc +from sqlalchemy.orm import eagerload +import psycopg2, psycopg2.extensions + +from .api0 import db_from_engine, postgres_db +from .tools import run_state, DictProxyState, COMPLETE, INCOMPLETE, SYMBOL, MODULE # tools +from .dconfig import save_items + +# _TEST CONCURRENCY +# To ensure that concurrency is handled properly by the consumer (book_dct) +# set this flag to True and (manually) run the following test. +# +# Launch two processes side by side, starting one of them a few seconds after the other. +# There is an extra sleep(10) that will delay each process's job dequeue. +# +# You should see that the process that started first gets the job, +# and the process that started second tries to get the same job, +# fails, and then gets another one instead. +_TEST_CONCURRENCY = False + +_help = """Usage: +dbdict-run sql postgres://<user>:<pass>@<host>/<db>/<api0-table> <experiment-root> + + user - postgres username + pass - password + hostname - the network address of the host on which a postgres server is + running (on port ??) + database - a database served by the postgres server on <hostname> + api0-table - the name (actually, table_prefix) associated with tables, + created by dbdict.api0. + + experiment-root - a local or network path. Network paths begin with ssh:// + E.g. /tmp/blah + ssh://mammouth:blah + ssh://foo@linux.org:/tmp/blah + +Experiment-root is used to store the file results of experiments. If a job with a given <id> +creates file 'a.txt', and directory 'b' with file 'foo.py', then these will be rsync'ed to the +experiment-root when job <id> has finished running. They will be found here: + + <experiment-root>/<db>/<api0-table>/<id>/workdir/a.txt + <experiment-root>/<db>/<api0-table>/<id>/workdir/b/foo.py + +Files 'stdout', 'stderr', and 'state.py' will be created. + + <experiment-root>/<db>/<api0-table>/<id>/stdout - opened for append + <experiment-root>/<db>/<api0-table>/<id>/stderr - opened for append + <experiment-root>/<db>/<api0-table>/<id>/state.py - overwritten with database version + +If a job is restarted or resumed, then those files are rsync'ed back to the current working +directory, and stdout and stderr are re-opened for appending. When a resumed job stops for +the second (or more) time, the cwd is rsync'ed back to the experiment-root. In this way, the +experiment-root accumulates the results of experiments that run. + +""" + +STATUS = 'dbdict_sql_status' +PRIORITY = 'dbdict_sql_priority' +HOST = 'dbdict_sql_hostname' +HOST_WORKDIR = 'dbdict_sql_host_workdir' +PUSH_ERROR = 'dbdict_sql_push_error' + +START = 0 +"""dbdict_status == START means a experiment is ready to run""" + +RUNNING = 1 +"""dbdict_status == RUNNING means a experiment is running on dbdict_hostname""" + +DONE = 2 +"""dbdict_status == DONE means a experiment has completed (not necessarily successfully)""" + +RESTART_PRIORITY = 2.0 +"""Stopped experiments are marked with this priority""" + + +def postgres_serial(user, password, host, database, **kwargs): + """Return a DbHandle instance that communicates with a postgres database at transaction + isolation_level 'SERIALIZABLE'. + + :param user: a username in the database + :param password: the password for the username + :param host: the network address of the host on which the postgres server is running + :param database: a database served by the postgres server + + """ + this = postgres_serial + + if not hasattr(this,'engine'): + def connect(): + c = psycopg2.connect(user=user, password=password, database=database, host=host) + c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + return c + pool_size = 0 + this.engine = create_engine('postgres://' + ,creator=connect + ,pool_size=0 # should force the app release connections + ) + + db = db_from_engine(this.engine, **kwargs) + db._is_serialized_session_db = True + return db + + +def add_experiments_to_db(exp_cls, jobs, db, verbose=0, add_dups=False, type_check=None): + """Add experiments paramatrized by exp_cls and jobs[i] to database db. + + Default behaviour is to ignore jobs which are already in the database. + + If type_check is a class (instead of None) then it will be used as a type declaration for + all the elements in each job dictionary. For each key,value pair in the dictionary, there + must exist an attribute,value pair in the class meeting the following criteria: + the attribute and the key are equal, and the types of the values are equal. + + :param exp_cls: The Experiment class to run these experiments. + :param jobs: The parameters of experiments to run. + :type jobs: an iterable object over dictionaries + :param verbose: print which jobs are added and which are skipped + :param add_dups: False will ignore a job if it matches (on all items()) with a db entry. + :type add_dups: Bool + + :returns: list of (Bool,job[i]) in which the flags mean the corresponding job actually was + inserted. + + """ + rval = [] + for job in jobs: + job = copy.copy(job) + do_insert = add_dups or (None is db.query(**job).first()) + + if do_insert: + if type_check: + for k,v in job.items(): + if type(v) != getattr(type_check, k): + raise TypeError('Experiment contains value with wrong type', + ((k,v), getattr(type_check, k))) + + job[STATUS] = START + job[SYMBOL] = exp_cls.__name__ + job[MODULE] = exp_cls.__module__ + job[PRIORITY] = 1.0 + if verbose: + print 'ADDING ', job + db.insert(job) + rval.append((True, job)) + else: + if verbose: + print 'SKIPPING', job + rval.append((False, job)) + + +def book_dct_postgres_serial(db, retry_max_sleep=10.0, verbose=0): + """Find a trial in the lisa_db with status START. + + A trial will be returned with dbdict_status=RUNNING. + + Returns None if no such trial exists in DB. + + This function uses a serial access to the lisadb to guarantee that no other + process will retrieve the same dct. It is designed to facilitate writing + a "consumer" for a Producer-Consumer pattern based on the database. + + """ + print >> sys.stderr, """#TODO: use the priority field, not the status.""" + print >> sys.stderr, """#TODO: ignore entries with key PUSH_ERROR.""" + + s = db._session + + # NB. we need the query and attribute update to be in the same transaction + assert s.autocommit == False + + dcts_seen = set([]) + keep_trying = True + + dct = None + while (dct is None) and keep_trying: + #build a query + q = s.query(db._Dict) + q = q.options(eagerload('_attrs')) #hard-coded in api0 + q = q.filter(db._Dict._attrs.any(name=STATUS, fval=START)) + + #try to reserve a dct + try: + #first() may raise psycopg2.ProgrammingError + dct = q.first() + + if dct is not None: + assert (dct not in dcts_seen) + if verbose: print 'book_unstarted_dct retrieved, ', dct + dct._set_in_session(STATUS, RUNNING, s) + if 1: + if _TEST_CONCURRENCY: + print >> sys.stderr, 'SLEEPING BEFORE BOOKING' + time.sleep(10) + + #commit() may raise psycopg2.ProgrammingError + s.commit() + else: + print >> sys.stderr, 'DEBUG MODE: NOT RESERVING JOB!', dct + #if we get this far, the job is ours! + else: + # no jobs are left + keep_trying = False + except (psycopg2.OperationalError, + sqlalchemy.exceptions.ProgrammingError), e: + #either the first() or the commit() raised + s.rollback() # docs say to do this (or close) after commit raises exception + if verbose: print 'caught exception', e + if dct: + # first() succeeded, commit() failed + dcts_seen.add(dct) + dct = None + wait = numpy.random.rand(1)*retry_max_sleep + if verbose: print 'another process stole our dct. Waiting %f secs' % wait + time.sleep(wait) + return dct + +def book_dct(db): + print >> sys.stderr, """#TODO: use the priority field, not the status.""" + print >> sys.stderr, """#TODO: ignore entries with key self.push_error.""" + + return db.query(dbdict_status=START).first() + +def parse_dbstring(dbstring): + postgres = 'postgres://' + assert dbstring.startswith(postgres) + dbstring = dbstring[len(postgres):] + + #username_and_password + colon_pos = dbstring.find('@') + username_and_password = dbstring[:colon_pos] + dbstring = dbstring[colon_pos+1:] + + colon_pos = username_and_password.find(':') + if -1 == colon_pos: + username = username_and_password + password = None + else: + username = username_and_password[:colon_pos] + password = username_and_password[colon_pos+1:] + + #hostname + colon_pos = dbstring.find('/') + hostname = dbstring[:colon_pos] + dbstring = dbstring[colon_pos+1:] + + #dbname + colon_pos = dbstring.find('/') + dbname = dbstring[:colon_pos] + dbstring = dbstring[colon_pos+1:] + + #tablename + tablename = dbstring + + if password is None: + password = open(os.getenv('HOME')+'/.dbdict_%s'%dbname).readline()[:-1] + if False: + print 'USERNAME', username + print 'PASS', password + print 'HOST', hostname + print 'DB', dbname + print 'TABLE', tablename + + return username, password, hostname, dbname, tablename + +class ExperimentLocation(object): + def __init__(self, root, dbname, tablename, id): + ssh_prefix='ssh://' + if root.startswith(ssh_prefix): + root = root[len(ssh_prefix):] + #at_pos = root.find('@') + colon_pos = root.find(':') + self.host, self.path = root[:colon_pos], root[colon_pos+1:] + else: + self.host, self.path = '', root + self.dbname = dbname + self.tablename = tablename + self.id = id + + def rsync(self, direction): + """The directory at which experiment-related files are stored. + + :returns: "<host>:<path>", of the sort used by ssh and rsync. + """ + path = os.path.join( + ':'.join([self.host, self.path]), + self.dbname, + self.tablename, + self.id) + + if direction == 'push': + rsync_cmd = 'rsync -r * "%s"' % path + elif direction == 'pull': + rsync_cmd = 'rsync -r "%s/*" .' % path + else: + raise Exception('invalid direction', direction) + + rsync_rval = os.system(rsync_cmd) + if rsync_rval != 0: + raise Exception('rsync failure', (rsync_rval, rsync_cmd)) + + def pull(self): + return self.rsync('pull') + + def push(self): + return self.rsync('push') + + def touch(self): + host = self.host + path = os.path.join(self.path, self.dbname, self.tablename, self.id) + ssh_cmd = ('ssh %(host)s "mkdir -p \'%(path)s\' && cd \'%(path)s\' ' + '&& touch stdout stderr && mkdir -p workdir"' % locals()) + ssh_rval = os.system(ssh_cmd) + if 0 != ssh_rval: + raise Exception('ssh failure', (ssh_rval, ssh_cmd)) + + def delete(self): + #something like ssh %s 'rm -Rf %s' should work, but it's pretty scary... + raise NotImplementedError() + +def run_sql(): + try: + username, password, hostname, dbname, tablename = parse_dbstring(sys.argv.pop(0)) + except Exception, e: + print >> sys.stderr, e + print >> sys.stderr, _help + raise + + #set experiment_root + try: + exproot = sys.argv.pop(0) + except: + exproot = os.getcwd() + + #TODO: THIS IS A GOOD IDEA RIGHT? + # It makes module-lookup work based on cwd-relative paths + # But possibly has really annoying side effects? Is there a cleaner + # way to change the import path just for loading the experiment class? + sys.path.insert(0, os.getcwd()) + + #TODO: refactor this so that we can use any kind of database (not just postgres) + + #a serialized session is necessary for jobs not be double-booked by listeners running + #in parallel on the cluster. + db = postgres_serial(user=username, + password=password, + host=hostname, + database=dbname, + table_prefix=tablename) + + while True: + dct = book_dct_postgres_serial(db, verbose=1) + if dct is None: + break + + try: + # + # chdir to a temp folder + # + workdir = tempfile.mkdtemp() + print >> sys.stderr, "INFO RUNNING ID %i IN %s" % (dct.id, workdir) + os.chdir(workdir) + + # not sure where else to put this... + dct[HOST] = socket.gethostname() + dct[HOST_WORKDIR] = workdir + + exploc = ExperimentLocation(exproot, dbname, tablename, str(dct.id)) + + # + # pull cwd contents + # + exploc.touch() + exploc.pull() + + # + # run the experiment + # + old_stdout, old_stderr = sys.stdout, sys.stderr + sys.stdout, sys.stderr = open('stdout', 'a+'), open('stderr', 'a+') + assert RUNNING == dct[STATUS] #set by get_dct + os.chdir(os.path.join(workdir, 'workdir')) + try: + run_rval = run_state(DictProxyState(dct)) + except Exception, e: + run_rval = COMPLETE + print >> sys.stderr, 'Exception:', e + print >> old_stderr, '#TODO: print a bigger traceback to stderr' + sys.stdout, sys.stderr = old_stdout, old_stderr + + # + # push the results back to the experiment_root + # + try: + os.chdir(workdir) + #pickle the state #TODO: write it human-readable + #cPickle.dump(dict((k,v) for k,v in dct.items()), open('state.pickle','w')) + save_items(dct.items(), open('state.py', 'w')) + exploc.push() + except Exception, e: + dct[PUSH_ERROR] = str(e) + raise + + # Cleanup the tempdir + # TODO: put this in a 'finally' block? + # + shutil.rmtree(workdir, ignore_errors=True) + + except: + dct[STATUS] = DONE + dct[PRIORITY] = None + raise + + if run_rval is INCOMPLETE: + #mark the job as needing restarting + dct[STATUS] = START + dct[PRIORITY] = RESTART_PRIORITY + else: + #mark the job as being done + dct[STATUS] = DONE + dct[PRIORITY] = None + + break # don't actually loop + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/dconfig.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,145 @@ +import sys +# +# +# Utility +# +# + +class Config (object): + """A class to store experiment configurations. + + Configuration variables are stored in class instance __dict__. This class + ensures that keys are alphanumeric strings, and that values can be rebuilt + from their representations. + + It can be serialized to/from a python file. + + """ + def __init__(self, __dict__=None, **kwargs): + if __dict__: + Config.__checkkeys__(*__dict__.keys()) + self.__dict__.update(__dict__) + if kwargs: + Config.__checkkeys__(*kwargs.keys()) + self.__dict__.update(kwargs) + + def __setattr__(self, attr, value): + Config.__checkkeys__(attr) + self.__dict__[attr] = value + + def __hash__(self): + """Compute a hash string from a dict, based on items(). + + @type dct: dict of hashable keys and values. + @param dct: compute the hash of this dict + + @rtype: string + + """ + items = list(self.items()) + items.sort() + return hash(repr(items)) + + def keys(self): + return self.__dict__.keys() + + def items(self): + return self.__dict__.items() + + def update(self, dct): + Config.__checkkeys__(*dct.keys()) + self.__dict__.update(dct) + + def save_file(self, f=sys.stdout): + items = self.items() + items.sort() + save_dictlike(items, f) + + def save(self, filename): + """Write a python file as a way of serializing a dictionary + + @type filename: string + @param filename: filename to open (overwrite) to save dictionary contents + + @return None + + """ + f = open(filename, 'w') + self.save_file(f) + f.close() + + def update_fromfile(self, filename): + """Read the local variables from a python file + + @type filename: string, filename suitable for __import__() + @param filename: a file whose module variables will be returned as a + dictionary + + @rtype: dict + @return: the local variables of the imported filename + + @note: + This implementation silently ignores all module symbols that don't meet + the standards enforced by L{Config.__checkkeys__}. This is meant to + ignore module special variables. + + """ + #m = __import__(filename) #major security problem, or feature? + f = open(filename) + for line in f: + if line.startswith('#'): + continue + line = line[:-1] #trim the '\n' + tokens = line.split(' ') + try: + key = tokens[0] + assert '=' == tokens[1] + except: + raise Exception('trouble parsing line:', line) + repr_val = ' '.join(tokens[2:]) + val = eval(repr_val) + setattr(self, key, val) + + @staticmethod + def __checkkeys__(*args): + conf = Config() + for k in args: + #must be string + if type(k) != str: + raise KeyError(k) + #mustn't be part of Config class interface + if hasattr(conf, k): + raise KeyError(k) + #all alphanumeric + for c in k: + if c not in ('abcdefghijklmnopqrstuvwxyz' + 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' + '_0123456789'): + raise KeyError(k) + #no symbols that look like reserved symbols + if k[:2]=='__' and k[-2:]=='__' and len(k)>4: + raise KeyError(k) + +def load(*filenames): + rval = Config() + for filename in filenames: + rval.update_fromfile(filename) + return rval + +def save_items(items, f): + """Write a python file as a way of serializing a dictionary + + @type f: writable file-like object + @param f: the attributes of this object will be written here + + @return None + + """ + print >> f, "#File generated by dbdict.dconfig.save_items" + for key, val in items: + Config.__checkkeys__(key) + # should never raise... illegal keys are not + # supposed to ever get into the dictionary + repr_val = repr(val) + assert val == eval(repr_val) + print >> f, key, '=', repr_val
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/design.txt Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,316 @@ + +TODO +==== + +- Class interface for jobs (SHAPING UP) + +- How exactly will state work (STILL A MYSTERY sql vs. file vs. script) + - How to insert jobs + - How to retrieve jobs + +- How will the driver work (rsync / file conventions) + + + +Components +========== + +DbDict +------ + +A DbDict is a restricted dictionary. + +It's keys are restricted to simple types, simple enough to be stored into a +database. Strings, integers, floating-points. + + +Experiment +---------- + +An Experiment is a python class with methods like start(), run(), stop(). A +user should make an Experiment class whose run() method does some computations +like training a neural network and measuring test errors, training an SVM, +doing some feature extraction, etc. + +An Experiment's arguments are stored in a DbDict. + +The results of an experiment can be stored in two places: in a DbDict (for +easy SQL searching later on) and in files. + + +ExperimentLauncher +------------------ + +Experiments are not standalone executables, they need to be run by some +additional code. There are several ways to launch an experiment: + +- running on your local machine vs. cluster vs. condor vs. mammouth + +- loading the arguments from the commandline vs. file vs. SQL query + +- running alone, or running in parallel with other experiments. + +The ExperimentLauncher is an executable python program that can run +experiments in these different ways. + + + +Workflow (How to run experiments) +================================= + +First, you'll need a Python program to compute what you want. Think about how +long it will take and whether you will want to be able to resume after being +stopped. + +If you want to run a simple or short experiment that is not restartable, +simply extend dbdict.Experiment like this: + +.. code-block:: python + + class MyExperiment(dbdict.Experiment): + + def __init__(self, dct): + self.dct = dct + + def run(self): + + # compute something + result = self.dct.a * self.dct.b + + # store it + self.dct.result = result + + # write some other results to a file + f = open('some_file', 'w') + print >> f, "hello from the job?" + + # return False or None to indicate the job is done + +If you want to run a more elaborate experiment, then extend dbdict.Experiment +like this: + +.. code-block:: python + + class MyRestartableExperiment(dbdict.Experiment): + + def __init__(self, state): + """Called once per lifetime of the class instance. Can be used to + create new jobs and save them to the database. This function will not + be called when a Job is retrieved from the database. + + Parent creates keys: dbdict_id, dbdict_module, dbdict_symbol, dbdict_state. + + dbdict_state = NEW (possibilities: NEW, RUNNING, DONE) + + """ + + def start(self): + """Called once per lifetime of the compute job. + + This is a good place to initialize internal variables. + + After this function returns, either stop() or run() will be called. + + dbdict_state -> RUNNING + + """ + + def resume(self): + """Called to resume computations on a previously stop()'ed job. The + os.getcwd() is just like it was after some previous stop() command. + + This is a good place to load internal variables from os.getcwd(). + + dbdict_state -> RUNNING + + """ + + def run(self, channel): + """Called after start() or resume(). + + channel() may return different things at different times. + None - run should continue. + 'stop' - the job should save state as soon as possible because + the process may soon be terminated + + When this function returns, dbdict_state -> DONE. + """ + + +Having written your Experiment subclass, you can now test it directly, or +using the ExperimentLauncher. + +.. code-block:: python + + if __name__ == '__main__': + + class Config(object): + a = 3 + b = 50 + + experiment = MyExperiment(Config()) + experiment.run() + +.. code-block:: bash + + dbdict-run my_file.MyExperiment 'dict(a=3, b=50)' + + +To run a batch of experiments on a cluster using 3 concurrent processes using +the SQL-based job list, you have to insert the jobs into a database table, and +then launch 10 instances of dbdict-run. + +.. code-block:: python + + from dbdict.tools import insert, view_table + + def add_jobs(): + experiment_module = 'my_repo.some_file' #match you module name + experiment_symbol = 'MyExperiment' #match your class name + experiment_name = 'showing some stuff' + for a in 3, 4, 5: + for b in 0.0, 0.1, 3.0: + insert(locals(), ignore_duplicates=False) + add_jobs() + + dbdict_run_table('my_launcher_view', experiment_name='showing_some_stuff') + + + +.. code-block:: bash + + dbidispatch --condor dbdict-run sql <dbstring> my_launcher_view + + + +Component Documentation +======================= + +DbDict +------ + + +Experiment +---------- + +class Experiment(object): + + new_stdout == 'job_stdout' + new_stderr == 'job_stderr' + + def remap_stdout(self): + """ + Called before start and resume. + + Parent-class behaviour is to replace sys.stdout with open(self.new_stdout, 'w+'). + + """ + + def remap_stderr(self): + """ + Called before start and resume. + + """ + + def tempdir(self): + """ + Return the recommended filesystem location for temporary files. + + The idea is that this will be a fast, local disk partition, suitable + for temporary storage. + + Files here will not generally be available at the time of resume(). + + The return value of this function may be controlled by one or more + environment variables. + + Will return $DBDICT_EXPERIMENT_TEMPDIR if present. + Failing that, will return $TMP/username-dbdict/hash(self) + Failing that, will return /tmp/username-dbdict/hash(self) + + .. note:: + Maybe we should use Python stdlib's tempdir mechanism. + + """ + + +Environment Variables +===================== + + +TMP - sometimes used by Experiment.tempdir() + +DBDICT_EXPERIMENT_TEMPDIR - used by Experiment.tempdir() + + +JOB DRIVER +---------- + +When a job is started, save: + +- workdir + +- dir + +- network (e.g. mammouth vs. diro... where are jobdir and workdir? + Alternatively, automatically rsync finished jobs' workdir and jobdir to diro.) + + +dbdict-run-job +-------------- + +prepare job dir +~~~~~~~~~~~~~~~~ + +On iro, cd to fringant2/.../job_id/ + +when restarting on mammouth, cd to somewhere and rsync -r fringant2/.../job_id job_id + +after stopping on mammouth rsync -r job_id fringant2/.../job_id + +.. note: + What to do on error in rsync? Where to report problems? + +.. note: + What to do with stderr, stdout? + +job queue table +~~~~~~~~~~~~~~~ +New table: id job_id priority + +This is the table that dbdict-run-job will pop jobs from. + + +How to run experiments using DbDict +----------------------------------- + +- Inherit from tools.Job. This is where you put the logic that takes some + parameters, and computes some results associated with those parameters. + +- Write a loop that creates a bunch of Jobs, and then tests to see if they are + already in the db before inserting them. + +- Insert the new job ids and priority into the qu +- Save result files, and files needed for job resume to os.getcwd(). + +- Save non-result files to a TEMP directory, obtained by e.g. self.gettmpdir() + from inside a Job instance. + +- Load datasets through pylearn. Pylearn has internal configuration that allows + it to find files both on mammouth and DIRO networks. You just call + MNIST.head() or shapeset1.train_valid_test() and bingo. + +- Manage experiments in a postgres database. Which ones have been queued, which + ones are running, which are finished, simple results, etc. + +- To analyze results, select from PG and create a dedicated table in sqlite. + Make columns for all the dictionary keys from all the selected entries. + + + +When running classification experiments +--------------------------------------- + +Have convention of saving (train, valid, test) X (NLL, RATE). +Should the convention be enforced? How? +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/experiment.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,102 @@ + +#These should be +INT = type(0) +FLT = type(0.0) +STR = type('') + +COMPLETE = None #jobs can return this by returning nothing as well +INCOMPLETE = True #jobs can return this and be restarted + + +class Experiment(object): + + new_stdout = 'job_stdout' + new_stderr = 'job_stderr' + + def remap_stdout(self): + """ + Called before start and resume. + + Default behaviour is to replace sys.stdout with open(self.new_stdout, 'w+'). + + """ + if self.new_stdout: + sys.stdout = open(self.new_stdout, 'w+') + + def remap_stderr(self): + """ + Called before start and resume. + + Default behaviour is to replace sys.stderr with open(self.new_stderr, 'w+'). + + """ + if self.new_stderr: + sys.stderr = open(self.new_stderr, 'w+') + + def tempdir(self): + """ + Return the recommended filesystem location for temporary files. + + The idea is that this will be a fast, local disk partition, suitable + for temporary storage. + + Files here will not generally be available at the time of resume(). + + The return value of this function may be controlled by one or more + environment variables. + + Will return $DBDICT_EXPERIMENT_TEMPDIR if present. + Failing that, will return $TMP/username-dbdict/hash(self) + Failing that, will return /tmp/username-dbdict/hash(self) + + .. note:: + Maybe we should use Python stdlib's tempdir mechanism. + + """ + + print >> sys.stderr, "TODO: get tempdir correctly" + return '/tmp/dbdict-experiment' + + + def __init__(self, state): + """Called once per lifetime of the class instance. Can be used to + create new jobs and save them to the database. This function will not + be called when a Job is retrieved from the database. + + Parent creates keys: dbdict_id, dbdict_module, dbdict_symbol, dbdict_status. + + """ + + def start(self): + """Called once per lifetime of the compute job. + + This is a good place to initialize internal variables. + + After this function returns, either stop() or run() will be called. + + dbdict_status -> RUNNING + + """ + + def resume(self): + """Called to resume computations on a previously stop()'ed job. The + os.getcwd() is just like it was after some previous stop() command. + + This is a good place to load internal variables from os.getcwd(). + + dbdict_status -> RUNNING + + """ + return self.start() + + def run(self, channel): + """Called after start() or resume(). + + channel() may return different things at different times. + None - run should continue. + 'stop' - the job should save state as soon as possible because + the process may soon be terminated + + When this function returns, dbdict_status -> DONE. + """ +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/sample_create_jobs.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,135 @@ +import sys, subprocess + +from .dconfig import Config +from .tools import Job # tools.py + +# +# Models +# + +class OldSampleJob(Job): + """ Sample Job object. Cut and paste this for your own experiments. """ + + # default parameters for the job + # config file parameters must be from this list. + a = 0 + b = 0 + c = 'a' + d = 0.0 + + def run(self, results, dry_run=False): + # attributes that you set in 'results' will be saved to job_results.py, next to + # job_config.py when this function returns. + results.f = self.a * self.b + + # The current working directory (cwd) is writable and readable + # it will also be left alone after the job terminates + f = open('some_file', 'w') + print >> f, "hello from the job?" + + return True #restart this job + + + +class SampleJobState(): + + # default parameters for the job + # config file parameters must be from this list. + a = 0 + b = 0 + c = 'a' + d = 0.0 + + +sample_job_table = Table('my_table_for_testing', metadata, + Column('a', Integer), + Column('b', Float(53))) + +metadata.create_all() + +mapper(SampleJobState, sample_job_table) + +s = SampleJobState() +s.a = 5 +s.b = 8.2 + +if Session().query(SampleJobState).filter_by(a=5, b=8.2).any(): + break; +else: + Session().save(s).commit() + + +class SampleJob(Job): + """ Sample Job object. Cut and paste this for your own experiments. """ + + def __init__(self, state): + pass + + def start(self): + pass + + def resume(self): + pass + + def run(self, switch = lambda : 'continue'): + # attributes that you set in 'results' will be saved to job_results.py, next to + # job_config.py when this function returns. + params.f = params.a * params.b + + + # The current working directory (cwd) is writable and readable + # it will also be left alone after the job terminates + f = open('some_file', 'w') + print >> f, "hello from the job?" + while True: + time.sleep(5) + if switch() == 'stop': + break + + return True #continue running if possible... + + def stop(self): + pass + + + def run(self, state, dry_run=False): + +def some_jobs(a = 0, + b = 2): + job_module = 'dbdict.sample_create_jobs' + job_symbol = 'SampleJob' + for c in ('a', 'b', 'c'): + for d in [0.0, 9.9]: + yield locals() + +def create(generator): + """Create a set of job directories""" + jobs = [] + dispatch = sys.stdout #open('jobs','w') + + for config in generator: + #print ' config', config + configdir = 'job_%016x'% abs(hash(config)) + jobs.append(configdir) + create_dirs = True + dry_run = 0 + if not dry_run: + if create_dirs and subprocess.call(('mkdir', configdir)): + print >> sys.stderr, 'Error creating directory: ', configdir + else: + #no problem creating the directory + config.save(configdir + '/job_config.py') + print >> dispatch, "dbdict-run-job run", configdir + else: + #print configdir + pass + dispatch.close() + +if __name__ == '__main__': + create(some_jobs()) + + j = SampleJob(SampleJobState()) + j.start() + j.run() + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/scratch.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,451 @@ +import threading, time, commands, os, sys, math, random, datetime + +import psycopg2, psycopg2.extensions +from sqlalchemy import create_engine, desc +from sqlalchemy.orm import sessionmaker +from sqlalchemy import Table, Column, MetaData, ForeignKey +from sqlalchemy import Integer, String, Float, DateTime +from sqlalchemy.orm import mapper, relation, backref, eagerload +from sqlalchemy.sql import operators, select + + +########## +# +# Connection stuff +# +# + +if 0: + _db_host = 'jais.iro.umontreal.ca' + + def _pwd(): + if not hasattr(_pwd,'rval'): + pw_cmd = 'ssh bergstrj@grieg.iro.umontreal.ca cat .lisa_db' + rval = commands.getoutput(pw_cmd) + return rval + + def engine(): + """Create an engine to access lisa_db on gershwin + + This function caches the return value between calls. + """ + if not hasattr(engine,'rval'): + pw = _pwd() + db_str ='postgres://bergstrj:%s@%s/lisa_db' % (pw,_db_host) + echo = False #spews pseudo-sql to stdout + engine.rval = create_engine(db_str + ,pool_size=1 # should force the app release extra connections + # releasing connections should let us schedule more jobs, since each one operates + # autonomously most of the time, just checking the db rarely. + # TODO: optimize this for large numbers of jobs + ,echo=echo + ) + return engine.rval + + def engine_serializable(): + """Create an engine to access lisa_db on gershwin, which uses serializable + transaction mode. + + This function caches the return value between calls. + """ + + this = engine_serializable + + if not hasattr(this,'rval'): + pw = _pwd() + def connect(): + c = psycopg2.connect(user='bergstrj', + password=pw, + database='lisa_db', + host='gershwin.iro.umontreal.ca') + c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + return c + pool_size=0 + this.rval = create_engine('postgres://' + ,creator=connect + ,pool_size=0 # should force the app release connections + ) + return this.rval + + + Session = sessionmaker(bind=engine(), autoflush=True, transactional=True) + SessionSerial = sessionmaker(bind=engine_serializable(), + autoflush=True, transactional=True) + +else: + from sqlalchemy import create_engine + from sqlalchemy.orm import sessionmaker + engine = create_engine('sqlite:///:memory:', echo=False) + Session = sessionmaker(bind=engine, autoflush=True, transactional=True) + SessionSerial = Session + + +################ +# +# Database setup +# +table_prefix='bergstrj_scratch_test_' +metadata = MetaData() + +def table_with_id(name, *args): + return Table(name, metadata, + Column('id', Integer, primary_key=True), + *args) + +t_trial = table_with_id(table_prefix+'trial', + Column('desc', String(256)), #comment: why running this trial? + Column('priority', Float(53)), #aka Double + Column('start', DateTime), + Column('finish', DateTime), + Column('host', String(256))) + +t_keyval = table_with_id(table_prefix+'keyval', + Column('name', String(32), nullable=False), #name of attribute + Column('fval', Float(53)), #aka Double + Column('ival', Integer), + Column('sval', String(256))) #TODO: store text (strings of unbounded length) + +t_trial_keyval = table_with_id(table_prefix+'trial_keyval', + Column('trial_id', Integer, ForeignKey('%s.id' % t_trial)), + Column('keyval_id', Integer, ForeignKey('%s.id' % t_keyval))) + +class _KeyVal(object): + @staticmethod + def cache(name, val, session, create=True): + #TODO: consider using a local cache to remove the need to query db + # this takes advantage of common usage, which is to only add KeyVal + # pairs. + #check if it is in the DB + q = session.query(_KeyVal) + if isinstance(val, float): + q = q.filter_by(name=name, fval=val) + elif isinstance(val, int): + q = q.filter_by(name=name, ival=val) + elif isinstance(val, str): + q = q.filter_by(name=name, sval=val) + else: + raise TypeError(val) + rval = q.first() + if rval is None and create: + rval = _KeyVal(name, val) + session.save_or_update(rval) + return rval + + def __init__(self, name, val): + self.name = name + self.val = val + def __get_val(self): + val = None + if self.fval is not None: val = self.fval + if self.ival is not None: val = self.ival + if self.sval is not None: val = self.sval + return val + def __set_val(self, val): + if isinstance(val, float): + self.fval = val + self.ival = None + self.sval = None + elif isinstance(val, int): + self.fval = None + self.ival = val + self.sval = None + elif isinstance(val, str): + self.fval = None + self.ival = None + self.sval = val + else: + raise TypeError(val) + val = property(__get_val, __set_val) + def __repr__(self): + return "<Param(%s,'%s', %s)>" % (self.id, self.name, repr(self.val)) +mapper(_KeyVal, t_keyval) + + +################### +# +# Job interface +# + +class Trial(object): + _native_cols = 'desc', 'priority', 'start', 'finish', 'host' + + #TODO: remove these forbidden keynames, and let all keynames work properly + _forbidden_keynames = set(['filter', 'desc', + 'priority', 'start', 'finish', 'host', + 'create', 'max_sleep', 'max_retry', 'session', 'c', + 'abort', 'complete']) + + #TODO: unittest cases to verify that having these kinds of keys is OK + + class ReserveError(Exception): """reserve failed""" + + @staticmethod + def filter(session=None, **kwargs): + """Construct a query for Trials. + + @param kwargs: each (kw,arg) pair in kwargs, will restrict the list of + Trials to those such that the 'kw' attr has been associated with the job, + and it has value 'arg' + + @return SqlAlchemy query object + + @note: will raise TypeError if any arg in kwargs has a type other than + float, int or string. + + """ + if session is None: + session = Session() + q = session.query(Trial) + for col in Trial._native_cols: + if col in kwargs: + q = q.filter_by(**{col:kwargs[col]}) + del kwargs[col] + for kw, arg in kwargs.items(): + if isinstance(arg, float): + q = q.filter(Trial._attrs.any(name=kw, fval=arg)) + elif isinstance(arg, int): + q = q.filter(Trial._attrs.any(name=kw, ival=arg)) + elif isinstance(arg, str): + q = q.filter(Trial._attrs.any(name=kw, sval=arg)) + else: + raise TypeError(arg) + return q + + @staticmethod + def reserve_unique(query_fn, max_retry=10, max_sleep=5.0): + """Reserve an un-reserved job. + + @param query_fn: build the query for the trial to reserve (see + L{reserve_unique_kw} for example usage). + + @param max_retry: try this many times to reserve a job before raising an exception + + @param max_sleep: L{time.sleep} up to this many seconds between retry attempts + + @return a trial which was reserved (uniquely) by this function call. If + no matching jobs remain, return None. + + """ + s = SessionSerial() + retry = max_retry + trial = None + + while (trial is None) and retry: + + q = query_fn(s.query(Trial)) + q = q.options(eagerload('_attrs')) #TODO is this a good idea? + + trial = q.first() + if trial is None: + return None # no jobs match the query + else: + try: + trial.reserve(session=s) + except Trial.ReserveError, e: + s.rollback() + waittime = random.random() * max_sleep + if debug: print 'another process stole our trial. Waiting %f secs' % wait + time.sleep(waittime) + retry -= 1 + if trial: + s.expunge(trial) + s.close() + return trial + + @staticmethod + def reserve_unique_kw(max_retry=10, max_sleep=5.0, **kwargs): + """Call reserve_unique with a query function that matches jobs with + attributes (and values) given by kwargs. Results are sorted by + priority. + + """ + assert 'start' not in kwargs + assert 'query_fn' not in kwargs + def query_fn(q): + q = q.filter_by(start=None, + **kwargs).order_by(desc(Trial.c.priority)) + return q + + + return Trial.reserve_unique(query_fn, max_retry, max_sleep) + + def __init__(self, desc=None, priority=None, start=None, finish=None, host=None, **kwargs): + self.desc = desc + self.priority = priority + self.start = start + self.finish = finish + self.host = host + self.attrs.update(kwargs) + def __repr__(self): + return "<Trial(%s, '%s', %s, '%s', %s, %s)>" \ + %(str(self.id), self.desc, self.priority, self.host, + self.start, self.finish) + + def _get_attrs(self): + #This bit of code makes it so that you can type something like: + # + # trial.attrs.score = 50 + # + # It will use the self._attrs list of _KeyVal instances as a backend, + # because these pseudo-attributes (here 'score') are not stored in any + # object's __dict__. + class AttrCatcher(object): + #TODO: make these methods faster with better data structures + def __getattr__(attr_self, attr): + attrlist = self._attrs + for i,a in enumerate(attrlist): + if a.name == attr: + return a.val + raise AttributeError(attr) + def __setattr__(attr_self, attr, val): + n = 0 + s = Session() + assert attr not in Trial._forbidden_keynames + for i,a in enumerate(self._attrs): + if a.name == attr: + attrlist[i] = _KeyVal.cache(attr,val, s) + n += 1 + assert n <= 1 + if n == 0: + self._attrs.append(_KeyVal.cache(attr,val, s)) + s.commit() + def __iter__(_self): + def gen(): + return self._attrs.__iter__() + return gen() + def update(attr_self, dct): + for k,v in dct.items(): + setattr(attr_self, k, v) + + #we can't add attributes to self, so just do this... + return AttrCatcher() #seriously, allocate a new one each time + attrs = property(_get_attrs, doc = ("Provide attribute-like access to the" + " key-value pairs associated with this trial")) + def reserve(self, session): #session should have serialized isolation mode + """Reserve the job for the current process, to the exclusion of all + other processes. In other words, lock it.""" + + serial_self = session.query(Trial).get(self.id) + if serial_self.start is not None: + raise Trial.ReserveError(self.host) + serial_self.start = datetime.datetime.now() + serial_self.host = 'asdf' #TODO: get hostname + try: + session.commit() + except Exception, e: + # different db backends raise different exceptions when a + # commit fails, so here we just treat all problems the same + #s.rollback() # doc says rollback or close after commit exception + session.close() + raise Trial.ReserveError(self.host) + + #Session().refresh(self) #load changes to serial_self into self + + def abort(self): + """Reset job to be available for reservation. + + @return None + + @note: Raises exception if job is finished + + """ + if self.finish is not None: + raise Exception('wtf?') + self.start = None + self.host = None + s = Session() + s.save_or_update(self) + s.commit() + + def complete(self, **kwargs): + """Mark job self as finished and update attrs with kwargs.""" + self.attrs.update(kwargs) + self.finish = datetime.datetime.now() + s = Session() + s.save_or_update(self) + s.commit() + +mapper(Trial, t_trial, + properties = { + '_attrs': relation(_KeyVal, + secondary=t_trial_keyval, + cascade="delete-orphan") + }) + +metadata.create_all(engine) # does nothing when tables already exist + + + + +############ +# +# Test Stuff +# +# + +if __name__ == '__main__': + s = Session() + + def add_some_jobs(): + dvalid, dtest = 'dvalid', 'dtest file' + desc = 'debugging' + + def blah(): + for lr in [0.001, 0.01]: + for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: + for rng_seed in [4, 5, 6]: + for priority in [None, 1, 2]: + yield dict(locals()) + + for kwargs in blah(): + t = Trial(desc=desc, dvalid=dvalid, dtest=dtest, **kwargs) + s.save(t) + + def work(jobtype): + try: + jid = reserve(jobtype) + except StopIteration: + return + + def blah(*args): + print 'blah: ', args + dct = get_next() + + add_some_jobs() + + if 0: + for t in s.query(Trial): + print 'saved:', t, [a.name for a in list(t.attrs)] + + def yield_unique_jobs(): + while True: + rval = Trial.reserve_unique_kw() + if rval is None: + break + else: + yield rval + + if 0: + for job in yield_unique_jobs(): + print 'yielded job', job + job.complete(score=random.random()) + + if 0: + for t in s.query(Trial): + print 'final:', t, [a.name for a in list(t.attrs)] + + q = s.query(Trial) + #q = q.order_by(Trial.attrs) + q = q.filter(Trial._attrs.any(name='rng_seed', ival=5)) + #q = q.filter(Trial._attrs.all(name='lr').order_by(_KeyVal.c.fval)) + + print dir(q) + + + print q + for t in q.all(): + print 'final:', t, [a.name for a in list(t.attrs)] + t.homebrew = 6 + + +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/sql_commands.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,36 @@ + +def crazy_sql_command(viewname, cols, keytab, linktab, id_col='id', dict_id='dict_id', pair_id='pair_id'): + + #create or replace view expview as select * from (select id as v1_id, value as nhid from + #test0 where name='nhid') nhid LEFT OUTER JOIN (select id as v2_id, value as lrate from + #test0 where name='lrate') lrate on nhid.v1_id = lrate.v2_id; + + col_queries = [] + colname0 = None + for i, (colname, table_col) in enumerate(cols): + if i == 0: + q = """(select %(dict_id)s as v%(i)s_id, %(table_col)s as %(colname)s + from \"%(keytab)s\", \"%(linktab)s\" + where name='%(colname)s' + and \"%(keytab)s\".%(id_col)s = \"%(linktab)s\".%(pair_id)s) + %(colname)s """ % locals() + colname0 = colname + else: + q = """ LEFT OUTER JOIN (select %(dict_id)s as v%(i)s_id, %(table_col)s as %(colname)s + from \"%(keytab)s\", \"%(linktab)s\" + where name='%(colname)s' + and \"%(keytab)s\".%(id_col)s = \"%(linktab)s\".%(pair_id)s) + %(colname)s + on %(colname0)s.v0_id = %(colname)s.v%(i)s_id""" % locals() + col_queries.append(q) + + header = " create or replace view %s as select %s.v0_id as id, %s from " \ + % (viewname, colname0, (", ".join([c[0] for c in cols]))) + + rval = header + "\n".join(col_queries) + + return rval + + +#print crazy_sql_command('test0', 'expview', (('nhid', 'value'), ('lrate', 'value'), ('a','value'))) +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/test_api0.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,351 @@ +from api0 import * +import threading, time, commands, os, sys, math, random, datetime + +import psycopg2, psycopg2.extensions +from sqlalchemy import create_engine, desc +from sqlalchemy.orm import sessionmaker +from sqlalchemy import Table, Column, MetaData, ForeignKey +from sqlalchemy import Integer, String, Float, DateTime, Text, Binary +from sqlalchemy.orm import mapper, relation, backref, eagerload +from sqlalchemy.sql import operators, select + +import unittest + +class T(unittest.TestCase): + + def test_bad_dict_table(self): + """Make sure our crude version of schema checking kinda works""" + engine = create_engine('sqlite:///:memory:', echo=False) + Session = sessionmaker(bind=engine, autoflush=True, transactional=True) + + table_prefix='bergstrj_scratch_test_' + metadata = MetaData() + t_trial = Table(table_prefix+'trial', metadata, + Column('id', Integer, primary_key=True), + Column('desc', String(256)), #comment: why running this trial? + Column('priority', Float(53)), #aka Double + Column('start', DateTime), + Column('finish', DateTime), + Column('host', String(256))) + metadata.create_all(engine) + + try: + h = DbHandle(None, t_trial, None, None) + except ValueError, e: + if e[0] == DbHandle.e_bad_table: + return + self.fail() + + + def go(self): + """Create tables and session_maker""" + engine = create_engine('sqlite:///:memory:', echo=False) + Session = sessionmaker(autoflush=True, transactional=True) + + table_prefix='bergstrj_scratch_test_' + metadata = MetaData() + + t_trial = Table(table_prefix+'trial', metadata, + Column('id', Integer, primary_key=True), + Column('create', DateTime), + Column('write', DateTime), + Column('read', DateTime)) + + t_keyval = Table(table_prefix+'keyval', metadata, + Column('id', Integer, primary_key=True), + Column('name', String(32), nullable=False), #name of attribute + Column('ntype', Integer), + Column('fval', Float(53)), #aka Double + Column('sval', Text), #aka Double + Column('bval', Binary)) #TODO: store text (strings of unbounded length) + + t_trial_keyval = Table(table_prefix+'trial_keyval', metadata, + Column('dict_id', Integer, ForeignKey('%s.id' % t_trial), + primary_key=True), + Column('pair_id', Integer, ForeignKey('%s.id' % t_keyval), + primary_key=True)) + + metadata.bind = engine + metadata.create_all() # does nothing when tables already exist + + self.engine = engine + return Session, t_trial, t_keyval, t_trial_keyval + + + def test_insert_save(self): + + Session, t_dict, t_pair, t_link = self.go() + + db = DbHandle(*self.go()) + + def jobs(): + dvalid, dtest = 'dvalid', 'dtest file' + desc = 'debugging' + for lr in [0.001]: + for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: + for rng_seed in [4, 5, 6]: + for priority in [None, 1]: + yield dict(locals()) + + jlist = list(jobs()) + assert len(jlist) == 1*4*3*2 + for i, dct in enumerate(jobs()): + t = db.insert(**dct) + + #make sure that they really got inserted into the db + orig_keycount = db._session.query(db._KeyVal).count() + self.failUnless(orig_keycount > 0, orig_keycount) + + orig_dctcount = Session().query(db._Dict).count() + self.failUnless(orig_dctcount ==len(jlist), orig_dctcount) + + orig_keycount = Session().query(db._KeyVal).count() + self.failUnless(orig_keycount > 0, orig_keycount) + + #queries + q0list = list(db.query().all()) + q1list = list(db.query()) + q2list = list(db) + + self.failUnless(q0list == q1list, (q0list,q1list)) + self.failUnless(q0list == q2list, (q0list,q1list)) + + self.failUnless(len(q0list) == len(jlist)) + + for i, (j, q) in enumerate(zip(jlist, q0list)): + jitems = list(j.items()) + qitems = list(q.items()) + jitems.sort() + qitems.sort() + if jitems != qitems: + print i + print jitems + print qitems + self.failUnless(jitems == qitems, (jitems, qitems)) + + def test_query_0(self): + Session, t_dict, t_pair, t_link = self.go() + + db = DbHandle(*self.go()) + + def jobs(): + dvalid, dtest = 'dvalid', 'dtest file' + desc = 'debugging' + for lr in [0.001]: + for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: + for rng_seed in [4, 5, 6]: + for priority in [None, 1]: + yield dict(locals()) + + jlist = list(jobs()) + assert len(jlist) == 1*4*3*2 + for i, dct in enumerate(jobs()): + t = db.insert(**dct) + + qlist = list(db.query(rng_seed=5)) + self.failUnless(len(qlist) == len(jlist)/3) + + jlist5 = [j for j in jlist if j['rng_seed'] == 5] + + for i, (j, q) in enumerate(zip(jlist5, qlist)): + jitems = list(j.items()) + qitems = list(q.items()) + jitems.sort() + qitems.sort() + if jitems != qitems: + print i + print jitems + print qitems + self.failUnless(jitems == qitems, (jitems, qitems)) + + def test_delete_keywise(self): + Session, t_dict, t_pair, t_link = self.go() + + db = DbHandle(*self.go()) + + def jobs(): + dvalid, dtest = 'dvalid', 'dtest file' + desc = 'debugging' + for lr in [0.001]: + for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: + for rng_seed in [4, 5, 6]: + for priority in [None, 1]: + yield dict(locals()) + + jlist = list(jobs()) + assert len(jlist) == 1*4*3*2 + for i, dct in enumerate(jobs()): + t = db.insert(**dct) + + orig_keycount = Session().query(db._KeyVal).count() + + del_count = Session().query(db._KeyVal).filter_by(name='rng_seed', + fval=5.0).count() + self.failUnless(del_count == 8, del_count) + + #delete all the rng_seed = 5 entries + qlist_before = list(db.query(rng_seed=5)) + for q in qlist_before: + del q['rng_seed'] + + #check that it's gone from our objects + for q in qlist_before: + self.failUnless('rng_seed' not in q) #via __contains__ + self.failUnless('rng_seed' not in q.keys()) #via keys() + exc=None + try: + r = q['rng_seed'] # via __getitem__ + print 'r,', r + except KeyError, e: + pass + + #check that it's gone from dictionaries in the database + qlist_after = list(db.query(rng_seed=5)) + self.failUnless(qlist_after == []) + + #check that exactly 8 keys were removed + new_keycount = Session().query(db._KeyVal).count() + self.failUnless(orig_keycount == new_keycount + 8, (orig_keycount, + new_keycount)) + + #check that no keys have rng_seed == 5 + gone_count = Session().query(db._KeyVal).filter_by(name='rng_seed', + fval=5.0).count() + self.failUnless(gone_count == 0, gone_count) + + + def test_delete_dictwise(self): + Session, t_dict, t_pair, t_link = self.go() + + db = DbHandle(*self.go()) + + def jobs(): + dvalid, dtest = 'dvalid', 'dtest file' + desc = 'debugging' + for lr in [0.001]: + for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]: + for rng_seed in [4, 5, 6]: + for priority in [None, 1]: + yield dict(locals()) + + jlist = list(jobs()) + assert len(jlist) == 1*4*3*2 + for i, dct in enumerate(jobs()): + t = db.insert(**dct) + + orig_keycount = Session().query(db._KeyVal).count() + orig_dctcount = Session().query(db._Dict).count() + self.failUnless(orig_dctcount == len(jlist)) + + #delete all the rng_seed = 5 dictionaries + qlist_before = list(db.query(rng_seed=5)) + for q in qlist_before: + q.delete() + + #check that the right number has been removed + post_dctcount = Session().query(db._Dict).count() + self.failUnless(post_dctcount == len(jlist)-8) + + #check that the remaining ones are correct + for a, b, in zip( + [j for j in jlist if j['rng_seed'] != 5], + Session().query(db._Dict).all()): + self.failUnless(a == b) + + #check that the keys have all been removed + n_keys_per_dict = 8 + new_keycount = Session().query(db._KeyVal).count() + self.failUnless(orig_keycount - 8 * n_keys_per_dict == new_keycount, (orig_keycount, + new_keycount)) + + + def test_setitem_0(self): + Session, t_dict, t_pair, t_link = self.go() + + db = DbHandle(*self.go()) + + b0 = 6.0 + b1 = 9.0 + + job = dict(a=0, b=b0, c='hello') + + dbjob = db.insert(**job) + + dbjob['b'] = b1 + + #check that the change is in db + qjob = Session().query(db._Dict).filter(db._Dict._attrs.any(name='b', + fval=b1)).first() + self.failIf(qjob is dbjob) + self.failUnless(qjob == dbjob) + + #check that the b:b0 key is gone + count = Session().query(db._KeyVal).filter_by(name='b', fval=b0).count() + self.failUnless(count == 0, count) + + #check that the b:b1 key is there + count = Session().query(db._KeyVal).filter_by(name='b', fval=b1).count() + self.failUnless(count == 1, count) + + def test_setitem_1(self): + """replace with different sql type""" + Session, t_dict, t_pair, t_link = self.go() + + db = DbHandle(*self.go()) + + b0 = 6.0 + b1 = 'asdf' # a different dtype + + job = dict(a=0, b=b0, c='hello') + + dbjob = db.insert(**job) + + dbjob['b'] = b1 + + #check that the change is in db + qjob = Session().query(db._Dict).filter(db._Dict._attrs.any(name='b', + sval=b1)).first() + self.failIf(qjob is dbjob) + self.failUnless(qjob == dbjob) + + #check that the b:b0 key is gone + count = Session().query(db._KeyVal).filter_by(name='b', fval=b0).count() + self.failUnless(count == 0, count) + + #check that the b:b1 key is there + count = Session().query(db._KeyVal).filter_by(name='b', sval=b1, + fval=None).count() + self.failUnless(count == 1, count) + + def test_setitem_2(self): + """replace with different number type""" + Session, t_dict, t_pair, t_link = self.go() + + db = DbHandle(*self.go()) + + b0 = 6.0 + b1 = 7 + + job = dict(a=0, b=b0, c='hello') + + dbjob = db.insert(**job) + + dbjob['b'] = b1 + + #check that the change is in db + qjob = Session().query(db._Dict).filter(db._Dict._attrs.any(name='b', + fval=b1)).first() + self.failIf(qjob is dbjob) + self.failUnless(qjob == dbjob) + + #check that the b:b0 key is gone + count = Session().query(db._KeyVal).filter_by(name='b', fval=b0,ntype=1).count() + self.failUnless(count == 0, count) + + #check that the b:b1 key is there + count = Session().query(db._KeyVal).filter_by(name='b', fval=b1,ntype=0).count() + self.failUnless(count == 1, count) + + +if __name__ == '__main__': + unittest.main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/tools.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,72 @@ +import sys + +from .experiment import COMPLETE, INCOMPLETE + +MODULE = 'dbdict_module' +SYMBOL = 'dbdict_symbol' + +def dummy_channel(*args, **kwargs): + return None + +# +#this proxy object lets experiments use a dict like a state object +# +def DictProxyState(dct): + defaults_obj = [None] + class Proxy(object): + def subdict(s, prefix=''): + rval = {} + for k,v in dct.items(): + if k.startswith(prefix): + rval[k[len(prefix):]] = v + return rval + def use_defaults(s, obj): + defaults_obj[0] = obj + + def __getitem__(s,a): + try: + return dct[a] + except Exception, e: + try: + return getattr(defaults_obj[0], a) + except: + raise e + + def __setitem__(s,a,v): + dct[a] = v + + def __getattr__(s,a): + try: + return dct[a] + except KeyError: + return getattr(defaults_obj[0], a) + def __setattr__(s,a,v): + dct[a] = v + return Proxy() + +def load_state_fn(state): + + # + # load the experiment class + # + dbdict_module_name = getattr(state,MODULE) + dbdict_symbol = getattr(state, SYMBOL) + + try: + dbdict_module = __import__(dbdict_module_name, fromlist=[None], level=0) + dbdict_fn = getattr(dbdict_module, dbdict_symbol) + except: + print >> sys.stderr, "FAILED to load job symbol:", dbdict_module_name, dbdict_symbol + print >> sys.stderr, "PATH", sys.path + raise + + return dbdict_fn + + +def run_state(state, channel = dummy_channel): + fn = load_state_fn(state) + rval = fn(state, channel) + if rval not in (COMPLETE, INCOMPLETE): + print >> sys.stderr, "WARNING: INVALID job function return value" + return rval +
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/setup.py Wed Nov 12 22:00:20 2008 -0500 @@ -0,0 +1,14 @@ +#!/bin/env python + +from ez_setup import use_setuptools +use_setuptools() +from setuptools import setup, find_packages, Extension, Library +setup(name="Pylearn", + version="0.1", + description="Pylearn", + long_description="""Machine learning toolkit""", + author="LISA", + author_email="pylearn-dev@googlegroups.com", + packages=find_packages(exclude='tests'), +) +