# HG changeset patch # User James Bergstra # Date 1232331254 18000 # Node ID 7041749cf804d87cb268a8f3f3685fbcda0f6f03 # Parent 04752b23da8d50f48a835013acae00ded3359c94 untested changes to close sessions in api0 diff -r 04752b23da8d -r 7041749cf804 pylearn/dbdict/api0.py --- a/pylearn/dbdict/api0.py Sat Jan 17 21:23:36 2009 -0500 +++ b/pylearn/dbdict/api0.py Sun Jan 18 21:14:14 2009 -0500 @@ -1,4 +1,5 @@ from sqlalchemy import create_engine, desc +import sqlalchemy.pool from sqlalchemy.orm import sessionmaker from sqlalchemy import Table, Column, MetaData, ForeignKey from sqlalchemy import Integer, String, Float, Boolean, DateTime, Text, Binary @@ -76,7 +77,6 @@ raise ValueError(h_self.e_bad_table, pair_table) h_self._session_fn = Session - h_self._session = Session() class KeyVal (object): """KeyVal interfaces between python types and the database. @@ -129,10 +129,15 @@ handle - reference to L{DbHandle} (creator) """ - def __init__(d_self): - s = h_self._session - s.save(d_self) - s.commit() + def __init__(d_self, session=None): + if session is None: + s = h_self._session_fn() + s.add(d_self) #d_self transient -> pending + s.commit() #d_self -> persistent + s.close() #d_self -> detached + else: + s = session + s.save(d_self) _forbidden_keys = set(['session']) @@ -157,14 +162,27 @@ return a.val raise KeyError(key) - def __setitem__(d_self, key, val): - s = h_self._session - d_self._set_in_session(key, val, s) - s.update(d_self) #session update, not dict-like update - s.commit() + def __setitem__(d_self, key, val, session=None): + if session is None: + s = h_self._session_fn() + s.add(d_self) + d_self._set_in_session(key, val, s) + s.commit() + s.close() + else: + s = session + s.add(d_self) + d_self._set_in_session(key, val, s) - def __delitem__(d_self, key): - s = h_self._session + def __delitem__(d_self, key, session=None): + if session is None: + s = h_self._session_fn() + commit_close = True + else: + s = session + commit_close = False + s.add(d_self) + #find 
the item to delete in d_self._attrs to_del = None for i,a in enumerate(d_self._attrs): @@ -177,8 +195,12 @@ i, a = to_del s.delete(a) del d_self._attrs[i] - s.commit() - s.update(d_self) + if commit_close: + s.commit() + #s.add(d_self) #why is this here? maybe it should go earlier?? + s.close() + else: + pass def iteritems(d_self): return d_self.items() @@ -192,15 +214,24 @@ def values(d_self): return [kv.val for kv in d_self._attrs] - def update(d_self, dct, **kwargs): + def update(d_self, dct, session=None, **kwargs): """Like dict.update(), set keys from kwargs""" - s = h_self._session + if session is None: + s = h_self._session_fn() + commit_close = True + else: + s = session + commit_close = False for k, v in dct.items(): d_self._set_in_session(k, v, s) for k, v in kwargs.items(): d_self._set_in_session(k, v, s) - s.update(d_self) - s.commit() + + if commit_close: + s.commit() + s.close() + else: + s.add(d_self) #why is this here? maybe it should go earlier?? def get(d_self, key, default): try: @@ -222,10 +253,13 @@ """ if session is None: - session = h_self._session + session = h_self._session_fn() + session.add(d_self) #so session knows about us session.refresh(d_self) session.commit() + session.close() else: + session.add(d_self) #so session knows about us session.refresh(self.dbrow) def delete(d_self, session=None): @@ -234,10 +268,13 @@ @param session: use the given session, and do not commit. 
""" if session is None: - session = h_self._session - session.delete(d_self) + session = h_self._session_fn() + session.add(d_self) #so session knows about us + session.delete(d_self) #mark for deletion session.commit() + session.close() else: + session.add(d_self) #so session knows about us session.delete(d_self) # helper routine by update() and __setitem__ @@ -400,18 +437,16 @@ if dct: rval.update(dct) return rval - def __get_query(h_self): + def query(h_self, session): """Construct an SqlAlchemy query, which can be subsequently filtered using the instance methods of DbQuery""" - - return h_self._Query(h_self._session.query(h_self._Dict)\ + return h_self._Query(session.query(h_self._Dict)\ .options(eagerload('_attrs'))) - query = property(__get_query) def createView(h_self, view): - s = h_self._session; - cols = []; + s = h_self.session() + cols = [] for col in view.columns: if col.name is "id": @@ -434,6 +469,7 @@ h_self._engine.execute(viewsql); s.commit(); + s.close() class MappedClass(object): pass @@ -514,17 +550,12 @@ engine = create_engine('sqlite:///%s' % filename, echo=False) return db_from_engine(engine, **kwargs) -def postgres_db(user, password, host, database, echo=False, **kwargs): +def postgres_db(user, password, host, database, echo=False, poolclass=sqlalchemy.pool.NullPool, **kwargs): """Create an engine to access a postgres_dbhandle """ db_str ='postgres://%(user)s:%(password)s@%(host)s/%(database)s' % locals() - # should force the app release extra connections releasing - # connections should let us schedule more jobs, since each one - # operates autonomously most of the time, just checking the db - # rarely. 
TODO: optimize this for large numbers of jobs
-    pool_size = 0;
-    engine = create_engine(db_str, pool_size=pool_size, echo=echo)
+    engine = create_engine(db_str, echo=echo, poolclass=poolclass)
     return db_from_engine(engine, **kwargs)
 
diff -r 04752b23da8d -r 7041749cf804 pylearn/dbdict/sql.py
--- a/pylearn/dbdict/sql.py	Sat Jan 17 21:23:36 2009 -0500
+++ b/pylearn/dbdict/sql.py	Sun Jan 18 21:14:14 2009 -0500
@@ -4,6 +4,7 @@
 
 import numpy.random
 import sqlalchemy
+import sqlalchemy.pool
 from sqlalchemy import create_engine, desc
 from sqlalchemy.orm import eagerload
 import psycopg2, psycopg2.extensions
@@ -26,7 +27,7 @@
 
 _TEST_CONCURRENCY = False
 
-def postgres_serial(user, password, host, database, **kwargs):
+def postgres_serial(user, password, host, database, poolclass=sqlalchemy.pool.NullPool, **kwargs):
     """Return a DbHandle instance that communicates with a postgres database at transaction
     isolation_level 'SERIALIZABLE'.
 
