Mercurial > ift6266
view scripts/stacked_dae.py @ 116:3bec123dd75d
changes on pipeline mecanism: we now sample a different complexity for each transformations, this because when we use the same sampled complexity for all the modules 1/8 of the time we are close to 0 and we obtain an image very close to the source, we now save a complexity for each module in the parameters array
author | Xavier Glorot <glorotxa@iro.umontreal.ca> |
---|---|
date | Wed, 17 Feb 2010 16:22:54 -0500 |
parents | 0b4080394f2c |
children | 4f37755d301b |
line wrap: on
line source
#!/usr/bin/python # coding: utf-8 # Code for stacked denoising autoencoder # Tests with MNIST # TODO: adapt for NIST # Based almost entirely on deeplearning.net tutorial, modifications by # François Savard # Base LogisticRegression, SigmoidalLayer, dA, SdA code taken # from the deeplearning.net tutorial. Refactored a bit. # Changes (mainly): # - splitted initialization in smaller methods # - removed the "givens" thing involving an index in the whole dataset # (to allow flexibility in how data is inputted... not necessarily one big tensor) # - changed the "driver" a lot, altough for the moment the same logic is used import time import theano import theano.tensor as T import theano.tensor.nnet from theano.tensor.shared_randomstreams import RandomStreams import numpy, numpy.random from pylearn.datasets import MNIST # from pylearn codebase def update_locals(obj, dct): if 'self' in dct: del dct['self'] obj.__dict__.update(dct) class LogisticRegression(object): def __init__(self, input, n_in, n_out): # initialize with 0 the weights W as a matrix of shape (n_in, n_out) self.W = theano.shared(value=numpy.zeros((n_in,n_out), dtype = theano.config.floatX), name='W') # initialize the baises b as a vector of n_out 0s self.b = theano.shared(value=numpy.zeros((n_out,), dtype = theano.config.floatX), name='b') # compute vector of class-membership probabilities in symbolic form self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W)+self.b) # compute prediction as class whose probability is maximal in # symbolic form self.y_pred=T.argmax(self.p_y_given_x, axis=1) self.params = [self.W, self.b] def negative_log_likelihood(self, y): return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]),y]) def errors(self, y): # check if y has same dimension of y_pred if y.ndim != self.y_pred.ndim: raise TypeError('y should have the same shape as self.y_pred', ('y', target.type, 'y_pred', self.y_pred.type)) # check if y is of the correct datatype if y.dtype.startswith('int'): # the T.neq operator returns a vector of 0s and 1s, where 1 # represents a mistake in prediction return T.mean(T.neq(self.y_pred, y)) else: raise NotImplementedError() class SigmoidalLayer(object): def __init__(self, rng, input, n_in, n_out): self.input = input W_values = numpy.asarray( rng.uniform( \ low = -numpy.sqrt(6./(n_in+n_out)), \ high = numpy.sqrt(6./(n_in+n_out)), \ size = (n_in, n_out)), dtype = theano.config.floatX) self.W = theano.shared(value = W_values) b_values = numpy.zeros((n_out,), dtype= theano.config.floatX) self.b = theano.shared(value= b_values) self.output = T.nnet.sigmoid(T.dot(input, self.W) + self.b) self.params = [self.W, self.b] class dA(object): def __init__(self, n_visible= 784, n_hidden= 500, \ corruption_level = 0.1, input = None, \ shared_W = None, shared_b = None): update_locals(self, locals()) self.init_randomizer() self.init_params() self.init_functions() def init_randomizer(self): # create a Theano random generator that gives symbolic random values self.theano_rng = RandomStreams() # create a numpy random generator self.numpy_rng = numpy.random.RandomState() def init_params(self): if self.shared_W != None and self.shared_b != None : self.W = self.shared_W self.b = self.shared_b else: # initial values for weights and biases # note : W' was written as `W_prime` and b' as `b_prime` # W is initialized with `initial_W` which is uniformely sampled # from -6./sqrt(n_visible+n_hidden) and 6./sqrt(n_hidden+n_visible) # the output of uniform if converted using asarray to dtype # theano.config.floatX so that the code is runable on GPU initial_W = numpy.asarray( self.numpy_rng.uniform( \ low = -numpy.sqrt(6./(n_hidden+n_visible)), \ high = numpy.sqrt(6./(n_hidden+n_visible)), \ size = (n_visible, n_hidden)), dtype = theano.config.floatX) initial_b = numpy.zeros(n_hidden) # theano shared variables for weights and biases self.W = theano.shared(value = initial_W, name = "W") self.b = theano.shared(value = initial_b, name = "b") initial_b_prime= numpy.zeros(self.n_visible) # tied weights, therefore W_prime is W transpose self.W_prime = self.W.T self.b_prime = theano.shared(value = initial_b_prime, name = "b'") def init_functions(self): # if no input is given, generate a variable representing the input if self.input == None : # we use a matrix because we expect a minibatch of several examples, # each example being a row self.x = T.dmatrix(name = 'input') else: self.x = self.input # keep 90% of the inputs the same and zero-out randomly selected subset of # 10% of the inputs # note : first argument of theano.rng.binomial is the shape(size) of # random numbers that it should produce # second argument is the number of trials # third argument is the probability of success of any trial # # this will produce an array of 0s and 1s where 1 has a # probability of 1 - ``corruption_level`` and 0 with # ``corruption_level`` self.tilde_x = self.theano_rng.binomial(self.x.shape, 1, 1-self.corruption_level) * self.x # using tied weights self.y = T.nnet.sigmoid(T.dot(self.tilde_x, self.W) + self.b) self.z = T.nnet.sigmoid(T.dot(self.y, self.W_prime) + self.b_prime) self.L = - T.sum( self.x*T.log(self.z) + (1-self.x)*T.log(1-self.z), axis=1 ) # note : L is now a vector, where each element is the cross-entropy cost # of the reconstruction of the corresponding example of the # minibatch. We need to compute the average of all these to get # the cost of the minibatch self.cost = T.mean(self.L) self.params = [ self.W, self.b, self.b_prime ] class SdA(): def __init__(self, batch_size, n_ins, hidden_layers_sizes, n_outs, corruption_levels, rng, pretrain_lr, finetune_lr): update_locals(self, locals()) self.layers = [] self.pretrain_functions = [] self.params = [] self.n_layers = len(hidden_layers_sizes) if len(hidden_layers_sizes) < 1 : raiseException (' You must have at least one hidden layer ') # allocate symbolic variables for the data self.x = T.matrix('x') # the data is presented as rasterized images self.y = T.ivector('y') # the labels are presented as 1D vector of # [int] labels self.create_layers() self.init_finetuning() def create_layers(self): for i in xrange( self.n_layers ): # construct the sigmoidal layer # the size of the input is either the number of hidden units of # the layer below or the input size if we are on the first layer if i == 0 : input_size = self.n_ins else: input_size = self.hidden_layers_sizes[i-1] # the input to this layer is either the activation of the hidden # layer below or the input of the SdA if you are on the first # layer if i == 0 : layer_input = self.x else: layer_input = self.layers[-1].output layer = SigmoidalLayer(self.rng, layer_input, input_size, self.hidden_layers_sizes[i] ) # add the layer to the self.layers += [layer] self.params += layer.params # Construct a denoising autoencoder that shared weights with this # layer dA_layer = dA(input_size, self.hidden_layers_sizes[i], \ corruption_level = self.corruption_levels[0],\ input = layer_input, \ shared_W = layer.W, shared_b = layer.b) self.init_updates_for_layer(dA_layer) def init_updates_for_layer(self, dA_layer): # Construct a function that trains this dA # compute gradients of layer parameters gparams = T.grad(dA_layer.cost, dA_layer.params) # compute the list of updates updates = {} for param, gparam in zip(dA_layer.params, gparams): updates[param] = param - gparam * self.pretrain_lr # create a function that trains the dA update_fn = theano.function([self.x], dA_layer.cost, \ updates = updates) # collect this function into a list self.pretrain_functions += [update_fn] def init_finetuning(self): # We now need to add a logistic layer on top of the MLP self.logLayer = LogisticRegression(\ input = self.layers[-1].output,\ n_in = self.hidden_layers_sizes[-1], n_out = self.n_outs) self.params += self.logLayer.params # construct a function that implements one step of finetunining # compute the cost, defined as the negative log likelihood cost = self.logLayer.negative_log_likelihood(self.y) # compute the gradients with respect to the model parameters gparams = T.grad(cost, self.params) # compute list of updates updates = {} for param,gparam in zip(self.params, gparams): updates[param] = param - gparam*self.finetune_lr self.finetune = theano.function([self.x, self.y], cost, updates = updates) # symbolic variable that points to the number of errors made on the # minibatch given by self.x and self.y self.errors = self.logLayer.errors(self.y) class MnistIterators: def __init__(self, minibatch_size): self.minibatch_size = minibatch_size self.mnist = MNIST.first_1k() self.len_train = len(self.mnist.train.x) self.len_valid = len(self.mnist.valid.x) self.len_test = len(self.mnist.test.x) def train_x_batches(self): idx = 0 while idx < len(self.mnist.train.x): yield self.mnist.train.x[idx:idx+self.minibatch_size] idx += self.minibatch_size def train_xy_batches(self): idx = 0 while idx < len(self.mnist.train.x): mb_x = self.mnist.train.x[idx:idx+self.minibatch_size] mb_y = self.mnist.train.y[idx:idx+self.minibatch_size] yield mb_x, mb_y idx += self.minibatch_size def valid_xy_batches(self): idx = 0 while idx < len(self.mnist.valid.x): mb_x = self.mnist.valid.x[idx:idx+self.minibatch_size] mb_y = self.mnist.valid.y[idx:idx+self.minibatch_size] yield mb_x, mb_y idx += self.minibatch_size class MnistTrainingDriver: def __init__(self, rng=numpy.random): self.rng = rng self.init_SdA() def init_SdA(self): # Hyperparam hidden_layers_sizes = [1000, 1000, 1000] n_outs = 10 corruption_levels = [0.2, 0.2, 0.2] minibatch_size = 10 pretrain_lr = 0.001 finetune_lr = 0.001 update_locals(self, locals()) self.mnist = MnistIterators(minibatch_size) # construct the stacked denoising autoencoder class self.classifier = SdA( batch_size = minibatch_size, \ n_ins=28*28, \ hidden_layers_sizes = hidden_layers_sizes, \ n_outs=n_outs, \ corruption_levels = corruption_levels,\ rng = self.rng,\ pretrain_lr = pretrain_lr, \ finetune_lr = finetune_lr) def compute_validation_error(self): validation_error = 0.0 count = 0 for mb_x, mb_y in self.mnist.valid_xy_batches(): validation_error += self.classifier.errors(mb_x, mb_y) count += 1 return float(validation_error) / count def pretrain(self): pretraining_epochs = 20 for layer_idx, update_fn in enumerate(self.classifier.pretrain_functions): for epoch in xrange(pretraining_epochs): # go through the training set cost_acc = 0.0 for i, mb_x in enumerate(self.mnist.train_x_batches()): cost_acc += update_fn(mb_x) if i % 100 == 0: print i, "avg err = ", cost_acc / 100.0 cost_acc = 0.0 print 'Pre-training layer %d, epoch %d' % (layer_idx, epoch) def finetune(self): max_training_epochs = 1000 n_train_batches = self.mnist.len_train / self.minibatch_size # early-stopping parameters patience = 10000 # look as this many examples regardless patience_increase = 2. # wait this much longer when a new best is # found improvement_threshold = 0.995 # a relative improvement of this much is # considered significant validation_frequency = min(n_train_batches, patience/2) # go through this many # minibatche before checking the network # on the validation set; in this case we # check every epoch # TODO: use this best_params = None best_validation_loss = float('inf') test_score = 0. start_time = time.clock() done_looping = False epoch = 0 while (epoch < max_training_epochs) and (not done_looping): epoch = epoch + 1 for minibatch_index, (mb_x, mb_y) in enumerate(self.mnist.train_xy_batches()): cost_ij = classifier.finetune(mb_x, mb_y) iter = epoch * n_train_batches + minibatch_index if (iter+1) % validation_frequency == 0: this_validation_loss = self.compute_validation_error() print('epoch %i, minibatch %i/%i, validation error %f %%' % \ (epoch, minibatch_index+1, n_train_batches, \ this_validation_loss*100.)) # if we got the best validation score until now if this_validation_loss < best_validation_loss: #improve patience if loss improvement is good enough if this_validation_loss < best_validation_loss * \ improvement_threshold : patience = max(patience, iter * patience_increase) print "Improving patience" # save best validation score and iteration number best_validation_loss = this_validation_loss best_iter = iter # test it on the test set #test_losses = [test_model(i) for i in xrange(n_test_batches)] #test_score = numpy.mean(test_losses) #print((' epoch %i, minibatch %i/%i, test error of best ' # 'model %f %%') % # (epoch, minibatch_index+1, n_train_batches, # test_score*100.)) if patience <= iter : done_looping = True break def train(): driver = MnistTrainingDriver() start_time = time.clock() driver.pretrain() print "PRETRAINING DONE. STARTING FINETUNING." driver.finetune() end_time = time.clock() if __name__ == '__main__': train()