From 54d9ff103eedcefe13a5984e1475b58b52a1fd92 Mon Sep 17 00:00:00 2001 From: Martin Stjernholm <mast@lysator.liu.se> Date: Mon, 30 Sep 2002 22:41:08 +0200 Subject: [PATCH] Fixed races that could cause synchronous calls to hang in threaded mode. It's still unclear whether a connection close is handled cleanly, though. Better handling of exceptions from close callbacks. Rev: lib/modules/Remote.pmod/connection.pike:1.24 --- lib/modules/Remote.pmod/connection.pike | 104 ++++++++++++++---------- 1 file changed, 59 insertions(+), 45 deletions(-) diff --git a/lib/modules/Remote.pmod/connection.pike b/lib/modules/Remote.pmod/connection.pike index d1f827cf44..929edd0aa2 100644 --- a/lib/modules/Remote.pmod/connection.pike +++ b/lib/modules/Remote.pmod/connection.pike @@ -154,7 +154,8 @@ void try_close() if (closed) return; #if constant(thread_create) - object rlock = read_cond_mutex->lock(); + Thread.MutexKey fc_lock = finished_calls_cond_mutex->lock(); + Thread.MutexKey wb_lock = write_buffer_cond_mutex->lock(); #endif if (!(sizeof (pending_calls) || outstanding_calls || sizeof (write_buffer))) { @@ -170,12 +171,12 @@ void try_close() con->write (sprintf("%4c%s", sizeof(s), s)); }; #endif - if (!catch {con->set_blocking(); con->close();}) { + catch {con->set_blocking(); con->close();}; #if constant(thread_create) - rlock = 0; + wb_lock = 0; + fc_lock = 0; #endif - catch (closed_connection()); - } + closed_connection(); } else DEBUGMSG("close delayed\n"); @@ -184,22 +185,29 @@ void try_close() void closed_connection(int|void ignore) { if (closed > 0) return; - closed=1; - DEBUGMSG("connection closed\n"); #if constant(thread_create) - write_cond->broadcast(); - object lock = read_cond_mutex->lock(); - read_cond->broadcast(); - lock = 0; + Thread.MutexKey fc_lock = finished_calls_cond_mutex->lock(); + Thread.MutexKey wb_lock = write_buffer_cond_mutex->lock(); + closed=1; + write_buffer_cond->broadcast(); + finished_calls_cond->broadcast(); calls->write(0); + wb_lock = 0; + fc_lock = 0; +#else + closed=1; #endif + DEBUGMSG("connection closed\n"); foreach(close_callbacks, function|array f) - if(functionp(f)) - f(); - else if (functionp(f[0])) - f[0](@f[1..]); + if (mixed err = catch { + if(functionp(f)) + f(); + else if (functionp(f[0])) + f[0](@f[1..]); + }) + master()->handle_error (err); } string write_buffer = ""; @@ -224,9 +232,18 @@ int write_some(int|void ignore) } #if constant(thread_create) -Thread.Condition write_cond = Thread.Condition(); -Thread.Condition read_cond = Thread.Condition(); -Thread.Mutex read_cond_mutex = Thread.Mutex(); +// Locking order: finished_calls_cond_mutex before write_buffer_cond_mutex. + +// This condition variable and mutex protects addition of new data to +// write_buffer and setting of the closed flag to nonzero. +Thread.Condition write_buffer_cond = Thread.Condition(); +Thread.Mutex write_buffer_cond_mutex = Thread.Mutex(); + +// This condition variable and mutex protects addition of new entries +// to finished_calls and setting of the closed flag to nonzero. +Thread.Condition finished_calls_cond = Thread.Condition(); +Thread.Mutex finished_calls_cond_mutex = Thread.Mutex(); + Thread.Queue calls = Thread.Queue(); int call_threads; #endif @@ -234,8 +251,10 @@ int call_threads; void send(string s) { #if constant(thread_create) + Thread.MutexKey lock = write_buffer_cond_mutex->lock(); write_buffer += s; - write_cond->signal(); + write_buffer_cond->signal(); + lock = 0; #else string ob = write_buffer; write_buffer += s; @@ -252,16 +271,14 @@ int request_size = 0; void provide_result(int refno, mixed result) { - finished_calls[ refno ] = result; -// if (functionp(pending_calls[refno])) -// { -// DEBUGMSG("calling completion function for request "+refno+"\n"); -// pending_calls[refno]( result ); -// } -// else -// { -// m_delete(pending_calls, refno); -// } +#if constant(thread_create) + Thread.MutexKey lock = finished_calls_cond_mutex->lock(); + finished_calls[ refno ] = result; + finished_calls_cond->broadcast(); + lock = 0; +#else + finished_calls[ refno ] = result; +#endif } mixed get_result(int refno) @@ -394,7 +411,7 @@ void read_some(int ignore, string s) DEBUGMSG("providing error for request "+refno+"\n"); if (pending_calls[refno]) { errors[refno] = data[2][1]; - finished_calls[refno] = 17; + provide_result(refno, 17); } else werror ("Remote async error: " + data[2][1] + "\n"); @@ -434,11 +451,14 @@ int read_once() void write_thread() { DEBUGMSG("write_thread\n"); - while( write_some() ) + while( write_some() ) { + Thread.MutexKey lock = write_buffer_cond_mutex->lock(); if(!(strlen(write_buffer) || closed)) { if (want_close) try_close(); - write_cond->wait(); + write_buffer_cond->wait(lock); } + lock = 0; + } closed_connection(); DEBUGMSG("write_thread exit\n"); } @@ -448,11 +468,7 @@ void read_thread() DEBUGMSG("read_thread\n"); con->set_blocking(); thread_create( write_thread ); - while( read_once() ) { - object lock = read_cond_mutex->lock(); - read_cond->broadcast(); - lock = 0; - } + while( read_once() ) {} closed_connection(); DEBUGMSG("read_thread exit\n"); } @@ -501,16 +517,17 @@ mixed call_sync(array data) int refno = data[4]; // werror("call_sync["+con->query_address()+"]["+refno+"] starting\n"); string s = encode_value(data); -#if constant(thread_create) - object lock = read_cond_mutex->lock(); -#endif pending_calls[refno] = 1; mixed err = catch { - send(sprintf("%4c%s", sizeof(s), s)); +#if constant(thread_create) + object lock = finished_calls_cond_mutex->lock(); +#endif + send(sprintf("%4c%s", sizeof(s), s)); // Locks write_buffer_cond_mutex. #if constant(thread_create) while(!closed && zero_type(finished_calls[refno])) - read_cond->wait(lock); + finished_calls_cond->wait(lock); + lock = 0; #else con->set_blocking(); while(!closed && zero_type(finished_calls[refno])) @@ -523,9 +540,6 @@ mixed call_sync(array data) } }; m_delete (pending_calls, refno); -#if constant(thread_create) - lock = 0; -#endif mixed err2 = catch { #if !constant(thread_create) con->set_nonblocking(read_some, write_some, closed_connection); -- GitLab