Mercurial > pylearn
view sandbox/denoising_aa.py @ 494:02a331ba833b
merge
author | Joseph Turian <turian@gmail.com> |
---|---|
date | Tue, 28 Oct 2008 11:40:56 -0400 |
parents | cf22ebfc90eb |
children |
line wrap: on
line source
"""
A denoising auto-encoder

@warning: You should NOT use this interface. It is not complete and is not
functional. Instead, use::
    ssh://projects@lgcm.iro.umontreal.ca/repos/denoising_aa
"""

import theano
from theano.formula import *
from learner import *
from theano import tensor as t
from nnet_ops import *
import math
from misc import *
from misc_theano import *
from theano.tensor_random import binomial


def hiding_corruption_formula(seed,average_fraction_hidden):
    """
    Return a formula for the corruption process, in which a random
    subset of the input numbers are hidden (mapped to 0).

    @param seed: seed of the random generator
    @type seed: anything that numpy.random.RandomState accepts

    @param average_fraction_hidden: the probability with which each
                                    input number is hidden (set to 0).
    @type average_fraction_hidden: 0 <= real number <= 1
    """
    # The mask multiplies the input, so an entry survives when the mask is 1.
    # The probability of keeping an entry is therefore the complement of the
    # probability of hiding it.
    # NOTE(review): assumes the last argument of binomial() is the
    # probability of drawing a 1 -- confirm against theano.tensor_random.
    fraction_sampled = 1 - average_fraction_hidden

    class HidingCorruptionFormula(Formulas):
        x = t.matrix()
        corrupted_x = x * binomial(seed,x,1,fraction_sampled)

    return HidingCorruptionFormula()


def squash_affine_formula(squash_function=sigmoid):
    """
    Simply does: squash_function(b + xW)
    By convention prefix the parameters by _

    @param squash_function: elementwise function applied to the affine
                            activation (sigmoid by default).
    """
    class SquashAffineFormula(Formulas):
        x = t.matrix() # of dimensions minibatch_size x n_inputs
        _b = t.row() # of dimensions 1 x n_outputs
        _W = t.matrix() # of dimensions n_inputs x n_outputs
        a = _b + t.dot(x,_W) # of dimensions minibatch_size x n_outputs
        y = squash_function(a)

    return SquashAffineFormula()


def gradient_descent_update_formula():
    """
    Return a formula that updates `param` in place by one gradient step:
    param <- param - learning_rate * sgrad(cost).
    """
    # Derives from Formulas like every other formula class in this module.
    class GradientDescentUpdateFormula(Formulas):
        param = t.matrix()
        learning_rate = t.scalar()
        cost = t.column() # cost of each example in a minibatch
        # NOTE(review): t.sgrad is called without naming `param`; confirm it
        # really yields the gradient of cost with respect to param.
        param_update = t.add_inplace(param, -learning_rate*t.sgrad(cost))

    # Instantiate the class defined above. The original returned a call to
    # this very function, which recursed without end.
    return GradientDescentUpdateFormula()


def probabilistic_classifier_loss_formula():
    """
    Return a formula giving the negative log-likelihood and the predicted
    class probabilities of a softmax classifier.
    """
    class ProbabilisticClassifierLossFormula(Formulas):
        a = t.matrix() # of dimensions minibatch_size x n_classes, pre-softmax output
        target_class = t.ivector() # dimension (minibatch_size)
        nll, probability_predictions = crossentropy_softmax_1hot(a, target_class) # defined in nnet_ops.py

    return ProbabilisticClassifierLossFormula()


def binomial_cross_entropy_formula():
    """
    Return a formula for the cross-entropy between target binomial
    probabilities `q` and the prediction p = sigmoid(a), summed over all
    entries.
    """
    class BinomialCrossEntropyFormula(Formulas):
        a = t.matrix() # pre-sigmoid activations, minibatch_size x dim
        p = sigmoid(a) # model prediction
        q = t.matrix() # target binomial probabilities, minibatch_size x dim
        # log(p) = -softplus(-a) and log(1-p) = -softplus(a), so with the
        # identity softplus(a) - softplus(-a) = a,
        # we obtain that q log(p) + (1-q) log(1-p) = q a - softplus(a)
        nll = -t.sum(q*a - softplus(a))

    # next line was missing... hope it's all correct above
    return BinomialCrossEntropyFormula()


