changeset 575:cf19655ec48b

Automated merge with ssh://projects@lgcm.iro.umontreal.ca/hg/pylearn_refactor
author Frederic Bastien <bastienf@iro.umontreal.ca>
date Thu, 04 Dec 2008 11:03:38 -0500
parents 9f5891cd4048 (diff) 220044be9fd8 (current diff)
children df2e2c7ba4ac 1972bc9bea6d
diffstat 19 files changed, 970 insertions(+), 52 deletions(-)
--- a/pylearn/algorithms/kernel_regression.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/algorithms/kernel_regression.py	Thu Dec 04 11:03:38 2008 -0500
@@ -4,16 +4,16 @@
 
 from pylearn.learner import OfflineLearningAlgorithm
 from theano import tensor as T
-from nnet_ops import prepend_1_to_each_row
+from theano.tensor.nnet import prepend_1_to_each_row
 from theano.scalar import as_scalar
 from common.autoname import AutoName
 import theano
 import numpy
 
 # map an N-vector to a 1xN matrix
-row_vector = theano.elemwise.DimShuffle((False,),['x',0])
+row_vector = theano.tensor.DimShuffle((False,),['x',0])
 # map an N-vector to an Nx1 matrix
-col_vector = theano.elemwise.DimShuffle((False,),[0,'x'])
+col_vector = theano.tensor.DimShuffle((False,),[0,'x'])
 
 class KernelRegression(OfflineLearningAlgorithm):
     """
--- a/pylearn/algorithms/linear_regression.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/algorithms/linear_regression.py	Thu Dec 04 11:03:38 2008 -0500
@@ -6,7 +6,7 @@
 
 from pylearn.learner import OfflineLearningAlgorithm,OnlineLearningAlgorithm
 from theano import tensor as T
-from nnet_ops import prepend_1_to_each_row
+from theano.tensor.nnet import prepend_1_to_each_row
 from theano.scalar import as_scalar
 from common.autoname import AutoName
 import theano
@@ -96,14 +96,14 @@
 
     __compiled = False
     @classmethod
-    def compile(cls,linker='c|py'):
+    def compile(cls, mode = "FAST_RUN"):
         if cls.__compiled:
             return
         def fn(input_vars,output_vars):
-            return staticmethod(theano.function(input_vars,output_vars, linker=linker))
+            return staticmethod(theano.function(input_vars, output_vars, mode=mode))
 
-        cls.compute_outputs = fn([cls.inputs,cls.theta],[cls.outputs])
-        cls.compute_errors = fn([cls.outputs,cls.targets],[cls.squared_errors])
+        cls.compute_outputs = fn([cls.inputs,cls.theta],cls.outputs)
+        cls.compute_errors = fn([cls.outputs,cls.targets],cls.squared_errors)
 
         cls.__compiled = True
 
@@ -115,17 +115,17 @@
     XtX = T.matrix() # (n_inputs+1) x (n_inputs+1)
     XtY = T.matrix() # (n_inputs+1) x n_outputs
     extended_input = prepend_1_to_each_row(P.inputs)
-    new_XtX = T.add_inplace(XtX,T.dot(extended_input.T,extended_input))
-    new_XtY = T.add_inplace(XtY,T.dot(extended_input.T,P.targets))
+    new_XtX = T.add(XtX,T.dot(extended_input.T,extended_input))
+    new_XtY = T.add(XtY,T.dot(extended_input.T,P.targets))
 
     __compiled = False
     
     @classmethod
-    def compile(cls,linker='c|py'):
+    def compile(cls, mode="FAST_RUN"):
         if cls.__compiled:
             return
         def fn(input_vars,output_vars):
-            return staticmethod(theano.function(input_vars,output_vars, linker=linker))
+            return staticmethod(theano.function(input_vars, output_vars, mode=mode))
 
         cls.update = fn([cls.XtX,cls.XtY,cls.P.inputs,cls.P.targets],[cls.new_XtX,cls.new_XtY])
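
The linker argument to theano.function was superseded by compilation modes, which is what this hunk tracks. A minimal sketch of the replacement, using the standard mode names:

    import theano
    import theano.tensor as T

    x = T.matrix('x')
    y = T.matrix('y')
    # formerly: theano.function([x, y], x + y, linker='c|py')
    f = theano.function([x, y], x + y, mode='FAST_RUN')      # optimized graph
    g = theano.function([x, y], x + y, mode='FAST_COMPILE')  # minimal compile time
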
 
--- a/pylearn/algorithms/logistic_regression.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/algorithms/logistic_regression.py	Thu Dec 04 11:03:38 2008 -0500
@@ -40,11 +40,15 @@
         #here we actually build the model
         self.linear_output = T.dot(self.input, self.w) + self.b
         if 0:
+            # TODO: pending support for target being a sparse matrix
             self.softmax = nnet.softmax(self.linear_output)
 
             self._max_pr, self.argmax = T.max_and_argmax(self.linear_output)
             self._xent = self.target * T.log(self.softmax)
         else:
+            # TODO: when above is fixed, remove this hack (need an argmax
+            # which is independent of targets)
+            self.argmax_standalone = T.argmax(self.linear_output)
             (self._xent, self.softmax, self._max_pr, self.argmax) =\
                     nnet.crossentropy_softmax_max_and_argmax_1hot(
                     self.linear_output, self.target)
@@ -149,12 +153,12 @@
     def __init__(self, input=None, targ=None, w=None, b=None, lr=None, regularize=False):
         super(LogReg2, self).__init__() #boilerplate
 
-        self.input = input if input is not None else T.matrix('input')
-        self.targ = targ if targ is not None else T.lcol()
+        self.input = module.Member(input) if input is not None else T.matrix('input')
+        self.targ = module.Member(targ) if targ is not None else T.lcol()
 
-        self.w = w if w is not None else module.Member(T.dmatrix())
-        self.b = b if b is not None else module.Member(T.dvector())
-        self.lr = lr if lr is not None else module.Member(T.dscalar())
+        self.w = module.Member(w) if w is not None else module.Member(T.dmatrix())
+        self.b = module.Member(b) if b is not None else module.Member(T.dvector())
+        self.lr = module.Member(lr) if lr is not None else module.Member(T.dscalar())
 
         self.params = [p for p in [self.w, self.b] if p.owner is None]
 
--- a/pylearn/algorithms/sandbox/_test_onehotop.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/algorithms/sandbox/_test_onehotop.py	Thu Dec 04 11:03:38 2008 -0500
@@ -3,7 +3,7 @@
 import unittest
 from theano import compile
 from theano import gradient
-
+from theano import function
 from theano.tensor import as_tensor
 
 import random
@@ -14,8 +14,8 @@
         x = as_tensor([3, 2, 1])
         y = as_tensor(5)
         o = one_hot(x, y)
-        y = compile.eval_outputs([o])
-        self.failUnless(numpy.all(y == numpy.asarray([[0, 0, 0, 1, 0], [0, 0, 1, 0, 0], [0, 1, 0, 0, 0]])))
+        f = function([],o)
+        self.failUnless(numpy.all(f() == numpy.asarray([[0, 0, 0, 1, 0], [0, 0, 1, 0, 0], [0, 1, 0, 0, 0]])))
 
 if __name__ == '__main__':
     unittest.main()
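
The compile.eval_outputs helper was dropped in favour of compiling a zero-argument function, which is the pattern the test now uses. A standalone sketch of the same idiom (as_tensor is this era's name for the constant-wrapping helper the test imports):

    import numpy
    from theano import function
    from theano.tensor import as_tensor

    o = as_tensor([3, 2, 1]) * 2
    f = function([], o)
    assert numpy.all(f() == numpy.asarray([6, 4, 2]))
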
--- a/pylearn/algorithms/stopper.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/algorithms/stopper.py	Thu Dec 04 11:03:38 2008 -0500
@@ -122,6 +122,16 @@
 
         raise StopIteration
 
+class NStages(ICML08Stopper):
+    """Run for a fixed number of steps, checking validation set every so
+    often."""
+    def __init__(self, hard_limit, v_int):
+        ICML08Stopper.__init__(self, hard_limit, v_int, 1.0, 1.0, hard_limit)
+
+    #TODO: could optimize next() function. Most of what's in ICML08Stopper.next()
+    #is not necessary
+
+
 @stopper_factory('icml08')
 def icml08_stopper(i_wait, v_int, min_improvement, patience, hard_limit):
     return ICML08Stopper(i_wait, v_int, min_improvement, patience, hard_limit)
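
NStages simply pins ICML08Stopper's improvement and patience parameters so that only the hard step limit matters. A hypothetical construction (the argument names follow the __init__ above; the training-loop protocol is whatever ICML08Stopper.next() implements):

    stopper = NStages(hard_limit=1000, v_int=100)  # run 1000 steps, validating every 100
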
--- a/pylearn/algorithms/tests/test_aa.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/algorithms/tests/test_aa.py	Thu Dec 04 11:03:38 2008 -0500
@@ -29,6 +29,14 @@
 
 if __name__ == '__main__':
     numpy.random.seed(10)
+    print 'sanity check:'
+    t1 = test_train('SANITY_CHECK')
+#     t1 = test_train([theano.Mode('c|py', 'fast_compile'),
+#                      theano.Mode('c|py', 'fast_run')])
+    print 'time:',t1
+    print
+
+    numpy.random.seed(10)
     print 'optimized:'
     t1 = test_train(theano.Mode('c|py', 'fast_run'))
     print 'time:',t1
--- a/pylearn/algorithms/tests/test_daa.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/algorithms/tests/test_daa.py	Thu Dec 04 11:03:38 2008 -0500
@@ -28,7 +28,7 @@
             model.local_update[l]([[0, 1, 0, 1]])
             model.local_update[l]([[1, 0, 1, 0]])
 
-    for i in range(1):
+    for i in range(10):
         model.update([[0, 1, 0, 1]], [[1]])
         model.update([[1, 0, 1, 0]], [[0]])
     print model.classify([[0, 1, 0, 1]])
@@ -41,23 +41,31 @@
     daa = models.Stacker([(models.SigmoidXEDenoisingAA, 'hidden')] * ndaa + [(pylearn.algorithms.logistic_regression.Module_Nclass, 'pred')],
                          regularize = False)
 
-    model = daa.make([4, 20, 20, 20, 10],
+    model = daa.make([4] + [20] * ndaa + [10],
                      lr = 0.01,
                      mode = mode,
                      seed = 10)
 
-    model.layers[0].noise_level = 0.3
-    model.layers[1].noise_level = 0.3
-    model.layers[2].noise_level = 0.3
+    for l in range(ndaa): model.layers[l].noise_level = 0.3
 
-    for l in range(3):
+    instances = [([[0, 1, 0, 1]], [1]), ([[1, 0, 1, 0]], [0])]
+
+    for l in range(ndaa):
         for i in range(10):
-            model.local_update[l]([[0, 1, 0, 1]])
-            model.local_update[l]([[1, 0, 1, 0]])
+            for (input, output) in instances:
+                model.local_update[l](input)
 
-    for i in range(1):
-        model.update([[0, 1, 0, 1]], [1])
-        model.update([[1, 0, 1, 0]], [0])
+    for i in range(10):
+        for (input, output) in instances:
+#            model.update(input, output)
+            print "OLD:", 
+            print model.validate(input, output)
+            oldloss = model.update(input, output)
+            print oldloss
+            print "NEW:"
+            print model.validate(input, output)
+            print 
+
     print model.apply([[0, 1, 0, 1]])
     print model.apply([[1, 0, 1, 0]])
 
--- a/pylearn/algorithms/tests/test_linear_regression.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/algorithms/tests/test_linear_regression.py	Thu Dec 04 11:03:38 2008 -0500
@@ -21,5 +21,18 @@
         print 'mse = ',mse
         
 if __name__ == '__main__':
-    unittest.main()
-        
+    import sys
+
+    if len(sys.argv)==1:
+        unittest.main()
+    else:
+        assert sys.argv[1]=="--debug"
+        tests = []
+        for arg in sys.argv[2:]:
+            tests.append(arg)
+        if tests:
+            unittest.TestSuite(map(T_DataSet, tests)).debug()
+        else:
+            module = __import__("test_linear_regression")
+            tests = unittest.TestLoader().loadTestsFromModule(module)
+            tests.debug()
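
With this change a failing case can be run under the debugger instead of having its traceback swallowed by the test runner; for example (the method name is illustrative):

    python test_linear_regression.py                 # run the whole suite
    python test_linear_regression.py --debug test0   # debug T_DataSet.test0
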
--- a/pylearn/datasets/MNIST.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/datasets/MNIST.py	Thu Dec 04 11:03:38 2008 -0500
@@ -46,6 +46,7 @@
             y=all_targ[ntrain+nvalid:ntrain+nvalid+ntest])
 
     rval.n_classes = 10
+    rval.img_shape = (28,28)
     return rval
 
 
--- a/pylearn/datasets/embeddings/parameters.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/datasets/embeddings/parameters.py	Thu Dec 04 11:03:38 2008 -0500
@@ -1,10 +1,10 @@
 """
 Locations of the embedding data files.
 """
-WEIGHTSFILE     = "/home/fringant2/lisa/data/word_embeddings.collobert-and-weston/lm-weights.txt"
-VOCABFILE       = "/home/fringant2/lisa/data/word_embeddings.collobert-and-weston/words.asc"
-#WEIGHTSFILE     = "/home/joseph/data/word_embeddings.collobert-and-weston/lm-weights.txt"
-#VOCABFILE       = "/home/joseph/data/word_embeddings.collobert-and-weston/words.asc"
+#WEIGHTSFILE     = "/home/fringant2/lisa/data/word_embeddings.collobert-and-weston/lm-weights.txt"
+#VOCABFILE       = "/home/fringant2/lisa/data/word_embeddings.collobert-and-weston/words.asc"
+WEIGHTSFILE     = "/home/joseph/data/word_embeddings.collobert-and-weston/lm-weights.txt"
+VOCABFILE       = "/home/joseph/data/word_embeddings.collobert-and-weston/words.asc"
 NUMBER_OF_WORDS = 30000
 DIMENSIONS      = 50
 UNKNOWN         = "UNKNOWN"
--- a/pylearn/datasets/embeddings/process.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/datasets/embeddings/process.py	Thu Dec 04 11:03:38 2008 -0500
@@ -11,6 +11,12 @@
 __word_to_embedding = None
 __read = False
 
+def length():
+    """
+    @return: The dimensionality of the embeddings.
+    """
+    read_embeddings()
+    return len(__word_to_embedding[__words[0]])
+
 def word_to_embedding(w):
     read_embeddings()
     return __word_to_embedding[w]
@@ -39,29 +45,21 @@
         w = __words[i]
         __word_to_embedding[w] = l
     __read = True
+    for w in __word_to_embedding: assert len(__word_to_embedding[__words[0]]) == len(__word_to_embedding[w])
     sys.stderr.write("...done reading %s\n" % WEIGHTSFILE)
 
 import re
 numberre = re.compile("[0-9]")
-slashre = re.compile("\\\/")
 
-def preprocess_word(origw):
+def preprocess_word(w):
     """
     Convert a word so that it can be embedded directly.
     Return the preprocessed word.
-    @note: Preprocessing is appropriate for Penn Treebank style documents.
+    @note: Perhaps run L{common.penntreebank.preprocess} on the word first.
     """
     read_embeddings()
-    if origw == "-LRB-": w = "("
-    elif origw == "-RRB-": w = ")"
-    elif origw == "-LCB-": w = "{"
-    elif origw == "-RCB-": w = "}"
-    elif origw == "-LSB-": w = "["
-    elif origw == "-RSB-": w = "]"
-    else:
-        w = origw
+    if w not in __word_to_embedding:
         w = string.lower(w)
-        w = slashre.sub("/", w)
         w = numberre.sub("NUMBER", w)
     if w not in __word_to_embedding:
 #        sys.stderr.write("Word not in vocabulary, using %s: %s (original %s)\n" % (UNKNOWN, w, origw))
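
The rewritten preprocess_word only lowercases and collapses digits when the raw token is out of vocabulary; the Penn Treebank bracket translation was dropped here (see the @note). A standalone sketch of the fallback normalization, with vocab standing in for __word_to_embedding:

    import re
    numberre = re.compile("[0-9]")

    def normalize(w, vocab):
        if w not in vocab:
            w = w.lower()
            w = numberre.sub("NUMBER", w)  # each digit becomes "NUMBER"
        return w

    vocab = set(["the", "NUMBERNUMBER"])   # hypothetical vocabulary
    assert normalize("The", vocab) == "the"
    assert normalize("42", vocab) == "NUMBERNUMBER"
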
--- a/pylearn/datasets/make_test_datasets.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/datasets/make_test_datasets.py	Thu Dec 04 11:03:38 2008 -0500
@@ -1,4 +1,4 @@
-import dataset
+from dataset import ArrayDataSet
 from shapeset.dset import Polygons
 from linear_regression import linear_predictor
 from kernel_regression import kernel_predictor
@@ -110,6 +110,7 @@
     #  testset = ArrayDataSet(inputs[n_examples/2:],{'input':slice(0,n_inputs)}) | \
     #            ArrayDataSet(targets[n_examples/2:],{'target':slice(0,n_targets)})
     data = hstack((inputs,targets))
+
     trainset = ArrayDataSet(data[0:n_train],
                             {'input':slice(0,n_inputs),'target':slice(n_inputs,n_inputs+n_targets)})
     testset = ArrayDataSet(data[n_train:],
--- a/pylearn/datasets/shapeset1.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/datasets/shapeset1.py	Thu Dec 04 11:03:38 2008 -0500
@@ -7,7 +7,7 @@
 import os
 import numpy
 
-from ..amat import AMat
+from ..io.amat import AMat
 from .config import data_root
 
 def _head(path, n):
--- a/pylearn/datasets/smallNorb.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/datasets/smallNorb.py	Thu Dec 04 11:03:38 2008 -0500
@@ -1,6 +1,6 @@
 import os
 import numpy
-from ..filetensor import read
+from ..io.filetensor import read
 from .config import data_root
 
 #Path = '/u/bergstrj/pub/data/smallnorb'
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pylearn/datasets/testDataset.py	Thu Dec 04 11:03:38 2008 -0500
@@ -0,0 +1,43 @@
+"""
+Various routines to load/access MNIST data.
+"""
+from __future__ import absolute_import
+
+import os
+import numpy
+
+from ..io.amat import AMat
+from .config import data_root
+from .dataset import dataset_factory, Dataset
+
+VALSEQ, VALRAND = range(2)
+
+@dataset_factory('DEBUG')
+def mnist_factory(variant='', ntrain=10, nvalid=10, ntest=10, \
+        nclass=2, ndim=1, dshape=None, valtype=VALSEQ):
+
+    dshape = [5] * ndim if dshape is None else dshape
+
+    rval = Dataset()
+    rval.n_classes = nclass
+    rval.img_shape = dshape
+
+    dsize = numpy.prod(dshape)
+
+    print ntrain, nvalid, ntest, nclass, dshape, valtype
+
+    ntot = ntrain + nvalid + ntest
+    xdata = numpy.arange(ntot*numpy.prod(dshape)).reshape((ntot,dsize)) \
+            if valtype is VALSEQ else \
+            numpy.random.random((ntot,dsize))
+    ydata = numpy.round(numpy.random.random(ntot))
+
+    rval.train = Dataset.Obj(x=xdata[0:ntrain],y=ydata[0:ntrain])
+    rval.valid = Dataset.Obj(x=xdata[ntrain:ntrain+nvalid],\
+                             y=ydata[ntrain:ntrain+nvalid])
+    rval.test =  Dataset.Obj(x=xdata[ntrain+nvalid:ntrain+nvalid+ntest],
+                             y=ydata[ntrain+nvalid:ntrain+nvalid+ntest])
+
+    return rval
--- a/pylearn/dbdict/api0.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/dbdict/api0.py	Thu Dec 04 11:03:38 2008 -0500
@@ -180,6 +180,9 @@
                 s.commit()
                 s.update(d_self)
 
+            def iteritems(d_self):
+                return d_self.items()
+
             def items(d_self):
                 return [(kv.name, kv.val) for kv in d_self._attrs]
             
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pylearn/dbdict/newstuff.py	Thu Dec 04 11:03:38 2008 -0500
@@ -0,0 +1,614 @@
+
+from __future__ import with_statement
+
+from collections import defaultdict
+import re, sys, inspect, os, signal, tempfile, shutil, socket
+
+import sql
+
+
+################################################################################
+### misc
+################################################################################
+
+class DD(defaultdict):
+    def __getattr__(self, attr):
+        return self[attr]
+    def __setattr__(self, attr, value):
+        self[attr] = value
+    def __str__(self):
+        return 'DD%s' % dict(self)
+    def __repr__(self):
+        return str(self)
+
+################################################################################
+### resolve
+################################################################################
+
+def resolve(name):
+    symbols = name.split('.')
+    builder = __import__(symbols[0])
+    for sym in symbols[1:]:
+        builder = getattr(builder, sym)
+    return builder
+
+################################################################################
+### dictionary
+################################################################################
+
+def convert(obj):
+    try:
+        return eval(obj, {}, {})
+    except NameError:
+        return obj
+
+def flatten(obj):
+    d = {}
+    def helper(d, prefix, obj):
+        if isinstance(obj, (str, int, float)):
+            d[prefix] = obj #convert(obj)
+        else:
+            if isinstance(obj, dict):
+                subd = obj
+            else:
+                subd = obj.state()
+                subd['__builder__'] = '%s.%s' % (obj.__module__, obj.__class__.__name__)
+            for k, v in subd.iteritems():
+                pfx = '.'.join([prefix, k]) if prefix else k
+                helper(d, pfx, v)
+    helper(d, '', obj)
+    return d
+
+def expand(d):
+    def dd():
+        return DD(dd)
+    struct = dd()
+    for k, v in d.iteritems():
+        if k == '':
+            raise NotImplementedError()
+        else:
+            keys = k.split('.')
+        current = struct
+        for k2 in keys[:-1]:
+            current = current[k2]
+        current[keys[-1]] = v #convert(v)
+    return struct
+
+def realize(d):
+    if not isinstance(d, dict):
+        return d
+    d = dict((k, realize(v)) for k, v in d.iteritems())
+    if '__builder__' in d:
+        builder = resolve(d.pop('__builder__'))
+        return builder(**d)
+    return d
+
+def make(d):
+    return realize(expand(d))
+
+################################################################################
+### errors
+################################################################################
+
+class UsageError(Exception):
+    pass
+
+################################################################################
+### parsing and formatting
+################################################################################
+
+def parse(*strings):
+    d = {}
+    for string in strings:
+        s1 = re.split(' *= *', string, 1)
+        s2 = re.split(' *:: *', string, 1)
+        if len(s1) == 1 and len(s2) == 1:
+            raise UsageError('Expected a keyword argument in place of "%s"' % s1[0])
+        elif len(s1) == 2:
+            k, v = s1
+            v = convert(v)
+        elif len(s2) == 2:
+            k, v = s2
+            k += '.__builder__'
+        d[k] = v
+    return d
+
+def format_d(d, sep = '\n', space = True):
+    d = flatten(d)
+    pattern = "%s = %r" if space else "%s=%r"
+    return sep.join(pattern % (k, v) for k, v in d.iteritems())
+
+def format_help(topic):
+    if topic is None:
+        return 'No help.'
+    elif hasattr(topic, 'help'):
+        help = topic.help()
+    else:
+        help = topic.__doc__
+    if not help:
+        return 'No help.'
+
+    ss = map(str.rstrip, help.split('\n'))
+    try:
+        baseline = min([len(line) - len(line.lstrip()) for line in ss if line])
+    except:
+        return 'No help.'
+    s = '\n'.join([line[baseline:] for line in ss])
+    s = re.sub(string = s, pattern = '\n{2,}', repl = '\n\n')
+    s = re.sub(string = s, pattern = '(^\n*)|(\n*$)', repl = '')
+
+    return s
+
+################################################################################
+### single channels
+################################################################################
+
+# try:
+#     import greenlet
+# except:
+#     try:
+#         from py import greenlet
+#     except:
+#         print >>sys.stderr, 'the greenlet module is unavailable'
+#         greenlet = None
+
+
+class Channel(object):
+
+    COMPLETE = None
+    INCOMPLETE = True
+    
+    START = 0
+    """dbdict.status == START means an experiment is ready to run"""
+    RUNNING = 1
+    """dbdict.status == RUNNING means an experiment is running on dbdict_hostname"""
+    DONE = 2
+    """dbdict.status == DONE means an experiment has completed (not necessarily successfully)"""
+
+    # Methods to be used by the experiment to communicate with the channel
+
+    def save(self):
+        """
+        Save the experiment's state to the various media supported by
+        the Channel.
+        """
+        raise NotImplementedError()
+
+    def switch(self, message = None):
+        """
+        Called from the experiment to give the control back to the channel.
+        The following return values are meaningful:
+          * 'stop' -> the experiment must stop as soon as possible. It may save what
+            it needs to save. This occurs when SIGTERM or SIGINT are sent (or in
+            user-defined circumstances).
+        switch() may give the control to the user. In this case, the user may
+        resume the experiment by calling switch() again. If an argument is given
+        by the user, it will be relayed to the experiment.
+        """
+        pass
+
+    def __call__(self, message = None):
+        return self.switch(message)
+
+    def save_and_switch(self):
+        self.save()
+        self.switch()
+
+    # Methods to run the experiment
+
+    def setup(self):
+        pass
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self, type, value, traceback):
+        pass
+
+    def run(self):
+        pass
+
+
+
+class JobError(Exception):
+    RUNNING = 0
+    DONE = 1
+    NOJOB = 2
+
+
+class SingleChannel(Channel):
+
+    def __init__(self, experiment, state):
+        self.experiment = experiment
+        self.state = state
+        self.feedback = None
+
+    def switch(self, message = None):
+        feedback = self.feedback
+        self.feedback = None
+        return feedback
+
+    def run(self, force = False):
+        self.setup()
+
+        status = self.state.dbdict.get('status', self.START)
+        if status is self.DONE and not force:
+            # If you want to disregard this, use the --force flag (not yet implemented)
+            raise JobError(JobError.DONE,
+                           'The job has already completed.')
+        elif status is self.RUNNING and not force:
+            raise JobError(JobError.RUNNING,
+                           'The job is already running.')
+        self.state.dbdict.status = self.RUNNING
+
+        v = self.COMPLETE
+        with self:
+            try:
+                v = self.experiment(self, self.state)
+            finally:
+                self.state.dbdict.status = self.DONE if v is self.COMPLETE else self.START
+
+        return v
+
+    def on_sigterm(self, signo, frame):
+        # SIGTERM handler. It is the experiment function's responsibility to
+        # call switch() often enough to get this feedback.
+        self.feedback = 'stop'
+
+    def __enter__(self):
+        # install a SIGTERM handler that asks the experiment function to return
+        # the next time it will call switch()
+        self.prev_sigterm = signal.getsignal(signal.SIGTERM)
+        self.prev_sigint = signal.getsignal(signal.SIGINT)
+        signal.signal(signal.SIGTERM, self.on_sigterm)
+        signal.signal(signal.SIGINT, self.on_sigterm)
+        return self
+
+    def __exit__(self, type, value, traceback):
+        signal.signal(signal.SIGTERM, self.prev_sigterm)
+        signal.signal(signal.SIGINT, self.prev_sigint)
+        self.prev_sigterm = None
+        self.prev_sigint = None
+        self.save()
+
+
+class StandardChannel(SingleChannel):
+
+    def __init__(self, path, experiment, state, redirect_stdout = False, redirect_stderr = False):
+        super(StandardChannel, self).__init__(experiment, state)
+        self.path = os.path.realpath(path)
+        self.redirect_stdout = redirect_stdout
+        self.redirect_stderr = redirect_stderr
+
+    def save(self):
+        with open(os.path.join(self.path, 'current.conf'), 'w') as current:
+            current.write(format_d(self.state))
+            current.write('\n')
+
+    def __enter__(self):
+        self.old_cwd = os.getcwd()
+        os.chdir(self.path)
+        if self.redirect_stdout:
+            self.old_stdout = sys.stdout
+            sys.stdout = open('stdout', 'a')
+        if self.redirect_stderr:
+            self.old_stderr = sys.stderr
+            sys.stderr = open('stderr', 'a')
+        return super(StandardChannel, self).__enter__()
+
+    def __exit__(self, type, value, traceback):
+        if self.redirect_stdout:
+            sys.stdout.close()
+            sys.stdout = self.old_stdout
+        if self.redirect_stderr:
+            sys.stderr.close()
+            sys.stderr = self.old_stderr
+        os.chdir(self.old_cwd)
+        return super(StandardChannel, self).__exit__(type, value, traceback)
+
+    def setup(self):
+        if not os.path.isdir(self.path):
+            os.makedirs(self.path)
+        with self:
+            origf = os.path.join(self.path, 'orig.conf')
+            if not os.path.isfile(origf):
+                with open(origf, 'w') as orig:
+                    orig.write(format_d(self.state))
+                    orig.write('\n')
+            currentf = os.path.join(self.path, 'current.conf')
+            if os.path.isfile(currentf):
+                with open(currentf, 'r') as current:
+                    self.state = expand(parse(*map(str.strip, current.readlines())))
+
+
+class RSyncException(Exception):
+    pass
+
+class RSyncChannel(StandardChannel):
+
+    def __init__(self, path, remote_path, experiment, state):
+        super(RSyncChannel, self).__init__(path, experiment, state)
+
+        ssh_prefix='ssh://'
+        if remote_path.startswith(ssh_prefix):
+            remote_path = remote_path[len(ssh_prefix):]
+            colon_pos = remote_path.find(':')
+            self.host = remote_path[:colon_pos]
+            self.remote_path = remote_path[colon_pos+1:]
+        else:
+            self.host = ''
+            self.remote_path = os.path.realpath(remote_path)
+
+    def rsync(self, direction):
+        """The directory at which experiment-related files are stored.
+        """
+
+        path = self.path
+
+        remote_path = self.remote_path
+        if self.host:
+            remote_path = ':'.join([self.host, remote_path])
+
+        # TODO: use something more portable than os.system
+        if direction == 'push':
+            rsync_cmd = 'rsync -ar "%s/" "%s/"' % (path, remote_path)
+        elif direction == 'pull':
+            rsync_cmd = 'rsync -ar "%s/" "%s/"' % (remote_path, path)
+        else:
+            raise RSyncException('invalid direction', direction)
+
+        rsync_rval = os.system(rsync_cmd)
+        if rsync_rval != 0:
+            raise RSyncException('rsync failure', (rsync_rval, rsync_cmd))
+
+    def touch(self):
+        if self.host:
+            touch_cmd = ('ssh %(host)s  "mkdir -p \'%(path)s\'"' % dict(host = self.host,
+                                                                        path = self.remote_path))
+        else:
+            touch_cmd = ("mkdir -p '%(path)s'" % dict(path = self.remote_path))
+        print "ECHO", touch_cmd
+        touch_rval = os.system(touch_cmd)
+        if 0 != touch_rval:
+            raise Exception('touch failure', (touch_rval, touch_cmd))
+
+    def pull(self):
+        return self.rsync('pull')
+
+    def push(self):
+        return self.rsync('push')
+
+    def save(self):
+        super(RSyncChannel, self).save()
+        self.push()
+
+    def setup(self):
+        self.touch()
+        self.pull()
+        super(RSyncChannel, self).setup()
+
+
+class DBRSyncChannel(RSyncChannel):
+
+    RESTART_PRIORITY = 2.0
+
+    def __init__(self, username, password, hostname, dbname, tablename, path, remote_root):
+        self.username, self.password, self.hostname, self.dbname, self.tablename \
+            = username, password, hostname, dbname, tablename
+
+        self.db = sql.postgres_serial(
+            user = self.username, 
+            password = self.password, 
+            host = self.hostname,
+            database = self.dbname,
+            table_prefix = self.tablename)
+
+        self.dbstate = sql.book_dct_postgres_serial(self.db)
+        if self.dbstate is None:
+            raise JobError(JobError.NOJOB,
+                           'No job was found to run.')
+
+        try:
+            state = expand(self.dbstate)
+            experiment = resolve(state.dbdict.experiment)
+            remote_path = os.path.join(remote_root, self.dbname, self.tablename, str(self.dbstate.id))
+            super(DBRSyncChannel, self).__init__(path, remote_path, experiment, state)
+        except:
+            self.dbstate['dbdict.status'] = self.START
+            raise
+
+    def save(self):
+        super(DBRSyncChannel, self).save()
+        self.dbstate.update(flatten(self.state))
+    
+    def setup(self):
+        # Extract a single experiment from the table that is not already running.
+        # set self.experiment and self.state
+        super(DBRSyncChannel, self).setup()
+        self.state.dbdict.sql.host_name = socket.gethostname()
+        self.state.dbdict.sql.host_workdir = self.path
+        self.dbstate.update(flatten(self.state))
+
+    def run(self):
+        # We pass the force flag as True because the status flag is
+        # already set to RUNNING by book_dct in __init__
+        v = super(DBRSyncChannel, self).run(force = True)
+        if v is self.INCOMPLETE and self.state.dbdict.sql.priority != self.RESTART_PRIORITY:
+            self.state.dbdict.sql.priority = self.RESTART_PRIORITY
+            self.save()
+        return v
+
+
+
+################################################################################
+### running
+################################################################################
+
+def run(type, arguments):
+    runner = runner_registry.get(type, None)
+    if not runner:
+        raise UsageError('Unknown runner: "%s"' % type)
+    argspec = inspect.getargspec(runner)
+    minargs = len(argspec[0])-(len(argspec[3]) if argspec[3] else 0)
+    maxargs = len(argspec[0])
+    if minargs > len(arguments) or maxargs < len(arguments) and not argspec[1]:
+        s = format_help(runner)
+        raise UsageError(s)
+    runner(*arguments)
+
+runner_registry = dict()
+
+def runner_cmdline(experiment, *strings):
+    """
+    Start an experiment with parameters given on the command line.
+
+    Usage: cmdline <experiment> <prop1::type> <prop1=value1> <prop2=value2> ...
+
+    The symbol described by <experiment> will be imported
+    using the normal python import rules and will be called with
+    the dictionary described on the command line.
+
+    The signature of the function located at <experiment> must
+    look like:
+        def my_experiment(state, channel):
+            ...
+
+    Examples of setting parameters:
+        a=2 => state['a'] = 2
+        b.c=3 => state['b']['c'] = 3
+        p::mymodule.Something => state['p']['__builder__']=mymodule.Something
+
+    Example call:
+        run_experiment cmdline mymodule.my_experiment \\
+            stopper::pylearn.stopper.nsteps \\ # use pylearn.stopper.nsteps
+            stopper.n=10000 \\ # the argument "n" of nsteps is 10000
+            lr=0.03
+    """
+    state = expand(parse(*strings))
+    state.dbdict.experiment = experiment
+    experiment = resolve(experiment)
+    #channel = RSyncChannel('.', 'yaddayadda', experiment, state)
+    channel = StandardChannel(format_d(state, sep=',', space = False),
+                              experiment, state)
+    channel.run()
+
+runner_registry['cmdline'] = runner_cmdline
+
+
+def runner_sqlschedule(dbdescr, experiment, *strings):
+    try:
+        username, password, hostname, dbname, tablename \
+            = sql.parse_dbstring(dbdescr)
+    except:
+        raise UsageError('Wrong syntax for dbdescr')
+
+    db = sql.postgres_serial(
+        user = username, 
+        password = password, 
+        host = hostname,
+        database = dbname,
+        table_prefix = tablename)
+
+    state = parse(*strings)
+    state['dbdict.experiment'] = experiment
+    sql.add_experiments_to_db([state], db, verbose = 1)
+
+runner_registry['sqlschedule'] = runner_sqlschedule
+
+
+
+def runner_sql(dbdescr, exproot):
+    try:
+        username, password, hostname, dbname, tablename \
+            = sql.parse_dbstring(dbdescr)
+    except:
+        raise UsageError('Wrong syntax for dbdescr')
+    workdir = tempfile.mkdtemp()
+    print 'wdir', workdir
+    channel = DBRSyncChannel(username, password, hostname, dbname, tablename,
+                             workdir,
+                             exproot)
+    channel.run()
+    shutil.rmtree(workdir, ignore_errors=True)
+
+runner_registry['sql'] = runner_sql
+
+
+
+def help(topic = None):
+    """
+    Get help for a topic.
+
+    Usage: help <topic>
+    """
+    if topic is None:
+        print 'Available commands: (use help <command> for more info)'
+        print
+        for name, command in sorted(runner_registry.iteritems()):
+            print name.ljust(20), format_help(command).split('\n')[0]
+        return
+    print format_help(runner_registry.get(topic, None))
+
+runner_registry['help'] = help
+
+################################################################################
+### main
+################################################################################
+
+def run_cmdline():
+    try:
+        if len(sys.argv) <= 1:
+            raise UsageError('Usage: %s <run_type> [<arguments>*]' % sys.argv[0])
+        run(sys.argv[1], sys.argv[2:])
+    except UsageError, e:
+        print 'Usage error:'
+        print e
+
+if __name__ == '__main__':
+    run_cmdline()
+
+
+# NOTE: the multiple-channel variants below are unfinished and left
+# commented out for now.
+
+# ################################################################################
+# ### multiple channels
+# ################################################################################
+
+# class MultipleChannel(Channel):
+#     def switch(self, job, *message):
+#         raise NotImplementedError('This Channel does not allow switching between jobs.')
+#     def broadcast(self, *message):
+#         raise NotImplementedError()
+
+# class SpawnChannel(MultipleChannel):
+#     # spawns one process for each task
+#     pass
+
+# class GreenletChannel(MultipleChannel):
+#     # uses a single process for all tasks, using greenlets to switch between them
+#     pass
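
The flatten/expand pair is the module's core convention: experiment state round-trips between a nested DD tree (attribute access, __builder__ markers) and the flat dotted keys stored in the job table. A small sketch using only the functions defined above:

    state = expand({'lr': 0.03,
                    'stopper.n': 10000,
                    'stopper.__builder__': 'pylearn.stopper.nsteps'})
    assert state.stopper.n == 10000        # DD gives attribute access
    assert flatten(state)['stopper.n'] == 10000
    # make(flatten(state)) would import pylearn.stopper.nsteps and call it
    # with n=10000 -- realize() consumes the __builder__ marker.
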
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pylearn/dbdict/sql.py	Thu Dec 04 11:03:38 2008 -0500
@@ -0,0 +1,214 @@
+
+import copy
+import os
+import sys
+import time
+
+import numpy
+
+import sqlalchemy
+from sqlalchemy import create_engine, desc
+from sqlalchemy.orm import eagerload
+
+import psycopg2, psycopg2.extensions 
+
+from api0 import db_from_engine, postgres_db
+
+
+STATUS = 'dbdict.status'
+PRIORITY = 'dbdict.sql.priority'
+HOST = 'dbdict.sql.hostname'
+HOST_WORKDIR = 'dbdict.sql.host_workdir'
+PUSH_ERROR = 'dbdict.sql.push_error'
+
+START = 0
+RUNNING = 1
+DONE = 2
+
+_TEST_CONCURRENCY = False
+
+def postgres_serial(user, password, host, database, **kwargs):
+    """Return a DbHandle instance that communicates with a postgres database at transaction
+    isolation_level 'SERIALIZABLE'.
+
+    :param user: a username in the database
+    :param password: the password for the username
+    :param host: the network address of the host on which the postgres server is running
+    :param database: a database served by the postgres server
+    
+    """
+    this = postgres_serial
+
+    if not hasattr(this,'engine'):
+        def connect():
+            c = psycopg2.connect(user=user, password=password, database=database, host=host)
+            c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
+            return c
+        pool_size = 0
+        this.engine = create_engine('postgres://'
+                ,creator=connect
+                ,pool_size=0 # should force the app release connections
+                )
+
+    db = db_from_engine(this.engine, **kwargs)
+    db._is_serialized_session_db = True
+    return db
+
+def book_dct_postgres_serial(db, retry_max_sleep=10.0, verbose=0):
+    """Find a trial in the lisa_db with status START.
+
+    A trial will be returned with dbdict_status=RUNNING.
+
+    Returns None if no such trial exists in DB.
+
+    This function uses a serial access to the lisadb to guarantee that no other
+    process will retrieve the same dct.  It is designed to facilitate writing
+    a "consumer" for a Producer-Consumer pattern based on the database.
+
+    """
+    print >> sys.stderr, """#TODO: use the priority field, not the status."""
+    print >> sys.stderr, """#TODO: ignore entries with key PUSH_ERROR."""
+
+    s = db._session
+
+    # NB. we need the query and attribute update to be in the same transaction
+    assert s.autocommit == False 
+
+    dcts_seen = set([])
+    keep_trying = True
+
+    dct = None
+    while (dct is None) and keep_trying:
+        #build a query
+        q = s.query(db._Dict)
+        q = q.options(eagerload('_attrs')) #hard-coded in api0
+        q = q.filter(db._Dict._attrs.any(name=STATUS, fval=START))
+
+        #try to reserve a dct
+        try:
+            #first() may raise psycopg2.ProgrammingError
+            dct = q.first()
+
+            if dct is not None:
+                assert (dct not in dcts_seen)
+                if verbose: print 'book_unstarted_dct retrieved, ', dct
+                dct._set_in_session(STATUS, RUNNING, s)
+                if 1:
+                    if _TEST_CONCURRENCY:
+                        print >> sys.stderr, 'SLEEPING BEFORE BOOKING'
+                        time.sleep(10)
+
+                    #commit() may raise psycopg2.ProgrammingError
+                    s.commit()
+                else:
+                    print >> sys.stderr, 'DEBUG MODE: NOT RESERVING JOB!', dct
+                #if we get this far, the job is ours!
+            else:
+                # no jobs are left
+                keep_trying = False
+        except (psycopg2.OperationalError,
+                sqlalchemy.exceptions.ProgrammingError), e:
+            #either the first() or the commit() raised
+            s.rollback() # docs say to do this (or close) after commit raises exception
+            if verbose: print 'caught exception', e
+            if dct:
+                # first() succeeded, commit() failed
+                dcts_seen.add(dct)
+                dct = None
+            wait = numpy.random.rand(1)*retry_max_sleep
+            if verbose: print 'another process stole our dct. Waiting %f secs' % wait
+            time.sleep(wait)
+    return dct
+
+def book_dct(db):
+    print >> sys.stderr, """#TODO: use the priority field, not the status."""
+    print >> sys.stderr, """#TODO: ignore entries with key self.push_error."""
+
+    return db.query(dbdict_status=START).first()
+
+def parse_dbstring(dbstring):
+    postgres = 'postgres://'
+    assert dbstring.startswith(postgres)
+    dbstring = dbstring[len(postgres):]
+
+    #username_and_password
+    colon_pos = dbstring.find('@')
+    username_and_password = dbstring[:colon_pos]
+    dbstring = dbstring[colon_pos+1:]
+
+    colon_pos = username_and_password.find(':')
+    if -1 == colon_pos:
+        username = username_and_password
+        password = None
+    else:
+        username = username_and_password[:colon_pos]
+        password = username_and_password[colon_pos+1:]
+    
+    #hostname
+    colon_pos = dbstring.find('/')
+    hostname = dbstring[:colon_pos]
+    dbstring = dbstring[colon_pos+1:]
+
+    #dbname
+    colon_pos = dbstring.find('/')
+    dbname = dbstring[:colon_pos]
+    dbstring = dbstring[colon_pos+1:]
+
+    #tablename
+    tablename = dbstring
+
+    if password is None:
+        password = open(os.getenv('HOME')+'/.dbdict_%s'%dbname).readline()[:-1]
+    if False:
+        print 'USERNAME', username
+        print 'PASS', password
+        print 'HOST', hostname
+        print 'DB', dbname
+        print 'TABLE', tablename
+
+    return username, password, hostname, dbname, tablename
+
+
+def add_experiments_to_db(jobs, db, verbose=0, add_dups=False, type_check=None):
+    """Add experiments paramatrized by jobs[i] to database db.
+
+    Default behaviour is to ignore jobs which are already in the database.
+
+    If type_check is a class (instead of None) then it will be used as a type declaration for
+    all the elements in each job dictionary.  For each key,value pair in the dictionary, there
+    must exist an attribute,value pair in the class meeting the following criteria:
+    the attribute and the key are equal, and the types of the values are equal.
+
+    :param jobs: The parameters of experiments to run.
+    :type jobs: an iterable object over dictionaries
+    :param verbose: print which jobs are added and which are skipped
+    :param add_dups: False will ignore a job if it matches (on all items()) with a db entry.
+    :type add_dups: Bool
+
+    :returns: list of (flag, job) pairs in which each flag indicates whether that job
+    was actually inserted.
+
+    """
+    rval = []
+    for job in jobs:
+        job = copy.copy(job)
+        do_insert = add_dups or (None is db.query(**job).first())
+
+        if do_insert:
+            if type_check:
+                for k,v in job.items():
+                    if type(v) != getattr(type_check, k):
+                        raise TypeError('Experiment contains value with wrong type',
+                                ((k,v), getattr(type_check, k)))
+
+            job[STATUS] = START
+            job[PRIORITY] = 1.0
+            if verbose:
+                print 'ADDING  ', job
+            db.insert(job)
+            rval.append((True, job))
+        else:
+            if verbose:
+                print 'SKIPPING', job
+            rval.append((False, job))
+
+    return rval
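
parse_dbstring expects the five components packed into one URL-style string; for a hypothetical database:

    user, password, host, db, table = parse_dbstring(
        'postgres://jdoe:secret@dbserver/mydb/experiments1')
    # -> ('jdoe', 'secret', 'dbserver', 'mydb', 'experiments1')

If the password is omitted ('postgres://jdoe@dbserver/mydb/experiments1'), it is read from ~/.dbdict_mydb instead.
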
--- a/pylearn/old_dataset/_test_dataset.py	Thu Dec 04 10:56:44 2008 -0500
+++ b/pylearn/old_dataset/_test_dataset.py	Thu Dec 04 11:03:38 2008 -0500
@@ -315,6 +315,7 @@
 #ds[i] returns the (i+1)-th example of the dataset.
     ds2=ds[5]
     assert isinstance(ds2,Example)
+    test_ds(ds,ds2,[5])
     assert have_raised("var['ds']["+str(len(ds))+"]",ds=ds)  # index not defined
     assert not have_raised("var['ds']["+str(len(ds)-1)+"]",ds=ds)
     del ds2