comparison orpg/networking/mplay_messaging.py @ 71:449a8900f9ac ornery-dev

Code refinement is almost complete for this round. Some included files still need cleanup, but this revision is ready for testing.
author sirebral
date Thu, 20 Aug 2009 03:00:39 -0500
parents c54768cffbd4
children d1aff41c031b
comparison
equal deleted inserted replaced
70:52a5fa913008 71:449a8900f9ac
34 import traceback 34 import traceback
35 from threading import Event, Lock 35 from threading import Event, Lock
36 from xml.sax.saxutils import escape 36 from xml.sax.saxutils import escape
37 from struct import pack, unpack, calcsize 37 from struct import pack, unpack, calcsize
38 from string import * 38 from string import *
39 from orpg.orpg_version import * 39 from orpg.orpg_version import VERSION, PROTOCOL_VERSION, CLIENT_STRING, SERVER_MIN_CLIENT_VERSION
40 import os 40 import os
41 import time 41 import time
42 42
43 from orpg.orpgCore import * 43 from orpg.tools.orpg_log import logger
44 from orpg.orpgCore import component
44 45
45 def myescape(data): 46 def myescape(data):
46 return escape(data,{"\"":""}) 47 return escape(data,{"\"":""})
47 48
48 class messenger: 49 class messenger:
49 def __init__(self, *args, **kwargs): 50 def __init__(self, *args, **kwargs):
50 self.log = component.get("log") 51 #self.xml = component.get("xml") used once, no need for the object.
51 self.xml = component.get("xml") 52 self.dir_struct = component.get("dir_struct") #used?
52 self.dir_struct = component.get("dir_struct") 53 self.validate = component.get("validate") #used??
53 self.validate = component.get("validate") 54 #self.settings = component.get("settings") ## used once, no need for the object.
54 self.settings = component.get("settings")
55 if kwargs.has_key('isServer'): 55 if kwargs.has_key('isServer'):
56 self.isServer = kwargs['isServer'] 56 self.isServer = kwargs['isServer']
57 else: 57 else:
58 self.isServer = False 58 self.isServer = False
59 self.listen_event = Event() 59 self.listen_event = Event()
62 self.inbox = Queue.Queue(0) 62 self.inbox = Queue.Queue(0)
63 self.startedEvent = Event() 63 self.startedEvent = Event()
64 self.exitEvent = Event() 64 self.exitEvent = Event()
65 self.sendThreadExitEvent = Event() 65 self.sendThreadExitEvent = Event()
66 self.recvThreadExitEvent = Event() 66 self.recvThreadExitEvent = Event()
67 self.port = int(self.settings.get_setting("port")) 67 self.port = int(component.get("settings").get_setting("port")) ##used even?
68 self.ip = socket.gethostbyname(socket.gethostname()) 68 self.ip = socket.gethostbyname(socket.gethostname())
69 self.lensize = calcsize('i') 69 self.lensize = calcsize('i')
70 self.mplay_type = ('disconnected', 'connected', 'disconnecting', 'group change', 'group change failed') 70 self.mplay_type = ('disconnected', 'connected', 'disconnecting', 'group change', 'group change failed')
71 self.status = self.mplay_type[0] 71 self.status = self.mplay_type[0]
72 self.alive = False 72 self.alive = False
135 message += ' />' 135 message += ' />'
136 return message 136 return message
137 137
138 def disconnect(self): 138 def disconnect(self):
139 self.set_status(2) 139 self.set_status(2)
140 self.log.log("client stub " + self.ip +" disconnecting...", ORPG_DEBUG) 140 logger.debug("client stub " + self.ip +" disconnecting...")
141 self.log.log("closing sockets...", ORPG_DEBUG) 141 logger.debug("closing sockets...")
142 try: 142 try:
143 self.sock.shutdown( 2 ) 143 self.sock.shutdown( 2 )
144 except: 144 except:
145 self.log.log("Caught exception:\n" + traceback.format_exc(), ORPG_GENERAL) 145 logger.general("Caught exception:\n" + traceback.format_exc())
146 self.set_status(0) 146 self.set_status(0)
147 147
148 def reset(self, sock): 148 def reset(self, sock):
149 self.disconnect() 149 self.disconnect()
150 self.sock = sock 150 self.sock = sock
159 159
160 def update_self_from_player(self, player): 160 def update_self_from_player(self, player):
161 try: 161 try:
162 (self.name, self.ip, self.id, self.text_status, self.version, self.protocol_version, self.client_string,role) = player 162 (self.name, self.ip, self.id, self.text_status, self.version, self.protocol_version, self.client_string,role) = player
163 except: 163 except:
164 self.log.log("Exception: messenger->update_self_from_player():\n" + traceback.format_exc(), ORPG_GENERAL) 164 logger.general("Exception: messenger->update_self_from_player():\n" + traceback.format_exc())
165 165
166 def toxml(self, act): 166 def toxml(self, act):
167 self.log.log("DEPRECIATED! messenger->toxml()", ORPG_CRITICAL) 167 logger.exception("DEPRECIATED! messenger->toxml()")
168 xml_data = self.build_message('player', 168 xml_data = self.build_message('player',
169 name=myescape(self.name), 169 name=myescape(self.name),
170 action=act, 170 action=act,
171 id=self.id, 171 id=self.id,
172 group_id=self.group_id, 172 group_id=self.group_id,
288 288
289 289
290 #Message Handling 290 #Message Handling
291 def message_handler(self, arg): 291 def message_handler(self, arg):
292 xml_dom = None 292 xml_dom = None
293 self.log.log("message handler thread running...", ORPG_NOTE) 293 logger.note("message handler thread running...", ORPG_NOTE)
294 while self.alive or self.status == 'connected': 294 while self.alive or self.status == 'connected':
295 data = None 295 data = None
296 try: 296 try:
297 data = self.inbox.get(0) 297 data = self.inbox.get(0)
298 except Queue.Empty: 298 except Queue.Empty:
306 #data has been passed... unlink from the variable references 306 #data has been passed... unlink from the variable references
307 #so data in passed objects doesn't change (python passes by reference) 307 #so data in passed objects doesn't change (python passes by reference)
308 del data 308 del data
309 data = None 309 data = None
310 except Exception, e: 310 except Exception, e:
311 self.log.log(traceback.format_exc(), ORPG_GENERAL) 311 logger.general(traceback.format_exc())
312 if xml_dom: xml_dom.unlink() 312 if xml_dom: xml_dom.unlink()
313 if xml_dom: xml_dom.unlink() 313 if xml_dom: xml_dom.unlink()
314 self.log.log("message handler thread exiting...", ORPG_NOTE) 314 logger.note("message handler thread exiting...")
315 self.inbox_event.set() 315 self.inbox_event.set()
316 316
317 def parse_incoming_dom(self, data): 317 def parse_incoming_dom(self, data):
318 #print data 318 #print data
319 xml_dom = None 319 xml_dom = None
320 try: 320 try:
321 xml_dom = self.xml.parseXml(data) 321 xml_dom = component.get("xml").parseXml(data)
322 xml_dom = xml_dom._get_documentElement() 322 xml_dom = xml_dom._get_documentElement()
323 self.message_action(xml_dom, data) 323 self.message_action(xml_dom, data)
324 324
325 except Exception, e: 325 except Exception, e:
326 self.log.log("Error in parse of inbound message. Ignoring message.", ORPG_GENERAL) 326 logger.general("Error in parse of inbound message. Ignoring message.")
327 self.log.log("\tOffending data(" + str(len(data)) + "bytes)=" + data, ORPG_GENERAL) 327 logger.general("\tOffending data(" + str(len(data)) + "bytes)=" + data)
328 self.log.log("Exception=" + traceback.format_exc(), ORPG_GENERAL) 328 logger.general("Exception=" + traceback.format_exc())
329 if xml_dom: xml_dom.unlink() 329 if xml_dom: xml_dom.unlink()
330 330
331 def message_action(self, xml_dom, data): 331 def message_action(self, xml_dom, data):
332 tag_name = xml_dom._get_tagName() 332 tag_name = xml_dom._get_tagName()
333 if self.msg_handlers.has_key(tag_name): 333 if self.msg_handlers.has_key(tag_name):
334 self.msg_handlers[tag_name](xml_dom, data) 334 self.msg_handlers[tag_name](xml_dom, data)
335 else: 335 else:
336 self.log.log("Unknown Message Type", ORPG_GENERAL) 336 logger.general("Unknown Message Type")
337 self.log.log(data, ORPG_GENERAL) 337 logger.general(data)
338 #Message Action thread expires and closes here. 338 #Message Action thread expires and closes here.
339 return 339 return
340 340
341 #Private functions 341 #Private functions
342 def sendThread( self, arg ): 342 def sendThread( self, arg ):
348 while( self.get_status() == 'connected' ): 348 while( self.get_status() == 'connected' ):
349 try: 349 try:
350 readMsg = self.outbox.get( block=1 ) 350 readMsg = self.outbox.get( block=1 )
351 351
352 except Exception, text: 352 except Exception, text:
353 self.log.log("Exception: messenger->sendThread(): " + str(text), ORPG_CRITICAL) 353 logger.exception("Exception: messenger->sendThread(): " + str(text)
354 354
355 # If we are here, it's because we have data to send, no doubt! 355 # If we are here, it's because we have data to send, no doubt!
356 if self.status == 'connected': 356 if self.status == 'connected':
357 try: 357 try:
358 # Send the entire message, properly formatted/encoded 358 # Send the entire message, properly formatted/encoded
359 sent = self.sendMsg( self.sock, readMsg ) 359 sent = self.sendMsg( self.sock, readMsg )
360 except: 360 except:
361 self.log.log("Exception: messenger->sendThread():\n" + traceback.format_exc(), ORPG_CRITICAL) 361 logger.exception("Exception: messenger->sendThread():\n" + traceback.format_exc()
362 else: 362 else:
363 # If we are not connected, purge the data queue 363 # If we are not connected, purge the data queue
364 self.log.log("Data queued without a connection, purging data from queue...", ORPG_NOTE) 364 logger.note("Data queued without a connection, purging data from queue...")
365 self.sendThreadExitEvent.set() 365 self.sendThreadExitEvent.set()
366 self.log.log( "sendThread has terminated...", ORPG_NOTE) 366 logger.note( "sendThread has terminated...")
367 367
368 def sendMsg( self, sock, msg ): 368 def sendMsg( self, sock, msg ):
369 """Very simple function that will properly encode and send a message to the 369 """Very simple function that will properly encode and send a message to the
370 remote on the specified socket.""" 370 remote on the specified socket."""
371 371
380 sentl = sock.send( lp ) 380 sentl = sock.send( lp )
381 381
382 # Now, send the message that the length was describing 382 # Now, send the message that the length was describing
383 sentm = sock.send( msg ) 383 sentm = sock.send( msg )
384 if self.isServer: 384 if self.isServer:
385 self.log.log("('data_sent', " + str(sentl+sentm) + ")", ORPG_DEBUG) 385 logger.debug("('data_sent', " + str(sentl+sentm) + ")")
386 return sentm 386 return sentm
387 except socket.error, e: 387 except socket.error, e:
388 self.log.log("Socket Error: messenger->sendMsg(): " + traceback.format_exc(), ORPG_CRITICAL) 388 logger.exception("Socket Error: messenger->sendMsg(): " + traceback.format_exc())
389 except: 389 except:
390 self.log.log("Exception: messenger->sendMsg(): " + traceback.format_exc(), ORPG_CRITICAL) 390 logger.exception("Exception: messenger->sendMsg(): " + traceback.format_exc())
391 391
392 def recvThread( self, arg ): 392 def recvThread( self, arg ):
393 "Receiving thread. This thread reads from the socket and writes to the data queue." 393 "Receiving thread. This thread reads from the socket and writes to the data queue."
394 394
395 # Wait to be told it's okay to start running 395 # Wait to be told it's okay to start running
411 else: 411 else:
412 # Pass along the message so it can be processed 412 # Pass along the message so it can be processed
413 self.inbox.put( readMsg ) 413 self.inbox.put( readMsg )
414 self.update_idle_time() #update the last message time 414 self.update_idle_time() #update the last message time
415 if bytes == 0: 415 if bytes == 0:
416 self.log.log("Remote has disconnected!", ORPG_NOTE) 416 logger.note("Remote has disconnected!")
417 self.set_status(2) 417 self.set_status(2)
418 self.outbox.put( "" ) # Make sure the other thread is woken up! 418 self.outbox.put( "" ) # Make sure the other thread is woken up!
419 self.sendThreadExitEvent.set() 419 self.sendThreadExitEvent.set()
420 self.log.log("messenger->recvThread() has terminated...", ORPG_NOTE) 420 logger.note("messenger->recvThread() has terminated...")
421 421
422 def recvData( self, sock, readSize ): 422 def recvData( self, sock, readSize ):
423 """Simple socket receive method. This method will only return when the exact 423 """Simple socket receive method. This method will only return when the exact
424 byte count has been read from the connection, if remote terminates our 424 byte count has been read from the connection, if remote terminates our
425 connection or we get some other socket exception.""" 425 connection or we get some other socket exception."""
437 else: 437 else:
438 # Continue to build complete message 438 # Continue to build complete message
439 offset += rs 439 offset += rs
440 data += frag 440 data += frag
441 except socket.error, e: 441 except socket.error, e:
442 self.log.log("Socket Error: messenger->recvData(): " + str(e), ORPG_CRITICAL) 442 logger.exception("Socket Error: messenger->recvData(): " + str(e))
443 data = "" 443 data = ""
444 return data 444 return data
445 445
446 def recvMsg( self, sock ): 446 def recvMsg( self, sock ):
447 """This method now expects to receive a message having a 4-byte prefix length. It will ONLY read 447 """This method now expects to receive a message having a 4-byte prefix length. It will ONLY read
463 463
464 # Read exactly the remaining amount of data 464 # Read exactly the remaining amount of data
465 msgData = self.recvData( sock, length ) 465 msgData = self.recvData( sock, length )
466 466
467 if self.isServer: 467 if self.isServer:
468 self.log.log("('data_recv', " + str(length+4) + ")", ORPG_DEBUG) 468 logger.debug("('data_recv', " + str(length+4) + ")")
469 except: 469 except:
470 self.log.log("Exception: messenger->recvMsg():\n" + traceback.format_exc(), ORPG_CRITICAL) 470 logger.exception("Exception: messenger->recvMsg():\n" + traceback.format_exc())
471 return msgData 471 return msgData
472 472
473 if __name__ == "__main__": 473 if __name__ == "__main__":
474 test = messenger(None) 474 test = messenger(None)
475 print test.build_message('hello', "This is a test message", attrib1="hello world", attrib2="hello world2", attrib3="hello world3") 475 print test.build_message('hello', "This is a test message", attrib1="hello world", attrib2="hello world2", attrib3="hello world3")