Skip to content
Snippets Groups Projects
Commit 54d9ff10 authored by Martin Stjernholm's avatar Martin Stjernholm
Browse files

Fixed races that could cause synchronous calls to hang in threaded mode.

It's still unclear whether a connection close is handled cleanly, though.
Better handling of exceptions from close callbacks.

Rev: lib/modules/Remote.pmod/connection.pike:1.24
parent 4a59c05f
No related branches found
No related tags found
No related merge requests found
......@@ -154,7 +154,8 @@ void try_close()
if (closed) return;
#if constant(thread_create)
object rlock = read_cond_mutex->lock();
Thread.MutexKey fc_lock = finished_calls_cond_mutex->lock();
Thread.MutexKey wb_lock = write_buffer_cond_mutex->lock();
#endif
if (!(sizeof (pending_calls) || outstanding_calls || sizeof (write_buffer))) {
......@@ -170,12 +171,12 @@ void try_close()
con->write (sprintf("%4c%s", sizeof(s), s));
};
#endif
if (!catch {con->set_blocking(); con->close();}) {
catch {con->set_blocking(); con->close();};
#if constant(thread_create)
rlock = 0;
wb_lock = 0;
fc_lock = 0;
#endif
catch (closed_connection());
}
closed_connection();
}
else
DEBUGMSG("close delayed\n");
......@@ -184,22 +185,29 @@ void try_close()
void closed_connection(int|void ignore)
{
if (closed > 0) return;
closed=1;
DEBUGMSG("connection closed\n");
#if constant(thread_create)
write_cond->broadcast();
object lock = read_cond_mutex->lock();
read_cond->broadcast();
lock = 0;
Thread.MutexKey fc_lock = finished_calls_cond_mutex->lock();
Thread.MutexKey wb_lock = write_buffer_cond_mutex->lock();
closed=1;
write_buffer_cond->broadcast();
finished_calls_cond->broadcast();
calls->write(0);
wb_lock = 0;
fc_lock = 0;
#else
closed=1;
#endif
DEBUGMSG("connection closed\n");
foreach(close_callbacks, function|array f)
if (mixed err = catch {
if(functionp(f))
f();
else if (functionp(f[0]))
f[0](@f[1..]);
})
master()->handle_error (err);
}
string write_buffer = "";
......@@ -224,9 +232,18 @@ int write_some(int|void ignore)
}
#if constant(thread_create)
Thread.Condition write_cond = Thread.Condition();
Thread.Condition read_cond = Thread.Condition();
Thread.Mutex read_cond_mutex = Thread.Mutex();
// Locking order: finished_calls_cond_mutex before write_buffer_cond_mutex.
// This condition variable and mutex protects addition of new data to
// write_buffer and setting of the closed flag to nonzero.
Thread.Condition write_buffer_cond = Thread.Condition();
Thread.Mutex write_buffer_cond_mutex = Thread.Mutex();
// This condition variable and mutex protects addition of new entries
// to finished_calls and setting of the closed flag to nonzero.
Thread.Condition finished_calls_cond = Thread.Condition();
Thread.Mutex finished_calls_cond_mutex = Thread.Mutex();
Thread.Queue calls = Thread.Queue();
int call_threads;
#endif
......@@ -234,8 +251,10 @@ int call_threads;
void send(string s)
{
#if constant(thread_create)
Thread.MutexKey lock = write_buffer_cond_mutex->lock();
write_buffer += s;
write_cond->signal();
write_buffer_cond->signal();
lock = 0;
#else
string ob = write_buffer;
write_buffer += s;
......@@ -252,16 +271,14 @@ int request_size = 0;
void provide_result(int refno, mixed result)
{
#if constant(thread_create)
Thread.MutexKey lock = finished_calls_cond_mutex->lock();
finished_calls[ refno ] = result;
// if (functionp(pending_calls[refno]))
// {
// DEBUGMSG("calling completion function for request "+refno+"\n");
// pending_calls[refno]( result );
// }
// else
// {
// m_delete(pending_calls, refno);
// }
finished_calls_cond->broadcast();
lock = 0;
#else
finished_calls[ refno ] = result;
#endif
}
mixed get_result(int refno)
......@@ -394,7 +411,7 @@ void read_some(int ignore, string s)
DEBUGMSG("providing error for request "+refno+"\n");
if (pending_calls[refno]) {
errors[refno] = data[2][1];
finished_calls[refno] = 17;
provide_result(refno, 17);
}
else
werror ("Remote async error: " + data[2][1] + "\n");
......@@ -434,10 +451,13 @@ int read_once()
void write_thread()
{
DEBUGMSG("write_thread\n");
while( write_some() )
while( write_some() ) {
Thread.MutexKey lock = write_buffer_cond_mutex->lock();
if(!(strlen(write_buffer) || closed)) {
if (want_close) try_close();
write_cond->wait();
write_buffer_cond->wait(lock);
}
lock = 0;
}
closed_connection();
DEBUGMSG("write_thread exit\n");
......@@ -448,11 +468,7 @@ void read_thread()
DEBUGMSG("read_thread\n");
con->set_blocking();
thread_create( write_thread );
while( read_once() ) {
object lock = read_cond_mutex->lock();
read_cond->broadcast();
lock = 0;
}
while( read_once() ) {}
closed_connection();
DEBUGMSG("read_thread exit\n");
}
......@@ -501,16 +517,17 @@ mixed call_sync(array data)
int refno = data[4];
// werror("call_sync["+con->query_address()+"]["+refno+"] starting\n");
string s = encode_value(data);
#if constant(thread_create)
object lock = read_cond_mutex->lock();
#endif
pending_calls[refno] = 1;
mixed err = catch
{
send(sprintf("%4c%s", sizeof(s), s));
#if constant(thread_create)
object lock = finished_calls_cond_mutex->lock();
#endif
send(sprintf("%4c%s", sizeof(s), s)); // Locks write_buffer_cond_mutex.
#if constant(thread_create)
while(!closed && zero_type(finished_calls[refno]))
read_cond->wait(lock);
finished_calls_cond->wait(lock);
lock = 0;
#else
con->set_blocking();
while(!closed && zero_type(finished_calls[refno]))
......@@ -523,9 +540,6 @@ mixed call_sync(array data)
}
};
m_delete (pending_calls, refno);
#if constant(thread_create)
lock = 0;
#endif
mixed err2 = catch {
#if !constant(thread_create)
con->set_nonblocking(read_some, write_some, closed_connection);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment