Mercurial > luasocket
comparison etc/dispatch.lua @ 0:4b915342e2a8
LuaSocket 2.0.2 + CMake build description.
author | Eric Wing <ewing . public |-at-| gmail . com> |
---|---|
date | Tue, 26 Aug 2008 18:40:01 -0700 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:4b915342e2a8 |
---|---|
1 ----------------------------------------------------------------------------- | |
2 -- A hacked dispatcher module | |
3 -- LuaSocket sample files | |
4 -- Author: Diego Nehab | |
5 -- RCS ID: $$ | |
6 ----------------------------------------------------------------------------- | |
7 local base = _G | |
8 local table = require("table") | |
9 local socket = require("socket") | |
10 local coroutine = require("coroutine") | |
11 module("dispatch") | |
12 | |
-- If too much time goes by without any activity in one of our sockets, we
-- just kill it. The value is compared against differences of
-- socket.gettime() readings (presumably seconds -- confirm against the
-- LuaSocket documentation). Because this chunk runs under
-- module("dispatch"), the assignment creates a field of the module table.
TIMEOUT = 60
16 | |
17 ----------------------------------------------------------------------------- | |
18 -- Three types of dispatchers are planned: | |
19 -- sequential | |
20 -- coroutine | |
21 -- threaded (no constructor for it is defined in this file) | |
22 -- The user can choose whichever one is needed | |
23 ----------------------------------------------------------------------------- | |
-- Registry of handler constructors, keyed by mode name. newhandler() looks
-- the requested mode up here and calls the function it finds.
local handlert = {}
25 | |
--- Creates a new handler (dispatcher) of the requested kind.
-- The default handler is the coroutine one.
-- @param mode Name of a constructor registered in `handlert`; "coroutine"
--   is used when mode is nil or false.
-- @return Whatever the selected constructor returns.
-- Raises an error naming the offending mode when no constructor is
-- registered under it, rather than failing with the opaque
-- "attempt to call a nil value".
function newhandler(mode)
    mode = mode or "coroutine"
    local constructor = handlert[mode]
    if not constructor then
        -- level 2 blames the caller, who supplied the bad mode
        base.error("unknown dispatcher mode: " .. base.tostring(mode), 2)
    end
    return constructor()
end
31 | |
--- Start routine of the sequential handler.
-- Calls `func` immediately with no arguments and hands back everything it
-- returns. `self` is accepted so the function can be invoked as a method,
-- but it is not used.
local seqstart
seqstart = function(self, func)
    return func()
end
35 | |
--- Constructor for the sequential handler.
-- The sequential handler simply calls the functions it is given and does
-- not wrap I/O: `tcp` is the plain socket.tcp constructor and `start` runs
-- the supplied function on the spot.
-- @return A fresh table with `tcp` and `start` fields.
function handlert.sequential()
    local handler = {}
    handler.tcp = socket.tcp
    handler.start = seqstart
    return handler
end
43 | |
-----------------------------------------------------------------------------
-- Mega hack. Don't try to do this at home.
-----------------------------------------------------------------------------
-- We can't yield across calls to protect, so we rewrite it on top of
-- coroutines. Make sure you don't require any module that uses
-- socket.protect before loading our hack.
--
-- @param f Function to protect.
-- @return A function that runs `f` inside a fresh coroutine. Whatever `f`
--   yields is relayed to the coroutine that called the wrapper, and the
--   values the wrapper is resumed with are passed back into `f`. When `f`
--   raises a table, the wrapper returns nil followed by the table's first
--   element; any other error value is raised again. Otherwise the results
--   of `f` are returned.
function socket.protect(f)
    return function(...)
        local co = coroutine.create(f)
        -- capture the arguments explicitly; the implicit `arg` table only
        -- exists when Lua 5.1 is built with its 5.0 vararg compatibility
        local args = {...}
        while true do
            local results = {coroutine.resume(co, base.unpack(args))}
            local status = table.remove(results, 1)
            if not status then
                -- this chunk runs under module("dispatch"), so the global
                -- `type` is only reachable through `base`
                if base.type(results[1]) == 'table' then
                    return nil, results[1][1]
                else base.error(results[1]) end
            end
            if coroutine.status(co) == "suspended" then
                -- f yielded: yield the same values ourselves and feed
                -- whatever we are resumed with back into f
                args = {coroutine.yield(base.unpack(results))}
            else
                return base.unpack(results)
            end
        end
    end
end
69 | |
-----------------------------------------------------------------------------
-- Simple set data structure. O(1) everything.
-----------------------------------------------------------------------------
--- Creates an empty set.
-- The returned table holds the members in its array part (positions 1..n,
-- in no guaranteed order once something has been removed). A private
-- `reverse` table maps each member to its current position.
-- Methods, reached through the metatable:
--   set:insert(value)  adds value unless it is already a member
--   set:remove(value)  removes value if it is a member
-- Both methods share the `reverse` table of the set they were created
-- for, so they must be called on that set.
local function newset()
    local reverse = {}
    local set = {}
    return base.setmetatable(set, {__index = {
        insert = function(self, value)
            if not reverse[value] then
                -- the length operator replaces the deprecated table.getn
                local index = #self + 1
                self[index] = value
                reverse[value] = index
            end
        end,
        remove = function(self, value)
            local index = reverse[value]
            if index then
                reverse[value] = nil
                -- pop the last member and, unless it is the one being
                -- removed, move it into the vacated position
                local top = table.remove(self)
                if top ~= value then
                    reverse[top] = index
                    self[index] = top
                end
            end
        end
    }})
end
96 | |
-----------------------------------------------------------------------------
-- socket.tcp() wrapper for the coroutine dispatcher
-----------------------------------------------------------------------------
--- Wraps a TCP socket so that its blocking operations yield to the
-- dispatcher instead of blocking.
-- The send, receive, connect and accept methods of the wrapper yield, so
-- they must be called from inside a coroutine.
-- @param dispatcher Table with `stamp`, `sending` and `receiving` fields;
--   `sending` and `receiving` each carry a `set` and a `cortn` table.
-- @param tcp Socket object to wrap. When it is nil or false, nothing is
--   wrapped and `nil, error` is returned.
-- @param error Error message to forward when `tcp` is missing.
-- @return The wrapper object, or nil followed by `error`.
local function cowrap(dispatcher, tcp, error)
    if not tcp then return nil, error end
    -- put it in non-blocking mode right away
    tcp:settimeout(0)
    -- metatable for wrap produces new methods on demand for those that we
    -- don't override explicitly. each generated method drops the receiver
    -- it was called with and forwards the remaining arguments to the real
    -- socket. the explicit `...` avoids the implicit `arg` table, which
    -- only exists under Lua 5.0 vararg compatibility
    local metat = { __index = function(wrapper, key)
        wrapper[key] = function(_, ...)
            return tcp[key](tcp, ...)
        end
        return wrapper[key]
    end}
    -- does our user want to do his own non-blocking I/O?
    local zero = false
    -- create a wrap object that will behave just like a real socket object
    local wrap = { }
    -- we ignore settimeout to preserve our 0 timeout, but record whether
    -- the user wants to do his own non-blocking I/O
    function wrap:settimeout(value, mode)
        if value == 0 then zero = true
        else zero = false end
        return 1
    end
    -- send in non-blocking mode and yield on timeout
    function wrap:send(data, first, last)
        -- kept one below the next index to send, so that the third result
        -- of tcp:send can be stored back and reused on the next round
        first = (first or 1) - 1
        local result, error
        while true do
            -- return control to dispatcher and tell it we want to send
            -- if upon return the dispatcher tells us we timed out,
            -- return an error to whoever called us
            if coroutine.yield(dispatcher.sending, tcp) == "timeout" then
                return nil, "timeout"
            end
            -- try sending
            result, error, first = tcp:send(data, first+1, last)
            -- if we are done, or there was an unexpected error,
            -- break away from loop
            if error ~= "timeout" then return result, error, first end
        end
    end
    -- receive in non-blocking mode and yield on timeout
    -- or simply return partial read, if user requested timeout = 0
    function wrap:receive(pattern, partial)
        local error = "timeout"
        local value
        while true do
            -- return control to dispatcher and tell it we want to receive
            -- if upon return the dispatcher tells us we timed out,
            -- return an error to whoever called us
            if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then
                return nil, "timeout"
            end
            -- try receiving
            value, error, partial = tcp:receive(pattern, partial)
            -- if we are done, or there was an unexpected error,
            -- break away from loop. also, if the user requested
            -- zero timeout, return all we got
            if (error ~= "timeout") or zero then
                return value, error, partial
            end
        end
    end
    -- connect in non-blocking mode and yield on timeout
    function wrap:connect(host, port)
        local result, error = tcp:connect(host, port)
        if error == "timeout" then
            -- return control to dispatcher. we will be writable when
            -- connection succeeds.
            -- if upon return the dispatcher tells us we have a
            -- timeout, just abort
            if coroutine.yield(dispatcher.sending, tcp) == "timeout" then
                return nil, "timeout"
            end
            -- when we come back, check if connection was successful
            result, error = tcp:connect(host, port)
            if result or error == "already connected" then return 1
            else return nil, "non-blocking connect failed" end
        else return result, error end
    end
    -- accept in non-blocking mode and yield on timeout
    function wrap:accept()
        while 1 do
            -- return control to dispatcher. we will be readable when a
            -- connection arrives.
            -- if upon return the dispatcher tells us we have a
            -- timeout, just abort
            if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then
                return nil, "timeout"
            end
            local client, error = tcp:accept()
            if error ~= "timeout" then
                -- wrap the new connection too; a failed accept comes back
                -- as nil plus the error message
                return cowrap(dispatcher, client, error)
            end
        end
    end
    -- forget everything the dispatcher knows about this socket, then
    -- close it
    function wrap:close()
        dispatcher.stamp[tcp] = nil
        dispatcher.sending.set:remove(tcp)
        dispatcher.sending.cortn[tcp] = nil
        dispatcher.receiving.set:remove(tcp)
        dispatcher.receiving.cortn[tcp] = nil
        return tcp:close()
    end
    return base.setmetatable(wrap, metat)
end
208 | |
209 | |
210 ----------------------------------------------------------------------------- | |
211 -- Our coroutine dispatcher | |
212 ----------------------------------------------------------------------------- | |
-- Metatable shared by coroutine dispatchers; the dispatcher methods
-- (step, start) are stored in its __index table.
local cometat = { __index = {} }
214 | |
--- Registers a coroutine that has just yielded (or finished).
-- The arguments after `cortn` are the results of coroutine.resume.
-- @param cortn Coroutine that was resumed; may be nil.
-- @param status First result of the resume. When it is false or nil,
--   `operation` holds the error value and is raised.
-- @param operation Operation table (with `set`, `cortn` and `stamp`) the
--   coroutine yielded; nothing is registered when it is nil or false.
-- @param tcp Socket the coroutine is waiting on.
function schedule(cortn, status, operation, tcp)
    if not status then
        base.error(operation)
    end
    if cortn and operation then
        operation.set:insert(tcp)
        operation.cortn[tcp] = cortn
        -- remember when this socket last showed signs of life
        operation.stamp[tcp] = socket.gettime()
    end
end
224 | |
--- Takes a socket out of an operation: forgets the coroutine waiting on
-- it and removes the socket from the operation's set.
-- @param operation Operation table with `cortn` and `set` fields.
-- @param tcp Socket to remove.
function kick(operation, tcp)
    local waiting = operation.cortn
    waiting[tcp] = nil
    operation.set:remove(tcp)
