Skip to content
Snippets Groups Projects
Thread.pmod 19.56 KiB
#pike __REAL_VERSION__

#if constant(thread_create)
constant Thread=__builtin.thread_id;

// The reason for this inherit is rather simple.
// It's now possible to write Thread Thread( ... );
//
// This makes the interface look somewhat more thought-through.
//
inherit Thread;

// We don't want to create a thread of the module...
static void create(mixed ... args)
{
}

optional Thread `()( mixed f, mixed ... args )
{
  return thread_create( f, @args );
}

optional constant MutexKey=__builtin.mutex_key;
optional constant Mutex=__builtin.mutex;
optional constant Condition=__builtin.condition;
optional constant _Disabled=__builtin.threads_disabled;
optional constant Local=__builtin.thread_local;

optional constant thread_create = predef::thread_create;

optional constant this_thread = predef::this_thread;

optional constant all_threads = predef::all_threads;



//! @[Fifo] implements a fixed length first-in, first-out queue.
//! A fifo is a queue of values and is often used as a stream of data
//! between two threads.
//!
//! @seealso
//!   @[Queue]
//!
optional class Fifo {
  inherit Condition : r_cond;
  inherit Condition : w_cond;
  inherit Mutex : lock;
  
  array buffer;
  int ptr, num;
  int read_tres, write_tres;

  //! This function returns the number of elements currently in the fifo.
  //!
  //! @seealso
  //!   @[read()], @[write()]
  //!
  int size() {  return num; }

  static nomask mixed read_unlocked()
  {
    mixed tmp=buffer[ptr];
    buffer[ptr++] = 0;	// Throw away any references.
    ptr%=sizeof(buffer);
    if(read_tres < sizeof(buffer))
    {
      if(num-- == read_tres)
	w_cond::broadcast();
    }else{
      num--;
      w_cond::broadcast();
    }
    return tmp;
  }

  //! This function retrieves a value from the fifo. Values will be
  //! returned in the order they were written. If there are no values
  //! present in the fifo the current thread will sleep until some other
  //! thread writes one.
  //!
  //! @seealso
  //!   @[try_read()], @[read_array()], @[write()]
  //!
  mixed read()
  {
    object key=lock::lock();
    while(!num) r_cond::wait(key);
    mixed res = read_unlocked();
    key = 0;
    return res;
  }

  //! This function retrieves a value from the fifo if there is any
  //! there. Values will be returned in the order they were written.
  //! If there are no values present in the fifo then @[UNDEFINED]
  //! will be returned.
  //!
  //! @seealso
  //!   @[read()]
  //!
  mixed try_read()
  {
    if (!num) return UNDEFINED;
    object key=lock::lock();
    if (!num) return UNDEFINED;
    mixed res = read_unlocked();
    key = 0;
    return res;
  }

  static nomask array read_all_unlocked()
  {
    array ret;

    switch (num) {
      case 0:
	ret = ({});
	break;

      case 1:
	ret=buffer[ptr..ptr];
	buffer[ptr++] = 0;	// Throw away any references.
	ptr%=sizeof(buffer);
	num = 0;
	w_cond::broadcast();
	break;

      default:
	if (ptr+num < sizeof(buffer)) {
	  ret = buffer[ptr..ptr+num-1];
	} else {
	  ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1];
	}
	ptr=num=0;
	buffer=allocate(sizeof(buffer)); // Throw away any references.
	w_cond::broadcast();
	break;
    }

    return ret;
  }

  //! This function returns all values in the fifo as an array. The
  //! values in the array will be in the order they were written. If
  //! there are no values present in the fifo the current thread will
  //! sleep until some other thread writes one.
  //!
  //! @seealso
  //!   @[read()], @[try_read_array()]
  //!
  array read_array()
  {
    object key=lock::lock();
    while(!num) r_cond::wait(key);
    array ret = read_all_unlocked();
    key = 0;
    return ret;
  }

  //! This function returns all values in the fifo as an array but
  //! doesn't wait if there are no values there. The values in the
  //! array will be in the order they were written.
  //!
  //! @seealso
  //!   @[read_array()]
  //!
  array try_read_array()
  {
    if (!num) return ({});
    object key=lock::lock();
    array ret = read_all_unlocked();
    key = 0;
    return ret;
  }

  static nomask void write_unlocked (mixed value)
  {
    buffer[(ptr + num) % sizeof(buffer)] = value;
    if(write_tres)
    {
      if(num++ == write_tres)
	r_cond::broadcast();
    }else{
      num++;
      r_cond::broadcast();
    }
  }

  //! Append a @[value] to the end of the fifo. If there is no more
  //! room in the fifo the current thread will sleep until space is
  //! available. The number of items in the queue after the write is
  //! returned.
  //!
  //! @seealso
  //!   @[read()]
  //!
  int write(mixed value)
  {
    object key=lock::lock();
    while(num == sizeof(buffer)) w_cond::wait(key);
    write_unlocked (value);
    int items = num;
    key = 0;
    return items;
  }

  //! Append a @[value] to the end of the fifo. If there is no more
  //! room in the fifo then zero will be returned, otherwise the
  //! number of items in the fifo after the write is returned.
  //!
  //! @seealso
  //!   @[read()]
  //!
  int try_write(mixed value)
  {
    if (num == sizeof (buffer)) return 0;
    object key=lock::lock();
    if (num == sizeof (buffer)) return 0;
    write_unlocked (value);
    int items = num;
    key = 0;
    return items;
  }

  //! @decl void create()
  //! @decl void create(int size)
  //!
  //! Create a fifo. If the optional @[size] argument is present it
  //! sets how many values can be written to the fifo without blocking.
  //! The default @[size] is 128.
  //!
  static void create(int|void size)
  {
    write_tres=0;
    buffer=allocate(read_tres=size || 128);
  }

  static string _sprintf( int f )
  {
    return f=='O' && sprintf( "%O(%d / %d)", this_program,
			      size(), read_tres );
  }
}

//! @[Queue] implements a queue, or a pipeline. The main difference
//! between @[Queue] and @[Fifo] is that @[Queue]
//! will never block in write(), only allocate more memory.
//!
//! @seealso
//!   @[Fifo]
//!
optional class Queue {
  inherit Condition : r_cond;
  inherit Mutex : lock;
  
  array buffer=allocate(16);
  int r_ptr, w_ptr;
  
  //! This function returns the number of elements currently in the queue.
  //!
  //! @seealso
  //!   @[read()], @[write()]
  //!
  int size() {  return w_ptr - r_ptr;  }

  //! This function retrieves a value from the queue. Values will be
  //! returned in the order they were written. If there are no values
  //! present in the queue the current thread will sleep until some other
  //! thread writes one.
  //!
  //! @seealso
  //!   @[try_read()], @[write()]
  //!
  mixed read()
  {
    mixed tmp;
    object key=lock::lock();
    while(w_ptr == r_ptr) r_cond::wait(key);
    tmp=buffer[r_ptr];
    buffer[r_ptr++] = 0;	// Throw away any references.
    key=0;
    return tmp;
  }

  //! This function retrieves a value from the queue if there is any
  //! there. Values will be returned in the order they were written.
  //! If there are no values present in the fifo then @[UNDEFINED]
  //! will be returned.
  //!
  //! @seealso
  //!   @[write()]
  //!
  mixed try_read()
  {
    if (w_ptr == r_ptr) return UNDEFINED;
    object key=lock::lock();
    if (w_ptr == r_ptr) return UNDEFINED;
    mixed tmp=buffer[r_ptr];
    buffer[r_ptr++] = 0;	// Throw away any references.
    key=0;
    return tmp;
  }

  static nomask array read_all_unlocked()
  {
    array ret;

    switch (w_ptr - r_ptr) {
      case 0:
	ret = ({});
	break;

      case 1:
	ret=buffer[r_ptr..r_ptr];
	buffer[r_ptr++] = 0;	// Throw away any references.
	break;

      default:
	ret = buffer[r_ptr..w_ptr];
	r_ptr = w_ptr = 0;
	buffer=allocate(sizeof(buffer)); // Throw away any references.
	break;
    }

    return ret;
  }

  //! This function returns all values in the queue as an array. The
  //! values in the array will be in the order they were written. If
  //! there are no values present in the queue the current thread will
  //! sleep until some other thread writes one.
  //!
  //! @seealso
  //!   @[read()], @[try_read_array()]
  //!
  array read_array()
  {
    object key=lock::lock();
    while (w_ptr == r_ptr) r_cond::wait(key);
    array ret = read_all_unlocked();
    key = 0;
    return ret;
  }

  //! This function returns all values in the queue as an array but
  //! doesn't wait if there are no values there. The values in the
  //! array will be in the order they were written.
  //!
  //! @seealso
  //!   @[read_array()]
  //!
  array try_read_array()
  {
    if (w_ptr == r_ptr) return ({});
    object key=lock::lock();
    array ret = read_all_unlocked();
    key = 0;
    return ret;
  }

  //! This function puts a @[value] last in the queue. If the queue is
  //! too small to hold the @[value] it will be expanded to make room.
  //! The number of items in the queue after the write is returned.
  //!
  //! @seealso
  //!   @[read()]
  //!
  int write(mixed value)
  {
    object key=lock::lock();
    if(w_ptr >= sizeof(buffer))
    {
      buffer=buffer[r_ptr..];
      buffer+=allocate(sizeof(buffer)+1);
      w_ptr-=r_ptr;
      r_ptr=0;
    }
    buffer[w_ptr] = value;
    w_ptr++;
    int items = w_ptr - r_ptr;
    r_cond::signal();
    key=0;
    return items;
  }

  static string _sprintf( int f )
  {
    return f=='O' && sprintf( "%O(%d)", this_program, size() );
  }
}



optional class Farm
{
  static Mutex mutex = Mutex();
  static Condition ft_cond = Condition();
  static Queue job_queue = Queue();

  class Result
  {
    int ready;
    mixed value;
    function done_cb;

    int status()
    {
      return ready;
    }

    mixed result()
    {
      return value;
    }

    mixed `()()
    {
      object key = mutex->lock();
      while(!ready)     ft_cond->wait(key);
      key = 0;
      if( ready < 0 )   throw( value );
      return value;
    }

    void set_done_cb( function to )
    {
      if( ready )
        to( value, ready<0 );
      else
        done_cb = to;
    }

    void provide_error( mixed what )
    {
      value = what;
      ready = -1;
      if( done_cb )
        done_cb( what, 1 );
    }
      
    void provide( mixed what )
    {
      ready = 1;
      value = what;
      if( done_cb )
        done_cb( what, 0 );
    }


    static string _sprintf( int f )
    {
      switch( f )
      {
	case 't':
	  return "Thread.Farm().Result";
	case 'O':
	  return sprintf( "%t(%d %O)", this, ready, value );
      }
    }
  }

  static class Handler
  {
    Mutex job_mutex = Mutex();
    Condition cond = Condition();
    array(object|array(function|array)) job;
    object thread;

    float total_time;
    int handled, max_time;

    static int ready;

    void handler()
    {
      array(object|array(function|array)) q;
      object key = job_mutex->lock();
      ready = 1;
      while( 1 )
      {
        cond->wait(key);
        if( q = job )
        {
          mixed res, err;
          int st = gethrtime();
          if( err = catch(res = q[1][0]( @q[1][1] )) && q[0])
            ([object]q[0])->provide_error( err );
          else if( q[0] )
            ([object]q[0])->provide( res );
          object lock = mutex->lock();
          free_threads += ({ this });
          lock = 0;
          st = gethrtime()-st;
          total_time += st/1000.0;
          handled++;
          job = 0;
          if( st > max_time )
            max_time = st;
          ft_cond->broadcast();
        } else  {
          object lock = mutex->lock();
          threads -= ({ this });
          free_threads -= ({ this });
          lock = 0;
          destruct();
          return;
        }
      }
    }

    void run( array(function|array) what, object|void resobj )
    {
      while(!ready) sleep(0.1);
      object key = job_mutex->lock();
      job = ({ resobj, what });
      cond->signal();
      key = 0;
    }

    string debug_status()
    {
      return ("Thread:\n"
              " Handled works: "+handled+"\n"+
              (handled?
               " Average time:  "+((int)(total_time / handled))+"ms\n"
               " Max time:      "+sprintf("%2.2fms\n", max_time/1000.0):"")+
              " Status:        "+(job?"Working":"Idle" )+"\n"+
              (job?
               ("    "+
                replace( describe_backtrace(thread->backtrace()),
                         "\n",
                         "\n    ")):"")
              +"\n\n");
    }

    static void create()
    {
      thread = thread_create( handler );
    }


    static string _sprintf( int f )
    {
      switch( f )
      {
	case 't':
	  return "Thread.Farm().Handler";
	case 'O':
	  return sprintf( "%t(%f / %d,  %d)", this,
			  total_time, max_time, handled );
      }
    }
  }

  static array(Handler) threads = ({});
  static array(Handler) free_threads = ({});
  static int max_num_threads = 20;

  static Handler aquire_thread()
  {
    object lock = mutex->lock();
    while( !sizeof(free_threads) )
    {
      if( sizeof(threads) < max_num_threads )
      {
        threads += ({ Handler() });
        free_threads += ({ threads[-1] });
      } else {
        ft_cond->wait(mutex);
      }
    }
    object(Handler) t = free_threads[0];
    free_threads = free_threads[1..];
    return t;
  }
        

  static void dispatcher()
  {
    while( array q = [array]job_queue->read() )
      aquire_thread()->run( q[1], q[0] );
  }

  static class ValueAdjuster( object r, object r2, int i, mapping v )
  {
    void go(mixed vn, int err)
    {
      ([array]r->value)[ i ] = vn;
      if( err )
        r->provide_error( err );
      if( !--v->num_left )
        r->provide( r->value );
      destruct();
    }
  }

  object run_multiple( array fun_args )
  {
    Result r = Result(); // private result..
    r->value = allocate( sizeof( fun_args ) );
    mapping nl = ([ "num_left":sizeof( fun_args ) ]);
    for( int i=0; i<sizeof( fun_args ); i++ )
    {
      Result  r2 = Result();
      r2->set_done_cb( ValueAdjuster( r, r2, i, nl )->go );
      job_queue->write( ({ r2, fun_args[i] }) );
    }
    return r;
  }


  void run_multiple_async( array fun_args )
  {
    for( int i=0; i<sizeof( fun_args ); i++ )
      job_queue->write( ({ 0, fun_args[i] }) );
  }


  object run( function f, mixed ... args )
  {
    object ro = Result();
    job_queue->write( ({ 0, ({f, args }) }) );
    return ro;
  }

  void run_async( function f, mixed ... args )
  {
    job_queue->write( ({ 0, ({f, args }) }) );
  }
  int set_max_num_threads( int to )
  {
    int omnt = max_num_threads;
    if( to <= 0 )
      error("Illegal argument 1 to set_max_num_threads,"
            "num_threads must be > 0\n");

    max_num_threads = to;
    while( sizeof( threads ) > max_num_threads )
    {
      object key = mutex->lock();
      while( sizeof( free_threads ) )
        free_threads[0]->cond->signal();
      if( sizeof( threads ) > max_num_threads)
        ft_cond->wait(key);
    }
    ft_cond->broadcast( );
    return omnt;
  }

  string debug_status()
  {
    string res = sprintf("Thread farm\n"
                         "  Max threads     = %d\n"
                         "  Current threads = %d\n"
                         "  Working threads = %d\n"
                         "  Jobs in queue   = %d\n\n",
                         max_num_threads, sizeof(threads), 
                         (sizeof(threads)-sizeof(free_threads)),
                         job_queue->size() );
    
    foreach( threads, Handler t )
      res += t->debug_status();
    return res;
  }

  static string _sprintf( int f )
  {
    return f=='O' && sprintf( "%O(/* %s */)", this_program, debug_status() );
  }

  static void create()
  {
    thread_create( dispatcher );
  }
}

#else /* !constant(thread_create) */

// Simulations of some of the classes for nonthreaded use.

/* Fallback implementation of Thread.Local */
optional class Local
{
  static mixed data;
  mixed get() {return data;}
  mixed set (mixed val) {return data = val;}
}

/* Fallback implementation of Thread.MutexKey */
optional class MutexKey (static function(:void) dec_locks)
{
  int `!()
  {
    // Should be destructed when the mutex is, but we can't pull that
    // off. Try to simulate it as well as possible.
    if (dec_locks) return 0;
    destruct (this);
    return 1;
  }

  static void destroy()
  {
    if (dec_locks) dec_locks();
  }
}

/* Fallback implementation of Thread.Mutex */
optional class Mutex
{
  static int locks = 0;
  static void dec_locks() {locks--;}

  MutexKey lock (int|void type)
  {
    switch (type) {
      default:
	error ("Unknown mutex locking style: %d\n", type);
      case 0:
	if (locks) error ("Recursive mutex locks.\n");
	break;
      case 1:
	if (locks)
	  // To be really accurate we should hang now, but somehow
	  // that doesn't seem too useful.
	  error ("Deadlock detected.\n");
	break;
      case 2:
	if (locks) return 0;
    }
    locks++;
    return MutexKey (dec_locks);
  }

  MutexKey trylock (int|void type)
  {
    switch (type) {
      default:
	error ("Unknown mutex locking style: %d\n", type);
      case 0:
	if (locks) error ("Recursive mutex locks.\n");
	break;
      case 1:
      case 2:
    }
    if (locks) return 0;
    locks++;
    return MutexKey (dec_locks);
  }
}

// Fallback implementation of Thread.Fifo.
optional class Fifo
{
  array buffer;
  int ptr, num;
  int read_tres, write_tres;

  int size() {  return num; }

  mixed read()
  {
    if (!num) error ("Deadlock detected - fifo empty.\n");
    return try_read();
  }

  mixed try_read()
  {
    if (!num) return UNDEFINED;
    mixed tmp=buffer[ptr];
    buffer[ptr++] = 0;	// Throw away any references.
    ptr%=sizeof(buffer);
    return tmp;
  }

  array read_array()
  {
    if (!num) error ("Deadlock detected - fifo empty.\n");
    return try_read_array();
  }

  array try_read_array()
  {
    array ret;
    switch (num) {
      case 0:
	ret = ({});
	break;

      case 1:
	ret=buffer[ptr..ptr];
	buffer[ptr++] = 0;	// Throw away any references.
	ptr%=sizeof(buffer);
	num = 0;
	break;

      default:
	if (ptr+num < sizeof(buffer)) {
	  ret = buffer[ptr..ptr+num-1];
	} else {
	  ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1];
	}
	ptr=num=0;
	buffer=allocate(sizeof(buffer)); // Throw away any references.
	break;
    }

    return ret;
  }

  int try_write(mixed value)
  {
    if (num == sizeof (buffer)) return 0;
    buffer[(ptr + num) % sizeof(buffer)] = value;
    return ++num;
  }

  int write(mixed value)
  {
    if (!try_write(value)) error("Deadlock detected - fifo full.\n");
    return num;
  }

  static void create(int|void size)
  {
    write_tres=0;
    buffer=allocate(read_tres=size || 128);
  }

  static string _sprintf( int f )
  {
    return f=='O' && sprintf( "%O(%d / %d)", this_program,
			      size(), read_tres );
  }
}

// Fallback implementation of Thread.Queue.
optional class Queue
{
  array buffer=allocate(16);
  int r_ptr, w_ptr;
  
  int size() {  return w_ptr - r_ptr;  }

  mixed read()
  {
    if (w_ptr == r_ptr) error ("Deadlock detected - queue empty.\n");
    return try_read();
  }

  mixed try_read()
  {
    mixed tmp=buffer[r_ptr];
    buffer[r_ptr++] = 0;	// Throw away any references.
    return tmp;
  }

  array read_array()
  {
    if (w_ptr == r_ptr) error ("Deadlock detected - queue empty.\n");
    return try_read_array();
  }

  array try_read_array()
  {
    array ret;

    switch (w_ptr - r_ptr) {
      case 0:
	ret = ({});
	break;

      case 1:
	ret=buffer[r_ptr..r_ptr];
	buffer[r_ptr++] = 0;	// Throw away any references.
	break;

      default:
	ret = buffer[r_ptr..w_ptr];
	r_ptr = w_ptr = 0;
	buffer=allocate(sizeof(buffer)); // Throw away any references.
	break;
    }

    return ret;
  }

  int write(mixed value)
  {
    if(w_ptr >= sizeof(buffer))
    {
      buffer=buffer[r_ptr..];
      buffer+=allocate(sizeof(buffer)+1);
      w_ptr-=r_ptr;
      r_ptr=0;
    }
    buffer[w_ptr] = value;
    w_ptr++;
    return w_ptr - r_ptr;
  }

  static string _sprintf( int f )
  {
    return f=='O' && sprintf( "%O(%d)", this_program, size() );
  }
}

#endif /* !constant(thread_create) */