diff --git a/lib/modules/Remote.pmod/connection.pike b/lib/modules/Remote.pmod/connection.pike index cb235794069b9e3e72605c9affe4e8db8393fa5b..361f24b9e9a07a45b28cee2a888b345c58bbe44c 100644 --- a/lib/modules/Remote.pmod/connection.pike +++ b/lib/modules/Remote.pmod/connection.pike @@ -8,11 +8,6 @@ array(function) close_callbacks = ({ }); int nice; // don't throw from call_sync -void handshake(int ignore, string s); -void read_some(int ignore, string s); -void write_some(int|void ignore); -void closed_connection(int|void ignore); - // - create void create(void|int _nice) @@ -51,11 +46,16 @@ int connect(string host, int port, int ... timeout) return 0; } } - if((sscanf(s,"Pike remote server %4s\n", sv) == 1) && (sv == PROTO_VERSION)) + if((sscanf(s,"Pike remote server %4s\n", sv) == 1) && + (sv == PROTO_VERSION)) { DEBUGMSG("connected\n"); ctx = Context(replace(con->query_address(1), " ", "-"), this_object()); +#if constant(thread_create) + thread_create( read_thread ); +#else con->set_nonblocking(read_some, write_some, closed_connection); +#endif return 1; } return 0; @@ -135,48 +135,64 @@ void closed_connection(int|void ignore) } string write_buffer = ""; -void write_some(int|void ignore) +int write_some(int|void ignore) { if(closed) { write_buffer=""; - return; + return 0; } if (sizeof (write_buffer)) { int c; c = con->write(write_buffer); - if(c <= 0) return; + if(c <= 0) return 0; write_buffer = write_buffer[c..]; DEBUGMSG("wrote "+c+" bytes\n"); } if (want_close && !(reading || sizeof(write_buffer))) if (!catch {con->set_blocking(); con->close();}) + { closed_connection(); + return 0; + } + return 1; } +#if constant( Thread.Condition ) +Thread.Condition write_cond = Thread.Condition(); +Thread.Condition read_cond = Thread.Condition(); +#endif + void send(string s) { +#if constant( Thread.Condition ) string ob = write_buffer; +#endif write_buffer += s; - if(!strlen(ob)) write_some(); +#if constant( Thread.Condition ) + write_cond->signal(); +#else + if(!strlen(ob)) + write_some(); +#endif } -mapping pending_calls = ([ ]); +// mapping pending_calls = ([ ]); mapping finished_calls = ([ ]); string read_buffer = ""; int request_size = 0; void provide_result(int refno, mixed result) { - if (functionp(pending_calls[refno])) - { - DEBUGMSG("calling completion function for request "+refno+"\n"); - pending_calls[refno]( result ); - } - else - { +// if (functionp(pending_calls[refno])) +// { +// DEBUGMSG("calling completion function for request "+refno+"\n"); +// pending_calls[refno]( result ); +// } +// else +// { finished_calls[refno] = result; - m_delete(pending_calls, refno); - } +// m_delete(pending_calls, refno); +// } } mixed get_result(int refno) @@ -249,6 +265,9 @@ void read_some(int ignore, string s) throw(({ "Remote error: "+data[1]+"\n", backtrace() })); case CTX_CALL_SYNC: // a synchronous call +#if constant(thread_create) + call_out( lambda(){ +#endif int refno = data[4]; object|function f = ctx->decode_call(data); array args = ctx->decode(data[3]); @@ -260,9 +279,16 @@ void read_some(int ignore, string s) } else return_value(refno, res); +#if constant(thread_create) + }, 0 ); +#endif break; case CTX_CALL_ASYNC: // an asynchronous call +// werror("async\n"); +#if constant(thread_create) + call_out( lambda(){ +#endif int refno = data[4]; object|function f = ctx->decode_call(data); array args = ctx->decode(data[3]); @@ -271,13 +297,14 @@ void read_some(int ignore, string s) catch (e[1] = e[1][sizeof(backtrace())..]); return_error(refno, e); } +#if constant(thread_create) + }, 0 ); +#endif break; case CTX_RETURN: // a returned value int refno = data[1]; mixed result = ctx->decode(data[2]); - if (!pending_calls[refno]) - error("Got return for odd call: "+refno+"\n"); DEBUGMSG("providing the result for request "+refno+": "+ ctx->describe(data)+"\n"); provide_result(refno, result); @@ -300,51 +327,75 @@ object(Thread.Mutex) block_read_mutex = Thread.Mutex(); // // Make a call and wait for the result // + +object reading_thread; + +int read_once() +{ + string s = con->read( 8192, 1 ); + if( !s || !strlen(s) ) + { + closed_connection( 0 ); + return 0; + } + read_some( 0, s ); + return 1; +} + +#if constant(thread_create) +void write_thread() +{ + while( write_some() ) + if(!strlen(write_buffer)) + write_cond->wait(); + closed_connection(); +} + +void read_thread() +{ + con->set_blocking(); + thread_create( write_thread ); + while( read_once() ) + read_cond->broadcast(); + closed_connection(); +} +#endif + +int block; mixed call_sync(array data) { if(closed) { error("connection closed\n"); } int refno = data[4]; +// werror("call_sync["+con->query_address()+"]["+refno+"] starting\n"); string s = encode_value(data); - con->set_blocking(); - mixed err = catch { + mixed err = catch + { DEBUGMSG("call_sync "+ctx->describe(data)+"\n"); - pending_calls[refno] = 17; // a mutex lock key maybe? +// pending_calls[refno] = 17; // a mutex lock key maybe? send(sprintf("%4c%s", sizeof(s), s)); +#if constant(thread_create) while(zero_type(finished_calls[refno])) - { -#if constant(Thread.Mutex) - // Only one thread does read(), the rest just waits. When the - // read() finishes, all threads loop once. - object lock = block_read_mutex->trylock(); - if (lock) { -#endif - string s = con->read(8192,1); - if(s && strlen(s)) read_some(0,s); - else - { -#if constant(Thread.Mutex) - lock = 0; -#endif - if (!catch (con->close())) - closed_connection(); - if (!nice) - error("Could not read"); - else - return ([])[0]; // failed, like - } -#if constant(Thread.Mutex) - lock = 0; - } - else block_read_mutex->lock(); + read_cond->wait(); +#else + con->set_blocking(); + while(zero_type(finished_calls[refno])) + read_once(); #endif - } }; - mixed err2 = catch { + mixed err2; +#if !constant(thread_create) + err2 = catch { con->set_nonblocking(read_some, write_some, closed_connection); }; - if (err || err2) throw (err || err2); +#endif + if (err || err2) + { + catch(get_result(refno)); + throw (err || err2); + } +// werror("call_sync["+con->query_address()+"]["+refno+"] done\n"); return get_result(refno); } @@ -354,7 +405,8 @@ mixed call_sync(array data) // void call_async(array data) { - if(closed) error("connection closed\n"); + if(closed) + error("connection closed\n"); string s = encode_value(data); DEBUGMSG("call_async "+ctx->describe(data)+"\n"); send(sprintf("%4c%s", sizeof(s), s));