comparison cmd2.py @ 113:7d215852f9a6

still unpackaging
author catherine@Elli.myhome.westell.com
date Sat, 25 Oct 2008 19:34:16 -0400
parents cmd2/cmd2.py@e3b8eaadea56
children 33cd0e1bebb8
comparison
equal deleted inserted replaced
112:e3b8eaadea56 113:7d215852f9a6
1 """Variant on standard library's cmd with extra features.
2
3 To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you
4 were using the standard library's cmd, while enjoying the extra features.
5
6 Searchable command history (commands: "hi", "li", "run")
7 Load commands from file, save to file, edit commands in file
8 Multi-line commands
9 Case-insensitive commands
10 Special-character shortcut commands (beyond cmd's "@" and "!")
11 Settable environment parameters
12 Parsing commands with `optparse` options (flags)
13 Redirection to file with >, >>; input from file with <
14 Easy transcript-based testing of applications (see example/example.py)
15
16 Note that redirection with > and | will only work if `self.stdout.write()`
17 is used in place of `print`. The standard library's `cmd` module is
18 written to use `self.stdout.write()`.
19
20 - Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com
21
22 CHANGES:
23 As of 0.3.0, options should be specified as `optparse` options. See README.txt.
24 flagReader.py options are still supported for backward compatibility
25 """
26 import cmd, re, os, sys, optparse, subprocess, tempfile, pyparsing, doctest, unittest
27 from optparse import make_option
28 __version__ = '0.4'
29
class OptionParser(optparse.OptionParser):
    """`optparse.OptionParser` variant that never terminates the interpreter.

    The stock parser calls ``sys.exit`` from `exit` and `error`; inside an
    interactive command loop that would kill the whole application, so both
    hooks are replaced here.
    """

    def exit(self, status=0, msg=None):
        """Flag the values being parsed instead of exiting.

        Sets ``_exit = True`` on ``self.values`` so the caller can detect
        that optparse wanted to stop (e.g. after ``--help``), and prints
        `msg` if one was given.  `status` is accepted for signature
        compatibility and otherwise ignored.
        """
        # self.values is None until parse_args() has started.
        if self.values is not None:
            self.values._exit = True
        if msg:
            print(msg)

    def error(self, msg):
        """Raise an optparse exception describing `msg`; never returns.

        When optparse calls this while handling one of its own exceptions,
        that exception is re-raised unchanged.  Otherwise (optparse also
        calls `error` directly, e.g. for a missing option argument) an
        `optparse.OptionValueError` carrying `msg` is raised, because a bare
        ``raise`` with no active exception would itself fail.
        """
        if isinstance(sys.exc_info()[1], optparse.OptParseError):
            raise
        raise optparse.OptionValueError(msg)
44
def options(option_list):
    """Decorator factory: parse a command's argument string with optparse.

    `option_list` is an iterable of `optparse` options (e.g. built with
    `make_option`).  The decorated ``do_*`` method must accept
    ``(instance, arg, opts)``; the wrapper that replaces it accepts
    ``(instance, arg)``, splits `arg` on whitespace, parses it, and calls the
    original with the leftover positional words re-joined by single spaces
    plus the parsed option values.

    If parsing fails, the error and the option help are printed and the
    wrapper returns None without calling the command.  The wrapper also
    returns None when the parser flagged the values with ``_exit``.
    """
    def option_setup(func):
        optionParser = OptionParser()
        for opt in option_list:
            optionParser.add_option(opt)
        # Remove only the literal 'do_' prefix.  str.strip('do_') strips any
        # of the characters d/o/_ from BOTH ends (do_load -> 'loa').
        command = func.__name__
        if command.startswith('do_'):
            command = command[len('do_'):]
        optionParser.set_usage("%s [options] arg" % command)

        def newFunc(instance, arg):
            try:
                opts, arg = optionParser.parse_args(arg.split())
            except (optparse.OptionValueError, optparse.BadOptionError,
                    optparse.OptionError, optparse.AmbiguousOptionError,
                    optparse.OptionConflictError):
                # sys.exc_info() keeps this valid on both Python 2 and 3.
                print(sys.exc_info()[1])
                optionParser.print_help()
                return
            if hasattr(opts, '_exit'):
                return None
            return func(instance, ' '.join(arg), opts)

        newFunc.__doc__ = '%s\n%s' % (func.__doc__, optionParser.format_help())
        return newFunc
    return option_setup
68
class PasteBufferError(EnvironmentError):
    """Error whose message names the clipboard tool missing on this platform.

    The message is chosen once, when the class body runs: pywin32 on
    Windows, xclip everywhere else.
    """
    if sys.platform[:3] == 'win':
        errmsg = """Redirecting to or from paste buffer requires pywin32
to be installed on operating system.
Download from http://sourceforge.net/projects/pywin32/"""
    else:
        errmsg = """Redirecting to or from paste buffer requires xclip
to be installed on operating system.
On Debian/Ubuntu, 'sudo apt-get install xclip' will install it."""

    def __init__(self):
        # Takes no arguments: the platform-specific text is always used.
        Exception.__init__(self, self.errmsg)
80
# Define getPasteBuffer() and writeToPasteBuffer(txt) for this platform.
# When the required clipboard tool is missing, both names (and the legacy
# alias setPasteBuffer) are bound to a stub that raises OSError explaining
# what to install, so callers fail with a readable message.
pastebufferr = """Redirecting to or from paste buffer requires %s
to be installed on operating system.
%s"""
if sys.platform[:3] == 'win':
    try:
        import win32clipboard
    except ImportError:
        def getPasteBuffer(*args):
            raise OSError(pastebufferr % ('pywin32', 'Download from http://sourceforge.net/projects/pywin32/'))
        setPasteBuffer = getPasteBuffer
        # Previously left undefined here, so output redirection to the
        # clipboard died with NameError instead of the message above.
        writeToPasteBuffer = getPasteBuffer
    else:
        def getPasteBuffer():
            """Return the clipboard contents, or '' for non-text data."""
            win32clipboard.OpenClipboard(0)
            try:
                try:
                    result = win32clipboard.GetClipboardData()
                except TypeError:
                    result = ''  #non-text
            finally:
                # Always release the clipboard, even if the read failed.
                win32clipboard.CloseClipboard()
            return result
        def writeToPasteBuffer(txt):
            """Replace the clipboard contents with `txt`."""
            win32clipboard.OpenClipboard(0)
            try:
                win32clipboard.EmptyClipboard()
                win32clipboard.SetClipboardText(txt)
            finally:
                win32clipboard.CloseClipboard()
