comparison orpg/networking/mplay_messaging.py @ 0:4385a7d0efd1 grumpy-goblin

Deleted and repushed it with the 'grumpy-goblin' branch. I forgot a y
author sirebral
date Tue, 14 Jul 2009 16:41:58 -0500
parents
children c54768cffbd4
comparison
equal deleted inserted replaced
-1:000000000000 0:4385a7d0efd1
1 # Copyright (C) 2000-2001 The OpenRPG Project
2 #
3 # openrpg-dev@lists.sourceforge.net
4 #
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation; either version 2 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program; if not, write to the Free Software
17 # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18 # --
19 #
20 # File: mplay_messaging.py
21 # Author: Dj Gilcrease
22 # Maintainer:
23 # Version:
24 # $Id: mplay_messaging.py,v 1.5 2007/05/06 16:42:59 digitalxero Exp $
25 #
26 # Description: This file contains the code for the client / server messaging
27 #
28
29 __version__ = "$Id: mplay_messaging.py,v 1.5 2007/05/06 16:42:59 digitalxero Exp $"
30
31 import socket
32 import Queue
33 import thread
34 import traceback
35 from threading import Event, Lock
36 from xml.sax.saxutils import escape
37 from struct import pack, unpack, calcsize
38 from string import *
39 from orpg.orpg_version import *
40 import os
41 import time
42
43 from orpg.orpgCore import *
44
45 def myescape(data):
46 return escape(data,{"\"":""})
47
48 class messenger:
49 def __init__(self, *args, **kwargs):
50 self.log = open_rpg.get_component("log")
51 self.xml = open_rpg.get_component("xml")
52 self.dir_struct = open_rpg.get_component("dir_struct")
53 self.validate = open_rpg.get_component("validate")
54 self.settings = open_rpg.get_component("settings")
55 if kwargs.has_key('isServer'):
56 self.isServer = kwargs['isServer']
57 else:
58 self.isServer = False
59 self.listen_event = Event()
60 self.outbox = Queue.Queue(0)
61 self.inbox_event = Event()
62 self.inbox = Queue.Queue(0)
63 self.startedEvent = Event()
64 self.exitEvent = Event()
65 self.sendThreadExitEvent = Event()
66 self.recvThreadExitEvent = Event()
67 self.port = int(self.settings.get_setting("port"))
68 self.ip = socket.gethostbyname(socket.gethostname())
69 self.lensize = calcsize('i')
70 self.mplay_type = ('disconnected', 'connected', 'disconnecting', 'group change', 'group change failed')
71 self.status = self.mplay_type[0]
72 self.alive = False
73 self.sock = None
74 self.version = VERSION
75 self.protocol_version = PROTOCOL_VERSION
76 self.client_string = CLIENT_STRING
77 self.minClientVersion = SERVER_MIN_CLIENT_VERSION
78 self.id = "0"
79 self.group_id = "0"
80 self.name = ""
81 self.role = "GM"
82 self.ROLE_GM = "GM"
83 self.ROLE_PLAYER = "PLAYER"
84 self.ROLE_LURKER = "LURKER"
85 self.text_status = "Idle"
86 self.statLock = Lock()
87 self.useroles = 0
88 self.lastmessagetime = time.time()
89 self.connecttime = time.time()
90 self.timeout_time = None
91 self.ignorelist = {}
92 self.players = {}
93 self.groups = {}
94
95 #Setup Stuff from the Server
96 if kwargs.has_key('inbox'):
97 self.inbox = kwargs['inbox']
98 if kwargs.has_key('sock'):
99 self.sock = kwargs['sock']
100 if kwargs.has_key('ip'):
101 self.ip = kwargs['ip']
102 if kwargs.has_key('role'):
103 self.role = kwargs['role']
104 if kwargs.has_key('id'):
105 self.id = kwargs['id']
106 if kwargs.has_key('group_id'):
107 self.group_id = kwargs['group_id']
108 if kwargs.has_key('name'):
109 self.name = kwargs['name']
110 if kwargs.has_key('version'):
111 self.version = kwargs['version']
112 if kwargs.has_key('protocol_version'):
113 self.protocol_version = kwargs['protocol_version']
114 if kwargs.has_key('client_string'):
115 self.client_string = kwargs['client_string']
116
117 def build_message(self, *args, **kwargs):
118 #print args
119 message = '<' + args[0]
120
121 #Setup the attributes of the message
122 if len(kwargs) > 0:
123 for attrib in kwargs.keys():
124 message += ' ' + attrib + '="' + str(kwargs[attrib]) + '"'
125
126 #Add the actual message if there is one
127 if len(args) > 1:
128 #Close the first part
129 message += '>'
130 message += escape(args[1])
131
132 #Close the whole thing
133 message += '</' + args[0] + '>'
134 else:
135 message += ' />'
136 return message
137
138 def disconnect(self):
139 self.set_status(2)
140 self.log.log("client stub " + self.ip +" disconnecting...", ORPG_DEBUG)
141 self.log.log("closing sockets...", ORPG_DEBUG)
142 try:
143 self.sock.shutdown( 2 )
144 except:
145 self.log.log("Caught exception:\n" + traceback.format_exc(), ORPG_GENERAL)
146 self.set_status(0)
147
148 def reset(self, sock):
149 self.disconnect()
150 self.sock = sock
151 self.initialize_threads()
152
153 def update_role(self, role):
154 self.useroles = 1
155 self.role = role
156
157 def use_roles(self):
158 return self.useroles
159
160 def update_self_from_player(self, player):
161 try:
162 (self.name, self.ip, self.id, self.text_status, self.version, self.protocol_version, self.client_string,role) = player
163 except:
164 self.log.log("Exception: messenger->update_self_from_player():\n" + traceback.format_exc(), ORPG_GENERAL)
165
166 def toxml(self, act):
167 self.log.log("DEPRECIATED! messenger->toxml()", ORPG_CRITICAL)
168 xml_data = self.build_message('player',
169 name=myescape(self.name),
170 action=act,
171 id=self.id,
172 group_id=self.group_id,
173 ip=self.ip,
174 status=self.text_status,
175 version=self.version,
176 protocol_version=self.protocol_version,
177 client_string=self.client_string
178 )
179 return xml_data
180
181 def get_status(self):
182 self.statLock.acquire()
183 status = self.status
184 self.statLock.release()
185 return status
186
187 def my_role(self):
188 return self.role
189
190 def set_status(self, status):
191 self.statLock.acquire()
192 self.status = status
193 self.statLock.release()
194
195 def initialize_threads(self):
196 "Starts up our threads (2) and waits for them to make sure they are running!"
197 self.status = 'connected'
198 self.sock.setblocking(1)
199
200 # Confirm that our threads have started
201 thread.start_new_thread( self.sendThread,(0,) )
202 thread.start_new_thread( self.recvThread,(0,) )
203 self.startedEvent.set()
204
205 def __str__(self):
206 return "%s(%s)\nIP:%s\ngroup_id:%s\n%s (%s)" % (self.name, self.id, self.ip, self.group_id, self.idle_time(), self.connected_time())
207
208 # idle time functions added by snowdog 3/31/04
209 def update_idle_time(self):
210 self.lastmessagetime = time.time()
211
212 def idle_time(self):
213 curtime = time.time()
214 idletime = curtime - self.lastmessagetime
215 return idletime
216
217 def idle_status(self):
218 idletime = self.idle_time()
219 idlemins = idletime / 60
220 status = "Unknown"
221 if idlemins < 3:
222 status = "Active"
223 elif idlemins < 10:
224 status = "Idle ("+str(int(idlemins))+" mins)"
225 else:
226 status = "Inactive ("+str(int(idlemins))+" mins)"
227 return status
228
229 def connected_time(self):
230 curtime = time.time()
231 timeoffset = curtime - self.connecttime
232 return timeoffset
233
234 def connected_time_string(self):
235 "returns the time client has been connected as a formated time string"
236 ct = self.connected_time()
237 d = int(ct/86400)
238 h = int( (ct-(86400*d))/3600 )
239 m = int( (ct-(86400*d)-(3600*h))/60)
240 s = int( (ct-(86400*d)-(3600*h)-(60*m)) )
241 cts = zfill(d,2)+":"+zfill(h,2)+":"+zfill(m,2)+":"+zfill(s,2)
242 return cts
243
244 def clear_timeout(self):
245 self.timeout_time = None
246
247 def check_time_out(self):
248 if self.timeout_time==None:
249 self.timeout_time = time.time()
250 curtime = time.time()
251 diff = curtime - self.timeout_time
252 if diff > 1800:
253 return 1
254 else:
255 return 0
256
257 def send(self, msg):
258 if self.get_status() == 'connected':
259 self.outbox.put(msg)
260
261 def change_group(self, group_id, groups):
262 old_group_id = str(self.group_id)
263 groups[group_id].add_player(self.id)
264 groups[old_group_id].remove_player(self.id)
265 self.group_id = group_id
266 self.outbox.put(self.toxml('group'))
267 msg = groups[group_id].game_map.get_all_xml()
268 self.send(msg)
269 return old_group_id
270
271 def take_dom(self, xml_dom):
272 self.name = xml_dom.getAttribute("name")
273 self.text_status = xml_dom.getAttribute("status")
274
275 def add_msg_handler(self, tag, function, core=False):
276 if not self.msg_handlers.has_key(tag):
277 self.msg_handlers[tag] = function
278 if core:
279 self.core_msg_handlers.append(tag)
280 else:
281 print 'XML Messages ' + tag + ' already has a handler'
282
283 def remove_msg_handler(self, tag):
284 if self.msg_handlers.has_key(tag) and not tag in self.core_msg_handlers:
285 del self.msg_handlers[tag]
286 else:
287 print 'XML Messages ' + tag + ' already deleted'
288
289
290 #Message Handaling
291 def message_handler(self, arg):
292 xml_dom = None
293 self.log.log("message handler thread running...", ORPG_NOTE)
294 while self.alive or self.status == 'connected':
295 data = None
296 try:
297 data = self.inbox.get(0)
298 except Queue.Empty:
299 time.sleep(0.25) #sleep 1/4 second
300 continue
301 bytes = len(data)
302 if bytes < 5:
303 continue
304 try:
305 thread.start_new_thread(self.parse_incoming_dom,(str(data),))
306 #data has been passed... unlink from the variable references
307 #so data in passed objects doesn't change (python passes by reference)
308 del data
309 data = None
310 except Exception, e:
311 self.log.log(traceback.format_exc(), ORPG_GENERAL)
312 if xml_dom: xml_dom.unlink()
313 if xml_dom: xml_dom.unlink()
314 self.log.log("message handler thread exiting...", ORPG_NOTE)
315 self.inbox_event.set()
316
317 def parse_incoming_dom(self, data):
318 #print data
319 xml_dom = None
320 try:
321 xml_dom = self.xml.parseXml(data)
322 xml_dom = xml_dom._get_documentElement()
323 self.message_action(xml_dom, data)
324
325 except Exception, e:
326 self.log.log("Error in parse of inbound message. Ignoring message.", ORPG_GENERAL)
327 self.log.log("\tOffending data(" + str(len(data)) + "bytes)=" + data, ORPG_GENERAL)
328 self.log.log("Exception=" + traceback.format_exc(), ORPG_GENERAL)
329 if xml_dom: xml_dom.unlink()
330
331 def message_action(self, xml_dom, data):
332 tag_name = xml_dom._get_tagName()
333 if self.msg_handlers.has_key(tag_name):
334 self.msg_handlers[tag_name](xml_dom, data)
335 else:
336 self.log.log("Unknown Message Type", ORPG_GENERAL)
337 self.log.log(data, ORPG_GENERAL)
338 #Message Action thread expires and closes here.
339 return
340
341 #Privet functions
342 def sendThread( self, arg ):
343 "Sending thread. This thread reads from the data queue and writes to the socket."
344 # Wait to be told it's okay to start running
345 self.startedEvent.wait()
346
347 # Loop as long as we have a connection
348 while( self.get_status() == 'connected' ):
349 try:
350 readMsg = self.outbox.get( block=1 )
351
352 except Exception, text:
353 self.log.log("Exception: messenger->sendThread(): " + str(text), ORPG_CRITICAL)
354
355 # If we are here, it's because we have data to send, no doubt!
356 if self.status == 'connected':
357 try:
358 # Send the entire message, properly formated/encoded
359 sent = self.sendMsg( self.sock, readMsg )
360 except:
361 self.log.log("Exception: messenger->sendThread():\n" + traceback.format_exc(), ORPG_CRITICAL)
362 else:
363 # If we are not connected, purge the data queue
364 self.log.log("Data queued without a connection, purging data from queue...", ORPG_NOTE)
365 self.sendThreadExitEvent.set()
366 self.log.log( "sendThread has terminated...", ORPG_NOTE)
367
368 def sendMsg( self, sock, msg ):
369 """Very simple function that will properly encode and send a message to te
370 remote on the specified socket."""
371
372 # Calculate our message length
373 length = len( msg )
374
375 # Encode the message length into network byte order
376 lp = pack( 'i', socket.htonl( length ) )
377
378 try:
379 # Send the encoded length
380 sentl = sock.send( lp )
381
382 # Now, send the message the the length was describing
383 sentm = sock.send( msg )
384 if self.isServer:
385 self.log.log("('data_sent', " + str(sentl+sentm) + ")", ORPG_DEBUG)
386 return sentm
387 except socket.error, e:
388 self.log.log("Socket Error: messenger->sendMsg(): " + traceback.format_exc(), ORPG_CRITICAL)
389 except:
390 self.log.log("Exception: messenger->sendMsg(): " + traceback.format_exc(), ORPG_CRITICAL)
391
392 def recvThread( self, arg ):
393 "Receiving thread. This thread reads from the socket and writes to the data queue."
394
395 # Wait to be told it's okay to start running
396 self.startedEvent.wait()
397 while( self.get_status() == 'connected' ):
398 readMsg = self.recvMsg( self.sock )
399
400 # Make sure we didn't get disconnected
401 bytes = len( readMsg )
402 if bytes == 0:
403 break
404
405 # Check the length of the message
406 bytes = len( readMsg )
407
408 # Make sure we are still connected
409 if bytes == 0:
410 break
411 else:
412 # Pass along the message so it can be processed
413 self.inbox.put( readMsg )
414 self.update_idle_time() #update the last message time
415 if bytes == 0:
416 self.log.log("Remote has disconnected!", ORPG_NOTE)
417 self.set_status(2)
418 self.outbox.put( "" ) # Make sure the other thread is woken up!
419 self.sendThreadExitEvent.set()
420 self.log.log("messenger->recvThread() has terminated...", ORPG_NOTE)
421
422 def recvData( self, sock, readSize ):
423 """Simple socket receive method. This method will only return when the exact
424 byte count has been read from the connection, if remote terminates our
425 connection or we get some other socket exception."""
426 data = ""
427 offset = 0
428 try:
429 while offset != readSize:
430 frag = sock.recv( readSize - offset )
431
432 # See if we've been disconnected
433 rs = len( frag )
434 if rs <= 0:
435 # Loudly raise an exception because we've been disconnected!
436 raise IOError, "Remote closed the connection!"
437 else:
438 # Continue to build complete message
439 offset += rs
440 data += frag
441 except socket.error, e:
442 self.log.log("Socket Error: messenger->recvData(): " + str(e), ORPG_CRITICAL)
443 data = ""
444 return data
445
446 def recvMsg( self, sock ):
447 """This method now expects to receive a message having a 4-byte prefix length. It will ONLY read
448 completed messages. In the event that the remote's connection is terminated, it will throw an
449 exception which should allow for the caller to more gracefully handles this exception event.
450
451 Because we use strictly reading ONLY based on the length that is told to use, we no longer have to
452 worry about partially adjusting for fragmented buffers starting somewhere within a buffer that we've
453 read. Rather, it will get ONLY a whole message and nothing more. Everything else will remain buffered
454 with the OS until we attempt to read the next complete message."""
455
456 msgData = ""
457 try:
458 lenData = self.recvData( sock, self.lensize )
459
460 # Now, convert to a usable form
461 (length,) = unpack( 'i', lenData )
462 length = socket.ntohl( length )
463
464 # Read exactly the remaining amount of data
465 msgData = self.recvData( sock, length )
466
467 if self.isServer:
468 self.log.log("('data_recv', " + str(length+4) + ")", ORPG_DEBUG)
469 except:
470 self.log.log("Exception: messenger->recvMsg():\n" + traceback.format_exc(), ORPG_CRITICAL)
471 return msgData
472
473 if __name__ == "__main__":
474 test = messenger(None)
475 print test.build_message('hello', "This is a test message", attrib1="hello world", attrib2="hello world2", attrib3="hello world3")