comparison etc/dispatch.lua @ 0:4b915342e2a8

LuaSocket 2.0.2 + CMake build description.
author Eric Wing <ewing . public |-at-| gmail . com>
date Tue, 26 Aug 2008 18:40:01 -0700
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4b915342e2a8
1 -----------------------------------------------------------------------------
2 -- A hacked dispatcher module
3 -- LuaSocket sample files
4 -- Author: Diego Nehab
5 -- RCS ID: $$
6 -----------------------------------------------------------------------------
7 local base = _G
8 local table = require("table")
9 local socket = require("socket")
10 local coroutine = require("coroutine")
11 module("dispatch")
12
-- Idle limit, in seconds as measured by socket.gettime(): a client socket
-- that has been waiting longer than this without being rescheduled has its
-- pending operation aborted with "timeout" by the coroutine dispatcher.
TIMEOUT = 60
16
-----------------------------------------------------------------------------
-- Handler factories, keyed by mode name. The modes registered in this file:
--   "sequential" - runs the function directly, on unwrapped sockets
--   "coroutine"  - runs the function in a coroutine, on wrapped sockets
-- (the original header also lists a "threaded" mode, but no factory for it
-- is registered in the code visible here)
-- The user can choose whichever one is needed.
-----------------------------------------------------------------------------
local handlert = {}

-- Creates a handler for the requested dispatching mode.
--   mode: name of a registered mode; defaults to "coroutine"
-- Returns whatever the factory registered under that name returns.
-- Raises an error naming the mode when no factory is registered for it,
-- rather than failing with an opaque "attempt to call a nil value".
function newhandler(mode)
    mode = mode or "coroutine"
    local factory = handlert[mode]
    if not factory then
        -- level 2 blames the caller, who supplied the bad mode
        base.error("unknown dispatcher mode: " .. base.tostring(mode), 2)
    end
    return factory()
end
31
-- start method of the sequential handler: runs func right away, with no
-- arguments, and hands back everything it returns. the handler itself
-- (self) is not used.
local seqstart = function(self, func)
    return func()
end
35
-- factory for the sequential handler: sockets come straight from socket.tcp
-- (no I/O wrapping) and start() just calls the function it is given
function handlert.sequential()
    local handler = {}
    handler.tcp = socket.tcp
    handler.start = seqstart
    return handler
end
43
-----------------------------------------------------------------------------
-- Mega hack. Don't try to do this at home.
-----------------------------------------------------------------------------
-- we can't yield across calls to protect, so we rewrite it coxpcall-style,
-- on top of coroutines.
-- make sure you don't require any module that uses socket.protect before
-- loading our hack

-- gathers its arguments in a table and records the true count in field n,
-- so that nil values survive a later base.unpack(t, i, t.n)
local function packvalues(...)
    return { n = base.select("#", ...), ... }
end

-- Replacement for socket.protect.
--   f: the function to protect
-- Returns a function that runs f inside a fresh coroutine. Whatever f
-- yields is yielded again from the caller's coroutine, and the values the
-- caller is resumed with are passed back down to f. When f finishes, its
-- results are returned. When f raises a table, the result is nil followed
-- by the first element of that table; any other error value is re-raised.
function socket.protect(f)
    return function(...)
        local co = coroutine.create(f)
        local args = packvalues(...)
        while true do
            -- results[1] is the resume status; the rest is the payload
            local results = packvalues(
                coroutine.resume(co, base.unpack(args, 1, args.n)))
            if not results[1] then
                local err = results[2]
                -- plain `type` is not visible inside the module
                -- environment, so it has to be reached through base
                if base.type(err) == "table" then
                    return nil, err[1]
                else
                    base.error(err)
                end
            end
            if coroutine.status(co) == "suspended" then
                -- relay the yield outwards and remember what we get back
                args = packvalues(
                    coroutine.yield(base.unpack(results, 2, results.n)))
            else
                return base.unpack(results, 2, results.n)
            end
        end
    end
end
69
-----------------------------------------------------------------------------
-- Simple set data structure. O(1) everything.
-----------------------------------------------------------------------------
-- Returns a table whose array part holds the members (in no particular
-- order once something has been removed) and which offers two methods:
--   set:insert(value)  adds value unless it is already a member
--   set:remove(value)  removes value if it is a member
-- A private reverse map (value -> array position) makes both constant time.
local function newset()
    local reverse = {}
    local set = {}
    return base.setmetatable(set, {__index = {
        insert = function(set, value)
            if not reverse[value] then
                -- append and remember where the value landed; the length
                -- operator replaces the deprecated table.getn
                local position = #set + 1
                set[position] = value
                reverse[value] = position
            end
        end,
        remove = function(set, value)
            local index = reverse[value]
            if index then
                reverse[value] = nil
                -- pop the last member and, unless it was the one being
                -- removed, move it into the hole that was left behind
                local top = table.remove(set)
                if top ~= value then
                    reverse[top] = index
                    set[index] = top
                end
            end
        end
    }})
