Mercurial > python-cmd2
comparison cmd2.py @ 66:f373aaa2390c
ready for first trial
author | catherine@Elli.myhome.westell.com |
---|---|
date | Mon, 23 Jun 2008 15:15:11 -0400 |
parents | 4e028e9ec4c2 |
children | a78dff1e7bca |
comparison
equal
deleted
inserted
replaced
65:4e028e9ec4c2 | 66:f373aaa2390c |
---|---|
207 """Lists single-key shortcuts available.""" | 207 """Lists single-key shortcuts available.""" |
208 result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in self.shortcuts.items()) | 208 result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in self.shortcuts.items()) |
209 self.stdout.write("Single-key shortcuts for other commands:\n%s\n" % (result)) | 209 self.stdout.write("Single-key shortcuts for other commands:\n%s\n" % (result)) |
210 | 210 |
211 commmand_terminator_finder = punctuationParser(terminators) | 211 commmand_terminator_finder = punctuationParser(terminators) |
212 outputTo_finder = punctuationParser(['>']) | 212 output_destination_finder = punctuationParser(['>>', '>']) |
213 inputFrom_finder = punctuationParser(['<']) | 213 input_source_finder = punctuationParser(['<']) |
214 pipe_finder = punctuationParser(['|']) | 214 pipe_destination_finder = punctuationParser(['|']) |
215 | |
216 notAPipe = pyparsing.SkipTo('|') | |
217 notAPipe.ignore(pyparsing.sglQuotedString) | |
218 notAPipe.ignore(pyparsing.dblQuotedString) | |
219 pipeFinder = notAPipe + '|' + pyparsing.SkipTo(pyparsing.StringEnd()) | |
220 def parsePipe(self, statement, mustBeTerminated): | |
221 try: | |
222 statement, pipe, destination = self.pipeFinder.parseString(statement) | |
223 redirect = subprocess.Popen(destination, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) | |
224 return statement, redirect | |
225 except pyparsing.ParseException: | |
226 return statement, None | |
227 | |
228 legalFileName = re.compile(r'''^[^"'\s]+$''') | |
229 def parseRedirector(self, statement, symbol, mustBeTerminated=False): | |
230 # pipeFinder.scanString(statement) | |
231 parts = statement.split(symbol) | |
232 if (len(parts) < 2): | |
233 return statement, None | |
234 if mustBeTerminated and (not self.statementEndPattern.search(parts[-2])): | |
235 return statement, None | |
236 (newStatement, redirect) = (symbol.join(parts[:-1]), parts[-1].strip()) | |
237 if redirect: | |
238 if not self.legalFileName.search(redirect): | |
239 return statement, None | |
240 else: | |
241 redirect = self._TO_PASTE_BUFFER | |
242 return newStatement, redirect | |
243 | 215 |
244 def extractCommand(self, statement): | 216 def extractCommand(self, statement): |
245 try: | 217 try: |
246 (command, args) = statement.split(None,1) | 218 (command, args) = statement.split(None,1) |
247 except ValueError: | 219 except ValueError: |
248 (command, args) = statement, '' | 220 (command, args) = statement, '' |
249 if self.caseInsensitive: | 221 if self.caseInsensitive: |
250 command = command.lower() | 222 command = command.lower() |
251 return command, args | 223 return command, args |
252 | 224 |
253 def parseRedirectors(self, statement): | 225 def completedStatement(self, firstline): |
254 mustBeTerminated = self.extractCommand(statement)[0] in self.multilineCommands | 226 statement = firstline |
255 newStatement, redirect = self.parsePipe(statement, mustBeTerminated) | 227 while not self.commmand_terminator_finder(statement): |
256 if redirect: | 228 inp = self.pseudo_raw_input(self.continuationPrompt) |
257 return newStatement, redirect, 'pipe' | 229 statement = '%s\n%s' % (statement, inp) |
258 newStatement, redirect = self.parseRedirector(statement, '>>', mustBeTerminated) | 230 return statement |
259 if redirect: | 231 # assembling a list of lines and joining them at the end would be faster, |
260 return newStatement, redirect, 'a' | 232 # but statementHasEnded needs a string arg; anyway, we're getting |
261 newStatement, redirect = self.parseRedirector(statement, '>', mustBeTerminated) | 233 # user input and users are slow. |
262 if redirect: | 234 |
263 return newStatement, redirect, 'w' | |
264 newStatement, redirect = self.parseRedirector(statement, '<', mustBeTerminated) | |
265 if redirect: | |
266 return newStatement, redirect, 'r' | |
267 return statement, '', '' | |
268 | |
269 def onecmd(self, line, assumeComplete=False): | 235 def onecmd(self, line, assumeComplete=False): |
270 """Interpret the argument as though it had been typed in response | 236 """Interpret the argument as though it had been typed in response |
271 to the prompt. | 237 to the prompt. |
272 | 238 |
273 This may be overridden, but should not normally need to be; | 239 This may be overridden, but should not normally need to be; |
277 | 243 |
278 """ | 244 """ |
279 command, args = self.extractCommand(line) | 245 command, args = self.extractCommand(line) |
280 statement = originalStatement = ' '.join([command, args]) | 246 statement = originalStatement = ' '.join([command, args]) |
281 if (not assumeComplete) and (command in self.multilineCommands): | 247 if (not assumeComplete) and (command in self.multilineCommands): |
282 statement = self.finishStatement(statement) | 248 statement = self.completedStatement(statement) |
283 statekeeper = None | 249 statekeeper = None |
284 stop = 0 | 250 stop = 0 |
285 statement, redirect, mode = self.parseRedirectors(statement) | 251 |
286 if isinstance(redirect, subprocess.Popen): | 252 inputFrom = self.input_source_finder(statement) |
287 statekeeper = Statekeeper(self, ('stdout',)) | 253 if inputFrom: |
254 statement, source = inputFrom[0], inputFrom[-1] | |
255 if source: | |
256 statement = '%s %s' % (statement, self.fileimport(statement=statement, source=source)) | |
257 else: | |
258 statement = getPasteBuffer() | |
259 | |
260 pipeTo = self.pipe_destination_finder(statement) | |
261 if pipeTo: | |
262 statement, pipeTo = pipeTo[0], [-1] | |
263 statekeeper = Statekeeper(self, ('stdout',)) | |
264 pipeTo = subprocess.Popen(PipeTo, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) | |
288 self.stdout = redirect.stdin | 265 self.stdout = redirect.stdin |
289 elif redirect == self._TO_PASTE_BUFFER: | 266 else: # can't pipe output AND send it to a file |
290 try: | 267 outputTo = self.outputTo_finder(statement) |
291 clipcontents = getPasteBuffer() | 268 if outputTo: |
292 if mode in ('w', 'a'): | 269 statement, destination = outputTo[0], outputTo[-1] |
293 statekeeper = Statekeeper(self, ('stdout',)) | 270 statekeeper = Statekeeper(self, ('stdout',)) |
271 mode = ((output[1] == '>>') and 'a') or 'w' | |
272 if destination: | |
273 self.stdout = open(destination, mode) | |
274 else: | |
294 self.stdout = tempfile.TemporaryFile() | 275 self.stdout = tempfile.TemporaryFile() |
295 if mode == 'a': | 276 if mode == 'a': |
296 self.stdout.write(clipcontents) | 277 self.stdout.write(getPasteBuffer()) |
297 else: | 278 |
298 statement = '%s %s' % (statement, clipcontents) | 279 stop = cmd.Cmd.onecmd(self, statement) |
299 except OSError, e: | |
300 print e | |
301 return 0 | |
302 elif redirect: | |
303 if mode in ('w','a'): | |
304 statekeeper = Statekeeper(self, ('stdout',)) | |
305 self.stdout = open(redirect, mode) | |
306 else: | |
307 statement = '%s %s' % (statement, self.fileimport(statement=statement, source=redirect)) | |
308 if isinstance(redirect, subprocess.Popen): | |
309 stop = self.onecmd(statement) | |
310 else: | |
311 stop = cmd.Cmd.onecmd(self, statement) | |
312 try: | 280 try: |
313 if command not in self.excludeFromHistory: | 281 if command not in self.excludeFromHistory: |
314 self.history.append(originalStatement) | 282 self.history.append(originalStatement) |
315 finally: | 283 finally: |
316 if statekeeper: | 284 if statekeeper: |
317 if redirect == self._TO_PASTE_BUFFER: | 285 if outputTo and not destination: |
318 self.stdout.seek(0) | 286 self.stdout.seek(0) |
319 writeToPasteBuffer(self.stdout.read()) | 287 writeToPasteBuffer(self.stdout.read()) |
320 elif isinstance(redirect, subprocess.Popen): | 288 elif pipeTo: # uh-oh. HUH? |
321 for result in redirect.communicate(): | 289 for result in redirect.communicate(): |
322 statekeeper.stdout.write(result or '') | 290 statekeeper.stdout.write(result or '') |
323 self.stdout.close() | 291 self.stdout.close() |
324 statekeeper.restore() | 292 statekeeper.restore() |
325 | 293 |
326 return stop | 294 return stop |
327 | 295 |
328 #TODO: This should be replaced by pyparsing. What if terminators come | |
329 #inside a string? What about statements that end in different ways? | |
330 # + searching for EOF string is sloppy! | |
331 statementEndPattern = re.compile(r'[%s]\s*$' % terminators) | |
332 lineEndFinder = pyparsing.Optional(pyparsing.CharsNotIn(';')) + ';' | |
333 def statementHasEnded(self, lines): | |
334 #import pdb; pdb.set_trace() | |
335 try: | |
336 self.lineEndFinder.parseString(lines) | |
337 return True | |
338 except pyparsing.ParseException: | |
339 return False | |
340 ''' | |
341 return bool(self.statementEndPattern.search(lines)) \ | |
342 or lines[-3:] == 'EOF' \ | |
343 or self.parseRedirectors(lines)[1]''' | |
344 | |
345 def finishStatement(self, firstline): | |
346 statement = firstline | |
347 while not self.statementHasEnded(statement): | |
348 inp = self.pseudo_raw_input(self.continuationPrompt) | |
349 statement = '%s\n%s' % (statement, inp) | |
350 return statement | |
351 # assembling a list of lines and joining them at the end would be faster, | |
352 # but statementHasEnded needs a string arg; anyway, we're getting | |
353 # user input and users are slow. | |
354 | |
355 def pseudo_raw_input(self, prompt): | 296 def pseudo_raw_input(self, prompt): |
356 """copied from cmd's cmdloop; like raw_input, but accounts for changed stdin, stdout""" | 297 """copied from cmd's cmdloop; like raw_input, but accounts for changed stdin, stdout""" |
357 | 298 |
358 if self.use_rawinput: | 299 if self.use_rawinput: |
359 try: | 300 try: |