Mercurial > pylearn
view mlp_factory_approach.py @ 234:9504940ef5ef
Automated merge with ssh://projects@lgcm.iro.umontreal.ca/hg/pylearn
author | Frederic Bastien <bastienf@iro.umontreal.ca> |
---|---|
date | Tue, 27 May 2008 15:59:24 -0400 |
parents | c047238e5b3f |
children | 3156a9976183 |
line wrap: on
line source
"""
This file is deprecated.  I'm continuing development in hpu/models.py.

Get that project like this: hg clone ssh://user@lgcm/../bergstrj/hpu
"""
import copy
import sys
import unittest

import numpy
import theano
from theano import tensor as t

from pylearn import dataset, nnet_ops, stopper


def _randshape(*shape):
    """Return a random array of the given shape with small values.

    Values are drawn uniformly from [-0.0005, 0.0005): numpy.random.rand
    samples [0, 1), which is then centred on zero and scaled by 0.001.
    """
    return (numpy.random.rand(*shape) - 0.5) * 0.001


def _cache(d, key, valfn):
    """Return d[key], computing it with valfn() and storing it on a miss."""
    # valfn() is only evaluated if key isn't in dictionary d
    if key not in d:
        d[key] = valfn()
    return d[key]


class _Model(object):
    """Parameter values paired with the learning algorithm that created them.

    `algo` must expose `v` (the symbolic variables: input, target, params,
    nll, new_params, ...) and `_fn(inputs, outputs)` to compile functions.
    `params` is the list of concrete parameter values, ordered like `v.params`.
    """

    def __init__(self, algo, params):
        self.algo = algo
        self.params = params
        v = algo.v
        # Compiled training step: (input, target, *params) -> [nll] + new_params
        self.update_fn = algo._fn([v.input, v.target] + v.params,
                                  [v.nll] + v.new_params)
        # Compiled prediction functions, keyed by (inputs, outputs) tuples.
        self._fn_cache = {}

    def __copy__(self):
        """Return a new model on the same algo with shallow-copied parameters."""
        # Fixed: the original iterated over the undefined name `params`,
        # raising NameError whenever copy.copy(model) was called.
        return _Model(self.algo, [copy.copy(p) for p in self.params])

    def update(self, input_target):
        """Update this model from more training data.

        `input_target` is an iterable of (input, target) pairs; the compiled
        update function is called once per pair with the current parameters.
        """
        params = self.params
        #TODO: why should we have to unpack target like this?
        # tbm : creates problem...
        for batch_input, batch_target in input_target:
            # The return value is ignored; the new parameter values are
            # presumably written in place by the sub_inplace ops -- TODO confirm.
            self.update_fn(batch_input, batch_target, *params)

    def __call__(self, testset, fieldnames=None, input='input', target='target'):
        """Apply this model (as a function) to new data

        `fieldnames` names attributes of `algo.v` to compute; it defaults to
        ['output_class'].  Returns a dataset.ApplyFunctionDataSet wrapping
        `testset`.
        """
        # A None sentinel replaces the former mutable list default so that the
        # list handed to ApplyFunctionDataSet is never shared between calls.
        if fieldnames is None:
            fieldnames = ['output_class']
        #TODO: cache fn between calls
        assert input == testset.fieldNames()[0]  # why first one???
        assert len(testset.fieldNames()) <= 2
        v = self.algo.v
        outputs = [getattr(v, name) for name in fieldnames]
        # The target variable is only an input when the dataset provides it.
        inputs = [v.input] + ([v.target] if target in testset else [])
        inputs.extend(v.params)
        theano_fn = _cache(self._fn_cache, (tuple(inputs), tuple(outputs)),
                           lambda: self.algo._fn(inputs, outputs))
        # Append the current parameter values to whatever the dataset supplies.
        lambda_fn = lambda *args: theano_fn(*(list(args) + self.params))
        return dataset.ApplyFunctionDataSet(testset, lambda_fn, fieldnames)


class AutonameVars(object):
    """Namespace built from a dict; values with a `name` attribute get named.

    Every value that has a `name` attribute and whose key is a str has that
    attribute set to the key; then all entries become instance attributes.
    """

    def __init__(self, dct):
        for key, val in dct.items():
            if type(key) is str and hasattr(val, 'name'):
                val.name = key
        self.__dict__.update(dct)


class MultiLayerPerceptron(object):
    """Factory for MLP classifier models; calling it allocates/trains a _Model."""

    def __init__(self, ninputs, nhid, nclass, lr,
                 l2coef=0.0,
                 linker='c&py',
                 hidden_layer=None,
                 early_stopper=None,
                 validation_portion=0.2,
                 V_extern=None):
        """Build the symbolic graph and store the training configuration.

        ninputs, nhid, nclass -- sizes used for the initial parameter shapes.
        lr -- step size multiplying the gradient in the parameter update.
        l2coef -- coefficient on the sum of squared weights.
        linker -- forwarded to theano.function by _fn.
        hidden_layer -- optional callable taking the symbolic input and
            returning (hid, hid_params, hid_ivals, hid_regularization);
            when omitted a tanh layer is used.
        early_stopper -- zero-argument callable returning the iterable of
            stopper steps; defaults to stopper.NStages(10,1).
        validation_portion -- fraction of the training set held out for
            validation during training.
        V_extern -- optional replacement for V_intern, called with the same
            keyword arguments (this method's locals).
        """
        # NOTE: this method and V_intern.__init__ both pass locals() around;
        # introducing new local variables here changes what gets forwarded.
        class V_intern(AutonameVars):
            def __init__(v_self, lr, l2coef, **kwargs):
                # The first parameter is named v_self so that `self` (the
                # enclosing MultiLayerPerceptron) can arrive via **kwargs.
                lr = t.constant(lr)
                l2coef = t.constant(l2coef)
                input = t.matrix() # n_examples x n_inputs
                target = t.ivector() # len: n_examples
                W2, b2 = t.matrix(), t.vector()

                if hidden_layer:
                    hid, hid_params, hid_ivals, hid_regularization = hidden_layer(input)
                else:
                    W1, b1 = t.matrix(), t.vector()
                    hid = t.tanh(b1 + t.dot(input, W1))
                    hid_params = [W1, b1]
                    hid_regularization = l2coef * t.sum(W1*W1)
                    hid_ivals = lambda : [_randshape(ninputs, nhid), _randshape(nhid)]

                params = [W2, b2] + hid_params
                activations = b2 + t.dot(hid, W2)
                nll, predictions = nnet_ops.crossentropy_softmax_1hot(activations, target)
                regularization = l2coef * t.sum(W2*W2) + hid_regularization
                output_class = t.argmax(activations,1)
                loss_01 = t.neq(output_class, target)
                g_params = t.grad(nll + regularization, params)
                new_params = [t.sub_inplace(p, lr * gp) for p,gp in zip(params, g_params)]
                # Copy every local onto the enclosing MultiLayerPerceptron,
                # then drop the self-reference that locals() brought along.
                self.__dict__.update(locals()); del self.self
                AutonameVars.__init__(v_self, locals())
        self.nhid = nhid
        self.nclass = nclass
        self.v = V_intern(**locals()) if V_extern is None else V_extern(**locals())
        self.linker = linker
        self.early_stopper = early_stopper if early_stopper is not None else lambda: stopper.NStages(10,1)
        self.validation_portion = validation_portion

    def _fn(self, inputs, outputs):
        """Compile a theano function from `inputs` to `outputs` with self.linker."""
        # Caching here would hamper multi-threaded apps
        # prefer caching in _Model.__call__
        return theano.function(inputs, outputs, unpack_single=False, linker=self.linker)

    def __call__(self, trainset=None, iparams=None, input='input', target='target'):
        """Allocate and optionally train a model

        trainset -- optional dataset; when truthy the model is trained on it.
        iparams -- optional initial parameter values; random small values
            are generated when omitted.
        input, target -- field names requested from the dataset's minibatches.

        Raises NotImplementedError when len(trainset) == sys.maxint.
        """
        if iparams is None:
            iparams = [_randshape(self.nhid, self.nclass), _randshape(self.nclass)]\
                    + self.v.hid_ivals()
        rval = _Model(self, iparams)
        if trainset:
            if len(trainset) == sys.maxint:
                raise NotImplementedError('Learning from infinite streams is not supported')
            nval = int(self.validation_portion * len(trainset))
            nmin = len(trainset) - nval
            assert nmin >= 0
            minset = trainset[:nmin] #real training set for minimizing loss
            valset = trainset[nmin:] #validation set for early stopping
            best = rval
            for stp in self.early_stopper():
                rval.update(
                    minset.minibatches([input, target],
                                       minibatch_size=min(32, len(minset))))
                if stp.set_score:
                    # NOTE(review): rval(...) returns a dataset, not a number;
                    # confirm how the stopper compares it with best_score.
                    stp.score = rval(valset, ['loss_01'])
                    if (stp.score < stp.best_score):
                        best = copy.copy(rval)
            rval = best
        return rval


class TestMLP(unittest.TestCase):
    def test0(self):
        """Train on two label sets; predictions should agree where labels do."""
        training_set1 = dataset.ArrayDataSet(numpy.array([[0, 0, 0],
                                                          [0, 1, 1],
                                                          [1, 0, 1],
                                                          [1, 1, 1]]),
                                             {'input':slice(2),'target':2})
        training_set2 = dataset.ArrayDataSet(numpy.array([[0, 0, 0],
                                                          [0, 1, 1],
                                                          [1, 0, 0],
                                                          [1, 1, 1]]),
                                             {'input':slice(2),'target':2})
        test_data = dataset.ArrayDataSet(numpy.array([[0, 0, 0],
                                                      [0, 1, 1],
                                                      [1, 0, 0],
                                                      [1, 1, 1]]),
                                         {'input':slice(2)})

        learn_algo = MultiLayerPerceptron(2, 10, 2, .1
                , linker='c&py'
                , early_stopper = lambda:stopper.NStages(100,1))

        model1 = learn_algo(training_set1)
        model2 = learn_algo(training_set2)

        n_match = 0
        for o1, o2 in zip(model1(test_data), model2(test_data)):
            n_match += (o1 == o2)

        assert n_match == (numpy.sum(training_set1.fields()['target'] ==
                                     training_set2.fields()['target']))


if __name__ == '__main__':
    unittest.main()