view sandbox/denoising_aa.py @ 500:3c60c2db0319

Added new daa test
author Joseph Turian <turian@gmail.com>
date Tue, 28 Oct 2008 13:36:27 -0400
parents cf22ebfc90eb
children
line wrap: on
line source

"""
A denoising auto-encoder

@warning: You should NOT use this interface. It is not complete and is not functional.
Instead, use::
    ssh://projects@lgcm.iro.umontreal.ca/repos/denoising_aa
"""

import theano
from theano.formula import *
from learner import *
from theano import tensor as t
from nnet_ops import *
import math
from misc import *
from misc_theano import *
from theano.tensor_random import binomial

def hiding_corruption_formula(seed,average_fraction_hidden):
    """
    Return a formula for the corruption process, in which a random
    subset of the input numbers are hidden (mapped to 0).

    The returned formula declares ``x`` (the clean input matrix) and
    ``corrupted_x`` (``x`` multiplied elementwise by a random mask).

    @param seed: seed of the random generator
    @type seed: anything that numpy.random.RandomState accepts

    @param average_fraction_hidden: the probability with which each
                                    input number is hidden (set to 0).
    @type average_fraction_hidden: 0 <= real number <= 1
    """
    # The mask multiplies x, so a mask entry of 0 hides the input and 1 keeps
    # it. The mask therefore has to be drawn with the complementary
    # probability. (The original code referenced an undefined name,
    # `fraction_sampled`, here.)
    fraction_kept = 1 - average_fraction_hidden

    class HidingCorruptionFormula(Formulas):
        x = t.matrix()
        # NOTE(review): assumes binomial(seed, template, n, p) draws a
        # Bernoulli(p) mask shaped like `template` when n == 1 -- confirm
        # against theano.tensor_random.
        corrupted_x = x * binomial(seed,x,1,fraction_kept)

    return HidingCorruptionFormula()

def squash_affine_formula(squash_function=sigmoid):
    """
    Return a formula computing y = squash_function(_b + dot(x, _W)).

    The pre-squash activation is also exposed, under the name ``a``.
    By convention the names of the parameters (here ``_b`` and ``_W``)
    are prefixed by _

    @param squash_function: callable applied to the affine activation ``a``.
    """
    class SquashAffineFormula(Formulas):
        x = t.matrix() # of dimensions minibatch_size x n_inputs
        _b = t.row() # of dimensions 1 x n_outputs
        _W = t.matrix() # of dimensions n_inputs x n_outputs
        a = _b + t.dot(x,_W) # of dimensions minibatch_size x n_outputs
        y = squash_function(a) # squashed output, computed from a
    return SquashAffineFormula()

def gradient_descent_update_formula():
    """
    Return a formula describing one in-place gradient-descent step.

    The formula declares ``param`` (the matrix being updated),
    ``learning_rate`` (a scalar) and ``cost`` (a column of costs), and
    defines ``param_update`` as ``param`` with
    ``-learning_rate * sgrad(cost)`` added to it in place.
    """
    # Subclass Formulas, like every other formula class in this file, so the
    # returned object supports the same rename / + composition.
    class GradientDescentUpdateFormula(Formulas):
        param = t.matrix()
        learning_rate = t.scalar()
        cost = t.column() # cost of each example in a minibatch
        # NOTE(review): t.sgrad(cost) does not name `param`; confirm that it
        # yields the gradient of cost with respect to param.
        param_update = t.add_inplace(param, -learning_rate*t.sgrad(cost))
    # Instantiate the formula class. The original returned a call to this
    # very function, which recursed without end.
    return GradientDescentUpdateFormula()
    
def probabilistic_classifier_loss_formula():
    """
    Return a formula for a softmax classifier loss.

    Declares ``a`` (pre-softmax activations) and ``target_class`` (integer
    class indices), and defines ``nll`` and ``probability_predictions`` as
    the two results of ``crossentropy_softmax_1hot(a, target_class)``.
    """
    class ProbabilisticClassifierLossFormula(Formulas):
        a = t.matrix() # of dimensions minibatch_size x n_classes, pre-softmax output
        target_class = t.ivector() # dimension (minibatch_size)
        nll, probability_predictions = crossentropy_softmax_1hot(a, target_class) # defined in nnet_ops.py
    return ProbabilisticClassifierLossFormula()

def binomial_cross_entropy_formula():
    """
    Return a formula for the binomial (sigmoid) cross-entropy loss.

    Declares ``a`` (pre-sigmoid activations) and ``q`` (target
    probabilities), and defines ``p = sigmoid(a)`` and ``nll``, the
    cross-entropy between q and p summed over all entries.
    """
    class BinomialCrossEntropyFormula(Formulas):
        a = t.matrix() # pre-sigmoid activations, minibatch_size x dim
        p = sigmoid(a) # model prediction
        q = t.matrix() # target binomial probabilities, minibatch_size x dim
        # With log(p) = -softplus(-a) and log(1-p) = -softplus(a), and using
        # the identity softplus(a) - softplus(-a) = a, we obtain
        #   q log(p) + (1-q) log(1-p) = q a - softplus(a)
        # The softplus argument is +a: the original code used softplus(-a),
        # which contradicts the identity and gives an unbounded-below "nll".
        nll = -t.sum(q*a - softplus(a))
    return BinomialCrossEntropyFormula()