@@ -46,7 +47,7 @@
     pool_size = 0
     this.engine = create_engine('postgres://'
             ,creator=connect
-            ,pool_size=0 # should force the app release connections
+            ,poolclass=poolclass
             )
 
     db = db_from_engine(this.engine, **kwargs)
@@ -68,7 +69,7 @@
     print >> sys.stderr, """#TODO: use the priority field, not the status."""
     print >> sys.stderr, """#TODO: ignore entries with key PUSH_ERROR."""
 
-    s = db._session
+    s = db.session() #open a new session
 
     # NB. we need the query and attribute update to be in the same transaction
     assert s.autocommit == False
@@ -122,13 +123,14 @@
             wait = numpy.random.rand(1)*retry_max_sleep
             if verbose: print 'another process stole our dct. 
Waiting %f secs' % wait time.sleep(wait) + s.close() return dct def book_dct_non_postgres(db): print >> sys.stderr, """#TODO: use the priority field, not the status.""" print >> sys.stderr, """#TODO: ignore entries with key self.push_error.""" - return db.query(dbdict_status=START).first() + raise NotImplementedError() ########### @@ -204,28 +206,33 @@ # Queue ########### -def insert_dict(jobdict, db, force_dup=False): +def insert_dict(jobdict, db, force_dup=False, session=None): """Insert a new `job` dictionary into database `db`. :param force_dup: forces insertion even if an identical dictionary is already in the db """ job = copy.copy(jobdict) - do_insert = force_dup or (None is db.query.filter_eq_dct(job).first()) + if session is None: + s = db.session() + do_insert = force_dup or (None is db.query(s).filter_eq_dct(job).first()) + s.close() + else: + do_insert = force_dup or (None is db.query(session).filter_eq_dct(job).first()) if do_insert: job[STATUS] = START job[PRIORITY] = 1.0 - return db.insert(job) + return db.insert(job, session=session) else: return None -def insert_job(experiment_fn, state, db, force_dup=False): +def insert_job(experiment_fn, state, db, force_dup=False, session=None): state = copy.copy(state) state[EXPERIMENT] = experiment_fn.__module__ + '.' + experiment_fn.__name__ - return insert_dict(state, db, force_dup=force_dup) + return insert_dict(state, db, force_dup=force_dup, session=session) -def add_experiments_to_db(jobs, db, verbose=0, add_dups=False, type_check=None): +def add_experiments_to_db(jobs, db, verbose=0, add_dups=False, type_check=None, session=None): """Add experiments paramatrized by jobs[i] to database db. Default behaviour is to ignore jobs which are already in the database. 
@@ -248,7 +255,12 @@
     rval = []
     for job in jobs:
         job = copy.copy(job)
-        do_insert = add_dups or (None is db.query.filter_eq_dct(job).first())
+        if session is None:
+            s = db.session()
+            do_insert = add_dups or (None is db.query(s).filter_eq_dct(job).first())
+            s.close()
+        else:
+            do_insert = add_dups or (None is db.query(session).filter_eq_dct(job).first())
 
         if do_insert:
             if type_check: