diff --git a/lib/modules/Thread.pmod b/lib/modules/Thread.pmod index 41fa67c247db8295bc03be9593c8f9fd5c7b7006..950778131775acc20676c1948ea942bb740e33b1 100644 --- a/lib/modules/Thread.pmod +++ b/lib/modules/Thread.pmod @@ -1,6 +1,7 @@ constant Mutex=__builtin.mutex; constant Condition=__builtin.condition; + class Fifo { inherit Condition : r_cond; inherit Condition : w_cond; @@ -108,5 +109,239 @@ class Queue { key=0; // Must free this one _before_ the signal... r_cond::signal(); } -}; +} + + + +class Farm +{ + Mutex mutex = Mutex(); + Condition ft_cond = Condition(); + + class Result + { + int ready; + mixed value; + function done_cb; + + int status() + { + return ready; + } + + mixed result() + { + return value; + } + + mixed `()() + { + while(!ready) ft_cond->wait(); + if( ready < 0 ) throw( value ); + return value; + } + + void set_done_cb( function to ) + { + if( ready ) + to( value, ready<0 ); + else + done_cb = to; + } + + void provide_error( mixed what ) + { + value = what; + ready = -1; + if( done_cb ) + done_cb( what, 1 ); + } + + void provide( mixed what ) + { + ready = 1; + value = what; + if( done_cb ) + done_cb( what, 0 ); + } + } + + class Handler + { + Condition cond = Condition(); + array job; + object thread; + + float total_time; + int handled, max_time; + + static int ready; + + void handler() + { + array q; + while( 1 ) + { + ready = 1; + cond->wait(); + if( q = job ) + { + mixed res, err; + int st = gethrtime(); + if( err = catch(res = q[1][0]( @q[1][1] )) && q[0]) + q[0]->provide_error( err ); + else if( q[0] ) + q[0]->provide( res ); + object lock = mutex->lock(); + free_threads += ({ this_object() }); + lock = 0; + st = gethrtime()-st; + total_time += st/1000.0; + handled++; + job = 0; + if( st > max_time ) + max_time = st; + ft_cond->broadcast(); + } + } + } + + void run( array what, object|void resobj ) + { + while(!ready) sleep(0.1); + job = ({ resobj, what }); + cond->signal(); + } + + string debug_status() + { + return ("Thread:\n" + " Handled works: "+handled+"\n"+ + (handled? + " Average time: "+((int)(total_time / handled))+"ms\n" + " Max time: "+sprintf("%2.2fms\n", max_time/1000.0):"")+ + " Status: "+(job?"Working":"Idle" )+"\n"+ + (job? + (" "+ + replace( describe_backtrace(thread->backtrace()), + "\n", + "\n ")):"") + +"\n\n"); + } + + void create() + { + thread = thread_create( handler ); + } + } + + static array(Handler) threads = ({}); + array(Handler) free_threads = ({}); + int max_num_threads = 20; + + static Handler aquire_thread() + { + object lock = mutex->lock(); + while( !sizeof(free_threads) ) + { + if( sizeof(threads) < max_num_threads ) + { + threads += ({ Handler() }); + free_threads += ({ threads[-1] }); + } else { + lock = 0; + ft_cond->wait( ); + mutex->lock(); + } + } + object t = free_threads[0]; + free_threads = free_threads[1..]; + return t; + } + + Queue job_queue = Queue(); + + void dispatcher() + { + while( array q = job_queue->read() ) + aquire_thread()->run( q[1], q[0] ); + } + + object run( function f, mixed ... args ) + { + object ro = Result(); + job_queue->write( ({ 0, ({f, args }) }) ); + return ro; + } + + class ValueAdjuster + { + object r, r2; + mapping v; + int i; + + void go(mixed vn, int err) + { + r->value[ i ] = vn; + if( err ) + r->provide_error( err ); + if( !--v->num_left ) + r->provide( r->value ); + destruct(); + } + void create( object _1,object _2,int _3,mapping _4 ) + { + r = _1; r2 = _2; i=_3; v=_4; + } + } + + object run_multiple( array fun_args ) + { + Result r = Result(); // private result.. + r->value = allocate( sizeof( fun_args ) ); + mapping nl = ([ "num_left":sizeof( fun_args ) ]); + for( int i=0; i<sizeof( fun_args ); i++ ) + { + Result r2 = Result(); + r2->set_done_cb( ValueAdjuster( r, r2, i, nl )->go ); + job_queue->write( ({ r2, fun_args[i] }) ); + } + return r; + } + + void run_async( function f, mixed ... args ) + { + job_queue->write( ({ 0, ({f, args }) }) ); + } + + int set_max_num_threads( int to ) + { + int omnt = max_num_threads; + max_num_threads = to; + ft_cond->broadcast( ); + return omnt; + } + + string debug_status() + { + string res = sprintf("Thread farm\n" + " Max threads = %d\n" + " Current threads = %d\n" + " Working threads = %d\n" + " Jobs in queue = %d\n\n", + max_num_threads, sizeof(threads), + (sizeof(threads)-sizeof(free_threads)), + job_queue->size() ); + + foreach( threads, Handler t ) + res += t->debug_status(); + return res; + } + + void create() + { + thread_create( dispatcher ); + } +} +