else:
    can_clip = False
    try:
        subprocess.check_call('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        can_clip = True
    except AttributeError:  # check_call not defined, Python < 2.5
        # Round-trip a known string through xclip to see whether it works.
        teststring = 'Testing for presence of xclip.'
        xclipproc = subprocess.Popen('xclip -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        xclipproc.stdin.write(teststring)
        xclipproc.stdin.close()
        xclipproc = subprocess.Popen('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        if xclipproc.stdout.read() == teststring:
            can_clip = True
    except (subprocess.CalledProcessError, OSError, IOError):
        pass
    if can_clip:
        def getPasteBuffer():
            """Return whatever `xclip -o -sel clip` prints."""
            xclipproc = subprocess.Popen('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
            return xclipproc.stdout.read()
        def writeToPasteBuffer(txt):
            """Feed `txt` to `xclip -sel clip` and to plain `xclip`."""
            xclipproc = subprocess.Popen('xclip -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
            xclipproc.stdin.write(txt)
            xclipproc.stdin.close()
            # but we want it in both the "primary" and "mouse" clipboards
            xclipproc = subprocess.Popen('xclip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
            xclipproc.stdin.write(txt)
            xclipproc.stdin.close()
    else:
        def getPasteBuffer(*args):
            raise OSError(pastebufferr % ('xclip', 'On Debian/Ubuntu, install with "sudo apt-get install xclip"'))
        setPasteBuffer = getPasteBuffer
        writeToPasteBuffer = getPasteBuffer
137
138 pyparsing.ParserElement.setDefaultWhitespaceChars(' \t')
def parseSearchResults(pattern, s):
    """Return the first match of `pattern` in `s`, annotated with context.

    `pattern` must provide ``scanString(s)`` yielding
    ``(result, start, stop)`` triples.  On the first hit, the result gains:

    * ``before``        -- text of `s` preceding the match
    * ``after``         -- text of `s` following the match
    * ``upToIncluding`` -- text of `s` through the end of the match

    When nothing matches, an empty `pyparsing.ParseResults` whose ``before``
    is the whole of `s` is returned.
    """
    # Iterating (instead of calling generator.next()) takes only the first
    # hit and works on both Python 2 and Python 3 iterators.
    for result, start, stop in pattern.scanString(s):
        result['before'], result['after'] = s[:start], s[stop:]
        result['upToIncluding'] = s[:stop]
        return result
    result = pyparsing.ParseResults('')
    result['before'] = s
    return result
149
class Cmd(cmd.Cmd):
    """`cmd.Cmd` subclass adding history, shortcuts, redirection, scripts
    and settable parameters.

    Class attributes double as user-visible configuration; the ones named
    in `settable` can be changed at run time with the `set` command.
    """
    echo = False
    caseInsensitive = True
    multilineCommands = []
    continuationPrompt = '> '
    shortcuts = {'?': 'help', '!': 'shell', '@': 'load'}
    excludeFromHistory = '''run r list l history hi ed edit li eof'''.split()
    noSpecialParse = 'set ed edit exit'.split()
    defaultExtension = 'txt'
    defaultFileName = 'command.txt'
    editor = os.environ.get('EDITOR')
    _STOP_AND_EXIT = 2
    if not editor:
        if sys.platform[:3] == 'win':
            editor = 'notepad'
        else:
            # Take the first editor `which` can find.
            for editor in ['gedit', 'kate', 'vim', 'emacs', 'nano', 'pico']:
                if not os.system('which %s' % (editor)):
                    break
            else:
                # None found: leave unset so do_ed asks for `set editor`
                # rather than silently trying the last candidate.
                editor = None

    settable = ['prompt', 'continuationPrompt', 'defaultFileName', 'editor', 'caseInsensitive', 'echo']
    _TO_PASTE_BUFFER = 1

    # NOTE: do_* methods that had no docstring are documented with comments
    # only, so that the `help` listing of documented commands is unchanged.

    def do_cmdenvironment(self, args):
        # Writes a summary of case-sensitivity, terminators and settables.
        self.stdout.write("""
Commands are %(casesensitive)scase-sensitive.
Commands may be terminated with: %(terminators)s
Settable parameters: %(settable)s
""" %
        { 'casesensitive': (self.caseInsensitive and 'not ') or '',
          'terminators': self.terminatorPattern,
          'settable': ' '.join(self.settable)
        })

    def do_help(self, arg):
        # Standard help, followed by the option help of commands that carry
        # an `optionParser` attribute.
        cmd.Cmd.do_help(self, arg)
        try:
            fn = getattr(self, 'do_' + arg)
            if fn and fn.optionParser:
                fn.optionParser.print_help(file=self.stdout)
        except AttributeError:
            pass

    def __init__(self, *args, **kwargs):
        cmd.Cmd.__init__(self, *args, **kwargs)
        self.history = History()

    def do_shortcuts(self, args):
        """Lists single-key shortcuts available."""
        result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in self.shortcuts.items())
        self.stdout.write("Single-key shortcuts for other commands:\n%s\n" % (result))

    terminatorPattern = ((pyparsing.Literal(';') ^ pyparsing.Literal('\n\n'))
                         ^ (pyparsing.Literal('\nEOF') + pyparsing.lineEnd))('terminator')
    argSeparatorPattern = pyparsing.Word(pyparsing.printables)('command') \
                          + pyparsing.SkipTo(pyparsing.StringEnd())('args')
    filenamePattern = pyparsing.Word(pyparsing.alphanums + '#$-_~{},.!:\\/')
    integerPattern = pyparsing.Word(pyparsing.nums).setParseAction( lambda s,l,t: [ int(t[0]) ] )
    pipePattern = pyparsing.Literal('|')('pipe') + pyparsing.restOfLine('pipeTo')
    redirectOutPattern = (pyparsing.Literal('>>') ^ '>')('output') \
                         + pyparsing.Optional(filenamePattern)('outputTo')
    redirectInPattern = pyparsing.Literal('<')('input') \
                        + pyparsing.Optional(filenamePattern)('inputFrom')
    punctuationPattern = pipePattern ^ redirectInPattern ^ redirectOutPattern
    # Redirection characters inside quoted strings must not be interpreted.
    for p in (terminatorPattern, pipePattern, redirectInPattern, redirectOutPattern, punctuationPattern):
        p.ignore(pyparsing.sglQuotedString)
        p.ignore(pyparsing.dblQuotedString)

    def parsed(self, s):
        '''
        >>> c = Cmd()
        >>> r = c.parsed('quotes "are > ignored" < inp.txt')
        >>> r.statement, r.input, r.inputFrom, r.output, r.outputTo
        ('quotes "are > ignored" ', '<', 'inp.txt', '', '')
        >>> r = c.parsed('very complex; < from.txt >> to.txt etc.')
        >>> r.statement, r.terminator, r.input, r.inputFrom, r.output, r.outputTo
        ('very complex;', ';', '<', 'from.txt', '>>', 'to.txt')
        >>> c.parsed('nothing to parse').statement
        'nothing to parse'
        >>> r = c.parsed('ignore > within a terminated statement; > out.txt')
        >>> r.statement, r.terminator, r.input, r.inputFrom, r.output, r.outputTo
        ('ignore > within a terminated statement;', ';', '', '', '>', 'out.txt')
        >>> r = c.parsed('send it to | sort | wc')
        >>> r.statement, r.pipe, r.pipeTo
        ('send it to ', '|', ' sort | wc')
        >>> r = c.parsed('got from < thisfile.txt plus blah blah')
        >>> r.statement, r.input, r.inputFrom
        ('got from ', '<', 'thisfile.txt')
        '''
        if isinstance(s, pyparsing.ParseResults):
            return s
        result = (pyparsing.SkipTo(pyparsing.StringEnd()))('fullStatement').parseString(s)
        # NOTE: `noSpecialParse` is not consulted; the branch that used it
        # had been disabled by wrapping it in a string literal.
        if s[:1] in self.shortcuts:
            # Expand a leading shortcut character into its command name.
            s = self.shortcuts[s[0]] + ' ' + s[1:]
        result['statement'] = s
        result['parseable'] = s
        result += parseSearchResults(self.terminatorPattern, s)
        if result.terminator:
            # Redirection is only looked for AFTER the terminator.
            result['statement'] = result.upToIncluding
            result['unterminated'] = result.before
            result['parseable'] = result.after
        else:
            result += parseSearchResults(self.punctuationPattern, s)
            result['statement'] = result['unterminated'] = result.before
        result += parseSearchResults(self.pipePattern, result.parseable)
        result += parseSearchResults(self.redirectInPattern, result.parseable)
        result += parseSearchResults(self.redirectOutPattern, result.parseable)
        result += parseSearchResults(self.argSeparatorPattern, result.statement)
        if self.caseInsensitive:
            result['command'] = result.command.lower()
        result['statement'] = '%s %s' % (result.command, result.args)
        return result

    def extractCommand(self, statement):
        """Split `statement` into (command, args); args is '' if absent."""
        try:
            (command, args) = statement.split(None,1)
        except ValueError:
            (command, args) = statement, ''
        if self.caseInsensitive:
            command = command.lower()
        return command, args

    def onecmd(self, line, assumeComplete=False):
        """Interpret the argument as though it had been typed in response
        to the prompt.

        This may be overridden, but should not normally need to be;
        see the precmd() and postcmd() methods for useful execution hooks.
        The return value is a flag indicating whether interpretation of
        commands by the interpreter should stop.

        Handles, in order: continuation lines for multiline commands,
        input redirection (`<`), then output to a pipe (`|`), a file
        (`>`/`>>`) or the paste buffer (`>`/`>>` with no file name).
        """
        line = line.strip()
        if not line:
            return
        statement = self.parsed(line)
        while (statement.command in self.multilineCommands) and not \
              (statement.terminator or assumeComplete):
            statement = self.parsed('%s\n%s' % (statement.fullStatement,
                                    self.pseudo_raw_input(self.continuationPrompt)))

        statekeeper = None
        redirect = None
        stop = 0
        if statement.input:
            if statement.inputFrom:
                # open() raises IOError on Python 2, which `except OSError`
                # does not catch there; handle both.
                try:
                    infile = open(statement.inputFrom, 'r')
                    try:
                        newinput = infile.read()
                    finally:
                        infile.close()
                except (IOError, OSError):
                    print(sys.exc_info()[1])
                    return 0
            else:
                newinput = getPasteBuffer()
            # Splice the input text in place of the `< source` clause and
            # re-run the resulting line.
            start, end = self.redirectInPattern.scanString(statement.fullStatement).next()[1:]
            return self.onecmd('%s%s%s' % (statement.fullStatement[:start],
                                newinput, statement.fullStatement[end:]))
        if statement.pipe and statement.pipeTo:
            redirect = subprocess.Popen(statement.pipeTo, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
            statekeeper = Statekeeper(self, ('stdout',))
            self.stdout = redirect.stdin
        elif statement.output:
            statekeeper = Statekeeper(self, ('stdout',))
            if statement.outputTo:
                mode = 'w'
                if statement.output == '>>':
                    mode = 'a'
                try:
                    self.stdout = open(statement.outputTo, mode)
                except (IOError, OSError):
                    print(sys.exc_info()[1])
                    return 0
            else:
                # No file name: collect output for the paste buffer.
                self.stdout = tempfile.TemporaryFile()
                if statement.output == '>>':
                    self.stdout.write(getPasteBuffer())
        try:
            # Inside the try so stdout is restored even if the command fails.
            stop = cmd.Cmd.onecmd(self, statement.statement)
            if statement.command not in self.excludeFromHistory:
                self.history.append(statement.fullStatement)
        finally:
            if statekeeper:
                try:
                    if statement.output and not statement.outputTo:
                        self.stdout.seek(0)
                        writeToPasteBuffer(self.stdout.read())
                    elif redirect is not None:
                        # Relay the piped program's output to the real stdout.
                        for result in redirect.communicate():
                            statekeeper.stdout.write(result or '')
                finally:
                    self.stdout.close()
                    statekeeper.restore()

        return stop

    def pseudo_raw_input(self, prompt):
        """copied from cmd's cmdloop; like raw_input, but accounts for changed stdin, stdout"""

        if self.use_rawinput:
            try:
                line = raw_input(prompt)
            except EOFError:
                line = 'EOF'
        else:
            self.stdout.write(prompt)
            self.stdout.flush()
            line = self.stdin.readline()
            if not len(line):
                line = 'EOF'
            else:
                if line[-1] == '\n': # this was always true in Cmd
                    line = line[:-1]
        return line

    def cmdloop(self, intro=None):
        """Repeatedly issue a prompt, accept input, parse an initial prefix
        off the received input, and dispatch to action methods, passing them
        the remainder of the line as argument.
        """

        # An almost perfect copy from Cmd; however, the pseudo_raw_input portion
        # has been split out so that it can be called separately

        self.preloop()
        if self.use_rawinput and self.completekey:
            try:
                import readline
                self.old_completer = readline.get_completer()
                readline.set_completer(self.complete)
                readline.parse_and_bind(self.completekey+": complete")
            except ImportError:
                pass
        try:
            if intro is not None:
                self.intro = intro
            if self.intro:
                self.stdout.write(str(self.intro)+"\n")
            stop = None
            while not stop:
                if self.cmdqueue:
                    line = self.cmdqueue.pop(0)
                else:
                    line = self.pseudo_raw_input(self.prompt)
                if (self.echo) and (isinstance(self.stdin, file)):
                    self.stdout.write(line + '\n')
                line = self.precmd(line)
                stop = self.onecmd(line)
                stop = self.postcmd(stop, line)
            self.postloop()
        finally:
            if self.use_rawinput and self.completekey:
                try:
                    import readline
                    readline.set_completer(self.old_completer)
                except ImportError:
                    pass
        return stop

    def do_EOF(self, arg):
        # End of input stops the current command loop.
        return True
    do_eof = do_EOF

    def clean(self, s):
        """cleans up a string"""
        if self.caseInsensitive:
            return s.strip().lower()
        return s.strip()

    def showParam(self, param):
        """Write 'name: value' for `param` if it is in `settable`."""
        param = self.clean(param)
        if param in self.settable:
            self.stdout.write('%s: %s\n' % (param, str(getattr(self, param))))

    def do_quit(self, arg):
        # _STOP_AND_EXIT (unlike plain True) also unwinds enclosing do_load loops.
        return self._STOP_AND_EXIT
    do_exit = do_quit
    do_q = do_quit

    def do_show(self, arg):
        'Shows value of a parameter'
        if arg.strip():
            self.showParam(arg)
        else:
            for param in self.settable:
                self.showParam(param)

    def do_set(self, arg):
        'Sets a parameter'
        try:
            paramName, val = arg.split(None, 1)
            paramName = self.clean(paramName)
            if paramName not in self.settable:
                raise NotSettableError
            currentVal = getattr(self, paramName)
            if (val[0] == val[-1]) and val[0] in ("'", '"'):
                # Quoted values are taken literally, minus the quotes.
                val = val[1:-1]
            else:
                val = cast(currentVal, self.parsed(val).unterminated)
            setattr(self, paramName, val)
            self.stdout.write('%s - was: %s\nnow: %s\n' % (paramName, currentVal, val))
        except (ValueError, AttributeError, NotSettableError):
            # Anything unparseable or unsettable degrades to `show`.
            self.do_show(arg)

    def do_shell(self, arg):
        'execute a command as if at the OS prompt.'
        os.system(arg)

    def do_history(self, arg):
        """history [arg]: lists past commands issued

        no arg -> list all
        arg is integer -> list one history item, by index
        arg is string -> string search
        arg is /enclosed in forward-slashes/ -> regular expression search
        """
        if arg:
            history = self.history.get(arg)
        else:
            history = self.history
        for hi in history:
            self.stdout.write(hi.pr())

    def last_matching(self, arg):
        """Return the last history item matching `arg` (the last item of
        all when `arg` is empty), or None if there is none."""
        try:
            if arg:
                return self.history.get(arg)[-1]
            else:
                return self.history[-1]
        except Exception:
            return None

    def do_list(self, arg):
        """list [arg]: lists last command issued

        no arg -> list absolute last
        arg is integer -> list one history item, by index
        - arg, arg - (integer) -> list up to or after #arg
        arg is string -> list last command matching string search
        arg is /enclosed in forward-slashes/ -> regular expression search
        """
        found = self.last_matching(arg)
        if found is not None:
            self.stdout.write(found.pr())
    do_hi = do_history
    do_l = do_list
    do_li = do_list

    def do_ed(self, arg):
        """ed: edit most recent command in text editor
        ed [N]: edit numbered command from history
        ed [filename]: edit specified file name

        commands are run after editor is closed.
        "set editor (program-name)" or set EDITOR environment variable
        to control which editing program is used."""
        if not self.editor:
            print("please use 'set editor' to specify your text editing program of choice.")
            return
        filename = self.defaultFileName
        contents = ''
        try:
            contents = self.last_matching(int(arg))
        except (ValueError, TypeError):
            # Not a history number: a non-empty arg is a file name,
            # an empty one means "the most recent command".
            if arg:
                filename = arg
            else:
                contents = self.last_matching(arg)

        if contents:
            f = open(filename, 'w')
            try:
                f.write(contents)
            finally:
                f.close()

        os.system('%s %s' % (self.editor, filename))
        self.do__load(filename)
    do_edit = do_ed

    saveparser = (pyparsing.Optional(pyparsing.Word(pyparsing.nums)^'*')("idx") +
                  pyparsing.Optional(pyparsing.Word(pyparsing.printables))("fname") +
                  pyparsing.stringEnd)
    def do_save(self, arg):
        """`save [N] [filename.ext]`
        Saves command from history to file.
        N => Number of command (from history), or `*`;
        most recent command if omitted"""

        try:
            args = self.saveparser.parseString(arg)
        except pyparsing.ParseException:
            print(self.do_save.__doc__)
            return
        fname = args.fname or self.defaultFileName
        try:
            if args.idx == '*':
                saveme = '\n\n'.join(self.history[:])
            elif args.idx:
                saveme = self.history[int(args.idx)-1]
            else:
                saveme = self.history[-1]
        except IndexError:
            # Empty history or out-of-range N: report instead of crashing.
            print('Error saving %s: no matching command in history' % (fname))
            return
        try:
            f = open(fname, 'w')
            try:
                f.write(saveme)
            finally:
                f.close()
            print('Saved to %s' % (fname))
        except Exception:
            print('Error saving %s: %s' % (fname, str(sys.exc_info()[1])))

    def do_load(self, fname=None):
        """Runs command(s) from a file."""
        # `load` with no argument arrives here as '', not None.
        if not fname:
            fname = self.defaultFileName
        keepstate = Statekeeper(self, ('stdin','use_rawinput','prompt','continuationPrompt'))
        if isinstance(fname, file):
            self.stdin = fname
        else:
            try:
                self.stdin = open(fname, 'r')
            except IOError:
                firstError = sys.exc_info()[1]
                # Retry with the default extension appended.
                try:
                    self.stdin = open('%s.%s' % (fname, self.defaultExtension), 'r')
                except IOError:
                    print('Problem opening file %s: \n%s' % (fname, firstError))
                    keepstate.restore()
                    return
        self.use_rawinput = False
        self.prompt = self.continuationPrompt = ''
        try:
            stop = self.cmdloop()
        finally:
            # Always give the interactive session its stdin/prompts back.
            self.stdin.close()
            keepstate.restore()
        self.lastcmd = ''
        # Reaching end of file must not stop the caller; only quit does.
        return (stop == self._STOP_AND_EXIT) and self._STOP_AND_EXIT
    do__load = do_load  # avoid an unfortunate legacy use of do_load from sqlpython

    def do_run(self, arg):
        """run [arg]: re-runs an earlier command

        no arg -> run most recent command
        arg is integer -> run one history item, by index
        arg is string -> run most recent command by string search
        arg is /enclosed in forward-slashes/ -> run most recent by regex
        """
        runme = self.last_matching(arg)
        print(runme)
        if runme:
            runme = self.precmd(runme)
            stop = self.onecmd(runme)
            self.postcmd(stop, runme)
    do_r = do_run

    def fileimport(self, statement, source):
        """Return the contents of file `source`, or '' (after writing a
        message to stdout) if it cannot be opened.  `statement` is unused."""
        try:
            f = open(source)
        except IOError:
            self.stdout.write("Couldn't read from file %s\n" % source)
            return ''
        try:
            data = f.read()
        finally:
            f.close()
        return data
613
class HistoryItem(str):
    """A command string as stored in a `History`.

    Attributes:
        lowercase: lower-cased copy of the text, used for string searches.
        idx: 1-based position in the history; None until `History.append`
            assigns it.
    """

    def __init__(self, instr):
        # The text itself is set by str.__new__ (str is immutable).
        # str.__init__ must not be handed `instr`: doing so raises TypeError
        # on Python 3 and a DeprecationWarning on Python 2.6.
        self.lowercase = self.lower()
        self.idx = None

    def pr(self):
        """Return the item formatted for display: a rule with [idx], then the text."""
        return '-------------------------[%d]\n%s\n' % (self.idx, str(self))
621
class History(list):
    """List of `HistoryItem`s with 1-based numbering and search."""

    # "N-" means "item N and everything after".  The whole number must be
    # captured: the former pattern ([\d])+ kept only its last digit, so
    # '12-' behaved like '2-'.
    rangeFrom = re.compile(r'^(\d+)\s*\-$')

    def append(self, new):
        """Wrap `new` in a HistoryItem, store it and assign its 1-based idx."""
        new = HistoryItem(new)
        list.append(self, new)
        new.idx = len(self)

    def extend(self, new):
        """Append every element of `new`, numbering each one."""
        for n in new:
            self.append(n)

    def get(self, getme):
        """Return the list of items selected by `getme`.

        * positive integer N -> [item N], or [] if out of range
        * negative integer -N -> the first N items
        * 'N-'                -> item N and all later items
        * '/regex/'           -> items matching the regex (case-insensitive,
                                 DOTALL, MULTILINE)
        * any other string    -> items containing it, ignoring case
        """
        try:
            index = int(getme)
        except (ValueError, TypeError):
            index = None
        if index is not None:
            if index < 0:
                return self[:-index]
            try:
                return [self[index - 1]]
            except IndexError:
                return []

        getme = getme.strip()
        mtch = self.rangeFrom.search(getme)
        if mtch:
            return self[(int(mtch.group(1)) - 1):]
        if getme.startswith(r'/') and getme.endswith(r'/'):
            finder = re.compile(getme[1:-1], re.DOTALL | re.MULTILINE | re.IGNORECASE)
            def isin(hi):
                return finder.search(hi)
        else:
            def isin(hi):
                return (getme.lower() in hi.lowercase)
        return [itm for itm in self if isin(itm)]
653
class NotSettableError(Exception):
    """Raised by `Cmd.do_set` when the named parameter is not in `settable`."""
656
def cast(current, new):
    """Tries to force a new value into the same type as the current.

    For a bool `current`, `new` may be anything `int()` accepts (non-zero
    is True), or a string: 'on' or one starting with 'y'/'t' gives True,
    'off' or one starting with 'n'/'f' gives False (case-insensitive).
    For any other type, ``type(current)(new)`` is attempted.

    Returns the converted value.  If conversion is impossible, a message is
    printed and `current` is returned unchanged.
    """
    typ = type(current)
    if typ == bool:
        # The tuple is required: `except ValueError, TypeError:` catches
        # ValueError only (binding it to the name TypeError).
        try:
            return bool(int(new))
        except (ValueError, TypeError):
            pass
        try:
            new = new.lower()
        except AttributeError:
            pass  # not a string; fall through to the failure message
        else:
            # new[:1] (rather than new[0]) tolerates the empty string.
            if (new == 'on') or (new[:1] in ('y', 't')):
                return True
            if (new == 'off') or (new[:1] in ('n', 'f')):
                return False
    else:
        try:
            return typ(new)
        except Exception:
            pass
    print("Problem setting parameter (now %s) to %s; incorrect type?" % (current, new))
    return current
680
class Statekeeper(object):
    """Snapshot of selected attributes of an object, restorable later.

    The values named in `attribs` are copied from `obj` onto this instance
    (under the same names) at construction and on every `save()`;
    `restore()` copies them back onto `obj`.
    """

    def __init__(self, obj, attribs):
        self.obj = obj
        self.attribs = attribs
        self.save()

    def _transfer(self, source, target):
        # Copy each tracked attribute from source to target.
        for name in self.attribs:
            setattr(target, name, getattr(source, name))

    def save(self):
        """Record the current values of the tracked attributes."""
        self._transfer(self.obj, self)

    def restore(self):
        """Write the recorded values back onto the tracked object."""
        self._transfer(self, self.obj)
692
class Borg(object):
    '''All instances of any Borg subclass will share state.
    from Python Cookbook, 2nd Ed., recipe 6.16

    Every instance's __dict__ is the single `_shared_state` dictionary, so
    an attribute set on one instance is visible on all of them.'''
    _shared_state = {}

    def __new__(cls, *a, **k):
        # Constructor arguments are for __init__ only.  object.__new__ must
        # not receive them: it raises TypeError when given extra arguments
        # by a class that overrides __new__.
        obj = object.__new__(cls)
        obj.__dict__ = cls._shared_state
        return obj
701
class OutputTrap(Borg):
    '''Instantiate an OutputTrap to divert/capture ALL stdout output.  For use in unit testing.
    Call `tearDown()` to return to normal output.'''

    def __init__(self):
        # Remember the stream being replaced so tearDown() can put it back.
        self.old_stdout = sys.stdout
        self.trap = tempfile.TemporaryFile()
        sys.stdout = self.trap

    def read(self):
        '''Return everything captured since the previous read() and clear the trap.'''
        trap = self.trap
        trap.seek(0)
        captured = trap.read()
        trap.truncate(0)
        # NOTE(review): the NUL stripping presumably removes padding left
        # when writing resumes past the truncation point - confirm.
        return captured.strip('\x00')

    def tearDown(self):
        '''Point sys.stdout back at the stream saved by __init__.'''
        sys.stdout = self.old_stdout
716
class TranscriptReader(object):
    """Walks a transcript file, alternating commands and expected output.

    A command is a line starting with the application's prompt plus any
    following lines starting with its continuation prompt.  `bookmark` is
    the offset in `transcript` up to which text has been consumed.
    """

    def __init__(self, cmdapp, filename='test_cmd2.txt'):
        """Load `filename`; an unreadable file gives an empty transcript."""
        self.cmdapp = cmdapp
        try:
            tfile = open(filename)
            try:
                self.transcript = tfile.read()
            finally:
                tfile.close()
        except IOError:
            self.transcript = ''
        self.bookmark = 0

    def refreshCommandFinder(self):
        """Rebuild `cmdtxtPattern` from the application's current prompts."""
        prompt = pyparsing.Suppress(pyparsing.lineStart + self.cmdapp.prompt)
        continuationPrompt = pyparsing.Suppress(pyparsing.lineStart + self.cmdapp.continuationPrompt)
        self.cmdtxtPattern = (prompt + pyparsing.restOfLine + pyparsing.ZeroOrMore(
            pyparsing.lineEnd + continuationPrompt + pyparsing.restOfLine))("command")

    def _nextCommand(self):
        """Return (tokens, start, end) of the first command after the
        bookmark, offsets relative to the bookmark, or None if none is left."""
        self.refreshCommandFinder()
        remaining = self.transcript[self.bookmark:]
        for hit in self.cmdtxtPattern.scanString(remaining, maxMatches=1):
            return hit
        return None

    def inputGenerator(self):
        """Yield (command text, line number) per command, advancing the
        bookmark past each one; ends when the transcript has no more."""
        while True:
            hit = self._nextCommand()
            if hit is None:
                # Finish explicitly: a StopIteration escaping a generator
                # body becomes RuntimeError under PEP 479.
                return
            thiscmd, startpos, endpos = hit
            lineNum = self.transcript.count('\n', 0, self.bookmark+startpos) + 2
            self.bookmark += endpos
            yield (''.join(thiscmd.command), lineNum)

    def nextExpected(self):
        """Return the text between the bookmark and the next command (or
        the end of the transcript).  The bookmark advances to the start of
        the next command; it does not move when no command is left."""
        hit = self._nextCommand()
        if hit is None:
            return self.transcript[self.bookmark:]
        startpos = hit[1]
        result = self.transcript[self.bookmark:self.bookmark+startpos]
        self.bookmark += startpos
        return result
748
class Cmd2TestCase(unittest.TestCase):
    '''Subclass this, setting CmdApp and transcriptFileName, to make a unittest.TestCase class
    that will execute the commands in transcriptFileName and expect the results shown.
    See example.py'''
    # problem: this (raw) case gets called by unittest.main - we don't want it to be.  hmm
    # Every hook therefore does nothing while CmdApp is unset.
    CmdApp = None
    transcriptFileName = ''

    def setUp(self):
        if not self.CmdApp:
            return
        self.outputTrap = OutputTrap()
        self.cmdapp = self.CmdApp()
        self.transcriptReader = TranscriptReader(self.cmdapp, self.transcriptFileName)

    def testall(self):
        '''Run each transcript command and compare its output with the transcript.'''
        if not self.CmdApp:
            return
        for (cmdInput, lineNum) in self.transcriptReader.inputGenerator():
            self.cmdapp.onecmd(cmdInput)
            result = self.outputTrap.read()
            expected = self.transcriptReader.nextExpected()
            failureMessage = ('\nFile %s, line %d\nCommand was:\n%s\nExpected:\n%s\nGot:\n%s\n' %
                              (self.transcriptFileName, lineNum, cmdInput, expected, result))
            self.assertEqual(self.stripByLine(result), self.stripByLine(expected),
                             failureMessage)

    def stripByLine(self, s):
        '''Strip trailing whitespace from each line, then both ends of the whole text.'''
        trimmed = [line.rstrip() for line in s.splitlines()]
        return '\n'.join(trimmed).strip()

    def tearDown(self):
        if not self.CmdApp:
            return
        self.outputTrap.tearDown()
776
if __name__ == '__main__':
    # Run the doctests embedded in this module's docstrings.
    doctest.testmod()