diff --git a/lib/modules/Thread.pmod b/lib/modules/Thread.pmod index 950778131775acc20676c1948ea942bb740e33b1..7e5f13b69ff0ea608d3ee8f81c10c4bd9efc52e2 100644 --- a/lib/modules/Thread.pmod +++ b/lib/modules/Thread.pmod @@ -115,9 +115,6 @@ class Queue { class Farm { - Mutex mutex = Mutex(); - Condition ft_cond = Condition(); - class Result { int ready; @@ -166,7 +163,7 @@ class Farm } } - class Handler + static class Handler { Condition cond = Condition(); array job; @@ -202,6 +199,13 @@ class Farm if( st > max_time ) max_time = st; ft_cond->broadcast(); + } else { + object lock = mutex->lock(); + threads -= ({ this_object() }); + free_threads -= ({ this_object() }); + lock = 0; + destruct(); + return; } } } @@ -235,9 +239,13 @@ class Farm } } + static Mutex mutex = Mutex(); + static Condition ft_cond = Condition(); + static Queue job_queue = Queue(); + static array(Handler) threads = ({}); - array(Handler) free_threads = ({}); - int max_num_threads = 20; + static array(Handler) free_threads = ({}); + static int max_num_threads = 20; static Handler aquire_thread() { @@ -259,22 +267,14 @@ class Farm return t; } - Queue job_queue = Queue(); - void dispatcher() + static void dispatcher() { while( array q = job_queue->read() ) aquire_thread()->run( q[1], q[0] ); } - object run( function f, mixed ... args ) - { - object ro = Result(); - job_queue->write( ({ 0, ({f, args }) }) ); - return ro; - } - - class ValueAdjuster + static class ValueAdjuster { object r, r2; mapping v; @@ -309,6 +309,21 @@ class Farm return r; } + + void run_multiple_async( array fun_args ) + { + for( int i=0; i<sizeof( fun_args ); i++ ) + job_queue->write( ({ 0, fun_args[i] }) ); + } + + + object run( function f, mixed ... args ) + { + object ro = Result(); + job_queue->write( ({ 0, ({f, args }) }) ); + return ro; + } + void run_async( function f, mixed ... args ) { job_queue->write( ({ 0, ({f, args }) }) ); @@ -317,7 +332,20 @@ class Farm int set_max_num_threads( int to ) { int omnt = max_num_threads; + if( to <= 0 ) + error("Illegal argument 1 to set_max_num_threads," + "num_threads must be > 0\n"); + max_num_threads = to; + while( sizeof( threads ) > max_num_threads ) + { + object key = mutex->lock(); + while( sizeof( free_threads ) ) + free_threads[0]->cond->signal(); + key = 0; + if( sizeof( threads ) > max_num_threads) + ft_cond->wait(); + } ft_cond->broadcast( ); return omnt; }