Mercurial > pylearn
view doc/v2_planning/plugin_PL.py @ 1305:b60a9b6eee68
API_coding_style: Added point mentioned during meeting
author | Olivier Delalleau <delallea@iro> |
---|---|
date | Fri, 01 Oct 2010 15:27:03 -0400 |
parents | 826d78f0135f |
children |
line wrap: on
line source
class RBM(Model): ''' Restricted Boltzmann Machine. ''' def __init__(self, n_visible, n_hidden, visible = None, name = None): if name is None: self.__name__ = self.__class__.__name__ else: self.__name__ = name self.n_visible = n_visible self.n_hidden = n_hidden if self.visible is None: self.visible = theano.tensor.matrix([name='.'.join(self.__name__, 'visible'])) else: self.visible = visible self.W = theano.shared( numpy.zeros((n_visible, n_hidden), dtype=theano.config.floatX), name=self.__name__ + '.W') self.b_hid = theano.shared( numpy.zeros((n_hidden,), dtype=theano.config.floatX), name=self.__name__ + '.b_hid') self.b_vis = theano.shared( numpy.zeros((n_hidden,), dtype=theano.config.floatX), name=self.__name__ + '.b_vis') self.inputs = [self.visible] self.targets = [] self.parameters = [self.W, self.b_hid, self.b_vis] self.outputs = ... self.cost = None self.gradients = [...] class LogisticRegression(Model): pass class GradientBasedLearner(Learner): ''' Learner that uses a gradient-base Optimizer to train a Model. ''' def __init__(self, model, optimizer, name = None): self.model = model self.optimizer = optimizer ... self.updates = optimizer.iterative_optimizer( parameters = model.parameters, cost = model.cost) # TODO: not sure of how to interface data set with the function self.train_fn = theano.function(model.inputs+model.targets, model.cost, updates=self.updates) def use_dataset(self, dataset): self.train_set = dataset # The decorator indicates that this function will declare some hooks. # More hooks could be automatically declared, for instance # 'begin_function' and 'end_function' @declare_hooks(['begin_train_iter', 'end_train_iter']) def adapt(self, n_steps=1): for i in xrange(n_steps): self.adapt.hooks.execute( 'begin_train_iter', context = dict(iter=i, total=n_steps, locals=locals())) data = self.train_set.next() self.train_fn(data) self.adapt.hooks.execute( 'end_train_iter', context = dict(iter = i, locals=locals())) class SGD(Optimizer): ''' Stochastic gradient descent with fixed learning rate. ''' def __init__(self, step_size): self.step_size = step_size def iterative_optimizer( parameters, cost=None, gradients=None, stop=None, updates=None, ): if updates is not None: ret = updates else: ret = {} if gradients is None: if cost is None: raise SomeError('SGD needs to be provided either a cost or a gradients list') gradients = theano.tensor.grad(cost, parameters) for p, g in izip(parameters, gradients): if p in updates: raise KeyError('Parameter %s already has an update value (%s)' % (p, g)) ret[p] = p - self.step_size * g # never stop if stop is not None: ret[stop] = False return ret class DBN(Learner): ''' Deep Belief Network. ''' def __init__(self, n_layers, layer_config, n_ft_steps, ft_step_size): # Layers are GradientBasedLearners, with DBN as Model self.layers = [] # Pretraining cumulative schedule self.pt_cumul_schedule = [0] # Build the layers and the fully-connected model self.input = theano.tensor.matrix(name='.'.join([self.__name__, 'input'])) self.output = self.input self.ft_params = [] for i,lconf in enumerate(layer_config): rbm = RBM(visible = self.output, ...) self.output = rbm.hidden_expectation layer = GradientBasedLearner(...) self.layers.append(layer) self.pt_cumul_schedule.append( self.pt_cumul_schedule[-1]+lc.n_pretrain_steps) self.ft_params.extend([rbm.W, rbm.b_hid]) # Build the fine-tunable model self.target = theano.tensor.ivector(name='.'.join([self.__name__, 'target'])) logreg = LogisticRegression(...) self.output = logreg.output self.cost = logreg.nll self.ft_params.extend([logreg.W, logreg.b]) ft_optimizer = SGD(ft_step_size) self.ft_updates = ft_optimizer.iterative_optimizer( parameters = self.ft_params, cost = self.cost) self.ft_fn = theano.function( [self.input, self.target], self.cost, updates = self.ft_updates, name='.'.join([self.__name__, 'ft_fn'])) self.stage = 0 @declare_hooks([ 'begin_pretrain_layer','end_pretrain_layer', 'begin_finetune_iter', 'end_finetune_iter']) def adapt(self, n_steps=1): ''' Each "step" is accomplished by the corresponding Learner (either an RBM, or the global NNet). ''' train_x, train_y = self.dataset n_remaining_steps = n_steps # Unsupervised pre-training for i, layer in ienumerate(self.layers): if (self.pt_cumul_schedule[i] <= self.stage and self.stage < self.pt_cumul_schedule[i+1]): self.adapt.hooks.execute( 'begin_pretrain_layer', context = dict(iter=i, total=len(self.layers), locals=locals())) n_pt_steps = min(n_remaining_steps, self.pt_cumul_schedule[i+1] - self.stage) layer.use_dataset(train_x) layer.adapt(n_steps = n_pt_steps) self.stage += self.n_pt_steps n_remaining_steps -= n_pt_steps self.adapt.hooks.execute( 'end_pretrain_layer', context = dict(iter=i, total=len(self.layers), locals=locals())) # For the next layer, the data needs to be preprocessed train_x = layer.compute_Eh_given_v(train_x) # or just compute_output? # Supervised fine-tuning if n_remaining_steps > 0: sup_data = train_x, train_y for i in xrange(n_remaining_steps): self.adapt.hooks.execute( 'begin_train_iter', context = dict(iter=i, total=n_steps, locals=locals())) data = self.train_set.next() self.ft_fn(data) self.adapt.hooks.execute( 'end_train_iter', context = dict(iter = i, locals=locals())) ## TODO: implement k-fold cross-validation class Hooks: def __init__(self): # The DB consists in a dictionary, # the keys are the hooks' names (as strings), # the values are lists of (function, exec_condition) pairs of functions self.db = {} def declare(self, name): if name in self.db: raise KeyError('Hook "%s" is already declared' % name) self.db[name] = [] def execute(self, name, context): if name not in self.db: raise KeyError('Hook "%s" does not exist', % name) #TODO: add contextual information to context, like current time, time of last call,... for fn, cond in self.db[name]: if cond(**context): fn(**context) def register(self, name, function, exec_condition): if name not in self.db: raise KeyError('Hook "%s" does not exist', % name) self.db[name].append((function, exec_condition)) #TODO: add __getattr__ to have more intuitive access to the hooks # Hook declaration mechanism def declare_hooks(hooks_list): def deco(f): f.hooks = Hooks() for hook_name in hooks_list: f.hooks.declare(hook_name) return deco # Conditions def always(): return lambda *args, **kwargs: True def main(): train_data = MNIST.gettrain() test_data = MNIST.gettest() train_x, train_y = train_data preprocessor = PCA(ndim = 80) preprocessor.train(train_x) preprocessed_x = preprocessor.compute_output(train_x) ## for more robustess, we can have something like: # peprocessed_x = ProcessDataSet(orig_data = train_x, function = preprocessor.compute_output) x = matrix() dbn = DBN( input = x, n_layers = 3, layer_config = [dict(n_hidden = 500, n_unsup_steps=1000)] * 3 ) dbn.layers[0].adapt.hooks.register( 'begin_train_iter', function = ..., exec_cond = always() ) dbn.layers[0].adapt.hooks.register( 'end_train_iter', function = ..., exec_cond = lambda iter, **kwargs: iter%20==0 ) if __name__ == '__main__': main()