Mercurial > python-cmd2
changeset 87:683ed678b636
restoring IOError to xclip trap; backing out weird hg problem
author | catherine@Elli.myhome.westell.com |
---|---|
date | Mon, 30 Jun 2008 22:05:01 -0400 |
parents | 86bb50447e99 |
children | |
files | cmd2.py setup.py |
diffstat | 2 files changed, 137 insertions(+), 115 deletions(-) [+] |
line wrap: on
line diff
--- a/cmd2.py Fri Jun 27 09:51:01 2008 -0400 +++ b/cmd2.py Mon Jun 30 22:05:01 2008 -0400 @@ -18,7 +18,7 @@ As of 0.3.0, options should be specified as `optparse` options. See README.txt. flagReader.py options are still supported for backward compatibility """ -import cmd, re, os, sys, optparse, subprocess, tempfile, pyparsing +import cmd, re, os, sys, optparse, subprocess, tempfile, pyparsing, doctest from optparse import make_option class OptionParser(optparse.OptionParser): @@ -119,14 +119,102 @@ xclipproc = subprocess.Popen('xclip -sel clip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) xclipproc.stdin.write(txt) xclipproc.stdin.close() + # but we want it in both the "primary" and "mouse" clipboards + xclipproc = subprocess.Popen('xclip', shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + xclipproc.stdin.write(txt) + xclipproc.stdin.close() else: def getPasteBuffer(): raise OSError, pastebufferr % ('xclip', 'On Debian/Ubuntu, install with "sudo apt-get install xclip"') setPasteBuffer = getPasteBuffer - + +pyparsing.ParserElement.setDefaultWhitespaceChars(' \t') # see http://pyparsing.wikispaces.com/message/view/home/1352689 + +class Arguments(str): + def __new__(cls, s, parent): + result = str.__new__(cls, s) + result.parent = parent + return result + +class UserCommand(str): + def __new__(cls, s, app): + return str.__new__(cls, s) + def __init__(self, s, app): + self.terminator = None + self.terminator_suffix = None + s = s.strip() + shortcut = app.shortcuts.get(s[0]) + if shortcut and hasattr(app, 'do_%s' % shortcut): + s = '%s %s' % (shortcut, s[1:]) + self.searchable = self.asEntered = s + self.app = app + self.terminator_pattern = self.punctuationPattern(self.app.terminators) + self.output_destination_pattern = self.punctuationPattern(['>>', '>']) + self.input_source_pattern = self.punctuationPattern(['<']) + self.pipe_destination_pattern = self.punctuationPattern(['|']) + def punctuationPattern(self, punctuators): + processed = punctuators[:] + if not hasattr(processed[0], 'parseString'): + processed[0] = pyparsing.Literal(processed[0]) + processed = reduce(lambda x, y: x ^ y, processed) + processed.ignore(pyparsing.sglQuotedString) + processed.ignore(pyparsing.dblQuotedString) + pattern = pyparsing.SkipTo(processed) + processed + pyparsing.restOfLine + return pattern + def parse(self): + termination = self.terminator_pattern.searchString(self.asEntered) + if termination: + self.terminator = termination[0][1] + if len(termination[0]) > 3: + self.terminator_suffix = termination[0][2] + punctuators = ['|','>','>>','<'] + punctuators.extend(self.app.terminators) + punctuated = self.punctuationPattern(punctuators).searchString(self.asEntered) + if punctuated: + self.executable, self.searchable = punctuated[0][0], self.asEntered[len(punctuated[0][0]):] + else: + self.executable, self.searchable = self.asEntered, '' + self.executable = self.executable.strip() + try: + self.cmd, self.arg = (pyparsing.Word(self.app.identchars)+pyparsing.restOfLine).parseString(self.executable) + self.arg = Arguments(self.arg, self) + except pyparsing.ParseException: + self.cmd, self.arg = None, None + + def complete(self): + while not self.terminator_pattern.searchString(self.asEntered): + inp = self.app.pseudo_raw_input(self.app.continuationPrompt) + self.asEntered = '%s\n%s' % (self.asEntered, inp) + def redirectedInput(self): + inputFrom = self.input_source_pattern.searchString(self.searchable) + if inputFrom: + if inputFrom[0][-1].strip(): + input = self.app.fileimport(source=inputFrom[0][-1].strip()) + else: + input = getPasteBuffer() + if self.terminator: + self.executable = '%s %s' % (self.executable, input) + else: + self.executable = '%s %s %s' % (self.executable, inputFrom[0][0], input) + def pipeDestination(self): + pipeTo = self.pipe_destination_pattern.searchString(self.searchable) + return (pipeTo and pipeTo[0][-1]) or None + def redirectedOutput(self): + outputTo = self.output_destination_pattern.searchString(self.searchable) + if outputTo: + dest = outputTo[0][-1].strip() + if outputTo[0][1] == '>>': + mode = 'a' + else: + mode = 'w' + return dest, mode + return None, None + class Cmd(cmd.Cmd): caseInsensitive = True - multilineCommands = [] + multilineCommands = [] # commands that need a terminator to be finished + terminators = [';', pyparsing.LineEnd() + pyparsing.LineEnd()] + terminatorKeepingCommands = [] # commands that expect to process their own terminators (else it will be stripped during parse) continuationPrompt = '> ' shortcuts = {'?': 'help', '!': 'shell', '@': 'load'} excludeFromHistory = '''run r list l history hi ed edit li eof'''.split() @@ -143,7 +231,6 @@ break settable = ['prompt', 'continuationPrompt', 'defaultFileName', 'editor', 'caseInsensitive'] - terminators = ';\n' _TO_PASTE_BUFFER = 1 def do_cmdenvironment(self, args): self.stdout.write(""" @@ -151,8 +238,8 @@ Commands may be terminated with: %(terminators)s Settable parameters: %(settable)s """ % - { 'casesensitive': ('not ' and self.caseInsensitive) or '', - 'terminators': ' '.join(self.terminators), + { 'casesensitive': (self.caseInsensitive or '') and 'not ', + 'terminators': str(self.terminators), 'settable': ' '.join(self.settable) }) @@ -174,33 +261,11 @@ result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in self.shortcuts.items()) self.stdout.write("Single-key shortcuts for other commands:\n%s\n" % (result)) - notAPipe = pyparsing.SkipTo('|') - notAPipe.ignore(pyparsing.sglQuotedString) - notAPipe.ignore(pyparsing.dblQuotedString) - pipeFinder = notAPipe + '|' + pyparsing.SkipTo(pyparsing.StringEnd()) - def parsePipe(self, statement, mustBeTerminated): - try: - statement, pipe, destination = self.pipeFinder.parseString(statement) - redirect = subprocess.Popen(destination, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) - return statement, redirect - except pyparsing.ParseException: - return statement, None - - legalFileName = re.compile(r'''^[^"'\s]+$''') - def parseRedirector(self, statement, symbol, mustBeTerminated=False): - # pipeFinder.scanString(statement) - parts = statement.split(symbol) - if (len(parts) < 2): - return statement, None - if mustBeTerminated and (not self.statementEndPattern.search(parts[-2])): - return statement, None - (newStatement, redirect) = (symbol.join(parts[:-1]), parts[-1].strip()) - if redirect: - if not self.legalFileName.search(redirect): - return statement, None - else: - redirect = self._TO_PASTE_BUFFER - return newStatement, redirect + def strip_terminators(self, txt): + termination = self.commmand_terminator_finder(txt) + if termination: + txt = termination[0] + return txt def extractCommand(self, statement): try: @@ -211,22 +276,6 @@ command = command.lower() return command, args - def parseRedirectors(self, statement): - mustBeTerminated = self.extractCommand(statement)[0] in self.multilineCommands - newStatement, redirect = self.parsePipe(statement, mustBeTerminated) - if redirect: - return newStatement, redirect, 'pipe' - newStatement, redirect = self.parseRedirector(statement, '>>', mustBeTerminated) - if redirect: - return newStatement, redirect, 'a' - newStatement, redirect = self.parseRedirector(statement, '>', mustBeTerminated) - if redirect: - return newStatement, redirect, 'w' - newStatement, redirect = self.parseRedirector(statement, '<', mustBeTerminated) - if redirect: - return newStatement, redirect, 'r' - return statement, '', '' - def onecmd(self, line, assumeComplete=False): """Interpret the argument as though it had been typed in response to the prompt. @@ -236,72 +285,49 @@ The return value is a flag indicating whether interpretation of commands by the interpreter should stop. - """ - command, args = self.extractCommand(line) - statement = originalStatement = ' '.join([command, args]) - if (not assumeComplete) and (command in self.multilineCommands): - statement = self.finishStatement(statement) + """ statekeeper = None - stop = 0 - statement, redirect, mode = self.parseRedirectors(statement) - if isinstance(redirect, subprocess.Popen): - statekeeper = Statekeeper(self, ('stdout',)) - self.stdout = redirect.stdin - elif redirect == self._TO_PASTE_BUFFER: - try: - clipcontents = getPasteBuffer() - if mode in ('w', 'a'): - statekeeper = Statekeeper(self, ('stdout',)) + stop = 0 + command, args = self.extractCommand(line) + originalStatement = ' '.join([command, args]) + statement = UserCommand(originalStatement, self) + if (not assumeComplete) and (command in self.multilineCommands): + statement.complete() + statement.parse() + statement.redirectedInput() + pipeTo = statement.pipeDestination() + if pipeTo: + statekeeper = Statekeeper(self, ('stdout',)) + dest = subprocess.Popen(pipeTo, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) + self.stdout = dest.stdin + else: # can't pipe output AND send it to a file + outputTo, outputMode = statement.redirectedOutput() + if outputMode: + statekeeper = Statekeeper(self, ('stdout',)) + if outputTo: + self.stdout = open(outputTo, outputMode) + else: self.stdout = tempfile.TemporaryFile() - if mode == 'a': - self.stdout.write(clipcontents) - else: - statement = '%s %s' % (statement, clipcontents) - except OSError, e: - print e - return 0 - elif redirect: - if mode in ('w','a'): - statekeeper = Statekeeper(self, ('stdout',)) - self.stdout = open(redirect, mode) - else: - statement = '%s %s' % (statement, self.fileimport(statement=statement, source=redirect)) - if isinstance(redirect, subprocess.Popen): - stop = self.onecmd(statement) - else: - stop = cmd.Cmd.onecmd(self, statement) + if outputMode == 'a': + self.stdout.write(getPasteBuffer()) + + stop = cmd.Cmd.onecmd(self, statement) try: if command not in self.excludeFromHistory: self.history.append(originalStatement) finally: if statekeeper: - if redirect == self._TO_PASTE_BUFFER: + if pipeTo: + for result in dest.communicate(): + statekeeper.stdout.write(result or '') + elif outputMode and not outputTo: self.stdout.seek(0) writeToPasteBuffer(self.stdout.read()) - elif isinstance(redirect, subprocess.Popen): - for result in redirect.communicate(): - statekeeper.stdout.write(result or '') self.stdout.close() statekeeper.restore() return stop - - statementEndPattern = re.compile(r'[%s]\s*$' % terminators) - def statementHasEnded(self, lines): - return bool(self.statementEndPattern.search(lines)) \ - or lines[-3:] == 'EOF' \ - or self.parseRedirectors(lines)[1] - - def finishStatement(self, firstline): - statement = firstline - while not self.statementHasEnded(statement): - inp = self.pseudo_raw_input(self.continuationPrompt) - statement = '%s\n%s' % (statement, inp) - return statement - # assembling a list of lines and joining them at the end would be faster, - # but statementHasEnded needs a string arg; anyway, we're getting - # user input and users are slow. - + def pseudo_raw_input(self, prompt): """copied from cmd's cmdloop; like raw_input, but accounts for changed stdin, stdout""" @@ -378,17 +404,9 @@ the arguments. Returns a tuple containing (command, args, line). 'command' and 'args' may be None if the line couldn't be parsed. """ - line = line.strip() - if not line: + if not line.executable: return None, None, line - shortcut = self.shortcuts.get(line[0]) - if shortcut and hasattr(self, 'do_%s' % shortcut): - line = '%s %s' % (shortcut, line[1:]) - i, n = 0, len(line) - while i < n and line[i] in self.identchars: i = i+1 - cmd, arg = line[:i], line[i:].strip().strip(self.terminators) - return cmd, arg, line - + return line.cmd, line.arg, line.executable def showParam(self, param): param = self.clean(param) if param in self.settable: @@ -416,7 +434,7 @@ if paramName not in self.settable: raise NotSettableError currentVal = getattr(self, paramName) - val = cast(currentVal, val.strip(self.terminators)) + val = cast(currentVal, self.strip_terminators(val)) setattr(self, paramName, val) self.stdout.write('%s - was: %s\nnow: %s\n' % (paramName, currentVal, val)) except (ValueError, AttributeError, NotSettableError), e: @@ -551,7 +569,7 @@ stop = self.postcmd(stop, runme) do_r = do_run - def fileimport(self, statement, source): + def fileimport(self, source): try: f = open(source) except IOError: @@ -639,3 +657,7 @@ def restore(self): for attrib in self.attribs: setattr(self.obj, attrib, getattr(self, attrib)) + +if __name__ == '__main__': + doctest.testmod() + \ No newline at end of file