# HG changeset patch # User Frederic Bastien # Date 1228406618 18000 # Node ID cf19655ec48b4c3f30e8f491afbdb494d1763181 # Parent 9f5891cd4048feff9d45472f5d2b8242d6d66fc9# Parent 220044be9fd86680fc018ed1658296538b21335f Automated merge with ssh://projects@lgcm.iro.umontreal.ca/hg/pylearn_refactor diff -r 220044be9fd8 -r cf19655ec48b pylearn/algorithms/kernel_regression.py --- a/pylearn/algorithms/kernel_regression.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/algorithms/kernel_regression.py Thu Dec 04 11:03:38 2008 -0500 @@ -4,16 +4,16 @@ from pylearn.learner import OfflineLearningAlgorithm from theano import tensor as T -from nnet_ops import prepend_1_to_each_row +from theano.tensor.nnet import prepend_1_to_each_row from theano.scalar import as_scalar from common.autoname import AutoName import theano import numpy # map a N-vector to a 1xN matrix -row_vector = theano.elemwise.DimShuffle((False,),['x',0]) +row_vector = theano.tensor.DimShuffle((False,),['x',0]) # map a N-vector to a Nx1 matrix -col_vector = theano.elemwise.DimShuffle((False,),[0,'x']) +col_vector = theano.tensor.DimShuffle((False,),[0,'x']) class KernelRegression(OfflineLearningAlgorithm): """ diff -r 220044be9fd8 -r cf19655ec48b pylearn/algorithms/linear_regression.py --- a/pylearn/algorithms/linear_regression.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/algorithms/linear_regression.py Thu Dec 04 11:03:38 2008 -0500 @@ -6,7 +6,7 @@ from pylearn.learner import OfflineLearningAlgorithm,OnlineLearningAlgorithm from theano import tensor as T -from nnet_ops import prepend_1_to_each_row +from theano.tensor.nnet import prepend_1_to_each_row from theano.scalar import as_scalar from common.autoname import AutoName import theano @@ -96,14 +96,14 @@ __compiled = False @classmethod - def compile(cls,linker='c|py'): + def compile(cls, mode = "FAST_RUN"): if cls.__compiled: return def fn(input_vars,output_vars): - return staticmethod(theano.function(input_vars,output_vars, linker=linker)) + return staticmethod(theano.function(input_vars, output_vars, mode=mode)) - cls.compute_outputs = fn([cls.inputs,cls.theta],[cls.outputs]) - cls.compute_errors = fn([cls.outputs,cls.targets],[cls.squared_errors]) + cls.compute_outputs = fn([cls.inputs,cls.theta],cls.outputs) + cls.compute_errors = fn([cls.outputs,cls.targets],cls.squared_errors) cls.__compiled = True @@ -115,17 +115,17 @@ XtX = T.matrix() # (n_inputs+1) x (n_inputs+1) XtY = T.matrix() # (n_inputs+1) x n_outputs extended_input = prepend_1_to_each_row(P.inputs) - new_XtX = T.add_inplace(XtX,T.dot(extended_input.T,extended_input)) - new_XtY = T.add_inplace(XtY,T.dot(extended_input.T,P.targets)) + new_XtX = T.add(XtX,T.dot(extended_input.T,extended_input)) + new_XtY = T.add(XtY,T.dot(extended_input.T,P.targets)) __compiled = False @classmethod - def compile(cls,linker='c|py'): + def compile(cls, mode="FAST_RUN"): if cls.__compiled: return def fn(input_vars,output_vars): - return staticmethod(theano.function(input_vars,output_vars, linker=linker)) + return staticmethod(theano.function(input_vars, output_vars, mode=mode)) cls.update = fn([cls.XtX,cls.XtY,cls.P.inputs,cls.P.targets],[cls.new_XtX,cls.new_XtY]) diff -r 220044be9fd8 -r cf19655ec48b pylearn/algorithms/logistic_regression.py --- a/pylearn/algorithms/logistic_regression.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/algorithms/logistic_regression.py Thu Dec 04 11:03:38 2008 -0500 @@ -40,11 +40,15 @@ #here we actually build the model self.linear_output = T.dot(self.input, self.w) + self.b if 0: + # TODO: pending support for target being a sparse matrix self.softmax = nnet.softmax(self.linear_output) self._max_pr, self.argmax = T.max_and_argmax(self.linear_output) self._xent = self.target * T.log(self.softmax) else: + # TODO: when above is fixed, remove this hack (need an argmax + # which is independent of targets) + self.argmax_standalone = T.argmax(self.linear_output); (self._xent, self.softmax, self._max_pr, self.argmax) =\ nnet.crossentropy_softmax_max_and_argmax_1hot( self.linear_output, self.target) @@ -149,12 +153,12 @@ def __init__(self, input=None, targ=None, w=None, b=None, lr=None, regularize=False): super(LogReg2, self).__init__() #boilerplate - self.input = input if input is not None else T.matrix('input') - self.targ = targ if targ is not None else T.lcol() + self.input = module.Member(input) if input is not None else T.matrix('input') + self.targ = module.Member(targ) if targ is not None else T.lcol() - self.w = w if w is not None else module.Member(T.dmatrix()) - self.b = b if b is not None else module.Member(T.dvector()) - self.lr = lr if lr is not None else module.Member(T.dscalar()) + self.w = module.Member(w) if w is not None else module.Member(T.dmatrix()) + self.b = module.Member(b) if b is not None else module.Member(T.dvector()) + self.lr = module.Member(lr) if lr is not None else module.Member(T.dscalar()) self.params = [p for p in [self.w, self.b] if p.owner is None] diff -r 220044be9fd8 -r cf19655ec48b pylearn/algorithms/sandbox/_test_onehotop.py --- a/pylearn/algorithms/sandbox/_test_onehotop.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/algorithms/sandbox/_test_onehotop.py Thu Dec 04 11:03:38 2008 -0500 @@ -3,7 +3,7 @@ import unittest from theano import compile from theano import gradient - +from theano import function from theano.tensor import as_tensor import random @@ -14,8 +14,8 @@ x = as_tensor([3, 2, 1]) y = as_tensor(5) o = one_hot(x, y) - y = compile.eval_outputs([o]) - self.failUnless(numpy.all(y == numpy.asarray([[0, 0, 0, 1, 0], [0, 0, 1, 0, 0], [0, 1, 0, 0, 0]]))) + f = function([],o) + self.failUnless(numpy.all(f() == numpy.asarray([[0, 0, 0, 1, 0], [0, 0, 1, 0, 0], [0, 1, 0, 0, 0]]))) if __name__ == '__main__': unittest.main() diff -r 220044be9fd8 -r cf19655ec48b pylearn/algorithms/stopper.py --- a/pylearn/algorithms/stopper.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/algorithms/stopper.py Thu Dec 04 11:03:38 2008 -0500 @@ -122,6 +122,16 @@ raise StopIteration +class NStages(ICML08Stopper): + """Run for a fixed number of steps, checking validation set every so + often.""" + def __init__(self, hard_limit, v_int): + ICML08Stopper.__init__(self, hard_limit, v_int, 1.0, 1.0, hard_limit) + + #TODO: could optimize next() function. Most of what's in ICML08Stopper.next() + #is not necessary + + @stopper_factory('icml08') def icml08_stopper(i_wait, v_int, min_improvement, patience, hard_limit): return ICML08Stopper(i_wait, v_int, min_improvement, patience, hard_limit) diff -r 220044be9fd8 -r cf19655ec48b pylearn/algorithms/tests/test_aa.py --- a/pylearn/algorithms/tests/test_aa.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/algorithms/tests/test_aa.py Thu Dec 04 11:03:38 2008 -0500 @@ -29,6 +29,14 @@ if __name__ == '__main__': numpy.random.seed(10) + print 'sanity check:' + t1 = test_train('SANITY_CHECK') +# t1 = test_train([theano.Mode('c|py', 'fast_compile'), +# theano.Mode('c|py', 'fast_run')]) + print 'time:',t1 + print + + numpy.random.seed(10) print 'optimized:' t1 = test_train(theano.Mode('c|py', 'fast_run')) print 'time:',t1 diff -r 220044be9fd8 -r cf19655ec48b pylearn/algorithms/tests/test_daa.py --- a/pylearn/algorithms/tests/test_daa.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/algorithms/tests/test_daa.py Thu Dec 04 11:03:38 2008 -0500 @@ -28,7 +28,7 @@ model.local_update[l]([[0, 1, 0, 1]]) model.local_update[l]([[1, 0, 1, 0]]) - for i in range(1): + for i in range(10): model.update([[0, 1, 0, 1]], [[1]]) model.update([[1, 0, 1, 0]], [[0]]) print model.classify([[0, 1, 0, 1]]) @@ -41,23 +41,31 @@ daa = models.Stacker([(models.SigmoidXEDenoisingAA, 'hidden')] * ndaa + [(pylearn.algorithms.logistic_regression.Module_Nclass, 'pred')], regularize = False) - model = daa.make([4, 20, 20, 20, 10], + model = daa.make([4] + [20] * ndaa + [10], lr = 0.01, mode = mode, seed = 10) - model.layers[0].noise_level = 0.3 - model.layers[1].noise_level = 0.3 - model.layers[2].noise_level = 0.3 + for l in range(ndaa): model.layers[l].noise_level = 0.3 - for l in range(3): + instances = [([[0, 1, 0, 1]], [1]), ([[1, 0, 1, 0]], [0])] + + for l in range(ndaa): for i in range(10): - model.local_update[l]([[0, 1, 0, 1]]) - model.local_update[l]([[1, 0, 1, 0]]) + for (input, output) in instances: + model.local_update[l](input) - for i in range(1): - model.update([[0, 1, 0, 1]], [1]) - model.update([[1, 0, 1, 0]], [0]) + for i in range(10): + for (input, output) in instances: +# model.update(input, output) + print "OLD:", + print model.validate(input, output) + oldloss = model.update(input, output) + print oldloss + print "NEW:" + print model.validate(input, output) + print + print model.apply([[0, 1, 0, 1]]) print model.apply([[1, 0, 1, 0]]) diff -r 220044be9fd8 -r cf19655ec48b pylearn/algorithms/tests/test_linear_regression.py --- a/pylearn/algorithms/tests/test_linear_regression.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/algorithms/tests/test_linear_regression.py Thu Dec 04 11:03:38 2008 -0500 @@ -21,5 +21,18 @@ print 'mse = ',mse if __name__ == '__main__': - unittest.main() - + import sys + + if len(sys.argv)==1: + unittest.main() + else: + assert sys.argv[1]=="--debug" + tests = [] + for arg in sys.argv[2:]: + tests.append(arg) + if tests: + unittest.TestSuite(map(T_DataSet, tests)).debug() + else: + module = __import__("_test_linear_regression") + tests = unittest.TestLoader().loadTestsFromModule(module) + tests.debug() diff -r 220044be9fd8 -r cf19655ec48b pylearn/datasets/MNIST.py --- a/pylearn/datasets/MNIST.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/datasets/MNIST.py Thu Dec 04 11:03:38 2008 -0500 @@ -46,6 +46,7 @@ y=all_targ[ntrain+nvalid:ntrain+nvalid+ntest]) rval.n_classes = 10 + rval.img_shape = (28,28) return rval diff -r 220044be9fd8 -r cf19655ec48b pylearn/datasets/embeddings/parameters.py --- a/pylearn/datasets/embeddings/parameters.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/datasets/embeddings/parameters.py Thu Dec 04 11:03:38 2008 -0500 @@ -1,10 +1,10 @@ """ Locations of the embedding data files. """ -WEIGHTSFILE = "/home/fringant2/lisa/data/word_embeddings.collobert-and-weston/lm-weights.txt" -VOCABFILE = "/home/fringant2/lisa/data/word_embeddings.collobert-and-weston/words.asc" -#WEIGHTSFILE = "/home/joseph/data/word_embeddings.collobert-and-weston/lm-weights.txt" -#VOCABFILE = "/home/joseph/data/word_embeddings.collobert-and-weston/words.asc" +#WEIGHTSFILE = "/home/fringant2/lisa/data/word_embeddings.collobert-and-weston/lm-weights.txt" +#VOCABFILE = "/home/fringant2/lisa/data/word_embeddings.collobert-and-weston/words.asc" +WEIGHTSFILE = "/home/joseph/data/word_embeddings.collobert-and-weston/lm-weights.txt" +VOCABFILE = "/home/joseph/data/word_embeddings.collobert-and-weston/words.asc" NUMBER_OF_WORDS = 30000 DIMENSIONS = 50 UNKNOWN = "UNKNOWN" diff -r 220044be9fd8 -r cf19655ec48b pylearn/datasets/embeddings/process.py --- a/pylearn/datasets/embeddings/process.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/datasets/embeddings/process.py Thu Dec 04 11:03:38 2008 -0500 @@ -11,6 +11,12 @@ __word_to_embedding = None __read = False +def length(): + """ + @return: The length of embeddings + """ + return len(__word_to_embedding[__words[0]]) + def word_to_embedding(w): read_embeddings() return __word_to_embedding[w] @@ -39,29 +45,21 @@ w = __words[i] __word_to_embedding[w] = l __read = True + for w in __word_to_embedding: assert len(__word_to_embedding[__words[0]]) == len(__word_to_embedding[w]) sys.stderr.write("...done reading %s\n" % WEIGHTSFILE) import re numberre = re.compile("[0-9]") -slashre = re.compile("\\\/") -def preprocess_word(origw): +def preprocess_word(w): """ Convert a word so that it can be embedded directly. Returned the preprocessed sequence. - @note: Preprocessing is appropriate for Penn Treebank style documents. + @note: Perhaps run L{common.penntreebank.preprocess} on the word first. """ read_embeddings() - if origw == "-LRB-": w = "(" - elif origw == "-RRB-": w = ")" - elif origw == "-LCB-": w = "{" - elif origw == "-RCB-": w = "}" - elif origw == "-LSB-": w = "[" - elif origw == "-RSB-": w = "]" - else: - w = origw + if w not in __word_to_embedding: w = string.lower(w) - w = slashre.sub("/", w) w = numberre.sub("NUMBER", w) if w not in __word_to_embedding: # sys.stderr.write("Word not in vocabulary, using %s: %s (original %s)\n" % (UNKNOWN, w, origw)) diff -r 220044be9fd8 -r cf19655ec48b pylearn/datasets/make_test_datasets.py --- a/pylearn/datasets/make_test_datasets.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/datasets/make_test_datasets.py Thu Dec 04 11:03:38 2008 -0500 @@ -1,4 +1,4 @@ -import dataset +from dataset import ArrayDataSet from shapeset.dset import Polygons from linear_regression import linear_predictor from kernel_regression import kernel_predictor @@ -110,6 +110,7 @@ # testset = ArrayDataSet(inputs[n_examples/2:],{'input':slice(0,n_inputs)}) | \ # ArrayDataSet(targets[n_examples/2:],{'target':slice(0,n_targets)}) data = hstack((inputs,targets)) + trainset = ArrayDataSet(data[0:n_train], {'input':slice(0,n_inputs),'target':slice(n_inputs,n_inputs+n_targets)}) testset = ArrayDataSet(data[n_train:], diff -r 220044be9fd8 -r cf19655ec48b pylearn/datasets/shapeset1.py --- a/pylearn/datasets/shapeset1.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/datasets/shapeset1.py Thu Dec 04 11:03:38 2008 -0500 @@ -7,7 +7,7 @@ import os import numpy -from ..amat import AMat +from ..io.amat import AMat from .config import data_root def _head(path, n): diff -r 220044be9fd8 -r cf19655ec48b pylearn/datasets/smallNorb.py --- a/pylearn/datasets/smallNorb.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/datasets/smallNorb.py Thu Dec 04 11:03:38 2008 -0500 @@ -1,6 +1,6 @@ import os import numpy -from ..filetensor import read +from ..io.filetensor import read from .config import data_root #Path = '/u/bergstrj/pub/data/smallnorb' diff -r 220044be9fd8 -r cf19655ec48b pylearn/datasets/testDataset.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/datasets/testDataset.py Thu Dec 04 11:03:38 2008 -0500 @@ -0,0 +1,43 @@ +""" +Various routines to load/access MNIST data. +""" +from __future__ import absolute_import + +import os +import numpy + +from ..io.amat import AMat +from .config import data_root +from .dataset import dataset_factory, Dataset + +VALSEQ, VALRAND = range(2) + +@dataset_factory('DEBUG') +def mnist_factory(variant='', ntrain=10, nvalid=10, ntest=10, \ + nclass=2, ndim=1, dshape=None, valtype=VALSEQ): + + temp = [] + [temp.append(5) for i in range(ndim)] + dshape = temp if dshape is None else dshape + + rval = Dataset() + rval.n_classes = nclass + rval.img_shape = dshape + + dsize = numpy.prod(dshape); + + print ntrain, nvalid, ntest, nclass, dshape, valtype + + ntot = ntrain + nvalid + ntest + xdata = numpy.arange(ntot*numpy.prod(dshape)).reshape((ntot,dsize)) \ + if valtype is VALSEQ else \ + numpy.random.random((ntot,dsize)); + ydata = numpy.round(numpy.random.random(ntot)); + + rval.train = Dataset.Obj(x=xdata[0:ntrain],y=ydata[0:ntrain]) + rval.valid = Dataset.Obj(x=xdata[ntrain:ntrain+nvalid],\ + y=ydata[ntrain:ntrain+nvalid]) + rval.test = Dataset.Obj(x=xdata[ntrain+nvalid:ntrain+nvalid+ntest], + y=ydata[ntrain+nvalid:ntrain+nvalid+ntest]) + + return rval diff -r 220044be9fd8 -r cf19655ec48b pylearn/dbdict/api0.py --- a/pylearn/dbdict/api0.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/dbdict/api0.py Thu Dec 04 11:03:38 2008 -0500 @@ -180,6 +180,9 @@ s.commit() s.update(d_self) + def iteritems(d_self): + return d_self.items() + def items(d_self): return [(kv.name, kv.val) for kv in d_self._attrs] diff -r 220044be9fd8 -r cf19655ec48b pylearn/dbdict/newstuff.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/newstuff.py Thu Dec 04 11:03:38 2008 -0500 @@ -0,0 +1,614 @@ + +from __future__ import with_statement + +from collections import defaultdict +import re, sys, inspect, os, signal, tempfile, shutil, socket + +import sql + + +################################################################################ +### misc +################################################################################ + +class DD(defaultdict): + def __getattr__(self, attr): + return self[attr] + def __setattr__(self, attr, value): + self[attr] = value + def __str__(self): + return 'DD%s' % dict(self) + def __repr__(self): + return str(self) + +################################################################################ +### resolve +################################################################################ + +def resolve(name): + symbols = name.split('.') + builder = __import__(symbols[0]) + for sym in symbols[1:]: + builder = getattr(builder, sym) + return builder + +################################################################################ +### dictionary +################################################################################ + +def convert(obj): + try: + return eval(obj, {}, {}) + except NameError: + return obj + +def flatten(obj): + d = {} + def helper(d, prefix, obj): + if isinstance(obj, (str, int, float)): + d[prefix] = obj #convert(obj) + else: + if isinstance(obj, dict): + subd = obj + else: + subd = obj.state() + subd['__builder__'] = '%s.%s' % (obj.__module__, obj.__class__.__name__) + for k, v in subd.iteritems(): + pfx = '.'.join([prefix, k]) if prefix else k + helper(d, pfx, v) + helper(d, '', obj) + return d + +def expand(d): + def dd(): + return DD(dd) + struct = dd() + for k, v in d.iteritems(): + if k == '': + raise NotImplementedError() + else: + keys = k.split('.') + current = struct + for k2 in keys[:-1]: + current = current[k2] + current[keys[-1]] = v #convert(v) + return struct + +def realize(d): + if not isinstance(d, dict): + return d + d = dict((k, realize(v)) for k, v in d.iteritems()) + if '__builder__' in d: + builder = resolve(d.pop('__builder__')) + return builder(**d) + return d + +def make(d): + return realize(expand(d)) + +################################################################################ +### errors +################################################################################ + +class UsageError(Exception): + pass + +################################################################################ +### parsing and formatting +################################################################################ + +def parse(*strings): + d = {} + for string in strings: + s1 = re.split(' *= *', string, 1) + s2 = re.split(' *:: *', string, 1) + if len(s1) == 1 and len(s2) == 1: + raise UsageError('Expected a keyword argument in place of "%s"' % s1[0]) + elif len(s1) == 2: + k, v = s1 + v = convert(v) + elif len(s2) == 2: + k, v = s2 + k += '.__builder__' + d[k] = v + return d + +def format_d(d, sep = '\n', space = True): + d = flatten(d) + pattern = "%s = %r" if space else "%s=%r" + return sep.join(pattern % (k, v) for k, v in d.iteritems()) + +def format_help(topic): + if topic is None: + return 'No help.' + elif hasattr(topic, 'help'): + help = topic.help() + else: + help = topic.__doc__ + if not help: + return 'No help.' + + ss = map(str.rstrip, help.split('\n')) + try: + baseline = min([len(line) - len(line.lstrip()) for line in ss if line]) + except: + return 'No help.' + s = '\n'.join([line[baseline:] for line in ss]) + s = re.sub(string = s, pattern = '\n{2,}', repl = '\n\n') + s = re.sub(string = s, pattern = '(^\n*)|(\n*$)', repl = '') + + return s + +################################################################################ +### single channels +################################################################################ + +# try: +# import greenlet +# except: +# try: +# from py import greenlet +# except: +# print >>sys.stderr, 'the greenlet module is unavailable' +# greenlet = None + + +class Channel(object): + + COMPLETE = None + INCOMPLETE = True + + START = 0 + """dbdict.status == START means a experiment is ready to run""" + RUNNING = 1 + """dbdict.status == RUNNING means a experiment is running on dbdict_hostname""" + DONE = 2 + """dbdict.status == DONE means a experiment has completed (not necessarily successfully)""" + + # Methods to be used by the experiment to communicate with the channel + + def save(self): + """ + Save the experiment's state to the various media supported by + the Channel. + """ + raise NotImplementedError() + + def switch(self, message = None): + """ + Called from the experiment to give the control back to the channel. + The following return values are meaningful: + * 'stop' -> the experiment must stop as soon as possible. It may save what + it needs to save. This occurs when SIGTERM or SIGINT are sent (or in + user-defined circumstances). + switch() may give the control to the user. In this case, the user may + resume the experiment by calling switch() again. If an argument is given + by the user, it will be relayed to the experiment. + """ + pass + + def __call__(self, message = None): + return self.switch(message) + + def save_and_switch(self): + self.save() + self.switch() + + # Methods to run the experiment + + def setup(self): + pass + + def __enter__(self): + pass + + def __exit__(self): + pass + + def run(self): + pass + + + +class JobError(Exception): + RUNNING = 0 + DONE = 1 + NOJOB = 2 + + +class SingleChannel(Channel): + + def __init__(self, experiment, state): + self.experiment = experiment + self.state = state + self.feedback = None + + def switch(self, message = None): + feedback = self.feedback + self.feedback = None + return feedback + + def run(self, force = False): + self.setup() + + status = self.state.dbdict.get('status', self.START) + if status is self.DONE and not force: + # If you want to disregard this, use the --force flag (not yet implemented) + raise JobError(JobError.RUNNING, + 'The job has already completed.') + elif status is self.RUNNING and not force: + raise JobError(JobError.DONE, + 'The job is already running.') + self.state.dbdict.status = self.RUNNING + + v = self.COMPLETE + with self: + try: + v = self.experiment(self, self.state) + finally: + self.state.dbdict.status = self.DONE if v is self.COMPLETE else self.START + + return v + + def on_sigterm(self, signo, frame): + # SIGTERM handler. It is the experiment function's responsibility to + # call switch() often enough to get this feedback. + self.feedback = 'stop' + + def __enter__(self): + # install a SIGTERM handler that asks the experiment function to return + # the next time it will call switch() + self.prev_sigterm = signal.getsignal(signal.SIGTERM) + self.prev_sigint = signal.getsignal(signal.SIGINT) + signal.signal(signal.SIGTERM, self.on_sigterm) + signal.signal(signal.SIGINT, self.on_sigterm) + return self + + def __exit__(self, type, value, traceback): + signal.signal(signal.SIGTERM, self.prev_sigterm) + signal.signal(signal.SIGINT, self.prev_sigint) + self.prev_sigterm = None + self.prev_sigint = None + self.save() + + +class StandardChannel(SingleChannel): + + def __init__(self, path, experiment, state, redirect_stdout = False, redirect_stderr = False): + super(StandardChannel, self).__init__(experiment, state) + self.path = os.path.realpath(path) + self.redirect_stdout = redirect_stdout + self.redirect_stderr = redirect_stderr + + def save(self): + with open(os.path.join(self.path, 'current.conf'), 'w') as current: + current.write(format_d(self.state)) + current.write('\n') + + def __enter__(self): + self.old_cwd = os.getcwd() + os.chdir(self.path) + if self.redirect_stdout: + self.old_stdout = sys.stdout + sys.stdout = open('stdout', 'a') + if self.redirect_stderr: + self.old_stderr = sys.stderr + sys.stderr = open('stderr', 'a') + return super(StandardChannel, self).__enter__() + + def __exit__(self, type, value, traceback): + if self.redirect_stdout: + sys.stdout.close() + sys.stdout = self.old_stdout + if self.redirect_stderr: + sys.stderr.close() + sys.stderr = self.old_stderr + os.chdir(self.old_cwd) + return super(StandardChannel, self).__exit__(type, value, traceback) + + def setup(self): + if not os.path.isdir(self.path): + os.makedirs(self.path) + with self: + origf = os.path.join(self.path, 'orig.conf') + if not os.path.isfile(origf): + with open(origf, 'w') as orig: + orig.write(format_d(self.state)) + orig.write('\n') + currentf = os.path.join(self.path, 'current.conf') + if os.path.isfile(currentf): + with open(currentf, 'r') as current: + self.state = expand(parse(*map(str.strip, current.readlines()))) + + +class RSyncException(Exception): + pass + +class RSyncChannel(StandardChannel): + + def __init__(self, path, remote_path, experiment, state): + super(RSyncChannel, self).__init__(path, experiment, state) + + ssh_prefix='ssh://' + if remote_path.startswith(ssh_prefix): + remote_path = remote_path[len(ssh_prefix):] + colon_pos = remote_path.find(':') + self.host = remote_path[:colon_pos] + self.remote_path = remote_path[colon_pos+1:] + else: + self.host = '' + self.remote_path = os.path.realpath(remote_path) + + def rsync(self, direction): + """The directory at which experiment-related files are stored. + """ + + path = self.path + + remote_path = self.remote_path + if self.host: + remote_path = ':'.join([self.host, remote_path]) + + # TODO: use something more portable than os.system + if direction == 'push': + rsync_cmd = 'rsync -ar "%s/" "%s/"' % (path, remote_path) + elif direction == 'pull': + rsync_cmd = 'rsync -ar "%s/" "%s/"' % (remote_path, path) + else: + raise RSyncException('invalid direction', direction) + + rsync_rval = os.system(rsync_cmd) + if rsync_rval != 0: + raise RSyncException('rsync failure', (rsync_rval, rsync_cmd)) + + def touch(self): + if self.host: + host = self.host + touch_cmd = ('ssh %(host)s "mkdir -p \'%(path)s\'"' % dict(host = self.host, + path = self.remote_path)) + else: + touch_cmd = ("mkdir -p '%(path)s'" % dict(path = self.remote_path)) + print "ECHO", touch_cmd + touch_rval = os.system(touch_cmd) + if 0 != touch_rval: + raise Exception('touch failure', (touch_rval, touch_cmd)) + + def pull(self): + return self.rsync('pull') + + def push(self): + return self.rsync('push') + + def save(self): + super(RSyncChannel, self).save() + self.push() + + def setup(self): + self.touch() + self.pull() + super(RSyncChannel, self).setup() + + +class DBRSyncChannel(RSyncChannel): + + RESTART_PRIORITY = 2.0 + + def __init__(self, username, password, hostname, dbname, tablename, path, remote_root): + self.username, self.password, self.hostname, self.dbname, self.tablename \ + = username, password, hostname, dbname, tablename + + self.db = sql.postgres_serial( + user = self.username, + password = self.password, + host = self.hostname, + database = self.dbname, + table_prefix = self.tablename) + + self.dbstate = sql.book_dct_postgres_serial(self.db) + if self.dbstate is None: + raise JobError(JobError.NOJOB, + 'No job was found to run.') + + try: + state = expand(self.dbstate) + experiment = resolve(state.dbdict.experiment) + remote_path = os.path.join(remote_root, self.dbname, self.tablename, str(self.dbstate.id)) + super(DBRSyncChannel, self).__init__(path, remote_path, experiment, state) + except: + self.dbstate['dbdict.status'] = self.START + raise + + def save(self): + super(DBRSyncChannel, self).save() + self.dbstate.update(flatten(self.state)) + + def setup(self): + # Extract a single experiment from the table that is not already running. + # set self.experiment and self.state + super(DBRSyncChannel, self).setup() + self.state.dbdict.sql.host_name = socket.gethostname() + self.state.dbdict.sql.host_workdir = self.path + self.dbstate.update(flatten(self.state)) + + def run(self): + # We pass the force flag as True because the status flag is + # already set to RUNNING by book_dct in __init__ + v = super(DBRSyncChannel, self).run(force = True) + if v is self.INCOMPLETE and self.state.dbdict.sql.priority != self.RESTART_PRIORITY: + self.state.dbdict.sql.priority = self.RESTART_PRIORITY + self.save() + return v + + + +################################################################################ +### running +################################################################################ + +def run(type, arguments): + runner = runner_registry.get(type, None) + if not runner: + raise UsageError('Unknown runner: "%s"' % type) + argspec = inspect.getargspec(runner) + minargs = len(argspec[0])-(len(argspec[3]) if argspec[3] else 0) + maxargs = len(argspec[0]) + if minargs > len(arguments) or maxargs < len(arguments) and not argspec[1]: + s = format_help(runner) + raise UsageError(s) + runner(*arguments) + +runner_registry = dict() + +def runner_cmdline(experiment, *strings): + """ + Start an experiment with parameters given on the command line. + + Usage: cmdline ... + + Run an experiment with parameters provided on the command + line. The symbol described by will be imported + using the normal python import rules and will be called with + the dictionary described on the command line. + + The signature of the function located at must + look like: + def my_experiment(state, channel): + ... + + Examples of setting parameters: + a=2 => state['a'] = 2 + b.c=3 => state['b']['c'] = 3 + p::mymodule.Something => state['p']['__builder__']=mymodule.Something + + Example call: + run_experiment cmdline mymodule.my_experiment \\ + stopper::pylearn.stopper.nsteps \\ # use pylearn.stopper.nsteps + stopper.n=10000 \\ # the argument "n" of nsteps is 10000 + lr=0.03 + """ + state = expand(parse(*strings)) + state.dbdict.experiment = experiment + experiment = resolve(experiment) + #channel = RSyncChannel('.', 'yaddayadda', experiment, state) + channel = StandardChannel(format_d(state, sep=',', space = False), + experiment, state) + channel.run() + +runner_registry['cmdline'] = runner_cmdline + + +def runner_sqlschedule(dbdescr, experiment, *strings): + try: + username, password, hostname, dbname, tablename \ + = sql.parse_dbstring(dbdescr) + except: + raise UsageError('Wrong syntax for dbdescr') + + db = sql.postgres_serial( + user = username, + password = password, + host = hostname, + database = dbname, + table_prefix = tablename) + + state = parse(*strings) + state['dbdict.experiment'] = experiment + sql.add_experiments_to_db([state], db, verbose = 1) + +runner_registry['sqlschedule'] = runner_sqlschedule + + + +def runner_sql(dbdescr, exproot): + try: + username, password, hostname, dbname, tablename \ + = sql.parse_dbstring(dbdescr) + except: + raise UsageError('Wrong syntax for dbdescr') + workdir = tempfile.mkdtemp() + print 'wdir', workdir + channel = DBRSyncChannel(username, password, hostname, dbname, tablename, + workdir, + exproot) + channel.run() + shutil.rmtree(workdir, ignore_errors=True) + +runner_registry['sql'] = runner_sql + + + + + +def help(topic = None): + """ + Get help for a topic. + + Usage: help + """ + if topic is None: + print 'Available commands: (use help for more info)' + print + for name, command in sorted(runner_registry.iteritems()): + print name.ljust(20), format_help(command).split('\n')[0] + return + print format_help(runner_registry.get(topic, None)) + +runner_registry['help'] = help + +################################################################################ +### main +################################################################################ + +def run_cmdline(): + try: + if len(sys.argv) <= 1: + raise UsageError('Usage: %s [*]' % sys.argv[0]) + run(sys.argv[1], sys.argv[2:]) + except UsageError, e: + print 'Usage error:' + print e + +if __name__ == '__main__': + run_cmdline() + + + + + + + + + + + + + + + + + + + + + + + +# fuck this shit + +# ################################################################################ +# ### multiple channels +# ################################################################################ + +# class MultipleChannel(Channel): +# def switch(self, job, *message): +# raise NotImplementedError('This Channel does not allow switching between jobs.') +# def broadcast(self, *message): +# raise NotImplementedError() + +# class SpawnChannel(MultipleChannel): +# # spawns one process for each task +# pass + +# class GreenletChannel(MultipleChannel): +# # uses a single process for all tasks, using greenlets to switch between them +# pass diff -r 220044be9fd8 -r cf19655ec48b pylearn/dbdict/sql.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pylearn/dbdict/sql.py Thu Dec 04 11:03:38 2008 -0500 @@ -0,0 +1,214 @@ + +import sys + +import sqlalchemy +from sqlalchemy import create_engine, desc +from sqlalchemy.orm import eagerload +import copy + +import psycopg2, psycopg2.extensions + +from api0 import db_from_engine, postgres_db + + +STATUS = 'dbdict.status' +PRIORITY = 'dbdict.sql.priority' +HOST = 'dbdict.sql.hostname' +HOST_WORKDIR = 'dbdict.sql.host_workdir' +PUSH_ERROR = 'dbdict.sql.push_error' + +START = 0 +RUNNING = 1 +DONE = 2 + +_TEST_CONCURRENCY = False + +def postgres_serial(user, password, host, database, **kwargs): + """Return a DbHandle instance that communicates with a postgres database at transaction + isolation_level 'SERIALIZABLE'. + + :param user: a username in the database + :param password: the password for the username + :param host: the network address of the host on which the postgres server is running + :param database: a database served by the postgres server + + """ + this = postgres_serial + + if not hasattr(this,'engine'): + def connect(): + c = psycopg2.connect(user=user, password=password, database=database, host=host) + c.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) + return c + pool_size = 0 + this.engine = create_engine('postgres://' + ,creator=connect + ,pool_size=0 # should force the app release connections + ) + + db = db_from_engine(this.engine, **kwargs) + db._is_serialized_session_db = True + return db + +def book_dct_postgres_serial(db, retry_max_sleep=10.0, verbose=0): + """Find a trial in the lisa_db with status START. + + A trial will be returned with dbdict_status=RUNNING. + + Returns None if no such trial exists in DB. + + This function uses a serial access to the lisadb to guarantee that no other + process will retrieve the same dct. It is designed to facilitate writing + a "consumer" for a Producer-Consumer pattern based on the database. + + """ + print >> sys.stderr, """#TODO: use the priority field, not the status.""" + print >> sys.stderr, """#TODO: ignore entries with key PUSH_ERROR.""" + + s = db._session + + # NB. we need the query and attribute update to be in the same transaction + assert s.autocommit == False + + dcts_seen = set([]) + keep_trying = True + + dct = None + while (dct is None) and keep_trying: + #build a query + q = s.query(db._Dict) + q = q.options(eagerload('_attrs')) #hard-coded in api0 + q = q.filter(db._Dict._attrs.any(name=STATUS, fval=START)) + + #try to reserve a dct + try: + #first() may raise psycopg2.ProgrammingError + dct = q.first() + + if dct is not None: + assert (dct not in dcts_seen) + if verbose: print 'book_unstarted_dct retrieved, ', dct + dct._set_in_session(STATUS, RUNNING, s) + if 1: + if _TEST_CONCURRENCY: + print >> sys.stderr, 'SLEEPING BEFORE BOOKING' + time.sleep(10) + + #commit() may raise psycopg2.ProgrammingError + s.commit() + else: + print >> sys.stderr, 'DEBUG MODE: NOT RESERVING JOB!', dct + #if we get this far, the job is ours! + else: + # no jobs are left + keep_trying = False + except (psycopg2.OperationalError, + sqlalchemy.exceptions.ProgrammingError), e: + #either the first() or the commit() raised + s.rollback() # docs say to do this (or close) after commit raises exception + if verbose: print 'caught exception', e + if dct: + # first() succeeded, commit() failed + dcts_seen.add(dct) + dct = None + wait = numpy.random.rand(1)*retry_max_sleep + if verbose: print 'another process stole our dct. Waiting %f secs' % wait + time.sleep(wait) + return dct + +def book_dct(db): + print >> sys.stderr, """#TODO: use the priority field, not the status.""" + print >> sys.stderr, """#TODO: ignore entries with key self.push_error.""" + + return db.query(dbdict_status=START).first() + +def parse_dbstring(dbstring): + postgres = 'postgres://' + assert dbstring.startswith(postgres) + dbstring = dbstring[len(postgres):] + + #username_and_password + colon_pos = dbstring.find('@') + username_and_password = dbstring[:colon_pos] + dbstring = dbstring[colon_pos+1:] + + colon_pos = username_and_password.find(':') + if -1 == colon_pos: + username = username_and_password + password = None + else: + username = username_and_password[:colon_pos] + password = username_and_password[colon_pos+1:] + + #hostname + colon_pos = dbstring.find('/') + hostname = dbstring[:colon_pos] + dbstring = dbstring[colon_pos+1:] + + #dbname + colon_pos = dbstring.find('/') + dbname = dbstring[:colon_pos] + dbstring = dbstring[colon_pos+1:] + + #tablename + tablename = dbstring + + if password is None: + password = open(os.getenv('HOME')+'/.dbdict_%s'%dbname).readline()[:-1] + if False: + print 'USERNAME', username + print 'PASS', password + print 'HOST', hostname + print 'DB', dbname + print 'TABLE', tablename + + return username, password, hostname, dbname, tablename + + +def add_experiments_to_db(jobs, db, verbose=0, add_dups=False, type_check=None): + """Add experiments paramatrized by jobs[i] to database db. + + Default behaviour is to ignore jobs which are already in the database. + + If type_check is a class (instead of None) then it will be used as a type declaration for + all the elements in each job dictionary. For each key,value pair in the dictionary, there + must exist an attribute,value pair in the class meeting the following criteria: + the attribute and the key are equal, and the types of the values are equal. + + :param jobs: The parameters of experiments to run. + :type jobs: an iterable object over dictionaries + :param verbose: print which jobs are added and which are skipped + :param add_dups: False will ignore a job if it matches (on all items()) with a db entry. + :type add_dups: Bool + + :returns: list of (Bool,job[i]) in which the flags mean the corresponding job actually was + inserted. + + """ + rval = [] + for job in jobs: + job = copy.copy(job) + do_insert = add_dups or (None is db.query(**job).first()) + + if do_insert: + if type_check: + for k,v in job.items(): + if type(v) != getattr(type_check, k): + raise TypeError('Experiment contains value with wrong type', + ((k,v), getattr(type_check, k))) + + job[STATUS] = START + job[PRIORITY] = 1.0 + if verbose: + print 'ADDING ', job + db.insert(job) + rval.append((True, job)) + else: + if verbose: + print 'SKIPPING', job + rval.append((False, job)) + + + + + diff -r 220044be9fd8 -r cf19655ec48b pylearn/old_dataset/_test_dataset.py --- a/pylearn/old_dataset/_test_dataset.py Thu Dec 04 10:56:44 2008 -0500 +++ b/pylearn/old_dataset/_test_dataset.py Thu Dec 04 11:03:38 2008 -0500 @@ -315,6 +315,7 @@ #ds[i] returns the (i+1)-th example of the dataset. ds2=ds[5] assert isinstance(ds2,Example) + test_ds(ds,ds2,[5]) assert have_raised("var['ds']["+str(len(ds))+"]",ds=ds) # index not defined assert not have_raised("var['ds']["+str(len(ds)-1)+"]",ds=ds) del ds2