Skip to content
Snippets Groups Projects
Commit d8a00fa1 authored by Henrik (Grubba) Grubbström's avatar Henrik (Grubba) Grubbström
Browse files

Thread.Farm: Fixed several bugs.

The support for asynchronous results was broken in several ways.

Also added some initial documentation.
parent 4cf3b50d
Branches
Tags
No related merge requests found
......@@ -429,29 +429,44 @@ optional class Queue {
}
//! A thread farm.
optional class Farm
{
protected Mutex mutex = Mutex();
protected Condition ft_cond = Condition();
protected Queue job_queue = Queue();
//! An asynchronous result.
class Result
{
int ready;
mixed value;
function done_cb;
//! @returns
//! @int
//! @value 1
//! Returns @expr{1@} when the result is available.
//! @value 0
//! Returns @expr{0@} (zero) when the result hasn't
//! arrived yet.
//! @value -1
//! Returns negative on failure.
//! @endint
int status()
{
return ready;
}
//! @returns
//! Returns the result if available, a backtrace on failure,
//! and @expr{0@} (zero) otherwise.
mixed result()
{
return value;
}
//! Wait for completion.
mixed `()()
{
object key = mutex->lock();
......@@ -461,6 +476,15 @@ optional class Farm
return value;
}
//! Register a callback to be called when
//! the result is available.
//!
//! @param to
//! Callback to be called. The first
//! argument to the callback will be
//! the result or the failure backtrace,
//! and the second @expr{0@} (zero) on
//! success, and @expr{1@} on failure.
void set_done_cb( function to )
{
if( ready )
......@@ -469,6 +493,10 @@ optional class Farm
done_cb = to;
}
//! Register a failure.
//!
//! @param what
//! The corresponding backtrace.
void provide_error( mixed what )
{
value = what;
......@@ -477,6 +505,10 @@ optional class Farm
done_cb( what, 1 );
}
//! Register a completed result.
//!
//! @param what
//! The result to register.
void provide( mixed what )
{
ready = 1;
......@@ -498,6 +530,7 @@ optional class Farm
}
}
//! A worker thread.
protected class Handler
{
Mutex job_mutex = Mutex();
......@@ -556,6 +589,7 @@ optional class Farm
key = 0;
}
//! Get some statistics about the worker thread.
string debug_status()
{
return ("Thread:\n"
......@@ -624,16 +658,43 @@ optional class Farm
{
void go(mixed vn, int err)
{
if (!r->status()) {
([array]r->value)[ i ] = vn;
if( err )
r->provide_error( err );
if( !--v->num_left )
r->provide( r->value );
}
destruct();
}
}
object run_multiple( array fun_args )
//! Register multiple jobs.
//!
//! @param fun_args
//! An array of arrays where the first element
//! is a function to call, and the second is
//! a corresponding array of arguments.
//!
//! @returns
//! Returns a @[Result] object with an array
//! with one element for the result for each
//! of the functions in @[fun_args].
//!
//! @note
//! Do not modify the elements of @[fun_args]
//! before the result is available.
//!
//! @note
//! If any of the functions in @[fun_args] throws
//! and error, all of the accululated results
//! (if any) will be dropped from the result, and
//! the first backtrace be provided.
//!
//! @seealso
//! @[run_multiple_async()]
Result run_multiple( array(array(function|array)) fun_args )
{
Result r = Result(); // private result..
r->value = allocate( sizeof( fun_args ) );
......@@ -648,6 +709,20 @@ optional class Farm
}
//! Register multiple jobs where the return values
//! are to be ignored.
//!
//! @param fun_args
//! An array of arrays where the first element
//! is a function to call, and the second is
//! a corresponding array of arguments.
//!
//! @note
//! Do not modify the elements of @[fun_args]
//! before the result is available.
//!
//! @seealso
//! @[run_multiple()]
void run_multiple_async( array fun_args )
{
for( int i=0; i<sizeof( fun_args ); i++ )
......@@ -655,19 +730,62 @@ optional class Farm
}
object run( function f, mixed ... args )
//! Register a job for the thread farm.
//!
//! @param f
//! Function to call with @@@[args] to
//! perform the job.
//!
//! @param args
//! The parameters for @[f].
//!
//! @returns
//! Returns a @[Result] object for the job.
//!
//! @note
//! In Pike 7.8 and earlier this function
//! was broken and returned a @[Result]
//! object that wasn't connected to the job.
//!
//! @seealso
//! @[run_async()]
Result run( function f, mixed ... args )
{
object ro = Result();
job_queue->write( ({ 0, ({f, args }) }) );
Result ro = Result();
job_queue->write( ({ ro, ({f, args }) }) );
return ro;
}
//! Register a job for the thread farm
//! where the return value from @[f] is
//! ignored.
//!
//! @param f
//! Function to call with @@@[args] to
//! perform the job.
//!
//! @param args
//! The parameters for @[f].
//!
//! @seealso
//! @[run()]
void run_async( function f, mixed ... args )
{
job_queue->write( ({ 0, ({f, args }) }) );
}
int set_max_num_threads( int to )
//! Set the maximum number of worker threads
//! that the thread farm may have.
//!
//! @param to
//! The new maximum number.
//!
//! If there are more worker threads than @[to],
//! the function will wait until enough threads
//! have finished, so that the total is @[to] or less.
//!
//! The default maximum number of worker threads is @expr{20@}.
int set_max_num_threads( int(1..) to )
{
int omnt = max_num_threads;
if( to <= 0 )
......@@ -687,6 +805,7 @@ optional class Farm
return omnt;
}
//! Get some statistics for the thread farm.
string debug_status()
{
string res = sprintf("Thread farm\n"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment