changeset 567:d88c35e8f83a

another checkpoint
author Olivier Breuleux <breuleuo@iro.umontreal.ca>
date Wed, 03 Dec 2008 18:52:34 -0500
parents a71971ccc1e4
children 1f036d934ad9
files pylearn/dbdict/newstuff.py
diffstat 1 files changed, 171 insertions(+), 75 deletions(-) [+]
line wrap: on
line diff
--- a/pylearn/dbdict/newstuff.py	Wed Dec 03 16:07:04 2008 -0500
+++ b/pylearn/dbdict/newstuff.py	Wed Dec 03 18:52:34 2008 -0500
@@ -1,6 +1,11 @@
 
+from __future__ import with_statement
+
+import FileLock
+import portalocker
 from collections import defaultdict
 import re, sys, inspect, os
+import signal
 
 ################################################################################
 ### resolve
@@ -18,25 +23,26 @@
 ################################################################################
 
 def convert(obj):
-    if not isinstance(obj, str):
-        return obj
-    def kw(x):
-        x = x.lower()
-        return dict(true = True,
-                    false = False,
-                    none = None)[x]
-    for f in (kw, int, float):
-        try:
-            return f(obj)
-        except:
-            pass
-    return obj
+    return eval(obj)
+#     if not isinstance(obj, str):
+#         return obj
+#     def kw(x):
+#         x = x.lower()
+#         return dict(true = True,
+#                     false = False,
+#                     none = None)[x]
+#     for f in (kw, int, float):
+#         try:
+#             return f(obj)
+#         except:
+#             pass
+#     return obj
 
 def flatten(obj):
     d = {}
     def helper(d, prefix, obj):
         if isinstance(obj, (str, int, float)):
-            d[prefix] = convert(obj)
+            d[prefix] = obj #convert(obj)
         else:
             if isinstance(obj, dict):
                 subd = obj
@@ -61,7 +67,7 @@
         current = struct
         for k2 in keys[:-1]:
             current = current[k2]
-        current[keys[-1]] = convert(v)
+        current[keys[-1]] = v #convert(v)
     return struct
 
 def realize(d):
@@ -104,7 +110,7 @@
 
 def format_d(d, sep = '\n', space = True):
     d = flatten(d)
-    pattern = "%s = %s" if space else "%s=%s"
+    pattern = "%s = %r" if space else "%s=%r"
     return sep.join(pattern % (k, v) for k, v in d.iteritems())
 
 def format_help(topic):
@@ -125,97 +131,186 @@
 ### single channels
 ################################################################################
 
-try:
-    import greenlet
-except:
-    try:
-        from py import greenlet
-    except:
-        print >>sys.stderr, 'the greenlet module is unavailable'
-        greenlet = None
+# try:
+#     import greenlet
+# except:
+#     try:
+#         from py import greenlet
+#     except:
+#         print >>sys.stderr, 'the greenlet module is unavailable'
+#         greenlet = None
 
-class Complete(Exception):
-    pass
-class Incomplete(Exception):
-    pass
 
 class Channel(object):
+
     COMPLETE = None
     INCOMPLETE = True
+    
+    START = 0
+    """dbdict.status == START means a experiment is ready to run"""
+    RUNNING = 1
+    """dbdict.status == RUNNING means a experiment is running on dbdict_hostname"""
+    DONE = 2
+    """dbdict.status == DONE means a experiment has completed (not necessarily successfully)"""
+
+    # Methods to be used by the experiment to communicate with the channel
+
     def save(self):
+        """
+        Save the experiment's state to the various media supported by
+        the Channel.
+        """
         raise NotImplementedError()
-    def switch(self, *message):
+
+    def switch(self, message = None):
+        """
+        Called from the experiment to give the control back to the channel.
+        The following return values are meaningful:
+          * 'stop' -> the experiment must stop as soon as possible. It may save what
+            it needs to save. This occurs when SIGTERM or SIGINT are sent (or in
+            user-defined circumstances).
+        switch() may give the control to the user. In this case, the user may
+        resume the experiment by calling switch() again. If an argument is given
+        by the user, it will be relayed to the experiment.
+        """
         pass
-    def process_message(self, message):
-        pass
+
+    __call__ = switch
+
+    def save_and_switch(self):
+        self.save()
+        self.switch()
+
+    # Methods to run the experiment
+
     def setup(self):
         pass
+
+    def __enter__(self):
+        pass
+
+    def __exit__(self):
+        pass
+
     def run(self):
         pass
 
 
 
-        def on_sigterm(signo, frame):
-            channel_rval[0] = 'stop'
+class SingleChannel(Channel):
 
-        #install a SIGTERM handler that asks the run_state function to return
-        signal.signal(signal.SIGTERM, on_sigterm)
-        signal.signal(signal.SIGINT, on_sigterm)
-
-
-class SingleChannel(Channel):
     def __init__(self, experiment, state):
         self.experiment = experiment
         self.state = state
-#     def switch(self, *message):
-#         if greenlet:
-#             if greenlet.getcurrent() is self.expg:
-#                 self.feedback = message
-#                 self.manager.switch(message)
-#             else:
-#                 self.expg.switch(message)
-#         else:
-#             self.feedback = message
-    def run(self, interactive = False):
-        if interactive and not greenlet:
-            raise Exception('interactive mode requires the greenlet package to be installed (try easy_install greenlet or easy_install py)')
-        self.interactive = interactive
+        self.feedback = None
+
+    def switch(self, message):
+        feedback = self.feedback
+        self.feedback = None
+        return feedback
+
+    def run(self):
+        # install a SIGTERM handler that asks the experiment function to return
+        # the next time it will call switch()
+        def on_sigterm(signo, frame):
+            self.feedback = 'stop'
+        signal.signal(signal.SIGTERM, on_sigterm)
+        signal.signal(signal.SIGINT, on_sigterm)
+
         self.setup()
-        self.state['job'].setdefault('complete', False)
-        if self.state['job']['complete']:
-            raise Complete('The job has already completed.')
-#         if greenlet:
-# #             self.manager = greenlet.getcurrent()
-# #             self.expg = greenlet.greenlet(self.experiment)
-# #             expg.switch(self, self.state)
-#         else:
-        v = self.experiment(self, self.state)
-        self.state['job']['complete'] = v is COMPLETE
-        self.save()
+
+        status = self.state['dbdict'].get('status', self.START)
+        if status is self.DONE:
+            raise Exception('The job has already completed.')
+        elif status is self.RUNNING:
+            raise Exception('The job is already running.')
+        self.state['dbdict'].setdefault('status', self.START)
+
+        with self:
+            v = self.experiment(self, self.state)
+            self.state['dbdict']['status'] = self.DONE if v is self.COMPLETE else self.START
+
         return v
 
+    def __enter__(self):
+        return self
+
+    def __exit__(self, type, value, traceback):
+        self.save()
+
+
 class StandardChannel(SingleChannel):
-    def __init__(self, root, experiment, state):
+
+    def __init__(self, root, experiment, state, redirect_stdout = False, redirect_stderr = False):
         self.root = root
         self.experiment = experiment
         self.state = state
         self.dirname = format_d(self.state, sep=',', space=False)
         self.path = os.path.join(self.root, self.dirname)
+        self.lock = None
+        self.redirect_stdout = redirect_stdout
+        self.redirect_stderr = redirect_stderr
+
     def save(self):
+        with open(os.path.join(self.path, 'current.conf'), 'w') as current:
+            current.write(format_d(self.state))
+            current.write('\n')
+
+    def __enter__(self):
+        ###assert self.lock is None
+        ##lockf = os.path.join(self.path, 'lock')
+        ##self.lock = open(lockf, 'r+')
+        ##portalocker.lock(self.lock, portalocker.LOCK_EX)
+        #self.lock = FileLock.FileLock(os.path.join(self.path, 'lock'))
+        #self.lock.lock()
+        self.old_cwd = os.getcwd()
         os.chdir(self.path)
-        current = open('current.conf', 'w')
-        current.write(format_d(self.state))
-        current.write('\n')
+        if self.redirect_stdout:
+            self.old_stdout = sys.stdout
+            sys.stdout = open('stdout', 'a')
+        if self.redirect_stderr:
+            self.old_stderr = sys.stderr
+            sys.stderr = open('stderr', 'a')
+        return super(StandardChannel, self).__enter__()
+
+    def __exit__(self, type, value, traceback):
+        ###assert self.lock is not None
+        if self.redirect_stdout:
+            sys.stdout.close()
+            sys.stdout = self.old_stdout
+        if self.redirect_stderr:
+            sys.stderr.close()
+            sys.stderr = self.old_stderr
+        os.chdir(self.old_cwd)
+        ##self.lock.close()
+        #self.lock.unlock()
+        ###self.lock = None
+        return super(StandardChannel, self).__exit__(type, value, traceback)
+
     def setup(self):
         if not os.path.isdir(self.path):
             os.mkdir(self.path)
-        os.chdir(self.path)
-        if not os.path.isfile('orig.conf'):
-            orig = open('orig.conf', 'w')
-            orig.write(format_d(self.state))
-            orig.write('\n')
-        if os.path.isfile('current.conf'):
-            self.state = expand(parse(*map(str.strip, open('current.conf', 'r').readlines())))
+        with self:
+            origf = os.path.join(self.path, 'orig.conf')
+            if not os.path.isfile(origf):
+                with open(origf, 'w') as orig:
+                    orig.write(format_d(self.state))
+                    orig.write('\n')
+            currentf = os.path.join(self.path, 'current.conf')
+            if os.path.isfile(currentf):
+                with open(currentf, 'r') as current:
+                    self.state = expand(parse(*map(str.strip, current.readlines())))
+
+#         origf = os.path.join(self.path, 'orig.conf')
+#         if not os.path.isfile(origf):
+#             with open(os.path.isfile(origf), 'w') as orig:
+#                 orig.write(format_d(self.state))
+#                 orig.write('\n')
+#         currentf = os.path.join(self.path, 'current.conf')
+#         if os.path.isfile(currentf):
+#             with open(currentf, 'r') as current:
+#                 self.state = expand(parse(*map(str.strip, current.readlines())))
+            
 
 class RSyncException(Exception):
     pass
@@ -348,7 +443,8 @@
     """
     state = expand(parse(*strings))
     experiment = resolve(experiment)
-    channel = RSyncChannel(os.getcwd(), os.path.realpath('yaddayadda'), experiment, state)
+    #channel = RSyncChannel(os.getcwd(), os.path.realpath('yaddayadda'), experiment, state)
+    channel = StandardChannel(os.getcwd(), experiment, state)
     channel.run()
 
 runner_registry['cmdline'] = cmdline