end
229 | |
--- Resumes the coroutine waiting on `tcp` for the given operation.
-- @param operation Operation table with `cortn` and `set` fields.
-- @param tcp Socket that became ready.
-- @return The coroutine followed by the results of coroutine.resume, or
--   `nil, true` when no coroutine is registered for the socket, which
--   makes the scheduler do nothing.
function wakeup(operation, tcp)
    local cortn = operation.cortn[tcp]
    if not cortn then
        -- nobody is waiting any more
        return nil, true
    end
    -- the coroutine is still valid: unregister it, then wake it up
    kick(operation, tcp)
    return cortn, coroutine.resume(cortn)
end
241 | |
--- Tells the coroutine waiting on `tcp` that it has timed out.
-- The coroutine, if there is one, is unregistered and resumed with the
-- string "timeout". The results of that resume are discarded.
-- @param operation Operation table with `cortn` and `set` fields.
-- @param tcp Socket whose waiting coroutine should be aborted.
function abort(operation, tcp)
    local cortn = operation.cortn[tcp]
    if not cortn then return end
    kick(operation, tcp)
    coroutine.resume(cortn, "timeout")
end
249 | |
--- Steps through all active coroutines once.
-- Waits up to 1 (third argument of socket.select) for any registered
-- socket to become readable or writable, resumes the coroutines waiting
-- on the sockets that did, and registers them again according to what
-- they yield. Afterwards, coroutines whose socket has been idle for
-- longer than TIMEOUT are resumed with "timeout".
function cometat.__index:step()
    -- find out which sockets are interesting
    local readable, writable = socket.select(self.receiving.set,
        self.sending.set, 1)
    -- readable connections: resume their coroutines and reschedule them
    -- when they yield back to us
    for _, ready in base.ipairs(readable) do
        schedule(wakeup(self.receiving, ready))
    end
    -- writable connections get the same treatment
    for _, ready in base.ipairs(writable) do
        schedule(wakeup(self.sending, ready))
    end
    -- politely ask replacement I/O functions in idle coroutines to
    -- return reporting a timeout
    local now = socket.gettime()
    for tcp, stamp in base.pairs(self.stamp) do
        local idle = now - stamp
        if tcp.class == "tcp{client}" and idle > TIMEOUT then
            abort(self.sending, tcp)
            abort(self.receiving, tcp)
        end
    end
end
274 | |
--- Starts running `func` as a new coroutine.
-- The coroutine runs until its first yield (or until it finishes); what
-- it yields is handed to schedule() so it can be resumed later.
-- @param func Function to run; it is resumed with no arguments.
function cometat.__index:start(func)
    local cortn = coroutine.create(func)
    local status, operation, tcp = coroutine.resume(cortn)
    schedule(cortn, status, operation, tcp)
end
279 | |
--- Constructor for the coroutine handler.
-- Builds a dispatcher with a `sending` and a `receiving` operation, each
-- holding its own socket set and socket-to-coroutine table. Both
-- operations and the dispatcher itself share a single `stamp` table.
-- The dispatcher's tcp() creates a socket with socket.tcp() and wraps it
-- with cowrap(). Methods come from the `cometat` metatable.
-- @return The new dispatcher.
function handlert.coroutine()
    local stamp = {}
    -- one record per kind of I/O a coroutine can be waiting for
    local function newoperation(name)
        return {
            name = name,
            set = newset(),
            cortn = {},
            stamp = stamp
        }
    end
    local dispatcher = {
        stamp = stamp,
        sending = newoperation("sending"),
        receiving = newoperation("receiving"),
    }
    dispatcher.tcp = function()
        return cowrap(dispatcher, socket.tcp())
    end
    return base.setmetatable(dispatcher, cometat)
end
302 |