# HG changeset patch # User fsavard # Date 1266416959 18000 # Node ID 0b4080394f2c084ad1f522f62195d7849e26840f # Parent 291d749452df299d5dce404984dcf42081d86e2d Added stacked DAE code for my experiments, based on tutorial code. Quite unfinished. diff -r 291d749452df -r 0b4080394f2c scripts/stacked_dae.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/scripts/stacked_dae.py Wed Feb 17 09:29:19 2010 -0500 @@ -0,0 +1,422 @@ +#!/usr/bin/python +# coding: utf-8 + +# Code for stacked denoising autoencoder +# Tests with MNIST +# TODO: adapt for NIST +# Based almost entirely on deeplearning.net tutorial, modifications by +# François Savard + +# Base LogisticRegression, SigmoidalLayer, dA, SdA code taken +# from the deeplearning.net tutorial. Refactored a bit. +# Changes (mainly): +# - splitted initialization in smaller methods +# - removed the "givens" thing involving an index in the whole dataset +# (to allow flexibility in how data is inputted... not necessarily one big tensor) +# - changed the "driver" a lot, altough for the moment the same logic is used + +import time +import theano +import theano.tensor as T +import theano.tensor.nnet +from theano.tensor.shared_randomstreams import RandomStreams +import numpy, numpy.random + +from pylearn.datasets import MNIST + + +# from pylearn codebase +def update_locals(obj, dct): + if 'self' in dct: + del dct['self'] + obj.__dict__.update(dct) + + +class LogisticRegression(object): + def __init__(self, input, n_in, n_out): + # initialize with 0 the weights W as a matrix of shape (n_in, n_out) + self.W = theano.shared(value=numpy.zeros((n_in,n_out), dtype = theano.config.floatX), + name='W') + # initialize the baises b as a vector of n_out 0s + self.b = theano.shared(value=numpy.zeros((n_out,), dtype = theano.config.floatX), + name='b') + + # compute vector of class-membership probabilities in symbolic form + self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W)+self.b) + + # compute prediction as class whose probability is maximal in + # symbolic form + self.y_pred=T.argmax(self.p_y_given_x, axis=1) + + self.params = [self.W, self.b] + + def negative_log_likelihood(self, y): + return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]),y]) + + def errors(self, y): + # check if y has same dimension of y_pred + if y.ndim != self.y_pred.ndim: + raise TypeError('y should have the same shape as self.y_pred', + ('y', target.type, 'y_pred', self.y_pred.type)) + # check if y is of the correct datatype + if y.dtype.startswith('int'): + # the T.neq operator returns a vector of 0s and 1s, where 1 + # represents a mistake in prediction + return T.mean(T.neq(self.y_pred, y)) + else: + raise NotImplementedError() + + +class SigmoidalLayer(object): + def __init__(self, rng, input, n_in, n_out): + self.input = input + + W_values = numpy.asarray( rng.uniform( \ + low = -numpy.sqrt(6./(n_in+n_out)), \ + high = numpy.sqrt(6./(n_in+n_out)), \ + size = (n_in, n_out)), dtype = theano.config.floatX) + self.W = theano.shared(value = W_values) + + b_values = numpy.zeros((n_out,), dtype= theano.config.floatX) + self.b = theano.shared(value= b_values) + + self.output = T.nnet.sigmoid(T.dot(input, self.W) + self.b) + self.params = [self.W, self.b] + + +class dA(object): + def __init__(self, n_visible= 784, n_hidden= 500, \ + corruption_level = 0.1, input = None, \ + shared_W = None, shared_b = None): + update_locals(self, locals()) + + self.init_randomizer() + self.init_params() + self.init_functions() + + def init_randomizer(self): + # create a Theano random generator that gives symbolic random values + self.theano_rng = RandomStreams() + # create a numpy random generator + self.numpy_rng = numpy.random.RandomState() + + def init_params(self): + if self.shared_W != None and self.shared_b != None : + self.W = self.shared_W + self.b = self.shared_b + else: + # initial values for weights and biases + # note : W' was written as `W_prime` and b' as `b_prime` + + # W is initialized with `initial_W` which is uniformely sampled + # from -6./sqrt(n_visible+n_hidden) and 6./sqrt(n_hidden+n_visible) + # the output of uniform if converted using asarray to dtype + # theano.config.floatX so that the code is runable on GPU + initial_W = numpy.asarray( self.numpy_rng.uniform( \ + low = -numpy.sqrt(6./(n_hidden+n_visible)), \ + high = numpy.sqrt(6./(n_hidden+n_visible)), \ + size = (n_visible, n_hidden)), dtype = theano.config.floatX) + initial_b = numpy.zeros(n_hidden) + + # theano shared variables for weights and biases + self.W = theano.shared(value = initial_W, name = "W") + self.b = theano.shared(value = initial_b, name = "b") + + initial_b_prime= numpy.zeros(self.n_visible) + # tied weights, therefore W_prime is W transpose + self.W_prime = self.W.T + self.b_prime = theano.shared(value = initial_b_prime, name = "b'") + + def init_functions(self): + # if no input is given, generate a variable representing the input + if self.input == None : + # we use a matrix because we expect a minibatch of several examples, + # each example being a row + self.x = T.dmatrix(name = 'input') + else: + self.x = self.input + + # keep 90% of the inputs the same and zero-out randomly selected subset of + # 10% of the inputs + # note : first argument of theano.rng.binomial is the shape(size) of + # random numbers that it should produce + # second argument is the number of trials + # third argument is the probability of success of any trial + # + # this will produce an array of 0s and 1s where 1 has a + # probability of 1 - ``corruption_level`` and 0 with + # ``corruption_level`` + self.tilde_x = self.theano_rng.binomial(self.x.shape, 1, 1-self.corruption_level) * self.x + # using tied weights + self.y = T.nnet.sigmoid(T.dot(self.tilde_x, self.W) + self.b) + self.z = T.nnet.sigmoid(T.dot(self.y, self.W_prime) + self.b_prime) + self.L = - T.sum( self.x*T.log(self.z) + (1-self.x)*T.log(1-self.z), axis=1 ) + # note : L is now a vector, where each element is the cross-entropy cost + # of the reconstruction of the corresponding example of the + # minibatch. We need to compute the average of all these to get + # the cost of the minibatch + self.cost = T.mean(self.L) + + self.params = [ self.W, self.b, self.b_prime ] + +class SdA(): + def __init__(self, batch_size, n_ins, + hidden_layers_sizes, n_outs, + corruption_levels, rng, pretrain_lr, finetune_lr): + update_locals(self, locals()) + + self.layers = [] + self.pretrain_functions = [] + self.params = [] + self.n_layers = len(hidden_layers_sizes) + + if len(hidden_layers_sizes) < 1 : + raiseException (' You must have at least one hidden layer ') + + # allocate symbolic variables for the data + self.x = T.matrix('x') # the data is presented as rasterized images + self.y = T.ivector('y') # the labels are presented as 1D vector of + # [int] labels + + self.create_layers() + self.init_finetuning() + + def create_layers(self): + for i in xrange( self.n_layers ): + # construct the sigmoidal layer + + # the size of the input is either the number of hidden units of + # the layer below or the input size if we are on the first layer + if i == 0 : + input_size = self.n_ins + else: + input_size = self.hidden_layers_sizes[i-1] + + # the input to this layer is either the activation of the hidden + # layer below or the input of the SdA if you are on the first + # layer + if i == 0 : + layer_input = self.x + else: + layer_input = self.layers[-1].output + + layer = SigmoidalLayer(self.rng, layer_input, input_size, + self.hidden_layers_sizes[i] ) + # add the layer to the + self.layers += [layer] + self.params += layer.params + + # Construct a denoising autoencoder that shared weights with this + # layer + dA_layer = dA(input_size, self.hidden_layers_sizes[i], \ + corruption_level = self.corruption_levels[0],\ + input = layer_input, \ + shared_W = layer.W, shared_b = layer.b) + + self.init_updates_for_layer(dA_layer) + + def init_updates_for_layer(self, dA_layer): + # Construct a function that trains this dA + # compute gradients of layer parameters + gparams = T.grad(dA_layer.cost, dA_layer.params) + # compute the list of updates + updates = {} + for param, gparam in zip(dA_layer.params, gparams): + updates[param] = param - gparam * self.pretrain_lr + + # create a function that trains the dA + update_fn = theano.function([self.x], dA_layer.cost, \ + updates = updates) + + # collect this function into a list + self.pretrain_functions += [update_fn] + + def init_finetuning(self): + # We now need to add a logistic layer on top of the MLP + self.logLayer = LogisticRegression(\ + input = self.layers[-1].output,\ + n_in = self.hidden_layers_sizes[-1], n_out = self.n_outs) + + self.params += self.logLayer.params + # construct a function that implements one step of finetunining + + # compute the cost, defined as the negative log likelihood + cost = self.logLayer.negative_log_likelihood(self.y) + # compute the gradients with respect to the model parameters + gparams = T.grad(cost, self.params) + # compute list of updates + updates = {} + for param,gparam in zip(self.params, gparams): + updates[param] = param - gparam*self.finetune_lr + + self.finetune = theano.function([self.x, self.y], cost, + updates = updates) + + # symbolic variable that points to the number of errors made on the + # minibatch given by self.x and self.y + + self.errors = self.logLayer.errors(self.y) + +class MnistIterators: + def __init__(self, minibatch_size): + self.minibatch_size = minibatch_size + + self.mnist = MNIST.first_1k() + + self.len_train = len(self.mnist.train.x) + self.len_valid = len(self.mnist.valid.x) + self.len_test = len(self.mnist.test.x) + + def train_x_batches(self): + idx = 0 + while idx < len(self.mnist.train.x): + yield self.mnist.train.x[idx:idx+self.minibatch_size] + idx += self.minibatch_size + + def train_xy_batches(self): + idx = 0 + while idx < len(self.mnist.train.x): + mb_x = self.mnist.train.x[idx:idx+self.minibatch_size] + mb_y = self.mnist.train.y[idx:idx+self.minibatch_size] + yield mb_x, mb_y + idx += self.minibatch_size + + def valid_xy_batches(self): + idx = 0 + while idx < len(self.mnist.valid.x): + mb_x = self.mnist.valid.x[idx:idx+self.minibatch_size] + mb_y = self.mnist.valid.y[idx:idx+self.minibatch_size] + yield mb_x, mb_y + idx += self.minibatch_size + + +class MnistTrainingDriver: + def __init__(self, rng=numpy.random): + self.rng = rng + + self.init_SdA() + + def init_SdA(self): + # Hyperparam + hidden_layers_sizes = [1000, 1000, 1000] + n_outs = 10 + corruption_levels = [0.2, 0.2, 0.2] + minibatch_size = 10 + pretrain_lr = 0.001 + finetune_lr = 0.001 + + update_locals(self, locals()) + + self.mnist = MnistIterators(minibatch_size) + + # construct the stacked denoising autoencoder class + self.classifier = SdA( batch_size = minibatch_size, \ + n_ins=28*28, \ + hidden_layers_sizes = hidden_layers_sizes, \ + n_outs=n_outs, \ + corruption_levels = corruption_levels,\ + rng = self.rng,\ + pretrain_lr = pretrain_lr, \ + finetune_lr = finetune_lr) + + def compute_validation_error(self): + validation_error = 0.0 + + count = 0 + for mb_x, mb_y in self.mnist.valid_xy_batches(): + validation_error += self.classifier.errors(mb_x, mb_y) + count += 1 + + return float(validation_error) / count + + def pretrain(self): + pretraining_epochs = 20 + + for layer_idx, update_fn in enumerate(self.classifier.pretrain_functions): + for epoch in xrange(pretraining_epochs): + # go through the training set + cost_acc = 0.0 + for i, mb_x in enumerate(self.mnist.train_x_batches()): + cost_acc += update_fn(mb_x) + + if i % 100 == 0: + print i, "avg err = ", cost_acc / 100.0 + cost_acc = 0.0 + print 'Pre-training layer %d, epoch %d' % (layer_idx, epoch) + + def finetune(self): + max_training_epochs = 1000 + + n_train_batches = self.mnist.len_train / self.minibatch_size + + # early-stopping parameters + patience = 10000 # look as this many examples regardless + patience_increase = 2. # wait this much longer when a new best is + # found + improvement_threshold = 0.995 # a relative improvement of this much is + # considered significant + validation_frequency = min(n_train_batches, patience/2) + # go through this many + # minibatche before checking the network + # on the validation set; in this case we + # check every epoch + + + # TODO: use this + best_params = None + best_validation_loss = float('inf') + test_score = 0. + start_time = time.clock() + + done_looping = False + epoch = 0 + + while (epoch < max_training_epochs) and (not done_looping): + epoch = epoch + 1 + for minibatch_index, (mb_x, mb_y) in enumerate(self.mnist.train_xy_batches()): + cost_ij = classifier.finetune(mb_x, mb_y) + iter = epoch * n_train_batches + minibatch_index + + if (iter+1) % validation_frequency == 0: + this_validation_loss = self.compute_validation_error() + print('epoch %i, minibatch %i/%i, validation error %f %%' % \ + (epoch, minibatch_index+1, n_train_batches, \ + this_validation_loss*100.)) + + # if we got the best validation score until now + if this_validation_loss < best_validation_loss: + + #improve patience if loss improvement is good enough + if this_validation_loss < best_validation_loss * \ + improvement_threshold : + patience = max(patience, iter * patience_increase) + print "Improving patience" + + # save best validation score and iteration number + best_validation_loss = this_validation_loss + best_iter = iter + + # test it on the test set + #test_losses = [test_model(i) for i in xrange(n_test_batches)] + #test_score = numpy.mean(test_losses) + #print((' epoch %i, minibatch %i/%i, test error of best ' + # 'model %f %%') % + # (epoch, minibatch_index+1, n_train_batches, + # test_score*100.)) + + + if patience <= iter : + done_looping = True + break + +def train(): + driver = MnistTrainingDriver() + start_time = time.clock() + driver.pretrain() + print "PRETRAINING DONE. STARTING FINETUNING." + driver.finetune() + end_time = time.clock() + +if __name__ == '__main__': + train() +