# HG changeset patch # User Yoshua Bengio # Date 1210996907 14400 # Node ID ffd50efefb705bb116a72e6831a9a7b140f945ae # Parent 50a8302addaff3b31b368569461444fccf87ab25 work in progress denoising auto-encoder diff -r 50a8302addaf -r ffd50efefb70 denoising_aa.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/denoising_aa.py Sat May 17 00:01:47 2008 -0400 @@ -0,0 +1,187 @@ +""" +A denoising auto-encoder +""" + +import theano +from theano.formula import * +from learner import * +from theano import tensor as t +from nnet_ops import * +import math +from misc import * +from theano.tensor_random import binomial + +def hiding_corruption_formula(seed,average_fraction_hidden): + """ + Return a formula for the corruption process, in which a random + subset of the input numbers are hidden (mapped to 0). + + @param seed: seed of the random generator + @type seed: anything that numpy.random.RandomState accepts + + @param average_fraction_hidden: the probability with which each + input number is hidden (set to 0). + @type average_fraction_hidden: 0 <= real number <= 1 + """ + class HidingCorruptionFormula(Formulas): + x = t.matrix() + corrupted_x = x * binomial(seed,x,1,fraction_sampled) + + return HidingCorruptionFormula() + +def squash_affine_formula(squash_function=sigmoid): + """ + By convention prefix the parameters by _ + """ + class SquashAffineFormula(Formulas): + x = t.matrix() # of dimensions minibatch_size x n_inputs + _b = t.row() # of dimensions 1 x n_outputs + _W = t.matrix() # of dimensions n_inputs x n_outputs + a = _b + t.dot(x,_W) # of dimensions minibatch_size x n_outputs + y = squash_function(a) + return SquashAffineFormula() + +def gradient_descent_update_formula(): + class GradientDescentUpdateFormula(Formula): + param = t.matrix() + learning_rate = t.scalar() + cost = t.column() # cost of each example in a minibatch + param_update = t.add_inplace(param, -learning_rate*t.sgrad(cost)) + return gradient_descent_update_formula() + +def probabilistic_classifier_loss_formula(): + class ProbabilisticClassifierLossFormula(Formulas): + a = t.matrix() # of dimensions minibatch_size x n_classes, pre-softmax output + target_class = t.ivector() # dimension (minibatch_size) + nll, probability_predictions = crossentropy_softmax_1hot(a, target_class) + return ProbabilisticClassifierLossFormula() + +def binomial_cross_entropy_formula(): + class BinomialCrossEntropyFormula(Formulas): + a = t.matrix() # pre-sigmoid activations, minibatch_size x dim + p = sigmoid(a) # model prediction + q = t.matrix() # target binomial probabilities, minibatch_size x dim + # using the identity softplus(a) - softplus(-a) = a, + # we obtain that q log(p) + (1-q) log(1-p) = q a - softplus(a) + nll = -t.sum(q*a - softplus(-a)) + +def squash_affine_autoencoder_formula(hidden_squash=t.tanh, + reconstruction_squash=sigmoid, + share_weights=True, + reconstruction_nll_formula=binomial_cross_entropy_formula(), + update_formula=gradient_descent_update_formula): + if share_weights: + autoencoder = squash_affine_formula(hidden_squash).rename(a='code_a') + \ + squash_affine_formula(reconstruction_squash).rename(x='hidden',y='reconstruction',_b='_c') + \ + reconstruction_nll_formula + else: + autoencoder = squash_affine_formula(hidden_squash).rename(a='code_a',_W='_W1') + \ + squash_affine_formula(reconstruction_squash).rename(x='hidden',y='reconstruction',_b='_c',_W='_W2') + \ + reconstruction_nll_formula + autoencoder = autoencoder + [update_formula().rename(cost = 'nll', + param = p) + for p in autoencoder.get_all('_.*')] + return autoencoder + + +# @todo: try other corruption formulae. The above is the default one. +# not quite used in the ICML paper... (had a fixed number of 0s). + +class DenoisingAutoEncoder(LearningAlgorithm): + + def __init__(self,n_inputs,n_hidden_per_layer, + learning_rate=0.1, + max_n_epochs=100, + L1_regularizer=0, + init_range=1., + corruption_formula = hiding_corruption_formula(), + autoencoder = squash_affine_autoencoder_formula(), + minibatch_size=None,linker = "c|py"): + for name,val in locals().items(): + if val is not self: self.__setattribute__(name,val) + self.denoising_autoencoder_formula = corruption_formula + autoencoder.rename(x='corrupted_x') + + def __call__(self, training_set=None): + model = DenoisingAutoEncoderModel(self) + if training_set: + + def compile(self, inputs, outputs): + return theano.function(inputs,outputs,unpack_single=False,linker=self.linker) + +class DenoisingAutoEncoderModel(LearnerModel): + def __init__(self,learning_algorithm,params): + self.learning_algorithm=learning_algorithm + self.params=params + v = learning_algorithm.v + self.update_fn = learning_algorithm.compile(learning_algorithm.denoising_autoencoder_formula.inputs, + learning_algorithm.denoising_autoencoder_formula.outputs) + + def update(self, training_set, train_stats_collector=None): + + +# old stuff + +# self._learning_rate = t.scalar('learning_rate') # this is the symbol +# self.L1_regularizer = L1_regularizer +# self._L1_regularizer = t.scalar('L1_regularizer') +# self._input = t.matrix('input') # n_examples x n_inputs +# self._W = t.matrix('W') +# self._b = t.row('b') +# self._c = t.row('b') +# self._regularization_term = self._L1_regularizer * t.sum(t.abs(self._W)) +# self._corrupted_input = corruption_process(self._input) +# self._hidden = t.tanh(self._b + t.dot(self._input, self._W.T)) +# self._reconstruction_activations =self._c+t.dot(self._hidden,self._W) +# self._nll,self._output = crossentropy_softmax_1hot(Print("output_activations")(self._output_activations),self._target_vector) +# self._output_class = t.argmax(self._output,1) +# self._class_error = t.neq(self._output_class,self._target_vector) +# self._minibatch_criterion = self._nll + self._regularization_term / t.shape(self._input)[0] +# OnlineGradientTLearner.__init__(self) + +# def attributeNames(self): +# return ["parameters","b1","W2","b2","W2", "L2_regularizer","regularization_term"] + +# def parameterAttributes(self): +# return ["b1","W1", "b2", "W2"] + +# def updateMinibatchInputFields(self): +# return ["input","target"] + +# def updateEndOutputAttributes(self): +# return ["regularization_term"] + +# def lossAttribute(self): +# return "minibatch_criterion" + +# def defaultOutputFields(self, input_fields): +# output_fields = ["output", "output_class",] +# if "target" in input_fields: +# output_fields += ["class_error", "nll"] +# return output_fields + +# def allocate(self,minibatch): +# minibatch_n_inputs = minibatch["input"].shape[1] +# if not self._n_inputs: +# self._n_inputs = minibatch_n_inputs +# self.b1 = numpy.zeros((1,self._n_hidden)) +# self.b2 = numpy.zeros((1,self._n_outputs)) +# self.forget() +# elif self._n_inputs!=minibatch_n_inputs: +# # if the input changes dimension on the fly, we resize and forget everything +# self.forget() + +# def forget(self): +# if self._n_inputs: +# r = self._init_range/math.sqrt(self._n_inputs) +# self.W1 = numpy.random.uniform(low=-r,high=r, +# size=(self._n_hidden,self._n_inputs)) +# r = self._init_range/math.sqrt(self._n_hidden) +# self.W2 = numpy.random.uniform(low=-r,high=r, +# size=(self._n_outputs,self._n_hidden)) +# self.b1[:]=0 +# self.b2[:]=0 +# self._n_epochs=0 + +# def isLastEpoch(self): +# self._n_epochs +=1 +# return self._n_epochs>=self._max_n_epochs