diff --git a/lib/modules/Thread.pmod b/lib/modules/Thread.pmod index e9816e1864150d737019937b28a7ea061ee72ec8..52687023807142874394ce4a3ca97647f4ddd9c0 100644 --- a/lib/modules/Thread.pmod +++ b/lib/modules/Thread.pmod @@ -429,29 +429,44 @@ optional class Queue { } - +//! A thread farm. optional class Farm { protected Mutex mutex = Mutex(); protected Condition ft_cond = Condition(); protected Queue job_queue = Queue(); + //! An asynchronous result. class Result { int ready; mixed value; function done_cb; + //! @returns + //! @int + //! @value 1 + //! Returns @expr{1@} when the result is available. + //! @value 0 + //! Returns @expr{0@} (zero) when the result hasn't + //! arrived yet. + //! @value -1 + //! Returns negative on failure. + //! @endint int status() { return ready; } + //! @returns + //! Returns the result if available, a backtrace on failure, + //! and @expr{0@} (zero) otherwise. mixed result() { return value; } + //! Wait for completion. mixed `()() { object key = mutex->lock(); @@ -461,6 +476,15 @@ optional class Farm return value; } + //! Register a callback to be called when + //! the result is available. + //! + //! @param to + //! Callback to be called. The first + //! argument to the callback will be + //! the result or the failure backtrace, + //! and the second @expr{0@} (zero) on + //! success, and @expr{1@} on failure. void set_done_cb( function to ) { if( ready ) @@ -469,6 +493,10 @@ optional class Farm done_cb = to; } + //! Register a failure. + //! + //! @param what + //! The corresponding backtrace. void provide_error( mixed what ) { value = what; @@ -476,7 +504,11 @@ optional class Farm if( done_cb ) done_cb( what, 1 ); } - + + //! Register a completed result. + //! + //! @param what + //! The result to register. void provide( mixed what ) { ready = 1; @@ -498,6 +530,7 @@ optional class Farm } } + //! A worker thread. protected class Handler { Mutex job_mutex = Mutex(); @@ -556,6 +589,7 @@ optional class Farm key = 0; } + //! Get some statistics about the worker thread. string debug_status() { return ("Thread:\n" @@ -624,23 +658,50 @@ optional class Farm { void go(mixed vn, int err) { - ([array]r->value)[ i ] = vn; - if( err ) - r->provide_error( err ); - if( !--v->num_left ) - r->provide( r->value ); + if (!r->status()) { + ([array]r->value)[ i ] = vn; + if( err ) + r->provide_error( err ); + if( !--v->num_left ) + r->provide( r->value ); + } destruct(); } } - object run_multiple( array fun_args ) + + //! Register multiple jobs. + //! + //! @param fun_args + //! An array of arrays where the first element + //! is a function to call, and the second is + //! a corresponding array of arguments. + //! + //! @returns + //! Returns a @[Result] object with an array + //! with one element for the result for each + //! of the functions in @[fun_args]. + //! + //! @note + //! Do not modify the elements of @[fun_args] + //! before the result is available. + //! + //! @note + //! If any of the functions in @[fun_args] throws + //! and error, all of the accululated results + //! (if any) will be dropped from the result, and + //! the first backtrace be provided. + //! + //! @seealso + //! @[run_multiple_async()] + Result run_multiple( array(array(function|array)) fun_args ) { Result r = Result(); // private result.. r->value = allocate( sizeof( fun_args ) ); mapping nl = ([ "num_left":sizeof( fun_args ) ]); for( int i=0; i<sizeof( fun_args ); i++ ) { - Result r2 = Result(); + Result r2 = Result(); r2->set_done_cb( ValueAdjuster( r, r2, i, nl )->go ); job_queue->write( ({ r2, fun_args[i] }) ); } @@ -648,6 +709,20 @@ optional class Farm } + //! Register multiple jobs where the return values + //! are to be ignored. + //! + //! @param fun_args + //! An array of arrays where the first element + //! is a function to call, and the second is + //! a corresponding array of arguments. + //! + //! @note + //! Do not modify the elements of @[fun_args] + //! before the result is available. + //! + //! @seealso + //! @[run_multiple()] void run_multiple_async( array fun_args ) { for( int i=0; i<sizeof( fun_args ); i++ ) @@ -655,19 +730,62 @@ optional class Farm } - object run( function f, mixed ... args ) + //! Register a job for the thread farm. + //! + //! @param f + //! Function to call with @@@[args] to + //! perform the job. + //! + //! @param args + //! The parameters for @[f]. + //! + //! @returns + //! Returns a @[Result] object for the job. + //! + //! @note + //! In Pike 7.8 and earlier this function + //! was broken and returned a @[Result] + //! object that wasn't connected to the job. + //! + //! @seealso + //! @[run_async()] + Result run( function f, mixed ... args ) { - object ro = Result(); - job_queue->write( ({ 0, ({f, args }) }) ); + Result ro = Result(); + job_queue->write( ({ ro, ({f, args }) }) ); return ro; } + //! Register a job for the thread farm + //! where the return value from @[f] is + //! ignored. + //! + //! @param f + //! Function to call with @@@[args] to + //! perform the job. + //! + //! @param args + //! The parameters for @[f]. + //! + //! @seealso + //! @[run()] void run_async( function f, mixed ... args ) { job_queue->write( ({ 0, ({f, args }) }) ); } - int set_max_num_threads( int to ) + //! Set the maximum number of worker threads + //! that the thread farm may have. + //! + //! @param to + //! The new maximum number. + //! + //! If there are more worker threads than @[to], + //! the function will wait until enough threads + //! have finished, so that the total is @[to] or less. + //! + //! The default maximum number of worker threads is @expr{20@}. + int set_max_num_threads( int(1..) to ) { int omnt = max_num_threads; if( to <= 0 ) @@ -687,6 +805,7 @@ optional class Farm return omnt; } + //! Get some statistics for the thread farm. string debug_status() { string res = sprintf("Thread farm\n"