Mercurial > ift6266
view scripts/stacked_dae/stacked_convolutional_dae.py @ 138:128507ac4edf
Initial commit for the stacked convolutional denoising autoencoders
author | Owner <salahmeister@gmail.com> |
---|---|
date | Sun, 21 Feb 2010 17:30:38 -0600 |
parents | |
children |
line wrap: on
line source
import numpy import theano import time import theano.tensor as T from theano.tensor.shared_randomstreams import RandomStreams import theano.sandbox.softsign from theano.tensor.signal import downsample from theano.tensor.nnet import conv import gzip import cPickle class LogisticRegression(object): def __init__(self, input, n_in, n_out): self.W = theano.shared( value=numpy.zeros((n_in,n_out), dtype = theano.config.floatX) ) self.b = theano.shared( value=numpy.zeros((n_out,), dtype = theano.config.floatX) ) self.p_y_given_x = T.nnet.softmax(T.dot(input, self.W)+self.b) self.y_pred=T.argmax(self.p_y_given_x, axis=1) self.params = [self.W, self.b] def negative_log_likelihood(self, y): return -T.mean(T.log(self.p_y_given_x)[T.arange(y.shape[0]),y]) def MSE(self, y): return -T.mean(abs((self.p_y_given_x)[T.arange(y.shape[0]),y]-y)**2) def errors(self, y): if y.ndim != self.y_pred.ndim: raise TypeError('y should have the same shape as self.y_pred', ('y', target.type, 'y_pred', self.y_pred.type)) if y.dtype.startswith('int'): return T.mean(T.neq(self.y_pred, y)) else: raise NotImplementedError() class SigmoidalLayer(object): def __init__(self, rng, input, n_in, n_out): self.input = input W_values = numpy.asarray( rng.uniform( \ low = -numpy.sqrt(6./(n_in+n_out)), \ high = numpy.sqrt(6./(n_in+n_out)), \ size = (n_in, n_out)), dtype = theano.config.floatX) self.W = theano.shared(value = W_values) b_values = numpy.zeros((n_out,), dtype= theano.config.floatX) self.b = theano.shared(value= b_values) self.output = T.tanh(T.dot(input, self.W) + self.b) self.params = [self.W, self.b] class dA_conv(object): def __init__(self, corruption_level = 0.1, input = None, shared_W = None,\ shared_b = None, filter_shape = None, image_shape = None, poolsize = (2,2)): theano_rng = RandomStreams() fan_in = numpy.prod(filter_shape[1:]) fan_out = filter_shape[0] * numpy.prod(filter_shape[2:]) center = theano.shared(value = 1, name="center") scale = theano.shared(value = 2, name="scale") if shared_W != None and shared_b != None : self.W = shared_W self.b = shared_b else: initial_W = numpy.asarray( numpy.random.uniform( \ low = -numpy.sqrt(6./(fan_in+fan_out)), \ high = numpy.sqrt(6./(fan_in+fan_out)), \ size = filter_shape), dtype = theano.config.floatX) initial_b = numpy.zeros((filter_shape[0],), dtype= theano.config.floatX) self.W = theano.shared(value = initial_W, name = "W") self.b = theano.shared(value = initial_b, name = "b") initial_b_prime= numpy.zeros((filter_shape[1],)) self.W_prime=T.dtensor4('W_prime') self.b_prime = theano.shared(value = initial_b_prime, name = "b_prime") self.x = input self.tilde_x = theano_rng.binomial( self.x.shape, 1, 1 - corruption_level) * self.x conv1_out = conv.conv2d(self.tilde_x, self.W, \ filter_shape=filter_shape, \ image_shape=image_shape, border_mode='valid') self.y = T.tanh(conv1_out + self.b.dimshuffle('x', 0, 'x', 'x')) da_filter_shape = [ filter_shape[1], filter_shape[0], filter_shape[2],\ filter_shape[3] ] da_image_shape = [ image_shape[0],filter_shape[0],image_shape[2]-filter_shape[2]+1, \ image_shape[3]-filter_shape[3]+1 ] initial_W_prime = numpy.asarray( numpy.random.uniform( \ low = -numpy.sqrt(6./(fan_in+fan_out)), \ high = numpy.sqrt(6./(fan_in+fan_out)), \ size = da_filter_shape), dtype = theano.config.floatX) self.W_prime = theano.shared(value = initial_W_prime, name = "W_prime") #import pdb;pdb.set_trace() conv2_out = conv.conv2d(self.y, self.W_prime, \ filter_shape = da_filter_shape, image_shape = da_image_shape ,\ border_mode='full') self.z = (T.tanh(conv2_out + self.b_prime.dimshuffle('x', 0, 'x', 'x'))+center) / scale scaled_x = (self.x + center) / scale self.L = - T.sum( scaled_x*T.log(self.z) + (1-scaled_x)*T.log(1-self.z), axis=1 ) self.cost = T.mean(self.L) self.params = [ self.W, self.b, self.b_prime ] class LeNetConvPoolLayer(object): def __init__(self, rng, input, filter_shape, image_shape, poolsize=(2,2)): assert image_shape[1]==filter_shape[1] self.input = input W_values = numpy.zeros(filter_shape, dtype=theano.config.floatX) self.W = theano.shared(value = W_values) b_values = numpy.zeros((filter_shape[0],), dtype= theano.config.floatX) self.b = theano.shared(value= b_values) conv_out = conv.conv2d(input, self.W, filter_shape=filter_shape, image_shape=image_shape) fan_in = numpy.prod(filter_shape[1:]) fan_out = filter_shape[0] * numpy.prod(filter_shape[2:]) / numpy.prod(poolsize) W_bound = numpy.sqrt(6./(fan_in + fan_out)) self.W.value = numpy.asarray( rng.uniform(low=-W_bound, high=W_bound, size=filter_shape), dtype = theano.config.floatX) pooled_out = downsample.max_pool2D(conv_out, poolsize, ignore_border=True) self.output = T.tanh(pooled_out + self.b.dimshuffle('x', 0, 'x', 'x')) self.params = [self.W, self.b] class SdA(): def __init__(self, input, n_ins_conv, n_ins_mlp, train_set_x, train_set_y, batch_size, \ conv_hidden_layers_sizes, mlp_hidden_layers_sizes, corruption_levels, \ rng, n_out, pretrain_lr, finetune_lr): self.layers = [] self.pretrain_functions = [] self.params = [] self.conv_n_layers = len(conv_hidden_layers_sizes) self.mlp_n_layers = len(mlp_hidden_layers_sizes) index = T.lscalar() # index to a [mini]batch self.x = T.dmatrix('x') # the data is presented as rasterized images self.y = T.ivector('y') # the labels are presented as 1D vector of for i in xrange( self.conv_n_layers ): filter_shape=conv_hidden_layers_sizes[i][0] image_shape=conv_hidden_layers_sizes[i][1] max_poolsize=conv_hidden_layers_sizes[i][2] if i == 0 : layer_input=self.x.reshape((batch_size,1,28,28)) else: layer_input=self.layers[-1].output layer = LeNetConvPoolLayer(rng, input=layer_input, \ image_shape=image_shape, \ filter_shape=filter_shape,poolsize=max_poolsize) print 'Convolutional layer '+str(i+1)+' created' self.layers += [layer] self.params += layer.params da_layer = dA_conv(corruption_level = corruption_levels[0],\ input = layer_input, \ shared_W = layer.W, shared_b = layer.b,\ filter_shape = filter_shape , image_shape = image_shape ) gparams = T.grad(da_layer.cost, da_layer.params) updates = {} for param, gparam in zip(da_layer.params, gparams): updates[param] = param - gparam * pretrain_lr update_fn = theano.function([index], da_layer.cost, \ updates = updates, givens = { self.x : train_set_x[index*batch_size:(index+1)*batch_size]} ) self.pretrain_functions += [update_fn] for i in xrange( self.mlp_n_layers ): if i == 0 : input_size = n_ins_mlp else: input_size = mlp_hidden_layers_sizes[i-1] if i == 0 : if len( self.layers ) == 0 : layer_input=self.x else : layer_input = self.layers[-1].output.flatten(2) else: layer_input = self.layers[-1].output layer = SigmoidalLayer(rng, layer_input, input_size, mlp_hidden_layers_sizes[i] ) self.layers += [layer] self.params += layer.params print 'MLP layer '+str(i+1)+' created' self.logLayer = LogisticRegression(input=self.layers[-1].output, \ n_in=mlp_hidden_layers_sizes[-1], n_out=n_out) self.params += self.logLayer.params cost = self.logLayer.negative_log_likelihood(self.y) gparams = T.grad(cost, self.params) updates = {} for param,gparam in zip(self.params, gparams): updates[param] = param - gparam*finetune_lr self.finetune = theano.function([index], cost, updates = updates, givens = { self.x : train_set_x[index*batch_size:(index+1)*batch_size], self.y : train_set_y[index*batch_size:(index+1)*batch_size]} ) self.errors = self.logLayer.errors(self.y) def sgd_optimization_mnist( learning_rate=0.1, pretraining_epochs = 2, \ pretrain_lr = 0.01, training_epochs = 1000, \ dataset='mnist.pkl.gz'): f = gzip.open(dataset,'rb') train_set, valid_set, test_set = cPickle.load(f) f.close() def shared_dataset(data_xy): data_x, data_y = data_xy shared_x = theano.shared(numpy.asarray(data_x, dtype=theano.config.floatX)) shared_y = theano.shared(numpy.asarray(data_y, dtype=theano.config.floatX)) return shared_x, T.cast(shared_y, 'int32') test_set_x, test_set_y = shared_dataset(test_set) valid_set_x, valid_set_y = shared_dataset(valid_set) train_set_x, train_set_y = shared_dataset(train_set) batch_size = 500 # size of the minibatch n_train_batches = train_set_x.value.shape[0] / batch_size n_valid_batches = valid_set_x.value.shape[0] / batch_size n_test_batches = test_set_x.value.shape[0] / batch_size # allocate symbolic variables for the data index = T.lscalar() # index to a [mini]batch x = T.matrix('x') # the data is presented as rasterized images y = T.ivector('y') # the labels are presented as 1d vector of # [int] labels layer0_input = x.reshape((batch_size,1,28,28)) # Setup the convolutional layers with their DAs(add as many as you want) corruption_levels = [ 0.2, 0.2, 0.2] rng = numpy.random.RandomState(1234) ker1=2 ker2=2 conv_layers=[] conv_layers.append([[ker1,1,5,5], [batch_size,1,28,28], [2,2] ]) conv_layers.append([[ker2,ker1,5,5], [batch_size,ker1,12,12], [2,2] ]) # Setup the MLP layers of the network mlp_layers=[500] network = SdA(input = layer0_input, n_ins_conv = 28*28, n_ins_mlp = ker2*4*4, \ train_set_x = train_set_x, train_set_y = train_set_y, batch_size = batch_size, conv_hidden_layers_sizes = conv_layers, \ mlp_hidden_layers_sizes = mlp_layers, \ corruption_levels = corruption_levels , n_out = 10, \ rng = rng , pretrain_lr = pretrain_lr , finetune_lr = learning_rate ) test_model = theano.function([index], network.errors, givens = { network.x: test_set_x[index*batch_size:(index+1)*batch_size], network.y: test_set_y[index*batch_size:(index+1)*batch_size]}) validate_model = theano.function([index], network.errors, givens = { network.x: valid_set_x[index*batch_size:(index+1)*batch_size], network.y: valid_set_y[index*batch_size:(index+1)*batch_size]}) start_time = time.clock() for i in xrange(len(network.layers)-len(mlp_layers)): for epoch in xrange(pretraining_epochs): for batch_index in xrange(n_train_batches): c = network.pretrain_functions[i](batch_index) print 'pre-training convolution layer %i, epoch %d, cost '%(i,epoch),c patience = 10000 # look as this many examples regardless patience_increase = 2. # WAIT THIS MUCH LONGER WHEN A NEW BEST IS # FOUND improvement_threshold = 0.995 # a relative improvement of this much is validation_frequency = min(n_train_batches, patience/2) best_params = None best_validation_loss = float('inf') test_score = 0. start_time = time.clock() done_looping = False epoch = 0 while (epoch < training_epochs) and (not done_looping): epoch = epoch + 1 for minibatch_index in xrange(n_train_batches): cost_ij = network.finetune(minibatch_index) iter = epoch * n_train_batches + minibatch_index if (iter+1) % validation_frequency == 0: validation_losses = [validate_model(i) for i in xrange(n_valid_batches)] this_validation_loss = numpy.mean(validation_losses) print('epoch %i, minibatch %i/%i, validation error %f %%' % \ (epoch, minibatch_index+1, n_train_batches, \ this_validation_loss*100.)) # if we got the best validation score until now if this_validation_loss < best_validation_loss: #improve patience if loss improvement is good enough if this_validation_loss < best_validation_loss * \ improvement_threshold : patience = max(patience, iter * patience_increase) # save best validation score and iteration number best_validation_loss = this_validation_loss best_iter = iter # test it on the test set test_losses = [test_model(i) for i in xrange(n_test_batches)] test_score = numpy.mean(test_losses) print((' epoch %i, minibatch %i/%i, test error of best ' 'model %f %%') % (epoch, minibatch_index+1, n_train_batches, test_score*100.)) if patience <= iter : done_looping = True break end_time = time.clock() print(('Optimization complete with best validation score of %f %%,' 'with test performance %f %%') % (best_validation_loss * 100., test_score*100.)) print ('The code ran for %f minutes' % ((end_time-start_time)/60.)) if __name__ == '__main__': sgd_optimization_mnist()