end
96
-----------------------------------------------------------------------------
-- socket.tcp() wrapper for the coroutine dispatcher
-----------------------------------------------------------------------------
-- Wraps a TCP object so that send, receive, connect and accept yield to the
-- dispatcher instead of blocking.
--   dispatcher: handler whose sending, receiving and stamp tables are used
--   tcp:        the object to wrap
--   error:      message handed back when tcp is nil or false
-- Returns the wrapper, or nil followed by error when there is no tcp.
local function cowrap(dispatcher, tcp, error)
    if not tcp then return nil, error end
    -- put it in non-blocking mode right away
    tcp:settimeout(0)
    -- metatable for wrap produces new methods on demand for those that we
    -- don't override explicitly. each generated method ignores the receiver
    -- it was called with and forwards the remaining arguments, exactly as
    -- given (nils included), to the same-named method of the real object.
    local metat = { __index = function(wrapper, key)
        wrapper[key] = function(_, ...)
            return tcp[key](tcp, ...)
        end
        return wrapper[key]
    end}
    -- does our user want to do his own non-blocking I/O?
    local zero = false
    -- create a wrap object that will behave just like a real socket object
    local wrap = { }
    -- we ignore settimeout to preserve our 0 timeout, but record whether
    -- the user wants to do his own non-blocking I/O
    function wrap:settimeout(value, mode)
        zero = (value == 0)
        return 1
    end
    -- send in non-blocking mode and yield on timeout
    function wrap:send(data, first, last)
        first = (first or 1) - 1
        local result, err
        while true do
            -- return control to dispatcher and tell it we want to send
            -- if upon return the dispatcher tells us we timed out,
            -- return an error to whoever called us
            if coroutine.yield(dispatcher.sending, tcp) == "timeout" then
                return nil, "timeout"
            end
            -- try sending, resuming right after the last position reported
            result, err, first = tcp:send(data, first+1, last)
            -- if we are done, or there was an unexpected error,
            -- break away from loop
            if err ~= "timeout" then return result, err, first end
        end
    end
    -- receive in non-blocking mode and yield on timeout
    -- or simply return partial read, if user requested timeout = 0
    function wrap:receive(pattern, partial)
        local err = "timeout"
        local value
        while true do
            -- return control to dispatcher and tell it we want to receive
            -- if upon return the dispatcher tells us we timed out,
            -- return an error to whoever called us
            if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then
                return nil, "timeout"
            end
            -- try receiving
            value, err, partial = tcp:receive(pattern, partial)
            -- if we are done, or there was an unexpected error,
            -- break away from loop. also, if the user requested
            -- zero timeout, return all we got
            if (err ~= "timeout") or zero then
                return value, err, partial
            end
        end
    end
    -- connect in non-blocking mode and yield on timeout
    function wrap:connect(host, port)
        local result, err = tcp:connect(host, port)
        if err == "timeout" then
            -- return control to dispatcher. we will be writable when
            -- connection succeeds.
            -- if upon return the dispatcher tells us we have a
            -- timeout, just abort
            if coroutine.yield(dispatcher.sending, tcp) == "timeout" then
                return nil, "timeout"
            end
            -- when we come back, check if connection was successful
            result, err = tcp:connect(host, port)
            if result or err == "already connected" then return 1
            else return nil, "non-blocking connect failed" end
        else return result, err end
    end
    -- accept in non-blocking mode and yield on timeout
    function wrap:accept()
        while 1 do
            -- return control to dispatcher. we will be readable when a
            -- connection arrives.
            -- if upon return the dispatcher tells us we have a
            -- timeout, just abort
            if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then
                return nil, "timeout"
            end
            local client, err = tcp:accept()
            if err ~= "timeout" then
                -- wrap the accepted object too (or pass the error along)
                return cowrap(dispatcher, client, err)
            end
        end
    end
    -- remove cortn from context
    function wrap:close()
        dispatcher.stamp[tcp] = nil
        dispatcher.sending.set:remove(tcp)
        dispatcher.sending.cortn[tcp] = nil
        dispatcher.receiving.set:remove(tcp)
        dispatcher.receiving.cortn[tcp] = nil
        return tcp:close()
    end
    return base.setmetatable(wrap, metat)
