Skip to content
Snippets Groups Projects
Commit 4e28f2d7 authored by Stephen R. van den Berg's avatar Stephen R. van den Berg
Browse files

SocketIO: Streamline Universe API, events run in their own threads.

parent f3038766
No related branches found
No related tags found
No related merge requests found
...@@ -73,15 +73,14 @@ private enum { ...@@ -73,15 +73,14 @@ private enum {
private array emptyarray = ({}); private array emptyarray = ({});
private enum {
ICLIENTS=0, IEVENTS, IREADCB, ICLOSECB, IOPENCB
};
class Universe { class Universe {
// All Socket.IO _nsps with connected clients in this universe. // All Socket.IO nsps with connected clients in this universe.
final mapping(string: array(multiset(.EngineIO.Socket)| private mapping(string:
mapping(string:multiset(function))|function)) mapping(string|int:function(mixed, string, mixed ...:mixed)))
_nsps = ([]); nsps = ([]);
//! All SocketIO sessions in this universe.
final multiset(Client) clients = (<>);
//! @param _options //! @param _options
//! Optional options to override the defaults. //! Optional options to override the defaults.
...@@ -93,7 +92,7 @@ class Universe { ...@@ -93,7 +92,7 @@ class Universe {
//! //!
//!Protocols.SocketIO.Universe myuniverse; //!Protocols.SocketIO.Universe myuniverse;
//! //!
//!void echo(mixed id, function sendack, mixed ... data) { //!mixed echo(mixed id, string namespace, mixed ... data) {
//! id->write(data); //! id->write(data);
//! if (sendack) //! if (sendack)
//! sendack("Ack","me","harder"); //! sendack("Ack","me","harder");
...@@ -115,7 +114,7 @@ class Universe { ...@@ -115,7 +114,7 @@ class Universe {
//! //!
//!int main(int argc, array(string) argv) { //!int main(int argc, array(string) argv) {
//! myuniverse = Protocols.SocketIO.Universe(); // Create universe //! myuniverse = Protocols.SocketIO.Universe(); // Create universe
//! myuniverse.createnamespace("", echo); // Register root namespace //! myuniverse.register("", echo); // Register root namespace
//! Protocols.WebSocket.Port(httprequest, wsrequest, 80); //! Protocols.WebSocket.Port(httprequest, wsrequest, 80);
//! return -1; //! return -1;
//!} //!}
...@@ -131,79 +130,63 @@ class Universe { ...@@ -131,79 +130,63 @@ class Universe {
return con && Client(this, con); return con && Client(this, con);
} }
//! Create a new or update an existing namespace. private void low_register(string namespace, void|mixed event,
final void createnamespace(string namespace, void|function(mixed, string, mixed ...:mixed) fn) {
void|function(mixed, function(mixed, mixed ...:void), mixed ...:void) mapping nsp = nsps[namespace];
read_cb,
void|function(mixed:void) close_cb,
void|function(mixed:void) open_cb) {
array nsp = _nsps[namespace];
if (!nsp) if (!nsp)
_nsps[namespace] = ({(<>), ([]), read_cb, close_cb, open_cb}); nsps[namespace] = nsp = ([]);
else if(fn)
nsp[event] = fn;
else { else {
nsp[IREADCB] = read_cb; m_delete(nsp, event);
nsp[ICLOSECB] = close_cb; if (!sizeof(nsp))
nsp[IOPENCB] = open_cb; m_delete(nsps, namespace);
} }
} }
//! Drop a namespace. //! Register worker callback on namespace and event.
final void dropnamespace(string namespace) { //! There can be only one worker per tuple.
array nsp = _nsps[namespace]; //! Workers can be unregistered by omitting @expr{fn@}.
m_delete(_nsps, namespace); //! Having a default worker per namespace in addition zero or more
foreach (nsp[ICLIENTS];; Client client) //! event specific ones, is supported.
if (client) final variant void register(string namespace, string event,
client.close(); void|function(mixed, string, string, mixed ...:mixed) fn) {
low_register(namespace, event, fn);
} }
final variant void register(string namespace,
//! ack_cb can be specified as the first argument following namespace. void|function(mixed, string, mixed ...:mixed) fn) {
//! Returns the number of clients broadcast to. low_register(namespace, 0, fn);
final int broadcast(string namespace, mixed ... data) {
Client client;
int cnt = 0;
array nsp = _nsps[namespace];
foreach (nsp[ICLIENTS];; client)
cnt++, client.write(@data);
return cnt;
} }
//! private string invalidnamespace = "Invalid namespace";
final int connected(string namespace) {
array nsp = _nsps[namespace];
return sizeof(nsp[ICLIENTS]);
}
//! Connects and disconnects clients.
//! //!
final multiset clients(string namespace) { //! @returns
array nsp = _nsps[namespace]; //! 0 upon success, the error itself otherwise.
return nsp[ICLIENTS]; final mixed
connect(Client client, string oldnamespace, void|string newnamespace) {
clients[client] = !!newnamespace;
return nsps[newnamespace] ? 0 : invalidnamespace;
} }
//! Use the indices to get a list of the _nsps in use. //!
final mapping namespaces() { final mixed read(string namespace, mixed id,
return _nsps; function(mixed ...:void) sendackcb, mixed ... data) {
} function(mixed, string, mixed ...:mixed) fn;
mapping nsp = nsps[namespace];
private multiset getlisteners(string namespace, string event) { if (!nsp)
mapping events = _nsps[namespace][IEVENTS]; return "\"" + invalidnamespace + "\"";
multiset listeners = events[event]; if (!(fn = nsp[sizeof(data) && data[0]] || nsp[0]))
if (!listeners) return (sizeof(data) ? Standards.JSON.encode(data[0]) : "")
events[event] = listeners = (<>); + ",\"Unregistered event\"";
return listeners; mixed res = fn(id, namespace, @data);
} if (sendackcb)
if (arrayp(res))
//! Register listener to an event on a namespace. sendackcb(@res);
final void on(string namespace, string event, else
function(mixed, function(mixed, mixed ...:void), string, mixed ...:void) sendackcb(res);
event_cb) { return 0;
getlisteners(namespace, event)[event_cb] = 1;
}
//! Unregister listener to an event on a namespace.
final void off(string namespace, string event,
function(mixed, function(mixed, mixed ...:void), string, mixed ...:void)
event_cb) {
getlisteners(namespace, event)[event_cb] = 0;
} }
} }
...@@ -214,9 +197,6 @@ class Client { ...@@ -214,9 +197,6 @@ class Client {
//! //!
string namespace=""; string namespace="";
private function(mixed,
function(mixed, mixed ...:void), mixed ...:void) read_cb;
private function(mixed:void) open_cb, close_cb;
private mixed id; private mixed id;
private enum {RUNNING = 0, SDISCONNECT, RDISCONNECT}; private enum {RUNNING = 0, SDISCONNECT, RDISCONNECT};
private int state = RUNNING; private int state = RUNNING;
...@@ -249,19 +229,6 @@ class Client { ...@@ -249,19 +229,6 @@ class Client {
return conn.request; return conn.request;
} }
private void fetchcallbacks() {
array nsp = universe._nsps[namespace];
if (nsp) {
read_cb = nsp[IREADCB];
close_cb = nsp[ICLOSECB];
open_cb = nsp[IOPENCB];
} else {
read_cb = 0;
close_cb = 0;
open_cb = 0;
}
}
//! Send text or binary messages. //! Send text or binary messages.
final void write(mixed|function(mixed, mixed ...:void) ack_cb, final void write(mixed|function(mixed, mixed ...:void) ack_cb,
mixed ... data) { mixed ... data) {
...@@ -355,26 +322,25 @@ class Client { ...@@ -355,26 +322,25 @@ class Client {
} }
private void recvcb() { private void recvcb() {
int cackid = curackid; // Instantiate ackid in void readthread(string namespace, int cackid, mixed ... data) {
void sendackcb(mixed ... data) { // saved callback-stackframe void sendackcb(mixed ... data) { // saved callback-stackframe
PD("Ack %d %O\n", cackid, data); PD("Ack %d %O\n", cackid, data);
send(ACK, data, cackid); send(ACK, data, cackid);
}; };
if (sizeof(curevent) && stringp(curevent[0])) { string err = universe ? universe.read(namespace, query_id(),
multiset listeners = universe._nsps[namespace][IEVENTS][curevent[0]]; cackid >= 0 && sendackcb, @data) : "\"Unknown universe\"";
if (listeners && sizeof(listeners)) { if (err)
function(mixed ...:void) cachesendackcb = cackid >= 0 && sendackcb; send(ERROR, namespace + "," + err);
function(mixed, function(mixed, mixed ...:void), string, };
mixed ...:void) event_cb;
foreach (listeners; event_cb;)
event_cb(query_id(), cachesendackcb, @curevent);
return; // Skip the default callback
}
}
if (read_cb)
switch(curtype) { switch(curtype) {
default: { default: {
read_cb(query_id(), cackid >= 0 && sendackcb, @curevent); #if constant(Thread.Thread)
// Spawn one thread per message/callback.
// FIXME Too much overhead?
Thread.Thread(readthread, namespace, curackid, @curevent);
#else
readthread(namespace, curackid, @curevent);
#endif
break; break;
} }
case ACK: case ACK:
...@@ -389,19 +355,11 @@ class Client { ...@@ -389,19 +355,11 @@ class Client {
} }
} }
private void unregister() {
array nsp = universe._nsps[namespace];
if (nsp)
nsp[ICLIENTS][this] = 0;
}
private void closedown() { private void closedown() {
if (state < RDISCONNECT) { if (state < RDISCONNECT) {
close(); close();
state = RDISCONNECT; state = RDISCONNECT;
unregister(); universe.connect(this, namespace);
if (close_cb)
close_cb(query_id());
} }
} }
...@@ -423,22 +381,20 @@ class Client { ...@@ -423,22 +381,20 @@ class Client {
case ERROR: case ERROR:
SUSERERROR(data); // Pass error up SUSERERROR(data); // Pass error up
case CONNECT: case CONNECT:
if (universe._nsps[data]) { if (universe) {
unregister(); // Old namespace mixed e;
universe._nsps[namespace = data][ICLIENTS][this] = 1; if (e = universe.connect(this, namespace, data)) {
fetchcallbacks(); send(ERROR, data + "," + Standards.JSON.encode(e));
break;
}
state = RUNNING; state = RUNNING;
send(CONNECT, namespace); // Confirm namespace send(CONNECT, namespace = data); // Confirm namespace
if (open_cb)
open_cb(query_id());
} else } else
send(ERROR, data+",\"Invalid namespace\""); send(ERROR, data + ",\"No universe to register to\"");
break; break;
case DISCONNECT: case DISCONNECT:
closedown(); closedown();
close_cb = 0; universe = 0;
open_cb = 0;
read_cb = 0;
id = 0; // Delete all references to this Client id = 0; // Delete all references to this Client
break; break;
case EVENT: case EVENT:
...@@ -478,7 +434,6 @@ class Client { ...@@ -478,7 +434,6 @@ class Client {
protected void create(Universe _universe, Protocols.EngineIO.Socket _con) { protected void create(Universe _universe, Protocols.EngineIO.Socket _con) {
universe = _universe; universe = _universe;
conn = _con; conn = _con;
fetchcallbacks();
conn.set_callbacks(recv, closedown); conn.set_callbacks(recv, closedown);
send(CONNECT); // Autconnect to root namespace send(CONNECT); // Autconnect to root namespace
PD("New SocketIO sid: %O\n", sid); PD("New SocketIO sid: %O\n", sid);
...@@ -488,8 +443,8 @@ class Client { ...@@ -488,8 +443,8 @@ class Client {
string res=UNDEFINED; string res=UNDEFINED;
switch (type) { switch (type) {
case 'O': case 'O':
res = sprintf(DRIVERNAME"(%s.%d,%d,%d)", res = sprintf(DRIVERNAME"(%s.%d,%d)",
sid, protocol, state, sizeof(universe._nsps)); sid, protocol, state);
break; break;
} }
return res; return res;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment