diff --git a/lib/modules/Remote.pmod/Client.pike b/lib/modules/Remote.pmod/Client.pike new file mode 100644 index 0000000000000000000000000000000000000000..704dea0550081678c692cf650cb5ee310c5c4bbe --- /dev/null +++ b/lib/modules/Remote.pmod/Client.pike @@ -0,0 +1,36 @@ + +#include "remote.h" + +int connected = 0; +object con; +function close_callback = 0; + +object get(string name) +{ + if(!connected) + error("Not connected"); + return con->get_named_object(name); +} + + +void create(string host, int port, void|int nice, int ...timeout) +{ + con = Connection(nice); + if(!con->connect(host, port, @timeout)) + error("Could not connect to server"); + connected = 1; + con->closed = 0; +} + +void provide(string name, mixed thing) +{ + con->ctx->add(name, thing); +} + +void set_close_callback(function f) +{ + if(close_callback) + con->remove_close_callback(close_callback); + close_callback = f; + con->add_close_callback(f); +} diff --git a/lib/modules/Remote.pmod/Server.pike b/lib/modules/Remote.pmod/Server.pike new file mode 100644 index 0000000000000000000000000000000000000000..478c4fd6afe4e84c32a3456eed40abf5c9118a46 --- /dev/null +++ b/lib/modules/Remote.pmod/Server.pike @@ -0,0 +1,71 @@ + +#include "remote.h" + +int portno; +object port; +array connections = ({ }); +object sctx; + +class Minicontext +{ + mapping(string:mixed) id2val = ([ ]); + mapping(mixed:string) val2id = ([ ]); + + string id_for(mixed thing) + { + return val2id[thing]; + } + + object object_for(string id, object con) + { + object o = id2val[id]; + if(functionp(o) || programp(o)) + o = o(con); + if(objectp(o) && functionp(o->close)) + con->add_close_callback(o->close); + return o; + } + + void add(string name, object|program what) + { + id2val[name] = what; + val2id[what] = name; + } +} + +void got_connection(object f) +{ + object c = f->accept(); + object con = Connection(); + object ctx = Context(gethostname()+"-"+portno); + if (!c) + error("accept failed"); + con->start_server(c, ctx); + ctx->set_server_context(sctx, con); + connections += ({ con }); +} + +void create(string host, int p) +{ + portno = p; + port = Stdio.Port(); + port->set_id(port); + if(host) + { + if(!port->bind(p, got_connection, host)) + throw(({"Failed to bind to port\n", backtrace()})); + } + else if(!port->bind(p, got_connection, host)) + throw(({"Failed to bind to port\n", backtrace()})); + + if(!portno) + sscanf(port->query_address(), "%*s %d", portno); + + sctx = Minicontext(); +} + +void provide(string name, mixed thing) +{ + DEBUGMSG("providing "+name+"\n"); + sctx->add(name, thing); +} diff --git a/lib/modules/Remote.pmod/call.pike b/lib/modules/Remote.pmod/call.pike new file mode 100644 index 0000000000000000000000000000000000000000..2fa3abc55e687d0a102247edd91c857474317f99 --- /dev/null +++ b/lib/modules/Remote.pmod/call.pike @@ -0,0 +1,49 @@ + +#include "remote.h" + +string objectid; +string name; +object con; +object ctx; +int _async; + +mixed `() (mixed ... args) +{ + mixed data = ctx->encode_call(objectid, name, args, _async); + if (_async) + con->call_async(data); + else + return con->call_sync(data); + return 0; +} + +mixed sync(mixed ... args) +{ + mixed data = ctx->encode_call(objectid, name, args, _async); + return con->call_sync(data); +} + +void async(mixed ... args) +{ + mixed data = ctx->encode_call(objectid, name, args, 1); + con->call_async(data); +} + +int is_async() +{ + return _async; +} + +void set_async(int a) +{ + _async = a; +} + +void create(string oid, string n, object cn, object ct, int a) +{ + objectid = oid; + name = n; + con = cn; + ctx = ct; + _async = a; +} diff --git a/lib/modules/Remote.pmod/connection.pike b/lib/modules/Remote.pmod/connection.pike new file mode 100644 index 0000000000000000000000000000000000000000..77fb91f2eab71f6304cb3df0884c315400c41b0a --- /dev/null +++ b/lib/modules/Remote.pmod/connection.pike @@ -0,0 +1,317 @@ + +#include "remote.h" + +int closed; +object con; +object ctx; +array(function) close_callbacks = ({ }); + +int nice; // don't throw from call_sync + +void handshake(int ignore, string s); +void read_some(int ignore, string s); +void write_some(int|void ignore); +void closed_connection(int|void ignore); + +// - create + +void create(void|int _nice) +{ + nice=_nice; +} + +// - connect +// +// This function is called by clients to connect to a server. +// +int connect(string host, int port, int ... timeout) +{ + string s, sv; + int end_time=time()+(sizeof(timeout)?(timeout[0]||1):60); + + DEBUGMSG("connecting to "+host+":"+port+"...\n"); + + if(con) + error("Already connected to "+con->query_address()); + + con = Stdio.File(); + if(!con->connect(host, port)) + return 0; + DEBUGMSG("connected\n"); + con->write("Pike remote client "+PROTO_VERSION+"\n"); + s=""; + con->set_nonblocking(); + for (;;) + { + s += (con->read(24-strlen(s),1)||""); + if (strlen(s)==24) break; + sleep(0.02); + if (time()>end_time) + { + con->close(); + return 0; + } + } + if((sscanf(s,"Pike remote server %4s\n", sv) == 1) && (sv == PROTO_VERSION)) + { + ctx = Context(replace(con->query_address(1), " ", "-"), this_object()); + con->set_nonblocking(read_some, write_some, closed_connection); + return 1; + } + return 0; +} + +// - start_server +// +// This function is called by servers when they have got a connection +// from a client. The first argument is the connection file object, and +// the second argument is the context to be used. +// +void start_server(object c, object cx) +{ + DEBUGMSG("starting server\n"); + if(con) + error("Already connected to "+con->query_address()); + + con = c; + con->write("Pike remote server "+PROTO_VERSION+"\n"); + ctx = cx; + + con->set_nonblocking(handshake, write_some, closed_connection); +} + +// - add_close_callback +// +// Add a function that is called when the connection is closed. +// +void add_close_callback(function f, mixed ... args) +{ + if(sizeof(args)) + close_callbacks += ({ ({f})+args }); + else + close_callbacks += ({ f }); +} + +// - remove_close_callback +// +// Remove a function that is called when the connection is closed. +// +void remove_close_callback(array f) +{ + close_callbacks -= ({ f }); +} + + +void closed_connection(int|void ignore) +{ + DEBUGMSG("connection closed\n"); + foreach(close_callbacks, function|array f) + if(functionp(f)) + f(); + else + f[0](@f[1..]); + closed=1; +} + +string write_buffer = ""; +void write_some(int|void ignore) +{ + if(closed) { + write_buffer=""; + return; + } + int c; + if (!sizeof(write_buffer)) + return; + c = con->write(write_buffer); + if(c <= 0) return; + write_buffer = write_buffer[c..]; + DEBUGMSG("wrote "+c+" bytes\n"); +} + +void send(string s) +{ + string ob = write_buffer; + write_buffer += s; + if(!strlen(ob)) write_some(); +} + +mapping pending_calls = ([ ]); +mapping finished_calls = ([ ]); +string read_buffer = ""; +int request_size = 0; + +void provide_result(int refno, mixed result) +{ + if (functionp(pending_calls[refno])) + { + DEBUGMSG("calling completion function for request "+refno+"\n"); + pending_calls[refno]( result ); + } + else + { + finished_calls[refno] = result; + m_delete(pending_calls, refno); + } +} + +mixed get_result(int refno) +{ + mixed r = finished_calls[refno]; + if (zero_type(r)) + error("Tried to get a result too early"); + m_delete(finished_calls, refno); + return r; +} + + +void return_error(int refno, mixed err) +{ + string s = encode_value(ctx->encode_error_return(refno, + describe_backtrace(err))); + send(sprintf("%4c%s", sizeof(s), s)); +} + +void return_value(int refno, mixed val) +{ + string s = encode_value(ctx->encode_return(refno, val)); + DEBUGMSG("return "+strlen(s)+" bytes ["+refno+"]\n"); + send(sprintf("%4c%s", sizeof(s), s)); +} + +void handshake(int ignore, string s) +{ + DEBUGMSG("handshake read "+sizeof(s)+" bytes\n"); + read_buffer += s; + if (sizeof(read_buffer) >= 24) + { + string proto; + if ((sscanf(read_buffer, "Pike remote client %4s\n", proto) == 1) && + (proto == PROTO_VERSION)) + { + DEBUGMSG("handshake complete (proto="+proto+")\n"); + read_buffer = read_buffer[24..]; + con->set_read_callback(read_some); + read_some(0,""); + } + else + con->close(); + } +} + +void read_some(int ignore, string s) +{ + if (!s) s = ""; + DEBUGMSG("read "+sizeof(s)+" bytes\n"); + read_buffer += s; + DEBUGMSG("has "+sizeof(read_buffer)+" bytes\n"); + if(!strlen(read_buffer)) return; + + if (!request_size && sizeof(read_buffer) > 4) + { + sscanf(read_buffer, "%4c%s", request_size, read_buffer); + } + + if (request_size && sizeof(read_buffer) >= request_size) + { + array data = decode_value(read_buffer[0..request_size-1]); + read_buffer = read_buffer[request_size..]; + request_size = 0; + DEBUGMSG("got message: "+ctx->describe(data)+"\n"); + switch(data[0]) { + + case CTX_ERROR: + throw(({ "Remote error: "+data[1]+"\n", backtrace() })); + + case CTX_CALL_SYNC: // a synchrounous call + int refno = data[4]; + object|function f = ctx->decode_call(data); + array args = ctx->decode(data[3]); + mixed res; + mixed e = catch { res = f(@args); }; + if (e) + return_error(refno, e); + else + return_value(refno, res); + break; + + case CTX_CALL_ASYNC: // an asynchrounous call + int refno = data[4]; + object|function f = ctx->decode_call(data); + array args = ctx->decode(data[3]); + mixed e = catch { f(@args); }; + if (e) + return_error(refno, e); + break; + + case CTX_RETURN: // a returned value + int refno = data[1]; + mixed result = ctx->decode(data[2]); + if (!pending_calls[refno]) + error("Got return for odd call: "+refno+"\n"); + DEBUGMSG("providing the result for request "+refno+": "+ + ctx->describe(data)+"\n"); + provide_result(refno, result); + break; + + default: + error("Unknown message"); + } + if(sizeof(read_buffer) > 4) read_some(ignore,""); + } +} + + +// - call_sync +// +// Make a call and wait for the result +// +mixed call_sync(array data) +{ + if(closed) { + error("connection closed\n"); + } + int refno = data[4]; + string s = encode_value(data); + con->set_blocking(); + DEBUGMSG("call_sync "+ctx->describe(data)+"\n"); + pending_calls[refno] = 17; // a mutex lock key maybe? + send(sprintf("%4c%s", sizeof(s), s)); + while(zero_type(finished_calls[refno])) + { + string s = con->read(8192,1); + if(!s || !strlen(s)) + { + closed_connection(); + if (!nice) + error("Could not read"); + else + return ([])[0]; // failed, like + } + read_some(0,s); + } + con->set_nonblocking(read_some, write_some, closed_connection); + return get_result(refno); +} + +// - call_async +// +// Make a call but don't wait for the result +// +void call_async(array data) +{ + if(closed) error("connection closed\n"); + string s = encode_value(data); + DEBUGMSG("call_async "+ctx->describe(data)+"\n"); + send(sprintf("%4c%s", sizeof(s), s)); +} + +// - get_named_object +// +// Get a named object provided by the server. +// +object get_named_object(string name) +{ + DEBUGMSG("getting "+name+"\n"); + return ctx->object_for(name); +} diff --git a/lib/modules/Remote.pmod/context.pike b/lib/modules/Remote.pmod/context.pike new file mode 100644 index 0000000000000000000000000000000000000000..87f9e84b89e0d7b94da3d1e2203ec10512854a97 --- /dev/null +++ b/lib/modules/Remote.pmod/context.pike @@ -0,0 +1,199 @@ + +#include "remote.h" + +object server_context; +object con; + +string base; +int counter; + +mapping id2val = ([ ]); +mapping val2id = ([ ]); +mapping other = ([ ]); + +string id_for(mixed thing) +{ + string id; + if(id=val2id[thing]) + return id; + + if(server_context && (id=server_context->id_for(thing))) + return id; + + val2id[thing] = id = (base+(counter++)); + id2val[id] = thing; + return id; +} + +object object_for(string id) +{ + object o; + if(o=id2val[id]) + return o; + if(o=other[id]) + return o; + if(server_context && (o=server_context->object_for(id, con))) + return other[id]=o; + return other[id] = Obj(id, con, this_object()); +} + +object function_for(string id) +{ + object o; + if(o=id2val[id]) + return o; + if(o=other[id]) + return o; + return other[id] = Call(0, id, con, this_object(), 0); +} + +// Encoding: +// +// error -> ({ CTX_ERROR, data }) +// object -> ({ CTX_OBJECT, id }) +// function -> ({ CTX_FUNCTION, id }) +// program -> ({ CTX_PROGRAM, id }) +// other -> ({ CTX_OTHER, data }) +// call -> ({ CTX_CALL_SYNC, ... }) +// array -> ({ CTX_ARRAY, a }) +// return -> ({ CTX_RETURN, id, data }) +// mapping -> ({ CTX_MAPPING, m }) +// + +array encode(mixed val) +{ + if (objectp(val)) + return ({ CTX_OBJECT, id_for(val) }); + if (functionp(val) || programp(val)) + return ({ CTX_FUNCTION, id_for(val) }); + if (arrayp(val)) + return ({ CTX_ARRAY, Array.map(val, encode) }); + if (mappingp(val)) + { + mapping m = ([ ]); + foreach(indices(val), mixed i) + m[i] = encode(val[i]); + return ({ CTX_MAPPING, m }); + } + return ({ CTX_OTHER, val }); +} + +array encode_error(string e) +{ + return ({ CTX_ERROR, gethostname()+":"+replace(e, "\n", "\n"+gethostname()+":") }); +} + +array encode_error_return(int id, string e) +{ + return ({ CTX_RETURN, id, encode_error(e) }); +} + +array encode_return(int id, mixed val) +{ + return ({ CTX_RETURN, id, encode(val) }); +} + +mixed decode(array a) +{ + // werror(sprintf("decode(%O)\n", a)); + switch(a[0]) { + case CTX_ERROR: + throw(({ "Remote error: "+a[1]+"\n", backtrace() })); + case CTX_OTHER: + return a[1]; + case CTX_OBJECT: + return object_for(a[1]); + case CTX_FUNCTION: + return function_for(a[1]); + case CTX_ARRAY: + return Array.map(a[1], decode); + case CTX_MAPPING: + mapping m = ([ ]); + foreach(indices(a[1]), mixed i) + m[i] = decode(a[1][i]); + return m; + } +} + +array encode_call(object|string o, string|function f, array args, int async) +{ + int type = (async ? CTX_CALL_ASYNC : CTX_CALL_SYNC); + if(objectp(o)) + if(stringp(f)) + return ({ type, encode(o), f, encode(args), counter++ }); + else + error("If the first argument is an object, the second must be a string"); + else if(stringp(o)) + if(stringp(f)) + return ({ type, ({ CTX_OBJECT, o }), f, encode(args), counter++ }); + else + error("If the first argument is an object reference, " + "the second must be a string"); + else if(o) + error("Error in arguments"); + else if(functionp(f)||programp(f)) + return ({ type, 0, encode(f), encode(args), counter++ }); + else if(stringp(f)) + return ({ type, 0, ({ CTX_FUNCTION, f }), encode(args), counter++ }); + error("Error in arguments"); +} + +function decode_call(array data) +{ + if((data[0] != CTX_CALL_SYNC) && (data[0] != CTX_CALL_ASYNC)) + error("This is not a call"); + if(data[1]) + { + object o = decode(data[1]); + return o[data[2]]; + } + else + return decode(data[2]); +} + +void add(object o, string id) +{ + id2val[id] = o; + val2id[o] = id; +} + +string describe(array data) +{ + switch(data[0]) { + case CTX_ERROR: + return "ERROR "+sprintf("%O",data[1]); + case CTX_OTHER: + if(stringp(data[1])) + return "\""+data[1]+"\""; + return (string)data[1]; + case CTX_OBJECT: + return "<object "+data[1]+">"; + case CTX_FUNCTION: + return "<function "+data[1]+">"; + case CTX_CALL_SYNC: + case CTX_CALL_ASYNC: + return (data[1] ? data[1][1]+"->"+data[2] : "<function "+data[2][1]+">") + + "(" + Array.map(data[3][1], describe)*"," + ") ["+data[4]+" "+ + (data[0]==CTX_CALL_SYNC ? "sync" : "async" ) + "]"; + case CTX_RETURN: + return "return["+data[1]+"]: " + describe(data[2]); + case CTX_MAPPING: + return "<mapping "+sizeof(indices(data[1]))+">"; + case CTX_ARRAY: + return "<array "+sizeof(data[1])+">"; + } + return "<unknown "+data[0]+">"; +} + +void set_server_context(object ctx, object cn) +{ + server_context = ctx; + con = cn; +} + +void create(string b, object|void cn) +{ + con = cn; + base = b; + counter = 0; +} diff --git a/lib/modules/Remote.pmod/obj.pike b/lib/modules/Remote.pmod/obj.pike new file mode 100644 index 0000000000000000000000000000000000000000..61d9c803c99d2a2f491605943eda0d67332eea11 --- /dev/null +++ b/lib/modules/Remote.pmod/obj.pike @@ -0,0 +1,33 @@ + +#include "remote.h" + +string id; +object con; +object ctx; + +mapping calls = ([ ]); + +mixed get_function(string f) +{ + object call = calls[f]; + if (!call) + call = calls[f] = Call(id, f, con, ctx, 0); + return call; +} + +mixed `[] (string f) +{ + return get_function(f); +} + +mixed `-> (string f) +{ + return get_function(f); +} + +void create(string i, object cn, object ct) +{ + id = i; + con = cn; + ctx = ct; +} diff --git a/lib/modules/Remote.pmod/remote.h b/lib/modules/Remote.pmod/remote.h new file mode 100644 index 0000000000000000000000000000000000000000..f04d4d5859726af8368948d12be6be02fdebfb24 --- /dev/null +++ b/lib/modules/Remote.pmod/remote.h @@ -0,0 +1,29 @@ +// -*- Pike -*- + +#define PROTO_VERSION "0001" + +#define error(E) throw( ({ E+"\n" , backtrace() }) ) + +// #define REMOTE_DEBUG + +#ifdef REMOTE_DEBUG +# define DEBUGMSG(X) werror(X) +#else +# define DEBUGMSG(X) +#endif + +program Context = ((program)"context"); +program Connection = ((program)"connection"); +program Call = ((program)"call"); +program Obj = ((program)"obj"); + +#define CTX_ERROR 0 +#define CTX_OTHER 1 +#define CTX_OBJECT 2 +#define CTX_FUNCTION 3 +#define CTX_PROGRAM 3 +#define CTX_CALL_SYNC 4 +#define CTX_ARRAY 5 +#define CTX_RETURN 6 +#define CTX_MAPPING 7 +#define CTX_CALL_ASYNC 8 diff --git a/src/ChangeLog b/src/ChangeLog index 11586920e83de5987efd74337a797c7f51e5fbba..ae269f30497ad93d905b1c7b2b91db63bc48f0cb 100644 --- a/src/ChangeLog +++ b/src/ChangeLog @@ -1,3 +1,7 @@ +1998-10-20 Peter Bortas <peter@idonex.se> + + * modules/Remote.pmod/*: Added Remote to the standard dist. + Sun Sep 6 01:32:48 1998 Henrik Grubbstr�m <grubba@idonex.se> * modules/HTTPAccept/timeout.c (aap_exit_timeouts): Added function.