# HG changeset patch
# User bergstra@mlp4.ais.sandbox
# Date 1234334594 18000
# Node ID 2704c8688ced55212a044e1c90a0feef951568f3
# Parent d3d8f5a17909b5dfb7698d50e09ad6c2906bc8c7
# Parent d03b5d8e4bf6f559eb7be31053c09269b2393407
merge
diff -r d3d8f5a17909 -r 2704c8688ced bin/dbdict-query
--- a/bin/dbdict-query Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,3 +0,0 @@
-#!/usr/bin/env python
-from dbdict.tools import standalone_query
-standalone_query()
diff -r d3d8f5a17909 -r 2704c8688ced bin/dbdict-run
--- a/bin/dbdict-run Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,3 +0,0 @@
-#!/usr/bin/env python
-from pylearn.dbdict.newstuff import run_cmdline
-run_cmdline()
diff -r d3d8f5a17909 -r 2704c8688ced bin/dbdict-run-job
--- a/bin/dbdict-run-job Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,3 +0,0 @@
-#!/usr/bin/env python
-from dbdict.tools import standalone_run_job
-standalone_run_job()
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/algorithms/logistic_regression.py
--- a/pylearn/algorithms/logistic_regression.py Wed Feb 11 01:42:58 2009 -0500
+++ b/pylearn/algorithms/logistic_regression.py Wed Feb 11 01:43:14 2009 -0500
@@ -98,6 +98,8 @@
#FIX : Guillaume suggested a convention: plugin handlers (dataset_factory, minimizer_factory,
# etc.) should never provide default arguments for parameters, and accept **kwargs to catch
# irrelevant parameters.
+#SOLUTION: the jobman deals in nested dictionaries. This means that there is no [dumb] reason that
+# irrelevant arguments should be passed at all.
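+# For illustration only (a hypothetical sketch; these names are not part of
+# jobman or of this module): with a nested configuration each plugin factory
+# receives exactly its own sub-dictionary, so irrelevant arguments never need
+# to be passed:
+#
+#   state = {'minimizer': {'algo': 'sgd', 'lr': 0.01},
+#            'dataset':   {'name': 'mnist'}}
+#   minimizer = minimizer_factory(**state['minimizer'])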
class _fit_logreg_defaults(object):
minimizer_algo = 'dummy'
#minimizer_lr = 0.001
@@ -107,9 +109,6 @@
batchsize = 8
verbose = 1
-# consider pre-importing each file in algorithms, datasets (possibly with try/catch around each
-# import so that this import failure is ignored)
-
def fit_logistic_regression_online(state, channel=lambda *args, **kwargs:None):
#use stochastic gradient descent
state.use_defaults(_fit_logreg_defaults)
@@ -193,3 +192,95 @@
updates = dict((p, p - self.lr * g) for p, g in zip(self.params, gparams)))
+class classification: #this would go to a file called pylearn/algorithms/classification.py
+
+ @staticmethod
+ def xent(p, q):
+ """cross-entropy (row-wise)
+
+ :type p: M x N symbolic matrix (sparse or dense)
+
+ :param p: each row is a true distribution over N things
+
+ :type q: M x N symbolic matrix (sparse or dense)
+
+ :param q: each row is an approximating distribution over N things
+
+ :rtype: symbolic vector of length M
+
+ :returns: the cross entropy between each row of p and the corresponding row of q.
+
+
+ Hint: To sum row-wise costs into a scalar value, use "xent(p, q).sum()"
+ """
+        return -(p * tensor.log(q)).sum(axis=1)
+
+ @staticmethod
+ def errors(target, prediction):
+ """classification error (row-wise)
+
+        :type target: M x N symbolic matrix (sparse or dense)
+
+        :param target: each row is a true distribution over N things
+
+        :type prediction: M x N symbolic matrix (sparse or dense)
+
+        :param prediction: each row is an approximating distribution over N things
+
+        :rtype: symbolic vector of length M
+
+        :returns: a vector with 0 for every row pair that has a maximum in the same position,
+            and 1 for every other row pair.
+
+
+        Hint: Count errors with "errors(target, prediction).sum()", and get the error-rate with
+        "errors(target, prediction).mean()"
+ """
+ return tensor.neq(
+ tensor.argmax(prediction, axis=1),
+ tensor.argmax(target, axis=1))
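+
+# For illustration only: a minimal, untested sketch of how the helpers above
+# can be compiled and called.  It assumes nothing beyond the theano and
+# theano.tensor APIs already used in this file; `_demo_classification` is a
+# hypothetical name and is never invoked.
+def _demo_classification():
+    import numpy
+    p = tensor.dmatrix('p')  # true distributions (e.g. one-hot targets), M x N
+    q = tensor.dmatrix('q')  # approximating distributions, M x N
+    f = theano.function([p, q],
+            [classification.xent(p, q).sum(),
+             classification.errors(p, q).mean()])
+    targets = numpy.eye(3)             # three one-hot rows
+    approx = numpy.ones((3, 3)) / 3.0  # uniform guesses (no zero entries)
+    total_cost, error_rate = f(targets, approx)
+    return total_cost, error_rate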
+
+class LogReg_New(module.FancyModule):
+ """A symbolic module for performing multi-class logistic regression."""
+
+ params = property(
+ lambda self: [p for p in [self.w, self.b] if p.owner is None],
+ doc="WRITEME"
+ )
+
+ def __init__(self, n_in=None, n_out=None, w=None, b=None):
+        super(LogReg_New, self).__init__() #boilerplate
+
+ self.n_in = n_in
+ self.n_out = n_out
+
+        self.w = w if w is not None else module.Member(tensor.dmatrix())
+        self.b = b if b is not None else module.Member(tensor.dvector())
+
+ def _instance_initialize(self, obj):
+ obj.w = N.zeros((self.n_in, self.n_out))
+ obj.b = N.zeros(self.n_out)
+ obj.__pp_hide__ = ['params']
+
+
+ def l1(self):
+ return abs(self.w).sum()
+
+ def l2(self):
+ return (self.w**2).sum()
+
+ def activation(self, input):
+ return theano.dot(input, self.w) + self.b
+
+ def softmax(self, input):
+ return nnet.softmax(self.activation(input))
+
+ def argmax(self, input):
+        return tensor.argmax(self.activation(input), axis=1)
+
+ def xent(self, input, target):
+ return classification.xent(target, self.softmax(input))
+
+ def errors(self, input, target):
+ return classification.errors(target, self.softmax(input))
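+
+# For illustration only: a rough, untested sketch of how LogReg_New might be
+# used with the old theano.compile.module API.  The names x, y, lr_module and
+# the 0.1 penalty weight are assumptions made for this example, not part of
+# the module above.
+#
+#   lr_module = LogReg_New(n_in=784, n_out=10)
+#   x, y = tensor.dmatrix('x'), tensor.dmatrix('y')
+#   lr_module.cost = module.Method([x, y],
+#           lr_module.xent(x, y).sum() + 0.1 * lr_module.l2())
+#   inst = lr_module.make()   # allocates w and b via _instance_initialize
+#   # inst.cost(batch_x, batch_y) then evaluates the regularized cost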
+
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/datasets/flickr.py
--- a/pylearn/datasets/flickr.py Wed Feb 11 01:42:58 2009 -0500
+++ b/pylearn/datasets/flickr.py Wed Feb 11 01:43:14 2009 -0500
@@ -26,6 +26,10 @@
valid = filetensor.read(open(os.path.join(root, path_valid_10class)))
test = filetensor.read(open(os.path.join(root, path_test_10class)))
+ assert train.shape[1] == 75*75 +1
+ assert valid.shape[1] == 75*75 +1
+ assert test.shape[1] == 75*75 +1
+
rval = Dataset()
rval.train = Dataset.Obj(
@@ -46,3 +50,22 @@
def translations_10class():
raise NotImplementedError('TODO')
+
+def render_a_few_images(n=10, prefix='flickr_img', suffix='png'):
+ #TODO: document this and move it to a more common
+ # place where other datasets can use it
+ from PIL import Image
+ root = os.path.join(data_root(), 'flickr')
+ valid = filetensor.read(open(os.path.join(root, path_valid_10class)))
+ assert valid.shape == (1000,75*75+1)
+ for i in xrange(n):
+ pixelarray = valid[i,0:-1].reshape((75,75)).T
+ assert numpy.all(pixelarray >= 0)
+ assert numpy.all(pixelarray <= 1)
+
+ pixel_uint8 = numpy.asarray( pixelarray * 255.0, dtype='uint8')
+ im = Image.frombuffer('L', pixel_uint8.shape, pixel_uint8.data, 'raw', 'L', 0, 1)
+ im.save(prefix + str(i) + '.' + suffix)
+
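+# For illustration only: a typical invocation, assuming the Flickr filetensor
+# files are installed under data_root() as the loaders above expect:
+#
+#   render_a_few_images(n=3, prefix='/tmp/flickr_img', suffix='png')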
+
+
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/__init__.py
--- a/pylearn/dbdict/__init__.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-from api0 import sqlite_file_db, sqlite_memory_db, postgres_db
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/api0.py
--- a/pylearn/dbdict/api0.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,563 +0,0 @@
-from sqlalchemy import create_engine, desc
-import sqlalchemy.pool
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy import Table, Column, MetaData, ForeignKey, ForeignKeyConstraint
-from sqlalchemy import Integer, String, Float, Boolean, DateTime, Text, Binary
-from sqlalchemy.databases import postgres
-from sqlalchemy.orm import mapper, relation, backref, eagerload
-from sqlalchemy.sql import operators, select
-from sql_commands import crazy_sql_command
-
-class Todo(Exception): """Replace this with some working code!"""
-
-class DbHandle (object):
- """
- This class also provides filtering shortcuts that hide the names of the
- DbHandle internal databases.
-
- Attributes:
- dict_table
- pair_table
-
-
-
- dict_table
-
- An SqlAlchemy-mapped class corresponding to database table with the
- following schema:
-
- Column('id', Integer, primary_key=True)
- Column('create', DateTime)
- Column('write', DateTime)
- Column('read', DateTime)
-
- #TODO: reconsider create/read/write
-
- pair_table
-
- An SqlAlchemy-mapped class corresponding to database table with the
- following schema:
-
- Column('id', Integer, primary_key=True)
- Column('name', String(128))
- Column('ntype', Boolean)
- Column('fval', Double)
-        Column('sval', Text)
- Column('bval', Blob)
-
- #TODO: Consider difference between text and binary
- #TODO: Consider adding a 'type' column
- #TODO: Consider union?
-        #TODO: Are there standard ways of doing this kind of thing?
-
- """
-
- e_bad_table = 'incompatible columns in table'
-
- def __init__(h_self, Session, engine, dict_table, pair_table):
- h_self._engine = engine;
- h_self._dict_table = dict_table
- h_self._pair_table = pair_table
-
- #TODO: replace this crude algorithm (ticket #17)
- if ['id', 'create', 'write', 'read', 'status', 'priority','hash'] != [c.name for c in dict_table.c]:
- raise ValueError(h_self.e_bad_table, dict_table)
- if ['id', 'dict_id', 'name', 'ntype', 'fval', 'sval', 'bval'] != [c.name for c in pair_table.c]:
- raise ValueError(h_self.e_bad_table, pair_table)
-
- h_self._session_fn = Session
-
- class KeyVal (object):
- """KeyVal interfaces between python types and the database.
-
- It encapsulates heuristics for type conversion.
- """
- def __init__(k_self, name, val):
- k_self.name = name
- k_self.val = val
- def __repr__(k_self):
-            return "<KeyVal (%s) %s: %s>" % (k_self.id, k_self.name, repr(k_self.val))
- def __get_val(k_self):
- val = None
- if k_self.fval is not None: val = [int, float][k_self.ntype](k_self.fval)
- if k_self.bval is not None: val = eval(str(k_self.bval))
- if k_self.sval is not None: val = k_self.sval
- return val
- def __set_val(k_self, val):
- if isinstance(val, (str,unicode)):
- k_self.fval = None
- k_self.bval = None
- k_self.sval = val
- else:
- k_self.sval = None
- try:
- f = float(val)
- except (TypeError, ValueError):
- f = None
- if f is None: #binary data
- k_self.bval = repr(val)
- assert eval(k_self.bval) == val
- k_self.fval = None
- k_self.ntype = None
- else:
- k_self.bval = None
- k_self.fval = f
- k_self.ntype = isinstance(val,float)
- val = property(__get_val, __set_val)
-
- mapper(KeyVal, pair_table)
-
- class Dict (object):
- """
- Instances are dict-like objects with additional features for
- communicating with an active database.
-
- This class will be mapped by SqlAlchemy to the dict_table.
-
- Attributes:
- handle - reference to L{DbHandle} (creator)
-
- """
- def __init__(d_self, session=None):
- if session is None:
- s = h_self._session_fn()
- s.add(d_self) #d_self transient -> pending
- s.commit() #d_self -> persistent
- s.close() #d_self -> detached
- else:
- s = session
- s.save(d_self)
-
- _forbidden_keys = set(['session'])
-
- #
- # dictionary interface
- #
-
- def __contains__(d_self, key):
- for a in d_self._attrs:
- if a.name == key:
- return True
- return False
-
- def __eq__(self, other):
- return dict(self) == dict(other)
- def __neq__(self, other):
- return dict(self) != dict(other)
-
- def __getitem__(d_self, key):
- for a in d_self._attrs:
- if a.name == key:
- return a.val
- raise KeyError(key)
-
- def __setitem__(d_self, key, val, session=None):
- if session is None:
- s = h_self._session_fn()
- s.add(d_self)
- d_self._set_in_session(key, val, s)
- s.commit()
- s.close()
- else:
- s = session
- s.add(d_self)
- d_self._set_in_session(key, val, s)
-
- def __delitem__(d_self, key, session=None):
- if session is None:
- s = h_self._session_fn()
- commit_close = True
- else:
- s = session
- commit_close = False
- s.add(d_self)
-
- #find the item to delete in d_self._attrs
- to_del = None
- for i,a in enumerate(d_self._attrs):
- if a.name == key:
- assert to_del is None
- to_del = (i,a)
- if to_del is None:
- raise KeyError(key)
- else:
- i, a = to_del
- s.delete(a)
- del d_self._attrs[i]
- if commit_close:
- s.commit()
- s.close()
-
- def iteritems(d_self):
- return d_self.items()
-
- def items(d_self):
- return [(kv.name, kv.val) for kv in d_self._attrs]
-
- def keys(d_self):
- return [kv.name for kv in d_self._attrs]
-
- def values(d_self):
- return [kv.val for kv in d_self._attrs]
-
- def update(d_self, dct, session=None, **kwargs):
- """Like dict.update(), set keys from kwargs"""
- if session is None:
- s = h_self._session_fn()
- commit_close = True
- else:
- s = session
- commit_close = False
- s.add(d_self)
- for k, v in dct.items():
- d_self._set_in_session(k, v, s)
- for k, v in kwargs.items():
- d_self._set_in_session(k, v, s)
-
- if commit_close:
- s.commit()
- s.close()
-
- def get(d_self, key, default):
- try:
- return d_self[key]
- except KeyError:
- return default
-
- def __str__(self):
- return 'Dict'+ str(dict(self))
-
- #
- # database stuff
- #
-
- def refresh(d_self, session=None):
- """Sync key-value pairs from database to self
-
- @param session: use the given session, and do not commit.
-
- """
- if session is None:
- session = h_self._session_fn()
- session.add(d_self) #so session knows about us
- session.refresh(d_self)
- session.commit()
- session.close()
- else:
- session.add(d_self) #so session knows about us
- session.refresh(self.dbrow)
-
- def delete(d_self, session=None):
- """Delete this dictionary from the database
-
- @param session: use the given session, and do not commit.
- """
- if session is None:
- session = h_self._session_fn()
- session.add(d_self) #so session knows about us
- session.delete(d_self) #mark for deletion
- session.commit()
- session.close()
- else:
- session.add(d_self) #so session knows about us
- session.delete(d_self)
-
- # helper routine by update() and __setitem__
- def _set_in_session(d_self, key, val, session):
- """Modify an existing key or create a key to hold val"""
-
- #FIRST SOME MIRRORING HACKS
- if key == 'dbdict.status':
- ival = int(val)
- d_self.status = ival
- if key == 'dbdict.sql.priority':
- fval = float(val)
- d_self.priority = fval
- if key == 'dbdict.hash':
- ival = int(val)
- d_self.hash = ival
-
- if key in d_self._forbidden_keys:
- raise KeyError(key)
- created = None
- for i,a in enumerate(d_self._attrs):
- if a.name == key:
- assert created == None
- created = h_self._KeyVal(key, val)
- d_self._attrs[i] = created
- if not created:
- created = h_self._KeyVal(key, val)
- d_self._attrs.append(created)
- session.save(created)
-
- mapper(Dict, dict_table,
- properties = {
- '_attrs': relation(KeyVal,
- cascade="all, delete-orphan")
- })
-
- class _Query (object):
- """
- Attributes:
- _query - SqlAlchemy.Query object
- """
-
- def __init__(q_self, query):
- q_self._query = query
-
- def __iter__(q_self):
- return q_self.all().__iter__()
-
- def __getitem__(q_self, item):
- return q_self._query.__getitem__(item)
-
- def filter_eq(q_self, kw, arg):
- """Return a Query object that restricts to dictionaries containing
- the given kwargs"""
-
- #Note: when we add new types to the key columns, add them here
- q = q_self._query
- T = h_self._Dict
- if isinstance(arg, (str,unicode)):
- q = q.filter(T._attrs.any(name=kw, sval=arg))
- else:
- try:
- f = float(arg)
- except (TypeError, ValueError):
- f = None
- if f is None:
- q = q.filter(T._attrs.any(name=kw, bval=repr(arg)))
- else:
- q = q.filter(T._attrs.any(name=kw, fval=f))
-
- return h_self._Query(q)
-
- def filter_eq_dct(q_self, dct):
- rval = q_self
- for key, val in dct.items():
- rval = rval.filter_eq(key,val)
- return rval
-
- def all(q_self):
- """Return an iterator over all matching dictionaries.
-
- See L{SqlAlchemy.Query}
- """
- return q_self._query.all()
-
- def count(q_self):
- """Return the number of matching dictionaries.
-
- See L{SqlAlchemy.Query}
- """
- return q_self._query.count()
-
- def first(q_self):
- """Return some matching dictionary, or None
- See L{SqlAlchemy.Query}
- """
- return q_self._query.first()
-
- def all_ordered_by(q_self, key, desc=False):
- """Return query results, sorted.
-
- @type key: string or tuple of string or list of string
- @param: keys by which to sort the results.
-
- @rtype: list of L{DbHandle._Dict} instances
- @return: query results, sorted by given keys
- """
-
- # order_by is not easy to do in SQL based on the data structures we're
- # using. Considering we support different data types, it may not be
- # possible at all.
- #
- # It would be easy if 'pivot' or 'crosstab' were provided as part of the
- # underlying API, but they are not. For example, read this:
- # http://www.simple-talk.com/sql/t-sql-programming/creating-cross-tab-queries-and-pivot-tables-in-sql/
-
- # load query results
- results = list(q_self.all())
-
- if isinstance(key, (tuple, list)):
- val_results = [([d[k] for k in key], d) for d in results]
- else:
- val_results = [(d[key], d) for d in results]
-
- val_results.sort() #interesting: there is an optional key parameter
- if desc:
- val_results.reverse()
- return [vr[-1] for vr in val_results]
-
- h_self._KeyVal = KeyVal
- h_self._Dict = Dict
- h_self._Query = _Query
-
- def __iter__(h_self):
- s = h_self.session()
- rval = list(h_self.query(s).__iter__())
- s.close()
- return rval.__iter__()
-
- def insert_kwargs(h_self, session=None, **dct):
- """
- @rtype: DbHandle with reference to self
- @return: a DbHandle initialized as a copy of dct
-
- @type dct: dict-like instance whose keys are strings, and values are
- either strings, integers, floats
-
- @param dct: dictionary to insert
-
- """
- return h_self.insert(dct, session=session)
-
- def insert(h_self, dct, session=None):
- """
- @rtype: DbHandle with reference to self
- @return: a DbHandle initialized as a copy of dct
-
- @type dct: dict-like instance whose keys are strings, and values are
- either strings, integers, floats
-
- @param dct: dictionary to insert
-
- """
- if session is None:
- s = h_self.session()
- rval = h_self._Dict(s)
- if dct: rval.update(dct, session=s)
- s.commit()
- s.close()
- else:
- rval = h_self._Dict(session)
- if dct: rval.update(dct, session=session)
- return rval
-
- def query(h_self, session):
- """Construct an SqlAlchemy query, which can be subsequently filtered
- using the instance methods of DbQuery"""
- return h_self._Query(session.query(h_self._Dict)\
- .options(eagerload('_attrs')))
-
- def createView(h_self, view):
-
- s = h_self.session()
- cols = []
-
- for col in view.columns:
- if col.name is "id":
- continue;
- elif isinstance(col.type, (Integer,Float)):
- cols.append([col.name,'fval']);
- elif isinstance(col.type,String):
- cols.append([col.name,'sval']);
- elif isinstance(col.type,Binary):
- cols.append([col.name,'bval']);
- else:
-                assert False, "Error: wrong column type in view %s" % view.name
-
- # generate raw sql command string
- viewsql = crazy_sql_command(view.name, cols, \
- h_self._dict_table.name, \
- h_self._pair_table.name)
-
- print 'Creating sql view with command:\n', viewsql;
- h_self._engine.execute(viewsql);
- s.commit();
- s.close()
-
- class MappedClass(object):
- pass
-
- mapper(MappedClass, view)
-
- return MappedClass
-
- def session(h_self):
- return h_self._session_fn()
-
- def get(h_self, id):
- s = h_self.session()
- rval = s.query(h_self._Dict).get(id)
- if rval:
- #eagerload hack
- str(rval)
- rval.id
- s.close()
- return rval
-
-
-
-
-def db_from_engine(engine,
- table_prefix='DbHandle_default_',
- trial_suffix='trial',
- keyval_suffix='keyval'):
- """Create a DbHandle instance
-
- @type engine: sqlalchemy engine (e.g. from create_engine)
- @param engine: connect to this database for transactions
-
- @type table_prefix: string
- @type trial_suffix: string
- @type keyval_suffix: string
-
- @rtype: DbHandle instance
-
- @note: The returned DbHandle will use three tables to implement the
- many-to-many pattern that it needs:
- - I{table_prefix + trial_suffix},
- - I{table_prefix + keyval_suffix}
-
- """
- Session = sessionmaker(autoflush=True, autocommit=False)
-
- metadata = MetaData()
-
- t_trial = Table(table_prefix+trial_suffix, metadata,
- Column('id', Integer, primary_key=True),
- Column('create', DateTime),
- Column('write', DateTime),
- Column('read', DateTime),
- Column('status', Integer),
- Column('priority', Float(53)),
- Column('hash', postgres.PGBigInteger))
-
- t_keyval = Table(table_prefix+keyval_suffix, metadata,
- Column('id', Integer, primary_key=True),
- Column('dict_id', Integer, index=True),
- Column('name', String(128), index=True, nullable=False), #name of attribute
- Column('ntype', Boolean),
- Column('fval', Float(53)),
- Column('sval', Text),
- Column('bval', Binary),
- ForeignKeyConstraint(['dict_id'], [table_prefix+trial_suffix+'.id']))
-
- #, ForeignKey('%s.id' % t_trial)),
- metadata.bind = engine
- metadata.create_all() # no-op when tables already exist
- #warning: tables can exist, but have incorrect schema
- # see bug mentioned in DbHandle constructor
-
- return DbHandle(Session, engine, t_trial, t_keyval)
-
-def sqlite_memory_db(echo=False, **kwargs):
- """Return a DbHandle backed by a memory-based database"""
- engine = create_engine('sqlite:///:memory:', echo=False)
- return db_from_engine(engine, **kwargs)
-
-def sqlite_file_db(filename, echo=False, **kwargs):
- """Return a DbHandle backed by a file-based database"""
- engine = create_engine('sqlite:///%s' % filename, echo=False)
- return db_from_engine(engine, **kwargs)
-
-def postgres_db(user, password, host, database, echo=False, poolclass=sqlalchemy.pool.NullPool, **kwargs):
- """Create an engine to access a postgres_dbhandle
- """
- db_str ='postgres://%(user)s:%(password)s@%(host)s/%(database)s' % locals()
-
- engine = create_engine(db_str, echo=echo, poolclass=poolclass)
-
- return db_from_engine(engine, **kwargs)
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/crap.py
--- a/pylearn/dbdict/crap.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,256 +0,0 @@
-
-from __future__ import absolute_import
-
-import os, sys, socket, datetime, copy, tempfile, shutil
-from .dconfig import Config, load
-from .api0 import sqlite_memory_db
-from .api0 import postgres_serial
-
-
-class Job(object):
- @classmethod
- def is_a_default_option(cls, key, val):
- if callable(val):
- return False
- if key.startswith('_'):
- return False
- if key in ('job_module', 'job_symbol'):
- return False
- return True
-
- @classmethod
- def options(cls):
- opts = [attr for attr in dir(cls)
- if cls.is_a_default_option(attr, getattr(cls, attr))]
- opts.sort()
- return opts
-
-
- def __init__(self, config):
- for key, val in config.items():
- if self.is_a_default_option(key, val):
- if hasattr(self, key):
- setattr(self, key, val)
- else:
- raise AttributeError("Job has no such option:", key)
-
- @classmethod
- def print_defaults(cls):
- for attr in cls.options():
- print attr, '=', getattr(cls, attr)
-
- def print_config(self, outfile=sys.stdout):
- for attr in self.options():
- print >> outfile, attr, '=', getattr(self, attr)
-
- def save_config(self, outfile=sys.stdout):
- """Like print_config, but with parsable format rather than human-readable format"""
- kwargs = dict([(attr,getattr(self, attr)) for attr in self.options()])
- Config(**kwargs).save_file(outfile)
-
-def run_job(fn=None, cwd=None,
-
- config_path = 'job_config.py',
- result_path = 'job_result.py',
- stdout_path = 'stdout',
- stderr_path = 'stderr',
- work_dir = 'workdir'
- ):
- cwd = os.getcwd() if cwd is None else cwd
-
-def _pop_cwd():
- try:
- cwd = sys.argv.pop(0)
- if not cwd.startswith('/'):
- cwd = os.path.join(os.getcwd(), cwd)
- except IndexError:
- cwd = os.getcwd()
- return cwd
-
-
-class RunJob(object):
- """
- """
-
- path_perf = 'job_perf.py'
- path_results = 'job_results.py'
- path_config = 'job_config.py'
- path_fullconfig = 'job_fullconfig.py'
- path_stdout = 'stdout'
- path_stderr = 'stderr'
- path_workdir = 'workdir'
-
- def __init__(self, exename):
- pass
-
- def _load_config(self, cwd = None):
- cwd = _pop_cwd() if cwd is None else cwd
- config = load(os.path.join(cwd, self.path_config))
- return config
-
- def _load_job_class(self, config=None):
- config = self._load_config() if config is None else config
- job_module_name = config.job_module
- job_symbol = config.job_symbol
- #level=0 -> absolute imports
- #fromlist=[None] -> evaluate the rightmost element of the module name, when it has
- # dots (e.g., "A.Z" will return module Z)
- job_module = __import__(job_module_name, fromlist=[None], level=0)
- try:
- job_class = getattr(job_module, job_symbol)
- except:
- print >> sys.stderr, "failed to load job class:", job_module_name, job_symbol
- raise
- return job_class
-
- def print_config(self):
- config = self._load_config()
- job_class = self._load_job_class(config)
- job = job_class(config)
- job.print_config()
-
- def run(self):
- cwd = _pop_cwd()
- config = self._load_config(cwd)
- job_class = self._load_job_class(config)
- job = job_class(config)
-
- job.save_config(open(os.path.join(cwd, self.path_fullconfig),'w'))
-
- perf = Config()
- perf.host_name = socket.gethostname()
- perf.start_time = str(datetime.datetime.now())
- perf.save(os.path.join(cwd, self.path_perf))
-
- stdout_orig = sys.stdout
- stderr_orig = sys.stderr
-
- try:
- sys.stdout = open(os.path.join(cwd, self.path_stdout), 'w')
- sys.stderr = open(os.path.join(cwd, self.path_stderr), 'w')
-
- #
- # mess around with the working directory
- #
-
- wd = os.path.join(cwd, self.path_workdir)
- try:
- os.mkdir(wd)
- except OSError, e:
-                print >> sys.stderr, "trouble making working directory:"
- print >> sys.stderr, e
- print >> sys.stderr, "ignoring error and proceeding anyway"
- try:
- os.chdir(wd)
- except:
- pass
-
- print >> sys.stderr, "cwd:", os.getcwd()
-
- #
- # run the job...
- #
-
- #TODO load the results file before running the job, to resume job
- results = Config()
-
- job_rval = job.run(results)
-
- #TODO: use the return value to decide whether the job should be resumed or not,
- # False means run is done
- # True means the job has yielded, but should be continued
- results.save(os.path.join(cwd, self.path_results))
-
- finally:
- #put back stderr and stdout
- sys.stdout = stdout_orig
- sys.stderr = stderr_orig
- perf.end_time = str(datetime.datetime.now())
- perf.save(os.path.join(cwd, self.path_perf))
-
- def defaults(self):
- job_class = self._load_job_class()
- job_class.print_defaults()
-
-def standalone_run_job():
- exe = RunJob(sys.argv.pop(0))
- try:
- cmd = sys.argv.pop(0)
- fn = getattr(exe, cmd)
- except IndexError:
- fn = getattr(exe, 'run')
- except AttributeError:
- print >> sys.stderr, "command not supported", cmd
-
- fn()
-
-def build_db(cwd, db=None):
- """WRITEME"""
- db = sqlite_memory_db() if db is None else db
- for e in os.listdir(cwd):
- e = os.path.join(cwd, e)
- try:
- e_config = open(os.path.join(e, 'job_config.py'))
- except:
- e_config = None
-
- try:
- e_sentinel = open(os.path.join(e, '__jobdir__'))
- except:
- e_sentinel = None
-
- if not (e_config or e_sentinel):
- continue #this is not a job dir
-
- if e_config:
- e_config.close()
- config = load(os.path.join(e, 'job_config.py'))
- kwargs = copy.copy(config.__dict__)
- try:
- results = load(os.path.join(e, 'job_results.py'))
- kwargs.update(results.__dict__)
- except:
- pass
- try:
- perf = load(os.path.join(e, 'job_perf.py'))
- kwargs.update(perf.__dict__)
- except:
- pass
-
- #TODO: this is a backward-compatibility hack for AISTATS*09
- if 'perf_jobdir' not in kwargs:
- kwargs['perf_jobdir'] = e
- if 'perf_workdir' not in kwargs:
- kwargs['perf_workdir'] = os.path.join(e, 'workdir')
-
- entry = db.insert(kwargs)
-
- if e_sentinel:
- print >> sys.stderr, "NOT-IMPLEMENTED: RECURSION INTO SUBDIRECTORY", e
- return db
-
-
-class RunQuery(object):
-
- def __init__(self, exename):
- pass
-
- def run(self):
- cwd = _pop_cwd()
- db = build_db(cwd)
- for entry in db.query().all():
- print entry.items()
-
-
-def standalone_query():
- exe = RunQuery(sys.argv.pop(0))
- try:
- cmd = sys.argv.pop(0)
- fn = getattr(exe, cmd)
- except IndexError:
- fn = getattr(exe, 'run')
- except AttributeError:
- print >> sys.stderr, "command not supported", cmd
-
- fn()
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/dbdict_run.py
--- a/pylearn/dbdict/dbdict_run.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,78 +0,0 @@
-import sys, signal
-from .tools import DictProxyState, load_state_fn, run_state
-
-# N.B.
-# don't put exotic imports here
-# see what I did with def sql() below... since we shouldn't try to import that stuff unless we
-# need it.
-
-_experiment_usage = """Usage:
-    dbdict-experiment <command>
-
-Commands:
-
- help Print help. (You're looking at it.)
-
- cmdline Obtain experiment configuration by evaluating the commandline
- sql Obtain experiment configuration by querying an sql database
-
-Help on individual commands might be available by typing 'dbdict-experiment help'
-
-"""
- #dbdict Obtain experiment configuration by loading a dbdict file
-
-def _dispatch_cmd(self, stack=sys.argv):
- try:
- cmd = stack.pop(0)
- except IndexError:
- cmd = 'help'
- try:
- fn = getattr(self, cmd)
- except AttributeError:
- print >> sys.stderr, "command not supported", cmd
- fn = self.help
- fn()
-
-class RunCmdline(object):
- def __init__(self):
- try:
- dct = eval('dict(' + sys.argv.pop(0) + ')')
- except Exception, e:
- print >> sys.stderr, "Exception:", e
- self.help()
- return
-
- channel_rval = [None]
-
- def on_sigterm(signo, frame):
- channel_rval[0] = 'stop'
-
- #install a SIGTERM handler that asks the run_state function to return
- signal.signal(signal.SIGTERM, on_sigterm)
- signal.signal(signal.SIGINT, on_sigterm)
-
- def channel(*args, **kwargs):
- return channel_rval[0]
-
- run_state(dct, channel)
- print dct
-
- def help(self):
-        print >> sys.stderr, "Usage: dbdict-experiment cmdline <key=value, key=value, ...>"
-
-
-class RunExperiment(object):
- """This class handles the behaviour of the dbdict-run script."""
- def __init__(self):
- exe = sys.argv.pop(0)
- #print 'ARGV', sys.argv
- _dispatch_cmd(self)
-
- def sql(self):
- from .dbdict_run_sql import run_sql
- return run_sql()
-
- cmdline = RunCmdline
-
- def help(self):
- print _experiment_usage # string not inlined so as not to fool VIM's folding mechanism
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/dbdict_run_sql.py
--- a/pylearn/dbdict/dbdict_run_sql.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,446 +0,0 @@
-"""Code related to using the api0 dictionary table to run experiments.
-
-This module has been tested with postgres.
-
-"""
-
-import sys, os, socket, tempfile, shutil, copy, time
-import numpy # for the random wait time in book_unstarted_trial
-
-import sqlalchemy
-from sqlalchemy import create_engine, desc
-from sqlalchemy.orm import eagerload
-import psycopg2, psycopg2.extensions
-
-from .api0 import db_from_engine, postgres_db
-from .tools import run_state, DictProxyState, COMPLETE, INCOMPLETE, SYMBOL, MODULE # tools
-from .dconfig import save_items
-
-# _TEST CONCURRENCY
-# To ensure that concurrency is handled properly by the consumer (book_dct)
-# set this flag to True and (manually) run the following test.
-#
-# Launch two processes side by side, starting one of them a few seconds after the other.
-# There is an extra sleep(10) that will delay each process's job dequeue.
-#
-# You should see that the process that started first gets the job,
-# and the process that started second tries to get the same job,
-# fails, and then gets another one instead.
-_TEST_CONCURRENCY = False
-
-_help = """Usage:
-dbdict-run sql postgres://<user>:<pass>@<hostname>/<database>/<api0-table> <experiment-root>
-
- user - postgres username
- pass - password
- hostname - the network address of the host on which a postgres server is
- running (on port ??)
-    database - a database served by the postgres server on <hostname>
- api0-table - the name (actually, table_prefix) associated with tables,
- created by dbdict.api0.
-
- experiment-root - a local or network path. Network paths begin with ssh://
- E.g. /tmp/blah
- ssh://mammouth:blah
- ssh://foo@linux.org:/tmp/blah
-
-Experiment-root is used to store the file results of experiments. If a job with a given <id>
-creates file 'a.txt', and directory 'b' with file 'foo.py', then these will be rsync'ed to the
-experiment-root when the job has finished running. They will be found here:
-
-    <experiment-root>/<database>/<api0-table>/<id>/workdir/a.txt
-    <experiment-root>/<database>/<api0-table>/<id>/workdir/b/foo.py
-
-Files 'stdout', 'stderr', and 'state.py' will be created.
-
-    <experiment-root>/<database>/<api0-table>/<id>/stdout   - opened for append
-    <experiment-root>/<database>/<api0-table>/<id>/stderr   - opened for append
-    <experiment-root>/<database>/<api0-table>/<id>/state.py - overwritten with database version
-
-If a job is restarted or resumed, then those files are rsync'ed back to the current working
-directory, and stdout and stderr are re-opened for appending. When a resumed job stops for
-the second (or more) time, the cwd is rsync'ed back to the experiment-root. In this way, the
-experiment-root accumulates the results of experiments that run.
-
-"""
-
-STATUS = 'dbdict_sql_status'
-PRIORITY = 'dbdict_sql_priority'
-HOST = 'dbdict_sql_hostname'
-HOST_WORKDIR = 'dbdict_sql_host_workdir'
-PUSH_ERROR = 'dbdict_sql_push_error'
-
-START = 0
-"""dbdict_status == START means a experiment is ready to run"""
-
-RUNNING = 1
-"""dbdict_status == RUNNING means a experiment is running on dbdict_hostname"""
-
-DONE = 2
-"""dbdict_status == DONE means a experiment has completed (not necessarily successfully)"""
-
-RESTART_PRIORITY = 2.0
-"""Stopped experiments are marked with this priority"""
-
-
-def postgres_serial(user, password, host, database, **kwargs):
- """Return a DbHandle instance that communicates with a postgres database at transaction
- isolation_level 'SERIALIZABLE'.
-
- :param user: a username in the database
- :param password: the password for the username
- :param host: the network address of the host on which the postgres server is running
- :param database: a database served by the postgres server
-
- """
- this = postgres_serial
-
- if not hasattr(this,'engine'):
- def connect():
- c = psycopg2.connect(user=user, password=password, database=database, host=host)
- c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
- return c
- pool_size = 0
- this.engine = create_engine('postgres://'
- ,creator=connect
- ,pool_size=0 # should force the app release connections
- )
-
- db = db_from_engine(this.engine, **kwargs)
- db._is_serialized_session_db = True
- return db
-
-
-def add_experiments_to_db(exp_cls, jobs, db, verbose=0, add_dups=False, type_check=None):
-    """Add experiments parametrized by exp_cls and jobs[i] to database db.
-
- Default behaviour is to ignore jobs which are already in the database.
-
- If type_check is a class (instead of None) then it will be used as a type declaration for
- all the elements in each job dictionary. For each key,value pair in the dictionary, there
- must exist an attribute,value pair in the class meeting the following criteria:
- the attribute and the key are equal, and the types of the values are equal.
-
- :param exp_cls: The Experiment class to run these experiments.
- :param jobs: The parameters of experiments to run.
- :type jobs: an iterable object over dictionaries
- :param verbose: print which jobs are added and which are skipped
- :param add_dups: False will ignore a job if it matches (on all items()) with a db entry.
- :type add_dups: Bool
-
- :returns: list of (Bool,job[i]) in which the flags mean the corresponding job actually was
- inserted.
-
- """
- rval = []
- for job in jobs:
- job = copy.copy(job)
- do_insert = add_dups or (None is db.query(**job).first())
-
- if do_insert:
- if type_check:
- for k,v in job.items():
- if type(v) != getattr(type_check, k):
- raise TypeError('Experiment contains value with wrong type',
- ((k,v), getattr(type_check, k)))
-
- job[STATUS] = START
- job[SYMBOL] = exp_cls.__name__
- job[MODULE] = exp_cls.__module__
- job[PRIORITY] = 1.0
- if verbose:
- print 'ADDING ', job
- db.insert(job)
- rval.append((True, job))
- else:
- if verbose:
- print 'SKIPPING', job
- rval.append((False, job))
-
-    return rval
-
-def book_dct_postgres_serial(db, retry_max_sleep=10.0, verbose=0):
- """Find a trial in the lisa_db with status START.
-
- A trial will be returned with dbdict_status=RUNNING.
-
- Returns None if no such trial exists in DB.
-
- This function uses a serial access to the lisadb to guarantee that no other
- process will retrieve the same dct. It is designed to facilitate writing
- a "consumer" for a Producer-Consumer pattern based on the database.
-
- """
- print >> sys.stderr, """#TODO: use the priority field, not the status."""
- print >> sys.stderr, """#TODO: ignore entries with key PUSH_ERROR."""
-
- s = db._session
-
- # NB. we need the query and attribute update to be in the same transaction
- assert s.autocommit == False
-
- dcts_seen = set([])
- keep_trying = True
-
- dct = None
- while (dct is None) and keep_trying:
- #build a query
- q = s.query(db._Dict)
- q = q.options(eagerload('_attrs')) #hard-coded in api0
- q = q.filter(db._Dict._attrs.any(name=STATUS, fval=START))
-
- #try to reserve a dct
- try:
- #first() may raise psycopg2.ProgrammingError
- dct = q.first()
-
- if dct is not None:
- assert (dct not in dcts_seen)
- if verbose: print 'book_unstarted_dct retrieved, ', dct
- dct._set_in_session(STATUS, RUNNING, s)
- if 1:
- if _TEST_CONCURRENCY:
- print >> sys.stderr, 'SLEEPING BEFORE BOOKING'
- time.sleep(10)
-
- #commit() may raise psycopg2.ProgrammingError
- s.commit()
- else:
- print >> sys.stderr, 'DEBUG MODE: NOT RESERVING JOB!', dct
- #if we get this far, the job is ours!
- else:
- # no jobs are left
- keep_trying = False
- except (psycopg2.OperationalError,
- sqlalchemy.exceptions.ProgrammingError), e:
- #either the first() or the commit() raised
- s.rollback() # docs say to do this (or close) after commit raises exception
- if verbose: print 'caught exception', e
- if dct:
- # first() succeeded, commit() failed
- dcts_seen.add(dct)
- dct = None
- wait = numpy.random.rand(1)*retry_max_sleep
- if verbose: print 'another process stole our dct. Waiting %f secs' % wait
- time.sleep(wait)
- return dct
-
-def book_dct(db):
- print >> sys.stderr, """#TODO: use the priority field, not the status."""
- print >> sys.stderr, """#TODO: ignore entries with key self.push_error."""
-
- return db.query(dbdict_status=START).first()
-
-def parse_dbstring(dbstring):
- postgres = 'postgres://'
- assert dbstring.startswith(postgres)
- dbstring = dbstring[len(postgres):]
-
- #username_and_password
- colon_pos = dbstring.find('@')
- username_and_password = dbstring[:colon_pos]
- dbstring = dbstring[colon_pos+1:]
-
- colon_pos = username_and_password.find(':')
- if -1 == colon_pos:
- username = username_and_password
- password = None
- else:
- username = username_and_password[:colon_pos]
- password = username_and_password[colon_pos+1:]
-
- #hostname
- colon_pos = dbstring.find('/')
- hostname = dbstring[:colon_pos]
- dbstring = dbstring[colon_pos+1:]
-
- #dbname
- colon_pos = dbstring.find('/')
- dbname = dbstring[:colon_pos]
- dbstring = dbstring[colon_pos+1:]
-
- #tablename
- tablename = dbstring
-
- if password is None:
- password = open(os.getenv('HOME')+'/.dbdict_%s'%dbname).readline()[:-1]
- if False:
- print 'USERNAME', username
- print 'PASS', password
- print 'HOST', hostname
- print 'DB', dbname
- print 'TABLE', tablename
-
- return username, password, hostname, dbname, tablename
-
-class ExperimentLocation(object):
- def __init__(self, root, dbname, tablename, id):
- ssh_prefix='ssh://'
- if root.startswith(ssh_prefix):
- root = root[len(ssh_prefix):]
- #at_pos = root.find('@')
- colon_pos = root.find(':')
- self.host, self.path = root[:colon_pos], root[colon_pos+1:]
- else:
- self.host, self.path = '', root
- self.dbname = dbname
- self.tablename = tablename
- self.id = id
-
- def rsync(self, direction):
- """The directory at which experiment-related files are stored.
-
-        :returns: "<host>:<path>", of the sort used by ssh and rsync.
- """
- if self.host:
- path = os.path.join(
- ':'.join([self.host, self.path]),
- self.dbname,
- self.tablename,
- self.id)
- else:
- path = os.path.join(self.path,
- self.dbname, self.tablename, self.id)
-
- if direction == 'push':
- rsync_cmd = 'rsync -a * "%s/"' % path
- elif direction == 'pull':
- rsync_cmd = 'rsync -a "%s/" .' % path
- else:
- raise Exception('invalid direction', direction)
-
- rsync_rval = os.system(rsync_cmd)
- if rsync_rval != 0:
- raise Exception('rsync failure', (rsync_rval, rsync_cmd))
-
- def pull(self):
- return self.rsync('pull')
-
- def push(self):
- return self.rsync('push')
-
- def touch(self):
- path = os.path.join(self.path, self.dbname, self.tablename, self.id)
- if self.host:
- host = self.host
- touch_cmd = ('ssh %(host)s "mkdir -p \'%(path)s\' && cd \'%(path)s\' '
- '&& touch stdout stderr && mkdir workdir"' % locals())
- else:
- touch_cmd = ("mkdir -p '%(path)s' && cd '%(path)s' "
- '&& touch stdout stderr && mkdir workdir' % locals())
- print "ECHO", touch_cmd
- touch_rval = os.system(touch_cmd)
- if 0 != touch_rval:
- raise Exception('touch failure', (touch_rval, touch_cmd))
-
- def delete(self):
- #something like ssh %s 'rm -Rf %s' should work, but it's pretty scary...
- raise NotImplementedError()
-
-def run_sql():
- try:
- username, password, hostname, dbname, tablename = parse_dbstring(sys.argv.pop(0))
- except Exception, e:
- print >> sys.stderr, e
- print >> sys.stderr, _help
- raise
-
- #set experiment_root
- try:
- exproot = sys.argv.pop(0)
- except:
- exproot = os.getcwd()
-
- if not exproot.startswith('/'):
- exproot = os.path.join(os.getcwd(), exproot)
-
- #TODO: THIS IS A GOOD IDEA RIGHT?
- # It makes module-lookup work based on cwd-relative paths
- # But possibly has really annoying side effects? Is there a cleaner
- # way to change the import path just for loading the experiment class?
- sys.path.insert(0, os.getcwd())
-
- #TODO: refactor this so that we can use any kind of database (not just postgres)
-
- #a serialized session is necessary for jobs not be double-booked by listeners running
- #in parallel on the cluster.
- db = postgres_serial(user=username,
- password=password,
- host=hostname,
- database=dbname,
- table_prefix=tablename)
-
- while True:
- dct = book_dct_postgres_serial(db, verbose=1)
- if dct is None:
- break
-
- try:
- #
- # chdir to a temp folder
- #
- workdir = tempfile.mkdtemp()
- print >> sys.stderr, "INFO RUNNING ID %i IN %s" % (dct.id, workdir)
- os.chdir(workdir)
-
- # not sure where else to put this...
- dct[HOST] = socket.gethostname()
- dct[HOST_WORKDIR] = workdir
-
- exploc = ExperimentLocation(exproot, dbname, tablename, str(dct.id))
-
- #
- # pull cwd contents
- #
- exploc.touch()
- exploc.pull()
-
- #
- # run the experiment
- #
- old_stdout, old_stderr = sys.stdout, sys.stderr
- sys.stdout, sys.stderr = open('stdout', 'a+'), open('stderr', 'a+')
- assert RUNNING == dct[STATUS] #set by get_dct
- os.chdir(os.path.join(workdir, 'workdir'))
- try:
- run_rval = run_state(DictProxyState(dct))
- except Exception, e:
- run_rval = COMPLETE
- print >> sys.stderr, 'Exception:', e
- print >> old_stderr, '#TODO: print a bigger traceback to stderr'
- sys.stdout, sys.stderr = old_stdout, old_stderr
-
- #
- # push the results back to the experiment_root
- #
- try:
- os.chdir(workdir)
- #pickle the state #TODO: write it human-readable
- #cPickle.dump(dict((k,v) for k,v in dct.items()), open('state.pickle','w'))
- save_items(dct.items(), open('state.py', 'w'))
- exploc.push()
- except Exception, e:
- dct[PUSH_ERROR] = str(e)
- raise
-
- # Cleanup the tempdir
- # TODO: put this in a 'finally' block?
- #
- shutil.rmtree(workdir, ignore_errors=True)
-
- except:
- dct[STATUS] = DONE
- dct[PRIORITY] = None
- raise
-
- if run_rval is INCOMPLETE:
- #mark the job as needing restarting
- dct[STATUS] = START
- dct[PRIORITY] = RESTART_PRIORITY
- else:
- #mark the job as being done
- dct[STATUS] = DONE
- dct[PRIORITY] = None
-
- break # don't actually loop
-
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/dconfig.py
--- a/pylearn/dbdict/dconfig.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,145 +0,0 @@
-import sys
-#
-#
-# Utility
-#
-#
-
-class Config (object):
- """A class to store experiment configurations.
-
- Configuration variables are stored in class instance __dict__. This class
- ensures that keys are alphanumeric strings, and that values can be rebuilt
- from their representations.
-
- It can be serialized to/from a python file.
-
- """
- def __init__(self, __dict__=None, **kwargs):
- if __dict__:
- Config.__checkkeys__(*__dict__.keys())
- self.__dict__.update(__dict__)
- if kwargs:
- Config.__checkkeys__(*kwargs.keys())
- self.__dict__.update(kwargs)
-
- def __setattr__(self, attr, value):
- Config.__checkkeys__(attr)
- self.__dict__[attr] = value
-
- def __hash__(self):
- """Compute a hash string from a dict, based on items().
-
- @type dct: dict of hashable keys and values.
- @param dct: compute the hash of this dict
-
- @rtype: string
-
- """
- items = list(self.items())
- items.sort()
- return hash(repr(items))
-
- def keys(self):
- return self.__dict__.keys()
-
- def items(self):
- return self.__dict__.items()
-
- def update(self, dct):
- Config.__checkkeys__(*dct.keys())
- self.__dict__.update(dct)
-
- def save_file(self, f=sys.stdout):
- items = self.items()
- items.sort()
- save_dictlike(items, f)
-
- def save(self, filename):
- """Write a python file as a way of serializing a dictionary
-
- @type filename: string
- @param filename: filename to open (overwrite) to save dictionary contents
-
- @return None
-
- """
- f = open(filename, 'w')
- self.save_file(f)
- f.close()
-
- def update_fromfile(self, filename):
- """Read the local variables from a python file
-
- @type filename: string, filename suitable for __import__()
- @param filename: a file whose module variables will be returned as a
- dictionary
-
- @rtype: dict
- @return: the local variables of the imported filename
-
- @note:
- This implementation silently ignores all module symbols that don't meet
- the standards enforced by L{Config.__checkkeys__}. This is meant to
- ignore module special variables.
-
- """
- #m = __import__(filename) #major security problem, or feature?
- f = open(filename)
- for line in f:
- if line.startswith('#'):
- continue
- line = line[:-1] #trim the '\n'
- tokens = line.split(' ')
- try:
- key = tokens[0]
- assert '=' == tokens[1]
- except:
- raise Exception('trouble parsing line:', line)
- repr_val = ' '.join(tokens[2:])
- val = eval(repr_val)
- setattr(self, key, val)
-
- @staticmethod
- def __checkkeys__(*args):
- conf = Config()
- for k in args:
- #must be string
- if type(k) != str:
- raise KeyError(k)
- #mustn't be part of Config class interface
- if hasattr(conf, k):
- raise KeyError(k)
- #all alphanumeric
- for c in k:
- if c not in ('abcdefghijklmnopqrstuvwxyz'
- 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
- '_0123456789'):
- raise KeyError(k)
- #no symbols that look like reserved symbols
- if k[:2]=='__' and k[-2:]=='__' and len(k)>4:
- raise KeyError(k)
-
-def load(*filenames):
- rval = Config()
- for filename in filenames:
- rval.update_fromfile(filename)
- return rval
-
-def save_items(items, f):
- """Write a python file as a way of serializing a dictionary
-
- @type f: writable file-like object
- @param f: the attributes of this object will be written here
-
- @return None
-
- """
- print >> f, "#File generated by dbdict.dconfig.save_items"
- for key, val in items:
- Config.__checkkeys__(key)
- # should never raise... illegal keys are not
- # supposed to ever get into the dictionary
- repr_val = repr(val)
- assert val == eval(repr_val)
- print >> f, key, '=', repr_val
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/design.txt
--- a/pylearn/dbdict/design.txt Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,316 +0,0 @@
-
-TODO
-====
-
-- Class interface for jobs (SHAPING UP)
-
-- How exactly will state work (STILL A MYSTERY sql vs. file vs. script)
- - How to insert jobs
- - How to retrieve jobs
-
-- How will the driver work (rsync / file conventions)
-
-
-
-Components
-==========
-
-DbDict
-------
-
-A DbDict is a restricted dictionary.
-
-Its keys are restricted to simple types, simple enough to be stored in a
-database: strings, integers, and floating-point numbers.
-
-
-Experiment
-----------
-
-An Experiment is a python class with methods like start(), run(), stop(). A
-user should make an Experiment class whose run() method does some computations
-like training a neural network and measuring test errors, training an SVM,
-doing some feature extraction, etc.
-
-An Experiment's arguments are stored in a DbDict.
-
-The results of an experiment can be stored in two places: in a DbDict (for
-easy SQL searching later on) and in files.
-
-
-ExperimentLauncher
-------------------
-
-Experiments are not standalone executables, they need to be run by some
-additional code. There are several ways to launch an experiment:
-
-- running on your local machine vs. cluster vs. condor vs. mammouth
-
-- loading the arguments from the commandline vs. file vs. SQL query
-
-- running alone, or running in parallel with other experiments.
-
-The ExperimentLauncher is an executable python program that can run
-experiments in these different ways.
-
-
-
-Workflow (How to run experiments)
-=================================
-
-First, you'll need a Python program to compute what you want. Think about how
-long it will take and whether you will want to be able to resume after being
-stopped.
-
-If you want to run a simple or short experiment that is not restartable,
-simply extend dbdict.Experiment like this:
-
-.. code-block:: python
-
- class MyExperiment(dbdict.Experiment):
-
- def __init__(self, dct):
- self.dct = dct
-
- def run(self):
-
- # compute something
- result = self.dct.a * self.dct.b
-
- # store it
- self.dct.result = result
-
- # write some other results to a file
- f = open('some_file', 'w')
- print >> f, "hello from the job?"
-
- # return False or None to indicate the job is done
-
-If you want to run a more elaborate experiment, then extend dbdict.Experiment
-like this:
-
-.. code-block:: python
-
- class MyRestartableExperiment(dbdict.Experiment):
-
- def __init__(self, state):
- """Called once per lifetime of the class instance. Can be used to
- create new jobs and save them to the database. This function will not
- be called when a Job is retrieved from the database.
-
- Parent creates keys: dbdict_id, dbdict_module, dbdict_symbol, dbdict_state.
-
- dbdict_state = NEW (possibilities: NEW, RUNNING, DONE)
-
- """
-
- def start(self):
- """Called once per lifetime of the compute job.
-
- This is a good place to initialize internal variables.
-
- After this function returns, either stop() or run() will be called.
-
- dbdict_state -> RUNNING
-
- """
-
- def resume(self):
- """Called to resume computations on a previously stop()'ed job. The
- os.getcwd() is just like it was after some previous stop() command.
-
- This is a good place to load internal variables from os.getcwd().
-
- dbdict_state -> RUNNING
-
- """
-
- def run(self, channel):
- """Called after start() or resume().
-
- channel() may return different things at different times.
- None - run should continue.
- 'stop' - the job should save state as soon as possible because
- the process may soon be terminated
-
- When this function returns, dbdict_state -> DONE.
- """
-
-
-Having written your Experiment subclass, you can now test it directly, or
-using the ExperimentLauncher.
-
-.. code-block:: python
-
- if __name__ == '__main__':
-
- class Config(object):
- a = 3
- b = 50
-
- experiment = MyExperiment(Config())
- experiment.run()
-
-.. code-block:: bash
-
- dbdict-run my_file.MyExperiment 'dict(a=3, b=50)'
-
-
-To run a batch of experiments on a cluster using several concurrent processes and
-the SQL-based job list, you have to insert the jobs into a database table, and
-then launch that many instances of dbdict-run.
-
-.. code-block:: python
-
- from dbdict.tools import insert, view_table
-
- def add_jobs():
- experiment_module = 'my_repo.some_file' #match you module name
- experiment_symbol = 'MyExperiment' #match your class name
- experiment_name = 'showing some stuff'
- for a in 3, 4, 5:
- for b in 0.0, 0.1, 3.0:
- insert(locals(), ignore_duplicates=False)
- add_jobs()
-
- dbdict_run_table('my_launcher_view', experiment_name='showing_some_stuff')
-
-
-
-.. code-block:: bash
-
- dbidispatch --condor dbdict-run sql my_launcher_view
-
-
-
-Component Documentation
-=======================
-
-DbDict
-------
-
-
-Experiment
-----------
-
-class Experiment(object):
-
-    new_stdout = 'job_stdout'
-    new_stderr = 'job_stderr'
-
- def remap_stdout(self):
- """
- Called before start and resume.
-
- Parent-class behaviour is to replace sys.stdout with open(self.new_stdout, 'w+').
-
- """
-
- def remap_stderr(self):
- """
- Called before start and resume.
-
- """
-
- def tempdir(self):
- """
- Return the recommended filesystem location for temporary files.
-
- The idea is that this will be a fast, local disk partition, suitable
- for temporary storage.
-
- Files here will not generally be available at the time of resume().
-
- The return value of this function may be controlled by one or more
- environment variables.
-
- Will return $DBDICT_EXPERIMENT_TEMPDIR if present.
- Failing that, will return $TMP/username-dbdict/hash(self)
- Failing that, will return /tmp/username-dbdict/hash(self)
-
- .. note::
- Maybe we should use Python stdlib's tempdir mechanism.
-
- """
-
-
-Environment Variables
-=====================
-
-
-TMP - sometimes used by Experiment.tempdir()
-
-DBDICT_EXPERIMENT_TEMPDIR - used by Experiment.tempdir()
-
-
-JOB DRIVER
-----------
-
-When a job is started, save:
-
-- workdir
-
-- dir
-
-- network (e.g. mammouth vs. diro... where are jobdir and workdir?
- Alternatively, automatically rsync finished jobs' workdir and jobdir to diro.)
-
-
-dbdict-run-job
---------------
-
-prepare job dir
-~~~~~~~~~~~~~~~~
-
-On iro, cd to fringant2/.../job_id/
-
-when restarting on mammouth, cd to somewhere and rsync -r fringant2/.../job_id job_id
-
-after stopping on mammouth rsync -r job_id fringant2/.../job_id
-
-.. note:
- What to do on error in rsync? Where to report problems?
-
-.. note:
- What to do with stderr, stdout?
-
-job queue table
-~~~~~~~~~~~~~~~
-New table: id job_id priority
-
-This is the table that dbdict-run-job will pop jobs from.
-
-
-How to run experiments using DbDict
------------------------------------
-
-- Inherit from tools.Job. This is where you put the logic that takes some
- parameters, and computes some results associated with those parameters.
-
-- Write a loop that creates a bunch of Jobs, and then tests to see if they are
- already in the db before inserting them.
-
-- Insert the new job ids and priorities into the job queue table.
-- Save result files, and files needed for job resume to os.getcwd().
-
-- Save non-result files to a TEMP directory, obtained by e.g. self.gettmpdir()
- from inside a Job instance.
-
-- Load datasets through pylearn. Pylearn has internal configuration that allows
- it to find files both on mammouth and DIRO networks. You just call
- MNIST.head() or shapeset1.train_valid_test() and bingo.
-
-- Manage experiments in a postgres database. Which ones have been queued, which
- ones are running, which are finished, simple results, etc.
-
-- To analyze results, select from PG and create a dedicated table in sqlite.
- Make columns for all the dictionary keys from all the selected entries.
-
-
-
-When running classification experiments
----------------------------------------
-
-Have convention of saving (train, valid, test) X (NLL, RATE).
-Should the convention be enforced? How?
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/experiment.py
--- a/pylearn/dbdict/experiment.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,122 +0,0 @@
-"""Helper code for implementing dbdict-compatible jobs"""
-import inspect, sys, copy
-
-#State values should be instances of these types:
-INT = type(0)
-FLT = type(0.0)
-STR = type('')
-
-COMPLETE = None #jobs can return this by returning nothing as well
-INCOMPLETE = True #jobs can return this and be restarted
-
-def subdict(dct, prefix):
- """Return the dictionary formed by keys in `dct` that start with the string `prefix`.
-
- In the returned dictionary, the `prefix` is removed from the keynames.
- Updates to the sub-dict are reflected in the original dictionary.
-
- Example:
- a = {'aa':0, 'ab':1, 'bb':2}
- s = subdict(a, 'a') # returns dict-like object with keyvals {'a':0, 'b':1}
- s['a'] = 5
- s['c'] = 9
- # a == {'aa':5, 'ab':1, 'ac':9, 'bb':2}
-
- """
- class SubDict(object):
- def __copy__(s):
- rval = {}
- rval.update(s)
- return rval
- def __eq__(s, other):
- if len(s) != len(other):
- return False
- for k in other:
- if other[k] != s[k]:
- return False
- return True
- def __len__(s):
- return len(s.items())
- def __str__(s):
- d = {}
- d.update(s)
- return str(d)
- def keys(s):
- return [k[len(prefix):] for k in dct if k.startswith(prefix)]
- def values(s):
- return [dct[k] for k in dct if k.startswith(prefix)]
- def items(s):
- return [(k[len(prefix):],dct[k]) for k in dct if k.startswith(prefix)]
- def update(s, other):
- for k,v in other.items():
-                s[k] = v
- def __getitem__(s, a):
- return dct[prefix+a]
- def __setitem__(s, a, v):
- dct[prefix+a] = v
-
- return SubDict()
-
-def subdict_copy(dct, prefix):
- return copy.copy(subdict(dct, prefix))
-
-def call_with_kwargs_from_dict(fn, dct, logfile='stderr'):
- """Call function `fn` with kwargs taken from dct.
-
- When fn has a '**' parameter, this function is equivalent to fn(**dct).
-
- When fn has no '**' parameter, this function removes keys from dct which are not parameter
- names of `fn`. The keys which are ignored in this way are logged to the `logfile`. If
- logfile is the string 'stdout' or 'stderr', then errors are logged to sys.stdout or
- sys.stderr respectively.
-
-    The reason this function exists is to make it easier to provide default arguments and
-    to tolerate extra keys in `dct` that `fn` does not accept.
-
- :param fn: function to call
- :param dct: dictionary from which to take arguments of fn
- :param logfile: log ignored keys to this file
- :type logfile: file-like object or string 'stdout' or string 'stderr'
-
-    :returns: the return value of `fn`, called with keyword arguments drawn from `dct`
-
- """
- argspec = inspect.getargspec(fn)
- argnames = argspec[0]
- if argspec[2] == None: #if there is no room for a **args type-thing in fn...
- kwargs = {}
- for k,v in dct.items():
- if k in argnames:
- kwargs[k] = v
- else:
- if not logfile:
- pass
- elif logfile == 'stderr':
-                    print >> sys.stderr, "WARNING: call_with_kwargs_from_dict ignoring key-value pair:", k, v
- elif logfile == 'stdout':
-                    print >> sys.stdout, "WARNING: call_with_kwargs_from_dict ignoring key-value pair:", k, v
- else:
-                    print >> logfile, "WARNING: call_with_kwargs_from_dict ignoring key-value pair:", k, v
- return fn(**kwargs)
- else:
- #there is a **args type thing in fn. Here we pass everything.
- return fn(**dct)
-
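-# Illustrative usage (not part of the original module): extra keys are dropped,
-# and logged, when the callee has no ** parameter.
-#
-#     def f(a, b=2):
-#         return a + b
-#     call_with_kwargs_from_dict(f, {'a': 1, 'b': 5, 'junk': 0})
-#     # returns 6, and warns on stderr that 'junk' was ignored
-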
-#MAKE YOUR OWN DBDICT-COMPATIBLE EXPERIMENTS IN THIS MODEL
-def sample_experiment(state, channel):
-
- #read from the state to obtain parameters, configuration, etc.
- print >> sys.stdout, state.items()
-
- import time
- for i in xrange(100):
- time.sleep(1)
- # use the channel to know if the job should stop ASAP
- if channel() == 'stop':
- break
-
- # modify state to record results
- state['answer'] = 42
-
- #return either INCOMPLETE or COMPLETE to indicate that the job should be re-run or not.
- return COMPLETE
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/newstuff.py
--- a/pylearn/dbdict/newstuff.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1013 +0,0 @@
-
-from __future__ import with_statement
-
-from collections import defaultdict
-import re, sys, inspect, os, signal, tempfile, shutil, socket
-import traceback
-
-import sql
-
-
-################################################################################
-### misc
-################################################################################
-
-class DD(dict):
- def __getattr__(self, attr):
- return self[attr]
- def __setattr__(self, attr, value):
- self[attr] = value
- def __str__(self):
- return 'DD%s' % dict(self)
- def __repr__(self):
- return str(self)
-
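-# Quick illustration (not part of the original module): DD gives attribute
-# access to ordinary dict entries.
-#
-#     d = DD()
-#     d.foo = 3              # same as d['foo'] = 3
-#     assert d.foo == d['foo'] == 3
-#     print d                # -> DD{'foo': 3}
-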
-################################################################################
-### resolve
-################################################################################
-
-def resolve(name, try_import=True):
- """
- Resolve a string of the form X.Y...Z to a python object by repeatedly using getattr, and
- __import__ to introspect objects (in this case X, then Y, etc. until finally Z is loaded).
-
- """
- symbols = name.split('.')
- builder = __import__(symbols[0])
- try:
- for sym in symbols[1:]:
- try:
- builder = getattr(builder, sym)
- except AttributeError, e:
- if try_import:
- __import__(builder.__name__, fromlist=[sym])
- builder = getattr(builder, sym)
- else:
- raise e
- except (AttributeError, ImportError), e:
- raise type(e)('Failed to resolve compound symbol %s' % name, e)
- return builder
-
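-# Illustrative use (not part of the original module): resolve('os.path.join')
-# imports `os`, then walks the attributes `path` and `join`.
-#
-#     import os
-#     join = resolve('os.path.join')
-#     assert join is os.path.join
-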
-################################################################################
-### dictionary
-################################################################################
-
-def convert(obj):
- try:
- return eval(obj, {}, {})
- except (NameError, SyntaxError):
- return obj
-
-def flatten(obj):
- """nested dictionary -> flat dictionary with '.' notation """
- d = {}
- def helper(d, prefix, obj):
- if isinstance(obj, (str, int, float, list, tuple)):
- d[prefix] = obj #convert(obj)
- else:
- if isinstance(obj, dict):
- subd = obj
- else:
- subd = obj.state()
- subd['__builder__'] = '%s.%s' % (obj.__module__, obj.__class__.__name__)
- for k, v in subd.iteritems():
- pfx = '.'.join([prefix, k]) if prefix else k
- helper(d, pfx, v)
- helper(d, '', obj)
- return d
-
-def expand(d):
- """inverse of flatten()"""
- #def dd():
- #return DD(dd)
- struct = DD()
- for k, v in d.iteritems():
- if k == '':
- raise NotImplementedError()
- else:
- keys = k.split('.')
- current = struct
- for k2 in keys[:-1]:
- current = current.setdefault(k2, DD())
- current[keys[-1]] = v #convert(v)
- return struct
-
-def realize(d):
- if not isinstance(d, dict):
- return d
- d = dict((k, realize(v)) for k, v in d.iteritems())
- if '__builder__' in d:
- builder = resolve(d.pop('__builder__'))
- return builder(**d)
- return d
-
-def make(d):
- return realize(expand(d))
-
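-# Illustrative round-trip (not part of the original module): flatten() turns a
-# nested dict into dotted keys, expand() inverts it, and make() instantiates
-# any '__builder__' entries.
-#
-#     nested = {'a': {'x': 1, 'y': 2}, 'b': 3}
-#     flat = flatten(nested)            # {'a.x': 1, 'a.y': 2, 'b': 3}
-#     assert expand(flat) == nested     # (as a DD of DDs)
-#     obj = make({'regexp.__builder__': 're.compile',
-#                 'regexp.pattern': 'a.*c'})
-#     # obj['regexp'] is now re.compile(pattern='a.*c')
-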
-################################################################################
-### errors
-################################################################################
-
-class UsageError(Exception):
- pass
-
-################################################################################
-### parsing and formatting
-################################################################################
-
-def parse(*strings):
- d = {}
- for string in strings:
- s1 = re.split(' *= *', string, 1)
- s2 = re.split(' *:: *', string, 1)
- if len(s1) == 1 and len(s2) == 1:
- raise UsageError('Expected a keyword argument in place of "%s"' % s1[0])
- elif len(s1) == 2:
- k, v = s1
- v = convert(v)
- elif len(s2) == 2:
- k, v = s2
- k += '.__builder__'
- d[k] = v
- return d
-
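-# Illustrative only: parse() maps command-line-style strings to a flat dict,
-# with '=' for values and '::' for builders.
-#
-#     parse('lr=0.03', 'stopper::pylearn.stopper.nsteps', 'stopper.n=10000')
-#     # -> {'lr': 0.03,
-#     #     'stopper.__builder__': 'pylearn.stopper.nsteps',
-#     #     'stopper.n': 10000}
-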
-def format_d(d, sep = '\n', space = True):
- d = flatten(d)
- pattern = "%s = %r" if space else "%s=%r"
- return sep.join(pattern % (k, v) for k, v in d.iteritems())
-
-def format_help(topic):
- if topic is None:
- return 'No help.'
- elif isinstance(topic, str):
- help = topic
- elif hasattr(topic, 'help'):
- help = topic.help()
- else:
- help = topic.__doc__
- if not help:
- return 'No help.'
-
- ss = map(str.rstrip, help.split('\n'))
- try:
- baseline = min([len(line) - len(line.lstrip()) for line in ss if line])
- except:
- return 'No help.'
- s = '\n'.join([line[baseline:] for line in ss])
- s = re.sub(string = s, pattern = '\n{2,}', repl = '\n\n')
- s = re.sub(string = s, pattern = '(^\n*)|(\n*$)', repl = '')
-
- return s
-
-################################################################################
-### single channels
-################################################################################
-
-# try:
-# import greenlet
-# except:
-# try:
-# from py import greenlet
-# except:
-# print >>sys.stderr, 'the greenlet module is unavailable'
-# greenlet = None
-
-
-class Channel(object):
-
-    COMPLETE = property(lambda s:None,
-            doc=("Experiments should return this value to "
-                "indicate that they are done (if not done, return `INCOMPLETE`)"))
-    INCOMPLETE = property(lambda s:True,
-            doc=("Experiments should return this value to indicate that "
-                "they are not done (if done, return `COMPLETE`)"))
-
-    START = property(lambda s: 0,
-            doc="dbdict.status == START means an experiment is ready to run")
-    RUNNING = property(lambda s: 1,
-            doc="dbdict.status == RUNNING means an experiment is running on dbdict_hostname")
-    DONE = property(lambda s: 2,
-            doc="dbdict.status == DONE means an experiment has completed (not necessarily successfully)")
-
- # Methods to be used by the experiment to communicate with the channel
-
- def save(self):
- """
- Save the experiment's state to the various media supported by
- the Channel.
- """
- raise NotImplementedError()
-
- def switch(self, message = None):
- """
- Called from the experiment to give the control back to the channel.
- The following return values are meaningful:
- * 'stop' -> the experiment must stop as soon as possible. It may save what
- it needs to save. This occurs when SIGTERM or SIGINT are sent (or in
- user-defined circumstances).
- switch() may give the control to the user. In this case, the user may
- resume the experiment by calling switch() again. If an argument is given
- by the user, it will be relayed to the experiment.
- """
- pass
-
- def __call__(self, message = None):
- return self.switch(message)
-
-    def save_and_switch(self):
-        self.save()
-        return self.switch()
-
- # Methods to run the experiment
-
- def setup(self):
- pass
-
- def __enter__(self):
- pass
-
- def __exit__(self):
- pass
-
- def run(self):
- pass
-
-
-
-class JobError(Exception):
- RUNNING = 0
- DONE = 1
- NOJOB = 2
-
-
-class SingleChannel(Channel):
-
- def __init__(self, experiment, state):
- self.experiment = experiment
- self.state = state
- self.feedback = None
-
- #TODO: make this a property and disallow changing it during a with block
- self.catch_sigterm = True
- self.catch_sigint = True
-
- def switch(self, message = None):
- feedback = self.feedback
- self.feedback = None
- return feedback
-
- def run(self, force = False):
- self.setup()
-
- status = self.state.dbdict.get('status', self.START)
-        if status == self.DONE and not force:
-            # If you want to disregard this, use the --force flag
-            raise JobError(JobError.DONE,
-                           'The job has already completed.')
-        elif status == self.RUNNING and not force:
-            raise JobError(JobError.RUNNING,
-                           'The job is already running.')
- self.state.dbdict.status = self.RUNNING
-
- v = self.COMPLETE
- with self: #calls __enter__ and then __exit__
- try:
- v = self.experiment(self.state, self)
- finally:
- self.state.dbdict.status = self.DONE if v is self.COMPLETE else self.START
-
- return v
-
- def on_sigterm(self, signo, frame):
- # SIGTERM handler. It is the experiment function's responsibility to
- # call switch() often enough to get this feedback.
- self.feedback = 'stop'
-
- def __enter__(self):
- # install a SIGTERM handler that asks the experiment function to return
- # the next time it will call switch()
- if self.catch_sigterm:
- self.prev_sigterm = signal.getsignal(signal.SIGTERM)
- signal.signal(signal.SIGTERM, self.on_sigterm)
- if self.catch_sigint:
- self.prev_sigint = signal.getsignal(signal.SIGINT)
- signal.signal(signal.SIGINT, self.on_sigterm)
- return self
-
- def __exit__(self, type, value, tb_traceback, save = True):
- if type:
- try:
- raise type, value, tb_traceback
- except:
- traceback.print_exc()
- if self.catch_sigterm:
- signal.signal(signal.SIGTERM, self.prev_sigterm)
- self.prev_sigterm = None
- if self.catch_sigint:
- signal.signal(signal.SIGINT, self.prev_sigint)
- self.prev_sigint = None
- if save:
- self.save()
- return True
-
-
-class StandardChannel(SingleChannel):
-
- def __init__(self, path, experiment, state, redirect_stdout = False, redirect_stderr = False):
- super(StandardChannel, self).__init__(experiment, state)
- self.path = os.path.realpath(path)
- self.redirect_stdout = redirect_stdout
- self.redirect_stderr = redirect_stderr
-
- def save(self):
- with open(os.path.join(self.path, 'current.conf'), 'w') as current:
- current.write(format_d(self.state))
- current.write('\n')
-
- def __enter__(self):
- self.old_cwd = os.getcwd()
- os.chdir(self.path)
- if self.redirect_stdout:
- self.old_stdout = sys.stdout
- sys.stdout = open('stdout', 'a')
- if self.redirect_stderr:
- self.old_stderr = sys.stderr
- sys.stderr = open('stderr', 'a')
- return super(StandardChannel, self).__enter__()
-
- def __exit__(self, type, value, traceback):
- rval = super(StandardChannel, self).__exit__(type, value, traceback, save = False)
- if self.redirect_stdout:
- sys.stdout.close()
- sys.stdout = self.old_stdout
- if self.redirect_stderr:
- sys.stderr.close()
- sys.stderr = self.old_stderr
- os.chdir(self.old_cwd)
- self.save()
- return rval
-
- def setup(self):
- if not os.path.isdir(self.path):
- os.makedirs(self.path)
- with self:
- origf = os.path.join(self.path, 'orig.conf')
- if not os.path.isfile(origf):
- with open(origf, 'w') as orig:
- orig.write(format_d(self.state))
- orig.write('\n')
- currentf = os.path.join(self.path, 'current.conf')
- if os.path.isfile(currentf):
- with open(currentf, 'r') as current:
- self.state = expand(parse(*map(str.strip, current.readlines())))
-
-
-class RSyncException(Exception):
- pass
-
-class RSyncChannel(StandardChannel):
-
- def __init__(self, path, remote_path, experiment, state, redirect_stdout = False, redirect_stderr = False):
- super(RSyncChannel, self).__init__(path, experiment, state, redirect_stdout, redirect_stderr)
-
- ssh_prefix='ssh://'
- if remote_path.startswith(ssh_prefix):
- remote_path = remote_path[len(ssh_prefix):]
- colon_pos = remote_path.find(':')
- self.host = remote_path[:colon_pos]
- self.remote_path = remote_path[colon_pos+1:]
- else:
- self.host = ''
- self.remote_path = os.path.realpath(remote_path)
-
- def rsync(self, direction):
-        """Push or pull the local working directory to/from the remote experiment directory.
-
-        :param direction: 'push' (local to remote) or 'pull' (remote to local)
-        """
-
- path = self.path
-
- remote_path = self.remote_path
- if self.host:
- remote_path = ':'.join([self.host, remote_path])
-
- # TODO: use something more portable than os.system
- if direction == 'push':
- rsync_cmd = 'rsync -ac "%s/" "%s/"' % (path, remote_path)
- elif direction == 'pull':
- rsync_cmd = 'rsync -ac "%s/" "%s/"' % (remote_path, path)
- else:
- raise RSyncException('invalid direction', direction)
-
- rsync_rval = os.system(rsync_cmd)
- if rsync_rval != 0:
- raise RSyncException('rsync failure', (rsync_rval, rsync_cmd))
-
- def touch(self):
- if self.host:
- host = self.host
- touch_cmd = ('ssh %(host)s "mkdir -p \'%(path)s\'"' % dict(host = self.host,
- path = self.remote_path))
- else:
- touch_cmd = ("mkdir -p '%(path)s'" % dict(path = self.remote_path))
- # print "ECHO", touch_cmd
- touch_rval = os.system(touch_cmd)
- if 0 != touch_rval:
- raise Exception('touch failure', (touch_rval, touch_cmd))
-
- def pull(self):
- return self.rsync('pull')
-
- def push(self):
- return self.rsync('push')
-
- def save(self):
- super(RSyncChannel, self).save()
- self.push()
-
- def setup(self):
- self.touch()
- self.pull()
- super(RSyncChannel, self).setup()
-
-
-class DBRSyncChannel(RSyncChannel):
-
- RESTART_PRIORITY = 2.0
-
- def __init__(self, username, password, hostname, dbname, tablename, path, remote_root, redirect_stdout = False, redirect_stderr = False):
- self.username, self.password, self.hostname, self.dbname, self.tablename \
- = username, password, hostname, dbname, tablename
-
- self.db = sql.postgres_serial(
- user = self.username,
- password = self.password,
- host = self.hostname,
- database = self.dbname,
- table_prefix = self.tablename)
-
- self.dbstate = sql.book_dct_postgres_serial(self.db)
- if self.dbstate is None:
- raise JobError(JobError.NOJOB,
- 'No job was found to run.')
-
- try:
- state = expand(self.dbstate)
- experiment = resolve(state.dbdict.experiment)
- remote_path = os.path.join(remote_root, self.dbname, self.tablename, str(self.dbstate.id))
- super(DBRSyncChannel, self).__init__(path, remote_path, experiment, state, redirect_stdout, redirect_stderr)
- except:
- self.dbstate['dbdict.status'] = self.DONE
- raise
-
- def save(self):
- super(DBRSyncChannel, self).save()
- self.dbstate.update(flatten(self.state))
-
- def setup(self):
- # Extract a single experiment from the table that is not already running.
- # set self.experiment and self.state
- super(DBRSyncChannel, self).setup()
- self.state.dbdict.sql.host_name = socket.gethostname()
- self.state.dbdict.sql.host_workdir = self.path
- self.dbstate.update(flatten(self.state))
-
- def run(self):
- # We pass the force flag as True because the status flag is
- # already set to RUNNING by book_dct in __init__
- v = super(DBRSyncChannel, self).run(force = True)
- if v is self.INCOMPLETE and self.state.dbdict.sql.priority != self.RESTART_PRIORITY:
- self.state.dbdict.sql.priority = self.RESTART_PRIORITY
- self.save()
- return v
-
-
-
-################################################################################
-### running
-################################################################################
-
-import optparse
-OptionParser = optparse.OptionParser
-# class OptionParser(optparse.OptionParser):
-# def error(self, message):
-# pass
-
-def parse_and_run(command, arguments):
- parser, runner = runner_registry.get(command, (None, None))
- if not runner:
- raise UsageError('Unknown runner: "%s"' % command)
- if parser:
- options, arguments = parser.parse_args(arguments)
- else:
- options = optparse.Values()
- run(runner, [options] + arguments)
-
-def run(runner, arguments):
- argspec = inspect.getargspec(runner)
- minargs = len(argspec[0])-(len(argspec[3]) if argspec[3] else 0)
- maxargs = len(argspec[0])
- if minargs > len(arguments) or maxargs < len(arguments) and not argspec[1]:
- s = format_help(runner)
- raise UsageError(s)
- runner(*arguments)
-
-runner_registry = dict()
-
-
-parser_cmdline = OptionParser(usage = '%prog cmdline [options] <experiment> <parameters>')
-parser_cmdline.add_option('-f', '--force', action = 'store_true', dest = 'force', default = False,
- help = 'force running the experiment even if it is already running or completed')
-parser_cmdline.add_option('--redirect-stdout', action = 'store_true', dest = 'redirect_stdout', default = False,
- help = 'redirect stdout to the workdir/stdout file')
-parser_cmdline.add_option('--redirect-stderr', action = 'store_true', dest = 'redirect_stderr', default = False,
-                          help = 'redirect stderr to the workdir/stderr file')
-parser_cmdline.add_option('-r', '--redirect', action = 'store_true', dest = 'redirect', default = False,
- help = 'redirect stdout and stderr to the workdir/stdout and workdir/stderr files')
-parser_cmdline.add_option('-w', '--workdir', action = 'store', dest = 'workdir', default = None,
- help = 'the working directory in which to run the experiment')
-parser_cmdline.add_option('-n', '--dry-run', action = 'store_true', dest = 'dry_run', default = False,
- help = 'use this option to run the whole experiment in a temporary working directory (cleaned after use)')
-parser_cmdline.add_option('-2', '--sigint', action = 'store_true', dest = 'allow_sigint', default = False,
- help = 'allow sigint (CTRL-C) to interrupt a process')
-
-def runner_cmdline(options, experiment, *strings):
- """
- Start an experiment with parameters given on the command line.
-
-    Usage: cmdline [options] <experiment> <parameters>
-
- Run an experiment with parameters provided on the command
- line. See the help topics for experiment and parameters for
- syntax information.
-
- Example use:
- dbdict-run cmdline mymodule.my_experiment \\
- stopper::pylearn.stopper.nsteps \\ # use pylearn.stopper.nsteps
- stopper.n=10000 \\ # the argument "n" of nsteps is 10000
- lr=0.03
- """
- state = expand(parse(*strings))
- state.setdefault('dbdict', DD()).experiment = experiment
- experiment = resolve(experiment)
- if options.workdir and options.dry_run:
- raise UsageError('Please use only one of: --workdir, --dry-run.')
- if options.workdir:
- workdir = options.workdir
- elif options.dry_run:
- workdir = tempfile.mkdtemp()
- else:
- workdir = format_d(state, sep=',', space = False)
- channel = StandardChannel(workdir,
- experiment, state,
- redirect_stdout = options.redirect or options.redirect_stdout,
- redirect_stderr = options.redirect or options.redirect_stderr)
- channel.catch_sigint = not options.allow_sigint
- channel.run(force = options.force)
- if options.dry_run:
- shutil.rmtree(workdir, ignore_errors=True)
-
-runner_registry['cmdline'] = (parser_cmdline, runner_cmdline)
-
-
-
-
-parser_filemerge = OptionParser(usage = '%prog filemerge [options] <experiment> <mainfile> ...')
-parser_filemerge.add_option('-f', '--force', action = 'store_true', dest = 'force', default = False,
- help = 'force running the experiment even if it is already running or completed')
-parser_filemerge.add_option('--redirect-stdout', action = 'store_true', dest = 'redirect_stdout', default = False,
- help = 'redirect stdout to the workdir/stdout file')
-parser_filemerge.add_option('--redirect-stderr', action = 'store_true', dest = 'redirect_stderr', default = False,
-                            help = 'redirect stderr to the workdir/stderr file')
-parser_filemerge.add_option('-r', '--redirect', action = 'store_true', dest = 'redirect', default = False,
- help = 'redirect stdout and stderr to the workdir/stdout and workdir/stderr files')
-parser_filemerge.add_option('-w', '--workdir', action = 'store', dest = 'workdir', default = None,
- help = 'the working directory in which to run the experiment')
-parser_filemerge.add_option('-n', '--dry-run', action = 'store_true', dest = 'dry_run', default = False,
- help = 'use this option to run the whole experiment in a temporary working directory (cleaned after use)')
-
-def runner_filemerge(options, experiment, mainfile, *other_files):
- """
- Start an experiment with parameters given in files.
-
-    Usage: filemerge [options] <experiment> <mainfile> ...
-
- Run an experiment with parameters provided in plain text files.
- A single experiment will be run with the union of all the
- parameters listed in the files.
-
-    Example, with a file blah1.txt containing:
-
-        text.first = "hello"
-        text.second = "world"
-
-    and a file blah2.txt containing:
-
-        number = 12
-        numbers.a = 55
-        numbers.b = 56
-
- Given these files, the following command using filemerge:
- $ dbdict-run filemerge mymodule.my_experiment blah1.txt blah2.txt
-
- is equivalent to this one using cmdline:
- $ dbdict-run cmdline mymodule.my_experiment \\
- text.first=hello text.second=world \\
- number=12 numbers.a=55 numbers.b=56
- """
- with open(mainfile) as f:
- _state = parse(*map(str.strip, f.readlines()))
- for file in other_files:
- if '=' in file:
- _state.update(parse(file))
- else:
- with open(file) as f:
- _state.update(parse(*map(str.strip, f.readlines())))
- state = expand(_state)
- state.setdefault('dbdict', DD()).experiment = experiment
- experiment = resolve(experiment)
- if options.workdir and options.dry_run:
- raise UsageError('Please use only one of: --workdir, --dry-run.')
- if options.workdir:
- workdir = options.workdir
- elif options.dry_run:
- workdir = tempfile.mkdtemp()
- else:
- workdir = format_d(state, sep=',', space = False)
- channel = StandardChannel(workdir,
- experiment, state,
- redirect_stdout = options.redirect or options.redirect_stdout,
- redirect_stderr = options.redirect or options.redirect_stderr)
- channel.run(force = options.force)
- if options.dry_run:
- shutil.rmtree(workdir, ignore_errors=True)
-
-runner_registry['filemerge'] = (parser_filemerge, runner_filemerge)
-
-
-
-
-parser_sqlschedule = OptionParser(usage = '%prog sqlschedule [options] <tablepath> <experiment> <parameters>')
-parser_sqlschedule.add_option('-f', '--force', action = 'store_true', dest = 'force', default = False,
- help = 'force adding the experiment to the database even if it is already there')
-
-def runner_sqlschedule(options, dbdescr, experiment, *strings):
- """
- Schedule a job to run using the sql command.
-
-    Usage: sqlschedule <tablepath> <experiment> <parameters>
-
- See the experiment and parameters topics for more information about
- these parameters.
-
- Assuming that a postgres database is running on `host`, contains a
- database called `dbname` and that `user` has the permissions to
- create, read and modify tables on that database, tablepath should
- be of the following form:
- postgres://user:pass@host/dbname/tablename
-
- If no table is named `tablename`, one will be created
- automatically. The state corresponding to the experiment and
- parameters specified in the command will be saved in the database,
- but no experiment will be run.
-
- To run an experiment scheduled using sqlschedule, see the sql
- command.
-
- Example use:
- dbdict-run sqlschedule postgres://user:pass@host/dbname/tablename \\
- mymodule.my_experiment \\
- stopper::pylearn.stopper.nsteps \\ # use pylearn.stopper.nsteps
- stopper.n=10000 \\ # the argument "n" of nsteps is 10000
- lr=0.03
- """
-
- try:
- username, password, hostname, dbname, tablename \
- = sql.parse_dbstring(dbdescr)
- except:
- raise UsageError('Wrong syntax for dbdescr')
-
- db = sql.postgres_serial(
- user = username,
- password = password,
- host = hostname,
- database = dbname,
- table_prefix = tablename)
-
- state = parse(*strings)
- resolve(experiment) # we try to load the function associated to the experiment
- state['dbdict.experiment'] = experiment
- sql.add_experiments_to_db([state], db, verbose = 1, add_dups = options.force)
-
-runner_registry['sqlschedule'] = (parser_sqlschedule, runner_sqlschedule)
-
-
-
-parser_sqlschedule_filemerge = OptionParser(usage = '%prog sqlschedule_filemerge [options] <tablepath> <experiment> <mainfile> ...')
-parser_sqlschedule_filemerge.add_option('-f', '--force', action = 'store_true', dest = 'force', default = False,
- help = 'force adding the experiment to the database even if it is already there')
-
-def runner_sqlschedule_filemerge(options, dbdescr, experiment, mainfile, *other_files):
- """
- Schedule a job to run using the sql command using parameter files.
-
- This command is to sqlschedule what the filemerge command is to
- cmdline.
- """
-
- try:
- username, password, hostname, dbname, tablename \
- = sql.parse_dbstring(dbdescr)
- except:
- raise UsageError('Wrong syntax for dbdescr')
-
- db = sql.postgres_serial(
- user = username,
- password = password,
- host = hostname,
- database = dbname,
- table_prefix = tablename)
-
- with open(mainfile) as f:
- _state = parse(*map(str.strip, f.readlines()))
- for file in other_files:
- if '=' in file:
- _state.update(parse(file))
- else:
- with open(file) as f:
- _state.update(parse(*map(str.strip, f.readlines())))
- state = _state
-
- resolve(experiment) # we try to load the function associated to the experiment
- state['dbdict.experiment'] = experiment
- sql.add_experiments_to_db([state], db, verbose = 1, add_dups = options.force)
-
-runner_registry['sqlschedule_filemerge'] = (parser_sqlschedule_filemerge, runner_sqlschedule_filemerge)
-
-
-
-
-parser_sql = OptionParser(usage = '%prog sql [options] <tablepath> <exproot>')
-parser_sql.add_option('-n', dest = 'n', type = 'int', default = 1,
- help = 'Run N experiments sequentially (default 1) '
- '(if N is <= 0, runs as many experiments as possible).')
-
-def runner_sql(options, dbdescr, exproot):
- """
- Run jobs from a sql table.
-
-    Usage: sql <tablepath> <exproot>
-
- The jobs should be scheduled first with the sqlschedule command.
-
- Assuming that a postgres database is running on `host`, contains a
- database called `dbname` and that `user` has the permissions to
- create, read and modify tables on that database, tablepath should
- be of the following form:
- postgres://user:pass@host/dbname/tablename
-
- exproot can be a local path or a remote path. Examples of exproots:
- /some/local/path
- ssh://some_host:/some/remote/path # relative to the filesystem root
- ssh://some_host:other/remote/path # relative to the HOME on some_host
-
- The exproot will contain a subdirectory hierarchy corresponding to
- the dbname, tablename and job id which is a unique integer.
-
- The sql runner will pick any job in the table which is not running
- and is not done and will terminate when that job ends. You may call
- the same command multiple times, sequentially or in parallel, to
- run as many unfinished jobs as have been scheduled in that table
- with sqlschedule.
-
- Example use:
- dbdict-run sql \\
- postgres://user:pass@host/dbname/tablename \\
- ssh://central_host:myexperiments
- """
- try:
- username, password, hostname, dbname, tablename \
- = sql.parse_dbstring(dbdescr)
- except:
- raise UsageError('Wrong syntax for dbdescr')
-
- n = options.n if options.n else -1
- nrun = 0
- try:
- while n != 0:
- workdir = tempfile.mkdtemp()
- #print 'wdir', workdir
- channel = DBRSyncChannel(username, password, hostname, dbname, tablename,
- workdir,
- exproot,
- redirect_stdout = True,
- redirect_stderr = True)
- channel.run()
- shutil.rmtree(workdir, ignore_errors=True)
- n -= 1
- nrun += 1
- except JobError, e:
- if e.args[0] == JobError.NOJOB:
-            print 'No more jobs to run (ran %i jobs)' % nrun
-
-runner_registry['sql'] = (parser_sql, runner_sql)
-
-
-
-
-
-def runner_help(options, topic = None):
- """
- Get help for a topic.
-
-    Usage: help [<topic>]
- """
- def bold(x):
- return '\033[1m%s\033[0m' % x
- if topic is None:
-        print bold('Topics: (use "help <topic>" for more info)')
- print 'example Example of defining and running an experiment.'
- print 'experiment How to define an experiment.'
- print 'parameters How to list the parameters for an experiment.'
- print
-        print bold('Available commands: (use "help <command>" for more info)')
- for name, (parser, command) in sorted(runner_registry.iteritems()):
- print name.ljust(20), format_help(command).split('\n')[0]
- return
- elif topic == 'experiment':
- helptext = """
-
- dbdict-run serves to run experiments. To define an experiment, you
- only have to define a function respecting the following protocol in
- a python file or module:
-
- def my_experiment(state, channel):
- # experiment code goes here
-
- The return value of my_experiment may be channel.COMPLETE or
- channel.INCOMPLETE. If the latter is returned, the experiment may
- be resumed at a later point. Note that the return value `None`
- is interpreted as channel.COMPLETE.
-
-    If a command defined by dbdict-run has an <experiment> parameter,
- that parameter must be a string such that it could be used in a
- python import statement to import the my_experiment function. For
- example if you defined my_experiment in my_module.py, you can pass
- 'my_module.my_experiment' as the experiment parameter.
-
- When entering my_experiment, the current working directory will be
- set for you to a directory specially created for the experiment.
- The location and name of that directory vary depending on which
- dbdict-run command you run. You may create logs, save files, pictures,
- results, etc. in it.
-
- state is an object containing the parameters given to the experiment.
-    For example, if you run the following command:
-
- dbdict-run cmdline my_module.my_experiment a.x=6
-
- `state.a.x` will contain the integer 6, and so will `state['a']['x']`.
- If the state is changed, it will be saved when the experiment ends
- or when channel.save() is called. The next time the experiment is run
- with the same working directory, the modified state will be provided.
-
- It is not recommended to store large amounts of data in the state. It
- should be limited to scalar or string parameters. Results such as
- weight matrices should be stored in files in the working directory.
-
- channel is an object with the following important methods:
-
- - channel.switch() (or channel()) will give the control back to the
- user, if it is appropriate to do so. If a call to channel.switch()
- returns the string 'stop', it typically means that the signal
- SIGTERM (or SIGINT) was received. Therefore, the experiment may be
- killed soon, so it should save and return True or
- channel.INCOMPLETE so it can be resumed later. This should be
- checked periodically or data loss may be incurred.
-
- - channel.save() will save the current state. It is automatically
- called when the function returns, but it is a good idea to do it
- periodically.
-
-      - channel.save_and_switch() is a useful shortcut to do both operations
- described above.
- """
-
- elif topic == 'parameters':
- helptext = """
-    If a command takes <parameters> arguments, the arguments should each
- take one of the following forms:
-
- key=value
-
-        Set a parameter with name `key` to `value`. The value will be cast
- to an appropriate type automatically and it will be accessible to
- the experiment using `state.key`.
-
- If `key` is a dotted name, the value will be set in nested
- dictionaries corresponding to each part.
-
- Examples:
- a=1 state.a <- 1
- b=2.3 state.b <- 2.3
- c.d="hello" state.c.d <- "hello"
-
- key::builder
-
- This is equivalent to key.__builder__=builder.
-
- The builder should be a symbol that can be used with import or
- __import__ and it should be callable.
-
- If a key has a builder defined, the experiment code may easily make
- an object out of it using the `make` function. `obj = make(state.key)`.
- This will call the builder on the substate corresponding to state.key,
- as will be made clear in the example:
-
- Example:
- regexp::re.compile
- regexp.pattern='a.*c'
-
- from pylearn.dbdict.newstuff import make
- def experiment(state, channel):
- regexp = make(state.regexp) # regexp is now re.compile(pattern = 'a.*c')
- print regexp.sub('blahblah', 'hello abbbbc there')
-
- If the above experiment was called with the state produced by the
- parameters in the example, it would print 'hello blahblah there'.
- """
-
- elif topic == 'example':
- helptext = """
- Example of an experiment that trains some model for 100000 iterations:
-
- # defined in: my_experiments.py
- def experiment(state, channel):
- try:
- model = cPickle.load(open('model', 'r'))
- except:
- model = my_model(state.some_param, state.other_param)
- state.n = 0
- dataset = my_dataset(skipto = state.n)
- for i in xrange(100000 - state.n):
- model.update(dataset.next())
- if i and i % 1000 == 0:
- if channel.save_and_switch() == 'stop':
- state.n += i + 1
- rval = channel.INCOMPLETE
- break
- else:
- state.result = model.cost(some_test_set)
- rval = channel.COMPLETE
- cPickle.dump(model, open('model', 'w'))
- return rval
-
- And then you could run it this way:
-
- dbdict-run cmdline my_experiments.experiment \\
- some_param=1 \\
- other_param=0.4
-
- Or this way:
-
- dbdict-run sqlschedule postgres://user:pass@host/dbname/tablename \\
- my_experiments.experiment \\
- some_param=1 \\
- other_param=0.4
-
- dbdict-run sql postgres://user:pass@host/dbname/tablename exproot
-
- You need to make sure that the module `my_experiments` is accessible
- from python. You can check with the command
-
- $ python -m my_experiments
- """
- else:
-        helptext = runner_registry.get(topic, (None, None))[1]
- print format_help(helptext)
-
-runner_registry['help'] = (None, runner_help)
-
-################################################################################
-### main
-################################################################################
-
-def run_cmdline():
- try:
- if len(sys.argv) <= 1:
-            raise UsageError('Usage: %s <command> [<arguments>*]' % sys.argv[0])
- cmd = None
- args = []
- for arg in sys.argv[1:]:
- if cmd is not None or arg.startswith('-'):
- args.append(arg)
- else:
- cmd = arg
- parse_and_run(cmd, args)
- except UsageError, e:
- print 'Usage error:'
- print e
-
-if __name__ == '__main__':
- run_cmdline()
-
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/sample_create_jobs.py
--- a/pylearn/dbdict/sample_create_jobs.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,135 +0,0 @@
-import sys, subprocess, time
-
-from .dconfig import Config
-from .tools import Job # tools.py
-
-#
-# Models
-#
-
-class OldSampleJob(Job):
- """ Sample Job object. Cut and paste this for your own experiments. """
-
- # default parameters for the job
- # config file parameters must be from this list.
- a = 0
- b = 0
- c = 'a'
- d = 0.0
-
- def run(self, results, dry_run=False):
- # attributes that you set in 'results' will be saved to job_results.py, next to
- # job_config.py when this function returns.
- results.f = self.a * self.b
-
- # The current working directory (cwd) is writable and readable
- # it will also be left alone after the job terminates
- f = open('some_file', 'w')
- print >> f, "hello from the job?"
-
- return True #restart this job
-
-
-
-class SampleJobState():
-
- # default parameters for the job
- # config file parameters must be from this list.
- a = 0
- b = 0
- c = 'a'
- d = 0.0
-
-
-# NOTE: illustrative scratch code; it assumes `metadata`, `Session`, and the
-# sqlalchemy names (Table, Column, Integer, Float, mapper) are set up elsewhere.
-sample_job_table = Table('my_table_for_testing', metadata,
-        Column('a', Integer),
-        Column('b', Float(53)))
-
-metadata.create_all()
-
-mapper(SampleJobState, sample_job_table)
-
-s = SampleJobState()
-s.a = 5
-s.b = 8.2
-
-session = Session()
-if session.query(SampleJobState).filter_by(a=5, b=8.2).first() is None:
-    session.save(s)
-    session.commit()
-
-
-class SampleJob(Job):
- """ Sample Job object. Cut and paste this for your own experiments. """
-
- def __init__(self, state):
- pass
-
- def start(self):
- pass
-
- def resume(self):
- pass
-
- def run(self, switch = lambda : 'continue'):
- # attributes that you set in 'results' will be saved to job_results.py, next to
- # job_config.py when this function returns.
-        self.f = self.a * self.b  # assumes `a` and `b` were set on this instance
-
-
- # The current working directory (cwd) is writable and readable
- # it will also be left alone after the job terminates
- f = open('some_file', 'w')
- print >> f, "hello from the job?"
- while True:
- time.sleep(5)
- if switch() == 'stop':
- break
-
- return True #continue running if possible...
-
- def stop(self):
- pass
-
-
-
-def some_jobs(a = 0,
- b = 2):
- job_module = 'dbdict.sample_create_jobs'
- job_symbol = 'SampleJob'
- for c in ('a', 'b', 'c'):
- for d in [0.0, 9.9]:
- yield locals()
-
-def create(generator):
- """Create a set of job directories"""
- jobs = []
- dispatch = sys.stdout #open('jobs','w')
-
- for config in generator:
- #print ' config', config
-        configdir = 'job_%016x' % abs(hash(frozenset(config.items())))  # configs are dicts, so hash their items
- jobs.append(configdir)
- create_dirs = True
- dry_run = 0
- if not dry_run:
- if create_dirs and subprocess.call(('mkdir', configdir)):
- print >> sys.stderr, 'Error creating directory: ', configdir
- else:
- #no problem creating the directory
- config.save(configdir + '/job_config.py')
- print >> dispatch, "dbdict-run-job run", configdir
- else:
- #print configdir
- pass
-    if dispatch is not sys.stdout:
-        dispatch.close()
-
-if __name__ == '__main__':
- create(some_jobs())
-
- j = SampleJob(SampleJobState())
- j.start()
- j.run()
-
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/scratch.py
--- a/pylearn/dbdict/scratch.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,451 +0,0 @@
-import threading, time, commands, os, sys, math, random, datetime
-
-import psycopg2, psycopg2.extensions
-from sqlalchemy import create_engine, desc
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy import Table, Column, MetaData, ForeignKey
-from sqlalchemy import Integer, String, Float, DateTime
-from sqlalchemy.orm import mapper, relation, backref, eagerload
-from sqlalchemy.sql import operators, select
-
-
-##########
-#
-# Connection stuff
-#
-#
-
-if 0:
- _db_host = 'jais.iro.umontreal.ca'
-
-    def _pwd():
-        if not hasattr(_pwd,'rval'):
-            pw_cmd = 'ssh bergstrj@grieg.iro.umontreal.ca cat .lisa_db'
-            _pwd.rval = commands.getoutput(pw_cmd)
-        return _pwd.rval
-
- def engine():
- """Create an engine to access lisa_db on gershwin
-
- This function caches the return value between calls.
- """
- if not hasattr(engine,'rval'):
- pw = _pwd()
- db_str ='postgres://bergstrj:%s@%s/lisa_db' % (pw,_db_host)
- echo = False #spews pseudo-sql to stdout
- engine.rval = create_engine(db_str
- ,pool_size=1 # should force the app release extra connections
- # releasing connections should let us schedule more jobs, since each one operates
- # autonomously most of the time, just checking the db rarely.
- # TODO: optimize this for large numbers of jobs
- ,echo=echo
- )
- return engine.rval
-
- def engine_serializable():
- """Create an engine to access lisa_db on gershwin, which uses serializable
- transaction mode.
-
- This function caches the return value between calls.
- """
-
- this = engine_serializable
-
- if not hasattr(this,'rval'):
- pw = _pwd()
- def connect():
- c = psycopg2.connect(user='bergstrj',
- password=pw,
- database='lisa_db',
- host='gershwin.iro.umontreal.ca')
- c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
- return c
- pool_size=0
- this.rval = create_engine('postgres://'
- ,creator=connect
- ,pool_size=0 # should force the app release connections
- )
- return this.rval
-
-
- Session = sessionmaker(bind=engine(), autoflush=True, transactional=True)
- SessionSerial = sessionmaker(bind=engine_serializable(),
- autoflush=True, transactional=True)
-
-else:
- from sqlalchemy import create_engine
- from sqlalchemy.orm import sessionmaker
- engine = create_engine('sqlite:///:memory:', echo=False)
- Session = sessionmaker(bind=engine, autoflush=True, transactional=True)
- SessionSerial = Session
-
-
-################
-#
-# Database setup
-#
-table_prefix='bergstrj_scratch_test_'
-metadata = MetaData()
-
-def table_with_id(name, *args):
- return Table(name, metadata,
- Column('id', Integer, primary_key=True),
- *args)
-
-t_trial = table_with_id(table_prefix+'trial',
- Column('desc', String(256)), #comment: why running this trial?
- Column('priority', Float(53)), #aka Double
- Column('start', DateTime),
- Column('finish', DateTime),
- Column('host', String(256)))
-
-t_keyval = table_with_id(table_prefix+'keyval',
- Column('name', String(32), nullable=False), #name of attribute
- Column('fval', Float(53)), #aka Double
- Column('ival', Integer),
- Column('sval', String(256))) #TODO: store text (strings of unbounded length)
-
-t_trial_keyval = table_with_id(table_prefix+'trial_keyval',
- Column('trial_id', Integer, ForeignKey('%s.id' % t_trial)),
- Column('keyval_id', Integer, ForeignKey('%s.id' % t_keyval)))
-
-class _KeyVal(object):
- @staticmethod
- def cache(name, val, session, create=True):
- #TODO: consider using a local cache to remove the need to query db
- # this takes advantage of common usage, which is to only add KeyVal
- # pairs.
- #check if it is in the DB
- q = session.query(_KeyVal)
- if isinstance(val, float):
- q = q.filter_by(name=name, fval=val)
- elif isinstance(val, int):
- q = q.filter_by(name=name, ival=val)
- elif isinstance(val, str):
- q = q.filter_by(name=name, sval=val)
- else:
- raise TypeError(val)
- rval = q.first()
- if rval is None and create:
- rval = _KeyVal(name, val)
- session.save_or_update(rval)
- return rval
-
- def __init__(self, name, val):
- self.name = name
- self.val = val
- def __get_val(self):
- val = None
- if self.fval is not None: val = self.fval
- if self.ival is not None: val = self.ival
- if self.sval is not None: val = self.sval
- return val
- def __set_val(self, val):
- if isinstance(val, float):
- self.fval = val
- self.ival = None
- self.sval = None
- elif isinstance(val, int):
- self.fval = None
- self.ival = val
- self.sval = None
- elif isinstance(val, str):
- self.fval = None
- self.ival = None
- self.sval = val
- else:
- raise TypeError(val)
- val = property(__get_val, __set_val)
-    def __repr__(self):
-        return "<_KeyVal %s %s=%s>" % (self.id, self.name, repr(self.val))
-mapper(_KeyVal, t_keyval)
-
-
-###################
-#
-# Job interface
-#
-
-class Trial(object):
- _native_cols = 'desc', 'priority', 'start', 'finish', 'host'
-
- #TODO: remove these forbidden keynames, and let all keynames work properly
- _forbidden_keynames = set(['filter', 'desc',
- 'priority', 'start', 'finish', 'host',
- 'create', 'max_sleep', 'max_retry', 'session', 'c',
- 'abort', 'complete'])
-
- #TODO: unittest cases to verify that having these kinds of keys is OK
-
- class ReserveError(Exception): """reserve failed"""
-
- @staticmethod
- def filter(session=None, **kwargs):
- """Construct a query for Trials.
-
- @param kwargs: each (kw,arg) pair in kwargs, will restrict the list of
- Trials to those such that the 'kw' attr has been associated with the job,
- and it has value 'arg'
-
- @return SqlAlchemy query object
-
- @note: will raise TypeError if any arg in kwargs has a type other than
- float, int or string.
-
- """
- if session is None:
- session = Session()
- q = session.query(Trial)
- for col in Trial._native_cols:
- if col in kwargs:
- q = q.filter_by(**{col:kwargs[col]})
- del kwargs[col]
- for kw, arg in kwargs.items():
- if isinstance(arg, float):
- q = q.filter(Trial._attrs.any(name=kw, fval=arg))
- elif isinstance(arg, int):
- q = q.filter(Trial._attrs.any(name=kw, ival=arg))
- elif isinstance(arg, str):
- q = q.filter(Trial._attrs.any(name=kw, sval=arg))
- else:
- raise TypeError(arg)
- return q
-
- @staticmethod
- def reserve_unique(query_fn, max_retry=10, max_sleep=5.0):
- """Reserve an un-reserved job.
-
- @param query_fn: build the query for the trial to reserve (see
- L{reserve_unique_kw} for example usage).
-
- @param max_retry: try this many times to reserve a job before raising an exception
-
- @param max_sleep: L{time.sleep} up to this many seconds between retry attempts
-
- @return a trial which was reserved (uniquely) by this function call. If
- no matching jobs remain, return None.
-
- """
- s = SessionSerial()
- retry = max_retry
- trial = None
-
- while (trial is None) and retry:
-
- q = query_fn(s.query(Trial))
- q = q.options(eagerload('_attrs')) #TODO is this a good idea?
-
- trial = q.first()
- if trial is None:
- return None # no jobs match the query
- else:
- try:
- trial.reserve(session=s)
- except Trial.ReserveError, e:
- s.rollback()
- waittime = random.random() * max_sleep
-                    # another process stole our trial; wait a random interval and retry
- time.sleep(waittime)
- retry -= 1
- if trial:
- s.expunge(trial)
- s.close()
- return trial
-
- @staticmethod
- def reserve_unique_kw(max_retry=10, max_sleep=5.0, **kwargs):
- """Call reserve_unique with a query function that matches jobs with
- attributes (and values) given by kwargs. Results are sorted by
- priority.
-
- """
- assert 'start' not in kwargs
- assert 'query_fn' not in kwargs
- def query_fn(q):
- q = q.filter_by(start=None,
- **kwargs).order_by(desc(Trial.c.priority))
- return q
-
-
- return Trial.reserve_unique(query_fn, max_retry, max_sleep)
-
- def __init__(self, desc=None, priority=None, start=None, finish=None, host=None, **kwargs):
- self.desc = desc
- self.priority = priority
- self.start = start
- self.finish = finish
- self.host = host
- self.attrs.update(kwargs)
- def __repr__(self):
-        return "<Trial %s desc=%r priority=%r host=%r start=%r finish=%r>" \
-                %(str(self.id), self.desc, self.priority, self.host,
-                        self.start, self.finish)
-
- def _get_attrs(self):
- #This bit of code makes it so that you can type something like:
- #
- # trial.attrs.score = 50
- #
- # It will use the self._attrs list of _KeyVal instances as a backend,
- # because these pseudo-attributes (here 'score') are not stored in any
- # object's __dict__.
- class AttrCatcher(object):
- #TODO: make these methods faster with better data structures
- def __getattr__(attr_self, attr):
- attrlist = self._attrs
- for i,a in enumerate(attrlist):
- if a.name == attr:
- return a.val
- raise AttributeError(attr)
- def __setattr__(attr_self, attr, val):
- n = 0
- s = Session()
- assert attr not in Trial._forbidden_keynames
- for i,a in enumerate(self._attrs):
- if a.name == attr:
-                    self._attrs[i] = _KeyVal.cache(attr,val, s)
- n += 1
- assert n <= 1
- if n == 0:
- self._attrs.append(_KeyVal.cache(attr,val, s))
- s.commit()
- def __iter__(_self):
- def gen():
- return self._attrs.__iter__()
- return gen()
- def update(attr_self, dct):
- for k,v in dct.items():
- setattr(attr_self, k, v)
-
- #we can't add attributes to self, so just do this...
- return AttrCatcher() #seriously, allocate a new one each time
- attrs = property(_get_attrs, doc = ("Provide attribute-like access to the"
- " key-value pairs associated with this trial"))
- def reserve(self, session): #session should have serialized isolation mode
- """Reserve the job for the current process, to the exclusion of all
- other processes. In other words, lock it."""
-
- serial_self = session.query(Trial).get(self.id)
- if serial_self.start is not None:
- raise Trial.ReserveError(self.host)
- serial_self.start = datetime.datetime.now()
- serial_self.host = 'asdf' #TODO: get hostname
- try:
- session.commit()
- except Exception, e:
- # different db backends raise different exceptions when a
- # commit fails, so here we just treat all problems the same
- #s.rollback() # doc says rollback or close after commit exception
- session.close()
- raise Trial.ReserveError(self.host)
-
- #Session().refresh(self) #load changes to serial_self into self
-
- def abort(self):
- """Reset job to be available for reservation.
-
- @return None
-
- @note: Raises exception if job is finished
-
- """
- if self.finish is not None:
-            raise Exception('Cannot abort a trial that has already finished')
- self.start = None
- self.host = None
- s = Session()
- s.save_or_update(self)
- s.commit()
-
- def complete(self, **kwargs):
- """Mark job self as finished and update attrs with kwargs."""
- self.attrs.update(kwargs)
- self.finish = datetime.datetime.now()
- s = Session()
- s.save_or_update(self)
- s.commit()
-
-mapper(Trial, t_trial,
- properties = {
- '_attrs': relation(_KeyVal,
- secondary=t_trial_keyval,
- cascade="delete-orphan")
- })
-
-metadata.create_all(engine) # does nothing when tables already exist
-
-
-
-
-############
-#
-# Test Stuff
-#
-#
-
-if __name__ == '__main__':
- s = Session()
-
- def add_some_jobs():
- dvalid, dtest = 'dvalid', 'dtest file'
- desc = 'debugging'
-
- def blah():
- for lr in [0.001, 0.01]:
- for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]:
- for rng_seed in [4, 5, 6]:
- for priority in [None, 1, 2]:
- yield dict(locals())
-
- for kwargs in blah():
- t = Trial(desc=desc, dvalid=dvalid, dtest=dtest, **kwargs)
- s.save(t)
-
- def work(jobtype):
- try:
- jid = reserve(jobtype)
- except StopIteration:
- return
-
- def blah(*args):
- print 'blah: ', args
- dct = get_next()
-
- add_some_jobs()
-
- if 0:
- for t in s.query(Trial):
- print 'saved:', t, [a.name for a in list(t.attrs)]
-
- def yield_unique_jobs():
- while True:
- rval = Trial.reserve_unique_kw()
- if rval is None:
- break
- else:
- yield rval
-
- if 0:
- for job in yield_unique_jobs():
- print 'yielded job', job
- job.complete(score=random.random())
-
- if 0:
- for t in s.query(Trial):
- print 'final:', t, [a.name for a in list(t.attrs)]
-
- q = s.query(Trial)
- #q = q.order_by(Trial.attrs)
- q = q.filter(Trial._attrs.any(name='rng_seed', ival=5))
- #q = q.filter(Trial._attrs.all(name='lr').order_by(_KeyVal.c.fval))
-
- print dir(q)
-
-
- print q
- for t in q.all():
- print 'final:', t, [a.name for a in list(t.attrs)]
- t.homebrew = 6
-
-
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/sql.py
--- a/pylearn/dbdict/sql.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,311 +0,0 @@
-
-import sys, os, copy, time
-
-import numpy.random
-
-import sqlalchemy
-import sqlalchemy.pool
-from sqlalchemy import create_engine, desc
-from sqlalchemy.orm import eagerload
-import psycopg2, psycopg2.extensions
-
-from api0 import db_from_engine, postgres_db, DbHandle
-
-
-EXPERIMENT = 'dbdict.experiment'
-#using the dictionary to store these is too slow
-STATUS = 'dbdict.status'
-HASH = 'dbdict.hash'
-PRIORITY = 'dbdict.sql.priority'
-
-HOST = 'dbdict.sql.hostname'
-HOST_WORKDIR = 'dbdict.sql.host_workdir'
-PUSH_ERROR = 'dbdict.sql.push_error'
-
-START = 0
-RUNNING = 1
-DONE = 2
-FUCKED_UP = 666
-
-_TEST_CONCURRENCY = False
-
-def postgres_serial(user, password, host, database, poolclass=sqlalchemy.pool.NullPool, **kwargs):
- """Return a DbHandle instance that communicates with a postgres database at transaction
- isolation_level 'SERIALIZABLE'.
-
- :param user: a username in the database
- :param password: the password for the username
- :param host: the network address of the host on which the postgres server is running
- :param database: a database served by the postgres server
-
- """
- this = postgres_serial
-
- if not hasattr(this,'engine'):
- def connect():
- c = psycopg2.connect(user=user, password=password, database=database, host=host)
- c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
- return c
- this.engine = create_engine('postgres://'
- ,creator=connect
- ,poolclass=poolclass
- )
-
- db = db_from_engine(this.engine, **kwargs)
- db._is_serialized_session_db = True
- return db
-
-def book_dct_postgres_serial(db, retry_max_sleep=10.0, verbose=0):
- """Find a trial in the lisa_db with status START.
-
- A trial will be returned with status=RUNNING.
-
- Returns None if no such trial exists in DB.
-
- This function uses a serial access to the lisadb to guarantee that no other
- process will retrieve the same dct. It is designed to facilitate writing
- a "consumer" for a Producer-Consumer pattern based on the database.
-
- """
- print >> sys.stderr, """#TODO: use the priority field, not the status."""
- print >> sys.stderr, """#TODO: ignore entries with key PUSH_ERROR."""
-
- s = db.session() #open a new session
-
- # NB. we need the query and attribute update to be in the same transaction
- assert s.autocommit == False
-
- dcts_seen = set([])
- keep_trying = True
-
- dct = None
- while (dct is None) and keep_trying:
- #build a query
- q = s.query(db._Dict)
-
- #N.B.
- # use dedicated column to retrieve jobs, not the dictionary keyval pair
- # This should be much faster.
- q = q.filter(db._Dict.status==START)
- q = q.order_by(db._Dict.priority.desc())
-
-        # this doesn't seem to work, hence the string hack below
- q = q.options(eagerload('_attrs')) #hard-coded in api0
-
- #try to reserve a dct
- try:
- #first() may raise psycopg2.ProgrammingError
- dct = q.first()
-
- if dct is not None:
- assert (dct not in dcts_seen)
- if verbose: print 'book_unstarted_dct retrieved, ', dct
- dct._set_in_session(STATUS, RUNNING, s)
- if 1:
- if _TEST_CONCURRENCY:
- print >> sys.stderr, 'SLEEPING BEFORE BOOKING'
- time.sleep(10)
-
- #commit() may raise psycopg2.ProgrammingError
- s.commit()
- else:
- print >> sys.stderr, 'DEBUG MODE: NOT RESERVING JOB!', dct
- #if we get this far, the job is ours!
- else:
- # no jobs are left
- keep_trying = False
- except (psycopg2.OperationalError,
- sqlalchemy.exceptions.ProgrammingError,
- sqlalchemy.exc.DBAPIError), e:
- #either the first() or the commit() raised
- s.rollback() # docs say to do this (or close) after commit raises exception
- if verbose: print 'caught exception', e
- if dct:
- # first() succeeded, commit() failed
- dcts_seen.add(dct)
- dct = None
- wait = numpy.random.rand(1)*retry_max_sleep
- if verbose: print 'another process stole our dct. Waiting %f secs' % wait
- time.sleep(wait)
-
- if dct:
- str(dct) # for loading of attrs in UGLY WAY!!!
- s.close()
- return dct
-
-def book_dct_non_postgres(db):
- print >> sys.stderr, """#TODO: use the priority field, not the status."""
- print >> sys.stderr, """#TODO: ignore entries with key self.push_error."""
-
- raise NotImplementedError()
-
-
-###########
-# Connect
-###########
-
-def parse_dbstring(dbstring):
- postgres = 'postgres://'
- if not dbstring.startswith(postgres):
- raise ValueError('For now, dbdict dbstrings must start with postgres://', dbstring)
- dbstring = dbstring[len(postgres):]
-
- #username_and_password
- colon_pos = dbstring.find('@')
- username_and_password = dbstring[:colon_pos]
- dbstring = dbstring[colon_pos+1:]
-
- colon_pos = username_and_password.find(':')
- if -1 == colon_pos:
- username = username_and_password
- password = None
- else:
- username = username_and_password[:colon_pos]
- password = username_and_password[colon_pos+1:]
-
- #hostname
- colon_pos = dbstring.find('/')
- hostname = dbstring[:colon_pos]
- dbstring = dbstring[colon_pos+1:]
-
- #dbname
- colon_pos = dbstring.find('/')
- dbname = dbstring[:colon_pos]
- dbstring = dbstring[colon_pos+1:]
-
- #tablename
- tablename = dbstring
-
- if password is None:
- password = get_password(hostname, dbname)
-
- if False:
- print 'USERNAME', username
- print 'PASS', password
- print 'HOST', hostname
- print 'DB', dbname
- print 'TABLE', tablename
-
- return username, password, hostname, dbname, tablename
-
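-# Illustrative only:
-#
-#     parse_dbstring('postgres://jdoe:secret@myhost/mydb/mytable')
-#     # -> ('jdoe', 'secret', 'myhost', 'mydb', 'mytable')
-#
-# If the ':password' part is omitted, get_password() below is consulted
-# (it reads ~/.dbdict_<dbname>).
-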
-def get_password(hostname, dbname):
- """Return the current user's password for a given database
-
- :TODO: Replace this mechanism with a section in the pylearn configuration file
- """
- password_path = os.getenv('HOME')+'/.dbdict_%s'%dbname
- try:
- password = open(password_path).readline()[:-1] #cut the trailing newline
- except:
- raise ValueError( 'Failed to read password for db "%s" from %s' % (dbname, password_path))
- return password
-
-def db(dbstring):
- username, password, hostname, dbname, tablename = parse_dbstring(dbstring)
- try:
- return postgres_db(username, password, hostname, dbname, table_prefix=tablename)
- except:
- print 'Error connecting with password', password
- raise
-
-
-###########
-# Queue
-###########
-
-def insert_dict(jobdict, db, force_dup=False, session=None, priority=1.0):
- """Insert a new `job` dictionary into database `db`.
-
- :param force_dup: forces insertion even if an identical dictionary is already in the db
-
- """
- # compute hash for the job, will be used to avoid duplicates
- job = copy.copy(jobdict)
- jobhash = hash(`job`)
-
- if session is None:
- s = db.session()
- else:
- s = session
-
- do_insert = force_dup or (None is s.query(db._Dict).filter(db._Dict.hash==jobhash).filter(db._Dict.status!=FUCKED_UP).first())
-
- rval = None
- if do_insert:
- if STATUS not in job:
- job[STATUS] = START
- if HASH not in job:
- job[HASH] = jobhash
- if PRIORITY not in job:
- job[PRIORITY] = priority
- rval = db.insert(job, session=s)
- s.commit()
-
- if session is None:
- s.close()
- return rval
-
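-# Illustrative usage (the dbstring and state values are hypothetical): schedule
-# one job dict, skipping it if an identical dict (by hash) is already present
-# and not marked FUCKED_UP.
-#
-#     mydb = db('postgres://jdoe:secret@myhost/mydb/mytable')
-#     insert_dict({'dbdict.experiment': 'my_module.my_experiment',
-#                  'lr': 0.03, 'n_hid': 100}, mydb, priority=2.0)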
-
-def insert_job(experiment_fn, state, db, force_dup=False, session=None, priority=1.0):
- state = copy.copy(state)
- experiment_name = experiment_fn.__module__ + '.' + experiment_fn.__name__
- if EXPERIMENT in state:
- if state[EXPERIMENT] != experiment_name:
- raise Exception('Inconsistency: state element %s does not match experiment %s' %(EXPERIMENT, experiment_name))
- else:
- state[EXPERIMENT] = experiment_name
- return insert_dict(state, db, force_dup=force_dup, session=session, priority=priority)
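A hedged usage sketch for the two insertion helpers above; the experiment function, state keys
and dbstring are hypothetical, and db() is the connection helper defined earlier in this module.
insert_job() records the experiment name in the state and then defers to insert_dict().

    def my_experiment(state, channel):   # placeholder job function
        pass

    database = db('postgres://jdoe@dbhost.example.org/mydb/experiments_v1')
    state = dict(lr=0.01, nhid=100)
    insert_job(my_experiment, state, database)   # skipped if an identical dict is already queued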
-
-
-# TODO: FIXME: WARNING
-# Should use insert_dict instead of db.insert. Need one entry point for adding jobs to
-# database, so that hashing can be done consistently
-def add_experiments_to_db(jobs, db, verbose=0, add_dups=False, type_check=None, session=None):
- """Add experiments paramatrized by jobs[i] to database db.
-
- Default behaviour is to ignore jobs which are already in the database.
-
- If type_check is a class (instead of None) then it will be used as a type declaration for
- all the elements in each job dictionary. For each key,value pair in the dictionary, there
- must exist an attribute,value pair in the class meeting the following criteria:
- the attribute and the key are equal, and the types of the values are equal.
-
- :param jobs: The parameters of experiments to run.
- :type jobs: an iterable object over dictionaries
- :param verbose: print which jobs are added and which are skipped
- :param add_dups: False will ignore a job if it matches (on all items()) with a db entry.
- :type add_dups: Bool
-
- :returns: list of (Bool,job[i]) in which the flags mean the corresponding job actually was
- inserted.
-
- """
- rval = []
- for job in jobs:
- job = copy.copy(job)
-        if session is None:
-            s = db.session()
-            do_insert = add_dups or (None is db.query(s).filter_eq_dct(job).first())
-            s.close()
-        else:
-            do_insert = add_dups or (None is db.query(session).filter_eq_dct(job).first())
-
- if do_insert:
- if type_check:
- for k,v in job.items():
-                    if type(v) != type(getattr(type_check, k)):
-                        raise TypeError('Experiment contains value with wrong type',
-                                ((k,v), type(getattr(type_check, k))))
-
- job[STATUS] = START
- job[PRIORITY] = 1.0
- if verbose:
- print 'ADDING ', job
- db.insert(job)
- rval.append((True, job))
- else:
- if verbose:
- print 'SKIPPING', job
-            rval.append((False, job))
-    return rval
-
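A short sketch of batch insertion with the helper above, reusing the hypothetical `database`
handle from the previous sketch; duplicate jobs are skipped unless add_dups=True.

    jobs = [dict(lr=0.01, nhid=100), dict(lr=0.001, nhid=200)]
    flags = add_experiments_to_db(jobs, database, verbose=1)
    n_inserted = sum(1 for inserted, job in flags if inserted)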
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/sql_commands.py
--- a/pylearn/dbdict/sql_commands.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,28 +0,0 @@
-
-def crazy_sql_command(viewname, cols, dicttab, keytab, id_col='id', dict_id='dict_id'):
-
- #create or replace view expview as select * from (select id as v1_id, value as nhid from
- #test0 where name='nhid') nhid LEFT OUTER JOIN (select id as v2_id, value as lrate from
- #test0 where name='lrate') lrate on nhid.v1_id = lrate.v2_id;
-
- col_queries = []
- colname0 = None
- for i, (colname, table_col) in enumerate(cols):
- safe_col = colname.replace('_','') # get rid of underscores
- safe_col = safe_col.replace('.','_') # replace dots with underscores
-
-        cols[i] = (safe_col, table_col) # tuple-safe update; cols entries may be lists or tuples
-
- q = """LEFT OUTER JOIN
- (select %(dict_id)s, %(table_col)s as %(safe_col)s from \"%(keytab)s\"
- where name='%(colname)s') %(safe_col)s
- on %(safe_col)s.dict_id = %(dicttab)s.%(id_col)s""" % locals()
-
- col_queries.append(q)
-
- header = "create or replace view %s as select %s.%s, %s from %s " \
- % (viewname, dicttab, id_col, (", ".join([c[0] for c in cols])), dicttab)
-
- rval = header + "\n".join(col_queries)
-
- return rval
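A hedged example of calling the view builder above; the view, table and column names are
invented, and 'fval' is assumed to be the column of the key/value table that holds each value.

    stmt = crazy_sql_command('expview',
                             [['nhid', 'fval'], ['lrate', 'fval']],
                             'test0trial', 'test0keyval')
    # stmt is a "create or replace view expview as select ..." string with one
    # LEFT OUTER JOIN per requested column, ready to be executed on the database.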
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/test_api0.py
--- a/pylearn/dbdict/test_api0.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,351 +0,0 @@
-from api0 import *
-import threading, time, commands, os, sys, math, random, datetime
-
-import psycopg2, psycopg2.extensions
-from sqlalchemy import create_engine, desc
-from sqlalchemy.orm import sessionmaker
-from sqlalchemy import Table, Column, MetaData, ForeignKey
-from sqlalchemy import Integer, String, Float, DateTime, Text, Binary
-from sqlalchemy.orm import mapper, relation, backref, eagerload
-from sqlalchemy.sql import operators, select
-
-import unittest
-
-class T(unittest.TestCase):
-
- def test_bad_dict_table(self):
- """Make sure our crude version of schema checking kinda works"""
- engine = create_engine('sqlite:///:memory:', echo=False)
- Session = sessionmaker(bind=engine, autoflush=True, transactional=True)
-
- table_prefix='bergstrj_scratch_test_'
- metadata = MetaData()
- t_trial = Table(table_prefix+'trial', metadata,
- Column('id', Integer, primary_key=True),
- Column('desc', String(256)), #comment: why running this trial?
- Column('priority', Float(53)), #aka Double
- Column('start', DateTime),
- Column('finish', DateTime),
- Column('host', String(256)))
- metadata.create_all(engine)
-
- try:
- h = DbHandle(None, t_trial, None, None)
- except ValueError, e:
- if e[0] == DbHandle.e_bad_table:
- return
- self.fail()
-
-
- def go(self):
- """Create tables and session_maker"""
- engine = create_engine('sqlite:///:memory:', echo=False)
- Session = sessionmaker(autoflush=True, transactional=True)
-
- table_prefix='bergstrj_scratch_test_'
- metadata = MetaData()
-
- t_trial = Table(table_prefix+'trial', metadata,
- Column('id', Integer, primary_key=True),
- Column('create', DateTime),
- Column('write', DateTime),
- Column('read', DateTime))
-
- t_keyval = Table(table_prefix+'keyval', metadata,
- Column('id', Integer, primary_key=True),
- Column('name', String(32), nullable=False), #name of attribute
- Column('ntype', Integer),
- Column('fval', Float(53)), #aka Double
-                Column('sval', Text), #string value
- Column('bval', Binary)) #TODO: store text (strings of unbounded length)
-
- t_trial_keyval = Table(table_prefix+'trial_keyval', metadata,
- Column('dict_id', Integer, ForeignKey('%s.id' % t_trial),
- primary_key=True),
- Column('pair_id', Integer, ForeignKey('%s.id' % t_keyval),
- primary_key=True))
-
- metadata.bind = engine
- metadata.create_all() # does nothing when tables already exist
-
- self.engine = engine
- return Session, t_trial, t_keyval, t_trial_keyval
-
-
- def test_insert_save(self):
-
- Session, t_dict, t_pair, t_link = self.go()
-
- db = DbHandle(*self.go())
-
- def jobs():
- dvalid, dtest = 'dvalid', 'dtest file'
- desc = 'debugging'
- for lr in [0.001]:
- for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]:
- for rng_seed in [4, 5, 6]:
- for priority in [None, 1]:
- yield dict(locals())
-
- jlist = list(jobs())
- assert len(jlist) == 1*4*3*2
- for i, dct in enumerate(jobs()):
- t = db.insert(**dct)
-
- #make sure that they really got inserted into the db
- orig_keycount = db._session.query(db._KeyVal).count()
- self.failUnless(orig_keycount > 0, orig_keycount)
-
- orig_dctcount = Session().query(db._Dict).count()
- self.failUnless(orig_dctcount ==len(jlist), orig_dctcount)
-
- orig_keycount = Session().query(db._KeyVal).count()
- self.failUnless(orig_keycount > 0, orig_keycount)
-
- #queries
- q0list = list(db.query().all())
- q1list = list(db.query())
- q2list = list(db)
-
- self.failUnless(q0list == q1list, (q0list,q1list))
-        self.failUnless(q0list == q2list, (q0list,q2list))
-
- self.failUnless(len(q0list) == len(jlist))
-
- for i, (j, q) in enumerate(zip(jlist, q0list)):
- jitems = list(j.items())
- qitems = list(q.items())
- jitems.sort()
- qitems.sort()
- if jitems != qitems:
- print i
- print jitems
- print qitems
- self.failUnless(jitems == qitems, (jitems, qitems))
-
- def test_query_0(self):
- Session, t_dict, t_pair, t_link = self.go()
-
- db = DbHandle(*self.go())
-
- def jobs():
- dvalid, dtest = 'dvalid', 'dtest file'
- desc = 'debugging'
- for lr in [0.001]:
- for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]:
- for rng_seed in [4, 5, 6]:
- for priority in [None, 1]:
- yield dict(locals())
-
- jlist = list(jobs())
- assert len(jlist) == 1*4*3*2
- for i, dct in enumerate(jobs()):
- t = db.insert(**dct)
-
- qlist = list(db.query(rng_seed=5))
- self.failUnless(len(qlist) == len(jlist)/3)
-
- jlist5 = [j for j in jlist if j['rng_seed'] == 5]
-
- for i, (j, q) in enumerate(zip(jlist5, qlist)):
- jitems = list(j.items())
- qitems = list(q.items())
- jitems.sort()
- qitems.sort()
- if jitems != qitems:
- print i
- print jitems
- print qitems
- self.failUnless(jitems == qitems, (jitems, qitems))
-
- def test_delete_keywise(self):
- Session, t_dict, t_pair, t_link = self.go()
-
- db = DbHandle(*self.go())
-
- def jobs():
- dvalid, dtest = 'dvalid', 'dtest file'
- desc = 'debugging'
- for lr in [0.001]:
- for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]:
- for rng_seed in [4, 5, 6]:
- for priority in [None, 1]:
- yield dict(locals())
-
- jlist = list(jobs())
- assert len(jlist) == 1*4*3*2
- for i, dct in enumerate(jobs()):
- t = db.insert(**dct)
-
- orig_keycount = Session().query(db._KeyVal).count()
-
- del_count = Session().query(db._KeyVal).filter_by(name='rng_seed',
- fval=5.0).count()
- self.failUnless(del_count == 8, del_count)
-
- #delete all the rng_seed = 5 entries
- qlist_before = list(db.query(rng_seed=5))
- for q in qlist_before:
- del q['rng_seed']
-
- #check that it's gone from our objects
- for q in qlist_before:
- self.failUnless('rng_seed' not in q) #via __contains__
- self.failUnless('rng_seed' not in q.keys()) #via keys()
-            exc = None
-            try:
-                r = q['rng_seed'] # via __getitem__
-                print 'r,', r
-            except KeyError, e:
-                exc = e
-            self.failUnless(exc is not None) #__getitem__ must raise KeyError for the deleted key
-
- #check that it's gone from dictionaries in the database
- qlist_after = list(db.query(rng_seed=5))
- self.failUnless(qlist_after == [])
-
- #check that exactly 8 keys were removed
- new_keycount = Session().query(db._KeyVal).count()
- self.failUnless(orig_keycount == new_keycount + 8, (orig_keycount,
- new_keycount))
-
- #check that no keys have rng_seed == 5
- gone_count = Session().query(db._KeyVal).filter_by(name='rng_seed',
- fval=5.0).count()
- self.failUnless(gone_count == 0, gone_count)
-
-
- def test_delete_dictwise(self):
- Session, t_dict, t_pair, t_link = self.go()
-
- db = DbHandle(*self.go())
-
- def jobs():
- dvalid, dtest = 'dvalid', 'dtest file'
- desc = 'debugging'
- for lr in [0.001]:
- for scale in [0.0001 * math.sqrt(10.0)**i for i in range(4)]:
- for rng_seed in [4, 5, 6]:
- for priority in [None, 1]:
- yield dict(locals())
-
- jlist = list(jobs())
- assert len(jlist) == 1*4*3*2
- for i, dct in enumerate(jobs()):
- t = db.insert(**dct)
-
- orig_keycount = Session().query(db._KeyVal).count()
- orig_dctcount = Session().query(db._Dict).count()
- self.failUnless(orig_dctcount == len(jlist))
-
- #delete all the rng_seed = 5 dictionaries
- qlist_before = list(db.query(rng_seed=5))
- for q in qlist_before:
- q.delete()
-
- #check that the right number has been removed
- post_dctcount = Session().query(db._Dict).count()
- self.failUnless(post_dctcount == len(jlist)-8)
-
- #check that the remaining ones are correct
-        for a, b in zip(
- [j for j in jlist if j['rng_seed'] != 5],
- Session().query(db._Dict).all()):
- self.failUnless(a == b)
-
- #check that the keys have all been removed
- n_keys_per_dict = 8
- new_keycount = Session().query(db._KeyVal).count()
- self.failUnless(orig_keycount - 8 * n_keys_per_dict == new_keycount, (orig_keycount,
- new_keycount))
-
-
- def test_setitem_0(self):
- Session, t_dict, t_pair, t_link = self.go()
-
- db = DbHandle(*self.go())
-
- b0 = 6.0
- b1 = 9.0
-
- job = dict(a=0, b=b0, c='hello')
-
- dbjob = db.insert(**job)
-
- dbjob['b'] = b1
-
- #check that the change is in db
- qjob = Session().query(db._Dict).filter(db._Dict._attrs.any(name='b',
- fval=b1)).first()
- self.failIf(qjob is dbjob)
- self.failUnless(qjob == dbjob)
-
- #check that the b:b0 key is gone
- count = Session().query(db._KeyVal).filter_by(name='b', fval=b0).count()
- self.failUnless(count == 0, count)
-
- #check that the b:b1 key is there
- count = Session().query(db._KeyVal).filter_by(name='b', fval=b1).count()
- self.failUnless(count == 1, count)
-
- def test_setitem_1(self):
- """replace with different sql type"""
- Session, t_dict, t_pair, t_link = self.go()
-
- db = DbHandle(*self.go())
-
- b0 = 6.0
- b1 = 'asdf' # a different dtype
-
- job = dict(a=0, b=b0, c='hello')
-
- dbjob = db.insert(**job)
-
- dbjob['b'] = b1
-
- #check that the change is in db
- qjob = Session().query(db._Dict).filter(db._Dict._attrs.any(name='b',
- sval=b1)).first()
- self.failIf(qjob is dbjob)
- self.failUnless(qjob == dbjob)
-
- #check that the b:b0 key is gone
- count = Session().query(db._KeyVal).filter_by(name='b', fval=b0).count()
- self.failUnless(count == 0, count)
-
- #check that the b:b1 key is there
- count = Session().query(db._KeyVal).filter_by(name='b', sval=b1,
- fval=None).count()
- self.failUnless(count == 1, count)
-
- def test_setitem_2(self):
- """replace with different number type"""
- Session, t_dict, t_pair, t_link = self.go()
-
- db = DbHandle(*self.go())
-
- b0 = 6.0
- b1 = 7
-
- job = dict(a=0, b=b0, c='hello')
-
- dbjob = db.insert(**job)
-
- dbjob['b'] = b1
-
- #check that the change is in db
- qjob = Session().query(db._Dict).filter(db._Dict._attrs.any(name='b',
- fval=b1)).first()
- self.failIf(qjob is dbjob)
- self.failUnless(qjob == dbjob)
-
- #check that the b:b0 key is gone
- count = Session().query(db._KeyVal).filter_by(name='b', fval=b0,ntype=1).count()
- self.failUnless(count == 0, count)
-
- #check that the b:b1 key is there
- count = Session().query(db._KeyVal).filter_by(name='b', fval=b1,ntype=0).count()
- self.failUnless(count == 1, count)
-
-
-if __name__ == '__main__':
- unittest.main()
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/tests/test_experiment.py
--- a/pylearn/dbdict/tests/test_experiment.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,66 +0,0 @@
-from pylearn.dbdict.experiment import *
-from unittest import TestCase
-
-import StringIO
-
-
-class T_subdict(TestCase):
-
- def test0(self):
- a = {'aa':0, 'ab':1, 'bb':2}
- s = subdict(a, 'a') # returns dict-like object with keyvals {'a':0, 'b':1}
- s['a'] = 5
- s['c'] = 9
-
- self.failUnless(s['c'] == 9)
- self.failUnless(a['ac'] == 9)
-
- #check that the subview has the right stuff
- sitems = s.items()
- sitems.sort()
- self.failUnless(sitems == [('a', 5), ('b', 1), ('c', 9)], str(sitems))
- self.failUnless(a['bb'] == 2)
-
- #add to the subview via the parent
- a['az'] = -1
- self.failUnless(s['z'] == -1)
-
- def test1(self):
- a = {'aa':0, 'ab':1, 'bb':2}
-
- s = subdict(a, 'a')
-
- r = {}
- r.update(s)
-
- self.failUnless(len(r) == len(s))
- self.failUnless(r == s, (str(r), str(s)))
-
-
-class T_call_with_kwargs_from_dict(TestCase):
-
- def test0(self):
-
- def f(a, c=5):
- return a+c
-
- def g(a, **dct):
- return a + dct['c']
-
-
- kwargs = dict(a=1, b=2, c=3)
-
- io = StringIO.StringIO()
-
- self.failUnless(call_with_kwargs_from_dict(f, kwargs, logfile=io) == 4)
- self.failUnless(io.getvalue() == \
- "WARNING: DictProxyState.call_substate ignoring key-value pair: b 2\n")
- self.failUnless(call_with_kwargs_from_dict(g, kwargs, logfile=io) == 4)
- self.failUnless(io.getvalue() == \
- "WARNING: DictProxyState.call_substate ignoring key-value pair: b 2\n")
- del kwargs['c']
- self.failUnless(call_with_kwargs_from_dict(f, kwargs, logfile=io) == 6)
- self.failUnless(io.getvalue() == \
- ("WARNING: DictProxyState.call_substate ignoring key-value pair: b 2\n"
- "WARNING: DictProxyState.call_substate ignoring key-value pair: b 2\n"))
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/dbdict/tools.py
--- a/pylearn/dbdict/tools.py Wed Feb 11 01:42:58 2009 -0500
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,143 +0,0 @@
-"""Helper code for dbdict job drivers."""
-
-import sys, inspect
-
-from .experiment import COMPLETE, INCOMPLETE, subdict
-
-MODULE = 'dbdict_module'
-SYMBOL = 'dbdict_symbol'
-PREIMPORT = 'dbdict_preimport'
-
-def dummy_channel(*args, **kwargs):
- return None
-
-#
-#this proxy object lets experiments use a dict like a state object
-#
-def DictProxyState(dct):
- """Convenient dict -> object interface for the state parameters of dbdict jobs.
-
- In the dbdict job running protocol, the user provides a job as a function with two
- arguments:
-
- def myjob(state, channel):
- a = getattr(state, 'a', blah)
- b = state.blah
- ...
-
-    If the caller of myjob holds the attributes of this `state` in a dictionary, then
-    `DictProxyState` wraps that dictionary in an object whose attributes are backed by it.
-
- """
- defaults_obj = [None]
- class Proxy(object):
- def substate(s, prefix=''):
- return DictProxyState(subdict(dct, prefix))
-
- def use_defaults(s, obj):
- """Use `obj` to retrieve values when they are not in the `dict`.
-
- :param obj: a dictionary of default values.
- """
- defaults_obj[0] = obj
-
- def __getitem__(s,a):
- """Returns key `a` from the underlying dict, or from the defaults.
-
- Raises `KeyError` on failure.
- """
- try:
- return dct[a]
-            except KeyError, e:
- try:
- return getattr(defaults_obj[0], a)
- except:
- raise e
-
- def __setitem__(s,a,v):
- """Sets key `a` equal to `v` in the underlying dict. """
- dct[a] = v
-
- def __getattr__(s,a):
- """Returns value of key `a` from the underlying dict first, then from the defaults.
-
- Raises `AttributeError` on failure.
- """
- try:
- return dct[a]
- except KeyError:
- return getattr(defaults_obj[0], a)
- def __setattr__(s,a,v):
- dct[a] = v
- return Proxy()
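A brief sketch of the proxy in use; the keys and the defaults class below are made up.

    class Defaults(object):
        batchsize = 8                    # hypothetical default

    dct = {'lr': 0.01}
    state = DictProxyState(dct)
    state.use_defaults(Defaults)
    assert state.lr == 0.01              # attribute access is backed by dct
    assert state['batchsize'] == 8       # missing keys fall back to the defaults object
    state.nhid = 100                     # writes go straight into dct
    assert dct['nhid'] == 100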
-
-def load_state_fn(state):
-
- #
- # load the experiment class
- #
- dbdict_module_name = state[MODULE]
- dbdict_symbol = state[SYMBOL]
-
- preimport_list = state.get(PREIMPORT, "").split()
- for preimport in preimport_list:
- __import__(preimport, fromlist=[None], level=0)
-
- try:
- dbdict_module = __import__(dbdict_module_name, fromlist=[None], level=0)
- dbdict_fn = getattr(dbdict_module, dbdict_symbol)
- except:
- print >> sys.stderr, "FAILED to load job symbol:", dbdict_module_name, dbdict_symbol
- print >> sys.stderr, "PATH", sys.path
- raise
-
- return dbdict_fn
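For illustration, the keys load_state_fn() reads from the state; 'math' and 'sqrt' stand in for
a real experiment module and function so that the sketch is actually importable.

    state = {MODULE: 'math', SYMBOL: 'sqrt', PREIMPORT: ''}
    fn = load_state_fn(state)   # here fn is math.sqrt; a real job would name an experiment function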
-
-
-def run_state(state, channel = dummy_channel):
- fn = load_state_fn(state)
- rval = fn(state, channel)
- if rval not in (COMPLETE, INCOMPLETE):
- print >> sys.stderr, "WARNING: INVALID job function return value"
- return rval
-
-def call_with_kwargs_from_dict(fn, dct, logfile='stderr'):
- """Call function `fn` with kwargs taken from dct.
-
- When fn has a '**' parameter, this function is equivalent to fn(**dct).
-
- When fn has no '**' parameter, this function removes keys from dct which are not parameter
- names of `fn`. The keys which are ignored in this way are logged to the `logfile`. If
- logfile is the string 'stdout' or 'stderr', then errors are logged to sys.stdout or
- sys.stderr respectively.
-
- :param fn: function to call
- :param dct: dictionary from which to take arguments of fn
- :param logfile: log ignored keys to this file
- :type logfile: file-like object or string 'stdout' or string 'stderr'
-
-    :returns: the return value of fn, called with keyword arguments taken from dct
-
- """
- argspec = inspect.getargspec(fn)
- argnames = argspec[0]
-    if argspec[2] is None: #if there is no room for a **args type-thing in fn...
- kwargs = {}
- for k,v in dct.items():
- if k in argnames:
- kwargs[k] = v
- else:
- if not logfile:
- pass
- elif logfile == 'stderr':
- print >> sys.stderr, "WARNING: DictProxyState.call_substate ignoring key-value pair:", k, v
- elif logfile == 'stdout':
- print >> sys.stdout, "WARNING: DictProxyState.call_substate ignoring key-value pair:", k, v
- else:
- print >> logfile, "WARNING: DictProxyState.call_substate ignoring key-value pair:", k, v
- return fn(**kwargs)
- else:
- #there is a **args type thing in fn. Here we pass everything.
-        return fn(**dct)
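A small sketch of the filtering behaviour described in the docstring above; train() and its
parameters are hypothetical.

    def train(lr, nhid=50):              # no ** parameter, so extra keys get filtered out
        return lr, nhid

    params = dict(lr=0.01, nhid=100, unused_flag=True)
    result = call_with_kwargs_from_dict(train, params)   # 'unused_flag' is logged and dropped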
-
diff -r d3d8f5a17909 -r 2704c8688ced pylearn/external/wrap_libsvm.py
--- a/pylearn/external/wrap_libsvm.py Wed Feb 11 01:42:58 2009 -0500
+++ b/pylearn/external/wrap_libsvm.py Wed Feb 11 01:43:14 2009 -0500
@@ -61,6 +61,7 @@
#libsvm needs stuff in int32 on a 32bit machine
#TODO: test this on a 64bit machine
+ # -> Both int32 and int64 (default) seem to be OK
train_y = numpy.asarray(dataset.train.y, dtype='int32')
valid_y = numpy.asarray(dataset.valid.y, dtype='int32')
test_y = numpy.asarray(dataset.test.y, dtype='int32')