diff --git a/lib/modules/Protocols.pmod/SocketIO.pmod b/lib/modules/Protocols.pmod/SocketIO.pmod index cef99a56844f6f43c0b557a4d99c1010db324f9a..3233fb034e5e17473209de076e3e39a2eeb5e5f3 100644 --- a/lib/modules/Protocols.pmod/SocketIO.pmod +++ b/lib/modules/Protocols.pmod/SocketIO.pmod @@ -60,7 +60,7 @@ //! Global options for all SocketIO instances. //! //! @seealso -//! @[SocketIO.farm()] +//! @[SocketIO.Universe.farm()] final mapping options = ([ ]); @@ -73,14 +73,21 @@ private enum { private array emptyarray = ({}); +final void sendackcb(function(mixed ...:void) fn, mixed arg) { + if (fn) + if (arrayp(arg)) + fn(@arg); + else + fn(arg); +} + class Universe { // All Socket.IO nsps with connected clients in this universe. - private mapping(string: - mapping(string|int:function(mixed, string, mixed ...:mixed))) - nsps = ([]); + private mapping(string:mapping(string|int: function(mixed, + void|function(mixed,mixed ...:void),mixed ...:void))) nsps = ([]); //! All SocketIO sessions in this universe. - final multiset(Client) clients = (<>); + final multiset(object) clients = (<>); //! @param _options //! Optional options to override the defaults. @@ -121,21 +128,27 @@ class Universe { //! //! @seealso //! @[EngineIO.farm()] - final Client farm(Protocols.WebSocket.Request req, void|mapping _options) { + Client farm(.WebSocket.Request req, void|mapping options) { + .EngineIO.Socket con = seed(req, options); + return con && Client(this, con); + } + + //! Seed farm with EngineIO connection. + protected .EngineIO.Socket seed(.WebSocket.Request req, + void|mapping _options) { if (_options) _options = options + _options; else _options = options; - Protocols.EngineIO.Socket con = Protocols.EngineIO.farm(req, _options); - return con && Client(this, con); + return .EngineIO.farm(req, _options); } - private void low_register(string namespace, void|mixed event, - void|function(mixed, string, mixed ...:mixed) fn) { + protected void low_register(string namespace, void|mixed event, + void|function(mixed, mixed, mixed, mixed ...:mixed) fn) { mapping nsp = nsps[namespace]; if (!nsp) nsps[namespace] = nsp = ([]); - else if(fn) + if(fn) nsp[event] = fn; else { m_delete(nsp, event); @@ -149,11 +162,12 @@ class Universe { //! Workers can be unregistered by omitting @expr{fn@}. //! Having a default worker per namespace in addition zero or more //! event specific ones, is supported. - final variant void register(string namespace, string event, - void|function(mixed, string, string, mixed ...:mixed) fn) { + variant void register(string namespace, string event, + void|function(mixed, void|function(mixed ...:void), + string, string, mixed ...:mixed) fn) { low_register(namespace, event, fn); } - final variant void register(string namespace, + variant void register(string namespace, void|function(mixed, string, mixed ...:mixed) fn) { low_register(namespace, 0, fn); } @@ -164,28 +178,32 @@ class Universe { //! //! @returns //! 0 upon success, the error itself otherwise. - final mixed + mixed connect(Client client, string oldnamespace, void|string newnamespace) { clients[client] = !!newnamespace; return nsps[newnamespace] ? 0 : invalidnamespace; } //! - final mixed read(string namespace, mixed id, - function(mixed ...:void) sendackcb, mixed ... data) { - function(mixed, string, mixed ...:mixed) fn; + protected string|function lookupcb(string namespace, array data) { + function fn; mapping nsp = nsps[namespace]; if (!nsp) return "\"" + invalidnamespace + "\""; if (!(fn = nsp[sizeof(data) && data[0]] || nsp[0])) return (sizeof(data) ? Standards.JSON.encode(data[0]) : "") + ",\"Unregistered event\""; - mixed res = fn(id, namespace, @data); - if (sendackcb) - if (arrayp(res)) - sendackcb(@res); - else - sendackcb(res); + return fn; + } + + //! + mixed read(string namespace, mixed id, + function(mixed ...:void) sendackcb, mixed ... data) { + string|function(mixed, void|function(mixed ...:void), + string, mixed ...:void) fn; + if (!functionp(fn = lookupcb(namespace, data))) + return fn; + fn(id, sendackcb, namespace, @data); return 0; } } @@ -197,6 +215,9 @@ class Client { //! string namespace=""; + //! + function(void:void) onclose; + private mixed id; private enum {RUNNING = 0, SDISCONNECT, RDISCONNECT}; private int state = RUNNING; @@ -225,10 +246,23 @@ class Client { //! Contains the last request seen on this connection. //! Can be used to obtain cookies etc. - final Protocols.WebSocket.Request `request() { + final .WebSocket.Request `request() { return conn.request; } + //! + final int `skip_compression() { + return conn.skip_compression; + } + + //! Set to any of @expr{Protocols.WebSocket.OVERRIDE_COMPRESS@} + //! or @expr{Protocols.WebSocket.OVERRIDE_SKIPCOMPRESS@} + //! to bypass of the compression heuristics on binary + //! frames in a WebSocket connection only. + final int `skip_compression=(int i) { + return conn.skip_compression = i; + } + //! Send text or binary messages. final void write(mixed|function(mixed, mixed ...:void) ack_cb, mixed ... data) { @@ -297,12 +331,14 @@ class Client { } //! Close the socket signalling the other side. - final void close() { + void close() { if (state < SDISCONNECT) { PD("Send disconnect, %s state %O\n", sid, state); state = SDISCONNECT; send(DISCONNECT); conn.close(); + if (onclose) + onclose(); } } @@ -348,7 +384,7 @@ class Client { function(mixed, mixed ...:void) ackfn; if(ackfn = ack_cbs[curackid]) { m_delete(ack_cbs, curackid); // ACK callbacks can only be - ackfn(id, @curevent); // executed once + ackfn(query_id(), @curevent); // executed once } break; } @@ -366,7 +402,7 @@ class Client { private void recv(mixed eid, string|Stdio.Buffer data) { PD("Recv %s %O\n", sid, (string)data); if (!stringp(data)) { - curbins[-bins] = data; + curbins[-bins] = data->read_buffer(sizeof(data),1); if (!--bins) { treeimport(curevent); recvcb(); @@ -431,7 +467,7 @@ class Client { closedown(); } - protected void create(Universe _universe, Protocols.EngineIO.Socket _con) { + protected void create(Universe _universe, .EngineIO.Socket _con) { universe = _universe; conn = _con; conn.set_callbacks(recv, closedown);