Mercurial > traipse_dev
comparison orpg/networking/mplay_messaging.py @ 0:4385a7d0efd1 grumpy-goblin
Deleted and repushed it with the 'grumpy-goblin' branch. I forgot a y
author | sirebral |
---|---|
date | Tue, 14 Jul 2009 16:41:58 -0500 |
parents | |
children | c54768cffbd4 |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4385a7d0efd1 |
---|---|
1 # Copyright (C) 2000-2001 The OpenRPG Project | |
2 # | |
3 # openrpg-dev@lists.sourceforge.net | |
4 # | |
5 # This program is free software; you can redistribute it and/or modify | |
6 # it under the terms of the GNU General Public License as published by | |
7 # the Free Software Foundation; either version 2 of the License, or | |
8 # (at your option) any later version. | |
9 # | |
10 # This program is distributed in the hope that it will be useful, | |
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of | |
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
13 # GNU General Public License for more details. | |
14 # | |
15 # You should have received a copy of the GNU General Public License | |
16 # along with this program; if not, write to the Free Software | |
17 # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. | |
18 # -- | |
19 # | |
20 # File: mplay_messaging.py | |
21 # Author: Dj Gilcrease | |
22 # Maintainer: | |
23 # Version: | |
24 # $Id: mplay_messaging.py,v 1.5 2007/05/06 16:42:59 digitalxero Exp $ | |
25 # | |
26 # Description: This file contains the code for the client / server messaging | |
27 # | |
28 | |
29 __version__ = "$Id: mplay_messaging.py,v 1.5 2007/05/06 16:42:59 digitalxero Exp $" | |
30 | |
31 import socket | |
32 import Queue | |
33 import thread | |
34 import traceback | |
35 from threading import Event, Lock | |
36 from xml.sax.saxutils import escape | |
37 from struct import pack, unpack, calcsize | |
38 from string import * | |
39 from orpg.orpg_version import * | |
40 import os | |
41 import time | |
42 | |
43 from orpg.orpgCore import * | |
44 | |
def myescape(data):
    """Return *data* XML-escaped with every double quote removed.

    ``&``, ``<`` and ``>`` are replaced by their entities (the default
    behaviour of ``xml.sax.saxutils.escape``); ``"`` is mapped to the
    empty string so the result can sit inside a double-quoted attribute.
    """
    strip_double_quotes = {'"': ''}
    return escape(data, strip_double_quotes)
47 | |
class messenger:
    """Length-prefixed XML messaging endpoint shared by client and server.

    Each instance owns an ``outbox`` queue drained by ``sendThread`` and an
    ``inbox`` queue filled by ``recvThread``.  On the wire every message is
    a 4-byte big-endian length followed by that many bytes of payload.

    NOTE(review): ``status`` is compared against the strings in
    ``mplay_type`` throughout, yet ``disconnect()`` and ``recvThread()``
    store the integers 2 and 0 via ``set_status``.  That is preserved here
    because callers outside this file may rely on it -- confirm and unify.
    """

    # Wire format of the length prefix: network byte order, unsigned 32-bit.
    # Produces the same bytes as the historical pack('i', htonl(n)) without
    # overflowing a signed int when the swapped value has its top bit set.
    _LENGTH_FORMAT = '!I'

    # Upper bound for a single recv() call so a huge (possibly hostile)
    # length prefix cannot force one giant buffer allocation.
    _MAX_RECV_CHUNK = 65536

    def __init__(self, *args, **kwargs):
        """Set up queues, events and identity fields.

        Recognised keyword arguments: ``isServer``, ``inbox``, ``sock``,
        ``ip``, ``role``, ``id``, ``group_id``, ``name``, ``version``,
        ``protocol_version`` and ``client_string``; each overrides the
        default attribute of the same name.  Positional arguments are
        accepted and ignored.
        """
        self.log = open_rpg.get_component("log")
        self.xml = open_rpg.get_component("xml")
        self.dir_struct = open_rpg.get_component("dir_struct")
        self.validate = open_rpg.get_component("validate")
        self.settings = open_rpg.get_component("settings")
        self.isServer = kwargs.get('isServer', False)
        self.listen_event = Event()
        self.outbox = Queue.Queue(0)
        self.inbox_event = Event()
        self.inbox = Queue.Queue(0)
        self.startedEvent = Event()
        self.exitEvent = Event()
        self.sendThreadExitEvent = Event()
        self.recvThreadExitEvent = Event()
        self.port = int(self.settings.get_setting("port"))
        self.ip = socket.gethostbyname(socket.gethostname())
        self.lensize = calcsize(self._LENGTH_FORMAT)
        self.mplay_type = ('disconnected', 'connected', 'disconnecting', 'group change', 'group change failed')
        self.status = self.mplay_type[0]
        self.alive = False
        self.sock = None
        self.version = VERSION
        self.protocol_version = PROTOCOL_VERSION
        self.client_string = CLIENT_STRING
        self.minClientVersion = SERVER_MIN_CLIENT_VERSION
        self.id = "0"
        self.group_id = "0"
        self.name = ""
        self.role = "GM"
        self.ROLE_GM = "GM"
        self.ROLE_PLAYER = "PLAYER"
        self.ROLE_LURKER = "LURKER"
        self.text_status = "Idle"
        self.statLock = Lock()
        self.useroles = 0
        self.lastmessagetime = time.time()
        self.connecttime = time.time()
        self.timeout_time = None
        self.ignorelist = {}
        self.players = {}
        self.groups = {}

        # Handler registry used by add_msg_handler()/message_action().
        # Only created when absent so a subclass that prepared its own
        # registry before calling this constructor keeps it.
        if not hasattr(self, 'msg_handlers'):
            self.msg_handlers = {}
        if not hasattr(self, 'core_msg_handlers'):
            self.core_msg_handlers = []

        # Setup stuff from the server: explicit overrides of the defaults.
        for key in ('inbox', 'sock', 'ip', 'role', 'id', 'group_id', 'name',
                    'version', 'protocol_version', 'client_string'):
            if key in kwargs:
                setattr(self, key, kwargs[key])

    def build_message(self, *args, **kwargs):
        """Build an XML element string.

        ``args[0]`` is the tag name and the optional ``args[1]`` is the
        text content, which is XML-escaped.  Every keyword argument becomes
        an attribute.  Without text content a self-closing tag is returned.

        NOTE(review): attribute values are inserted verbatim (``toxml``
        pre-escapes the name with ``myescape``); callers must escape them.
        """
        tag = args[0]
        parts = ['<' + tag]

        # Attributes of the message.
        for attrib in kwargs.keys():
            parts.append(' ' + attrib + '="' + str(kwargs[attrib]) + '"')

        if len(args) > 1:
            # Element with text content.
            parts.append('>')
            parts.append(escape(args[1]))
            parts.append('</' + tag + '>')
        else:
            parts.append(' />')
        return ''.join(parts)

    def disconnect(self):
        """Shut the socket down in both directions and mark us disconnected.

        Any failure of the shutdown (including ``sock`` being ``None``) is
        logged, not raised.
        """
        self.set_status(2)
        self.log.log("client stub " + self.ip +" disconnecting...", ORPG_DEBUG)
        self.log.log("closing sockets...", ORPG_DEBUG)
        try:
            self.sock.shutdown(2)  # 2 == SHUT_RDWR
        except Exception:
            self.log.log("Caught exception:\n" + traceback.format_exc(), ORPG_GENERAL)
        self.set_status(0)

    def reset(self, sock):
        """Disconnect, adopt *sock* as the new socket and restart the threads."""
        self.disconnect()
        self.sock = sock
        self.initialize_threads()

    def update_role(self, role):
        """Store *role* and flag that roles are in use."""
        self.useroles = 1
        self.role = role

    def use_roles(self):
        """Return the roles-in-use flag (0 until ``update_role`` is called)."""
        return self.useroles

    def update_self_from_player(self, player):
        """Copy identity fields from an 8-item *player* sequence.

        A sequence of the wrong shape is logged and otherwise ignored.
        """
        try:
            # NOTE(review): the trailing role item is unpacked but not
            # stored on self, exactly as before -- confirm this is intended.
            (self.name, self.ip, self.id, self.text_status, self.version,
             self.protocol_version, self.client_string, role) = player
        except Exception:
            self.log.log("Exception: messenger->update_self_from_player():\n" + traceback.format_exc(), ORPG_GENERAL)

    def toxml(self, act):
        """Return a ``<player .../>`` element describing this endpoint.

        Deprecated: every call is logged at critical level.
        """
        self.log.log("DEPRECIATED! messenger->toxml()", ORPG_CRITICAL)
        xml_data = self.build_message('player',
                                name=myescape(self.name),
                                action=act,
                                id=self.id,
                                group_id=self.group_id,
                                ip=self.ip,
                                status=self.text_status,
                                version=self.version,
                                protocol_version=self.protocol_version,
                                client_string=self.client_string
                                )
        return xml_data

    def get_status(self):
        """Return the current status, read under ``statLock``."""
        self.statLock.acquire()
        try:
            return self.status
        finally:
            self.statLock.release()

    def my_role(self):
        """Return the current role string."""
        return self.role

    def set_status(self, status):
        """Store *status*, written under ``statLock``."""
        self.statLock.acquire()
        try:
            self.status = status
        finally:
            self.statLock.release()

    def initialize_threads(self):
        "Starts up our threads (2) and releases them via startedEvent."
        self.set_status('connected')
        self.sock.setblocking(1)

        # Both threads block on startedEvent until it is set below.
        thread.start_new_thread(self.sendThread, (0,))
        thread.start_new_thread(self.recvThread, (0,))
        self.startedEvent.set()

    def __str__(self):
        return "%s(%s)\nIP:%s\ngroup_id:%s\n%s (%s)" % (self.name, self.id, self.ip, self.group_id, self.idle_time(), self.connected_time())

    # idle time functions added by snowdog 3/31/04
    def update_idle_time(self):
        """Record now as the time of the last received message."""
        self.lastmessagetime = time.time()

    def idle_time(self):
        """Return seconds elapsed since the last received message."""
        return time.time() - self.lastmessagetime

    def idle_status(self):
        """Return "Active" (< 3 min), "Idle (N mins)" (< 10 min) or "Inactive (N mins)"."""
        idlemins = self.idle_time() / 60
        if idlemins < 3:
            return "Active"
        if idlemins < 10:
            return "Idle ("+str(int(idlemins))+" mins)"
        return "Inactive ("+str(int(idlemins))+" mins)"

    def connected_time(self):
        """Return seconds elapsed since ``connecttime``."""
        return time.time() - self.connecttime

    def connected_time_string(self):
        "returns the time client has been connected as a DD:HH:MM:SS string"
        ct = self.connected_time()
        d = int(ct/86400)
        h = int( (ct-(86400*d))/3600 )
        m = int( (ct-(86400*d)-(3600*h))/60)
        s = int( (ct-(86400*d)-(3600*h)-(60*m)) )
        # %02d zero-pads like the old string.zfill calls without relying on
        # the star import of the string module.
        return "%02d:%02d:%02d:%02d" % (d, h, m, s)

    def clear_timeout(self):
        """Forget the timeout reference time."""
        self.timeout_time = None

    def check_time_out(self):
        """Return 1 once more than 1800 s passed since the timeout started, else 0.

        The first call after ``clear_timeout`` starts the timeout clock.
        """
        if self.timeout_time is None:
            self.timeout_time = time.time()
        diff = time.time() - self.timeout_time
        if diff > 1800:
            return 1
        return 0

    def send(self, msg):
        """Queue *msg* for sending; silently dropped unless connected."""
        if self.get_status() == 'connected':
            self.outbox.put(msg)

    def change_group(self, group_id, groups):
        """Move this player into ``groups[group_id]`` and return the old group id.

        The player is added to the new group before being removed from the
        old one; the group notification and the new group's map are queued.
        """
        old_group_id = str(self.group_id)
        groups[group_id].add_player(self.id)
        groups[old_group_id].remove_player(self.id)
        self.group_id = group_id
        # Queued directly: unlike send(), this bypasses the status check.
        self.outbox.put(self.toxml('group'))
        msg = groups[group_id].game_map.get_all_xml()
        self.send(msg)
        return old_group_id

    def take_dom(self, xml_dom):
        """Update name and text status from the attributes of *xml_dom*."""
        self.name = xml_dom.getAttribute("name")
        self.text_status = xml_dom.getAttribute("status")

    def add_msg_handler(self, tag, function, core=False):
        """Register *function* for *tag*; *core* handlers cannot be removed.

        An existing handler is never replaced; a notice is printed instead.
        """
        if tag not in self.msg_handlers:
            self.msg_handlers[tag] = function
            if core:
                self.core_msg_handlers.append(tag)
        else:
            print('XML Messages ' + tag + ' already has a handler')

    def remove_msg_handler(self, tag):
        """Unregister the handler for *tag* unless it is a core handler.

        A notice is printed when nothing was removed (unknown or core tag).
        """
        if tag in self.msg_handlers and tag not in self.core_msg_handlers:
            del self.msg_handlers[tag]
        else:
            print('XML Messages ' + tag + ' already deleted')

    # Message handling
    def message_handler(self, arg):
        """Thread body: pull messages off the inbox and parse each in a new thread.

        Runs while ``alive`` is set or the status is 'connected'.  Messages
        shorter than 5 bytes are discarded.  Sets ``inbox_event`` on exit.
        *arg* is unused.
        """
        self.log.log("message handler thread running...", ORPG_NOTE)
        while self.alive or self.get_status() == 'connected':
            try:
                data = self.inbox.get(0)
            except Queue.Empty:
                time.sleep(0.25)  # sleep 1/4 second
                continue
            if len(data) < 5:
                continue
            try:
                # Hand the worker its own string so it shares nothing
                # with this loop.
                thread.start_new_thread(self.parse_incoming_dom, (str(data),))
            except Exception:
                self.log.log(traceback.format_exc(), ORPG_GENERAL)
        self.log.log("message handler thread exiting...", ORPG_NOTE)
        self.inbox_event.set()

    def parse_incoming_dom(self, data):
        """Parse *data* as XML and dispatch its root element.

        Parse or handler errors are logged and the message is dropped; the
        DOM element is unlinked afterwards in either case.
        """
        xml_dom = None
        try:
            xml_dom = self.xml.parseXml(data)
            xml_dom = xml_dom._get_documentElement()
            self.message_action(xml_dom, data)
        except Exception:
            self.log.log("Error in parse of inbound message. Ignoring message.", ORPG_GENERAL)
            self.log.log("\tOffending data(" + str(len(data)) + "bytes)=" + data, ORPG_GENERAL)
            self.log.log("Exception=" + traceback.format_exc(), ORPG_GENERAL)
        if xml_dom:
            xml_dom.unlink()

    def message_action(self, xml_dom, data):
        """Call the handler registered for the tag of *xml_dom*, or log the message."""
        tag_name = xml_dom._get_tagName()
        if tag_name in self.msg_handlers:
            self.msg_handlers[tag_name](xml_dom, data)
        else:
            self.log.log("Unknown Message Type", ORPG_GENERAL)
            self.log.log(data, ORPG_GENERAL)
        # Message Action thread expires and closes here.
        return

    # Private functions
    def sendThread( self, arg ):
        "Sending thread.  This thread reads from the data queue and writes to the socket."
        # Wait to be told it's okay to start running
        self.startedEvent.wait()

        # Loop as long as we have a connection
        while self.get_status() == 'connected':
            try:
                readMsg = self.outbox.get( block=1 )
            except Exception:
                self.log.log("Exception: messenger->sendThread(): " + traceback.format_exc(), ORPG_CRITICAL)
                # Nothing was dequeued: never fall through with an unbound
                # or stale message.
                continue

            # The status may have changed while we were blocked on the queue.
            if self.get_status() == 'connected':
                try:
                    # Send the entire message, properly formatted/encoded
                    self.sendMsg( self.sock, readMsg )
                except Exception:
                    self.log.log("Exception: messenger->sendThread():\n" + traceback.format_exc(), ORPG_CRITICAL)
            else:
                # If we are not connected, purge the data queue
                self.log.log("Data queued without a connection, purging data from queue...", ORPG_NOTE)
        self.sendThreadExitEvent.set()
        self.log.log( "sendThread has terminated...", ORPG_NOTE)

    def sendMsg( self, sock, msg ):
        """Send *msg* on *sock*, prefixed with its 4-byte big-endian length.

        Returns the number of payload bytes sent, or None when the send
        failed (the failure is logged, not raised).
        """
        length = len( msg )

        # Encode the message length in network byte order
        lp = pack( self._LENGTH_FORMAT, length )

        try:
            # sendall() keeps writing until everything is out; a plain
            # send() may stop early and corrupt the framing.
            sock.sendall( lp )
            sock.sendall( msg )
            if self.isServer:
                self.log.log("('data_sent', " + str(len(lp) + length) + ")", ORPG_DEBUG)
            return length
        except socket.error:
            self.log.log("Socket Error: messenger->sendMsg(): " + traceback.format_exc(), ORPG_CRITICAL)
        except Exception:
            self.log.log("Exception: messenger->sendMsg(): " + traceback.format_exc(), ORPG_CRITICAL)

    def recvThread( self, arg ):
        "Receiving thread.  This thread reads from the socket and writes to the data queue."

        # Wait to be told it's okay to start running
        self.startedEvent.wait()
        remote_closed = False
        while self.get_status() == 'connected':
            readMsg = self.recvMsg( self.sock )

            # An empty message means we got disconnected (or the read failed)
            if len( readMsg ) == 0:
                remote_closed = True
                break

            # Pass along the message so it can be processed
            self.inbox.put( readMsg )
            self.update_idle_time() #update the last message time
        if remote_closed:
            self.log.log("Remote has disconnected!", ORPG_NOTE)
            self.set_status(2)
        self.outbox.put( "" )    # Make sure the other thread is woken up!
        # Signal *this* thread's exit; sendThread sets its own event.
        self.recvThreadExitEvent.set()
        self.log.log("messenger->recvThread() has terminated...", ORPG_NOTE)

    def recvData( self, sock, readSize ):
        """Read exactly *readSize* bytes from *sock*.

        Returns the data, or "" after a logged socket error.  Raises
        IOError when the remote closes the connection mid-read (unless the
        running interpreter treats that as a socket error, in which case it
        is logged and "" is returned).
        """
        chunks = []
        offset = 0
        try:
            while offset != readSize:
                # Cap each request so a huge length cannot force one
                # giant buffer allocation.
                frag = sock.recv( min( readSize - offset, self._MAX_RECV_CHUNK ) )

                # See if we've been disconnected
                if len( frag ) <= 0:
                    # Loudly raise an exception because we've been disconnected!
                    raise IOError("Remote closed the connection!")

                # Continue to build complete message
                offset += len( frag )
                chunks.append( frag )
        except socket.error:
            self.log.log("Socket Error: messenger->recvData(): " + traceback.format_exc(), ORPG_CRITICAL)
            return ""
        return "".join( chunks )

    def recvMsg( self, sock ):
        """Read one complete length-prefixed message from *sock*.

        Reads the 4-byte big-endian length, then exactly that many bytes,
        so nothing beyond the message is consumed.  Any failure (including
        the remote closing the connection) is logged and "" is returned,
        which callers treat as a disconnect.
        """
        msgData = ""
        try:
            lenData = self.recvData( sock, self.lensize )

            # Now, convert to a usable form
            (length,) = unpack( self._LENGTH_FORMAT, lenData )

            # Read exactly the remaining amount of data
            msgData = self.recvData( sock, length )

            if self.isServer:
                self.log.log("('data_recv', " + str(length+4) + ")", ORPG_DEBUG)
        except Exception:
            self.log.log("Exception: messenger->recvMsg():\n" + traceback.format_exc(), ORPG_CRITICAL)
        return msgData
472 | |
if __name__ == "__main__":
    # Manual smoke test: build one sample message and print it.
    test = messenger(None)
    sample = test.build_message(
        'hello',
        "This is a test message",
        attrib1="hello world",
        attrib2="hello world2",
        attrib3="hello world3")
    print(sample)