# HG changeset patch # User catherine@Elli.myhome.westell.com # Date 1215169701 14400 # Node ID f583663c610f08627fef8946c8fdee912b5da6e3 # Parent 86bb50447e99fd4db3f6912405d550814e8348e8 switch to pyparsing worked diff -r 86bb50447e99 -r f583663c610f cmd2.py --- a/cmd2.py Fri Jun 27 09:51:01 2008 -0400 +++ b/cmd2.py Fri Jul 04 07:08:21 2008 -0400 @@ -18,7 +18,7 @@ As of 0.3.0, options should be specified as `optparse` options. See README.txt. flagReader.py options are still supported for backward compatibility """ -import cmd, re, os, sys, optparse, subprocess, tempfile, pyparsing +import cmd, re, os, sys, optparse, subprocess, tempfile, pyparsing, doctest from optparse import make_option class OptionParser(optparse.OptionParser): @@ -123,7 +123,14 @@ def getPasteBuffer(): raise OSError, pastebufferr % ('xclip', 'On Debian/Ubuntu, install with "sudo apt-get install xclip"') setPasteBuffer = getPasteBuffer - + +pyparsing.ParserElement.setDefaultWhitespaceChars(' \t') +def parseSearchResults(pattern, s): + try: + return pattern.searchString(s)[0] + except IndexError: + return pyparsing.ParseResults('') + class Cmd(cmd.Cmd): caseInsensitive = True multilineCommands = [] @@ -168,40 +175,71 @@ def __init__(self, *args, **kwargs): cmd.Cmd.__init__(self, *args, **kwargs) self.history = History() + self.punctuationPattern = self.terminators ^ self.pipePattern ^ \ + self.redirectInPattern ^ \ + self.redirectOutPattern + self.punctuationPattern.ignore(pyparsing.sglQuotedString) + self.punctuationPattern.ignore(pyparsing.dblQuotedString) def do_shortcuts(self, args): """Lists single-key shortcuts available.""" result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in self.shortcuts.items()) self.stdout.write("Single-key shortcuts for other commands:\n%s\n" % (result)) - notAPipe = pyparsing.SkipTo('|') - notAPipe.ignore(pyparsing.sglQuotedString) - notAPipe.ignore(pyparsing.dblQuotedString) - pipeFinder = notAPipe + '|' + pyparsing.SkipTo(pyparsing.StringEnd()) - def parsePipe(self, statement, mustBeTerminated): + terminators = (pyparsing.Literal(';') ^ pyparsing.Literal('\n\n')) \ + ('terminator') + argSeparatorPattern = pyparsing.Word(pyparsing.printables)('command') \ + + pyparsing.SkipTo(pyparsing.StringEnd())('args') + filenamePattern = pyparsing.Word(pyparsing.alphanums + '#$-_~{},.!') + integerPattern = pyparsing.Word(pyparsing.nums).setParseAction( lambda s,l,t: [ int(t[0]) ] ) + pipePattern = pyparsing.Literal('|')('pipe') + pyparsing.restOfLine('pipeTo') + redirectOutPattern = (pyparsing.Literal('>>') ^ '>')('output') \ + + pyparsing.Optional(filenamePattern)('outputTo') + redirectInPattern = pyparsing.Literal('<')('input') \ + + pyparsing.Optional(filenamePattern)('inputFrom') + for p in (terminators, pipePattern, redirectInPattern, redirectOutPattern): + p.ignore(pyparsing.sglQuotedString) + p.ignore(pyparsing.dblQuotedString) + + def parsed(self, s): + ''' + >>> c = Cmd() + >>> c.parsed('quotes "are > ignored" < inp.txt').asDict() + {'args': ' "are > ignored"', 'inputFrom': 'inp.txt', 'command': 'quotes', 'statement': 'quotes "are > ignored"', 'input': '<', 'fullStatement': 'quotes "are > ignored" < inp.txt'} + >>> c.parsed('very complex; < from.txt >> to.txt etc.').asDict() + {'args': ' complex', 'inputFrom': 'from.txt', 'command': 'very', 'terminator': ';', 'statement': 'very complex', 'input': '<', 'output': '>>', 'outputTo': 'to.txt', 'fullStatement': 'very complex; < from.txt >> to.txt etc.'} + >>> c.parsed('nothing to parse').asDict() + {'args': ' to parse', 'command': 'nothing', 'statement': 'nothing to parse', 'fullStatement': 'nothing to parse'} + >>> c.parsed('send it to | sort | wc').asDict() + {'args': ' it to', 'pipe': '|', 'pipeTo': ' sort | wc', 'command': 'send', 'statement': 'send it to', 'fullStatement': 'send it to | sort | wc'} + >>> r = c.parsed('got from < thisfile.txt plus blah blah') + >>> r.asDict() + {'args': ' from', 'inputFrom': 'thisfile.txt', 'command': 'got', 'statement': 'got from', 'input': '<', 'fullStatement': 'got from < thisfile.txt plus blah blah'} + >>> c.parsed(r).asDict() + {'args': ' from', 'inputFrom': 'thisfile.txt', 'command': 'got', 'statement': 'got from', 'input': '<', 'fullStatement': 'got from < thisfile.txt plus blah blah'} + ''' + if isinstance(s, pyparsing.ParseResults): + return s + result = (pyparsing.SkipTo(pyparsing.StringEnd()))('fullStatement').parseString(s) + result['statement'] = result.fullStatement try: - statement, pipe, destination = self.pipeFinder.parseString(statement) - redirect = subprocess.Popen(destination, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) - return statement, redirect + result += (pyparsing.SkipTo(self.punctuationPattern)('statement') \ + + self.punctuationPattern).parseString(s) + result += parseSearchResults(self.pipePattern, s) + result += parseSearchResults(self.redirectInPattern, s) + result += parseSearchResults(self.redirectOutPattern, s) except pyparsing.ParseException: - return statement, None - - legalFileName = re.compile(r'''^[^"'\s]+$''') - def parseRedirector(self, statement, symbol, mustBeTerminated=False): - # pipeFinder.scanString(statement) - parts = statement.split(symbol) - if (len(parts) < 2): - return statement, None - if mustBeTerminated and (not self.statementEndPattern.search(parts[-2])): - return statement, None - (newStatement, redirect) = (symbol.join(parts[:-1]), parts[-1].strip()) - if redirect: - if not self.legalFileName.search(redirect): - return statement, None - else: - redirect = self._TO_PASTE_BUFFER - return newStatement, redirect - + pass + finally: + try: + result += self.argSeparatorPattern.parseString(result.statement) + except pyparsing.ParseException: + return result + if self.caseInsensitive: + result['command'] = result.command.lower() + result['statement'] = '%s %s' % (result.command, result.args) + return result + def extractCommand(self, statement): try: (command, args) = statement.split(None,1) @@ -210,23 +248,7 @@ if self.caseInsensitive: command = command.lower() return command, args - - def parseRedirectors(self, statement): - mustBeTerminated = self.extractCommand(statement)[0] in self.multilineCommands - newStatement, redirect = self.parsePipe(statement, mustBeTerminated) - if redirect: - return newStatement, redirect, 'pipe' - newStatement, redirect = self.parseRedirector(statement, '>>', mustBeTerminated) - if redirect: - return newStatement, redirect, 'a' - newStatement, redirect = self.parseRedirector(statement, '>', mustBeTerminated) - if redirect: - return newStatement, redirect, 'w' - newStatement, redirect = self.parseRedirector(statement, '<', mustBeTerminated) - if redirect: - return newStatement, redirect, 'r' - return statement, '', '' - + def onecmd(self, line, assumeComplete=False): """Interpret the argument as though it had been typed in response to the prompt. @@ -237,48 +259,56 @@ commands by the interpreter should stop. """ - command, args = self.extractCommand(line) - statement = originalStatement = ' '.join([command, args]) - if (not assumeComplete) and (command in self.multilineCommands): - statement = self.finishStatement(statement) + statement = self.parsed(line) + while (statement.command in self.multilineCommands) and not \ + (statement.terminator or assumeComplete): + statement = self.parsed('%s\n%s' % (statement.fullStatement, + self.pseudo_raw_input(self.continuationPrompt))) + statekeeper = None stop = 0 - statement, redirect, mode = self.parseRedirectors(statement) - if isinstance(redirect, subprocess.Popen): + if statement.input: + if statement.inputFrom: + try: + newinput = open(statement.inputFrom, 'r').read() + except OSError, e: + print e + return 0 + else: + newinput = getPasteBuffer() + start, end = self.redirectInPattern.scanString(statement.fullStatement).next()[1:] + return self.onecmd('%s%s%s' % (statement.fullStatement[:start], + newinput, statement.fullStatement[end:])) + if statement.pipe and statement.pipeTo: + redirect = subprocess.Popen(statement.pipeTo, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) statekeeper = Statekeeper(self, ('stdout',)) self.stdout = redirect.stdin - elif redirect == self._TO_PASTE_BUFFER: - try: - clipcontents = getPasteBuffer() - if mode in ('w', 'a'): - statekeeper = Statekeeper(self, ('stdout',)) - self.stdout = tempfile.TemporaryFile() - if mode == 'a': - self.stdout.write(clipcontents) - else: - statement = '%s %s' % (statement, clipcontents) - except OSError, e: - print e - return 0 - elif redirect: - if mode in ('w','a'): + elif statement.output: + statekeeper = Statekeeper(self, ('stdout',)) + if statement.outputTo: + mode = 'w' + if statement.output == '>>': + mode = 'a' + try: + self.stdout = open(statement.outputTo, mode) + except OSError, e: + print e + return 0 + else: statekeeper = Statekeeper(self, ('stdout',)) - self.stdout = open(redirect, mode) - else: - statement = '%s %s' % (statement, self.fileimport(statement=statement, source=redirect)) - if isinstance(redirect, subprocess.Popen): - stop = self.onecmd(statement) - else: - stop = cmd.Cmd.onecmd(self, statement) + self.stdout = tempfile.TemporaryFile() + if statement.output == '>>': + self.stdout.write(getPasteBuffer()) + stop = cmd.Cmd.onecmd(self, statement.statement) try: - if command not in self.excludeFromHistory: - self.history.append(originalStatement) + if statement.command not in self.excludeFromHistory: + self.history.append(statement.fullStatement) finally: if statekeeper: - if redirect == self._TO_PASTE_BUFFER: + if statement.output and not statement.outputTo: self.stdout.seek(0) writeToPasteBuffer(self.stdout.read()) - elif isinstance(redirect, subprocess.Popen): + elif statement.pipe: for result in redirect.communicate(): statekeeper.stdout.write(result or '') self.stdout.close() @@ -286,22 +316,6 @@ return stop - statementEndPattern = re.compile(r'[%s]\s*$' % terminators) - def statementHasEnded(self, lines): - return bool(self.statementEndPattern.search(lines)) \ - or lines[-3:] == 'EOF' \ - or self.parseRedirectors(lines)[1] - - def finishStatement(self, firstline): - statement = firstline - while not self.statementHasEnded(statement): - inp = self.pseudo_raw_input(self.continuationPrompt) - statement = '%s\n%s' % (statement, inp) - return statement - # assembling a list of lines and joining them at the end would be faster, - # but statementHasEnded needs a string arg; anyway, we're getting - # user input and users are slow. - def pseudo_raw_input(self, prompt): """copied from cmd's cmdloop; like raw_input, but accounts for changed stdin, stdout""" @@ -373,22 +387,6 @@ return s.strip().lower() return s.strip() - def parseline(self, line): - """Parse the line into a command name and a string containing - the arguments. Returns a tuple containing (command, args, line). - 'command' and 'args' may be None if the line couldn't be parsed. - """ - line = line.strip() - if not line: - return None, None, line - shortcut = self.shortcuts.get(line[0]) - if shortcut and hasattr(self, 'do_%s' % shortcut): - line = '%s %s' % (shortcut, line[1:]) - i, n = 0, len(line) - while i < n and line[i] in self.identchars: i = i+1 - cmd, arg = line[:i], line[i:].strip().strip(self.terminators) - return cmd, arg, line - def showParam(self, param): param = self.clean(param) if param in self.settable: @@ -639,3 +637,6 @@ def restore(self): for attrib in self.attribs: setattr(self.obj, attrib, getattr(self, attrib)) + +if __name__ == '__main__': + doctest.testmod() \ No newline at end of file