changeset 621:7041749cf804

untested changes to close sessions in api0
author James Bergstra <bergstrj@iro.umontreal.ca>
date Sun, 18 Jan 2009 21:14:14 -0500
parents 04752b23da8d
children d2d582bcf7dc
files pylearn/dbdict/api0.py pylearn/dbdict/sql.py
diffstat 2 files changed, 88 insertions(+), 45 deletions(-) [+]
line wrap: on
line diff
--- a/pylearn/dbdict/api0.py	Sat Jan 17 21:23:36 2009 -0500
+++ b/pylearn/dbdict/api0.py	Sun Jan 18 21:14:14 2009 -0500
@@ -1,4 +1,5 @@
 from sqlalchemy import create_engine, desc
+import sqlalchemy.pool
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy import Table, Column, MetaData, ForeignKey    
 from sqlalchemy import Integer, String, Float, Boolean, DateTime, Text, Binary
@@ -76,7 +77,6 @@
             raise ValueError(h_self.e_bad_table, pair_table)
 
         h_self._session_fn = Session
-        h_self._session = Session()
 
         class KeyVal (object):
             """KeyVal interfaces between python types and the database.
@@ -129,10 +129,15 @@
             handle - reference to L{DbHandle} (creator)
 
             """
-            def __init__(d_self):
-                s = h_self._session
-                s.save(d_self)
-                s.commit()
+            def __init__(d_self, session=None):
+                if session is None:
+                    s = h_self._session_fn()
+                    s.add(d_self) #d_self transient -> pending
+                    s.commit()    #d_self -> persistent
+                    s.close()     #d_self -> detached
+                else:
+                    s = session
+                    s.save(d_self)
 
             _forbidden_keys = set(['session'])
 
@@ -157,14 +162,27 @@
                         return a.val
                 raise KeyError(key)
 
-            def __setitem__(d_self, key, val):
-                s = h_self._session
-                d_self._set_in_session(key, val, s)
-                s.update(d_self)  #session update, not dict-like update
-                s.commit()
+            def __setitem__(d_self, key, val, session=None):
+                if session is None:
+                    s = h_self._session_fn()
+                    s.add(d_self)
+                    d_self._set_in_session(key, val, s)
+                    s.commit()
+                    s.close()
+                else:
+                    s = session
+                    s.add(d_self)
+                    d_self._set_in_session(key, val, s)
 
-            def __delitem__(d_self, key):
-                s = h_self._session
+            def __delitem__(d_self, key, session=None):
+                if session is None:
+                    s = h_self._session_fn()
+                    commit_close = True
+                else:
+                    s = session
+                    commit_close = False
+                s.add(d_self)
+
                 #find the item to delete in d_self._attrs
                 to_del = None
                 for i,a in enumerate(d_self._attrs):
@@ -177,8 +195,12 @@
                     i, a = to_del
                     s.delete(a)
                     del d_self._attrs[i]
-                s.commit()
-                s.update(d_self)
+                if commit_close:
+                    s.commit()
+                    #s.add(d_self) #why is this here? maybe it should go earlier??
+                    s.close()
+                else:
+                    pass
 
             def iteritems(d_self):
                 return d_self.items()
@@ -192,15 +214,24 @@
             def values(d_self):
                 return [kv.val for kv in d_self._attrs]
 
-            def update(d_self, dct, **kwargs):
+            def update(d_self, dct, session=None, **kwargs):
                 """Like dict.update(), set keys from kwargs"""
-                s = h_self._session
+                if session is None:
+                    s = h_self._session_fn()
+                    commit_close = True
+                else:
+                    s = session
+                    commit_close = False
                 for k, v in dct.items():
                     d_self._set_in_session(k, v, s)
                 for k, v in kwargs.items():
                     d_self._set_in_session(k, v, s)
-                s.update(d_self)
-                s.commit()
+
+                if commit_close:
+                    s.commit()
+                    s.close()
+                else:
+                    s.add(d_self) #why is this here? maybe it should go earlier??
 
             def get(d_self, key, default):
                 try:
@@ -222,10 +253,13 @@
                 
                 """
                 if session is None:
-                    session = h_self._session
+                    session = h_self._session_fn()
+                    session.add(d_self) #so session knows about us
                     session.refresh(d_self)
                     session.commit()
+                    session.close()
                 else:
+                    session.add(d_self) #so session knows about us
                     session.refresh(self.dbrow)
 
             def delete(d_self, session=None):
@@ -234,10 +268,13 @@
                 @param session: use the given session, and do not commit.
                 """
                 if session is None:
-                    session = h_self._session
-                    session.delete(d_self)
+                    session = h_self._session_fn()
+                    session.add(d_self) #so session knows about us
+                    session.delete(d_self) #mark for deletion
                     session.commit()
+                    session.close()
                 else:
+                    session.add(d_self) #so session knows about us
                     session.delete(d_self)
 
             # helper routine by update() and __setitem__
@@ -400,18 +437,16 @@
         if dct: rval.update(dct)
         return rval
 
-    def __get_query(h_self): 
+    def query(h_self, session):
         """Construct an SqlAlchemy query, which can be subsequently filtered
         using the instance methods of DbQuery"""