def squash_affine_autoencoder_formula(hidden_squash=t.tanh,
                                      reconstruction_squash=sigmoid,
                                      share_weights=True,
                                      reconstruction_nll_formula=binomial_cross_entropy_formula(),
                                      update_formula=gradient_descent_update_formula):
    """
    Assemble an auto-encoder formula from two squash-affine layers, a
    reconstruction loss, and one update formula per parameter.

    @param hidden_squash: squashing function of the encoding layer
    @param reconstruction_squash: squashing function of the decoding layer
    @param share_weights: if True both layers keep the name _W, otherwise
                          they are renamed _W1 and _W2
    @param reconstruction_nll_formula: formula providing the 'nll' cost
    @param update_formula: callable returning a parameter-update formula
    """
    # NOTE(review): the encoder output keeps the name 'y' while the decoder
    # input is renamed 'hidden'; if formulas are wired together by name the
    # encoder probably needs y='hidden' as well -- confirm.
    # NOTE(review): with share_weights the decoder reuses _W without a
    # transpose -- confirm the shapes are compatible.
    if share_weights:
        autoencoder = squash_affine_formula(hidden_squash).rename(a='code_a') + \
                      squash_affine_formula(reconstruction_squash).rename(x='hidden',y='reconstruction',_b='_c') + \
                      reconstruction_nll_formula
    else:
        autoencoder = squash_affine_formula(hidden_squash).rename(a='code_a',_W='_W1') + \
                      squash_affine_formula(reconstruction_squash).rename(x='hidden',y='reconstruction',_b='_c',_W='_W2') + \
                      reconstruction_nll_formula
    # One update formula per parameter; parameters are the names starting with '_'.
    autoencoder = autoencoder + [update_formula().rename(cost = 'nll',
                                                         param = p)
                                 for p in autoencoder.get_all('_.*')]
    return autoencoder


# @todo: try other corruption formulae. The above is the default one.
# not quite used in the ICML paper... (had a fixed number of 0s).
class DenoisingAutoEncoder(LearningAlgorithm):
    """
    Learning algorithm that combines a corruption formula with an
    auto-encoder formula, and builds DenoisingAutoEncoderModel instances.
    """

    def __init__(self,n_inputs,n_hidden_per_layer,
                 learning_rate=0.1,
                 max_n_epochs=100,
                 L1_regularizer=0,
                 init_range=1.,
                 corruption_formula = None,
                 autoencoder = None,
                 minibatch_size=None,linker = "c|py"):
        """
        Store every argument as an attribute of the same name and build
        `denoising_autoencoder_formula`.

        @param corruption_formula: formula producing 'corrupted_x'; it has no
            usable default and must be given, e.g. the result of
            hiding_corruption_formula(seed, average_fraction_hidden).
        @param autoencoder: auto-encoder formula; when None,
            squash_affine_autoencoder_formula() is used.
        @raise ValueError: if corruption_formula is None.
        """
        # The corruption process needs a seed and a corruption level, so it
        # cannot be built as a default argument at class-definition time.
        if corruption_formula is None:
            raise ValueError('corruption_formula is required; build one with '
                             'hiding_corruption_formula(seed, average_fraction_hidden)')
        # Build the default lazily so that instances do not share one
        # formula object created at import time.
        if autoencoder is None:
            autoencoder = squash_affine_autoencoder_formula()

        self.n_inputs = n_inputs
        self.n_hidden_per_layer = n_hidden_per_layer
        self.learning_rate = learning_rate
        self.max_n_epochs = max_n_epochs
        self.L1_regularizer = L1_regularizer
        self.init_range = init_range
        self.corruption_formula = corruption_formula
        self.autoencoder = autoencoder
        self.minibatch_size = minibatch_size
        self.linker = linker

        # Feed the corrupted input to the auto-encoder instead of x.
        self.denoising_autoencoder_formula = corruption_formula + autoencoder.rename(x='corrupted_x')

    def __call__(self, training_set=None):
        """
        Allocate and optionally train a model

        @TODO enables passing in training and valid sets, instead of cutting
        one set in 80/20
        """
        import copy
        import sys

        model = DenoisingAutoEncoderModel(self)
        if training_set:
            print('DenoisingAutoEncoder(): what do I do if training_set????')
            # copied from old mlp_factory_approach:
            # NOTE(review): self.validation_portion and self.early_stopper are
            # not set anywhere in this class, and `target` below is not
            # defined -- this branch still needs to be adapted.
            if len(training_set) == sys.maxint:
                raise NotImplementedError('Learning from infinite streams is not supported')
            nval = int(self.validation_portion * len(training_set))
            nmin = len(training_set) - nval
            assert nmin >= 0
            minset = training_set[:nmin] #real training set for minimizing loss
            valset = training_set[nmin:] #validation set for early stopping
            best = model
            for stp in self.early_stopper():
                model.update(
                    minset.minibatches([input, target],
                                       minibatch_size=min(32, len(training_set))))
                #print 'mlp.__call__(), we did an update'
                if stp.set_score:
                    stp.score = model(valset, ['loss_01'])
                    if (stp.score < stp.best_score):
                        best = copy.copy(model)
            model = best
            # end of the copy from mlp_factory_approach
        return model

    def compile(self, inputs, outputs):
        """Compile a theano function from inputs to outputs with this learner's linker."""
        return theano.function(inputs,outputs,unpack_single=False,linker=self.linker)


