changeset 0:8c7dfe40b70a

pysh
author Thinker K.F. Li <thinker@codemud.net>
date Mon, 28 Mar 2011 20:42:37 +0800
parents
children 79bbce42690e
files comm.py pysh.py shell_agent.py
diffstat 3 files changed, 516 insertions(+), 0 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/comm.py	Mon Mar 28 20:42:37 2011 +0800
@@ -0,0 +1,484 @@
+## \brief Communication Protocol from Shell to Python namespace.
+#
+# \section scenario Scenario
+#
+# User will interleave shell command and Python code.  They want to
+# run Python code for some points when they run a shell command.  The
+# invokation may pass some information generated dynamic by shell
+# command to Python code.  We need a mechanisim for passing
+# information from shell to Python.  These information include
+# environment variables, stdin, stdout, and arguments.
+#
+# This module provide a protocol running a server and several clients.
+# Python code that call shell would host a server for accepting
+# requests from client, while clients are an agent program called by
+# shell command to passing requested command and informations to
+# Python code.  When a server receives a request, it extracts
+# informations from the request and calling a callable specified by
+# the requested command.
+#
+# The server and clients are connected through a unix domain socket.
+# The server pass the listened address to clients through environment
+# variable, PYSHELL_SERVER.
+#
+# The caller function, run*(), will replace Python calls, in the shell
+# command, with running agent program.  The agent program will connect
+# to the server to passing a request and information.
+#
+# Once a client connect to the server.  It pass a request in following
+# format.
+#
+# <verbatim>
+# REQ<length of following object><a serialized request object>
+# </verbatim>
+# 
+# Packets for stdin and stdout are following the request.  The client
+# can encapsulate the data from stdin into a packet and passed to
+# server.  In reverse, the server can pass stdout stream by
+# encapsulate content in packets.  The packet format is
+#
+# <verbatim>
+# IPK<data length><data>
+# OPK<data length><data>
+# </verbatim>
+#
+# To close a stdin or stdout session.  Send a 'IED' or 'OED'.
+# A byte of exit code will follow the 'OED'.
+#
+# <verbatim>
+# IED
+# OED<exit code>
+# </verbatim>
+#
+# The fromat of request object looks like following example
+#
+# <verbatim>
+# request = {'request': 'callable name',
+#            'environ': {....},
+#            'args': ('arg1', 'arg2', ...}
+#            }
+# </verbatim>
+# 
+#
+import os
+import struct
+import cPickle as pickle
+import asyncore
+
+class server_session(object):
+    def __init__(self, server, sock, peer_addr):
+        self._server = server
+        self._sock = sock
+        self._peer_addr = peer_addr
+        self._calling_gen = None    # generator
+        self._ibuf = []             # input buffer
+        self._exit_flag = False
+        pass
+
+    def _run(self, callable_name, args, env):
+        server = self._server
+        calling_gen = server._ns[callable_name](args, env, self)
+        if calling_gen == None:
+            self.exit(0)
+        else:
+            calling_gen.next()
+            self._calling_gen = calling_gen # a generator
+            pass
+        pass
+    
+    def _handle_req(self):
+        if self._calling_gen:
+            raise RuntimeError, 'this session have called a callable'
+        
+        sock = self._sock
+        sz_str = sock.recv(4)
+        if len(sz_str) != 4:
+            raise RuntimeError, 'invalid REQ packet, invalid size'
+        
+        sz = struct.unpack('i', sz_str)[0]
+        if sz < 0:
+            raise ValueError, 'invalid REQ packet, invalid size'
+
+        data = sock.recv(sz)
+        req = pickle.loads(data)
+        callable_name = req['request']
+        args = req['args']
+        env = req['environ']
+
+        self._run(callable_name, args, env)
+        pass
+
+    def _handle_ipk(self):
+        if not self._calling_gen:
+            raise RuntimeError, 'this session have not yet call any callable'
+        calling_gen = self._calling_gen
+
+        sock = self._sock
+        sz_str = sock.recv(4)
+        if len(sz_str) != 4:
+            raise RuntimeError, 'invliad IPK packet, invalid size'
+        
+        sz = struct.unpack('i', sz_str)[0]
+        if sz < 0:
+            raise ValueError, 'invalid IPK packet, invalid size'
+
+        data = sock.recv(sz)
+        self._ibuf.append(data)
+        try:
+            calling_gen.next()
+        except StopIteration:
+            self.exit(0)
+            pass
+        pass
+
+    def _handle_ied(self):
+        if not self._calling_gen:
+            return
+
+        self._ibuf.append(None)
+        calling_gen = self._calling_gen
+        try:
+            calling_gen.next()
+        except StopIteration:
+            self.exit(0)
+            pass
+        pass
+
+    def handle_connection(self):
+        if self._exit_flag:
+            raise RuntimeError, 'the session is closed'
+        
+        sock = self._sock
+        
+        cmd = sock.recv(3)
+        if cmd == 'REQ':
+            self._handle_req()
+        elif cmd == 'IPK':
+            self._handle_ipk()
+        elif cmd == 'IED':
+            self._handle_ied()
+        else:
+            raise RuntimeError, 'invalid packet %s' % (cmd)
+        pass
+
+    def recv(self):
+        last = self._ibuf.pop(0)
+        return last
+
+    def send(self, data):
+        sz = len(data)
+        sz_str = struct.pack('i', sz)
+        
+        pkt = 'OPK' + sz_str + data
+        sock = self._sock
+        sock.send(pkt)
+        pass
+
+    def exit(self, code):
+        if self._exit_flag:
+            return
+        
+        pkt = 'OED' + chr(code)
+        sock = self._sock
+        sock.send(pkt)
+        sock.close()
+        
+        self._exit_flag = True
+        pass
+
+    def get_sock(self):
+        return self._sock
+
+    def is_exited(self):
+        return self._exit_flag
+    pass
+
+class server(object):
+    def __init__(self, ns):
+        self._addr = self._make_server_addr()
+        self._ns = ns
+        pass
+
+    def _make_server_addr(self):
+        import random
+        pid = os.getpid()
+        rn = random.randint(0, 10000)
+        addr = '/tmp/pyshell-server-%d-%d' % (pid, rn)
+        return addr
+
+    def listen(self):
+        import socket
+
+        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        sock.bind(self._addr)
+        sock.listen(5)
+        self._sock = sock
+        pass
+
+    def accept(self):
+        sock = self._sock
+        if not sock:
+            raise ValueError, 'invalid socket'
+        client_sock, peer_addr = sock.accept()
+        session = server_session(self, client_sock, peer_addr)
+        return session
+
+    def close(self):
+        sock = self._sock
+        sock.close()
+        self._sock = None
+        pass
+
+    def get_addr(self):
+        return self._addr
+
+    def get_sock(self):
+        return self._sock
+
+    def handle(self):
+        class server_dispatcher(object):
+            def __init__(self, server, socket_map):
+                self._sock = server.get_sock()
+                self._server = server
+                self._socket_map = socket_map
+                pass
+
+            def readable(self):
+                return True
+
+            def writable(self):
+                return False
+
+            def handle_error(self):
+                import traceback
+                traceback.print_exc()
+                pass
+
+            def handle_read_event(self):
+                server = self._server
+                session = server.accept()
+                if server.is_closed():
+                    socket_map = self._socket_map
+                    sock = self._sock
+                    del socket_map[sock]
+                    return
+
+                socket_map = self._socket_map
+                sock = session.get_sock()
+                socket_map[sock] = session_dispatcher(session, socket_map)
+                pass
+            pass
+
+        class session_dispatcher(object):
+            def __init__(self, session, socket_map):
+                self._session = session
+                self._socket_map = socket_map
+                pass
+
+            def readable(self):
+                return True
+
+            def writable(self):
+                return False
+
+            def handle_error(self):
+                import traceback
+                traceback.print_exc()
+                pass
+
+            def handle_read_event(self):
+                session = self._session
+                session.handle_connection()
+                if session.is_exited():
+                    sock = session.get_sock()
+                    del self._socket_map[sock]
+                    pass
+                pass
+            pass
+        
+        sock_map = {}
+        sock = self._sock
+        serv_disp = server_dispatcher(self, sock_map)
+        sock_map[sock] = serv_disp
+        
+        asyncore.loop(map=sock_map)
+        pass
+
+    def is_closed(self):
+        return self._sock == None
+    pass
+
+class client(object):
+    def __init__(self, callable_name, args, env, stdin, stdout):
+        self._callable_name = callable_name
+        self._args = args
+        self._env = env
+        self._stdin = stdin
+        self._stdout = stdout
+        self._sock = None
+        pass
+
+    def _server_addr(self):
+        env = self._env
+        addr = env['PYSHELL_SERVER']
+        return addr
+
+    def _connect(self, addr):
+        import socket
+
+        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+        sock.connect(addr)
+        return sock
+
+    def connect(self):
+        server_addr = self._server_addr()
+        sock = self._connect(server_addr)
+        
+        req = {'request': self._callable_name,
+               'args': self._args,
+               'environ': self._env}
+        req_str = pickle.dumps(req)
+        
+        sz = len(req_str)
+        sz_str = struct.pack('i', sz)
+        
+        pkt = 'REQ' + sz_str + req_str
+        sock.send(pkt)
+
+        self._sock = sock
+        pass
+
+    def handle(self):
+        class stdin_dispatcher(object):
+            def __init__(self, stdin, sock, socket_map):
+                self._stdin = stdin
+                self._sock = sock
+                self._socket_map = socket_map
+                pass
+            
+            def readable(self):
+                return True
+
+            def writable(self):
+                return False
+
+            def handle_error(self):
+                import traceback
+                traceback.print_exc()
+                pass
+
+            def handle_read_event(self):
+                data = self._stdin.readline(1024)
+                sz = len(data)
+                
+                if sz == 0:
+                    del self._socket_map[self._stdin]
+                    self._stdin.close()
+                    pkt = 'IED'
+                else:
+                    sz_str = struct.pack('i', sz)
+                    pkt = 'IPK' + sz_str + data
+                    pass
+                
+                sock = self._sock
+                sock.send(pkt)
+                pass
+            pass
+
+        class server_displatcher(object):
+            def __init__(self, sock, stdout, socket_map):
+                self._sock = sock
+                self._stdout = stdout
+                self._socket_map = socket_map
+                
+                socket_map[sock] = self
+                pass
+
+            def readable(self):
+                return True
+
+            def writable(self):
+                return False
+
+            def handle_error(self):
+                import traceback
+                traceback.print_exc()
+                pass
+
+            def _handle_oed(self):
+                sock = self._sock
+                stdout = self._stdout
+                
+                stdout.close()
+                sock.close()
+                del self._socket_map[self._sock]
+                pass
+
+            def _handle_opk(self):
+                sock = self._sock
+                stdout = self._stdout
+                
+                sz_str = sock.recv(4)
+                sz = struct.unpack('i', sz_str)[0]
+                
+                data = sock.recv(sz)
+
+                stdout.write(data)
+                pass
+
+            def handle_read_event(self):
+                sock = self._sock
+                
+                cmd = sock.recv(3)
+                if cmd == 'OED':
+                    self._handle_oed()
+                elif cmd == 'OPK':
+                    self._handle_opk()
+                else:
+                    raise RuntimeError, 'invalid packet type'
+                pass
+            pass
+
+        sock_map = {}
+        i_disp = stdin_dispatcher(self._stdin, self._sock, sock_map)
+        serv_disp = server_displatcher(self._sock, self._stdout, sock_map)
+        
+        sock_map[self._stdin] = i_disp
+        sock_map[self._sock] = serv_disp
+
+        asyncore.loop(map=sock_map)
+        pass
+    pass
+
+if __name__ == '__main__':
+    import sys
+
+    def test_call(args, env, session):
+        print 'test_call'
+        yield
+        while True:
+            data = session.recv()
+            if data == None:
+                return
+            session.send('from client: ' + data)
+            yield
+            pass
+        pass
+    
+    ns = {'data': 'testdata', 'test_call': test_call}
+    s = server(ns)
+    s.listen()
+    addr = s.get_addr()
+    sock = s.get_sock()
+
+    pid = os.fork()
+    if pid == 0:                # child (client)
+        os.environ['PYSHELL_SERVER'] = addr
+        c = client('test_call', ['1', '2'], os.environ, sys.stdin, sys.stdout)
+        c.connect()
+        c.handle()
+        pass
+    else:                       # parent (server)
+        s.handle()
+        pass
+    pass
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pysh.py	Mon Mar 28 20:42:37 2011 +0800
@@ -0,0 +1,19 @@
+import sys
+import os
+
+def run(cmd):
+    prev_frame = sys._getframe().f_back
+    prev_locals = prev_frame.f_locals
+    prev_globals = prev_frame.f_globals
+
+    env = ';'.join(['%s="%s"' % (k, v)
+                    for k, v in prev_locals.items()
+                    if isinstance(v, (str, int, float))])
+    os.system(env + ';' + cmd)
+    pass
+
+if __name__ == '__main__':
+    for i in range(20):
+        run('echo $i')
+        pass
+    pass
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/shell_agent.py	Mon Mar 28 20:42:37 2011 +0800
@@ -0,0 +1,13 @@
+import comm
+import sys, os
+
+if __name__ == '__main__':
+    if len(sys.argv) < 2:
+        sys.exit(255)
+        pass
+    
+    cmd = sys.argv[1]
+    args = sys.argv[2:]
+    client = comm.client(cmd, args, os.environ, sys.stdin, sys.stdout)
+    client.connect()
+    pass