-
-        return h_self._Query(h_self._session.query(h_self._Dict)\
+        return h_self._Query(session.query(h_self._Dict)\
                         .options(eagerload('_attrs')))
-    query = property(__get_query)
 
     def createView(h_self, view):
 
-        s = h_self._session;
-        cols = [];
+        s = h_self.session()
+        cols = []
         
         for col in view.columns:
             if col.name is "id":
@@ -434,6 +469,7 @@
 
         h_self._engine.execute(viewsql);
         s.commit();
+        s.close()
 
         class MappedClass(object):
             pass
@@ -514,17 +550,12 @@
     engine = create_engine('sqlite:///%s' % filename, echo=False)
     return db_from_engine(engine, **kwargs)
 
-def postgres_db(user, password, host, database, echo=False, **kwargs):
+def postgres_db(user, password, host, database, echo=False, poolclass=sqlalchemy.pool.NullPool, **kwargs):
     """Create an engine to access a postgres_dbhandle
     """
     db_str ='postgres://%(user)s:%(password)s@%(host)s/%(database)s' % locals()
 
-    # should force the app release extra connections releasing
-    # connections should let us schedule more jobs, since each one
-    # operates autonomously most of the time, just checking the db
-    # rarely. TODO: optimize this for large numbers of jobs
-    pool_size = 0;
-    engine = create_engine(db_str, pool_size=pool_size, echo=echo)
+    engine = create_engine(db_str, pool_size=pool_size, echo=echo, poolclass=poolclass)
 
     return db_from_engine(engine, **kwargs)
 
--- a/pylearn/dbdict/sql.py	Sat Jan 17 21:23:36 2009 -0500
+++ b/pylearn/dbdict/sql.py	Sun Jan 18 21:14:14 2009 -0500
@@ -4,6 +4,7 @@
 import numpy.random
 
 import sqlalchemy
+import sqlalchemy.pool
 from sqlalchemy import create_engine, desc
 from sqlalchemy.orm import eagerload
 import psycopg2, psycopg2.extensions 
@@ -26,7 +27,7 @@
 
 _TEST_CONCURRENCY = False
 
-def postgres_serial(user, password, host, database, **kwargs):
+def postgres_serial(user, password, host, database, poolclass=sqlalchemy.pool.NullPool, **kwargs):
     """Return a DbHandle instance that communicates with a postgres database at transaction
     isolation_level 'SERIALIZABLE'.
 
@@ -46,7 +47,7 @@
         pool_size = 0
         this.engine = create_engine('postgres://'
                 ,creator=connect
-                ,pool_size=0 # should force the app release connections
+                ,poolclass=poolclass
                 )
 
     db = db_from_engine(this.engine, **kwargs)
@@ -68,7 +69,7 @@
     print >> sys.stderr, """#TODO: use the priority field, not the status."""
     print >> sys.stderr, """#TODO: ignore entries with key PUSH_ERROR."""
 
-    s = db._session
+    s = db.session() #open a new session
 
     # NB. we need the query and attribute update to be in the same transaction
     assert s.autocommit == False 
@@ -122,13 +123,14 @@
             wait = numpy.random.rand(1)*retry_max_sleep
             if verbose: print 'another process stole our dct. Waiting %f secs' % wait
             time.sleep(wait)
+    s.close()
     return dct
 
 def book_dct_non_postgres(db):
     print >> sys.stderr, """#TODO: use the priority field, not the status."""
     print >> sys.stderr, """#TODO: ignore entries with key self.push_error."""
 
-    return db.query(dbdict_status=START).first()
+    raise NotImplementedError()
 
 
 ###########
@@ -204,28 +206,33 @@
 # Queue
 ###########
 
-def insert_dict(jobdict, db, force_dup=False):
+def insert_dict(jobdict, db, force_dup=False, session=None):
     """Insert a new `job` dictionary into database `db`.
 
     :param force_dup: forces insertion even if an identical dictionary is already in the db
 
     """
     job = copy.copy(jobdict)
-    do_insert = force_dup or (None is db.query.filter_eq_dct(job).first())
+    if session is None:
+        s = db.session()
+        do_insert = force_dup or (None is db.query(s).filter_eq_dct(job).first())
+        s.close()
+    else:
+        do_insert = force_dup or (None is db.query(session).filter_eq_dct(job).first())
     if do_insert:
         job[STATUS] = START
         job[PRIORITY] = 1.0
-        return db.insert(job)
+        return db.insert(job, session=session)
     else:
         return None
 
-def insert_job(experiment_fn, state, db, force_dup=False):
+def insert_job(experiment_fn, state, db, force_dup=False, session=None):
     state = copy.copy(state)
     state[EXPERIMENT] = experiment_fn.__module__ + '.' + experiment_fn.__name__
-    return insert_dict(state, db, force_dup=force_dup)
+    return insert_dict(state, db, force_dup=force_dup, session=session)
 
 
-def add_experiments_to_db(jobs, db, verbose=0, add_dups=False, type_check=None):
+def add_experiments_to_db(jobs, db, verbose=0, add_dups=False, type_check=None, session=None):
     """Add experiments paramatrized by jobs[i] to database db.
 
     Default behaviour is to ignore jobs which are already in the database.
@@ -248,7 +255,12 @@
     rval = []
     for job in jobs:
         job = copy.copy(job)
-        do_insert = add_dups or (None is db.query.filter_eq_dct(job).first())
+        if session is None:
+            s = db.session()
+            do_insert = force_dup or (None is db.query(s).filter_eq_dct(job).first())
+            s.close()
+        else:
+            do_insert = force_dup or (None is db.query(session).filter_eq_dct(job).first())
 
         if do_insert:
             if type_check: