Mercurial > sqlpython
changeset 432:26b09e1481e7
beginning to reimplement threaded metadata discovery
author | catherine@dellzilla |
---|---|
date | Tue, 26 Jan 2010 12:43:13 -0500 |
parents | cac7333f9ff5 |
children | 439621c917c4 |
files | sqlpython/connections.py sqlpython/sqlpyPlus.py |
diffstat | 2 files changed, 124 insertions(+), 113 deletions(-) [+] |
line wrap: on
line diff
--- a/sqlpython/connections.py Tue Jan 26 09:46:16 2010 -0500 +++ b/sqlpython/connections.py Tue Jan 26 12:43:13 2010 -0500 @@ -3,7 +3,28 @@ import getpass import gerald import schemagroup +import time +import threading +class ObjectDescriptor(object): + def __init__(self, name, dbobj): + self.fullname = name + self.dbobj = dbobj + self.type = str(type(self.dbobj)).split()[-1].lower() + self.path = '%s/%s' % (self.type, self.fullname) + #self.type_path = '%s/' % self.dbobj.type + (self.owner, self.unqualified_name) = self.fullname.split('.') + self.owner = self.owner.lower() + def match_pattern(self, pattern, specific_owner=None): + return ( pattern.match(fullname) or + pattern.match(self.type) or + ((not specific_owner) and pattern.match(self.unqualified_name)) or + ((self.owner == specific_owner.lower()) and pattern.match(self.unqualified_name)) ) + +class GeraldPlaceholder(object): + current = False + complete = False + class DatabaseInstance(object): password = None uri = None @@ -14,7 +35,9 @@ if not self.parse_connect_uri(arg): self.parse_connect_arg(arg, opts) self.connection = self.new_connection() - #self.discover_schemas() + self.gerald = GeraldPlaceholder() + self.metadata_discovery_thread = MetadataDiscoveryThread(self) + self.metadata_discovery_thread.start() def parse_connect_uri(self, uri): results = self.connection_uri_parser.search(uri) @@ -50,10 +73,12 @@ return self.uri.split('?mode=')[0] def discover_schemas(self): - self.schemas = schemagroup.SchemaDict( - {}, rdbms = self.rdbms, user = self.username, - connection = self.connection, connection_string = self.gerald_uri()) - self.schemas.refresh_asynch() + self.gerald = self.gerald_class(self.username, self.gerald_uri()) + self.gerald.descriptions = {} + for (name, obj) in self.gerald.schema.items(): + self.gerald.descriptions[name] = ObjectDescriptor(name, obj) + self.gerald.current = True + self.gerald.complete = True def set_instance_number(self, instance_number): self.instance_number = instance_number @@ -69,6 +94,7 @@ try: import psycopg2 class PostgresDatabaseInstance(OpenSourceDatabaseInstance): + gerald_class = gerald.PostgresSchema rdbms = 'postgres' default_port = 5432 def assign_details(self, arg, opts): @@ -90,6 +116,7 @@ try: import MySQLdb class MySQLDatabaseInstance(OpenSourceDatabaseInstance): + gerald_class = gerald.MySQLSchema rdbms = 'mysql' default_port = 3306 def assign_details(self, arg, opts): @@ -106,6 +133,7 @@ import cx_Oracle class OracleDatabaseInstance(DatabaseInstance): + gerald_class = gerald.oracle_schema.User rdbms = 'oracle' connection_parser = re.compile('(?P<username>[^/\s]*)(/(?P<password>[^/\s]*))?@((?P<host>[^/\s:]*)(:(?P<port>\d{1,4}))?/)?(?P<db_name>[^/\s:]*)(\s+as\s+(?P<mode>sys(dba|oper)))?', re.IGNORECASE) @@ -136,7 +164,19 @@ except ImportError: class OracleDatabaseInstance(DatabaseInstance): pass - + +class MetadataDiscoveryThread(threading.Thread): + def __init__(self, db_instance): + threading.Thread.__init__(self) + self.db_instance = db_instance + def run(self): + self.db_instance.gerald = self.db_instance.gerald_class(self.db_instance.username, self.db_instance.gerald_uri()) + self.db_instance.gerald.descriptions = {} + for (name, obj) in self.db_instance.gerald.schema.items(): + self.db_instance.gerald.descriptions[name] = ObjectDescriptor(name, obj) + self.db_instance.gerald.current = True + self.db_instance.gerald.complete = True + rdbms_types = {'oracle': OracleDatabaseInstance, 'mysql': MySQLDatabaseInstance, 'postgres': PostgresDatabaseInstance} \ No newline at end of file
--- a/sqlpython/sqlpyPlus.py Tue Jan 26 09:46:16 2010 -0500 +++ b/sqlpython/sqlpyPlus.py Tue Jan 26 12:43:13 2010 -0500 @@ -28,7 +28,6 @@ import traceback, operator from cmd2 import Cmd, make_option, options, Statekeeper, Cmd2TestCase from output_templates import output_templates -from schemagroup import MetaData from metadata import metaqueries from plothandler import Plot from sqlpython import Parser, cx_Oracle, psycopg2 @@ -566,7 +565,7 @@ r'select\s+(.*)from', re.IGNORECASE | re.DOTALL | re.MULTILINE) def completedefault(self, text, line, begidx, endidx): - (username, schemas) = self.metadata() + (username, gerald_schema) = self.metadata() segment = completion.whichSegment(line) text = text.upper() if segment in ('select', 'where', 'having', 'set', 'order by', 'group by'): @@ -826,41 +825,35 @@ def _pull(self, arg, opts, vc=None): opts.exact = True statekeeper = Statekeeper(opts.dump and self, ('stdout',)) - (username, schemas) = self.metadata() + (username, gerald_schema) = self.metadata() try: - for metadata in self._matching_database_objects(arg, opts): - self.poutput(metadata.descriptor(qualified=True)) - txt = metadata.db_object.get_ddl() + for description in self._matching_database_objects(arg, opts): + self.poutput(description.path) + txt = description.dbobj.get_ddl() if opts.get('lines'): - txt = self._with_line_numbers(txt) - + txt = self._with_line_numbers(txt) if opts.dump: - path = os.path.join(metadata.schema_name.lower(), metadata.db_type.lower()) \ + path = os.path.join(description.owner.lower(), description.type.lower()) \ .replace(' ', '_') try: os.makedirs(path) except OSError: pass - filename = os.path.join(path, '%s.sql' % metadata.object_name.lower()) + filename = os.path.join(path, '%s.sql' % description.name.lower()) self.stdout = open(filename, 'w') if opts.get('num') is not None: txt = txt.splitlines() txt = centeredSlice(txt, center=opts.num+1, width=opts.width) txt = '\n'.join(txt) else: - txt = 'REMARK BEGIN %s\n%s\nREMARK END\n' % (metadata.descriptor(qualified=True), txt) + txt = 'REMARK BEGIN %s\n%s\nREMARK END\n' % (description.path, txt) self.poutput(txt) if opts.full: - # TODO: 'full' option for gerald metadata - - if opts.full: - for dependent_type in ('OBJECT_GRANT', 'CONSTRAINT', 'TRIGGER'): - try: - self.poutput('REMARK BEGIN\n%s\nREMARK END\n\n' % str(self.curs.callfunc('DBMS_METADATA.GET_DEPENDENT_DDL', cx_Oracle.CLOB, - [dependent_type, object_name, owner]))) - except cx_Oracle.DatabaseError: - pass + for dependent_type in ('constraints', 'triggers', 'indexes'): + if hasattr(description.dbobj, dependent_type): + for (depname, depobj) in getattr(description.dbobj, dependent_type).items(): + self.poutput('REMARK BEGIN\n%s\nREMARK END\n\n' % depobj.get_ddl()) if opts.dump: self.stdout.close() if vc: @@ -988,21 +981,21 @@ def do_find(self, arg, opts): """Finds argument in source code or (with -c) in column definitions.""" - seek = self._regex(arg, exact=opts.col) + seek = self._regex_form_of_search_pattern(arg, exact=opts.col) qualified = opts.get('all') - for m in self._matching_database_objects('*', opts): + for descrip in self._matching_database_objects('*', opts): if opts.col: - if hasattr(m.db_object, 'columns'): - for col in m.db_object.columns: + if hasattr(descrip.dbobj, 'columns'): + for col in descrip.dbobj.columns: if seek.search(col): - self.poutput('%s.%s' % (m.descriptor(qualified), col)) + self.poutput('%s.%s' % (m.fullname, col)) else: - if hasattr(m.db_object, 'source'): + if hasattr(descrip.dbobj, 'source'): name_printed = False - for (line_num, line) in m.db_object.source: + for (line_num, line) in descrip.dbobj.source: if seek.search(line): if not name_printed: - self.poutput(m.descriptor(qualified)) + self.poutput(descrip.fullname) name_printed = True self.poutput('%d: %s' % (line_num, line)) @@ -1035,17 +1028,17 @@ sortkey = operator.itemgetter('name') else: sortkey = operator.itemgetter('sequence') - for m in self._matching_database_objects(arg, opts): - self.tblname = m.descriptor(qualified=opts.get('all')) + for descrip in self._matching_database_objects(arg, opts): + self.tblname = descrip.fullname self.pfeedback(self.tblname) - if opts.long and hasattr(m.db_object, 'comments'): - if m.db_object.comments: - self.poutput(m.db_object.comments) - if hasattr(m.db_object, 'columns'): - cols = sorted(m.db_object.columns.values(), key=sortkey)[:rowlimit] - if opts.long and hasattr(m.db_object, 'constraints'): - primary_key_columns = self._key_columns(m.db_object, 'Primary') - unique_key_columns = self._key_columns(m.db_object, 'Unique') + if opts.long and hasattr(descrip.dbobj, 'comments'): + if descrip.dbobj.comments: + self.poutput(descrip.dbobj.comments) + if hasattr(descrip.dbobj, 'columns'): + cols = sorted(descrip.dbobj.columns.values(), key=sortkey)[:rowlimit] + if opts.long and hasattr(descrip.dbobj, 'constraints'): + primary_key_columns = self._key_columns(descrip.dbobj, 'Primary') + unique_key_columns = self._key_columns(descrip.dbobj, 'Unique') self.colnames = 'N Name Nullable Type Key Default Comments'.split() self.rows = [(col['sequence'], col['name'], (col['nullable'] and 'NULL') or 'NOT NULL', self._col_type_descriptor(col), @@ -1059,51 +1052,38 @@ for col in cols] self.coltypes = [str] * len(self.colnames) self.poutput(self.tabular_output(arg.parsed.terminator, self.tblname) + '\n\n') - elif hasattr(m.db_object, 'increment_by'): + elif hasattr(descrip.dbobj, 'increment_by'): self.colnames = 'name min_value max_value increment_by'.split() self.coltypes = [str, int, int, int] - self.rows = [(getattr(m.db_object, p) for p in self.colnames)] + self.rows = [(getattr(descrip.dbobj, p) for p in self.colnames)] self.poutput(self.tabular_output(arg.parsed.terminator, self.tblname) + '\n\n') - elif hasattr(m.db_object, 'source'): + elif hasattr(descrip.dbobj, 'source'): end_heading = re.compile(r'\bDECLARE|BEGIN\b', re.IGNORECASE) - for (index, (ln, line)) in enumerate(m.db_object.source): + for (index, (ln, line)) in enumerate(descrip.dbobj.source): if end_heading.search(line): break - self.poutput(''.join(l for (ln, l) in m.db_object.source[:index])) + self.poutput(''.join(l for (ln, l) in descrip.dbobj.source[:index])) @options([all_users_option]) def do_deps(self, arg, opts): '''Lists all objects that are dependent upon the object.''' - #TODO: Can this be Geraldized? - for obj in self._matching_database_objects(arg, opts): - obj.db_object.triggers - - if object_type == 'PACKAGE BODY': - q = "and (type != 'PACKAGE BODY' or name != :object_name)'" - object_type = 'PACKAGE' - else: - q = "" - q = """SELECT name, - type - from user_dependencies - where - referenced_name like :object_name - and referenced_type like :object_type - and referenced_owner like :owner - %s;""" % (q) - self.do_select(self.parsed(q, terminator=arg.parsed.terminator or ';', suffix=arg.parsed.suffix), - bindVarsIn={'object_name':object_name, 'object_type':object_type, 'owner':owner}) - + #TODO: doesn't account for views; don't know about primary keys + for descrip in self._matching_database_objects(arg, opts): + for deptype in ('indexes', 'constraints', 'triggers'): + if hasattr(descrip.dbobj, deptype): + for (depname, depobj) in getattr(descrip.dbobj, deptype).items(): + self.poutput('%s %s' % (deptype, depname)) + @options([all_users_option]) def do_comments(self, arg, opts): 'Prints comments on a table and its columns.' qualified = opts.get('all') - for m in self._matching_database_objects(arg, opts): - if hasattr(m.db_object, 'comments'): - self.poutput(m.descriptor(qualified)) - self.poutput(m.db_object.comments) - if hasattr(m.db_object, 'columns'): - columns = m.db_object.columns.values() + for descrip in self._matching_database_objects(arg, opts): + if hasattr(descrip.dbobj, 'comments'): + self.poutput(descrip.fullname) + self.poutput(descrip.dbobj.comments) + if hasattr(descrip.dbobj, 'columns'): + columns = descrip.dbobj.columns.values() columns.sort(key=operator.itemgetter('sequence')) for col in columns: comment = col.get('comment') @@ -1283,10 +1263,10 @@ def do__dir_(self, arg, opts, plural_name, str_function): long = opts.get('long') reverse = opts.get('reverse') or False - for obj in self._matching_database_objects(arg, opts): - if hasattr(obj.db_object, plural_name): - self.pfeedback('%s on %s' % (plural_name.title(), obj.descriptor(opts.get('all')))) - result = [str_function(depobj, long) for depobj in getattr(obj.db_object, plural_name).values()] + for descrip in self._matching_database_objects(arg, opts): + if hasattr(descrip.dbobj, plural_name): + self.pfeedback('%s on %s' % (plural_name.title(), descrip.fullname)) + result = [str_function(depobj, long) for depobj in getattr(descrip.dbobj, plural_name).values()] result.sort(reverse=opts.get('reverse') or False) self.poutput('\n'.join(result)) @@ -1522,7 +1502,7 @@ username = username.upper() elif self.rdbms == 'postgres': username = username.lower() - return (username, self.current_instance.schemas) + return (username, self.current_instance.gerald) def _to_sql_wildcards(self, original): return original.replace('*','%').replace('?','_') @@ -1532,7 +1512,7 @@ result = re.escape(original) return result.replace('\\*','.*').replace('\\?','.') - def _regex(self, s, exact=False): + def _regex_form_of_search_pattern(self, s, exact=False): seekpatt = r'[/\\]?%s[/\\]?' % ( s.replace('*', '.*').replace('?','.').replace('%', '.*')) if exact: @@ -1561,34 +1541,31 @@ else: schemas.refresh_one(username) + def _print_gerald_status_warning(self, gerald_schema): + if not gerald_schema.complete: + self.pfeedback('Metadata is not available yet - still gathering') + elif not gerald_schema.current: + self.pfeedback('Metadata is stale - requested refresh still incomplete') + def _matching_database_objects(self, arg, opts): - # jrrt.p* should work even if not --all # doesn't get java$options - (username, schemas) = self.metadata() - if opts.get('immediate'): + if opts.get('immediate'): #TODO if opts.get('all'): self.perror('Cannot combine --all with --immediate - operation takes too long') raise StopIteration else: self.pfeedback('Refreshing metadata for %s...' % username) schemas.refresh_one(username) - if schemas.complete == 0: - self.pfeedback('No data available yet - still gathering schema information') + + (username, gerald_schema) = self.metadata() + self._print_gerald_status_warning(gerald_schema) + if not gerald_schema.complete: raise StopIteration - elif (opts.get('all') and (schemas.complete != 'all')): - self.pfeedback('Results are incomplete - only %d schemas mapped so far' % schemas.complete) - seek = self._regex(arg, opts.get('exact')) - qualified = opts.get('all') or '.' in arg # TODO: is this working? - for (schema_name, schema) in schemas.items(): - if schema_name == username or opts.get('all'): - for (name, dbobj) in schema.schema.items(): - metadata = MetaData(object_name=name, schema_name=schema_name, db_object=dbobj) - if (not arg) or ( - seek.search(metadata.descriptor(qualified)) or - seek.search(metadata.name(qualified)) or - seek.search(metadata.db_type)): - yield metadata + seek = self._regex_form_of_search_pattern(arg, opts.get('exact')) + for (name, descrip) in gerald_schema.descriptions.items(): + if descrip.match_pattern(seek, specific_owner = ((not opts.all) and username)): + yield descrip @options(standard_options) def do_ls(self, arg, opts): @@ -1598,13 +1575,7 @@ ''' opts.exact = True (username, schemas) = self.metadata() - result = [] - for obj in self._matching_database_objects(arg, opts): - if opts.long: - result.append('%s' % (obj.descriptor(qualified=True))) - # if opts.long: status, last_ddl_time - else: - result.append(obj.descriptor(qualified=opts.all)) + result = [descrip.fullname for descrip in self._matching_database_objects(arg, opts)] if result: result.sort(reverse=bool(opts.reverse)) self.poutput('\n'.join(result)) @@ -1631,19 +1602,19 @@ re_pattern = re.compile(self._to_re_wildcards(pattern), (opts.ignorecase and re.IGNORECASE) or 0) for target in targets: - for m in self._matching_database_objects(target, opts): - self.pfeedback(m.descriptor(qualified=opts.all)) - if hasattr(m.db_object, 'columns'): + for descrip in self._matching_database_objects(target, opts): + self.pfeedback(descrip.fullname) + if hasattr(descrip.dbobj, 'columns'): clauses = [] - for col in m.db_object.columns: + for col in descrip.dbobj.columns: clauses.append(comparitor % (col, sql_pattern)) - sql = "SELECT * FROM %s WHERE 1=0\n%s;" % (m.object_name, ' '.join(clauses)) + sql = "SELECT * FROM %s WHERE 1=0\n%s;" % (descrip.dbobj, ' '.join(clauses)) sql = self.parsed(sql, terminator=arg.parsed.terminator or ';', suffix=arg.parsed.suffix) self.do_select(sql) - elif hasattr(m.db_object, 'source'): - for (line_num, line) in m.db_object.source: + elif hasattr(descrip.dbobj, 'source'): + for (line_num, line) in descrip.dbobj.source: if re_pattern.search(line): self.poutput('%4d: %s' % (line_num, line))