diff --git a/lib/modules/Protocols.pmod/SocketIO.pmod b/lib/modules/Protocols.pmod/SocketIO.pmod index 282839c1374132f9dcdbae685100f6c2fee35bbc..cef99a56844f6f43c0b557a4d99c1010db324f9a 100644 --- a/lib/modules/Protocols.pmod/SocketIO.pmod +++ b/lib/modules/Protocols.pmod/SocketIO.pmod @@ -73,15 +73,14 @@ private enum { private array emptyarray = ({}); -private enum { - ICLIENTS=0, IEVENTS, IREADCB, ICLOSECB, IOPENCB -}; - class Universe { - // All Socket.IO _nsps with connected clients in this universe. - final mapping(string: array(multiset(.EngineIO.Socket)| - mapping(string:multiset(function))|function)) - _nsps = ([]); + // All Socket.IO nsps with connected clients in this universe. + private mapping(string: + mapping(string|int:function(mixed, string, mixed ...:mixed))) + nsps = ([]); + + //! All SocketIO sessions in this universe. + final multiset(Client) clients = (<>); //! @param _options //! Optional options to override the defaults. @@ -93,7 +92,7 @@ class Universe { //! //!Protocols.SocketIO.Universe myuniverse; //! - //!void echo(mixed id, function sendack, mixed ... data) { + //!mixed echo(mixed id, string namespace, mixed ... data) { //! id->write(data); //! if (sendack) //! sendack("Ack","me","harder"); @@ -115,7 +114,7 @@ class Universe { //! //!int main(int argc, array(string) argv) { //! myuniverse = Protocols.SocketIO.Universe(); // Create universe - //! myuniverse.createnamespace("", echo); // Register root namespace + //! myuniverse.register("", echo); // Register root namespace //! Protocols.WebSocket.Port(httprequest, wsrequest, 80); //! return -1; //!} @@ -131,79 +130,63 @@ class Universe { return con && Client(this, con); } - //! Create a new or update an existing namespace. - final void createnamespace(string namespace, - void|function(mixed, function(mixed, mixed ...:void), mixed ...:void) - read_cb, - void|function(mixed:void) close_cb, - void|function(mixed:void) open_cb) { - array nsp = _nsps[namespace]; + private void low_register(string namespace, void|mixed event, + void|function(mixed, string, mixed ...:mixed) fn) { + mapping nsp = nsps[namespace]; if (!nsp) - _nsps[namespace] = ({(<>), ([]), read_cb, close_cb, open_cb}); + nsps[namespace] = nsp = ([]); + else if(fn) + nsp[event] = fn; else { - nsp[IREADCB] = read_cb; - nsp[ICLOSECB] = close_cb; - nsp[IOPENCB] = open_cb; + m_delete(nsp, event); + if (!sizeof(nsp)) + m_delete(nsps, namespace); } } - //! Drop a namespace. - final void dropnamespace(string namespace) { - array nsp = _nsps[namespace]; - m_delete(_nsps, namespace); - foreach (nsp[ICLIENTS];; Client client) - if (client) - client.close(); + //! Register worker callback on namespace and event. + //! There can be only one worker per tuple. + //! Workers can be unregistered by omitting @expr{fn@}. + //! Having a default worker per namespace in addition zero or more + //! event specific ones, is supported. + final variant void register(string namespace, string event, + void|function(mixed, string, string, mixed ...:mixed) fn) { + low_register(namespace, event, fn); } - - //! ack_cb can be specified as the first argument following namespace. - //! Returns the number of clients broadcast to. - final int broadcast(string namespace, mixed ... data) { - Client client; - int cnt = 0; - array nsp = _nsps[namespace]; - foreach (nsp[ICLIENTS];; client) - cnt++, client.write(@data); - return cnt; + final variant void register(string namespace, + void|function(mixed, string, mixed ...:mixed) fn) { + low_register(namespace, 0, fn); } - //! - final int connected(string namespace) { - array nsp = _nsps[namespace]; - return sizeof(nsp[ICLIENTS]); - } + private string invalidnamespace = "Invalid namespace"; + //! Connects and disconnects clients. //! - final multiset clients(string namespace) { - array nsp = _nsps[namespace]; - return nsp[ICLIENTS]; - } - - //! Use the indices to get a list of the _nsps in use. - final mapping namespaces() { - return _nsps; + //! @returns + //! 0 upon success, the error itself otherwise. + final mixed + connect(Client client, string oldnamespace, void|string newnamespace) { + clients[client] = !!newnamespace; + return nsps[newnamespace] ? 0 : invalidnamespace; } - private multiset getlisteners(string namespace, string event) { - mapping events = _nsps[namespace][IEVENTS]; - multiset listeners = events[event]; - if (!listeners) - events[event] = listeners = (<>); - return listeners; - } - - //! Register listener to an event on a namespace. - final void on(string namespace, string event, - function(mixed, function(mixed, mixed ...:void), string, mixed ...:void) - event_cb) { - getlisteners(namespace, event)[event_cb] = 1; - } - - //! Unregister listener to an event on a namespace. - final void off(string namespace, string event, - function(mixed, function(mixed, mixed ...:void), string, mixed ...:void) - event_cb) { - getlisteners(namespace, event)[event_cb] = 0; + //! + final mixed read(string namespace, mixed id, + function(mixed ...:void) sendackcb, mixed ... data) { + function(mixed, string, mixed ...:mixed) fn; + mapping nsp = nsps[namespace]; + if (!nsp) + return "\"" + invalidnamespace + "\""; + if (!(fn = nsp[sizeof(data) && data[0]] || nsp[0])) + return (sizeof(data) ? Standards.JSON.encode(data[0]) : "") + + ",\"Unregistered event\""; + mixed res = fn(id, namespace, @data); + if (sendackcb) + if (arrayp(res)) + sendackcb(@res); + else + sendackcb(res); + return 0; } } @@ -214,9 +197,6 @@ class Client { //! string namespace=""; - private function(mixed, - function(mixed, mixed ...:void), mixed ...:void) read_cb; - private function(mixed:void) open_cb, close_cb; private mixed id; private enum {RUNNING = 0, SDISCONNECT, RDISCONNECT}; private int state = RUNNING; @@ -249,19 +229,6 @@ class Client { return conn.request; } - private void fetchcallbacks() { - array nsp = universe._nsps[namespace]; - if (nsp) { - read_cb = nsp[IREADCB]; - close_cb = nsp[ICLOSECB]; - open_cb = nsp[IOPENCB]; - } else { - read_cb = 0; - close_cb = 0; - open_cb = 0; - } - } - //! Send text or binary messages. final void write(mixed|function(mixed, mixed ...:void) ack_cb, mixed ... data) { @@ -355,53 +322,44 @@ class Client { } private void recvcb() { - int cackid = curackid; // Instantiate ackid in - void sendackcb(mixed ... data) { // saved callback-stackframe - PD("Ack %d %O\n", cackid, data); - send(ACK, data, cackid); + void readthread(string namespace, int cackid, mixed ... data) { + void sendackcb(mixed ... data) { // saved callback-stackframe + PD("Ack %d %O\n", cackid, data); + send(ACK, data, cackid); + }; + string err = universe ? universe.read(namespace, query_id(), + cackid >= 0 && sendackcb, @data) : "\"Unknown universe\""; + if (err) + send(ERROR, namespace + "," + err); }; - if (sizeof(curevent) && stringp(curevent[0])) { - multiset listeners = universe._nsps[namespace][IEVENTS][curevent[0]]; - if (listeners && sizeof(listeners)) { - function(mixed ...:void) cachesendackcb = cackid >= 0 && sendackcb; - function(mixed, function(mixed, mixed ...:void), string, - mixed ...:void) event_cb; - foreach (listeners; event_cb;) - event_cb(query_id(), cachesendackcb, @curevent); - return; // Skip the default callback + switch(curtype) { + default: { +#if constant(Thread.Thread) + // Spawn one thread per message/callback. + // FIXME Too much overhead? + Thread.Thread(readthread, namespace, curackid, @curevent); +#else + readthread(namespace, curackid, @curevent); +#endif + break; } - } - if (read_cb) - switch(curtype) { - default: { - read_cb(query_id(), cackid >= 0 && sendackcb, @curevent); - break; - } - case ACK: - case BINARY_ACK: { - function(mixed, mixed ...:void) ackfn; - if(ackfn = ack_cbs[curackid]) { - m_delete(ack_cbs, curackid); // ACK callbacks can only be - ackfn(id, @curevent); // executed once - } - break; + case ACK: + case BINARY_ACK: { + function(mixed, mixed ...:void) ackfn; + if(ackfn = ack_cbs[curackid]) { + m_delete(ack_cbs, curackid); // ACK callbacks can only be + ackfn(id, @curevent); // executed once } + break; } - } - - private void unregister() { - array nsp = universe._nsps[namespace]; - if (nsp) - nsp[ICLIENTS][this] = 0; + } } private void closedown() { if (state < RDISCONNECT) { close(); state = RDISCONNECT; - unregister(); - if (close_cb) - close_cb(query_id()); + universe.connect(this, namespace); } } @@ -423,22 +381,20 @@ class Client { case ERROR: SUSERERROR(data); // Pass error up case CONNECT: - if (universe._nsps[data]) { - unregister(); // Old namespace - universe._nsps[namespace = data][ICLIENTS][this] = 1; - fetchcallbacks(); + if (universe) { + mixed e; + if (e = universe.connect(this, namespace, data)) { + send(ERROR, data + "," + Standards.JSON.encode(e)); + break; + } state = RUNNING; - send(CONNECT, namespace); // Confirm namespace - if (open_cb) - open_cb(query_id()); + send(CONNECT, namespace = data); // Confirm namespace } else - send(ERROR, data+",\"Invalid namespace\""); + send(ERROR, data + ",\"No universe to register to\""); break; case DISCONNECT: closedown(); - close_cb = 0; - open_cb = 0; - read_cb = 0; + universe = 0; id = 0; // Delete all references to this Client break; case EVENT: @@ -478,7 +434,6 @@ class Client { protected void create(Universe _universe, Protocols.EngineIO.Socket _con) { universe = _universe; conn = _con; - fetchcallbacks(); conn.set_callbacks(recv, closedown); send(CONNECT); // Autconnect to root namespace PD("New SocketIO sid: %O\n", sid); @@ -488,8 +443,8 @@ class Client { string res=UNDEFINED; switch (type) { case 'O': - res = sprintf(DRIVERNAME"(%s.%d,%d,%d)", - sid, protocol, state, sizeof(universe._nsps)); + res = sprintf(DRIVERNAME"(%s.%d,%d)", + sid, protocol, state); break; } return res;