def squash_affine_autoencoder_formula(hidden_squash=t.tanh,
                                      reconstruction_squash=sigmoid,
                                      share_weights=True,
                                      reconstruction_nll_formula=binomial_cross_entropy_formula(),
                                      update_formula=gradient_descent_update_formula):
    """
    Compose an auto-encoder formula from two squash-affine layers, a
    reconstruction loss, and one update formula per parameter.

    @param hidden_squash: squash function of the encoding layer.
    @param reconstruction_squash: squash function of the decoding layer.
    @param share_weights: if True both layers keep the name ``_W``;
                          otherwise they are renamed ``_W1`` and ``_W2``.
    @param reconstruction_nll_formula: formula added after the two layers;
                                       the updates use its ``nll`` as cost.
    @param update_formula: callable returning a fresh update formula; it is
                           called once per name matching ``'_.*'``.
    """
    # Rename tables for the two layers; only the weight names depend on
    # share_weights.
    encoder_names = {'a': 'code_a'}
    decoder_names = {'x': 'hidden', 'y': 'reconstruction', '_b': '_c'}
    if not share_weights:
        encoder_names['_W'] = '_W1'
        decoder_names['_W'] = '_W2'

    # NOTE(review): the encoder output keeps the name `y` while the decoder
    # input is called `hidden` -- confirm how Formulas links the two.
    encoder = squash_affine_formula(hidden_squash).rename(**encoder_names)
    decoder = squash_affine_formula(reconstruction_squash).rename(**decoder_names)
    autoencoder = encoder + decoder + reconstruction_nll_formula

    # One update formula per parameter, each driven by the `nll` cost.
    parameter_updates = []
    for parameter in autoencoder.get_all('_.*'):
        step = update_formula().rename(cost='nll', param=parameter)
        parameter_updates.append(step)
    return autoencoder + parameter_updates

    
# @todo: try other corruption formulae. The above is the default one.
# not quite used in the ICML paper... (had a fixed number of 0s).

class DenoisingAutoEncoder(LearningAlgorithm):
    """
    Learning algorithm that combines a corruption formula with an
    auto-encoder formula and produces DenoisingAutoEncoderModel instances.

    Work in progress: the training branch of __call__ was copied from the
    old mlp_factory_approach and still refers to attributes this class does
    not define (see the NOTE(review) comments below).
    """

    def __init__(self,n_inputs,n_hidden_per_layer,
                 learning_rate=0.1,
                 max_n_epochs=100,
                 L1_regularizer=0,
                 init_range=1.,
                 corruption_formula = None,
                 autoencoder = None,
                 minibatch_size=None,linker = "c|py"):
        """
        Store every constructor argument as an attribute of the same name
        and build ``self.denoising_autoencoder_formula``.

        @param corruption_formula: formula producing ``corrupted_x``; None
                                   means ``hiding_corruption_formula()``.
        @param autoencoder: auto-encoder formula whose ``x`` is renamed to
                            ``corrupted_x``; None means
                            ``squash_affine_autoencoder_formula()``.
        @param linker: passed to theano.function by compile().
        """
        # The default formulas are built here rather than in the signature.
        # Building them in the signature ran at class-definition time, and
        # hiding_corruption_formula() without arguments made the module
        # fail on import.
        # NOTE(review): hiding_corruption_formula requires
        # (seed, average_fraction_hidden), so until sensible defaults are
        # chosen callers have to pass corruption_formula explicitly.
        if corruption_formula is None:
            corruption_formula = hiding_corruption_formula()
        if autoencoder is None:
            autoencoder = squash_affine_autoencoder_formula()
        # Copy every argument onto the instance. (`__setattribute__` does
        # not exist; setattr is the supported spelling.)
        for name,val in list(locals().items()):
            if val is not self: setattr(self,name,val)
        self.denoising_autoencoder_formula = corruption_formula + autoencoder.rename(x='corrupted_x')

    def __call__(self, training_set=None):
        """ Allocate and optionally train a model

        @TODO enables passing in training and valid sets, instead of cutting one set in 80/20

        @param training_set: optional dataset; if given (and true), the
                             early-stopping loop below is run on it.
        @return: the model, or the best copy kept by the loop.
        """
        # Imported locally: the file's import block does not import them.
        import copy
        import sys
        # NOTE(review): the model constructor takes `params`, but nothing
        # here allocates parameters yet; None is passed explicitly.
        model = DenoisingAutoEncoderModel(self, params=None)
        if training_set:
            print('DenoisingAutoEncoder(): what do I do if training_set????')
            # copied from old mlp_factory_approach:
            # NOTE(review): validation_portion and early_stopper are not
            # defined by this class, and `input` / `target` below are not
            # bound to field names -- to be sorted out before this branch
            # can run.
            if len(training_set) == sys.maxint:
                raise NotImplementedError('Learning from infinite streams is not supported')
            nval = int(self.validation_portion * len(training_set))
            nmin = len(training_set) - nval
            assert nmin >= 0
            minset = training_set[:nmin] #real training set for minimizing loss
            valset = training_set[nmin:] #validation set for early stopping
            best = model
            for stp in self.early_stopper():
                model.update(
                    minset.minibatches([input, target], minibatch_size=min(32,
                        len(training_set))))
                #print 'mlp.__call__(), we did an update'
                if stp.set_score:
                    stp.score = model(valset, ['loss_01'])
                    if (stp.score < stp.best_score):
                        best = copy.copy(model)
            model = best
            # end of the copy from mlp_factory_approach

        return model


    def compile(self, inputs, outputs):
        """Return theano.function(inputs, outputs) built with this learner's linker."""
        return theano.function(inputs,outputs,unpack_single=False,linker=self.linker)
    
