view pylearn/algorithms/kernel_regression.py @ 1504:bf5c0f797161

Fix test.
author Frederic Bastien <nouiz@nouiz.org>
date Mon, 12 Sep 2011 10:48:33 -0400
parents 9b371879c6ab
children 723e2d761985
line wrap: on
line source

"""
Implementation of kernel regression:
"""

#from pylearn.old_dataset.learner import OfflineLearningAlgorithm
from theano import tensor as T
from theano.tensor.nnet import prepend_1_to_each_row
from theano.scalar import as_scalar
from common.autoname import AutoName
import theano
import numpy

# map a N-vector to a 1xN matrix
row_vector = theano.tensor.DimShuffle((False,),['x',0])
# map a N-vector to a Nx1 matrix
col_vector = theano.tensor.DimShuffle((False,),[0,'x'])

class KernelRegression():#OfflineLearningAlgorithm):
    """
Implementation of kernel regression:
* the data are n (x_t,y_t) pairs and we want to estimate E[y|x]
* the predictor computes
     f(x) = b + \sum_{t=1}^n \alpha_t K(x,x_t)
  with free parameters b and alpha, training inputs x_t,
  and kernel function K (gaussian by default).
  Clearly, each prediction involves O(n) computations.
* the learner chooses b and alpha to minimize
     lambda alpha' G' G alpha + \sum_{t=1}^n (f(x_t)-y_t)^2
  where G is the matrix with entries G_ij = K(x_i,x_j).
  The first (L2 regularization) term is the squared L2
  norm of the primal weights w = \sum_t \alpha_t phi(x_t)
  where phi is the function s.t. K(u,v)=phi(u).phi(v).
* this involves solving a linear system with (n+1,n+1)
  matrix, which is an O(n^3) computation. In addition,
  that linear system matrix requires O(n^2) memory.
  So this learning algorithm should be used only for
  small datasets.
* the linear system is
      (M + lambda I_n) theta = (1, y)'
  where theta = (b, alpha), I_n is the (n+1)x(n+1) matrix that is the identity 
  except with a 0 at (0,0), M is the matrix with G in the sub-matrix starting 
  at (1,1), 1's in column 0, except for a value of n at (0,0), and sum_i G_{i,j} 
  in the rest of row 0.
  
Note that this is gives an estimate of E[y|x,training_set] that is the
same as obtained with a Gaussian process regression. The GP
regression would also provide a Bayesian Var[y|x,training_set].
It corresponds to an assumption that f is a random variable
with Gaussian (process) prior distribution with covariance
function K. Because we assume Gaussian noise we obtain a Gaussian
posterior for f (whose mean is computed here).


    Usage:

       kernel_regressor=KernelRegression(L2_regularizer=0.1,gamma=0.5) (kernel=GaussianKernel(gamma=0.5))
       kernel_predictor=kernel_regressor(training_set)
       all_results_dataset=kernel_predictor(test_set) # creates a dataset with "output" and "squared_error" field
       outputs = kernel_predictor.compute_outputs(inputs) # inputs and outputs are numpy arrays
       outputs, errors = kernel_predictor.compute_outputs_and_errors(inputs,targets)
       errors = kernel_predictor.compute_errors(inputs,targets)
       mse = kernel_predictor.compute_mse(inputs,targets)
       
       

    The training_set must have fields "input" and "target".
    The test_set must have field "input", and needs "target" if
    we want to compute the squared errors.

    The predictor parameters are obtained analytically from the training set.
    Training is only done on a whole training set rather than on minibatches
    (no online implementation).

    The dataset fields expected and produced by the learning algorithm and the trained model
    are the following:

     - Input and output dataset fields (example-wise quantities):

       - 'input' (always expected as an input_dataset field)
       - 'target' (always expected by the learning algorithm, optional for learned model)
       - 'output' (always produced by learned model)
       - 'squared_error' (optionally produced by learned model if 'target' is provided)
          = example-wise squared error
    """
    def __init__(self, kernel=None, L2_regularizer=0, gamma=1, use_bias=False):
        # THE VERSION WITH BIAS DOES NOT SEEM RIGHT
        self.kernel = kernel
        self.L2_regularizer=L2_regularizer
        self.use_bias=use_bias
        self.gamma = gamma # until we fix things, the kernel type is fixed, Gaussian
        self.equations = KernelRegressionEquations()

    def __call__(self,trainset):
        n_examples = len(trainset)
        first_example = trainset[0]
        n_inputs = first_example['input'].size
        n_outputs = first_example['target'].size
        b1=1 if self.use_bias else 0
        M = numpy.zeros((n_examples+b1,n_examples+b1))
        Y = numpy.zeros((n_examples+b1,n_outputs))
        for i in xrange(n_examples):
            M[i+b1,i+b1]=self.L2_regularizer
        data = trainset.fields()
        train_inputs = numpy.array(data['input'])
        if self.use_bias:
            Y[0]=1
        Y[b1:,:] = numpy.array(data['target'])
        train_inputs_square,sumG,G=self.equations.compute_system_matrix(train_inputs,self.gamma)
        M[b1:,b1:] += G
        if self.use_bias:
            M[0,1:] = sumG
            M[1:,0] = 1
            M[0,0] = M.shape[0]
        self.M=M
        self.Y=Y
        theta=numpy.linalg.solve(M,Y)
        return KernelPredictor(theta,self.gamma, train_inputs, train_inputs_square)