class DenoisingAutoEncoderModel(LearnerModel):
    """
    Model created by DenoisingAutoEncoder; holds the compiled update function.
    """

    def __init__(self,learning_algorithm,params=None):
        """
        @param learning_algorithm: the DenoisingAutoEncoder that created this model
        @param params: stored as-is on the model; None when the caller has
                       no parameters to pass (as DenoisingAutoEncoder.__call__ does)
        """
        self.learning_algorithm=learning_algorithm
        self.params=params
        formula = learning_algorithm.denoising_autoencoder_formula
        self.update_fn = learning_algorithm.compile(formula.inputs,
                                                    formula.outputs)

    def update(self, training_set, train_stats_collector=None):
        """Placeholder: only prints a message, no parameter is updated."""
        print('dont update you crazy frog!')

# old stuff
#         self._learning_rate = t.scalar('learning_rate') # this is the symbol
#         self.L1_regularizer = L1_regularizer
#         self._L1_regularizer = t.scalar('L1_regularizer')
#         self._input = t.matrix('input') # n_examples x n_inputs
#         self._W = t.matrix('W')
#         self._b = t.row('b')
#         self._c = t.row('b')
#         self._regularization_term = self._L1_regularizer * t.sum(t.abs(self._W))
#         self._corrupted_input = corruption_process(self._input)
#         self._hidden = t.tanh(self._b + t.dot(self._input, self._W.T))
#         self._reconstruction_activations =self._c+t.dot(self._hidden,self._W)
#         self._nll,self._output = crossentropy_softmax_1hot(Print("output_activations")(self._output_activations),self._target_vector)
#         self._output_class = t.argmax(self._output,1)
#         self._class_error = t.neq(self._output_class,self._target_vector)
#         self._minibatch_criterion = self._nll + self._regularization_term / t.shape(self._input)[0]
#         OnlineGradientTLearner.__init__(self)

#     def attributeNames(self):
#         return ["parameters","b1","W2","b2","W2", "L2_regularizer","regularization_term"]

#     def parameterAttributes(self):
#         return ["b1","W1", "b2", "W2"]

#     def updateMinibatchInputFields(self):
#         return ["input","target"]

#     def updateEndOutputAttributes(self):
#         return ["regularization_term"]

#     def lossAttribute(self):
#         return "minibatch_criterion"

#     def defaultOutputFields(self, input_fields):
#         output_fields = ["output", "output_class",]
#         if "target" in input_fields:
#             output_fields += ["class_error", "nll"]
#         return output_fields

#     def allocate(self,minibatch):
#         minibatch_n_inputs = minibatch["input"].shape[1]
#         if not self._n_inputs:
#             self._n_inputs = minibatch_n_inputs
#             self.b1 = numpy.zeros((1,self._n_hidden))
#             self.b2 = numpy.zeros((1,self._n_outputs))
#             self.forget()
#         elif self._n_inputs!=minibatch_n_inputs:
#             # if the input changes dimension on the fly, we resize and forget everything
#             self.forget()

#     def forget(self):
#         if self._n_inputs:
#             r = self._init_range/math.sqrt(self._n_inputs)
#             self.W1 = numpy.random.uniform(low=-r,high=r,
#                                            size=(self._n_hidden,self._n_inputs))
#             r = self._init_range/math.sqrt(self._n_hidden)
#             self.W2 = numpy.random.uniform(low=-r,high=r,
#                                            size=(self._n_outputs,self._n_hidden))
#             self.b1[:]=0
#             self.b2[:]=0
#             self._n_epochs=0

#     def isLastEpoch(self):
#         self._n_epochs +=1
#         return self._n_epochs>=self._max_n_epochs