# HG changeset patch # User catherine.devlin@gmail.com # Date 1283770725 14400 # Node ID 58696fca94e1380935916c5afef36661edfc2d5e # Parent 5ef58666373efceb3a244cdf1cc6652bf4238e08 findAll working diff -r 5ef58666373e -r 58696fca94e1 sqlpython/connections.py --- a/sqlpython/connections.py Mon Sep 06 05:42:41 2010 -0400 +++ b/sqlpython/connections.py Mon Sep 06 06:58:45 2010 -0400 @@ -123,7 +123,7 @@ self.set_corrections() if not self.password: self.password = getpass.getpass() - #self.connect() + self.connect() def parse_connect_uri(self, uri): results = self.connection_uri_parser.search(uri) if results: @@ -158,23 +158,27 @@ self.instance_number = instance_number self.prompt = "%d:%s@%s> " % (self.instance_number, self.username, self.database) sqlname = pyparsing.Word(pyparsing.alphas + '$_#%*', pyparsing.alphanums + '$_#%*') - ls_parser = ( (pyparsing.Optional(sqlname + pyparsing.Suppress("/"), default="%")("owner") + - pyparsing.Optional(sqlname + pyparsing.Suppress("/"), default="%")("type") + - pyparsing.Optional(sqlname, default="%")("name") + + ls_parser = ( (pyparsing.Optional(sqlname("owner") + "/") + + pyparsing.Optional(sqlname("type") + "/") + + pyparsing.Optional(sqlname("name")) + pyparsing.stringEnd ) - | ( pyparsing.Optional(sqlname + pyparsing.Suppress("/"), default="%")("type") + - pyparsing.Optional(sqlname + pyparsing.Suppress("."), default="%")("owner") + - pyparsing.Optional(sqlname, default="%")("name") ) + - pyparsing.stringEnd ) + | ( pyparsing.Optional(sqlname("type") + "/") + + pyparsing.Optional(sqlname("owner") + ".") + + pyparsing.Optional(sqlname("name")) + + pyparsing.stringEnd )) def parse_identifier(self, identifier): """ >>> opts = OptionTestDummy(postgres=True, password='password') >>> db = DatabaseInstance('thedatabase theuser', opts) >>> result = db.parse_identifier('scott.pets') - >>> result.owner - 'scott' - >>> result.name - 'pets' + >>> (result.owner, result.type, result.name) + ('scott', '%', 'pets') + >>> result = db.parse_identifier('pets') + >>> (result.owner, result.type, result.name) + ('%', '%', 'pets') + >>> result = db.parse_identifier('pe*') + >>> (result.owner, result.type, result.name) + ('%', '%', 'pe%') >>> result = db.parse_identifier('scott/table/pets') >>> (result.owner, result.type, result.name) ('scott', 'table', 'pets') @@ -183,15 +187,30 @@ ('scott', 'table', 'pets') >>> result = db.parse_identifier('') >>> (result.owner, result.type, result.name) - ('', '', '') + ('%', '%', '%') >>> result = db.parse_identifier('table/scott.*') >>> (str(result.owner), str(result.type), str(result.name)) ('scott', 'table', '%') """ identifier = identifier.replace('*', '%') - result = self.ls_parser.parseString(identifier) + result = {'owner': '%', 'type': '%', 'name': '%'} + result.update(dict(self.ls_parser.parseString(identifier))) + return result + def findAll(self, target): + identifier = self.parse_identifier(target) + clauses = [] + for col in ('owner', 'type', 'name'): + if ('%' in identifier[col]) or ('_' in identifier[col]): + operator = 'LIKE' + else: + operator = '=' + clause = '%s %s' % (operator, self.bindSyntax(col)) + clauses.append(clause) + qry = self.all_object_qry % tuple(clauses) + if isinstance(self, MySQLInstance): + identifier = (identifier['owner'], identifier['type'], identifier['name']) + result = self.connection.cursor().execute(qry, identifier) return result - @@ -227,7 +246,9 @@ self.connection = MySQLdb.connect(host = self.hostname, user = self.username, passwd = self.password, db = self.database, port = self.port, sql_mode = 'ANSI') - + def bindSyntax(self, varname): + return '%s' + class PostgresInstance(DatabaseInstance): rdbms = 'postgres' default_port = 5432 @@ -240,6 +261,8 @@ self.connection = psycopg2.connect(host = self.hostname, user = self.username, password = self.password, database = self.database, port = self.port) + def bindSyntax(self, varname): + return '%%(%s)s' % varname class OracleInstance(DatabaseInstance): rdbms = 'oracle' @@ -276,10 +299,17 @@ def connect(self): self.connection = cx_Oracle.connect(user = self.username, password = self.password, dsn = self.dsn, mode = self.mode) - def findAll(self, target): - - pass - + all_object_qry = """SELECT owner, object_type, object_name + FROM all_objects + WHERE owner %s + AND object_type %s + AND object_name %s""" + def bindSyntax(self, varname): + return ':' + varname + if __name__ == '__main__': - doctest.testmod() + opts = OptionTestDummy(password='password') + db = DatabaseInstance('oracle://system:twttatl@orcl', opts) + print list(db.findAll('')) + #doctest.testmod()