class KernelPredictorEquations(AutoName):
    train_inputs = T.matrix() # n_examples x n_inputs
    train_inputs_square = T.vector() # n_examples
    inputs = T.matrix() # minibatchsize x n_inputs
    targets = T.matrix() # minibatchsize x n_outputs
    theta = T.matrix() # (n_examples+1) x n_outputs
    b1 = T.shape(train_inputs_square)[0]<T.shape(theta)[0]
    gamma = T.scalar()
    inv_gamma2 = 1./(gamma*gamma)
    b = b1*theta[0]
    alpha = theta[b1:,:]
    inputs_square = T.sum(inputs*inputs,axis=1)
    Kx = T.exp(-(row_vector(train_inputs_square)-2*T.dot(inputs,train_inputs.T)+col_vector(inputs_square))*inv_gamma2)
    outputs = T.dot(Kx,alpha) + b # minibatchsize x n_outputs
    squared_errors = T.sum(T.sqr(targets-outputs),axis=1)

    __compiled = False
    @classmethod
    def compile(cls,linker='c|py'):
        if cls.__compiled:
            return
        def fn(input_vars,output_vars):
            return staticmethod(theano.function(input_vars,output_vars, linker=linker))

        cls.compute_outputs = fn([cls.inputs,cls.theta,cls.gamma,cls.train_inputs,cls.train_inputs_square],[cls.outputs])
        cls.compute_errors = fn([cls.outputs,cls.targets],[cls.squared_errors])

        cls.__compiled = True

    def __init__(self):
        self.compile()
        
class KernelRegressionEquations(KernelPredictorEquations):
    #M = T.matrix() # (n_examples+1) x (n_examples+1)
    inputs = T.matrix() # n_examples x n_inputs
    gamma = T.scalar()
    inv_gamma2 = 1./(gamma*gamma)
    inputs_square = T.sum(inputs*inputs,axis=1)
    #new_G = G+T.dot(inputs,inputs.T)
    #new_G = T.gemm(G,1.,inputs,inputs.T,1.)
    G = T.exp(-(row_vector(inputs_square)-2*T.dot(inputs,inputs.T)+col_vector(inputs_square))*inv_gamma2)
    sumG = T.sum(G,axis=0)
    
    __compiled = False
    
    @classmethod
    def compile(cls,linker='c|py'):
        if cls.__compiled:
            return
        def fn(input_vars,output_vars):
            return staticmethod(theano.function(input_vars,output_vars, linker=linker))

        cls.compute_system_matrix = fn([cls.inputs,cls.gamma],[cls.inputs_square,cls.sumG,cls.G])

        cls.__compiled = True

    def __init__(self):
        self.compile()

class KernelPredictor(object):
    """
    A kernel predictor has parameters theta (a bias vector and a weight matrix alpha)
    it can use to make a non-linear prediction (according to the KernelPredictorEquations).
    It can compute its output (bias + alpha * kernel(train_inputs,input) and a squared error (||output - target||^2).
    """
    def __init__(self, theta, gamma, train_inputs, train_inputs_square=None):
        self.theta=theta
        self.gamma=gamma
        self.train_inputs=train_inputs
        if train_inputs_square==None:
            train_inputs_square = numpy.sum(train_inputs*train_inputs,axis=1)
        self.train_inputs_square=train_inputs_square
        self.equations = KernelPredictorEquations()

    def compute_outputs(self,inputs):
        return self.equations.compute_outputs(inputs,self.theta,self.gamma,self.train_inputs,self.train_inputs_square)
    def compute_errors(self,inputs,targets):
        return self.equations.compute_errors(self.compute_outputs(inputs),targets)
    def compute_outputs_and_errors(self,inputs,targets):
        outputs = self.compute_outputs(inputs)
        return [outputs,self.equations.compute_errors(outputs,targets)]
    def compute_mse(self,inputs,targets):
        errors = self.compute_errors(inputs,targets)
        return numpy.sum(errors)/errors.size
    
    def __call__(self,dataset,output_fieldnames=None,cached_output_dataset=False):
        assert dataset.hasFields(["input"])
        if output_fieldnames is None:
            if dataset.hasFields(["target"]):
                output_fieldnames = ["output","squared_error"]
            else:
                output_fieldnames = ["output"]
        output_fieldnames.sort()
        if output_fieldnames == ["squared_error"]:
            f = self.compute_errors
        elif output_fieldnames == ["output"]:
            f = self.compute_outputs
        elif output_fieldnames == ["output","squared_error"]:
            f = self.compute_outputs_and_errors
        else:
            raise ValueError("unknown field(s) in output_fieldnames: "+str(output_fieldnames))
        
        ds=ApplyFunctionDataSet(dataset,f,output_fieldnames)
        if cached_output_dataset:
            return CachedDataSet(ds)
        else:
            return ds
        

def kernel_predictor(inputs,params,*otherargs):
  p = KernelPredictor(params,*otherargs[0])
  return p.compute_outputs(inputs)