189
|
1 import unittest, sys, tempfile, re, os.path, pyparsing
|
|
2 from sqlpyPlus import *
|
|
3
|
|
class Borg(object):
    """Base class whose instances all share one attribute dictionary.

    From Python Cookbook, 2nd Ed., recipe 6.16.  Each call still creates a
    distinct object, but its ``__dict__`` is rebound to the class-level
    ``_shared_state`` mapping, so an attribute set through one instance is
    visible through every other instance.  Subclasses share the same mapping
    unless they define their own ``_shared_state``.
    """

    _shared_state = {}

    def __new__(cls, *a, **k):
        # Positional/keyword arguments are accepted so that subclasses may
        # define an __init__ that takes parameters, but they must not be
        # forwarded: object.__new__ rejects extra arguments when __new__ is
        # overridden (DeprecationWarning on Python 2.6+, TypeError on 3.x).
        obj = object.__new__(cls)
        obj.__dict__ = cls._shared_state
        return obj
|
|
11
|
|
class OutputTrap(Borg):
    """Redirect ``sys.stdout`` into a temporary file so output can be read back.

    Because this derives from ``Borg``, every instance shares the same state,
    including the current ``trap`` file.
    """

    # Captured once, when the class body executes; teardown() restores this.
    old_stdout = sys.stdout

    def __init__(self):
        # State is shared between instances, so a trap left over from an
        # earlier instance may still be open; close it rather than leak it.
        self._close_trap()
        self.trap = tempfile.TemporaryFile()
        sys.stdout = self.trap

    def dump(self):
        """Return everything written to the trap so far and start a new trap.

        The current temporary file is read from the beginning and closed, and
        ``sys.stdout`` is pointed at a fresh temporary file.
        """
        self.trap.seek(0)
        result = self.trap.read()
        self.trap.close()
        self.trap = tempfile.TemporaryFile()
        sys.stdout = self.trap
        return result

    def teardown(self):
        """Restore the saved stdout and release the temporary file.

        After this call the trap is closed; create a new ``OutputTrap`` to
        start capturing again.
        """
        sys.stdout = self.old_stdout
        self._close_trap()

    def _close_trap(self):
        # Close the current trap file, if one exists (closing twice is harmless).
        trap = getattr(self, 'trap', None)
        if trap is not None:
            trap.close()
|
|
26
|
|
class TestSqlPyPlus(unittest.TestCase):
    """Replay a recorded session transcript and compare each command's output.

    The transcript file is split into (command, expected output) pairs by
    ``transcriptReader``; each command is fed to a live ``sqlpyPlus`` session
    and whatever it writes to stdout is compared with the recorded output.
    """

    # Group 1: the text after the prompt up to the end of that line.
    # Group 2: everything up to (but excluding) the next prompt.
    transcriptReader = re.compile('testdata@eqdev> (.*?)\n(.*?)(?=testdata@eqdev>)', re.DOTALL)
    transcriptFileName = 'test_sqlpyPlus.txt'

    def setUp(self):
        """Load the transcript, trap stdout and open a connected session."""
        self.outputTrap = OutputTrap()
        try:
            transcriptFile = open(self.transcriptFileName)
            try:
                self.transcript = transcriptFile.read()
            finally:
                # Close the file even if read() fails.
                transcriptFile.close()
            self.directives = self.transcriptReader.finditer(self.transcript)
            self.testsession = sqlpyPlus()
            self.testsession.onecmd('connect ' + connectString)
        except BaseException:
            # unittest does not call tearDown() when setUp() raises, so put
            # stdout back here before propagating the error.
            self.outputTrap.teardown()
            raise
        # NOTE(review): this instance-level pattern is compiled only after
        # self.directives was produced from the class-level pattern, so it
        # does not affect the directives used by this test run.  The prompt
        # is also interpolated without re.escape() -- confirm the intent.
        self.transcriptReader = re.compile(
            '%s(.*?)\n\n(.*?)(?=%s)' % (self.testsession.prompt, self.testsession.prompt), re.DOTALL)
        # A newline followed by the continuation prompt, as it appears inside
        # a multi-line command in the transcript.
        self.commandCleaner = '\n%s' % (self.testsession.continuationPrompt)

    def assertOutput(self, commandtext, expected, lineNum):
        """Run ``commandtext`` and assert its trapped output equals ``expected``.

        Both sides are compared after ``strip()``.  ``lineNum`` is only used
        in the failure message to point at the command in the transcript file.
        """
        self.testsession.onecmd(commandtext)
        result = self.outputTrap.dump()
        self.assertEqual(expected.strip(), result.strip(),
            '\nFile %s, line %d\nCommand was:\n%s\nExpected:\n%s\nGot:\n%s\n' %
            (self.transcriptFileName, lineNum, commandtext, expected, result))

    def testall(self):
        """Check every command/response pair found in the transcript."""
        for directive in self.directives:
            (command, result) = directive.groups()
            # Drop continuation prompts so a multi-line command is passed to
            # the session as plain newline-separated text.
            command = command.replace(self.commandCleaner, '\n')
            # The number of newlines before the match is a 0-based line index;
            # add 1 so the message shows the conventional 1-based line number.
            lineNum = self.transcript.count('\n', 0, directive.start()) + 1
            self.assertOutput(command, result, lineNum=lineNum)

    def tearDown(self):
        """Restore the real stdout."""
        self.outputTrap.teardown()
|
|
54
|
|
# The connect string is taken from the first command-line argument and removed
# from sys.argv before unittest.main() parses the remaining arguments.
try:
    connectString = sys.argv.pop(1)
except IndexError:
    # sys.exit() with a string prints it to stderr and exits with status 1,
    # so a missing argument is reported as a failure rather than success.
    sys.exit('Usage: python %s username/password@oracleSID' % os.path.split(__file__)[-1])
unittest.main()
|
|
61
|
|
62
|
|
63 def transcript(cmdapp, filename='test_sqlpyPlus.txt'):
|
|
64 tfile = open(filename)
|
|
65 txt = tfile.read()
|
|
66 tfile.close()
|
|
67 prompt = pyparsing.Suppress(pyparsing.lineStart + cmd.prompt)
|
|
68 continuationPrompt = pyparsing.Suppress(pyparsing.lineStart + cmd.continuationPrompt)
|
|
69 cmdtxtPattern = (prompt + pyparsing.restOfLine + pyparsing.ZeroOrMore(
|
|
70 pyparsing.lineEnd + continuationPrompt + pyparsing.restOfLine))("command")
|
|
71 previousStartPoint = 0
|
|
72 results = []
|
|
73 for onecmd in cmdtxtPattern.scanString(txt):
|
|
74 if len(results) > 0:
|
|
75 results[-1]['response'] = txt[previousStartPoint:onecmd[1]]
|
|
76 results.append({'command': ''.join(onecmd[0].command), 'response': txt[onecmd[2]:]})
|
|
77 previousStartPoint = onecmd[2] |