view denoising_aa.py @ 212:9b57ea8c767f

previous commit was supposed to concern only one file, dataset.py, try to undo my other changes with this commit (nothing was broken though, just useless debugging prints)
author Thierry Bertin-Mahieux <bertinmt@iro.umontreal.ca>
date Wed, 21 May 2008 17:42:20 -0400
parents bd728c83faff
children df3fae88ab46

"""
A denoising auto-encoder
"""

import theano
from theano.formula import *
from learner import *
from theano import tensor as t
from nnet_ops import *
import math
from misc import *
from theano.tensor_random import binomial

def hiding_corruption_formula(seed=0,average_fraction_hidden=0.25):
    """
    Return a formula for the corruption process, in which a random
    subset of the input numbers is hidden (mapped to 0). The default
    seed and fraction are illustrative values only.

    @param seed: seed of the random generator
    @type seed: anything that numpy.random.RandomState accepts
    
    @param average_fraction_hidden: the probability with which each
                                    input number is hidden (set to 0).
    @type average_fraction_hidden: 0 <= real number <= 1
    """
    class HidingCorruptionFormula(Formulas):
        x = t.matrix()
        corrupted_x = x * binomial(seed,x,1,1-average_fraction_hidden) # mask keeps each entry with probability 1-average_fraction_hidden

    return HidingCorruptionFormula()
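
# A concrete numeric sketch of the same masking corruption, using plain numpy
# instead of the symbolic Formulas API above. The array shape and the 0.25
# hidden fraction are illustrative choices, not values taken from this module.
def _demo_hiding_corruption(seed=0, average_fraction_hidden=0.25):
    import numpy
    rng = numpy.random.RandomState(seed)
    x = rng.uniform(size=(4, 6))
    # keep each entry with probability 1 - average_fraction_hidden
    keep_mask = rng.binomial(n=1, p=1.0 - average_fraction_hidden, size=x.shape)
    return x * keep_mask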

def squash_affine_formula(squash_function=sigmoid):
    """
    By convention, learned parameters are prefixed with an underscore.
    """
    class SquashAffineFormula(Formulas):
        x = t.matrix() # of dimensions minibatch_size x n_inputs
        _b = t.row() # of dimensions 1 x n_outputs
        _W = t.matrix() # of dimensions n_inputs x n_outputs
        a = _b + t.dot(x,_W) # of dimensions minibatch_size x n_outputs
        y = squash_function(a)
    return SquashAffineFormula()
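
# A numeric counterpart of SquashAffineFormula (a sketch, not part of the
# Formulas API): one affine map followed by a sigmoid squash, on numpy arrays.
def _demo_squash_affine(x, W, b):
    import numpy
    a = b + numpy.dot(x, W)             # minibatch_size x n_outputs
    return 1.0 / (1.0 + numpy.exp(-a))  # elementwise sigmoid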

def gradient_descent_update_formula():
    class GradientDescentUpdateFormula(Formulas):
        param = t.matrix()
        learning_rate = t.scalar()
        cost = t.column() # cost of each example in a minibatch
        param_update = t.add_inplace(param, -learning_rate*t.sgrad(cost))
    return GradientDescentUpdateFormula()
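
# The numeric update the formula above encodes, sketched on numpy arrays:
# in-place gradient descent, param <- param - learning_rate * grad, where
# grad stands for the gradient of the minibatch cost w.r.t. param.
def _demo_gradient_descent_update(param, grad, learning_rate=0.1):
    param -= learning_rate * grad  # in-place on a numpy array, like t.add_inplace above
    return param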
    
def probabilistic_classifier_loss_formula():
    class ProbabilisticClassifierLossFormula(Formulas):
        a = t.matrix() # of dimensions minibatch_size x n_classes, pre-softmax output
        target_class = t.ivector() # dimension (minibatch_size)
        nll, probability_predictions = crossentropy_softmax_1hot(a, target_class)
    return ProbabilisticClassifierLossFormula()
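
# A numeric sketch of what crossentropy_softmax_1hot computes: a softmax over
# the pre-softmax activations a, then the negative log-likelihood of the
# integer class targets under those probabilities.
def _demo_softmax_nll(a, target_class):
    import numpy
    e = numpy.exp(a - a.max(axis=1)[:, None])  # shift for numerical stability
    probability_predictions = e / e.sum(axis=1)[:, None]
    rows = numpy.arange(a.shape[0])
    nll = -numpy.log(probability_predictions[rows, target_class])
    return nll, probability_predictions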

def binomial_cross_entropy_formula():
    class BinomialCrossEntropyFormula(Formulas):
        a = t.matrix() # pre-sigmoid activations, minibatch_size x dim
        p = sigmoid(a) # model prediction
        q = t.matrix() # target binomial probabilities, minibatch_size x dim
        # using the identity softplus(a) - softplus(-a) = a,
        # we obtain that q log(p) + (1-q) log(1-p) = q a - softplus(a)
        nll = -t.sum(q*a - softplus(a))
    return BinomialCrossEntropyFormula()
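
# A scalar check of the identity used above (a sketch): for p = sigmoid(a),
# q*log(p) + (1-q)*log(1-p) equals q*a - softplus(a), because
# softplus(a) - softplus(-a) = a. The values of a and q are arbitrary.
def _demo_binomial_cross_entropy_identity(a=0.7, q=0.3):
    import math
    p = 1.0 / (1.0 + math.exp(-a))
    lhs = q * math.log(p) + (1.0 - q) * math.log(1.0 - p)
    rhs = q * a - math.log(1.0 + math.exp(a))  # q*a - softplus(a)
    return abs(lhs - rhs) < 1e-12  # True up to rounding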

def squash_affine_autoencoder_formula(hidden_squash=t.tanh,
                                      reconstruction_squash=sigmoid,
                                      share_weights=True,
                                      reconstruction_nll_formula=binomial_cross_entropy_formula(),
                                      update_formula=gradient_descent_update_formula):
    if share_weights:
        autoencoder = squash_affine_formula(hidden_squash).rename(a='code_a',y='hidden') + \
                      squash_affine_formula(reconstruction_squash).rename(x='hidden',y='reconstruction',_b='_c') + \
                      reconstruction_nll_formula
    else:
        autoencoder = squash_affine_formula(hidden_squash).rename(a='code_a',y='hidden',_W='_W1') + \
                      squash_affine_formula(reconstruction_squash).rename(x='hidden',y='reconstruction',_b='_c',_W='_W2') + \
                      reconstruction_nll_formula
    autoencoder = autoencoder + [update_formula().rename(cost = 'nll',
                                                         param = p)
                                 for p in autoencoder.get_all('_.*')]
    return autoencoder
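
# A numeric sketch of the composed autoencoder above with shared weights,
# assuming (as in the commented-out code at the bottom of this file) that the
# decoder uses the transposed weight matrix. Not part of the Formulas API.
def _demo_autoencoder_forward(x, W, b, c):
    import numpy
    hidden = numpy.tanh(b + numpy.dot(x, W))  # encoder: tanh squash
    a = c + numpy.dot(hidden, W.T)            # decoder pre-sigmoid activations
    # binomial cross-entropy of the reconstruction against the clean input x
    nll = -numpy.sum(x * a - numpy.log(1.0 + numpy.exp(a)))
    return nll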

    
# @todo: try other corruption formulae. The above is the default one.
# not quite used in the ICML paper... (had a fixed number of 0s).

class DenoisingAutoEncoder(LearningAlgorithm):
    
    def __init__(self,n_inputs,n_hidden_per_layer,
                 learning_rate=0.1,
                 max_n_epochs=100,
                 L1_regularizer=0,
                 init_range=1.,
                 corruption_formula = hiding_corruption_formula(),
                 autoencoder = squash_affine_autoencoder_formula(),
                 minibatch_size=None,linker = "c|py"):
        for name,val in locals().items():
            if val is not self: setattr(self,name,val)
        self.denoising_autoencoder_formula = corruption_formula + autoencoder.rename(x='corrupted_x')
        
    def __call__(self, training_set=None):
        model = DenoisingAutoEncoderModel(self)
        if training_set:
            model.update(training_set) # @todo: decide what training at construction time should do
        return model
            
    def compile(self, inputs, outputs):
        return theano.function(inputs,outputs,unpack_single=False,linker=self.linker)
    
class DenoisingAutoEncoderModel(LearnerModel):
    def __init__(self,learning_algorithm,params=None):
        self.learning_algorithm=learning_algorithm
        self.params=params
        self.update_fn = learning_algorithm.compile(learning_algorithm.denoising_autoencoder_formula.inputs,
                                                    learning_algorithm.denoising_autoencoder_formula.outputs)

    def update(self, training_set, train_stats_collector=None):
        
        print 'dont update you crazy frog!'
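
# A sketch of what update() could eventually do (hypothetical; this module
# leaves it as a stub): for each minibatch, corrupt the input, run the
# tied-weight encoder/decoder, and take one gradient step on the binomial
# cross-entropy, with the gradients derived by hand in plain numpy.
def _demo_training_epoch(X, W, b, c, learning_rate=0.1, minibatch_size=10, seed=0):
    import numpy
    rng = numpy.random.RandomState(seed)
    for start in range(0, X.shape[0], minibatch_size):
        x = X[start:start + minibatch_size]
        corrupted_x = x * rng.binomial(n=1, p=0.75, size=x.shape)  # hide ~25%
        hidden = numpy.tanh(b + numpy.dot(corrupted_x, W))
        a = c + numpy.dot(hidden, W.T)  # pre-sigmoid reconstruction
        p = 1.0 / (1.0 + numpy.exp(-a))
        d_a = p - x                     # d(nll)/da for the cross-entropy
        d_hidden = numpy.dot(d_a, W) * (1.0 - hidden ** 2)  # back through tanh
        # tied weights: encoder and decoder terms both contribute to W
        W -= learning_rate * (numpy.dot(corrupted_x.T, d_hidden)
                              + numpy.dot(d_a.T, hidden))
        b -= learning_rate * d_hidden.sum(axis=0)
        c -= learning_rate * d_a.sum(axis=0)
    return W, b, c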

# old stuff

#         self._learning_rate = t.scalar('learning_rate') # this is the symbol
#         self.L1_regularizer = L1_regularizer
#         self._L1_regularizer = t.scalar('L1_regularizer')
#         self._input = t.matrix('input') # n_examples x n_inputs
#         self._W = t.matrix('W')
#         self._b = t.row('b')
#         self._c = t.row('c')
#         self._regularization_term = self._L1_regularizer * t.sum(t.abs(self._W))
#         self._corrupted_input = corruption_process(self._input)
#         self._hidden = t.tanh(self._b + t.dot(self._input, self._W.T))
#         self._reconstruction_activations =self._c+t.dot(self._hidden,self._W)
#         self._nll,self._output = crossentropy_softmax_1hot(Print("output_activations")(self._output_activations),self._target_vector)
#         self._output_class = t.argmax(self._output,1)
#         self._class_error = t.neq(self._output_class,self._target_vector)
#         self._minibatch_criterion = self._nll + self._regularization_term / t.shape(self._input)[0]
#         OnlineGradientTLearner.__init__(self)
            
#     def attributeNames(self):
#         return ["parameters","b1","W2","b2","W2", "L2_regularizer","regularization_term"]

#     def parameterAttributes(self):
#         return ["b1","W1", "b2", "W2"]
    
#     def updateMinibatchInputFields(self):
#         return ["input","target"]
    
#     def updateEndOutputAttributes(self):
#         return ["regularization_term"]

#     def lossAttribute(self):
#         return "minibatch_criterion"
    
#     def defaultOutputFields(self, input_fields):
#         output_fields = ["output", "output_class",]
#         if "target" in input_fields:
#             output_fields += ["class_error", "nll"]
#         return output_fields
        
#     def allocate(self,minibatch):
#         minibatch_n_inputs  = minibatch["input"].shape[1]
#         if not self._n_inputs:
#             self._n_inputs = minibatch_n_inputs
#             self.b1 = numpy.zeros((1,self._n_hidden))
#             self.b2 = numpy.zeros((1,self._n_outputs))
#             self.forget()
#         elif self._n_inputs!=minibatch_n_inputs:
#             # if the input changes dimension on the fly, we resize and forget everything
#             self.forget()
            
#     def forget(self):
#         if self._n_inputs:
#             r = self._init_range/math.sqrt(self._n_inputs)
#             self.W1 = numpy.random.uniform(low=-r,high=r,
#                                            size=(self._n_hidden,self._n_inputs))
#             r = self._init_range/math.sqrt(self._n_hidden)
#             self.W2 = numpy.random.uniform(low=-r,high=r,
#                                            size=(self._n_outputs,self._n_hidden))
#             self.b1[:]=0
#             self.b2[:]=0
#             self._n_epochs=0

#     def isLastEpoch(self):
#         self._n_epochs +=1
#         return self._n_epochs>=self._max_n_epochs