class DenoisingAutoEncoderModel(LearnerModel):
    """
    Model produced by a DenoisingAutoEncoder learning algorithm.

    It keeps a reference to the learning algorithm and to the parameters,
    and holds the function compiled from the algorithm's
    ``denoising_autoencoder_formula``.
    """
    def __init__(self,learning_algorithm,params=None):
        """
        @param learning_algorithm: object providing
                                   ``denoising_autoencoder_formula`` and
                                   ``compile(inputs, outputs)``.
        @param params: stored as ``self.params``; defaults to None because
                       DenoisingAutoEncoder.__call__ builds the model
                       without parameters.
        """
        self.learning_algorithm=learning_algorithm
        self.params=params
        # The original also read `learning_algorithm.v` into an unused
        # local; DenoisingAutoEncoder.__init__ never sets that attribute,
        # so the read was dropped.
        formula = learning_algorithm.denoising_autoencoder_formula
        self.update_fn = learning_algorithm.compile(formula.inputs,
                                                    formula.outputs)

    def update(self, training_set, train_stats_collector=None):
        """Placeholder: only prints a message, no training is performed."""
        print('dont update you crazy frog!')

# old stuff

#         self._learning_rate = t.scalar('learning_rate') # this is the symbol
#         self.L1_regularizer = L1_regularizer
#         self._L1_regularizer = t.scalar('L1_regularizer')
#         self._input = t.matrix('input') # n_examples x n_inputs
#         self._W = t.matrix('W')
#         self._b = t.row('b')
#         self._c = t.row('b')
#         self._regularization_term = self._L1_regularizer * t.sum(t.abs(self._W))
#         self._corrupted_input = corruption_process(self._input)
#         self._hidden = t.tanh(self._b + t.dot(self._input, self._W.T))
#         self._reconstruction_activations =self._c+t.dot(self._hidden,self._W)
#         self._nll,self._output = crossentropy_softmax_1hot(Print("output_activations")(self._output_activations),self._target_vector)
#         self._output_class = t.argmax(self._output,1)
#         self._class_error = t.neq(self._output_class,self._target_vector)
#         self._minibatch_criterion = self._nll + self._regularization_term / t.shape(self._input)[0]
#         OnlineGradientTLearner.__init__(self)
            
#     def attributeNames(self):
#         return ["parameters","b1","W2","b2","W2", "L2_regularizer","regularization_term"]

#     def parameterAttributes(self):
#         return ["b1","W1", "b2", "W2"]
    
#     def updateMinibatchInputFields(self):
#         return ["input","target"]
    
#     def updateEndOutputAttributes(self):
#         return ["regularization_term"]

#     def lossAttribute(self):
#         return "minibatch_criterion"
    
#     def defaultOutputFields(self, input_fields):
#         output_fields = ["output", "output_class",]
#         if "target" in input_fields:
#             output_fields += ["class_error", "nll"]
#         return output_fields
        
#     def allocate(self,minibatch):
#         minibatch_n_inputs  = minibatch["input"].shape[1]
#         if not self._n_inputs:
#             self._n_inputs = minibatch_n_inputs
#             self.b1 = numpy.zeros((1,self._n_hidden))
#             self.b2 = numpy.zeros((1,self._n_outputs))
#             self.forget()
#         elif self._n_inputs!=minibatch_n_inputs:
#             # if the input changes dimension on the fly, we resize and forget everything
#             self.forget()
            
#     def forget(self):
#         if self._n_inputs:
#             r = self._init_range/math.sqrt(self._n_inputs)
#             self.W1 = numpy.random.uniform(low=-r,high=r,
#                                            size=(self._n_hidden,self._n_inputs))
#             r = self._init_range/math.sqrt(self._n_hidden)
#             self.W2 = numpy.random.uniform(low=-r,high=r,
#                                            size=(self._n_outputs,self._n_hidden))
#             self.b1[:]=0
#             self.b2[:]=0
#             self._n_epochs=0

#     def isLastEpoch(self):
#         self._n_epochs +=1
#         return self._n_epochs>=self._max_n_epochs