1 """Variant on standard library's cmd with extra features.
3 To use, simply import cmd2.Cmd instead of cmd.Cmd; use precisely as though you
4 were using the standard library's cmd, while enjoying the extra features.
6 Searchable command history (commands: "hi", "li", "run")
7 Load commands from file, save to file, edit commands in file
8 Multi-line commands
9 Case-insensitive commands
Special-character shortcut commands (beyond cmd's "?" and "!")
11 Settable environment parameters
12 Optional _onchange_{paramname} called when environment parameter changes
13 Parsing commands with `optparse` options (flags)
14 Redirection to file with >, >>; input from file with <
15 Easy transcript-based testing of applications (see example/example.py)
Note that redirection with > and | will only work if `self.stdout.write()`
is used in place of `print`. The standard library's `cmd` module is
written to use `self.stdout.write()`.
21 - Catherine Devlin, Jan 03 2008 - catherinedevlin.blogspot.com
23 mercurial repository at http://www.assembla.com/wiki/show/python-cmd2
25 As of 0.3.0, options should be specified as `optparse` options. See README.txt.
26 flagReader.py options are still supported for backward compatibility
27 """
28 import cmd, re, os, sys, optparse, subprocess, tempfile, pyparsing, doctest
29 import unittest, string, datetime, urllib, inspect, code
30 from optparse import make_option
31 __version__ = '0.4.8'
class OptionParser(optparse.OptionParser):
    """`optparse.OptionParser` that reports problems without ending the process.

    The stock parser calls `sys.exit` from `exit()` and `error()`; inside an
    interactive command loop that would kill the whole session, so both are
    overridden here.
    """

    def exit(self, status=0, msg=None):
        """Mark the values being parsed with `_exit` instead of exiting.

        `status` is accepted for signature compatibility and ignored.
        `msg`, when given, is written to standard output.
        """
        # `self.values` is None until `parse_args` has started.
        if self.values is not None:
            self.values._exit = True
        if msg:
            sys.stdout.write('%s\n' % (msg,))

    def print_help(self, *args, **kwargs):
        """Write the wrapped command's docstring, then the option summary.

        Accepts the same arguments as `optparse.OptionParser.print_help`;
        the docstring goes to the same `file` as the option summary
        (standard output by default).
        """
        out = kwargs.get('file')
        if out is None and args:
            out = args[0]
        if out is None:
            out = sys.stdout
        # now, I need to call help of the calling function.  Hmm.
        try:
            out.write('%s\n' % (self._func.__doc__,))
        except AttributeError:
            # No `_func` has been attached to this parser.
            pass
        optparse.OptionParser.print_help(self, *args, **kwargs)

    def error(self, msg):
        """error(msg : string)

        Raise instead of printing usage and exiting.  When called while an
        exception is being handled (as `parse_args` does), that exception is
        re-raised unchanged; otherwise `optparse.OptionValueError(msg)` is
        raised, so this method never returns.
        """
        if sys.exc_info()[1] is not None:
            raise
        raise optparse.OptionValueError(msg)
def remainingArgs(oldArgs, newArgList):
    '''
    Return the part of `oldArgs` (the raw argument string) that holds the
    positional arguments in `newArgList`, keeping its original spacing.

    If the positional arguments do not appear as one run at the end of
    `oldArgs` (for instance because a flag sits between them), they are
    returned joined by single spaces instead.

    >>> remainingArgs('-f bar bar cow', ['bar', 'cow'])
    'bar cow'
    >>> remainingArgs('bar -f cow', ['bar', 'cow'])
    'bar cow'
    '''
    pattern = r'\s+'.join([re.escape(a) for a in newArgList]) + r'\s*$'
    matchObj = re.search(pattern, oldArgs)
    if matchObj is None:
        # The arguments are not contiguous at the tail of the raw string.
        return ' '.join(newArgList)
    return oldArgs[matchObj.start():]
65 def options(option_list):
66 def option_setup(func):
67 optionParser = OptionParser()
68 for opt in option_list:
69 optionParser.add_option(opt)
70 optionParser.set_usage("%s [options] arg" % func.__name__.strip('do_'))
71 optionParser._func = func
72 def newFunc(instance, arg):
73 try:
74 opts, newArgList = optionParser.parse_args(arg.split()) # doesn't understand quoted strings shouldn't be dissected!
75 newArgs = remainingArgs(arg, newArgList) # should it permit flags after args?
76 except (optparse.OptionValueError, optparse.BadOptionError,
77 optparse.OptionError, optparse.AmbiguousOptionError,
78 optparse.OptionConflictError), e:
79 print e
80 optionParser.print_help()
81 return
82 if hasattr(opts, '_exit'):
83 return None
84 if hasattr(arg, 'parser'):
85 terminator = arg.parsed.terminator
86 try:
87 if arg.parsed.terminator[0] == '\n':
88 terminator = arg.parsed.terminator[0]
89 except IndexError:
90 pass
91 arg = arg.parser('%s %s%s%s' % (arg.parsed.command, newArgs,
92 terminator, arg.parsed.suffix))
93 else:
94 arg = newArgs
95 result = func(instance, arg, opts)
96 return result
97 newFunc.__doc__ = '%s\n%s' % (func.__doc__, optionParser.format_help())
98 return newFunc
99 return option_setup
class PasteBufferError(EnvironmentError):
    """Error whose message names the clipboard tool this platform needs."""

    # The message is chosen once, when the class body is executed.
    if not sys.platform.startswith('win'):
        errmsg = """Redirecting to or from paste buffer requires xclip
to be installed on operating system.
On Debian/Ubuntu, 'sudo apt-get install xclip' will install it."""
    else:
        errmsg = """Redirecting to or from paste buffer requires pywin32
to be installed on operating system.
Download from http://sourceforge.net/projects/pywin32/"""

    def __init__(self):
        message = self.errmsg
        Exception.__init__(self, message)
# Check here whether the clipboard helpers exist; otherwise, stub them out.
# Every branch defines both getPasteBuffer and writeToPasteBuffer.
pastebufferr = """Redirecting to or from paste buffer requires %s
to be installed on operating system.
%s"""
if sys.platform[:3] == 'win':
    try:
        import win32clipboard
        def getPasteBuffer():
            win32clipboard.OpenClipboard(0)
            try:
                try:
                    result = win32clipboard.GetClipboardData()
                except TypeError:
                    result = ''  #non-text
            finally:
                # Close the clipboard even if reading it failed.
                win32clipboard.CloseClipboard()
            return result
        def writeToPasteBuffer(txt):
            win32clipboard.OpenClipboard(0)
            try:
                win32clipboard.EmptyClipboard()
                win32clipboard.SetClipboardText(txt)
            finally:
                win32clipboard.CloseClipboard()
    except ImportError:
        def getPasteBuffer(*args):
            raise OSError(pastebufferr % ('pywin32', 'Download from http://sourceforge.net/projects/pywin32/'))
        setPasteBuffer = getPasteBuffer
        # Writing must fail with the same explanatory error as reading.
        writeToPasteBuffer = getPasteBuffer
else:
    can_clip = False
    try:
        subprocess.check_call('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        can_clip = True
    except AttributeError:  # check_call not defined, Python < 2.5
        teststring = 'Testing for presence of xclip.'
        xclipproc = subprocess.Popen('xclip -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        xclipproc.stdin.write(teststring)
        xclipproc.stdin.close()
        xclipproc = subprocess.Popen('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
        if xclipproc.stdout.read() == teststring:
            can_clip = True
    except (subprocess.CalledProcessError, OSError, IOError):
        pass
    if can_clip:
        def getPasteBuffer():
            xclipproc = subprocess.Popen('xclip -o -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
            return xclipproc.stdout.read()
        def writeToPasteBuffer(txt):
            xclipproc = subprocess.Popen('xclip -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
            xclipproc.stdin.write(txt)
            xclipproc.stdin.close()
            # but we want it in both the "primary" and "mouse" clipboards
            xclipproc = subprocess.Popen('xclip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
            xclipproc.stdin.write(txt)
            xclipproc.stdin.close()
    else:
        def getPasteBuffer(*args):
            raise OSError(pastebufferr % ('xclip', 'On Debian/Ubuntu, install with "sudo apt-get install xclip"'))
        setPasteBuffer = getPasteBuffer
        writeToPasteBuffer = getPasteBuffer
# Make spaces and tabs the only characters pyparsing skips as whitespace by
# default, so newlines are not silently skipped; the command grammar below
# matches line ends explicitly (see the blank-line terminator).
pyparsing.ParserElement.setDefaultWhitespaceChars(' \t')
class ParsedString(str):
    """`str` subclass whose instances can carry extra attributes."""
class SkipToLast(pyparsing.SkipTo):
    """`SkipTo` variant that skips to the LAST place the target matches."""

    def parseImpl( self, instring, loc, doActions=True ):
        """Scan `instring` from `loc` and return the last successful match.

        Every position where the target expression matches is recorded as a
        `(loc, tokens)` pair; the final pair is returned.  If nothing matched,
        the element's stored parse exception is raised.
        """
        resultStore = []
        startLoc = loc
        instrlen = len(instring)
        expr = self.expr
        failParse = False
        while loc <= instrlen:
            try:
                if self.failOn:
                    # A match of `failOn` aborts the scan: the flag makes the
                    # handler below re-raise instead of advancing.
                    failParse = True
                    self.failOn.tryParse(instring, loc)
                    failParse = False
                loc = expr._skipIgnorables( instring, loc )
                expr._parse( instring, loc, doActions=False, callPreParse=False )
                skipText = instring[startLoc:loc]
                if self.includeMatch:
                    loc,mat = expr._parse(instring,loc,doActions,callPreParse=False)
                    if mat:
                        skipRes = pyparsing.ParseResults( skipText )
                        skipRes += mat
                        resultStore.append((loc, [ skipRes ]))
                    else:
                        resultStore.append((loc, [ skipText ]))
                else:
                    resultStore.append((loc, [ skipText ]))
                # Keep scanning: a later match replaces this one.
                loc += 1
            except (pyparsing.ParseException,IndexError):
                if failParse:
                    raise
                else:
                    loc += 1
        if resultStore:
            return resultStore[-1]
        else:
            exc = self.myException
            exc.loc = loc
            exc.pstr = instring
            raise exc
def replace_with_file_contents(fname):
    """Return the text that an input redirection should be replaced with.

    `fname` is a sequence whose first item is a file name, or an empty
    sequence.  A readable file yields its contents; an unreadable one yields
    the original `< name` text; an empty `fname` yields the paste buffer.
    """
    if fname:
        try:
            source = open(os.path.expanduser(fname[0]))
            try:
                result = source.read()
            finally:
                # Close the handle rather than leaving it to the collector.
                source.close()
        except IOError:
            result = '< %s' % fname[0]  # wasn't a file after all
    else:
        result = getPasteBuffer()
    return result
class EmbeddedConsoleExit(Exception):
    """Exception used to leave the embedded interactive Python console."""
228 class InteractiveConsole(code.InteractiveConsole):
229 def runcode(self, code):
230 """Execute a code object.
232 When an exception occurs, self.showtraceback() is called to
233 display a traceback. All exceptions are caught except
234 SystemExit, which is reraised.
236 A note about KeyboardInterrupt: this exception may occur
237 elsewhere in this code, and may not always be caught. The
238 caller should be prepared to deal with it.
240 Copied directly from code.InteractiveInterpreter, except for
241 EmbeddedConsoleExit exceptions.
242 """
243 try:
244 exec code in self.locals
245 except (SystemExit, EmbeddedConsoleExit):
246 raise
247 except:
248 self.showtraceback()
249 else:
250 if code.softspace(sys.stdout, 0):
251 print
253 class Cmd(cmd.Cmd):
254 echo = False
255 case_insensitive = True
256 continuation_prompt = '> '
257 timing = False
258 legalChars = '!#$%.:?@_' + pyparsing.alphanums + pyparsing.alphas8bit # make sure your terminators are not in here!
259 shortcuts = {'?': 'help', '!': 'shell', '@': 'load' }
260 excludeFromHistory = '''run r list l history hi ed edit li eof'''.split()
261 noSpecialParse = 'set ed edit exit'.split()
262 defaultExtension = 'txt'
263 default_file_name = 'command.txt'
264 abbrev = True
265 nonpythoncommand = 'cmd'
266 settable = ['prompt', 'continuation_prompt', 'default_file_name', 'editor',
267 'case_insensitive', 'echo', 'timing', 'abbrev']
268 settable.sort()
270 editor = os.environ.get('EDITOR')
271 _STOP_AND_EXIT = 2
272 if not editor:
273 if sys.platform[:3] == 'win':
274 editor = 'notepad'
275 else:
276 for editor in ['gedit', 'kate', 'vim', 'emacs', 'nano', 'pico']:
277 if not os.system('which %s' % (editor)):
278 break
280 def do_cmdenvironment(self, args):
281 '''Summary report of interactive parameters.'''
282 self.stdout.write("""
283 Commands are %(casesensitive)scase-sensitive.
284 Commands may be terminated with: %(terminators)s
285 Settable parameters: %(settable)s
286 """ %
287 { 'casesensitive': (self.case_insensitive and 'not ') or '',
288 'terminators': str(self.terminators),
289 'settable': ' '.join(self.settable)
290 })
292 def do_help(self, arg):
293 try:
294 fn = getattr(self, 'do_' + arg)
295 if fn and fn.optionParser:
296 fn.optionParser.print_help(file=self.stdout)
297 return
298 except AttributeError:
299 pass
300 cmd.Cmd.do_help(self, arg)
302 def __init__(self, *args, **kwargs):
303 cmd.Cmd.__init__(self, *args, **kwargs)
304 self.history = History()
305 self._init_parser()
306 self.pystate = {}
308 def do_shortcuts(self, args):
309 """Lists single-key shortcuts available."""
310 result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in self.shortcuts.items())
311 self.stdout.write("Single-key shortcuts for other commands:\n%s\n" % (result))
313 prefixParser = pyparsing.Empty()
314 commentGrammars = pyparsing.Or([pyparsing.pythonStyleComment, pyparsing.cStyleComment])
315 commentGrammars.addParseAction(lambda x: '')
316 commentInProgress = ((pyparsing.White() | pyparsing.lineStart) +
317 pyparsing.Literal('/*') + pyparsing.SkipTo(pyparsing.stringEnd))
318 # `blah/*` means `everything in directory `blah`, not comment
319 terminators = [';']
320 blankLinesAllowed = False
321 multilineCommands = []
323 def _init_parser(self):
324 r'''
325 >>> c = Cmd()
326 >>> c.multilineCommands = ['multiline']
327 >>> c.case_insensitive = True
328 >>> c._init_parser()
329 >>> print c.parser.parseString('').dump()
330 []
331 >>> print c.parser.parseString('/* empty command */').dump()
332 []
333 >>> print c.parser.parseString('plainword').dump()
334 ['plainword', '']
335 - command: plainword
336 - statement: ['plainword', '']
337 - command: plainword
338 >>> print c.parser.parseString('termbare;').dump()
339 ['termbare', '', ';', '']
340 - command: termbare
341 - statement: ['termbare', '', ';']
342 - command: termbare
343 - terminator: ;
344 - terminator: ;
345 >>> print c.parser.parseString('termbare; suffx').dump()
346 ['termbare', '', ';', 'suffx']
347 - command: termbare
348 - statement: ['termbare', '', ';']
349 - command: termbare
350 - terminator: ;
351 - suffix: suffx
352 - terminator: ;
353 >>> print c.parser.parseString('barecommand').dump()
354 ['barecommand', '']
355 - command: barecommand
356 - statement: ['barecommand', '']
357 - command: barecommand
358 >>> print c.parser.parseString('COMmand with args').dump()
359 ['command', 'with args']
360 - args: with args
361 - command: command
362 - statement: ['command', 'with args']
363 - args: with args
364 - command: command
365 >>> print c.parser.parseString('command with args and terminator; and suffix').dump()
366 ['command', 'with args and terminator', ';', 'and suffix']
367 - args: with args and terminator
368 - command: command
369 - statement: ['command', 'with args and terminator', ';']
370 - args: with args and terminator
371 - command: command
372 - terminator: ;
373 - suffix: and suffix
374 - terminator: ;
375 >>> print c.parser.parseString('simple | piped').dump()
376 ['simple', '', '|', ' piped']
377 - command: simple
378 - pipeTo: piped
379 - statement: ['simple', '']
380 - command: simple
381 >>> print c.parser.parseString('double-pipe || is not a pipe').dump()
382 ['double', '-pipe || is not a pipe']
383 - args: -pipe || is not a pipe
384 - command: double
385 - statement: ['double', '-pipe || is not a pipe']
386 - args: -pipe || is not a pipe
387 - command: double
388 >>> print c.parser.parseString('command with args, terminator;sufx | piped').dump()
389 ['command', 'with args, terminator', ';', 'sufx', '|', ' piped']
390 - args: with args, terminator
391 - command: command
392 - pipeTo: piped
393 - statement: ['command', 'with args, terminator', ';']
394 - args: with args, terminator
395 - command: command
396 - terminator: ;
397 - suffix: sufx
398 - terminator: ;
399 >>> print c.parser.parseString('output into > afile.txt').dump()
400 ['output', 'into', '>', 'afile.txt']
401 - args: into
402 - command: output
403 - output: >
404 - outputTo: afile.txt
405 - statement: ['output', 'into']
406 - args: into
407 - command: output
408 >>> print c.parser.parseString('output into;sufx | pipethrume plz > afile.txt').dump()
409 ['output', 'into', ';', 'sufx', '|', ' pipethrume plz', '>', 'afile.txt']
410 - args: into
411 - command: output
412 - output: >
413 - outputTo: afile.txt
414 - pipeTo: pipethrume plz
415 - statement: ['output', 'into', ';']
416 - args: into
417 - command: output
418 - terminator: ;
419 - suffix: sufx
420 - terminator: ;
421 >>> print c.parser.parseString('output to paste buffer >> ').dump()
422 ['output', 'to paste buffer', '>>', '']
423 - args: to paste buffer
424 - command: output
425 - output: >>
426 - statement: ['output', 'to paste buffer']
427 - args: to paste buffer
428 - command: output
429 >>> print c.parser.parseString('ignore the /* commented | > */ stuff;').dump()
430 ['ignore', 'the /* commented | > */ stuff', ';', '']
431 - args: the /* commented | > */ stuff
432 - command: ignore
433 - statement: ['ignore', 'the /* commented | > */ stuff', ';']
434 - args: the /* commented | > */ stuff
435 - command: ignore
436 - terminator: ;
437 - terminator: ;
438 >>> print c.parser.parseString('has > inside;').dump()
439 ['has', '> inside', ';', '']
440 - args: > inside
441 - command: has
442 - statement: ['has', '> inside', ';']
443 - args: > inside
444 - command: has
445 - terminator: ;
446 - terminator: ;
447 >>> print c.parser.parseString('multiline has > inside an unfinished command').dump()
448 ['multiline', ' has > inside an unfinished command']
449 - multilineCommand: multiline
450 >>> print c.parser.parseString('multiline has > inside;').dump()
451 ['multiline', 'has > inside', ';', '']
452 - args: has > inside
453 - multilineCommand: multiline
454 - statement: ['multiline', 'has > inside', ';']
455 - args: has > inside
456 - multilineCommand: multiline
457 - terminator: ;
458 - terminator: ;
459 >>> print c.parser.parseString('multiline command /* with comment in progress;').dump()
460 ['multiline', ' command /* with comment in progress;']
461 - multilineCommand: multiline
462 >>> print c.parser.parseString('multiline command /* with comment complete */ is done;').dump()
463 ['multiline', 'command /* with comment complete */ is done', ';', '']
464 - args: command /* with comment complete */ is done
465 - multilineCommand: multiline
466 - statement: ['multiline', 'command /* with comment complete */ is done', ';']
467 - args: command /* with comment complete */ is done
468 - multilineCommand: multiline
469 - terminator: ;
470 - terminator: ;
471 >>> print c.parser.parseString('multiline command ends\n\n').dump()
472 ['multiline', 'command ends', '\n', '\n']
473 - args: command ends
474 - multilineCommand: multiline
475 - statement: ['multiline', 'command ends', '\n', '\n']
476 - args: command ends
477 - multilineCommand: multiline
478 - terminator: ['\n', '\n']
479 - terminator: ['\n', '\n']
480 '''
481 outputParser = (pyparsing.Literal('>>') | (pyparsing.WordStart() + '>') | pyparsing.Regex('[^=]>'))('output')
483 terminatorParser = pyparsing.Or([(hasattr(t, 'parseString') and t) or pyparsing.Literal(t) for t in self.terminators])('terminator')
484 stringEnd = pyparsing.stringEnd ^ '\nEOF'
485 self.multilineCommand = pyparsing.Or([pyparsing.Keyword(c, caseless=self.case_insensitive) for c in self.multilineCommands])('multilineCommand')
486 oneLineCommand = (~self.multilineCommand + pyparsing.Word(self.legalChars))('command')
487 pipe = pyparsing.Keyword('|', identChars='|')
488 self.commentGrammars.ignore(pyparsing.quotedString).setParseAction(lambda x: '')
489 self.commentInProgress.ignore(pyparsing.quotedString).ignore(pyparsing.cStyleComment)
490 afterElements = \
491 pyparsing.Optional(pipe + pyparsing.SkipTo(outputParser ^ stringEnd)('pipeTo')) + \
492 pyparsing.Optional(outputParser + pyparsing.SkipTo(stringEnd).setParseAction(lambda x: x[0].strip())('outputTo'))
493 if self.case_insensitive:
494 self.multilineCommand.setParseAction(lambda x: x[0].lower())
495 oneLineCommand.setParseAction(lambda x: x[0].lower())
496 if self.blankLinesAllowed:
497 self.blankLineTerminationParser = pyparsing.NoMatch
498 else:
499 self.blankLineTerminator = (pyparsing.lineEnd + pyparsing.lineEnd)('terminator')
500 self.blankLineTerminator.setResultsName('terminator')
501 self.blankLineTerminationParser = ((self.multilineCommand ^ oneLineCommand) + pyparsing.SkipTo(self.blankLineTerminator).setParseAction(lambda x: x[0].strip())('args') + self.blankLineTerminator)('statement')
502 self.multilineParser = (((self.multilineCommand ^ oneLineCommand) + SkipToLast(terminatorParser).setParseAction(lambda x: x[0].strip())('args') + terminatorParser)('statement') +
503 pyparsing.SkipTo(outputParser ^ pipe ^ stringEnd).setParseAction(lambda x: x[0].strip())('suffix') + afterElements)
504 self.singleLineParser = ((oneLineCommand + pyparsing.SkipTo(terminatorParser ^ stringEnd ^ pipe ^ outputParser).setParseAction(lambda x:x[0].strip())('args'))('statement') +
505 pyparsing.Optional(terminatorParser) + afterElements)
506 #self.multilineParser = self.multilineParser.setResultsName('multilineParser')
507 #self.singleLineParser = self.singleLineParser.setResultsName('singleLineParser')
508 #self.blankLineTerminationParser = self.blankLineTerminationParser.setResultsName('blankLineTerminatorParser')
509 self.parser = (
510 stringEnd |
511 self.prefixParser + self.multilineParser |
512 self.prefixParser + self.singleLineParser |
513 self.prefixParser + self.blankLineTerminationParser |
514 self.prefixParser + self.multilineCommand + pyparsing.SkipTo(stringEnd)
515 )
516 self.parser.ignore(pyparsing.quotedString).ignore(self.commentGrammars).ignore(self.commentInProgress)
518 inputMark = pyparsing.Literal('<')
519 inputMark.setParseAction(lambda x: '')
520 fileName = pyparsing.Word(self.legalChars + '/\\')
521 inputFrom = fileName('inputFrom')
522 inputFrom.setParseAction(replace_with_file_contents)
523 # a not-entirely-satisfactory way of distinguishing < as in "import from" from <
524 # as in "lesser than"
525 self.inputParser = inputMark + pyparsing.Optional(inputFrom) + pyparsing.Optional('>') + \
526 pyparsing.Optional(fileName) + (pyparsing.stringEnd | '|')
527 self.inputParser.ignore(pyparsing.quotedString).ignore(self.commentGrammars).ignore(self.commentInProgress)
529 def preparse(self, raw, **kwargs):
530 return raw
532 def parsed(self, raw, **kwargs):
533 if isinstance(raw, ParsedString):
534 p = raw
535 else:
536 raw = self.preparse(raw, **kwargs)
537 s = self.inputParser.transformString(raw.lstrip())
538 for (shortcut, expansion) in self.shortcuts.items():
539 if s.lower().startswith(shortcut):
540 s = s.replace(shortcut, expansion + ' ', 1)
541 break
542 result = self.parser.parseString(s)
543 result['command'] = result.multilineCommand or result.command
544 result['raw'] = raw
545 result['clean'] = self.commentGrammars.transformString(result.args)
546 result['expanded'] = s
547 p = ParsedString(result.clean)
548 p.parsed = result
549 p.parser = self.parsed
550 for (key, val) in kwargs.items():
551 p.parsed[key] = val
552 return p
554 def postparsing_precmd(self, statement):
555 stop = 0
556 return stop, statement
557 def postparsing_postcmd(self, stop):
558 return stop
559 def onecmd(self, line):
560 """Interpret the argument as though it had been typed in response
561 to the prompt.
563 This may be overridden, but should not normally need to be;
564 see the precmd() and postcmd() methods for useful execution hooks.
565 The return value is a flag indicating whether interpretation of
566 commands by the interpreter should stop.
568 This (`cmd2`) version of `onecmd` already override's `cmd`'s `onecmd`.
570 """
571 if not line:
572 return self.emptyline()
573 if not pyparsing.Or(self.commentGrammars).setParseAction(lambda x: '').transformString(line):
574 return 0 # command was empty except for comments
575 try:
576 statement = self.parsed(line)
577 while statement.parsed.multilineCommand and (statement.parsed.terminator == ''):
578 statement = '%s\n%s' % (statement.parsed.raw,
579 self.pseudo_raw_input(self.continuation_prompt))
580 statement = self.parsed(statement)
581 except Exception, e:
582 print e
583 return 0
585 try:
586 (stop, statement) = self.postparsing_precmd(statement)
587 except Exception, e:
588 print str(e)
589 return 0
590 if stop:
591 return self.postparsing_postcmd(stop)
593 if not statement.parsed.command:
594 return self.postparsing_postcmd(stop=0)
596 statekeeper = None
598 if statement.parsed.pipeTo:
599 redirect = subprocess.Popen(statement.parsed.pipeTo, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE)
600 statekeeper = Statekeeper(self, ('stdout',))
601 self.stdout = redirect.stdin
602 elif statement.parsed.output:
603 statekeeper = Statekeeper(self, ('stdout',))
604 if statement.parsed.outputTo:
605 mode = 'w'
606 if statement.parsed.output == '>>':
607 mode = 'a'
608 try:
609 self.stdout = open(os.path.expanduser(statement.parsed.outputTo), mode)
610 except OSError, e:
611 print e
612 return self.postparsing_postcmd(stop=0)
613 else:
614 statekeeper = Statekeeper(self, ('stdout',))
615 self.stdout = tempfile.TemporaryFile()
616 if statement.parsed.output == '>>':
617 self.stdout.write(getPasteBuffer())
618 try:
619 # "heart" of the command, replace's cmd's onecmd()
620 self.lastcmd = statement.parsed.expanded
621 try:
622 func = getattr(self, 'do_' + statement.parsed.command)
623 except AttributeError:
624 func = None
625 if self.abbrev: # accept shortened versions of commands
626 funcs = [f for (fname, function) in inspect.getmembers(self, inspect.ismethod)
627 if fname.startswith('do_' + statement.parsed.command)]
628 if len(funcs) == 1:
629 func = funcs[0]
630 if not func:
631 return self.postparsing_postcmd(self.default(statement))
632 timestart = datetime.datetime.now()
633 stop = func(statement)
634 if self.timing:
635 print 'Elapsed: %s' % str(datetime.datetime.now() - timestart)
636 except Exception, e:
637 print e
638 try:
639 if statement.parsed.command not in self.excludeFromHistory:
640 self.history.append(statement.parsed.raw)
641 finally:
642 if statekeeper:
643 if statement.parsed.output and not statement.parsed.outputTo:
644 self.stdout.seek(0)
645 try:
646 writeToPasteBuffer(self.stdout.read())
647 except Exception, e:
648 print str(e)
649 elif statement.parsed.pipeTo:
650 for result in redirect.communicate():
651 statekeeper.stdout.write(result or '')
652 self.stdout.close()
653 statekeeper.restore()
655 return self.postparsing_postcmd(stop)
657 def pseudo_raw_input(self, prompt):
658 """copied from cmd's cmdloop; like raw_input, but accounts for changed stdin, stdout"""
660 if self.use_rawinput:
661 try:
662 line = raw_input(prompt)
663 except EOFError:
664 line = 'EOF'
665 else:
666 self.stdout.write(prompt)
667 self.stdout.flush()
668 line = self.stdin.readline()
669 if not len(line):
670 line = 'EOF'
671 else:
672 if line[-1] == '\n': # this was always true in Cmd
673 line = line[:-1]
674 return line
676 def cmdloop(self, intro=None):
677 """Repeatedly issue a prompt, accept input, parse an initial prefix
678 off the received input, and dispatch to action methods, passing them
679 the remainder of the line as argument.
680 """
682 # An almost perfect copy from Cmd; however, the pseudo_raw_input portion
683 # has been split out so that it can be called separately
685 self.preloop()
686 if self.use_rawinput and self.completekey:
687 try:
688 import readline
689 self.old_completer = readline.get_completer()
690 readline.set_completer(self.complete)
691 readline.parse_and_bind(self.completekey+": complete")
692 except ImportError:
693 pass
694 try:
695 if intro is not None:
696 self.intro = intro
697 if self.intro:
698 self.stdout.write(str(self.intro)+"\n")
699 stop = None
700 while not stop:
701 if self.cmdqueue:
702 line = self.cmdqueue.pop(0)
703 else:
704 line = self.pseudo_raw_input(self.prompt)
705 if (self.echo) and (isinstance(self.stdin, file)):
706 self.stdout.write(line + '\n')
707 line = self.precmd(line)
708 stop = self.onecmd(line)
709 stop = self.postcmd(stop, line)
710 self.postloop()
711 finally:
712 if self.use_rawinput and self.completekey:
713 try:
714 import readline
715 readline.set_completer(self.old_completer)
716 except ImportError:
717 pass
718 return stop
720 def do_EOF(self, arg):
721 return True
722 do_eof = do_EOF
724 def showParam(self, param):
725 any_shown = False
726 param = param.strip().lower()
727 for p in self.settable:
728 if p.startswith(param):
729 val = getattr(self, p)
730 self.stdout.write('%s: %s\n' % (p, str(getattr(self, p))))
731 any_shown = True
732 if not any_shown:
733 print "Parameter '%s' not supported (type 'show' for list of parameters)." % param
735 def do_quit(self, arg):
736 return self._STOP_AND_EXIT
737 do_exit = do_quit
738 do_q = do_quit
740 def do_show(self, arg):
741 '''Shows value of a parameter.'''
742 if arg.strip():
743 self.showParam(arg)
744 else:
745 for param in self.settable:
746 self.showParam(param)
748 def do_set(self, arg):
749 '''Sets a cmd2 parameter. Accepts abbreviated parameter names so long as there is no ambiguity.
750 Call without arguments for a list of settable parameters with their values.'''
751 try:
752 paramName, val = arg.split(None, 1)
753 paramName = paramName.strip().lower()
754 hits = [paramName in p for p in self.settable]
755 if hits.count(True) == 1:
756 paramName = self.settable[hits.index(True)]
757 currentVal = getattr(self, paramName)
758 if (val[0] == val[-1]) and val[0] in ("'", '"'):
759 val = val[1:-1]
760 else:
761 val = cast(currentVal, val)
762 setattr(self, paramName, val)
763 self.stdout.write('%s - was: %s\nnow: %s\n' % (paramName, currentVal, val))
764 if currentVal != val:
765 try:
766 onchange_hook = getattr(self, '_onchange_%s' % paramName)
767 onchange_hook(old=currentVal, new=val)
768 except AttributeError:
769 pass
770 else:
771 self.do_show(paramName)
772 except (ValueError, AttributeError, NotSettableError), e:
773 self.do_show(arg)
775 def do_pause(self, arg):
776 'Displays the specified text then waits for the user to press RETURN.'
777 raw_input(arg + '\n')
779 def do_shell(self, arg):
780 'execute a command as if at the OS prompt.'
781 os.system(arg)
783 def _attempt_py_command(self, arg):
784 try:
785 result = eval(arg, self.pystate)
786 print repr(result)
787 except SyntaxError:
788 exec(arg, self.pystate)
789 return
791 def do_py(self, arg):
792 '''
793 py <command>: Executes a Python command.
794 py: Enters interactive Python mode; end with `Ctrl-D`.
795 Do not end with Ctrl-Z, or it will end your entire cmd2 session!
796 Non-python commands can be issued with cmd('your non-python command here').
797 '''
798 if arg.strip():
799 interp = code.InteractiveInterpreter(locals=self.pystate)
800 interp.runcode(arg)
801 else:
802 interp = InteractiveConsole(locals=self.pystate)
803 def quit():
804 raise EmbeddedConsoleExit
805 self.pystate['quit'] = quit
806 self.pystate['exit'] = quit
807 try:
808 interp.interact()
809 except (EmbeddedConsoleExit, SystemExit):
810 return
812 def do_history(self, arg):
813 """history [arg]: lists past commands issued
815 no arg -> list all
816 arg is integer -> list one history item, by index
817 arg is string -> string search
818 arg is /enclosed in forward-slashes/ -> regular expression search
819 """
820 if arg:
821 history = self.history.get(arg)
822 else:
823 history = self.history
824 for hi in history:
825 self.stdout.write(hi.pr())
826 def last_matching(self, arg):
827 try:
828 if arg:
829 return self.history.get(arg)[-1]
830 else:
831 return self.history[-1]
832 except IndexError:
833 return None
834 def do_list(self, arg):
835 """list [arg]: lists last command issued
837 no arg -> list absolute last
838 arg is integer -> list one history item, by index
839 - arg, arg - (integer) -> list up to or after #arg
840 arg is string -> list last command matching string search
841 arg is /enclosed in forward-slashes/ -> regular expression search
842 """
843 try:
844 self.stdout.write(self.last_matching(arg).pr())
845 except:
846 pass
847 do_hi = do_history
848 do_l = do_list
849 do_li = do_list
851 def do_ed(self, arg):
852 """ed: edit most recent command in text editor
853 ed [N]: edit numbered command from history
854 ed [filename]: edit specified file name
856 commands are run after editor is closed.
857 "set edit (program-name)" or set EDITOR environment variable
858 to control which editing program is used."""
859 if not self.editor:
860 print "please use 'set editor' to specify your text editing program of choice."
861 return
862 filename = self.default_file_name
863 if arg:
864 try:
865 buffer = self.last_matching(int(arg))
866 except ValueError:
867 filename = arg
868 buffer = ''
869 else:
870 buffer = self.history[-1]
872 if buffer:
873 f = open(os.path.expanduser(filename), 'w')
874 f.write(buffer or '')
875 f.close()
877 os.system('%s %s' % (self.editor, filename))
878 self.do__load(filename)
879 do_edit = do_ed
881 saveparser = (pyparsing.Optional(pyparsing.Word(pyparsing.nums)^'*')("idx") +
882 pyparsing.Optional(pyparsing.Word(legalChars + '/\\'))("fname") +
883 pyparsing.stringEnd)
884 def do_save(self, arg):
885 """`save [N] [filename.ext]`
886 Saves command from history to file.
887 N => Number of command (from history), or `*`;
888 most recent command if omitted"""
890 try:
891 args = self.saveparser.parseString(arg)
892 except pyparsing.ParseException:
893 print self.do_save.__doc__
894 return
895 fname = args.fname or self.default_file_name
896 if args.idx == '*':
897 saveme = '\n\n'.join(self.history[:])
898 elif args.idx:
899 saveme = self.history[int(args.idx)-1]
900 else:
901 saveme = self.history[-1]
902 try:
903 f = open(os.path.expanduser(fname), 'w')
904 f.write(saveme)
905 f.close()
906 print 'Saved to %s' % (fname)
907 except Exception, e:
908 print 'Error saving %s: %s' % (fname, str(e))
# recognises an http(s) URL at the start of the argument
urlre = re.compile('(https?://[-\\w\\./]+)')
def do_load(self, fname=None):
    """Runs script of command(s) from a file or URL."""
    if fname is None:
        fname = self.default_file_name
    # remember the interactive settings that are overridden while the script runs
    keepstate = Statekeeper(self, ('stdin','use_rawinput','prompt','continuation_prompt'))
    try:
        if hasattr(fname, 'readline'):
            # already an open file(-like) object: read the commands from it
            # directly rather than trying to open() it again
            target = fname
        else:
            match = self.urlre.match(fname)
            if match:
                target = urllib.urlopen(match.group(1))
            else:
                fname = os.path.expanduser(fname)
                try:
                    target = open(fname, 'r')
                except IOError:
                    # retry with the default script extension appended
                    target = open('%s.%s' % (fname, self.defaultExtension), 'r')
    except IOError:
        e = sys.exc_info()[1]
        print('Problem accessing script from %s: \n%s' % (fname, e))
        keepstate.restore()
        return
    try:
        # feed the script to a nested command loop with prompts silenced
        self.stdin = target
        self.use_rawinput = False
        self.prompt = self.continuation_prompt = ''
        stop = self.cmdloop()
    finally:
        # release the script and put the interactive settings back even if
        # a command in the script raised
        target.close()
        keepstate.restore()
    self.lastcmd = ''
    # only an explicit stop-and-exit request is passed on to the caller
    return (stop == self._STOP_AND_EXIT) and self._STOP_AND_EXIT
do__load = do_load  # avoid an unfortunate legacy use of do_load from sqlpython
def do_run(self, arg):
    """run [arg]: re-runs an earlier command

    no arg -> run most recent command
    arg is integer -> run one history item, by index
    arg is string -> run most recent command by string search
    arg is /enclosed in forward-slashes/ -> run most recent by regex
    """
    runme = self.last_matching(arg)
    if not runme:
        # nothing matched: there is no command to echo or execute
        return
    # echo the command that is about to be re-run
    print(runme)
    runme = self.precmd(runme)
    stop = self.onecmd(runme)
    # NOTE(review): as in the original, the stop flag is not returned to the
    # caller; confirm whether re-running an exit command should end the loop.
    self.postcmd(stop, runme)
do_r = do_run
def fileimport(self, statement, source):
    """Return the full contents of the file named by `source`.

    `source` has a leading `~` expanded before it is opened. When the file
    cannot be opened, a message is written to `self.stdout` and an empty
    string is returned. `statement` is accepted but not used here."""
    try:
        f = open(os.path.expanduser(source))
    except IOError:
        self.stdout.write("Couldn't read from file %s\n" % source)
        return ''
    try:
        return f.read()
    finally:
        # close the handle even if the read itself fails
        f.close()
class HistoryItem(str):
    """A command string that also carries a lowercase copy and a history index."""
    def __init__(self, instr):
        str.__init__(self)
        # idx starts out unset; the code that files the item assigns it
        self.idx = None
        self.lowercase = self.lower()
    def pr(self):
        """Return the item as display text: a numbered separator line, then the command."""
        separator = '-------------------------[%d]' % self.idx
        return separator + '\n' + str(self) + '\n'
class History(list):
    """List of HistoryItems with lookup by number, range, substring or regex.

    Every appended value is wrapped in a HistoryItem whose `idx` is its
    1-based position in the list."""
    # "N-" means "item N and everything after it"; the group holds the whole
    # number (a repeated one-digit group would keep only the final digit)
    rangeFrom = re.compile(r'^(\d+)\s*\-$')
    def append(self, new):
        """Wrap `new` in a HistoryItem, store it, and record its 1-based index."""
        new = HistoryItem(new)
        list.append(self, new)
        new.idx = len(self)
    def extend(self, new):
        """Append each element of `new` in turn so every one gets wrapped and numbered."""
        for n in new:
            self.append(n)
    def get(self, getme):
        """Return the list of history items selected by `getme`.

        An integer N >= 1 selects item N (an empty list when out of range);
        a negative integer -N selects the first N items; `N-` selects item N
        onward; `/regex/` selects items the regex finds a match in
        (case-insensitive); any other text selects items containing it,
        ignoring case."""
        try:
            getme = int(getme)
        except (ValueError, TypeError):
            pass
        else:
            if getme < 0:
                # NOTE(review): a negative count slices from the front of the
                # list, as the original did; confirm that is the intent.
                return self[:(-1 * getme)]
            try:
                return [self[getme-1]]
            except IndexError:
                return []
        getme = getme.strip()
        mtch = self.rangeFrom.search(getme)
        if mtch:
            # clamp so that "0-" means "everything" rather than wrapping to
            # the last item through a negative slice start
            return self[max(int(mtch.group(1)) - 1, 0):]
        if getme.startswith(r'/') and getme.endswith(r'/'):
            finder = re.compile(getme[1:-1], re.DOTALL | re.MULTILINE | re.IGNORECASE)
            def isin(hi):
                return finder.search(hi)
        else:
            wanted = getme.lower()
            def isin(hi):
                return (wanted in hi.lowercase)
        return [itm for itm in self if isin(itm)]
class NotSettableError(Exception):
    """Error for a parameter that may not be set (going by the name; it is raised outside this section)."""
def cast(current, new):
    """Tries to force a new value into the same type as the current.

    For a boolean `current`, `new` may be anything `int()` accepts, the words
    on/off, or any text starting with y/t (true) or n/f (false), ignoring
    case. For any other type, `type(current)(new)` is attempted.

    Returns the converted value. When conversion is not possible a message
    is printed and `current` is returned unchanged."""
    typ = type(current)
    if typ is bool:
        try:
            return bool(int(new))
        except (ValueError, TypeError):
            # not a number: fall through to the word forms
            pass
        try:
            word = new.lower()
            initial = word[:1]
        except (AttributeError, TypeError):
            # not text either, so no word form can match
            word = initial = None
        if word == 'on' or initial in ('y', 't'):
            return True
        if word == 'off' or initial in ('n', 'f'):
            return False
    else:
        try:
            return typ(new)
        except Exception:
            pass
    print("Problem setting parameter (now %s) to %s; incorrect type?" % (current, new))
    return current
class Statekeeper(object):
    """Remembers the values of chosen attributes of an object so they can be put back later."""
    def __init__(self, obj, attribs):
        """Record `obj` and the attribute names in `attribs`, then take the snapshot."""
        self.obj = obj
        self.attribs = attribs
        self.save()
    def save(self):
        """Copy each listed attribute from the watched object onto this keeper."""
        for name in self.attribs:
            value = getattr(self.obj, name)
            setattr(self, name, value)
    def restore(self):
        """Write each remembered value back onto the watched object."""
        for name in self.attribs:
            value = getattr(self, name)
            setattr(self.obj, name, value)
class Borg(object):
    '''All instances of any Borg subclass will share state.
    from Python Cookbook, 2nd Ed., recipe 6.16'''
    _shared_state = {}
    def __new__(cls, *a, **k):
        # Constructor arguments are meant for __init__; object.__new__ rejects
        # extra arguments, so they are deliberately not forwarded to it.
        obj = object.__new__(cls)
        # every instance uses the one class-level dict as its attribute store
        obj.__dict__ = cls._shared_state
        return obj
class OutputTrap(Borg):
    '''Instantiate an OutputTrap to divert/capture ALL stdout output.  For use in unit testing.
    Call `tearDown()` to return to normal output.'''
    def __init__(self):
        # remember the real stdout, then point sys.stdout at a temporary file
        self.old_stdout = sys.stdout
        self.trap = tempfile.TemporaryFile()
        sys.stdout = self.trap
    def read(self):
        '''Return everything captured since the previous read and empty the trap.'''
        self.trap.seek(0)
        result = self.trap.read()
        # Rewind before truncating: otherwise the write position stays at the
        # old end of file and the next output is preceded by NUL padding.
        self.trap.seek(0)
        self.trap.truncate(0)
        # kept so that a trap filled before this rewind existed still reads clean
        return result.strip('\x00')
    def tearDown(self):
        '''Point sys.stdout back at the stream that was active when the trap was set.'''
        sys.stdout = self.old_stdout
class Cmd2TestCase(unittest.TestCase):
    '''Subclass this, setting CmdApp and transcriptFileName, to make a unittest.TestCase class
       that will execute the commands in transcriptFileName and expect the results shown.
       See example.py'''
    CmdApp = None
    transcriptFileName = ''
    def setUp(self):
        '''Trap stdout, build the application under test and load the transcript lines.'''
        if self.CmdApp:
            # the trap is installed first so the app is created with stdout diverted
            self.outputTrap = OutputTrap()
            self.cmdapp = self.CmdApp()
            try:
                tfile = open(os.path.expanduser(self.transcriptFileName))
                try:
                    self.transcript = iter(tfile.readlines())
                finally:
                    tfile.close()
            except IOError:
                # no readable transcript: nothing to check, but keep an iterator
                self.transcript = iter([])
    def assertEqualEnough(self, got, expected, message):
        '''Fail with `message` unless `got` matches `expected` line by line.

        Both texts are stripped and split into lines and must have the same
        number of lines. Each expected line is used as a pattern in which `*`
        stands for any run of characters; it must match at the start of the
        corresponding stripped line of `got`.'''
        got = got.strip().splitlines()
        expected = expected.strip().splitlines()
        self.assertEqual(len(got), len(expected), message)
        for (linegot, lineexpected) in zip(got, expected):
            # escape the expected text, then re-open `*` as a wildcard and
            # undo the escaping of plain spaces
            matchme = re.escape(lineexpected.strip()).replace('\\*', '.*'). \
                      replace('\\ ', ' ')
            self.assertTrue(re.match(matchme, linegot.strip()), message)
    def testall(self):
        '''Replay every command in the transcript and compare its output with the transcript.'''
        if not self.CmdApp:
            return
        lines = list(self.transcript)
        total = len(lines)
        pos = 0
        while pos < total:
            # skip anything that is not the start of a command
            if not lines[pos].startswith(self.cmdapp.prompt):
                pos += 1
                continue
            lineNum = pos + 1   # 1-based transcript line of this command, for messages
            command = [lines[pos][len(self.cmdapp.prompt):]]
            pos += 1
            while pos < total and lines[pos].startswith(self.cmdapp.continuation_prompt):
                command.append(lines[pos][len(self.cmdapp.continuation_prompt):])
                pos += 1
            if pos >= total:
                # the transcript ends on the command itself with nothing
                # recorded after it; as before, such a command is not run
                break
            command = ''.join(command)
            self.cmdapp.onecmd(command)
            result = self.outputTrap.read()
            if lines[pos].startswith(self.cmdapp.prompt):
                # the next command follows immediately: no output was expected
                self.assertEqualEnough(result.strip(), '',
                    '\nFile %s, line %d\nCommand was:\n%s\nExpected: (nothing) \nGot:\n%s\n' %
                    (self.transcriptFileName, lineNum, command, result))
                continue
            expected = []
            # collect expected output up to the next prompt or the end of the
            # transcript, so the final command's output is checked as well
            while pos < total and not lines[pos].startswith(self.cmdapp.prompt):
                expected.append(lines[pos])
                pos += 1
            expected = ''.join(expected)
            # actual output first, transcript text second, so that `*` in the
            # transcript is what acts as the wildcard
            self.assertEqualEnough(result.strip(), expected.strip(),
                '\nFile %s, line %d\nCommand was:\n%s\nExpected:\n%s\nGot:\n%s\n' %
                (self.transcriptFileName, lineNum, command, expected, result))
    def tearDown(self):
        '''Restore normal stdout if setUp diverted it.'''
        if self.CmdApp:
            self.outputTrap.tearDown()
# When executed as a script, run the doctests found in this module,
# ignoring differences in whitespace.
if __name__ == '__main__':
    doctest.testmod(optionflags = doctest.NORMALIZE_WHITESPACE)
    #c = Cmd()