changeset 339:ffbf0e41bcee

Added code to run the experiment on the cluster and separated configuration from the rest of the machinery. Not tested yet.
author fsavard
date Sat, 17 Apr 2010 20:29:18 -0400
parents fca22114bb23
children 523e7b87c521
files deep/crbm/mnist_config.py.example deep/crbm/mnist_crbm.py deep/crbm/utils.py
diffstat 3 files changed, 286 insertions(+), 73 deletions(-)
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/deep/crbm/mnist_config.py.example	Sat Apr 17 20:29:18 2010 -0400
@@ -0,0 +1,104 @@
+# ----------------------------------------------------------------------------
+# BEGIN EXPERIMENT ISOLATION CODE
+
+'''
+This makes sure we use the codebase clone created for this experiment.
+I.e. if you want to make modifications to the codebase but don't want your
+running experiment code to be impacted by those changes, first copy the
+codebase somewhere, and configure this section. It will make sure we import
+from the right place.
+
+MUST BE DONE BEFORE IMPORTING ANYTHING ELSE
+(Leave this comment there so others will understand what's going on)
+'''
+
+# Place where you copied modules that should be frozen for this experiment
+codebase_clone_path = "/u/savardf/ift6266/experiment_clones/ift6266_mnistcrbm_exp1"
+
+# Places where there might be conflicting modules from your $PYTHONPATH
+remove_these_from_pythonpath = ["/u/savardf/ift6266/dev_code"]
+
+import sys
+sys.path[0:0] = [codebase_clone_path]
+
+# remove paths we specifically don't want in $PYTHONPATH
+for bad_path in remove_these_from_pythonpath:
+    sys.path[:] = [el for el in sys.path if not el in (bad_path, bad_path+"/")]
+
+# Make the imports
+import ift6266
+
+# Just making sure we're importing from the right place
+modules_to_check = [ift6266]
+for module in modules_to_check:
+    if not codebase_clone_path in module.__path__[0]:
+        raise RuntimeError("Module loaded from incorrect path "+module.__path__[0])
+
+# Path to pass to jobman sqlschedule. IMPORTANT TO CHANGE TO REFLECT YOUR CLONE.
+# Make sure this is accessible from the default $PYTHONPATH (in your .bashrc)
+# (and make sure every subdirectory has its __init__.py file)
+EXPERIMENT_PATH = "ift6266_mnistcrbm_exp1.ift6266.deep.crbm.mnist_crbm.jobman_entrypoint"
+
+# END EXPERIMENT ISOLATION CODE
+# ----------------------------------------------------------------------------
+
+from jobman import DD
+
+'''
+These are parameters used by mnist_crbm.py. They'll end up as globals in there.
+
+Rename this file to mnist_config.py and configure it as needed.
+DON'T add the renamed file to the repository, as others might use it
+without realizing it, with dire consequences.
+'''
+
+# change "sandbox" when you're ready
+JOBDB = 'postgres://ift6266h10@gershwin/ift6266h10_db/fsavard_mnistcrbm_exp1'
+
+# Set this to True when you want to run cluster tests, i.e. run many jobs
+# on the cluster but with a reduced training set size and number of epochs,
+# just to check that everything runs fine there.
+# Set this PRIOR to inserting your test jobs in the DB.
+TEST_CONFIG = False
+
+# save params at training end
+SAVE_PARAMS = True
+
+IMAGE_OUTPUT_DIR = 'img/'
+
+# number of minibatches before taking means for valid error etc.
+REDUCE_EVERY = 100
+
+# print series to stdout too (otherwise just produce the HDF5 file)
+SERIES_STDOUT_TOO = False
+
+VISUALIZE_EVERY = 20000
+GIBBS_STEPS_IN_VIZ_CHAIN = 1000
+
+# This is to configure insertion of jobs on the cluster.
+# Possible values the hyperparameters can take. These are then
+# combined with produit_cartesien_jobs so we get a list of all
+# possible combinations, each one resulting in a job inserted
+# in the jobman DB.
+JOB_VALS = {'learning_rate': [1.0, 0.1, 0.01],
+        'sparsity_lambda': [3.0,0.5],
+        'sparsity_p': [0.3,0.05],
+        'num_filters': [40,15],
+        'filter_size': [12,7],
+        'minibatch_size': [20],
+        # mnist_crbm.py reads state.num_epochs, so it must be part of each job
+        'num_epochs': [10]}
+
+# Default state for tests and for launching a single local job;
+# uses a minimal number of epochs.
+DEFAULT_STATE = DD({'learning_rate': 0.1,
+        'sparsity_lambda': 1.0,
+        'sparsity_p': 0.05,
+        'num_filters': 40,
+        'filter_size': 12,
+        'minibatch_size': 10,
+        'num_epochs': 1})
+
+# To reinsert duplicates of jobs that crashed
+REINSERT_COLS = ['learning_rate','sparsity_lambda','sparsity_p','num_filters','filter_size','minibatch_size','dupe']
+#REINSERT_JOB_VALS = [\
+#            [,2],]
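A quick illustration (not part of the changeset): the JOB_VALS grid above expands to 3 * 2 * 2 * 2 * 2 * 1 * 1 = 48 hyperparameter combinations, one job per combination, via produit_cartesien_jobs() defined in utils.py further down. A minimal sketch of checking this locally, assuming the example file has been renamed to mnist_config.py and its isolation paths adjusted:

# Sketch: count and inspect the jobs that 'jobman_insert' would schedule.
from utils import produit_cartesien_jobs
from mnist_config import JOB_VALS

jobs = produit_cartesien_jobs(JOB_VALS)
print len(jobs)    # 48 combinations with the values above
print jobs[0]      # a jobman DD holding the first value of each hyperparameter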
+
--- a/deep/crbm/mnist_crbm.py	Sat Apr 17 12:42:48 2010 -0400
+++ b/deep/crbm/mnist_crbm.py	Sat Apr 17 20:29:18 2010 -0400
@@ -1,5 +1,8 @@
 #!/usr/bin/python
 
+# do this first
+from mnist_config import *
+
 import sys
 import os, os.path
 
@@ -18,68 +21,44 @@
 from pylearn.io.seriestables import *
 import tables
 
-IMAGE_OUTPUT_DIR = 'img/'
-
-REDUCE_EVERY = 100
-
-def filename_from_time(suffix):
-    import datetime
-    return str(datetime.datetime.now()) + suffix + ".png"
+import utils
 
-# Just a shortcut for a common case where we need a few
-# related Error (float) series
-def get_accumulator_series_array( \
-                hdf5_file, group_name, series_names, 
-                reduce_every,
-                index_names=('epoch','minibatch'),
-                stdout_too=True,
-                skip_hdf5_append=False):
-    all_series = []
-
-    hdf5_file.createGroup('/', group_name)
+#def filename_from_time(suffix):
+#    import datetime
+#    return str(datetime.datetime.now()) + suffix + ".png"
 
-    other_targets = []
-    if stdout_too:
-        other_targets = [StdoutAppendTarget()]
+def jobman_entrypoint(state, channel):
+    # record mercurial versions of each package
+    pylearn.version.record_versions(state,[theano,ift6266,pylearn])
+    channel.save()
 
-    for sn in series_names:
-        series_base = \
-            ErrorSeries(error_name=sn,
-                table_name=sn,
-                hdf5_file=hdf5_file,
-                hdf5_group='/'+group_name,
-                index_names=index_names,
-                other_targets=other_targets,
-                skip_hdf5_append=skip_hdf5_append)
+    crbm = MnistCrbm(state)
+    crbm.train()
 
-        all_series.append( \
-            AccumulatorSeriesWrapper( \
-                    base_series=series_base,
-                    reduce_every=reduce_every))
-
-    ret_wrapper = SeriesArrayWrapper(all_series)
-
-    return ret_wrapper
+    return channel.COMPLETE
 
 class MnistCrbm(object):
-    def __init__(self):
-        self.mnist = MNIST.full()#first_10k()
+    def __init__(self, state):
+        self.state = state
+
+        if TEST_CONFIG:
+            self.mnist = MNIST.first_10k()  # small subset for quick cluster tests
+        else:
+            self.mnist = MNIST.full()
 
         self.cp = ConvolutionParams( \
-                    num_filters=40,
+                    num_filters=state.num_filters,
                     num_input_planes=1,
-                    height_filters=12,
-                    width_filters=12)
+                    height_filters=state.filter_size,
+                    width_filters=state.filter_size)
 
         self.image_size = (28,28)
 
-        self.minibatch_size = 10
+        self.minibatch_size = state.minibatch_size
 
-        self.lr = 0.01
-        self.sparsity_lambda = 1.0
+        self.lr = state.learning_rate
+        self.sparsity_lambda = state.sparsity_lambda
         # about 1/num_filters, so only one filter active at a time
         # 40 * 0.05 = ~2 filters active for any given pixel
-        self.sparsity_p = 0.05
+        self.sparsity_p = state.sparsity_p
 
         self.crbm = CRBM( \
                     minibatch_size=self.minibatch_size,
@@ -89,12 +68,11 @@
                     sparsity_lambda=self.sparsity_lambda,
                     sparsity_p=self.sparsity_p)
         
-        self.num_epochs = 10
+        self.num_epochs = state.num_epochs
 
         self.init_series()
  
     def init_series(self):
-
         series = {}
 
         basedir = os.getcwd()
@@ -103,38 +81,36 @@
 
         cd_series_names = self.crbm.cd_return_desc
         series['cd'] = \
-            get_accumulator_series_array( \
+            utils.get_accumulator_series_array( \
                 h5f, 'cd', cd_series_names,
                 REDUCE_EVERY,
-                stdout_too=True)
+                stdout_too=SERIES_STDOUT_TOO)
 
         sparsity_series_names = self.crbm.sparsity_return_desc
         series['sparsity'] = \
-            get_accumulator_series_array( \
+            utils.get_accumulator_series_array( \
                 h5f, 'sparsity', sparsity_series_names,
                 REDUCE_EVERY,
-                stdout_too=True)
+                stdout_too=SERIES_STDOUT_TOO)
 
         # so first we create the names for each table, based on 
         # position of each param in the array
-        params_stdout = StdoutAppendTarget("\n------\nParams")
+        params_stdout = []
+        if SERIES_STDOUT_TOO:
+            params_stdout = [StdoutAppendTarget()]
         series['params'] = SharedParamsStatisticsWrapper(
                             new_group_name="params",
                             base_group="/",
                             arrays_names=['W','b_h','b_x'],
                             hdf5_file=h5f,
                             index_names=('epoch','minibatch'),
-                            other_targets=[params_stdout])
+                            other_targets=params_stdout)
 
         self.series = series
 
     def train(self):
         num_minibatches = len(self.mnist.train.x) / self.minibatch_size
 
-        print_every = 1000
-        visualize_every = 5000
-        gibbs_steps_from_random = 1000
-
         for epoch in xrange(self.num_epochs):
             for mb_index in xrange(num_minibatches):
                 mb_x = self.mnist.train.x \
@@ -158,13 +134,22 @@
                     self.series['params'].append( \
                         (epoch, mb_index), self.crbm.params)
 
-                if total_idx % visualize_every == 0:
+                if total_idx % VISUALIZE_EVERY == 0:
                     self.visualize_gibbs_result(\
-                        mb_x, gibbs_steps_from_random)
-                    self.visualize_gibbs_result(mb_x, 1)
-                    self.visualize_filters()
+                        mb_x, GIBBS_STEPS_IN_VIZ_CHAIN,
+                        "gibbs_chain_"+str(epoch)+"_"+str(mb_index))
+                    self.visualize_gibbs_result(mb_x, 1,
+                        "gibbs_1_"+str(epoch)+"_"+str(mb_index))
+                    self.visualize_filters(
+                        "filters_"+str(epoch)+"_"+str(mb_index))
+            if TEST_CONFIG:
+                # do a single epoch for cluster tests config
+                break
+
+        if SAVE_PARAMS:
+            utils.save_params(self.crbm.params, "params.pkl")
     
-    def visualize_gibbs_result(self, start_x, gibbs_steps):
+    def visualize_gibbs_result(self, start_x, gibbs_steps, filename):
         # Run minibatch_size chains for gibbs_steps
         x_samples = None
         if not start_x is None:
@@ -176,15 +161,14 @@
         tile = tile_raster_images(x_samples, self.image_size,
                     (1, self.minibatch_size), output_pixel_vals=True)
 
-        filepath = os.path.join(IMAGE_OUTPUT_DIR,
-                    filename_from_time("gibbs"))
+        filepath = os.path.join(IMAGE_OUTPUT_DIR, filename+".png")
         img = Image.fromarray(tile)
         img.save(filepath)
 
         print "Result of running Gibbs", \
                 gibbs_steps, "times outputed to", filepath
 
-    def visualize_filters(self):
+    def visualize_filters(self, filename):
         cp = self.cp
 
         # filter size
@@ -198,18 +182,26 @@
         tile = tile_raster_images(filters_flattened, fsz, 
                                     tile_shape, output_pixel_vals=True)
 
-        filepath = os.path.join(IMAGE_OUTPUT_DIR,
-                        filename_from_time("filters"))
+        filepath = os.path.join(IMAGE_OUTPUT_DIR, filename+".png")
         img = Image.fromarray(tile)
         img.save(filepath)
 
         print "Filters (as images) outputed to", filepath
-        print "b_h is", self.crbm.b_h.value
-
 
 
 
 if __name__ == '__main__':
-    mc = MnistCrbm()
-    mc.train()
+    args = sys.argv[1:]
 
+    if len(args) == 0:
+        print "Usage:", sys.argv[0], "[jobman_insert|test_jobman_entrypoint|run_default]"
+    elif args[0] == 'jobman_insert':
+        utils.jobman_insert_job_vals(JOBDB, EXPERIMENT_PATH, JOB_VALS)
+    elif args[0] == 'test_jobman_entrypoint':
+        chanmock = DD({'COMPLETE':0,'save':(lambda:None)})
+        jobman_entrypoint(DEFAULT_STATE, chanmock)
+    elif args[0] == 'run_default':
+        mc = MnistCrbm(DEFAULT_STATE)
+        mc.train()
+
+
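As a usage note (not part of the changeset), the 'test_jobman_entrypoint' branch above can also be reproduced interactively to smoke-test a configuration before inserting jobs into the DB; the mocked channel only needs the two members that jobman_entrypoint() actually touches (COMPLETE and save):

# Sketch: smoke-test the entrypoint with a fake jobman channel.
# In a real run, jobman supplies the state and channel itself.
from jobman import DD
from mnist_config import DEFAULT_STATE
from mnist_crbm import jobman_entrypoint

chanmock = DD({'COMPLETE': 0, 'save': (lambda: None)})
jobman_entrypoint(DEFAULT_STATE, chanmock)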
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/deep/crbm/utils.py	Sat Apr 17 20:29:18 2010 -0400
@@ -0,0 +1,117 @@
+#!/usr/bin/python
+# coding: utf-8
+
+from __future__ import with_statement
+
+from jobman import DD
+import jobman, jobman.sql
+
+from pylearn.io.seriestables import *
+import tables
+
+
+
+# from pylearn codebase
+# useful in __init__(param1, param2, etc.) to save
+# values in self.param1, self.param2... just call
+# update_locals(self, locals())
+def update_locals(obj, dct):
+    if 'self' in dct:
+        del dct['self']
+    obj.__dict__.update(dct)
+
+# from a dictionary of possible values for hyperparameters, e.g.
+# hp_values = {'learning_rate':[0.1, 0.01], 'num_layers': [1,2]}
+# create a list of other dictionaries representing all the possible
+# combinations, thus in this example creating:
+# [{'learning_rate': 0.1, 'num_layers': 1}, ...]
+# (similarly for combinations (0.1, 2), (0.01, 1), (0.01, 2))
+def produit_cartesien_jobs(val_dict):
+    job_list = [DD()]
+    all_keys = val_dict.keys()
+
+    for key in all_keys:
+        possible_values = val_dict[key]
+        new_job_list = []
+        for val in possible_values:
+            for job in job_list:
+                to_insert = job.copy()
+                to_insert.update({key: val})
+                new_job_list.append(to_insert)
+        job_list = new_job_list
+
+    return job_list
+
+def jobs_from_reinsert_list(cols, job_vals):
+    job_list = []
+    for vals in job_vals:
+        job = DD()
+        for i, col in enumerate(cols):
+            job[col] = vals[i]
+        job_list.append(job)
+
+    return job_list
+
+def save_params(all_params, filename):
+    import pickle
+    with open(filename, 'wb') as f:
+        values = [p.value for p in all_params]
+
+        # -1 for HIGHEST_PROTOCOL
+        pickle.dump(values, f, -1)
+
+# Insert one job into the Postgres DB per combination of hyperparameter values
+# (see produit_cartesien_jobs() above for how the combinations are generated)
+def jobman_insert_job_vals(job_db, experiment_path, job_vals):
+    jobs = produit_cartesien_jobs(job_vals)
+
+    db = jobman.sql.db(job_db)
+    for job in jobs:
+        job.update({jobman.sql.EXPERIMENT: experiment_path})
+        jobman.sql.insert_dict(job, db)
+
+def jobman_insert_specific_jobs(job_db, experiment_path,
+                        insert_cols, insert_vals):
+    jobs = jobs_from_reinsert_list(insert_cols, insert_vals)
+
+    db = jobman.sql.db(job_db)
+    for job in jobs:
+        job.update({jobman.sql.EXPERIMENT: experiment_path})
+        jobman.sql.insert_dict(job, db)
+
+# Just a shortcut for a common case where we need a few
+# related Error (float) series
+def get_accumulator_series_array( \
+                hdf5_file, group_name, series_names, 
+                reduce_every,
+                index_names=('epoch','minibatch'),
+                stdout_too=True,
+                skip_hdf5_append=False):
+    all_series = []
+
+    new_group = hdf5_file.createGroup('/', group_name)
+
+    other_targets = []
+    if stdout_too:
+        other_targets = [StdoutAppendTarget()]
+
+    for sn in series_names:
+        series_base = \
+            ErrorSeries(error_name=sn,
+                table_name=sn,
+                hdf5_file=hdf5_file,
+                hdf5_group=new_group._v_pathname,
+                index_names=index_names,
+                other_targets=other_targets,
+                skip_hdf5_append=skip_hdf5_append)
+
+        all_series.append( \
+            AccumulatorSeriesWrapper( \
+                    base_series=series_base,
+                    reduce_every=reduce_every))
+
+    ret_wrapper = SeriesArrayWrapper(all_series)
+
+    return ret_wrapper
+
+
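To complement save_params() above, a minimal sketch (not part of the changeset) of reading the pickled values back; it assumes the parameters were dumped in the same order as crbm.params (W, b_h, b_x in mnist_crbm.py):

# Sketch: reload the values written by utils.save_params(..., "params.pkl").
from __future__ import with_statement
import pickle

with open("params.pkl", 'rb') as f:
    values = pickle.load(f)   # list of arrays, same order as the saved params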