view mlp_factory_approach.py @ 212:9b57ea8c767f

The previous commit was supposed to concern only one file, dataset.py; this commit tries to undo my other changes (nothing was broken though, just useless debugging prints)
author Thierry Bertin-Mahieux <bertinmt@iro.umontreal.ca>
date Wed, 21 May 2008 17:42:20 -0400
parents bd728c83faff
children 6fa8fbb0c3f6
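
"""Factory-style multi-layer perceptron on top of Theano and the tlearn
dataset/stopper utilities.

A MultiLayerPerceptron instance acts as the learning algorithm: calling it on
a training set allocates and trains a _Model, and calling that model on a
dataset returns the requested output fields (e.g. 'output_class').
"""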

import copy, sys
import numpy

import theano
from theano import tensor as t

from tlearn import dataset, nnet_ops, stopper


def _randshape(*shape):
    return (numpy.random.rand(*shape) - 0.5) * 0.001

def _cache(d, key, valfn):
    #valfn() is only evaluated if key isn't in dictionary d
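    # Usage sketch (mirrors _Model.__call__ below):
    #   _cache(fn_cache, key, lambda: algo._fn(inputs, outputs))
    # compiles the theano function only the first time a key is seen.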
    if key not in d:
        d[key] = valfn()
    return d[key]

class _Model(object):
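    """A trained (or trainable) MLP: holds a parameter list and the compiled
    update function built by its MultiLayerPerceptron factory (`algo`)."""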
    def __init__(self, algo, params):
        self.algo = algo
        self.params = params
        v = algo.v
        self.update_fn = algo._fn([v.input, v.target] + v.params, [v.nll] + v.new_params)
        self._fn_cache = {}

    def __copy__(self):
        return _Model(self.algo, [copy.copy(p) for p in self.params])

    def update(self, input_target):
        """Update this model from more training data."""
        params = self.params
        #TODO: why should we have to unpack target like this?
        for input, target in input_target:
            rval = self.update_fn(input, target[:,0], *params)
            #print rval[0]

    def __call__(self, testset, fieldnames=['output_class']):
        """Apply this model (as a function) to new data"""
        #TODO: cache fn between calls
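        # Usage sketch: model(testset, ['output_class']) returns an
        # ApplyFunctionDataSet holding the predicted class for each example.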
        assert 'input' == testset.fieldNames()[0]
        assert len(testset.fieldNames()) <= 2
        v = self.algo.v
        outputs = [getattr(v, name) for name in fieldnames]
        inputs = [v.input] + ([v.target] if 'target' in testset else [])
        inputs.extend(v.params)
        theano_fn = _cache(self._fn_cache, (tuple(inputs), tuple(outputs)),
                lambda: self.algo._fn(inputs, outputs))
        lambda_fn = lambda *args: theano_fn(*(list(args) + self.params))
        return dataset.ApplyFunctionDataSet(testset, lambda_fn, fieldnames)

class AutonameVars(object):
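    """Name every value in `dct` that has a `.name` attribute after its dict
    key, then expose the whole dict as attributes of this instance."""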
    def __init__(self, dct):
        for key, val in dct.items():
            if type(key) is str and hasattr(val, 'name'):
                val.name = key
        self.__dict__.update(dct)

class MultiLayerPerceptron(object):
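    """Factory for MLP models.

    Calling an instance with a training set allocates a _Model (optionally
    from given initial parameters) and trains it by minibatch gradient
    descent, early-stopping on a held-out validation portion of the data.
    The symbolic graph lives in `self.v` (a V_intern by default); a custom
    `hidden_layer` callable or a `V_extern` class can replace parts of it.
    """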

    def __init__(self, ninputs, nhid, nclass, lr,
            l2coef=0.0,
            linker='c&py', 
            hidden_layer=None,
            early_stopper=None,
            validation_portion=0.2,
            V_extern=None):
        class V_intern(AutonameVars):
            def __init__(v_self, lr, l2coef, **kwargs):
                lr = t.constant(lr)
                l2coef = t.constant(l2coef)
                input = t.matrix() # n_examples x n_inputs
                target = t.ivector() # len: n_examples
                W2, b2 = t.matrix(), t.vector()

                if hidden_layer:
                    hid, hid_params, hid_ivals, hid_regularization = hidden_layer(input)
                else:
                    W1, b1 = t.matrix(), t.vector()
                    hid = t.tanh(b1 + t.dot(input, W1))
                    hid_params = [W1, b1]
                    hid_regularization = l2coef * t.sum(W1*W1)
                    hid_ivals = lambda : [_randshape(ninputs, nhid), _randshape(nhid)]

                params = [W2, b2] + hid_params
                activations = b2 + t.dot(hid, W2)
                nll, predictions = nnet_ops.crossentropy_softmax_1hot(activations, target)
                regularization = l2coef * t.sum(W2*W2) + hid_regularization
                output_class = t.argmax(activations,1)
                loss_01 = t.neq(output_class, target)
                g_params = t.grad(nll + regularization, params)
                new_params = [t.sub_inplace(p, lr * gp) for p,gp in zip(params, g_params)]
                # Dump all locals (symbolic variables, params, and any
                # captured outer names) onto the enclosing MLP instance and
                # drop the accidental reference to it; AutonameVars then names
                # the variables and attaches them to this V_intern instance.
                self.__dict__.update(locals()); del self.self
                AutonameVars.__init__(v_self, locals())
        self.nhid = nhid
        self.nclass = nclass
        self.v = V_intern(**locals()) if V_extern is None else V_extern(**locals())
        self.linker = linker
        self.early_stopper = early_stopper if early_stopper is not None else lambda: stopper.NStages(10,1)
        self.validation_portion = validation_portion

    def _fn(self, inputs, outputs):
        # Caching here would hamper multi-threaded apps
        # prefer caching in _Model.__call__
        return theano.function(inputs, outputs, unpack_single=False, linker=self.linker)

    def __call__(self, trainset=None, iparams=None, input='input', target='target'):
        """Allocate and optionally train a model"""
        if iparams is None:
            iparams = [_randshape(self.nhid, self.nclass), _randshape(self.nclass)]\
                    + self.v.hid_ivals()
        rval = _Model(self, iparams)
        if trainset:
            if len(trainset) == sys.maxint:
                raise NotImplementedError('Learning from infinite streams is not supported')
            nval = int(self.validation_portion * len(trainset))
            nmin = len(trainset) - nval
            assert nmin >= 0
            minset = trainset[:nmin] #real training set for minimizing loss
            valset = trainset[nmin:] #validation set for early stopping
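            # Train on minset; whenever the stopper asks for a score, measure
            # 0/1 loss on valset and keep a copy of the best model seen so far.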
            best = rval
            for stp in self.early_stopper():
                rval.update(
                    minset.minibatches([input, target], minibatch_size=min(32,
                        len(trainset))))
                #print 'mlp.__call__(), we did an update'
                if stp.set_score:
                    stp.score = rval(valset, ['loss_01'])
                    if stp.score < stp.best_score:
                        best = copy.copy(rval)
            rval = best
        return rval
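
# Sketch of a pluggable hidden layer (the factory name below is hypothetical,
# for illustration only). Any callable with this contract can be passed as the
# `hidden_layer` constructor argument: given the symbolic input matrix, it must
# return (hid, hid_params, hid_ivals, hid_regularization), mirroring the
# default tanh branch in V_intern.__init__.
def make_tanh_hidden_layer(ninputs, nhid, l2coef=0.0):
    def hidden_layer(input):
        W1, b1 = t.matrix(), t.vector()       # weights and biases as symbolic inputs
        hid = t.tanh(b1 + t.dot(input, W1))   # hidden activations
        hid_params = [W1, b1]
        hid_ivals = lambda: [_randshape(ninputs, nhid), _randshape(nhid)]
        hid_regularization = t.constant(l2coef) * t.sum(W1 * W1)
        return hid, hid_params, hid_ivals, hid_regularization
    return hidden_layer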


import unittest

class TestMLP(unittest.TestCase):
    def test0(self):
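        # The two training sets differ in a single target, so the resulting
        # models are expected to agree on as many test predictions as there
        # are matching targets between the two sets (the assertion below).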

        training_set1 = dataset.ArrayDataSet(numpy.array([[0, 0, 0],
                                                         [0, 1, 1],
                                                         [1, 0, 1],
                                                         [1, 1, 1]]),
                                            {'input':slice(2),'target':2})
        training_set2 = dataset.ArrayDataSet(numpy.array([[0, 0, 0],
                                                         [0, 1, 1],
                                                         [1, 0, 0],
                                                         [1, 1, 1]]),
                                            {'input':slice(2),'target':2})
        test_data = dataset.ArrayDataSet(numpy.array([[0, 0, 0],
                                                         [0, 1, 1],
                                                         [1, 0, 0],
                                                         [1, 1, 1]]),
                                            {'input':slice(2)})

        learn_algo = MultiLayerPerceptron(2, 10, 2, .1
                , linker='c&py'
                , early_stopper = lambda:stopper.NStages(100,1))

        model1 = learn_algo(training_set1, input='input', target='target')

        model2 = learn_algo(training_set2)

        n_match = 0
        for o1, o2 in zip(model1(test_data), model2(test_data)):
            #print o1
            #print o2
            n_match += (o1 == o2)

        assert n_match == (numpy.sum(training_set1.fields()['target'] ==
                training_set2.fields()['target']))

if __name__ == '__main__':
    unittest.main()