end
208
209
-----------------------------------------------------------------------------
-- Our coroutine dispatcher
-----------------------------------------------------------------------------
-- metatable given to coroutine dispatchers; the methods they share are
-- stored in its __index table
local cometat = {}
cometat.__index = {}
214
-- Registers a coroutine as waiting on an operation, based on the outcome
-- of the coroutine.resume call that just ran it.
--   cortn:     the coroutine that was resumed (may be nil)
--   status:    first result of coroutine.resume
--   operation: on success, the first value that came out of the coroutine
--              (dispatcher.sending or dispatcher.receiving when it yielded
--              from a wrapped socket call); on failure, the error value
--   tcp:       the object the coroutine is waiting on
-- Raises the error value when status is false. Nothing is registered
-- unless the coroutine is actually suspended: a coroutine that ran to
-- completion and returned a value is not mistaken for one that yielded.
function schedule(cortn, status, operation, tcp)
    if not status then base.error(operation) end
    if cortn and operation and coroutine.status(cortn) == "suspended" then
        operation.set:insert(tcp)
        operation.cortn[tcp] = cortn
        -- remember when the wait started, for the idle check
        operation.stamp[tcp] = socket.gettime()
    end
end
224
-- forgets that anything is waiting on tcp for this operation: takes tcp out
-- of the operation's set and clears the coroutine recorded for it
function kick(operation, tcp)
    local waiting = operation.cortn
    waiting[tcp] = nil
    operation.set:remove(tcp)
end
229
-- resumes the coroutine waiting on tcp for this operation, if there is one.
-- returns the coroutine followed by the results of coroutine.resume, which
-- is the argument list schedule expects; with nothing waiting it returns
-- nil, true so that the scheduler does nothing.
function wakeup(operation, tcp)
    local cortn = operation.cortn[tcp]
    -- otherwise, just get scheduler not to do anything
    if not cortn then
        return nil, true
    end
    -- cortn is still valid: unregister it, then wake it up
    kick(operation, tcp)
    return cortn, coroutine.resume(cortn)
end
241
-- if a coroutine is waiting on tcp for this operation, unregisters it and
-- resumes it with "timeout", which the wrapped socket calls report back to
-- their caller as nil, "timeout".
-- NOTE(review): whatever the coroutine yields or returns from this resume
-- is discarded here - confirm that this is intended.
function abort(operation, tcp)
    local cortn = operation.cortn[tcp]
    if not cortn then return end
    kick(operation, tcp)
    coroutine.resume(cortn, "timeout")
end
249
-- runs one round of the dispatcher: wakes every coroutine whose socket is
-- ready and then times out the client sockets that have been idle too long
function cometat.__index:step()
    -- ask select which of the sockets we are waiting on are ready
    -- (called with a timeout argument of 1)
    local canread, canwrite = socket.select(self.receiving.set,
        self.sending.set, 1)
    -- resume the cortn behind each readable connection and reschedule
    -- it according to what it yields back to us
    for _, ready in base.ipairs(canread) do
        schedule(wakeup(self.receiving, ready))
    end
    -- same thing for the writable connections
    for _, ready in base.ipairs(canwrite) do
        schedule(wakeup(self.sending, ready))
    end
    -- politely ask replacement I/O functions in idle cortns to
    -- return reporting a timeout
    local now = socket.gettime()
    for tcp, since in base.pairs(self.stamp) do
        if tcp.class == "tcp{client}" then
            if now - since > TIMEOUT then
                abort(self.sending, tcp)
                abort(self.receiving, tcp)
            end
        end
    end
end
274
-- runs func in a new coroutine until it first yields (or finishes) and
-- hands the outcome of that first run to schedule
function cometat.__index:start(func)
    local co = coroutine.create(func)
    local status, operation, tcp = coroutine.resume(co)
    schedule(co, status, operation, tcp)
end
279
-- factory for the coroutine handler. the dispatcher it returns holds two
-- operation records, sending and receiving, each with its own socket set
-- and socket -> coroutine map, plus one stamp table shared by both and by
-- the dispatcher itself. dispatcher.tcp() creates wrapped sockets; step
-- and start come from the cometat metatable.
function handlert.coroutine()
    local stamp = {}
    -- builds one operation record tied to the shared stamp table
    local function newoperation(name)
        return {
            name = name,
            set = newset(),
            cortn = {},
            stamp = stamp
        }
    end
    local dispatcher = { stamp = stamp }
    dispatcher.sending = newoperation("sending")
    dispatcher.receiving = newoperation("receiving")
    function dispatcher.tcp()
        return cowrap(dispatcher, socket.tcp())
    end
    return base.setmetatable(dispatcher, cometat)
end
302