# HG changeset patch
# User fsavard
# Date 1267461959 18000
# Node ID c03692aa6158ae62d587f515832033087c8160ef
# Parent d364a130b221ae2b518417afde80807ccc4b6c07
# Parent 81f8466dc12110fe0866c7a0a61cdba2f58889f6
Merge

diff -r 81f8466dc121 -r c03692aa6158 deep/__init__.py
diff -r 81f8466dc121 -r c03692aa6158 deep/stacked_dae/nist_sda.py
--- a/deep/stacked_dae/nist_sda.py	Sat Feb 27 19:26:26 2010 -0500
+++ b/deep/stacked_dae/nist_sda.py	Mon Mar 01 11:45:59 2010 -0500
@@ -1,71 +1,86 @@
 #!/usr/bin/python
 # coding: utf-8
+import ift6266
+import pylearn
+
 import numpy
 import theano
 import time
+
+import pylearn.version
 import theano.tensor as T
 from theano.tensor.shared_randomstreams import RandomStreams
+
 import copy
-
 import sys
+import os
 import os.path
-from sgd_optimization import SdaSgdOptimizer
-
 from jobman import DD
 import jobman, jobman.sql
 from pylearn.io import filetensor
 from utils import produit_croise_jobs
+from sgd_optimization import SdaSgdOptimizer
+
+SERIES_AVAILABLE = False
+try:
+    from scalar_series import *
+    SERIES_AVAILABLE = True
+except ImportError:
+    print "Could not import Series"
+
 TEST_CONFIG = False
 NIST_ALL_LOCATION = '/data/lisa/data/nist/by_class/all'
-JOBDB = 'postgres://ift6266h10@gershwin/ift6266h10_db/'
+JOBDB = 'postgres://ift6266h10@gershwin/ift6266h10_db/fsavard_sda2'
+
 REDUCE_TRAIN_TO = None
 MAX_FINETUNING_EPOCHS = 1000
+REDUCE_EVERY = 1000 # number of minibatches before taking means for valid error etc.
 if TEST_CONFIG:
-    JOBDB = 'postgres://ift6266h10@gershwin/ift6266h10_sandbox_db/'
     REDUCE_TRAIN_TO = 1000
     MAX_FINETUNING_EPOCHS = 2
-
-JOBDB_JOBS = JOBDB + 'fsavard_sda1_jobs'
-JOBDB_RESULTS = JOBDB + 'fsavard_sda1_results'
-EXPERIMENT_PATH = "ift6266.scripts.stacked_dae.nist_sda.jobman_entrypoint"
+    REDUCE_EVERY = 10
 
-# There used to be
-# 'finetuning_lr': [0.00001, 0.0001, 0.001, 0.01, 0.1]
-# and
-# 'num_hidden_layers':[1,2,3]
-# but this is now handled by a special mechanism in SgdOptimizer
-# to reuse intermediate results (for the same training of lower layers,
-# we can test many finetuning_lr)
-JOB_VALS = {'pretraining_lr': [0.1, 0.01, 0.001],#, 0.0001],
+EXPERIMENT_PATH = "ift6266.deep.stacked_dae.nist_sda.jobman_entrypoint"
+
+JOB_VALS = {'pretraining_lr': [0.1, 0.01],#, 0.001],#, 0.0001],
         'pretraining_epochs_per_layer': [10,20],
         'hidden_layers_sizes': [300,800],
-        'corruption_levels': [0.1,0.2],
+        'corruption_levels': [0.1,0.2,0.3],
         'minibatch_size': [20],
-        'max_finetuning_epochs':[MAX_FINETUNING_EPOCHS]}
-FINETUNING_LR_VALS = [0.1, 0.01, 0.001]#, 0.0001]
-NUM_HIDDEN_LAYERS_VALS = [1,2,3]
+        'max_finetuning_epochs':[MAX_FINETUNING_EPOCHS],
+        'finetuning_lr':[0.1, 0.01], #0.001 was very bad, so we leave it out
+        'num_hidden_layers':[2,3]}
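produit_croise_jobs (imported from utils; its definition is not part of this patch) expands JOB_VALS into one jobman job per combination of the value lists, and jobman_insert_nist() later tags each job with jobman.sql.EXPERIMENT before inserting it into JOBDB. A minimal sketch of such a cross-product expansion, assuming the helper simply enumerates every combination; the name cross_product_jobs is illustrative, not the actual utils API:

import itertools

def cross_product_jobs(job_vals):
    # one dict per combination of the per-key value lists
    keys = sorted(job_vals.keys())
    for combo in itertools.product(*[job_vals[k] for k in keys]):
        yield dict(zip(keys, combo))

# With the JOB_VALS above this would give 2*2*2*3*1*1*2*2 = 96 parameter
# settings, each becoming one row in the jobman database.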
 # Just useful for tests... minimal number of epochs
 DEFAULT_HP_NIST = DD({'finetuning_lr':0.01,
                       'pretraining_lr':0.01,
                       'pretraining_epochs_per_layer':1,
                       'max_finetuning_epochs':1,
-                      'hidden_layers_sizes':[1000],
-                      'corruption_levels':[0.2],
-                      'minibatch_size':20})
+                      'hidden_layers_sizes':1000,
+                      'corruption_levels':0.2,
+                      'minibatch_size':20,
+                      'reduce_train_to':1000,
+                      'num_hidden_layers':1})
 
 def jobman_entrypoint(state, channel):
-    state = copy.copy(state)
+    pylearn.version.record_versions(state,[theano,ift6266,pylearn])
+    channel.save()
+
+    workingdir = os.getcwd()
     print "Will load NIST"
+    sys.stdout.flush()
+    nist = NIST(20)
+    print "NIST loaded"
+    sys.stdout.flush()
     rtt = None
     if state.has_key('reduce_train_to'):
@@ -83,50 +98,58 @@
     n_ins = 32*32
     n_outs = 62 # 10 digits, 26*2 (lower, capitals)
-    db = jobman.sql.db(JOBDB_RESULTS)
-    optimizer = SdaSgdOptimizer(dataset, state, n_ins, n_outs,\
-                    input_divider=255.0, job_tree=True, results_db=db, \
-                    experiment=EXPERIMENT_PATH, \
-                    finetuning_lr_to_try=FINETUNING_LR_VALS, \
-                    num_hidden_layers_to_try=NUM_HIDDEN_LAYERS_VALS)
-    optimizer.train()
+    hls = state.hidden_layers_sizes
+    cl = state.corruption_levels
+    nhl = state.num_hidden_layers
+    state.hidden_layers_sizes = [hls] * nhl
+    state.corruption_levels = [cl] * nhl
+
+    # b,b',W for each hidden layer + b,W of last layer (logreg)
+    numparams = nhl * 3 + 2
+    series_mux = None
+    if SERIES_AVAILABLE:
+        series_mux = create_series(workingdir, numparams)
+
+    optimizer = SdaSgdOptimizer(dataset=dataset, hyperparameters=state, \
+                                    n_ins=n_ins, n_outs=n_outs,\
+                                    input_divider=255.0, series_mux=series_mux)
+
+    optimizer.pretrain()
+    channel.save()
+
+    optimizer.finetune()
+    channel.save()
+
+    pylearn.version.record_versions(state,[theano,ift6266,pylearn])
+    channel.save()
 
     return channel.COMPLETE
 
-def estimate_pretraining_time(job):
-    job = DD(job)
-    # time spent on pretraining estimated as O(n^2) where n=num hidens
-    # no need to multiply by num_hidden_layers, as results from num=1
-    # is reused for num=2, or 3, so in the end we get the same time
-    # as if we were training 3 times a single layer
-    # constants:
-    # - 20 mins to pretrain a layer with 1000 units (per 1 epoch)
-    # - 12 mins to finetune (per 1 epoch)
-    # basically the job_tree trick gives us a 5 times speedup on the
-    # pretraining time due to reusing for finetuning_lr
-    # and gives us a second x2 speedup for reusing previous layers
-    # to explore num_hidden_layers
-    return (job.pretraining_epochs_per_layer * 20 / (1000.0*1000) \
-            * job.hidden_layer_sizes * job.hidden_layer_sizes)
+def create_series(basedir, numparams):
+    mux = SeriesMultiplexer()
+
+    # comment out series we don't want to save
+    mux.add_series(AccumulatorSeries(name="reconstruction_error",
+                    reduce_every=REDUCE_EVERY, # every 1000 batches, we take the mean and save
+                    mean=True,
+                    directory=basedir, flush_every=1))
 
-def estimate_total_time():
-    jobs = produit_croise_jobs(JOB_VALS)
-    sumtime = 0.0
-    sum_without = 0.0
-    for job in jobs:
-        sumtime += estimate_pretraining_time(job)
-    # 12 mins per epoch * 30 epochs
-    # 5 finetuning_lr per pretraining combination
-    sum_without = (12*20*len(jobs) + sumtime*2) * len(FINETUNING_LR_VALS)
-    sumtime += len(FINETUNING_LR_VALS) * len(jobs) * 12 * 20
-    print "num jobs=", len(jobs)
-    print "estimate", sumtime/60, " hours"
-    print "estimate without tree optimization", sum_without/60, "ratio", sumtime / sum_without
+    mux.add_series(AccumulatorSeries(name="training_error",
+                    reduce_every=REDUCE_EVERY, # every 1000 batches, we take the mean and save
+                    mean=True,
directory=basedir, flush_every=1)) + + mux.add_series(BaseSeries(name="validation_error", directory=basedir, flush_every=1)) + mux.add_series(BaseSeries(name="test_error", directory=basedir, flush_every=1)) + + mux.add_series(ParamsArrayStats(numparams,name="params",directory=basedir)) + + return mux def jobman_insert_nist(): jobs = produit_croise_jobs(JOB_VALS) - db = jobman.sql.db(JOBDB_JOBS) + db = jobman.sql.db(JOBDB) for job in jobs: job.update({jobman.sql.EXPERIMENT: EXPERIMENT_PATH}) jobman.sql.insert_dict(job, db) @@ -250,13 +273,11 @@ elif len(args) > 0 and args[0] == 'jobman_insert': jobman_insert_nist() - elif len(args) > 0 and args[0] == 'test_job_tree': - # dont forget to comment out sql.inserts and make reduce_train_to=100 - print "TESTING JOB TREE" - chanmock = {'COMPLETE':0} - hp = copy.copy(DEFAULT_HP_NIST) - hp.update({'reduce_train_to':100}) - jobman_entrypoint(hp, chanmock) + + elif len(args) > 0 and args[0] == 'test_jobman_entrypoint': + chanmock = DD({'COMPLETE':0}) + jobman_entrypoint(DEFAULT_HP_NIST, chanmock) + elif len(args) > 0 and args[0] == 'estimate': estimate_total_time() else: diff -r 81f8466dc121 -r c03692aa6158 deep/stacked_dae/sgd_optimization.py --- a/deep/stacked_dae/sgd_optimization.py Sat Feb 27 19:26:26 2010 -0500 +++ b/deep/stacked_dae/sgd_optimization.py Mon Mar 01 11:45:59 2010 -0500 @@ -7,7 +7,6 @@ import theano import time import theano.tensor as T -import copy import sys from jobman import DD @@ -24,44 +23,34 @@ shared_y = theano.shared(data_y) return shared_x, shared_y +class DummyMux(): + def append(self, param1, param2): + pass + class SdaSgdOptimizer: - def __init__(self, dataset, hyperparameters, n_ins, n_outs, input_divider=1.0,\ - job_tree=False, results_db=None,\ - experiment="",\ - num_hidden_layers_to_try=[1,2,3], \ - finetuning_lr_to_try=[0.1, 0.01, 0.001, 0.0001, 0.00001]): - + def __init__(self, dataset, hyperparameters, n_ins, n_outs, input_divider=1.0, series_mux=None): self.dataset = dataset - self.hp = copy.copy(hyperparameters) + self.hp = hyperparameters self.n_ins = n_ins self.n_outs = n_outs - self.input_divider = numpy.asarray(input_divider, dtype=theano.config.floatX) - - self.job_tree = job_tree - self.results_db = results_db - self.experiment = experiment - if self.job_tree: - assert(not results_db is None) - # these hp should not be there, so we insert default values - # we use 3 hidden layers as we'll iterate through 1,2,3 - self.hp.finetuning_lr = 0.1 # dummy value, will be replaced anyway - cl = self.hp.corruption_levels - nh = self.hp.hidden_layers_sizes - self.hp.corruption_levels = [cl,cl,cl] - self.hp.hidden_layers_sizes = [nh,nh,nh] - - self.num_hidden_layers_to_try = num_hidden_layers_to_try - self.finetuning_lr_to_try = finetuning_lr_to_try - - self.printout_frequency = 1000 + self.input_divider = input_divider + + if not series_mux: + series_mux = DummyMux() + print "No series multiplexer set" + self.series_mux = series_mux self.rng = numpy.random.RandomState(1234) self.init_datasets() self.init_classifier() + + sys.stdout.flush() def init_datasets(self): print "init_datasets" + sys.stdout.flush() + train_set, valid_set, test_set = self.dataset self.test_set_x, self.test_set_y = shared_dataset(test_set) self.valid_set_x, self.valid_set_y = shared_dataset(valid_set) @@ -74,6 +63,7 @@ def init_classifier(self): print "Constructing classifier" + # construct the stacked denoising autoencoder class self.classifier = SdA( \ train_set_x= self.train_set_x, \ @@ -88,17 +78,15 @@ finetune_lr = self.hp.finetuning_lr,\ 
input_divider = self.input_divider ) + sys.stdout.flush() + def train(self): self.pretrain() - if not self.job_tree: - # if job_tree is True, finetuning was already performed - self.finetune() + self.finetune() def pretrain(self): print "STARTING PRETRAINING" - - printout_acc = 0.0 - last_error = 0.0 + sys.stdout.flush() start_time = time.clock() ## Pre-train layer-wise @@ -109,62 +97,17 @@ for batch_index in xrange(self.n_train_batches): c = self.classifier.pretrain_functions[i](batch_index) - printout_acc += c / self.printout_frequency - if (batch_index+1) % self.printout_frequency == 0: - print batch_index, "reconstruction cost avg=", printout_acc - last_error = printout_acc - printout_acc = 0.0 + self.series_mux.append("reconstruction_error", c) print 'Pre-training layer %i, epoch %d, cost '%(i,epoch),c - - self.job_splitter(i+1, time.clock()-start_time, last_error) + sys.stdout.flush() end_time = time.clock() print ('Pretraining took %f minutes' %((end_time-start_time)/60.)) - - # Save time by reusing intermediate results - def job_splitter(self, current_pretraining_layer, pretraining_time, last_error): - - state_copy = None - original_classifier = None - - if self.job_tree and current_pretraining_layer in self.num_hidden_layers_to_try: - for lr in self.finetuning_lr_to_try: - sys.stdout.flush() - sys.stderr.flush() - - state_copy = copy.copy(self.hp) - - self.hp.update({'num_hidden_layers':current_pretraining_layer, \ - 'finetuning_lr':lr,\ - 'pretraining_time':pretraining_time,\ - 'last_reconstruction_error':last_error}) + self.hp.update({'pretraining_time': end_time-start_time}) - original_classifier = self.classifier - print "ORIGINAL CLASSIFIER MEANS",original_classifier.get_params_means() - self.classifier = SdA.copy_reusing_lower_layers(original_classifier, current_pretraining_layer, new_finetuning_lr=lr) - - self.finetune() - - self.insert_finished_job() - - print "NEW CLASSIFIER MEANS AFTERWARDS",self.classifier.get_params_means() - print "ORIGINAL CLASSIFIER MEANS AFTERWARDS",original_classifier.get_params_means() - self.classifier = original_classifier - self.hp = state_copy - - def insert_finished_job(self): - job = copy.copy(self.hp) - job[jobman.sql.STATUS] = jobman.sql.DONE - job[jobman.sql.EXPERIMENT] = self.experiment - - # don,t try to store arrays in db - job['hidden_layers_sizes'] = job.hidden_layers_sizes[0] - job['corruption_levels'] = job.corruption_levels[0] - - print "Will insert finished job", job - jobman.sql.insert_dict(jobman.flatten(job), self.results_db) + sys.stdout.flush() def finetune(self): print "STARTING FINETUNING" @@ -174,14 +117,15 @@ # create a function to compute the mistakes that are made by the model # on the validation set, or testing set + shared_divider = theano.shared(numpy.asarray(self.input_divider, dtype=theano.config.floatX)) test_model = theano.function([index], self.classifier.errors, givens = { - self.classifier.x: self.test_set_x[index*minibatch_size:(index+1)*minibatch_size] / self.input_divider, + self.classifier.x: self.test_set_x[index*minibatch_size:(index+1)*minibatch_size] / shared_divider, self.classifier.y: self.test_set_y[index*minibatch_size:(index+1)*minibatch_size]}) validate_model = theano.function([index], self.classifier.errors, givens = { - self.classifier.x: self.valid_set_x[index*minibatch_size:(index+1)*minibatch_size] / self.input_divider, + self.classifier.x: self.valid_set_x[index*minibatch_size:(index+1)*minibatch_size] / shared_divider, self.classifier.y: 
self.valid_set_y[index*minibatch_size:(index+1)*minibatch_size]}) @@ -205,11 +149,6 @@ done_looping = False epoch = 0 - printout_acc = 0.0 - - if not self.hp.has_key('max_finetuning_epochs'): - self.hp.max_finetuning_epochs = 1000 - while (epoch < self.hp.max_finetuning_epochs) and (not done_looping): epoch = epoch + 1 for minibatch_index in xrange(self.n_train_batches): @@ -217,15 +156,13 @@ cost_ij = self.classifier.finetune(minibatch_index) iter = epoch * self.n_train_batches + minibatch_index - printout_acc += cost_ij / float(self.printout_frequency * minibatch_size) - if (iter+1) % self.printout_frequency == 0: - print iter, "cost avg=", printout_acc - printout_acc = 0.0 + self.series_mux.append("training_error", cost_ij) if (iter+1) % validation_frequency == 0: validation_losses = [validate_model(i) for i in xrange(self.n_valid_batches)] this_validation_loss = numpy.mean(validation_losses) + self.series_mux.append("validation_error", this_validation_loss) print('epoch %i, minibatch %i/%i, validation error %f %%' % \ (epoch, minibatch_index+1, self.n_train_batches, \ this_validation_loss*100.)) @@ -246,11 +183,15 @@ # test it on the test set test_losses = [test_model(i) for i in xrange(self.n_test_batches)] test_score = numpy.mean(test_losses) + self.series_mux.append("test_error", test_score) print((' epoch %i, minibatch %i/%i, test error of best ' 'model %f %%') % (epoch, minibatch_index+1, self.n_train_batches, test_score*100.)) + sys.stdout.flush() + + self.series_mux.append("params", self.classifier.all_params) if patience <= iter : done_looping = True @@ -261,6 +202,7 @@ 'best_validation_error':best_validation_loss,\ 'test_score':test_score, 'num_finetuning_epochs':epoch}) + print(('Optimization complete with best validation score of %f %%,' 'with test performance %f %%') % (best_validation_loss * 100., test_score*100.)) diff -r 81f8466dc121 -r c03692aa6158 deep/stacked_dae/stacked_dae.py --- a/deep/stacked_dae/stacked_dae.py Sat Feb 27 19:26:26 2010 -0500 +++ b/deep/stacked_dae/stacked_dae.py Mon Mar 01 11:45:59 2010 -0500 @@ -144,14 +144,20 @@ def __init__(self, train_set_x, train_set_y, batch_size, n_ins, hidden_layers_sizes, n_outs, corruption_levels, rng, pretrain_lr, finetune_lr, input_divider=1.0): + # Just to make sure those are not modified somewhere else afterwards + hidden_layers_sizes = copy.deepcopy(hidden_layers_sizes) + corruption_levels = copy.deepcopy(corruption_levels) update_locals(self, locals()) self.layers = [] self.pretrain_functions = [] self.params = [] + # MODIF: added this so we also get the b_primes + # (not used for finetuning... 
still using ".params") + self.all_params = [] self.n_layers = len(hidden_layers_sizes) - self.input_divider = numpy.asarray(input_divider, dtype=theano.config.floatX) + self.shared_divider = theano.shared(numpy.asarray(input_divider, dtype=theano.config.floatX)) if len(hidden_layers_sizes) < 1 : raiseException (' You must have at least one hidden layer ') @@ -193,6 +199,8 @@ corruption_level = corruption_levels[0],\ input = layer_input, \ shared_W = layer.W, shared_b = layer.b) + + self.all_params += dA_layer.params # Construct a function that trains this dA # compute gradients of layer parameters @@ -206,7 +214,7 @@ update_fn = theano.function([index], dA_layer.cost, \ updates = updates, givens = { - self.x : train_set_x[index*batch_size:(index+1)*batch_size] / self.input_divider}) + self.x : train_set_x[index*batch_size:(index+1)*batch_size] / self.shared_divider}) # collect this function into a list self.pretrain_functions += [update_fn] @@ -217,6 +225,7 @@ n_in = hidden_layers_sizes[-1], n_out = n_outs) self.params += self.logLayer.params + self.all_params += self.logLayer.params # construct a function that implements one step of finetunining # compute the cost, defined as the negative log likelihood @@ -231,7 +240,7 @@ self.finetune = theano.function([index], cost, updates = updates, givens = { - self.x : train_set_x[index*batch_size:(index+1)*batch_size]/self.input_divider, + self.x : train_set_x[index*batch_size:(index+1)*batch_size]/self.shared_divider, self.y : train_set_y[index*batch_size:(index+1)*batch_size]} ) # symbolic variable that points to the number of errors made on the @@ -239,48 +248,6 @@ self.errors = self.logLayer.errors(self.y) - @classmethod - def copy_reusing_lower_layers(cls, obj, num_hidden_layers, new_finetuning_lr=None): - assert(num_hidden_layers <= obj.n_layers) - - if not new_finetuning_lr: - new_finetuning_lr = obj.finetune_lr - - new_sda = cls(train_set_x= obj.train_set_x, \ - train_set_y = obj.train_set_y,\ - batch_size = obj.batch_size, \ - n_ins= obj.n_ins, \ - hidden_layers_sizes = obj.hidden_layers_sizes[:num_hidden_layers], \ - n_outs = obj.n_outs, \ - corruption_levels = obj.corruption_levels[:num_hidden_layers],\ - rng = obj.rng,\ - pretrain_lr = obj.pretrain_lr, \ - finetune_lr = new_finetuning_lr, \ - input_divider = obj.input_divider ) - - # new_sda.layers contains only the hidden layers actually - for i, layer in enumerate(new_sda.layers): - original_layer = obj.layers[i] - for p1,p2 in zip(layer.params, original_layer.params): - p1.value = p2.value.copy() - - return new_sda - - def get_params_copy(self): - return copy.deepcopy(self.params) - - def set_params_from_copy(self, copy): - # We don't want to replace the var, as the functions have pointers in there - # We only want to replace values. 
- for i, p in enumerate(self.params): - p.value = copy[i].value - - def get_params_means(self): - s = [] - for p in self.params: - s.append(numpy.mean(p.value)) - return s - if __name__ == '__main__': import sys args = sys.argv[1:] diff -r 81f8466dc121 -r c03692aa6158 utils/scalar_series/__init__.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/utils/scalar_series/__init__.py Mon Mar 01 11:45:59 2010 -0500 @@ -0,0 +1,2 @@ +from series import BaseSeries, AccumulatorSeries, SeriesContainer, BasicStatsSeries, SeriesMultiplexer, SeriesList, ParamsArrayStats + diff -r 81f8466dc121 -r c03692aa6158 utils/scalar_series/series.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/utils/scalar_series/series.py Mon Mar 01 11:45:59 2010 -0500 @@ -0,0 +1,311 @@ +#!/usr/bin/python +# coding: utf-8 + +from __future__ import with_statement + +import sys +import os +import os.path +import array + +# for BasicStatsSeries +import numpy + +# To access .value if necessary +import theano.tensor.sharedvar + +''' +* TODO: add xy series +* TODO: add graph() for base and accumulator +* TODO: flush_every for BaseStatsSeries +* TODO: warn when Mux append() is called with a nonexisting name +* SeriesContainers are also series, albeit with more complex elements appended +* Each series has a "name" which corresponds in some way to the directory or file in which it's saved +''' + +# Simple class to append numbers and flush them to a file once in a while +class BaseSeries(): + # for types, see http://docs.python.org/library/array.html + def __init__(self, name, directory, type='f', flush_every=1): + self.type = type + self.flush_every = flush_every + + if not name or not directory: + raise Exception("name and directory must be provided (strings)") + + self.directory = directory + self.name = name + + if name and directory: + self.filepath = os.path.join(directory, name) + + self._array = array.array(type) + # stores the length not stored in file, waiting to be flushed + self._buffered = 0 + + def append(self, newitem): + self._array.append(newitem) + + self._buffered += 1 + if self._buffered >= self.flush_every: + self.flush() + + def append_list(self, items): + self._array.fromlist(items) + self._buffered += len(items) + if self._buffered >= self.flush_every: + self.flush() + + def flush(self): + if self._buffered == 0: + return + with open(self.filepath, "wb") as f: + s = self._array[-self._buffered:].tostring() + f.write(s) + + def tolist(self): + return self._array.tolist() + + def load_from_file(self): + if not self.filepath: + raise Exception("No name/directory provided") + + self._array = array.array(self.type) + self._buffered = 0 + + statinfo = os.stat(self.filepath) + size = statinfo.st_size + num_items = size / self._array.itemsize + + with open(self.filepath, "rb") as f: + self._array.fromfile(f, num_items) + +class AccumulatorSeries(BaseSeries): + ''' + reduce_every: group (sum or mean) the last "reduce_every" items whenever we have enough + and create a new item added to the real, saved array + (if elements remain at the end, less then "reduce_every", they'll be discarded on program close) + flush_every: this is for items of the real, saved array, not in terms of number of calls to "append" + ''' + def __init__(self, reduce_every, + name, directory, flush_every=1, + mean=False): + BaseSeries.__init__(self, name=name, directory=directory, type='f', flush_every=flush_every) + self.reduce_every = reduce_every + self._accumulator = 0.0 + self._num_accumulated = 0 + self.use_mean = mean + + @classmethod + 
def series_constructor(cls, reduce_every, mean=False): + def cstr(name, directory, flush_every=1): + return cls(reduce_every=reduce_every, mean=mean, name=name, directory=directory, flush_every=flush_every) + return cstr + + def append(self, item): + self._accumulator += item + self._num_accumulated += 1 + if self._num_accumulated >= self.reduce_every: + n = self._accumulator + if self.use_mean: + n = n / self.reduce_every + BaseSeries.append(self, n) + + self._num_accumulated = 0 + self._accumulator = 0.0 + + def append_list(self, items): + for i in items: + self.append(i) + +class SeriesContainer(): + def __init__(self, parent_directory, name, + series_constructor=BaseSeries): + self.parent_directory = parent_directory + self.name = name + + if not parent_directory or not name: + raise Exception("parent_directory and name must be provided (strings)") + + self.directory_path = os.path.join(parent_directory, name) + + self.series_constructor = series_constructor + + # attempt to create directory for series + if not os.path.isdir(self.directory_path): + os.mkdir(self.directory_path) + + def graph(self): + pass + +class BasicStatsSeries(SeriesContainer): + def __init__(self, parent_directory, name, series_constructor=BaseSeries, + mean=True, minmax=True, std=True): + SeriesContainer.__init__(self, parent_directory=parent_directory, name=name, series_constructor=series_constructor) + + self.save_mean = mean + self.save_minmax = minmax + self.save_std = std + + self.create_series() + + @classmethod + def series_constructor(cls, mean=True, minmax=True, std=True): + def cstr(name, directory, flush_every=1): + return cls(name=name, parent_directory=directory, + mean=mean, minmax=minmax, std=std) + return cstr + + + def create_series(self): + if self.save_mean: + self.means = self.series_constructor(name="mean", directory=self.directory_path) + + if self.save_minmax: + self.mins = self.series_constructor(name="min", directory=self.directory_path) + self.maxes = self.series_constructor(name="max", directory=self.directory_path) + + if self.save_std: + self.stds = self.series_constructor(name="std", directory=self.directory_path) + + def append(self, array): + # TODO: shouldn't this be the job of the caller? 
(at least ParamsArraySeries) + if isinstance(array, theano.tensor.sharedvar.TensorSharedVariable): + array = array.value + + if self.save_mean: + n = numpy.mean(array) + self.means.append(n) + if self.save_minmax: + n = numpy.min(array) + self.mins.append(n) + n = numpy.max(array) + self.maxes.append(n) + if self.save_std: + n = numpy.std(array) + self.stds.append(n) + + def load_from_file(self): + self.load_from_directory() + + def load_from_directory(self): + if self.save_mean: + self.means.load_from_file() + + if self.save_minmax: + self.mins.load_from_file() + self.maxes.load_from_file() + + if self.save_std: + self.stds.load_from_file() + + def graph(self, xes=None): + import pylab + + if self.save_minmax: + mn = numpy.array(self.mins.tolist()) + mx = numpy.array(self.maxes.tolist()) + if self.save_mean: + y = numpy.array(self.means.tolist()) + else: + y = (mn+mx) / 2 + + above_y = mx - y + below_y = y - mn + + if not xes: + xes = numpy.arange(len(y)) + + pylab.errorbar(x=xes, y=y, yerr=[below_y, above_y]) + + elif self.save_mean: + y = numpy.array(self.means.tolist()) + if not xes: + xes = numpy.arange(len(y)) + + pylab.plot(x=xes, y=y) + + +class SeriesMultiplexer(): + def __init__(self): + self._series_dict = {} + self._warned_for = {} + + def append(self, series_name, item): + # if we don't have the series, just don't do anything + if self._series_dict.has_key(series_name): + s = self._series_dict[series_name] + s.append(item) + elif not self._warned_for.has_key(series_name): + print "WARNING: SeriesMultiplexer called with unknown name ", series_name + self._warned_for[series_name] = 1 + + def append_list(self, series_name, items): + if self._series_dict.has_key(series_name): + s = self._series_dict[series_name] + s.append_list(items) + elif not self._warned_for.has_key(series_name): + print "WARNING: SeriesMultiplexer called with unknown name ", series_name + self._warned_for[series_name] = 1 + + def add_series(self, series): + if self._series_dict.has_key(series.name): + raise Exception("A series with such a name already exists") + self._series_dict[series.name] = series + +class SeriesList(): + def __init__(self, num_elements, name, directory, series_constructor=BaseSeries): + self._subseries = [None] * num_elements + self.name = name + + for i in range(num_elements): + newname = name + "." 
+ str(i) + self._subseries[i] = series_constructor(name=newname, directory=directory) + + def load_from_files(self): + self.load_from_file() + + def load_from_file(self): + for s in self._subseries: + s.load_from_file() + + # no "append_list", this would get confusing + def append(self, list_of_items): + if len(list_of_items) != len(self._subseries): + raise Exception("bad number of items, expected " + str(len(self._subseries)) + ", got " + str(len(list_of_items))) + for i in range(len(list_of_items)): + self._subseries[i].append(list_of_items[i]) + + +# Just a shortcut +class ParamsArrayStats(SeriesList): + def __init__(self, num_params_arrays, name, directory): + cstr = BasicStatsSeries.series_constructor() + + SeriesList.__init__(self, num_elements=num_params_arrays, + name=name, directory=directory, + series_constructor=cstr) + +# ------------------------ +# Utilities to work with the series files from the command line + +# "dumpf" +def dump_floats_file(filepath): + print "Floats dump of ", filepath + with open(filepath, "rb") as f: + s = os.stat(filepath) + size = s.st_size + num = size / 4 + a = array.array('f') + a.fromfile(f, num) + print a.tolist() + +if __name__ == '__main__': + args = sys.argv[1:] + + if len(args) == 2 and args[0] == "dumpf": + file = args[1] + dump_floats_file(file) + else: + print "Bad arguments" + diff -r 81f8466dc121 -r c03692aa6158 utils/scalar_series/test_series.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/utils/scalar_series/test_series.py Mon Mar 01 11:45:59 2010 -0500 @@ -0,0 +1,197 @@ +#!/usr/bin/python +# coding: utf-8 + +import sys +import tempfile +import os.path +import os + +import numpy + +from series import BaseSeries, AccumulatorSeries, SeriesContainer, BasicStatsSeries, SeriesMultiplexer, SeriesList, ParamsArrayStats + + +BASEDIR = tempfile.mkdtemp() + +def tempname(): + file = tempfile.NamedTemporaryFile(dir=BASEDIR) + filepath = file.name + return os.path.split(filepath) + +def tempdir(): + wholepath = os.path.dirname(tempfile.mkdtemp(dir=BASEDIR)) + # split again, interpreting the last directory as a filename + return os.path.split(wholepath) + +def tempseries(type='f', flush_every=1): + dir, filename = tempname() + + s = BaseSeries(name=filename, directory=dir, type=type, flush_every=flush_every) + + return s + +def test_Series_storeload(): + s = tempseries() + + s.append(12.0) + s.append_list([13.0,14.0,15.0]) + + s2 = BaseSeries(name=s.name, directory=s.directory, flush_every=15) + # also test if elements stored before load_from_file (and before a flush) + # are deleted (or array is restarted from scratch... 
both work) + s2.append(10.0) + s2.append_list([30.0,40.0]) + s2.load_from_file() + + assert s2.tolist() == [12.0,13.0,14.0,15.0] + + +def test_AccumulatorSeries_mean(): + dir, filename = tempname() + + s = AccumulatorSeries(reduce_every=15, mean=True, name=filename, directory=dir) + + for i in range(50): + s.append(i) + + assert s.tolist() == [7.0,22.0,37.0] + +def test_BasicStatsSeries_commoncase(): + a1 = numpy.arange(25).reshape((5,5)) + a2 = numpy.arange(40).reshape((8,5)) + + parent_dir, dir = tempdir() + + bss = BasicStatsSeries(parent_directory=parent_dir, name=dir) + + bss.append(a1) + bss.append(a2) + + assert bss.means.tolist() == [12.0, 19.5] + assert bss.mins.tolist() == [0.0, 0.0] + assert bss.maxes.tolist() == [24.0, 39.0] + assert (bss.stds.tolist()[0] - 7.211102) < 1e-3 + assert (bss.stds.tolist()[1] - 11.54339) < 1e-3 + + # try to reload + + bss2 = BasicStatsSeries(parent_directory=parent_dir, name=dir) + bss2.load_from_directory() + + assert bss2.means.tolist() == [12.0, 19.5] + assert bss2.mins.tolist() == [0.0, 0.0] + assert bss2.maxes.tolist() == [24.0, 39.0] + assert (bss2.stds.tolist()[0] - 7.211102) < 1e-3 + assert (bss2.stds.tolist()[1] - 11.54339) < 1e-3 + +def test_BasicStatsSeries_reload(): + a1 = numpy.arange(25).reshape((5,5)) + a2 = numpy.arange(40).reshape((8,5)) + + parent_dir, dir = tempdir() + + bss = BasicStatsSeries(parent_directory=parent_dir, name=dir) + + bss.append(a1) + bss.append(a2) + + # try to reload + + bss2 = BasicStatsSeries(parent_directory=parent_dir, name=dir) + bss2.load_from_directory() + + assert bss2.means.tolist() == [12.0, 19.5] + assert bss2.mins.tolist() == [0.0, 0.0] + assert bss2.maxes.tolist() == [24.0, 39.0] + assert (bss2.stds.tolist()[0] - 7.211102) < 1e-3 + assert (bss2.stds.tolist()[1] - 11.54339) < 1e-3 + + +def test_BasicStatsSeries_withaccumulator(): + a1 = numpy.arange(25).reshape((5,5)) + a2 = numpy.arange(40).reshape((8,5)) + a3 = numpy.arange(20).reshape((4,5)) + a4 = numpy.arange(48).reshape((6,8)) + + parent_dir, dir = tempdir() + + sc = AccumulatorSeries.series_constructor(reduce_every=2, mean=False) + + bss = BasicStatsSeries(parent_directory=parent_dir, name=dir, series_constructor=sc) + + bss.append(a1) + bss.append(a2) + bss.append(a3) + bss.append(a4) + + assert bss.means.tolist() == [31.5, 33.0] + +def test_SeriesList_withbasicstats(): + dir = tempfile.mkdtemp(dir=BASEDIR) + + bscstr = BasicStatsSeries.series_constructor() + + slist = SeriesList(num_elements=5, name="foo", directory=dir, series_constructor=bscstr) + + for i in range(10): # 10 elements in each list + curlist = [] + for j in range(5): # 5 = num_elements, ie. number of list to append to + dist = numpy.arange(i*j, i*j+10) + curlist.append(dist) + slist.append(curlist) + + slist2 = SeriesList(num_elements=5, name="foo", directory=dir, series_constructor=bscstr) + + slist2.load_from_files() + + l1 = slist2._subseries[0].means.tolist() + l2 = slist2._subseries[4].means.tolist() + + print l1 + print l2 + + assert l1 == [4.5, 4.5, 4.5, 4.5, 4.5, 4.5, 4.5, 4.5, 4.5, 4.5] + assert l2 == [4.5, 8.5, 12.5, 16.5, 20.5, 24.5, 28.5, 32.5, 36.5, 40.5] + +# same test as above, just with the shortcut +def test_ParamsArrayStats_reload(): + dir = tempfile.mkdtemp(dir=BASEDIR) + + slist = ParamsArrayStats(5, name="foo", directory=dir) + + for i in range(10): # 10 elements in each list + curlist = [] + for j in range(5): # 5 = num_elements, ie. 
number of list to append to + dist = numpy.arange(i*j, i*j+10) + curlist.append(dist) + slist.append(curlist) + + slist2 = ParamsArrayStats(5, name="foo", directory=dir) + + slist2.load_from_files() + + l1 = slist2._subseries[0].means.tolist() + l2 = slist2._subseries[4].means.tolist() + + print l1 + print l2 + + assert l1 == [4.5, 4.5, 4.5, 4.5, 4.5, 4.5, 4.5, 4.5, 4.5, 4.5] + assert l2 == [4.5, 8.5, 12.5, 16.5, 20.5, 24.5, 28.5, 32.5, 36.5, 40.5] + + +def manual_BasicStatsSeries_graph(): + parent_dir, dir = tempdir() + + bss = BasicStatsSeries(parent_directory=parent_dir, name=dir) + + for i in range(50): + bss.append(1.0/numpy.arange(i*5, i*5+5)) + + bss.graph() + +#if __name__ == '__main__': +# import pylab +# manual_BasicStatsSeries_graph() +# pylab.show() +
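The tests above exercise each series class in isolation; the sketch below shows the end-to-end flow the way nist_sda.create_series and SdaSgdOptimizer wire it up. The temporary directory, the loop, and the constants are illustrative only, and the snippet is assumed to run from utils/scalar_series (like test_series.py) so that "from series import ..." resolves.

import tempfile
from series import BaseSeries, AccumulatorSeries, SeriesMultiplexer

basedir = tempfile.mkdtemp()

mux = SeriesMultiplexer()
# the mean of every 10 appended costs becomes one stored point
mux.add_series(AccumulatorSeries(name="reconstruction_error",
                                 reduce_every=10, mean=True,
                                 directory=basedir, flush_every=1))
# validation error is stored as-is, one point per append
mux.add_series(BaseSeries(name="validation_error",
                          directory=basedir, flush_every=1))

for minibatch_index in range(100):
    cost = 1.0 / (minibatch_index + 1)   # stand-in for a training cost
    mux.append("reconstruction_error", cost)
    if (minibatch_index + 1) % 50 == 0:
        mux.append("validation_error", 0.25)

# appending under a name that was never registered only prints a warning
mux.append("test_error", 0.3)

# series are flat float arrays on disk and can be reloaded later
reloaded = BaseSeries(name="reconstruction_error", directory=basedir)
reloaded.load_from_file()
print reloaded.tolist()   # the 10 accumulated means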