Mercurial > pylearn
view kernel_regression.py @ 494:02a331ba833b
merge
author | Joseph Turian <turian@gmail.com> |
---|---|
date | Tue, 28 Oct 2008 11:40:56 -0400 |
parents | 0f8c81b0776d |
children | 05f802184606 |
line wrap: on
line source
""" Implementation of kernel regression: """ from pylearn.learner import OfflineLearningAlgorithm from theano import tensor as T from nnet_ops import prepend_1_to_each_row from theano.scalar import as_scalar from common.autoname import AutoName import theano import numpy # map a N-vector to a 1xN matrix row_vector = theano.elemwise.DimShuffle((False,),['x',0]) # map a N-vector to a Nx1 matrix col_vector = theano.elemwise.DimShuffle((False,),[0,'x']) class KernelRegression(OfflineLearningAlgorithm): """ Implementation of kernel regression: * the data are n (x_t,y_t) pairs and we want to estimate E[y|x] * the predictor computes f(x) = b + \sum_{t=1}^n \alpha_t K(x,x_t) with free parameters b and alpha, training inputs x_t, and kernel function K (gaussian by default). Clearly, each prediction involves O(n) computations. * the learner chooses b and alpha to minimize lambda alpha' G' G alpha + \sum_{t=1}^n (f(x_t)-y_t)^2 where G is the matrix with entries G_ij = K(x_i,x_j). The first (L2 regularization) term is the squared L2 norm of the primal weights w = \sum_t \alpha_t phi(x_t) where phi is the function s.t. K(u,v)=phi(u).phi(v). * this involves solving a linear system with (n+1,n+1) matrix, which is an O(n^3) computation. In addition, that linear system matrix requires O(n^2) memory. So this learning algorithm should be used only for small datasets. * the linear system is (M + lambda I_n) theta = (1, y)' where theta = (b, alpha), I_n is the (n+1)x(n+1) matrix that is the identity except with a 0 at (0,0), M is the matrix with G in the sub-matrix starting at (1,1), 1's in column 0, except for a value of n at (0,0), and sum_i G_{i,j} in the rest of row 0. Note that this is gives an estimate of E[y|x,training_set] that is the same as obtained with a Gaussian process regression. The GP regression would also provide a Bayesian Var[y|x,training_set]. It corresponds to an assumption that f is a random variable with Gaussian (process) prior distribution with covariance function K. Because we assume Gaussian noise we obtain a Gaussian posterior for f (whose mean is computed here). Usage: kernel_regressor=KernelRegression(L2_regularizer=0.1,gamma=0.5) (kernel=GaussianKernel(gamma=0.5)) kernel_predictor=kernel_regressor(training_set) all_results_dataset=kernel_predictor(test_set) # creates a dataset with "output" and "squared_error" field outputs = kernel_predictor.compute_outputs(inputs) # inputs and outputs are numpy arrays outputs, errors = kernel_predictor.compute_outputs_and_errors(inputs,targets) errors = kernel_predictor.compute_errors(inputs,targets) mse = kernel_predictor.compute_mse(inputs,targets) The training_set must have fields "input" and "target". The test_set must have field "input", and needs "target" if we want to compute the squared errors. The predictor parameters are obtained analytically from the training set. Training is only done on a whole training set rather than on minibatches (no online implementation). The dataset fields expected and produced by the learning algorithm and the trained model are the following: - Input and output dataset fields (example-wise quantities): - 'input' (always expected as an input_dataset field) - 'target' (always expected by the learning algorithm, optional for learned model) - 'output' (always produced by learned model) - 'squared_error' (optionally produced by learned model if 'target' is provided) = example-wise squared error """ def __init__(self, kernel=None, L2_regularizer=0, gamma=1, use_bias=False): # THE VERSION WITH BIAS DOES NOT SEEM RIGHT self.kernel = kernel self.L2_regularizer=L2_regularizer self.use_bias=use_bias self.gamma = gamma # until we fix things, the kernel type is fixed, Gaussian self.equations = KernelRegressionEquations() def __call__(self,trainset): n_examples = len(trainset) first_example = trainset[0] n_inputs = first_example['input'].size n_outputs = first_example['target'].size b1=1 if self.use_bias else 0 M = numpy.zeros((n_examples+b1,n_examples+b1)) Y = numpy.zeros((n_examples+b1,n_outputs)) for i in xrange(n_examples): M[i+b1,i+b1]=self.L2_regularizer data = trainset.fields() train_inputs = numpy.array(data['input']) if self.use_bias: Y[0]=1 Y[b1:,:] = numpy.array(data['target']) train_inputs_square,sumG,G=self.equations.compute_system_matrix(train_inputs,self.gamma) M[b1:,b1:] += G if self.use_bias: M[0,1:] = sumG M[1:,0] = 1 M[0,0] = M.shape[0] self.M=M self.Y=Y theta=numpy.linalg.solve(M,Y) return KernelPredictor(theta,self.gamma, train_inputs, train_inputs_square) class KernelPredictorEquations(AutoName): train_inputs = T.matrix() # n_examples x n_inputs train_inputs_square = T.vector() # n_examples inputs = T.matrix() # minibatchsize x n_inputs targets = T.matrix() # minibatchsize x n_outputs theta = T.matrix() # (n_examples+1) x n_outputs b1 = T.shape(train_inputs_square)[0]<T.shape(theta)[0] gamma = T.scalar() inv_gamma2 = 1./(gamma*gamma) b = b1*theta[0] alpha = theta[b1:,:] inputs_square = T.sum(inputs*inputs,axis=1) Kx = T.exp(-(row_vector(train_inputs_square)-2*T.dot(inputs,train_inputs.T)+col_vector(inputs_square))*inv_gamma2) outputs = T.dot(Kx,alpha) + b # minibatchsize x n_outputs squared_errors = T.sum(T.sqr(targets-outputs),axis=1) __compiled = False @classmethod def compile(cls,linker='c|py'): if cls.__compiled: return def fn(input_vars,output_vars): return staticmethod(theano.function(input_vars,output_vars, linker=linker)) cls.compute_outputs = fn([cls.inputs,cls.theta,cls.gamma,cls.train_inputs,cls.train_inputs_square],[cls.outputs]) cls.compute_errors = fn([cls.outputs,cls.targets],[cls.squared_errors]) cls.__compiled = True def __init__(self): self.compile() class KernelRegressionEquations(KernelPredictorEquations): #M = T.matrix() # (n_examples+1) x (n_examples+1) inputs = T.matrix() # n_examples x n_inputs gamma = T.scalar() inv_gamma2 = 1./(gamma*gamma) inputs_square = T.sum(inputs*inputs,axis=1) #new_G = G+T.dot(inputs,inputs.T) #new_G = T.gemm(G,1.,inputs,inputs.T,1.) G = T.exp(-(row_vector(inputs_square)-2*T.dot(inputs,inputs.T)+col_vector(inputs_square))*inv_gamma2) sumG = T.sum(G,axis=0) __compiled = False @classmethod def compile(cls,linker='c|py'): if cls.__compiled: return def fn(input_vars,output_vars): return staticmethod(theano.function(input_vars,output_vars, linker=linker)) cls.compute_system_matrix = fn([cls.inputs,cls.gamma],[cls.inputs_square,cls.sumG,cls.G]) cls.__compiled = True def __init__(self): self.compile() class KernelPredictor(object): """ A kernel predictor has parameters theta (a bias vector and a weight matrix alpha) it can use to make a non-linear prediction (according to the KernelPredictorEquations). It can compute its output (bias + alpha * kernel(train_inputs,input) and a squared error (||output - target||^2). """ def __init__(self, theta, gamma, train_inputs, train_inputs_square=None): self.theta=theta self.gamma=gamma self.train_inputs=train_inputs if train_inputs_square==None: train_inputs_square = numpy.sum(train_inputs*train_inputs,axis=1) self.train_inputs_square=train_inputs_square self.equations = KernelPredictorEquations() def compute_outputs(self,inputs): return self.equations.compute_outputs(inputs,self.theta,self.gamma,self.train_inputs,self.train_inputs_square) def compute_errors(self,inputs,targets): return self.equations.compute_errors(self.compute_outputs(inputs),targets) def compute_outputs_and_errors(self,inputs,targets): outputs = self.compute_outputs(inputs) return [outputs,self.equations.compute_errors(outputs,targets)] def compute_mse(self,inputs,targets): errors = self.compute_errors(inputs,targets) return numpy.sum(errors)/errors.size def __call__(self,dataset,output_fieldnames=None,cached_output_dataset=False): assert dataset.hasFields(["input"]) if output_fieldnames is None: if dataset.hasFields(["target"]): output_fieldnames = ["output","squared_error"] else: output_fieldnames = ["output"] output_fieldnames.sort() if output_fieldnames == ["squared_error"]: f = self.compute_errors elif output_fieldnames == ["output"]: f = self.compute_outputs elif output_fieldnames == ["output","squared_error"]: f = self.compute_outputs_and_errors else: raise ValueError("unknown field(s) in output_fieldnames: "+str(output_fieldnames)) ds=ApplyFunctionDataSet(dataset,f,output_fieldnames) if cached_output_dataset: return CachedDataSet(ds) else: return ds def kernel_predictor(inputs,params,*otherargs): p = KernelPredictor(params,*otherargs[0]) return p.compute_outputs(inputs)