Mercurial > luasocket
diff etc/dispatch.lua @ 0:4b915342e2a8
LuaSocket 2.0.2 + CMake build description.
author | Eric Wing <ewing . public |-at-| gmail . com> |
---|---|
date | Tue, 26 Aug 2008 18:40:01 -0700 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/etc/dispatch.lua Tue Aug 26 18:40:01 2008 -0700 @@ -0,0 +1,302 @@ +----------------------------------------------------------------------------- +-- A hacked dispatcher module +-- LuaSocket sample files +-- Author: Diego Nehab +-- RCS ID: $$ +----------------------------------------------------------------------------- +local base = _G +local table = require("table") +local socket = require("socket") +local coroutine = require("coroutine") +module("dispatch") + +-- if too much time goes by without any activity in one of our sockets, we +-- just kill it +TIMEOUT = 60 + +----------------------------------------------------------------------------- +-- We implement 3 types of dispatchers: +-- sequential +-- coroutine +-- threaded +-- The user can choose whatever one is needed +----------------------------------------------------------------------------- +local handlert = {} + +-- default handler is coroutine +function newhandler(mode) + mode = mode or "coroutine" + return handlert[mode]() +end + +local function seqstart(self, func) + return func() +end + +-- sequential handler simply calls the functions and doesn't wrap I/O +function handlert.sequential() + return { + tcp = socket.tcp, + start = seqstart + } +end + +----------------------------------------------------------------------------- +-- Mega hack. Don't try to do this at home. +----------------------------------------------------------------------------- +-- we can't yield across calls to protect, so we rewrite it with coxpcall +-- make sure you don't require any module that uses socket.protect before +-- loading our hack +function socket.protect(f) + return function(...) + local co = coroutine.create(f) + while true do + local results = {coroutine.resume(co, base.unpack(arg))} + local status = table.remove(results, 1) + if not status then + if type(results[1]) == 'table' then + return nil, results[1][1] + else base.error(results[1]) end + end + if coroutine.status(co) == "suspended" then + arg = {coroutine.yield(base.unpack(results))} + else + return base.unpack(results) + end + end + end +end + +----------------------------------------------------------------------------- +-- Simple set data structure. O(1) everything. +----------------------------------------------------------------------------- +local function newset() + local reverse = {} + local set = {} + return base.setmetatable(set, {__index = { + insert = function(set, value) + if not reverse[value] then + table.insert(set, value) + reverse[value] = table.getn(set) + end + end, + remove = function(set, value) + local index = reverse[value] + if index then + reverse[value] = nil + local top = table.remove(set) + if top ~= value then + reverse[top] = index + set[index] = top + end + end + end + }}) +end + +----------------------------------------------------------------------------- +-- socket.tcp() wrapper for the coroutine dispatcher +----------------------------------------------------------------------------- +local function cowrap(dispatcher, tcp, error) + if not tcp then return nil, error end + -- put it in non-blocking mode right away + tcp:settimeout(0) + -- metatable for wrap produces new methods on demand for those that we + -- don't override explicitly. + local metat = { __index = function(table, key) + table[key] = function(...) + arg[1] = tcp + return tcp[key](base.unpack(arg)) + end + return table[key] + end} + -- does our user want to do his own non-blocking I/O? + local zero = false + -- create a wrap object that will behave just like a real socket object + local wrap = { } + -- we ignore settimeout to preserve our 0 timeout, but record whether + -- the user wants to do his own non-blocking I/O + function wrap:settimeout(value, mode) + if value == 0 then zero = true + else zero = false end + return 1 + end + -- send in non-blocking mode and yield on timeout + function wrap:send(data, first, last) + first = (first or 1) - 1 + local result, error + while true do + -- return control to dispatcher and tell it we want to send + -- if upon return the dispatcher tells us we timed out, + -- return an error to whoever called us + if coroutine.yield(dispatcher.sending, tcp) == "timeout" then + return nil, "timeout" + end + -- try sending + result, error, first = tcp:send(data, first+1, last) + -- if we are done, or there was an unexpected error, + -- break away from loop + if error ~= "timeout" then return result, error, first end + end + end + -- receive in non-blocking mode and yield on timeout + -- or simply return partial read, if user requested timeout = 0 + function wrap:receive(pattern, partial) + local error = "timeout" + local value + while true do + -- return control to dispatcher and tell it we want to receive + -- if upon return the dispatcher tells us we timed out, + -- return an error to whoever called us + if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then + return nil, "timeout" + end + -- try receiving + value, error, partial = tcp:receive(pattern, partial) + -- if we are done, or there was an unexpected error, + -- break away from loop. also, if the user requested + -- zero timeout, return all we got + if (error ~= "timeout") or zero then + return value, error, partial + end + end + end + -- connect in non-blocking mode and yield on timeout + function wrap:connect(host, port) + local result, error = tcp:connect(host, port) + if error == "timeout" then + -- return control to dispatcher. we will be writable when + -- connection succeeds. + -- if upon return the dispatcher tells us we have a + -- timeout, just abort + if coroutine.yield(dispatcher.sending, tcp) == "timeout" then + return nil, "timeout" + end + -- when we come back, check if connection was successful + result, error = tcp:connect(host, port) + if result or error == "already connected" then return 1 + else return nil, "non-blocking connect failed" end + else return result, error end + end + -- accept in non-blocking mode and yield on timeout + function wrap:accept() + while 1 do + -- return control to dispatcher. we will be readable when a + -- connection arrives. + -- if upon return the dispatcher tells us we have a + -- timeout, just abort + if coroutine.yield(dispatcher.receiving, tcp) == "timeout" then + return nil, "timeout" + end + local client, error = tcp:accept() + if error ~= "timeout" then + return cowrap(dispatcher, client, error) + end + end + end + -- remove cortn from context + function wrap:close() + dispatcher.stamp[tcp] = nil + dispatcher.sending.set:remove(tcp) + dispatcher.sending.cortn[tcp] = nil + dispatcher.receiving.set:remove(tcp) + dispatcher.receiving.cortn[tcp] = nil + return tcp:close() + end + return base.setmetatable(wrap, metat) +end + + +----------------------------------------------------------------------------- +-- Our coroutine dispatcher +----------------------------------------------------------------------------- +local cometat = { __index = {} } + +function schedule(cortn, status, operation, tcp) + if status then + if cortn and operation then + operation.set:insert(tcp) + operation.cortn[tcp] = cortn + operation.stamp[tcp] = socket.gettime() + end + else base.error(operation) end +end + +function kick(operation, tcp) + operation.cortn[tcp] = nil + operation.set:remove(tcp) +end + +function wakeup(operation, tcp) + local cortn = operation.cortn[tcp] + -- if cortn is still valid, wake it up + if cortn then + kick(operation, tcp) + return cortn, coroutine.resume(cortn) + -- othrewise, just get scheduler not to do anything + else + return nil, true + end +end + +function abort(operation, tcp) + local cortn = operation.cortn[tcp] + if cortn then + kick(operation, tcp) + coroutine.resume(cortn, "timeout") + end +end + +-- step through all active cortns +function cometat.__index:step() + -- check which sockets are interesting and act on them + local readable, writable = socket.select(self.receiving.set, + self.sending.set, 1) + -- for all readable connections, resume their cortns and reschedule + -- when they yield back to us + for _, tcp in base.ipairs(readable) do + schedule(wakeup(self.receiving, tcp)) + end + -- for all writable connections, do the same + for _, tcp in base.ipairs(writable) do + schedule(wakeup(self.sending, tcp)) + end + -- politely ask replacement I/O functions in idle cortns to + -- return reporting a timeout + local now = socket.gettime() + for tcp, stamp in base.pairs(self.stamp) do + if tcp.class == "tcp{client}" and now - stamp > TIMEOUT then + abort(self.sending, tcp) + abort(self.receiving, tcp) + end + end +end + +function cometat.__index:start(func) + local cortn = coroutine.create(func) + schedule(cortn, coroutine.resume(cortn)) +end + +function handlert.coroutine() + local stamp = {} + local dispatcher = { + stamp = stamp, + sending = { + name = "sending", + set = newset(), + cortn = {}, + stamp = stamp + }, + receiving = { + name = "receiving", + set = newset(), + cortn = {}, + stamp = stamp + }, + } + function dispatcher.tcp() + return cowrap(dispatcher, socket.tcp()) + end + return base.setmetatable(dispatcher, cometat) +end +