changeset 432:26b09e1481e7

beginning to reimplement threaded metadata discovery
author catherine@dellzilla
date Tue, 26 Jan 2010 12:43:13 -0500
parents cac7333f9ff5
children 439621c917c4
files sqlpython/connections.py sqlpython/sqlpyPlus.py
diffstat 2 files changed, 124 insertions(+), 113 deletions(-) [+]
line wrap: on
line diff
--- a/sqlpython/connections.py	Tue Jan 26 09:46:16 2010 -0500
+++ b/sqlpython/connections.py	Tue Jan 26 12:43:13 2010 -0500
@@ -3,7 +3,28 @@
 import getpass
 import gerald
 import schemagroup
+import time
+import threading
 
+class ObjectDescriptor(object):
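+    """Wraps one Gerald database object, pre-computing the naming pieces
+    (owner, unqualified name, type, path) used for pattern matching."""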
+    def __init__(self, name, dbobj):
+        self.fullname = name
+        self.dbobj = dbobj
+        self.type = self.dbobj.__class__.__name__.lower()  # e.g. 'table', 'view'
+        self.path = '%s/%s' % (self.type, self.fullname)
+        #self.type_path = '%s/' % self.dbobj.type
+        (self.owner, self.unqualified_name) = self.fullname.split('.')
+        self.owner = self.owner.lower()
+    def match_pattern(self, pattern, specific_owner=None):
+        return ( pattern.match(self.fullname) or 
+                  pattern.match(self.type) or 
+                  ((not specific_owner) and pattern.match(self.unqualified_name)) or
+                  (specific_owner and (self.owner == specific_owner.lower()) and pattern.match(self.unqualified_name)) )
+        
+class GeraldPlaceholder(object):
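+    """Stands in for a Gerald schema until discovery finishes; `current` and
+    `complete` stay False so callers know metadata is not ready yet."""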
+    current = False
+    complete = False
+    
 class DatabaseInstance(object):
     password = None
     uri = None
@@ -14,7 +35,9 @@
         if not self.parse_connect_uri(arg):
             self.parse_connect_arg(arg, opts)
         self.connection = self.new_connection()
-        #self.discover_schemas()
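+        # Start with a placeholder and let a background thread populate real
+        # metadata, so opening a connection does not block on schema discovery.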
+        self.gerald = GeraldPlaceholder()
+        self.metadata_discovery_thread = MetadataDiscoveryThread(self)
+        self.metadata_discovery_thread.start()
     
     def parse_connect_uri(self, uri):
         results = self.connection_uri_parser.search(uri)
@@ -50,10 +73,12 @@
         return self.uri.split('?mode=')[0]
         
     def discover_schemas(self):
-        self.schemas = schemagroup.SchemaDict(
-            {}, rdbms = self.rdbms, user = self.username, 
-            connection = self.connection, connection_string = self.gerald_uri())
-        self.schemas.refresh_asynch()
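+        # Synchronous twin of MetadataDiscoveryThread.run(): builds the Gerald
+        # schema and its ObjectDescriptor index in the calling thread.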
+        self.gerald = self.gerald_class(self.username, self.gerald_uri())
+        self.gerald.descriptions = {}
+        for (name, obj) in self.gerald.schema.items():
+            self.gerald.descriptions[name] = ObjectDescriptor(name, obj)            
+        self.gerald.current = True
+        self.gerald.complete = True
     
     def set_instance_number(self, instance_number):
         self.instance_number = instance_number
@@ -69,6 +94,7 @@
 try:
     import psycopg2
     class PostgresDatabaseInstance(OpenSourceDatabaseInstance):
+        gerald_class = gerald.PostgresSchema
         rdbms = 'postgres'
         default_port = 5432
         def assign_details(self, arg, opts):
@@ -90,6 +116,7 @@
 try:
     import MySQLdb
     class MySQLDatabaseInstance(OpenSourceDatabaseInstance):
+        gerald_class = gerald.MySQLSchema
         rdbms = 'mysql'
         default_port = 3306        
         def assign_details(self, arg, opts):
@@ -106,6 +133,7 @@
     import cx_Oracle
     
     class OracleDatabaseInstance(DatabaseInstance):
+        gerald_class = gerald.oracle_schema.User
         rdbms = 'oracle'
         connection_parser = re.compile('(?P<username>[^/\s]*)(/(?P<password>[^/\s]*))?@((?P<host>[^/\s:]*)(:(?P<port>\d{1,4}))?/)?(?P<db_name>[^/\s:]*)(\s+as\s+(?P<mode>sys(dba|oper)))?',
                                             re.IGNORECASE)
@@ -136,7 +164,19 @@
 except ImportError:
     class OracleDatabaseInstance(DatabaseInstance):
         pass
-                                       
+        
+class MetadataDiscoveryThread(threading.Thread):
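+    """Builds the Gerald schema model and its ObjectDescriptor index in the
+    background, then sets `current`/`complete` so readers can start using it."""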
+    def __init__(self, db_instance):
+        threading.Thread.__init__(self)
+        self.db_instance = db_instance
+    def run(self):
+        self.db_instance.gerald = self.db_instance.gerald_class(self.db_instance.username, self.db_instance.gerald_uri())
+        self.db_instance.gerald.descriptions = {}
+        for (name, obj) in self.db_instance.gerald.schema.items():
+            self.db_instance.gerald.descriptions[name] = ObjectDescriptor(name, obj)            
+        self.db_instance.gerald.current = True
+        self.db_instance.gerald.complete = True
+
 rdbms_types = {'oracle': OracleDatabaseInstance, 'mysql': MySQLDatabaseInstance, 'postgres': PostgresDatabaseInstance}
                   
         
\ No newline at end of file
--- a/sqlpython/sqlpyPlus.py	Tue Jan 26 09:46:16 2010 -0500
+++ b/sqlpython/sqlpyPlus.py	Tue Jan 26 12:43:13 2010 -0500
@@ -28,7 +28,6 @@
 import traceback, operator
 from cmd2 import Cmd, make_option, options, Statekeeper, Cmd2TestCase
 from output_templates import output_templates
-from schemagroup import MetaData
 from metadata import metaqueries
 from plothandler import Plot
 from sqlpython import Parser, cx_Oracle, psycopg2
@@ -566,7 +565,7 @@
         r'select\s+(.*)from',
         re.IGNORECASE | re.DOTALL | re.MULTILINE)        
     def completedefault(self, text, line, begidx, endidx):
-        (username, schemas) = self.metadata()
+        (username, gerald_schema) = self.metadata()
         segment = completion.whichSegment(line)        
         text = text.upper()
         if segment in ('select', 'where', 'having', 'set', 'order by', 'group by'):
@@ -826,41 +825,35 @@
     def _pull(self, arg, opts, vc=None):        
         opts.exact = True
         statekeeper = Statekeeper(opts.dump and self, ('stdout',))
-        (username, schemas) = self.metadata()
+        (username, gerald_schema) = self.metadata()
         try:
-            for metadata in self._matching_database_objects(arg, opts):
-                self.poutput(metadata.descriptor(qualified=True))
-                txt = metadata.db_object.get_ddl()
+            for description in self._matching_database_objects(arg, opts):                
+                self.poutput(description.path)
+                txt = description.dbobj.get_ddl()
                 if opts.get('lines'):
-                    txt = self._with_line_numbers(txt)
-    
+                    txt = self._with_line_numbers(txt)    
                 if opts.dump:
-                    path = os.path.join(metadata.schema_name.lower(), metadata.db_type.lower()) \
+                    path = os.path.join(description.owner.lower(), description.type.lower()) \
                            .replace(' ', '_')
                     try:
                         os.makedirs(path)
                     except OSError:
                         pass
-                    filename = os.path.join(path, '%s.sql' % metadata.object_name.lower())
+                    filename = os.path.join(path, '%s.sql' % description.unqualified_name.lower())
                     self.stdout = open(filename, 'w')
                 if opts.get('num') is not None:
                     txt = txt.splitlines()
                     txt = centeredSlice(txt, center=opts.num+1, width=opts.width)
                     txt = '\n'.join(txt)
                 else:
-                    txt = 'REMARK BEGIN %s\n%s\nREMARK END\n' % (metadata.descriptor(qualified=True), txt)
+                    txt = 'REMARK BEGIN %s\n%s\nREMARK END\n' % (description.path, txt)
 
                 self.poutput(txt)
                 if opts.full:
-                    # TODO: 'full' option for gerald metadata
-            
-                    if opts.full:
-                        for dependent_type in ('OBJECT_GRANT', 'CONSTRAINT', 'TRIGGER'):        
-                            try:
-                                self.poutput('REMARK BEGIN\n%s\nREMARK END\n\n' % str(self.curs.callfunc('DBMS_METADATA.GET_DEPENDENT_DDL', cx_Oracle.CLOB,
-                                                                         [dependent_type, object_name, owner])))
-                            except cx_Oracle.DatabaseError:
-                                pass
+                    for dependent_type in ('constraints', 'triggers', 'indexes'):
+                        if hasattr(description.dbobj, dependent_type):
+                            for (depname, depobj) in getattr(description.dbobj, dependent_type).items():
+                                self.poutput('REMARK BEGIN\n%s\nREMARK END\n\n' % depobj.get_ddl())
                 if opts.dump:
                     self.stdout.close()
                     if vc:
@@ -988,21 +981,21 @@
     def do_find(self, arg, opts):
         """Finds argument in source code or (with -c) in column definitions."""
               
-        seek = self._regex(arg, exact=opts.col)
+        seek = self._regex_form_of_search_pattern(arg, exact=opts.col)
         qualified = opts.get('all')
-        for m in self._matching_database_objects('*', opts):
+        for descrip in self._matching_database_objects('*', opts):
             if opts.col:
-                if hasattr(m.db_object, 'columns'):
-                    for col in m.db_object.columns:
+                if hasattr(descrip.dbobj, 'columns'):
+                    for col in descrip.dbobj.columns:
                         if seek.search(col):
-                            self.poutput('%s.%s' % (m.descriptor(qualified), col))
+                            self.poutput('%s.%s' % (descrip.fullname, col))
             else:
-                if hasattr(m.db_object, 'source'):
+                if hasattr(descrip.dbobj, 'source'):
                     name_printed = False
-                    for (line_num, line) in m.db_object.source:
+                    for (line_num, line) in descrip.dbobj.source:
                         if seek.search(line):
                             if not name_printed:
-                                self.poutput(m.descriptor(qualified))
+                                self.poutput(descrip.fullname)
                                 name_printed = True
                             self.poutput('%d: %s' % (line_num, line))
             
@@ -1035,17 +1028,17 @@
             sortkey = operator.itemgetter('name')
         else:
             sortkey = operator.itemgetter('sequence')
-        for m in self._matching_database_objects(arg, opts):
-            self.tblname = m.descriptor(qualified=opts.get('all'))
+        for descrip in self._matching_database_objects(arg, opts):
+            self.tblname = descrip.fullname
             self.pfeedback(self.tblname)
-            if opts.long and hasattr(m.db_object, 'comments'):
-                if m.db_object.comments:
-                    self.poutput(m.db_object.comments) 
-            if hasattr(m.db_object, 'columns'):
-                cols = sorted(m.db_object.columns.values(), key=sortkey)[:rowlimit]
-                if opts.long and hasattr(m.db_object, 'constraints'):
-                    primary_key_columns = self._key_columns(m.db_object, 'Primary')
-                    unique_key_columns = self._key_columns(m.db_object, 'Unique')
+            if opts.long and hasattr(descrip.dbobj, 'comments'):
+                if descrip.dbobj.comments:
+                    self.poutput(descrip.dbobj.comments) 
+            if hasattr(descrip.dbobj, 'columns'):
+                cols = sorted(descrip.dbobj.columns.values(), key=sortkey)[:rowlimit]
+                if opts.long and hasattr(descrip.dbobj, 'constraints'):
+                    primary_key_columns = self._key_columns(descrip.dbobj, 'Primary')
+                    unique_key_columns = self._key_columns(descrip.dbobj, 'Unique')
                     self.colnames = 'N Name Nullable Type Key Default Comments'.split()
                     self.rows = [(col['sequence'], col['name'], (col['nullable'] and 'NULL') or 'NOT NULL',
                                   self._col_type_descriptor(col), 
@@ -1059,51 +1052,38 @@
                                  for col in cols]
                 self.coltypes = [str] * len(self.colnames)
                 self.poutput(self.tabular_output(arg.parsed.terminator, self.tblname) + '\n\n')
-            elif hasattr(m.db_object, 'increment_by'):
+            elif hasattr(descrip.dbobj, 'increment_by'):
                 self.colnames = 'name min_value max_value increment_by'.split()
                 self.coltypes = [str, int, int, int]
-                self.rows = [(getattr(m.db_object, p) for p in self.colnames)]
+                self.rows = [(getattr(descrip.dbobj, p) for p in self.colnames)]
                 self.poutput(self.tabular_output(arg.parsed.terminator, self.tblname) + '\n\n')
-            elif hasattr(m.db_object, 'source'):
+            elif hasattr(descrip.dbobj, 'source'):
                 end_heading = re.compile(r'\bDECLARE|BEGIN\b', re.IGNORECASE)
-                for (index, (ln, line)) in enumerate(m.db_object.source):
+                for (index, (ln, line)) in enumerate(descrip.dbobj.source):
                     if end_heading.search(line):
                         break
-                self.poutput(''.join(l for (ln, l) in m.db_object.source[:index]))
+                self.poutput(''.join(l for (ln, l) in descrip.dbobj.source[:index]))
                         
     @options([all_users_option])            
     def do_deps(self, arg, opts):
         '''Lists all objects that are dependent upon the object.'''
-        #TODO: Can this be Geraldized?
-        for obj in self._matching_database_objects(arg, opts):
-            obj.db_object.triggers
-
-            if object_type == 'PACKAGE BODY':
-                q = "and (type != 'PACKAGE BODY' or name != :object_name)'"
-                object_type = 'PACKAGE'
-            else:
-                q = ""
-            q = """SELECT name,
-              type
-              from user_dependencies
-              where
-              referenced_name like :object_name
-              and	referenced_type like :object_type
-              and	referenced_owner like :owner
-              %s;""" % (q)
-            self.do_select(self.parsed(q, terminator=arg.parsed.terminator or ';', suffix=arg.parsed.suffix), 
-                           bindVarsIn={'object_name':object_name, 'object_type':object_type, 'owner':owner})
-
+        #TODO: doesn't account for views; don't know about primary keys
+        for descrip in self._matching_database_objects(arg, opts):
+            for deptype in ('indexes', 'constraints', 'triggers'):
+                if hasattr(descrip.dbobj, deptype):
+                    for (depname, depobj) in getattr(descrip.dbobj, deptype).items():
+                        self.poutput('%s %s' % (deptype, depname))
+                
     @options([all_users_option])        
     def do_comments(self, arg, opts):
         'Prints comments on a table and its columns.'
         qualified = opts.get('all')
-        for m in self._matching_database_objects(arg, opts):
-            if hasattr(m.db_object, 'comments'):
-                self.poutput(m.descriptor(qualified))
-                self.poutput(m.db_object.comments)
-                if hasattr(m.db_object, 'columns'):
-                    columns = m.db_object.columns.values()
+        for descrip in self._matching_database_objects(arg, opts):
+            if hasattr(descrip.dbobj, 'comments'):
+                self.poutput(descrip.fullname)
+                self.poutput(descrip.dbobj.comments)
+                if hasattr(descrip.dbobj, 'columns'):
+                    columns = descrip.dbobj.columns.values()
                     columns.sort(key=operator.itemgetter('sequence'))
                     for col in columns:
                         comment = col.get('comment')
@@ -1283,10 +1263,10 @@
     def do__dir_(self, arg, opts, plural_name, str_function):
         long = opts.get('long')
         reverse = opts.get('reverse') or False
-        for obj in self._matching_database_objects(arg, opts):
-            if hasattr(obj.db_object, plural_name):
-                self.pfeedback('%s on %s' % (plural_name.title(), obj.descriptor(opts.get('all'))))
-                result = [str_function(depobj, long) for depobj in getattr(obj.db_object, plural_name).values()]
+        for descrip in self._matching_database_objects(arg, opts):
+            if hasattr(descrip.dbobj, plural_name):
+                self.pfeedback('%s on %s' % (plural_name.title(), descrip.fullname))
+                result = [str_function(depobj, long) for depobj in getattr(descrip.dbobj, plural_name).values()]
                 result.sort(reverse=opts.get('reverse') or False)
                 self.poutput('\n'.join(result))
         
@@ -1522,7 +1502,7 @@
             username = username.upper()
         elif self.rdbms == 'postgres':
             username = username.lower()
-        return (username, self.current_instance.schemas)
+        return (username, self.current_instance.gerald)
         
     def _to_sql_wildcards(self, original):
         return original.replace('*','%').replace('?','_')
@@ -1532,7 +1512,7 @@
         result = re.escape(original)
         return result.replace('\\*','.*').replace('\\?','.')
         
-    def _regex(self, s, exact=False):
+    def _regex_form_of_search_pattern(self, s, exact=False):
         seekpatt = r'[/\\]?%s[/\\]?' % (
             s.replace('*', '.*').replace('?','.').replace('%', '.*'))        
         if exact:
@@ -1561,34 +1541,31 @@
         else:
             schemas.refresh_one(username)
         
+    def _print_gerald_status_warning(self, gerald_schema):
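+        '''Warns when background metadata discovery has not finished, or a requested refresh is still pending.'''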
+        if not gerald_schema.complete:
+            self.pfeedback('Metadata is not available yet - still gathering')
+        elif not gerald_schema.current:
+            self.pfeedback('Metadata is stale - requested refresh still incomplete')
+                            
     def _matching_database_objects(self, arg, opts):
-        # jrrt.p* should work even if not --all
         # doesn't get java$options
-        (username, schemas) = self.metadata()
-        if opts.get('immediate'):
+        (username, gerald_schema) = self.metadata()
+        self._print_gerald_status_warning(gerald_schema)
+        if opts.get('immediate'): #TODO: refresh still relies on the retired schemagroup API
             if opts.get('all'):
                 self.perror('Cannot combine --all with --immediate - operation takes too long')
                 raise StopIteration
             else:
                 self.pfeedback('Refreshing metadata for %s...' % username)
                 schemas.refresh_one(username)
-        if schemas.complete == 0:
-            self.pfeedback('No data available yet - still gathering schema information')
+
+        if not gerald_schema.complete:
             raise StopIteration
-        elif (opts.get('all') and (schemas.complete != 'all')):
-            self.pfeedback('Results are incomplete - only %d schemas mapped so far' % schemas.complete)        
         
-        seek = self._regex(arg, opts.get('exact'))
-        qualified = opts.get('all') or '.' in arg   # TODO: is this working?
-        for (schema_name, schema) in schemas.items():
-            if schema_name == username or opts.get('all'):
-                for (name, dbobj) in schema.schema.items():                
-                    metadata = MetaData(object_name=name, schema_name=schema_name, db_object=dbobj)
-                    if (not arg) or (
-                           seek.search(metadata.descriptor(qualified)) or 
-                           seek.search(metadata.name(qualified)) or 
-                           seek.search(metadata.db_type)):
-                        yield metadata
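+        # Each ObjectDescriptor decides for itself whether it matches; outside
+        # --all mode, unqualified names are only matched against the connected user's objects.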
+        seek = self._regex_form_of_search_pattern(arg, opts.get('exact'))
+        for (name, descrip) in gerald_schema.descriptions.items():
+            if descrip.match_pattern(seek, specific_owner = ((not opts.all) and username)):
+                yield descrip
    
     @options(standard_options)
     def do_ls(self, arg, opts):
@@ -1598,13 +1575,7 @@
         '''
         opts.exact = True
         (username, schemas) = self.metadata()
-        result = []
-        for obj in self._matching_database_objects(arg, opts):
-            if opts.long:
-                result.append('%s' % (obj.descriptor(qualified=True)))
-                # if opts.long: status, last_ddl_time
-            else:
-                result.append(obj.descriptor(qualified=opts.all))
+        result = [descrip.fullname for descrip in self._matching_database_objects(arg, opts)]
         if result:
             result.sort(reverse=bool(opts.reverse))
             self.poutput('\n'.join(result))
@@ -1631,19 +1602,19 @@
         re_pattern = re.compile(self._to_re_wildcards(pattern), 
                                 (opts.ignorecase and re.IGNORECASE) or 0)
         for target in targets:
-            for m in self._matching_database_objects(target, opts):
-                self.pfeedback(m.descriptor(qualified=opts.all))
-                if hasattr(m.db_object, 'columns'):
+            for descrip in self._matching_database_objects(target, opts):
+                self.pfeedback(descrip.fullname)
+                if hasattr(descrip.dbobj, 'columns'):
                     clauses = []
-                    for col in m.db_object.columns:
+                    for col in descrip.dbobj.columns:
                         clauses.append(comparitor % (col, sql_pattern))
-                    sql = "SELECT * FROM %s WHERE 1=0\n%s;" % (m.object_name, ' '.join(clauses))
+                    sql = "SELECT * FROM %s WHERE 1=0\n%s;" % (descrip.dbobj, ' '.join(clauses))
                     sql = self.parsed(sql, 
                                           terminator=arg.parsed.terminator or ';',
                                           suffix=arg.parsed.suffix)
                     self.do_select(sql)
-                elif hasattr(m.db_object, 'source'):
-                    for (line_num, line) in m.db_object.source:
+                elif hasattr(descrip.dbobj, 'source'):
+                    for (line_num, line) in descrip.dbobj.source:
                         if re_pattern.search(line):
                             self.poutput('%4d: %s' % (line_num, line))