Skip to content
Snippets Groups Projects
Commit cb49dd2d authored by Per Hedbor's avatar Per Hedbor
Browse files

Added Thread.Farm

Rev: lib/modules/Thread.pmod:1.15
parent 1b3ee6c7
No related branches found
No related tags found
No related merge requests found
constant Mutex=__builtin.mutex; constant Mutex=__builtin.mutex;
constant Condition=__builtin.condition; constant Condition=__builtin.condition;
class Fifo { class Fifo {
inherit Condition : r_cond; inherit Condition : r_cond;
inherit Condition : w_cond; inherit Condition : w_cond;
...@@ -108,5 +109,239 @@ class Queue { ...@@ -108,5 +109,239 @@ class Queue {
key=0; // Must free this one _before_ the signal... key=0; // Must free this one _before_ the signal...
r_cond::signal(); r_cond::signal();
} }
}; }
class Farm
{
Mutex mutex = Mutex();
Condition ft_cond = Condition();
class Result
{
int ready;
mixed value;
function done_cb;
int status()
{
return ready;
}
mixed result()
{
return value;
}
mixed `()()
{
while(!ready) ft_cond->wait();
if( ready < 0 ) throw( value );
return value;
}
void set_done_cb( function to )
{
if( ready )
to( value, ready<0 );
else
done_cb = to;
}
void provide_error( mixed what )
{
value = what;
ready = -1;
if( done_cb )
done_cb( what, 1 );
}
void provide( mixed what )
{
ready = 1;
value = what;
if( done_cb )
done_cb( what, 0 );
}
}
class Handler
{
Condition cond = Condition();
array job;
object thread;
float total_time;
int handled, max_time;
static int ready;
void handler()
{
array q;
while( 1 )
{
ready = 1;
cond->wait();
if( q = job )
{
mixed res, err;
int st = gethrtime();
if( err = catch(res = q[1][0]( @q[1][1] )) && q[0])
q[0]->provide_error( err );
else if( q[0] )
q[0]->provide( res );
object lock = mutex->lock();
free_threads += ({ this_object() });
lock = 0;
st = gethrtime()-st;
total_time += st/1000.0;
handled++;
job = 0;
if( st > max_time )
max_time = st;
ft_cond->broadcast();
}
}
}
void run( array what, object|void resobj )
{
while(!ready) sleep(0.1);
job = ({ resobj, what });
cond->signal();
}
string debug_status()
{
return ("Thread:\n"
" Handled works: "+handled+"\n"+
(handled?
" Average time: "+((int)(total_time / handled))+"ms\n"
" Max time: "+sprintf("%2.2fms\n", max_time/1000.0):"")+
" Status: "+(job?"Working":"Idle" )+"\n"+
(job?
(" "+
replace( describe_backtrace(thread->backtrace()),
"\n",
"\n ")):"")
+"\n\n");
}
void create()
{
thread = thread_create( handler );
}
}
static array(Handler) threads = ({});
array(Handler) free_threads = ({});
int max_num_threads = 20;
static Handler aquire_thread()
{
object lock = mutex->lock();
while( !sizeof(free_threads) )
{
if( sizeof(threads) < max_num_threads )
{
threads += ({ Handler() });
free_threads += ({ threads[-1] });
} else {
lock = 0;
ft_cond->wait( );
mutex->lock();
}
}
object t = free_threads[0];
free_threads = free_threads[1..];
return t;
}
Queue job_queue = Queue();
void dispatcher()
{
while( array q = job_queue->read() )
aquire_thread()->run( q[1], q[0] );
}
object run( function f, mixed ... args )
{
object ro = Result();
job_queue->write( ({ 0, ({f, args }) }) );
return ro;
}
class ValueAdjuster
{
object r, r2;
mapping v;
int i;
void go(mixed vn, int err)
{
r->value[ i ] = vn;
if( err )
r->provide_error( err );
if( !--v->num_left )
r->provide( r->value );
destruct();
}
void create( object _1,object _2,int _3,mapping _4 )
{
r = _1; r2 = _2; i=_3; v=_4;
}
}
object run_multiple( array fun_args )
{
Result r = Result(); // private result..
r->value = allocate( sizeof( fun_args ) );
mapping nl = ([ "num_left":sizeof( fun_args ) ]);
for( int i=0; i<sizeof( fun_args ); i++ )
{
Result r2 = Result();
r2->set_done_cb( ValueAdjuster( r, r2, i, nl )->go );
job_queue->write( ({ r2, fun_args[i] }) );
}
return r;
}
void run_async( function f, mixed ... args )
{
job_queue->write( ({ 0, ({f, args }) }) );
}
int set_max_num_threads( int to )
{
int omnt = max_num_threads;
max_num_threads = to;
ft_cond->broadcast( );
return omnt;
}
string debug_status()
{
string res = sprintf("Thread farm\n"
" Max threads = %d\n"
" Current threads = %d\n"
" Working threads = %d\n"
" Jobs in queue = %d\n\n",
max_num_threads, sizeof(threads),
(sizeof(threads)-sizeof(free_threads)),
job_queue->size() );
foreach( threads, Handler t )
res += t->debug_status();
return res;
}
void create()
{
thread_create( dispatcher );
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment