view mlp_factory_approach.py @ 220:1f527fe65e22

test on simple slicing works
author Thierry Bertin-Mahieux <bertinmt@iro.umontreal.ca>
date Fri, 23 May 2008 13:44:25 -0400
parents df3fae88ab46
children 3595ba2610f7
line wrap: on
line source

import copy, sys
import numpy

import theano
from theano import tensor as t

from pylearn import dataset, nnet_ops, stopper


def _randshape(*shape):
    """Return small random values of the given shape, for parameter init.

    ``numpy.random.rand`` draws from [0, 1); the draw is centred on zero and
    scaled by 0.001, so every value lies in [-0.0005, 0.0005).
    """
    centered = numpy.random.rand(*shape) - 0.5
    return centered * 0.001

def _cache(d, key, valfn):
    """Return ``d[key]``, filling it from ``valfn()`` on first use.

    ``valfn`` takes no arguments and is only called when ``key`` is not
    already present in ``d``; its result is stored in ``d`` under ``key``.
    """
    if key in d:
        return d[key]
    d[key] = valfn()
    return d[key]

class _Model(object):
    """Parameter values bound to the learning algorithm that produced them.

    algo   -- the learning algorithm; must expose ``v`` (the symbolic
              variables: ``input``, ``target``, ``params``, ``nll``,
              ``new_params`` and any requested output) and ``_fn(inputs,
              outputs)`` returning a compiled callable.
    params -- list of concrete parameter values, in the same order as
              ``algo.v.params``.
    """
    def __init__(self, algo, params):
        self.algo = algo
        self.params = params
        v = algo.v
        # One training step: takes (input, target, *params) and computes the
        # loss together with the updated parameters.
        self.update_fn = algo._fn([v.input, v.target] + v.params, [v.nll] + v.new_params)
        # Compiled prediction functions, keyed by their (inputs, outputs).
        self._fn_cache = {}

    def __copy__(self):
        """Return a new model for the same algo with copied parameter values.

        Each parameter is copied individually so that further updates of this
        model do not affect the snapshot.
        """
        # Fixed: the original iterated over the undefined name `params`,
        # which raised NameError as soon as copy.copy(model) was called.
        return _Model(self.algo, [copy.copy(p) for p in self.params])

    def update(self, input_target):
        """Update this model from more training data.

        input_target -- iterable of (input, target) pairs; ``update_fn`` is
                        called once per pair.
        """
        params = self.params
        #TODO: why should we have to unpack target like this?
        # tbm : creates problem...
        for input, target in input_target:
            # target arrives as a 2-d column; only its first column is passed.
            self.update_fn(input, target[:,0], *params)

    def __call__(self, testset, fieldnames=['output_class'],input='input',target='target'):
        """Apply this model (as a function) to new data.

        testset    -- dataset with at most two fields, whose first field name
                      must equal `input`.
        fieldnames -- names of attributes of ``algo.v`` to compute as outputs.
                      (The default list is never mutated here.)
        input      -- name of the input field in `testset`.
        target     -- name of the target field; the symbolic target is only
                      fed to the compiled function when `target in testset`.

        Returns ``dataset.ApplyFunctionDataSet(testset, fn, fieldnames)``
        where ``fn`` appends the model parameters to each call.
        """
        assert input == testset.fieldNames()[0] # why first one???
        assert len(testset.fieldNames()) <= 2
        v = self.algo.v
        outputs = [getattr(v, name) for name in fieldnames]
        inputs = [v.input] + ([v.target] if target in testset else [])
        inputs.extend(v.params)
        # The compiled function is cached per model, so repeated calls with
        # the same inputs/outputs do not recompile.
        theano_fn = _cache(self._fn_cache, (tuple(inputs), tuple(outputs)),
                lambda: self.algo._fn(inputs, outputs))
        lambda_fn = lambda *args: theano_fn(*(list(args) + self.params))
        return dataset.ApplyFunctionDataSet(testset, lambda_fn, fieldnames)

class AutonameVars(object):
    """Namespace built from a dict, which also labels its values.

    Every value in `dct` that has a ``name`` attribute, and whose key is a
    plain ``str``, gets ``name`` set to that key.  All entries of `dct` then
    become attributes of the instance.
    """
    def __init__(self, dct):
        for label, candidate in dct.items():
            if type(label) is not str:
                continue
            if not hasattr(candidate, 'name'):
                continue
            candidate.name = label
        self.__dict__.update(dct)

class MultiLayerPerceptron(object):
    """Learning-algorithm factory for a softmax classifier with a hidden layer.

    Constructing an instance builds the symbolic Theano expressions once and
    stores them on ``self.v``.  Calling the instance allocates parameter
    values, optionally trains them on a dataset, and returns a ``_Model``.
    """

    def __init__(self, ninputs, nhid, nclass, lr,
            l2coef=0.0,
            linker='c&py', 
            hidden_layer=None,
            early_stopper=None,
            validation_portion=0.2,
            V_extern=None):
        """Build the symbolic graph and record the training settings.

        ninputs            -- number of input features (first dimension of the
                              default hidden weight matrix).
        nhid               -- number of hidden units.
        nclass             -- number of output classes.
        lr                 -- learning rate used in the parameter update.
        l2coef             -- coefficient of the sum-of-squared-weights penalty.
        linker             -- forwarded to ``theano.function`` by ``_fn``.
        hidden_layer       -- optional callable taking the symbolic input and
                              returning ``(hid, hid_params, hid_ivals,
                              hid_regularization)``; replaces the default
                              tanh layer.
        early_stopper      -- zero-argument callable returning an iterable of
                              stopper steps; defaults to
                              ``stopper.NStages(10,1)``.
        validation_portion -- fraction of the training set (taken from its
                              end) held out as a validation set.
        V_extern           -- optional replacement for ``V_intern``; it is
                              called with the same keyword arguments.
        """
        class V_intern(AutonameVars):
            # NOTE: the local variable names in __init__ below are part of
            # the interface.  They are published as attributes through
            # locals(), and _Model reads v.input, v.target, v.params, v.nll
            # and v.new_params by name -- do not rename them.
            def __init__(v_self, lr, l2coef, **kwargs):
                # `ninputs`, `nhid` and `hidden_layer` used below come from
                # the enclosing __init__ scope; the same values also arrive
                # in **kwargs but are not read from there.
                lr = t.constant(lr)
                l2coef = t.constant(l2coef)
                input = t.matrix() # n_examples x n_inputs
                target = t.ivector() # len: n_examples
                # Output-layer weights and bias.
                W2, b2 = t.matrix(), t.vector()

                if hidden_layer:
                    # User-supplied hidden layer: it provides its own
                    # parameters, initial-value factory and penalty term.
                    hid, hid_params, hid_ivals, hid_regularization = hidden_layer(input)
                else:
                    # Default: a single tanh layer with small random
                    # initial weights.
                    W1, b1 = t.matrix(), t.vector()
                    hid = t.tanh(b1 + t.dot(input, W1))
                    hid_params = [W1, b1]
                    hid_regularization = l2coef * t.sum(W1*W1)
                    hid_ivals = lambda : [_randshape(ninputs, nhid), _randshape(nhid)]

                # Parameter order matters: concrete values passed to the
                # compiled functions must follow [W2, b2] + hidden params.
                params = [W2, b2] + hid_params
                activations = b2 + t.dot(hid, W2)
                nll, predictions = nnet_ops.crossentropy_softmax_1hot(activations, target)
                regularization = l2coef * t.sum(W2*W2) + hid_regularization
                # Predicted class = index of the largest activation along axis 1.
                output_class = t.argmax(activations,1)
                # 0/1 loss: non-zero where the prediction differs from the target.
                loss_01 = t.neq(output_class, target)
                g_params = t.grad(nll + regularization, params)
                # One gradient-descent step per parameter: p - lr * gradient
                # (in place, going by the op name).
                new_params = [t.sub_inplace(p, lr * gp) for p,gp in zip(params, g_params)]
                # NOTE(review): `self` is not a parameter of this method; it
                # appears to resolve to the enclosing MultiLayerPerceptron
                # instance.  This copies every local onto that instance, then
                # removes the `self` entry the update just created.
                self.__dict__.update(locals()); del self.self
                # Name each symbolic variable after its local name and expose
                # all locals as attributes of this object.
                AutonameVars.__init__(v_self, locals())
        self.nhid = nhid
        self.nclass = nclass
        # locals() here includes every constructor argument (and `self`);
        # lr and l2coef bind to the named parameters, the rest go to **kwargs.
        self.v = V_intern(**locals()) if V_extern is None else V_extern(**locals())
        self.linker = linker
        self.early_stopper = early_stopper if early_stopper is not None else lambda: stopper.NStages(10,1)
        self.validation_portion = validation_portion

    def _fn(self, inputs, outputs):
        """Compile and return a Theano function from `inputs` to `outputs`.

        Uses this algorithm's linker; no caching is done here.
        """
        # Caching here would hamper multi-threaded apps
        # prefer caching in _Model.__call__
        return theano.function(inputs, outputs, unpack_single=False, linker=self.linker)

    def __call__(self, trainset=None, iparams=None, input='input', target='target'):
        """Allocate and optionally train a model.

        trainset -- dataset to train on; when falsy, the untrained model is
                    returned.  Must have a finite length.
        iparams  -- initial parameter values in the order
                    [W2, b2] + hidden-layer values; when None, small random
                    values are drawn.
        input    -- name of the input field in `trainset`.
        target   -- name of the target field in `trainset`.

        Returns a ``_Model``.  Raises NotImplementedError when
        ``len(trainset) == sys.maxint``.
        """
        if iparams is None:
            iparams = [_randshape(self.nhid, self.nclass), _randshape(self.nclass)]\
                    + self.v.hid_ivals()
        rval = _Model(self, iparams)
        if trainset:
            # A length of sys.maxint is treated as an infinite stream.
            if len(trainset) == sys.maxint:
                raise NotImplementedError('Learning from infinite streams is not supported')
            nval = int(self.validation_portion * len(trainset))
            nmin = len(trainset) - nval
            assert nmin >= 0
            minset = trainset[:nmin] #real training set for minimizing loss
            valset = trainset[nmin:] #validation set for early stopping
            best = rval
            # Each step yielded by the stopper triggers one pass over minset.
            for stp in self.early_stopper():
                # NOTE(review): the minibatch size is capped by
                # len(trainset), not len(minset) -- confirm this is intended.
                rval.update(
                    minset.minibatches([input, target], minibatch_size=min(32,
                        len(trainset))))
                #print 'mlp.__call__(), we did an update'
                if stp.set_score:
                    # Presumably the stopper asks for a validation score on
                    # selected steps; verify against the stopper module.
                    stp.score = rval(valset, ['loss_01'])
                    if (stp.score < stp.best_score):
                        # Snapshot the model, since training keeps going.
                        best = copy.copy(rval)
            rval = best
        return rval


import unittest

class TestMLP(unittest.TestCase):
    """Smoke test for MultiLayerPerceptron on tiny two-input datasets."""

    def test0(self):
        """Train two models and compare their outputs on shared inputs."""
        # Rows are [x0, x1, label]; the two training sets differ only in the
        # label of the [1, 0] row.
        rows_first = numpy.array([[0, 0, 0],
                                  [0, 1, 1],
                                  [1, 0, 1],
                                  [1, 1, 1]])
        rows_second = numpy.array([[0, 0, 0],
                                   [0, 1, 1],
                                   [1, 0, 0],
                                   [1, 1, 1]])
        rows_probe = numpy.array([[0, 0, 0],
                                  [0, 1, 1],
                                  [1, 0, 0],
                                  [1, 1, 1]])

        first_train = dataset.ArrayDataSet(rows_first,
                                           {'input':slice(2),'target':2})
        second_train = dataset.ArrayDataSet(rows_second,
                                            {'input':slice(2),'target':2})
        # The probe set exposes only the input columns.
        probe = dataset.ArrayDataSet(rows_probe, {'input':slice(2)})

        factory = MultiLayerPerceptron(2, 10, 2, .1,
                                       linker='c&py',
                                       early_stopper=lambda:stopper.NStages(100,1))

        first_model = factory(first_train, input='input', target='target')
        second_model = factory(second_train)

        first_out = first_model(probe)
        second_out = second_model(probe)

        agree = 0
        for out_a, out_b in zip(first_out, second_out):
            agree += (out_a == out_b)

        # The models should agree exactly where the training labels agree.
        label_agreement = numpy.sum(first_train.fields()['target'] ==
                second_train.fields()['target'])
        assert agree == label_agreement

# Run the unit tests defined in this file when it is executed as a script.
if __name__ == '__main__':
    unittest.main()