Mercurial > traipse_dev
comparison orpg/networking/mplay_messaging.py @ 71:449a8900f9ac ornery-dev
Code refining almost completed, for this round. Some included files are still in need of some clean up, but this is test worthy.
author | sirebral |
---|---|
date | Thu, 20 Aug 2009 03:00:39 -0500 |
parents | c54768cffbd4 |
children | d1aff41c031b |
comparison
equal
deleted
inserted
replaced
70:52a5fa913008 | 71:449a8900f9ac |
---|---|
34 import traceback | 34 import traceback |
35 from threading import Event, Lock | 35 from threading import Event, Lock |
36 from xml.sax.saxutils import escape | 36 from xml.sax.saxutils import escape |
37 from struct import pack, unpack, calcsize | 37 from struct import pack, unpack, calcsize |
38 from string import * | 38 from string import * |
39 from orpg.orpg_version import * | 39 from orpg.orpg_version import VERSION, PROTOCOL_VERSION, CLIENT_STRING, SERVER_MIN_CLIENT_VERSION |
40 import os | 40 import os |
41 import time | 41 import time |
42 | 42 |
43 from orpg.orpgCore import * | 43 from orpg.tools.orpg_log import logger |
44 from orpg.orpgCore import component | |
44 | 45 |
45 def myescape(data): | 46 def myescape(data): |
46 return escape(data,{"\"":""}) | 47 return escape(data,{"\"":""}) |
47 | 48 |
48 class messenger: | 49 class messenger: |
49 def __init__(self, *args, **kwargs): | 50 def __init__(self, *args, **kwargs): |
50 self.log = component.get("log") | 51 #self.xml = component.get("xml") used once, no need for the object. |
51 self.xml = component.get("xml") | 52 self.dir_struct = component.get("dir_struct") #used? |
52 self.dir_struct = component.get("dir_struct") | 53 self.validate = component.get("validate") #used?? |
53 self.validate = component.get("validate") | 54 #self.settings = component.get("settings") ## used once, no need for the object. |
54 self.settings = component.get("settings") | |
55 if kwargs.has_key('isServer'): | 55 if kwargs.has_key('isServer'): |
56 self.isServer = kwargs['isServer'] | 56 self.isServer = kwargs['isServer'] |
57 else: | 57 else: |
58 self.isServer = False | 58 self.isServer = False |
59 self.listen_event = Event() | 59 self.listen_event = Event() |
62 self.inbox = Queue.Queue(0) | 62 self.inbox = Queue.Queue(0) |
63 self.startedEvent = Event() | 63 self.startedEvent = Event() |
64 self.exitEvent = Event() | 64 self.exitEvent = Event() |
65 self.sendThreadExitEvent = Event() | 65 self.sendThreadExitEvent = Event() |
66 self.recvThreadExitEvent = Event() | 66 self.recvThreadExitEvent = Event() |
67 self.port = int(self.settings.get_setting("port")) | 67 self.port = int(component.get("settings").get_setting("port")) ##used even? |
68 self.ip = socket.gethostbyname(socket.gethostname()) | 68 self.ip = socket.gethostbyname(socket.gethostname()) |
69 self.lensize = calcsize('i') | 69 self.lensize = calcsize('i') |
70 self.mplay_type = ('disconnected', 'connected', 'disconnecting', 'group change', 'group change failed') | 70 self.mplay_type = ('disconnected', 'connected', 'disconnecting', 'group change', 'group change failed') |
71 self.status = self.mplay_type[0] | 71 self.status = self.mplay_type[0] |
72 self.alive = False | 72 self.alive = False |
135 message += ' />' | 135 message += ' />' |
136 return message | 136 return message |
137 | 137 |
138 def disconnect(self): | 138 def disconnect(self): |
139 self.set_status(2) | 139 self.set_status(2) |
140 self.log.log("client stub " + self.ip +" disconnecting...", ORPG_DEBUG) | 140 logger.debug("client stub " + self.ip +" disconnecting...") |
141 self.log.log("closing sockets...", ORPG_DEBUG) | 141 logger.debug("closing sockets...") |
142 try: | 142 try: |
143 self.sock.shutdown( 2 ) | 143 self.sock.shutdown( 2 ) |
144 except: | 144 except: |
145 self.log.log("Caught exception:\n" + traceback.format_exc(), ORPG_GENERAL) | 145 logger.general("Caught exception:\n" + traceback.format_exc()) |
146 self.set_status(0) | 146 self.set_status(0) |
147 | 147 |
148 def reset(self, sock): | 148 def reset(self, sock): |
149 self.disconnect() | 149 self.disconnect() |
150 self.sock = sock | 150 self.sock = sock |
159 | 159 |
160 def update_self_from_player(self, player): | 160 def update_self_from_player(self, player): |
161 try: | 161 try: |
162 (self.name, self.ip, self.id, self.text_status, self.version, self.protocol_version, self.client_string,role) = player | 162 (self.name, self.ip, self.id, self.text_status, self.version, self.protocol_version, self.client_string,role) = player |
163 except: | 163 except: |
164 self.log.log("Exception: messenger->update_self_from_player():\n" + traceback.format_exc(), ORPG_GENERAL) | 164 logger.general("Exception: messenger->update_self_from_player():\n" + traceback.format_exc()) |
165 | 165 |
166 def toxml(self, act): | 166 def toxml(self, act): |
167 self.log.log("DEPRECIATED! messenger->toxml()", ORPG_CRITICAL) | 167 logger.exception("DEPRECIATED! messenger->toxml()") |
168 xml_data = self.build_message('player', | 168 xml_data = self.build_message('player', |
169 name=myescape(self.name), | 169 name=myescape(self.name), |
170 action=act, | 170 action=act, |
171 id=self.id, | 171 id=self.id, |
172 group_id=self.group_id, | 172 group_id=self.group_id, |
288 | 288 |
289 | 289 |
290 #Message Handaling | 290 #Message Handaling |
291 def message_handler(self, arg): | 291 def message_handler(self, arg): |
292 xml_dom = None | 292 xml_dom = None |
293 self.log.log("message handler thread running...", ORPG_NOTE) | 293 logger.note("message handler thread running...", ORPG_NOTE) |
294 while self.alive or self.status == 'connected': | 294 while self.alive or self.status == 'connected': |
295 data = None | 295 data = None |
296 try: | 296 try: |
297 data = self.inbox.get(0) | 297 data = self.inbox.get(0) |
298 except Queue.Empty: | 298 except Queue.Empty: |
306 #data has been passed... unlink from the variable references | 306 #data has been passed... unlink from the variable references |
307 #so data in passed objects doesn't change (python passes by reference) | 307 #so data in passed objects doesn't change (python passes by reference) |
308 del data | 308 del data |
309 data = None | 309 data = None |
310 except Exception, e: | 310 except Exception, e: |
311 self.log.log(traceback.format_exc(), ORPG_GENERAL) | 311 logger.general(traceback.format_exc()) |
312 if xml_dom: xml_dom.unlink() | 312 if xml_dom: xml_dom.unlink() |
313 if xml_dom: xml_dom.unlink() | 313 if xml_dom: xml_dom.unlink() |
314 self.log.log("message handler thread exiting...", ORPG_NOTE) | 314 logger.note("message handler thread exiting...") |
315 self.inbox_event.set() | 315 self.inbox_event.set() |
316 | 316 |
317 def parse_incoming_dom(self, data): | 317 def parse_incoming_dom(self, data): |
318 #print data | 318 #print data |
319 xml_dom = None | 319 xml_dom = None |
320 try: | 320 try: |
321 xml_dom = self.xml.parseXml(data) | 321 xml_dom = component.get("xml").parseXml(data) |
322 xml_dom = xml_dom._get_documentElement() | 322 xml_dom = xml_dom._get_documentElement() |
323 self.message_action(xml_dom, data) | 323 self.message_action(xml_dom, data) |
324 | 324 |
325 except Exception, e: | 325 except Exception, e: |
326 self.log.log("Error in parse of inbound message. Ignoring message.", ORPG_GENERAL) | 326 logger.general("Error in parse of inbound message. Ignoring message.") |
327 self.log.log("\tOffending data(" + str(len(data)) + "bytes)=" + data, ORPG_GENERAL) | 327 logger.general("\tOffending data(" + str(len(data)) + "bytes)=" + data) |
328 self.log.log("Exception=" + traceback.format_exc(), ORPG_GENERAL) | 328 logger.general("Exception=" + traceback.format_exc()) |
329 if xml_dom: xml_dom.unlink() | 329 if xml_dom: xml_dom.unlink() |
330 | 330 |
331 def message_action(self, xml_dom, data): | 331 def message_action(self, xml_dom, data): |
332 tag_name = xml_dom._get_tagName() | 332 tag_name = xml_dom._get_tagName() |
333 if self.msg_handlers.has_key(tag_name): | 333 if self.msg_handlers.has_key(tag_name): |
334 self.msg_handlers[tag_name](xml_dom, data) | 334 self.msg_handlers[tag_name](xml_dom, data) |
335 else: | 335 else: |
336 self.log.log("Unknown Message Type", ORPG_GENERAL) | 336 logger.general("Unknown Message Type") |
337 self.log.log(data, ORPG_GENERAL) | 337 logger.general(data) |
338 #Message Action thread expires and closes here. | 338 #Message Action thread expires and closes here. |
339 return | 339 return |
340 | 340 |
341 #Privet functions | 341 #Privet functions |
342 def sendThread( self, arg ): | 342 def sendThread( self, arg ): |
348 while( self.get_status() == 'connected' ): | 348 while( self.get_status() == 'connected' ): |
349 try: | 349 try: |
350 readMsg = self.outbox.get( block=1 ) | 350 readMsg = self.outbox.get( block=1 ) |
351 | 351 |
352 except Exception, text: | 352 except Exception, text: |
353 self.log.log("Exception: messenger->sendThread(): " + str(text), ORPG_CRITICAL) | 353 logger.exception("Exception: messenger->sendThread(): " + str(text) |
354 | 354 |
355 # If we are here, it's because we have data to send, no doubt! | 355 # If we are here, it's because we have data to send, no doubt! |
356 if self.status == 'connected': | 356 if self.status == 'connected': |
357 try: | 357 try: |
358 # Send the entire message, properly formated/encoded | 358 # Send the entire message, properly formated/encoded |
359 sent = self.sendMsg( self.sock, readMsg ) | 359 sent = self.sendMsg( self.sock, readMsg ) |
360 except: | 360 except: |
361 self.log.log("Exception: messenger->sendThread():\n" + traceback.format_exc(), ORPG_CRITICAL) | 361 logger.exception("Exception: messenger->sendThread():\n" + traceback.format_exc() |
362 else: | 362 else: |
363 # If we are not connected, purge the data queue | 363 # If we are not connected, purge the data queue |
364 self.log.log("Data queued without a connection, purging data from queue...", ORPG_NOTE) | 364 logger.note("Data queued without a connection, purging data from queue...") |
365 self.sendThreadExitEvent.set() | 365 self.sendThreadExitEvent.set() |
366 self.log.log( "sendThread has terminated...", ORPG_NOTE) | 366 logger.note( "sendThread has terminated...") |
367 | 367 |
368 def sendMsg( self, sock, msg ): | 368 def sendMsg( self, sock, msg ): |
369 """Very simple function that will properly encode and send a message to te | 369 """Very simple function that will properly encode and send a message to te |
370 remote on the specified socket.""" | 370 remote on the specified socket.""" |
371 | 371 |
380 sentl = sock.send( lp ) | 380 sentl = sock.send( lp ) |
381 | 381 |
382 # Now, send the message the the length was describing | 382 # Now, send the message the the length was describing |
383 sentm = sock.send( msg ) | 383 sentm = sock.send( msg ) |
384 if self.isServer: | 384 if self.isServer: |
385 self.log.log("('data_sent', " + str(sentl+sentm) + ")", ORPG_DEBUG) | 385 logger.debug("('data_sent', " + str(sentl+sentm) + ")") |
386 return sentm | 386 return sentm |
387 except socket.error, e: | 387 except socket.error, e: |
388 self.log.log("Socket Error: messenger->sendMsg(): " + traceback.format_exc(), ORPG_CRITICAL) | 388 logger.exception("Socket Error: messenger->sendMsg(): " + traceback.format_exc()) |
389 except: | 389 except: |
390 self.log.log("Exception: messenger->sendMsg(): " + traceback.format_exc(), ORPG_CRITICAL) | 390 logger.exception("Exception: messenger->sendMsg(): " + traceback.format_exc()) |
391 | 391 |
392 def recvThread( self, arg ): | 392 def recvThread( self, arg ): |
393 "Receiving thread. This thread reads from the socket and writes to the data queue." | 393 "Receiving thread. This thread reads from the socket and writes to the data queue." |
394 | 394 |
395 # Wait to be told it's okay to start running | 395 # Wait to be told it's okay to start running |
411 else: | 411 else: |
412 # Pass along the message so it can be processed | 412 # Pass along the message so it can be processed |
413 self.inbox.put( readMsg ) | 413 self.inbox.put( readMsg ) |
414 self.update_idle_time() #update the last message time | 414 self.update_idle_time() #update the last message time |
415 if bytes == 0: | 415 if bytes == 0: |
416 self.log.log("Remote has disconnected!", ORPG_NOTE) | 416 logger.note("Remote has disconnected!") |
417 self.set_status(2) | 417 self.set_status(2) |
418 self.outbox.put( "" ) # Make sure the other thread is woken up! | 418 self.outbox.put( "" ) # Make sure the other thread is woken up! |
419 self.sendThreadExitEvent.set() | 419 self.sendThreadExitEvent.set() |
420 self.log.log("messenger->recvThread() has terminated...", ORPG_NOTE) | 420 logger.note("messenger->recvThread() has terminated...") |
421 | 421 |
422 def recvData( self, sock, readSize ): | 422 def recvData( self, sock, readSize ): |
423 """Simple socket receive method. This method will only return when the exact | 423 """Simple socket receive method. This method will only return when the exact |
424 byte count has been read from the connection, if remote terminates our | 424 byte count has been read from the connection, if remote terminates our |
425 connection or we get some other socket exception.""" | 425 connection or we get some other socket exception.""" |
437 else: | 437 else: |
438 # Continue to build complete message | 438 # Continue to build complete message |
439 offset += rs | 439 offset += rs |
440 data += frag | 440 data += frag |
441 except socket.error, e: | 441 except socket.error, e: |
442 self.log.log("Socket Error: messenger->recvData(): " + str(e), ORPG_CRITICAL) | 442 logger.exception("Socket Error: messenger->recvData(): " + str(e)) |
443 data = "" | 443 data = "" |
444 return data | 444 return data |
445 | 445 |
446 def recvMsg( self, sock ): | 446 def recvMsg( self, sock ): |
447 """This method now expects to receive a message having a 4-byte prefix length. It will ONLY read | 447 """This method now expects to receive a message having a 4-byte prefix length. It will ONLY read |
463 | 463 |
464 # Read exactly the remaining amount of data | 464 # Read exactly the remaining amount of data |
465 msgData = self.recvData( sock, length ) | 465 msgData = self.recvData( sock, length ) |
466 | 466 |
467 if self.isServer: | 467 if self.isServer: |
468 self.log.log("('data_recv', " + str(length+4) + ")", ORPG_DEBUG) | 468 logger.debug("('data_recv', " + str(length+4) + ")") |
469 except: | 469 except: |
470 self.log.log("Exception: messenger->recvMsg():\n" + traceback.format_exc(), ORPG_CRITICAL) | 470 logger.exception("Exception: messenger->recvMsg():\n" + traceback.format_exc()) |
471 return msgData | 471 return msgData |
472 | 472 |
473 if __name__ == "__main__": | 473 if __name__ == "__main__": |
474 test = messenger(None) | 474 test = messenger(None) |
475 print test.build_message('hello', "This is a test message", attrib1="hello world", attrib2="hello world2", attrib3="hello world3") | 475 print test.build_message('hello', "This is a test message", attrib1="hello world", attrib2="hello world2", attrib3="hello world3") |