Skip to content
Snippets Groups Projects
Select Git revision
  • 9fc89ec67cfb304d2416c045fa03a3790ce925fa
  • master default protected
  • 9.0
  • 8.0
  • nt-tools
  • 7.8
  • 7.6
  • 7.4
  • 7.2
  • 7.0
  • 0.6
  • rosuav/latex-markdown-renderer
  • rxnpatch/rxnpatch
  • marcus/gobject-introspection
  • rxnpatch/8.0
  • rosuav/pre-listening-ports
  • rosuav/async-annotations
  • rosuav/pgsql-ssl
  • rxnpatch/rxnpatch-broken/2023-10-06T094250
  • grubba/fdlib
  • grubba/wip/sakura/8.0
  • v8.0.2020
  • v8.0.2018
  • v8.0.2016
  • v8.0.2014
  • v8.0.2012
  • v8.0.2008
  • v8.0.2006
  • v8.0.2004
  • v8.0.2002
  • v8.0.2000
  • v8.0.1998
  • v8.0.1996
  • v8.0.1994
  • v8.0.1992
  • v8.0.1990
  • v8.0.1988
  • v8.0.1986
  • rxnpatch/clusters/8.0/2025-04-29T124414
  • rxnpatch/2025-04-29T124414
  • v8.0.1984
41 results

Function.pmod

Blame
  • Thread.pmod 23.39 KiB
    #pike __REAL_VERSION__
    
    #if constant(thread_create)
    constant Thread=__builtin.thread_id;
    
    //! Create a new thread.
    //!
    //! @deprecated predef::Thread.Thread
    optional __deprecated__ Thread `()( mixed f, mixed ... args )
    {
      return thread_create( f, @args );
    }
    
    optional constant MutexKey=__builtin.mutex_key;
    optional constant Mutex=__builtin.mutex;
    optional constant Condition=__builtin.condition;
    optional constant _Disabled=__builtin.threads_disabled;
    optional constant Local=__builtin.thread_local;
    
    optional constant thread_create = predef::thread_create;
    
    optional constant this_thread = predef::this_thread;
    
    optional constant all_threads = predef::all_threads;
    
    constant THREAD_NOT_STARTED = __builtin.THREAD_NOT_STARTED;
    constant THREAD_RUNNING = __builtin.THREAD_RUNNING;
    constant THREAD_EXITED = __builtin.THREAD_EXITED;
    
    
    //! @[Fifo] implements a fixed length first-in, first-out queue.
    //! A fifo is a queue of values and is often used as a stream of data
    //! between two threads.
    //!
    //! @seealso
    //!   @[Queue]
    //!
    optional class Fifo {
      inherit Condition : r_cond;
      inherit Condition : w_cond;
      inherit Mutex : lock;
      
      array buffer;
      int ptr, num;
      int read_tres, write_tres;
    
      //! This function returns the number of elements currently in the fifo.
      //!
      //! @seealso
      //!   @[read()], @[write()]
      //!
      int size() {  return num; }
    
      protected final mixed read_unlocked()
      {
        mixed tmp=buffer[ptr];
        buffer[ptr++] = 0;	// Throw away any references.
        ptr%=sizeof(buffer);
        if(read_tres < sizeof(buffer))
        {
          if(num-- == read_tres)
    	w_cond::broadcast();
        }else{
          num--;
          w_cond::broadcast();
        }
        return tmp;
      }
    
      //! This function retrieves a value from the fifo. Values will be
      //! returned in the order they were written. If there are no values
      //! present in the fifo the current thread will sleep until some other
      //! thread writes one.
      //!
      //! @seealso
      //!   @[try_read()], @[read_array()], @[write()]
      //!
      mixed read()
      {
        object key=lock::lock();
        while(!num) r_cond::wait(key);
        mixed res = read_unlocked();
        key = 0;
        return res;
      }
    
      //! This function retrieves a value from the fifo if there is any
      //! there. Values will be returned in the order they were written.
      //! If there are no values present in the fifo then @[UNDEFINED]
      //! will be returned.
      //!
      //! @seealso
      //!   @[read()]
      //!
      mixed try_read()
      {
        if (!num) return UNDEFINED;
        object key=lock::lock();
        if (!num) return UNDEFINED;
        mixed res = read_unlocked();
        key = 0;
        return res;
      }
    
      protected final array read_all_unlocked()
      {
        array ret;
    
        switch (num) {
          case 0:
    	ret = ({});
    	break;
    
          case 1:
    	ret=buffer[ptr..ptr];
    	buffer[ptr++] = 0;	// Throw away any references.
    	ptr%=sizeof(buffer);
    	num = 0;
    	w_cond::broadcast();
    	break;
    
          default:
    	if (ptr+num < sizeof(buffer)) {
    	  ret = buffer[ptr..ptr+num-1];
    	} else {
    	  ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1];
    	}
    	ptr=num=0;
    	buffer=allocate(sizeof(buffer)); // Throw away any references.
    	w_cond::broadcast();
    	break;
        }
    
        return ret;
      }
    
      //! This function returns all values in the fifo as an array. The
      //! values in the array will be in the order they were written. If
      //! there are no values present in the fifo the current thread will
      //! sleep until some other thread writes one.
      //!
      //! @seealso
      //!   @[read()], @[try_read_array()]
      //!
      array read_array()
      {
        object key=lock::lock();
        while(!num) r_cond::wait(key);
        array ret = read_all_unlocked();
        key = 0;
        return ret;
      }
    
      //! This function returns all values in the fifo as an array but
      //! doesn't wait if there are no values there. The values in the
      //! array will be in the order they were written.
      //!
      //! @seealso
      //!   @[read_array()]
      //!
      array try_read_array()
      {
        if (!num) return ({});
        object key=lock::lock();
        array ret = read_all_unlocked();
        key = 0;
        return ret;
      }
    
      protected final void write_unlocked (mixed value)
      {
        buffer[(ptr + num) % sizeof(buffer)] = value;
        if(write_tres)
        {
          if(num++ == write_tres)
    	r_cond::broadcast();
        }else{
          num++;
          r_cond::broadcast();
        }
      }
    
      //! Append a @[value] to the end of the fifo. If there is no more
      //! room in the fifo the current thread will sleep until space is
      //! available. The number of items in the queue after the write is
      //! returned.
      //!
      //! @seealso
      //!   @[read()]
      //!
      int write(mixed value)
      {
        object key=lock::lock();
        while(num == sizeof(buffer)) w_cond::wait(key);
        write_unlocked (value);
        int items = num;
        key = 0;
        return items;
      }
    
      //! Append a @[value] to the end of the fifo. If there is no more
      //! room in the fifo then zero will be returned, otherwise the
      //! number of items in the fifo after the write is returned.
      //!
      //! @seealso
      //!   @[read()]
      //!
      int try_write(mixed value)
      {
        if (num == sizeof (buffer)) return 0;
        object key=lock::lock();
        if (num == sizeof (buffer)) return 0;
        write_unlocked (value);
        int items = num;
        key = 0;
        return items;
      }
    
      //! @decl void create()
      //! @decl void create(int size)
      //!
      //! Create a fifo. If the optional @[size] argument is present it
      //! sets how many values can be written to the fifo without blocking.
      //! The default @[size] is 128.
      //!
      protected void create(int|void size)
      {
        write_tres=0;
        buffer=allocate(read_tres=size || 128);
      }
    
      protected string _sprintf( int f )
      {
        return f=='O' && sprintf( "%O(%d / %d)", this_program,
    			      size(), read_tres );
      }
    }
    
    //! @[Queue] implements a queue, or a pipeline. The main difference
    //! between @[Queue] and @[Fifo] is that @[Queue]
    //! will never block in write(), only allocate more memory.
    //!
    //! @seealso
    //!   @[Fifo]
    //!
    optional class Queue {
      inherit Condition : r_cond;
      inherit Mutex : lock;
      
      array buffer=allocate(16);
      int r_ptr, w_ptr;
      
      //! This function returns the number of elements currently in the queue.
      //!
      //! @seealso
      //!   @[read()], @[write()]
      //!
      int size() {  return w_ptr - r_ptr;  }
    
      //! This function retrieves a value from the queue. Values will be
      //! returned in the order they were written. If there are no values
      //! present in the queue the current thread will sleep until some other
      //! thread writes one.
      //!
      //! @seealso
      //!   @[try_read()], @[write()]
      //!
      mixed read()
      {
        mixed tmp;
        object key=lock::lock();
        while(w_ptr == r_ptr) r_cond::wait(key);
        tmp=buffer[r_ptr];
        buffer[r_ptr++] = 0;	// Throw away any references.
        key=0;
        return tmp;
      }
    
      //! This function retrieves a value from the queue if there is any
      //! there. Values will be returned in the order they were written.
      //! If there are no values present in the fifo then @[UNDEFINED]
      //! will be returned.
      //!
      //! @seealso
      //!   @[write()]
      //!
      mixed try_read()
      {
        if (w_ptr == r_ptr) return UNDEFINED;
        object key=lock::lock();
        if (w_ptr == r_ptr) return UNDEFINED;
        mixed tmp=buffer[r_ptr];
        buffer[r_ptr++] = 0;	// Throw away any references.
        key=0;
        return tmp;
      }
    
      protected final array read_all_unlocked()
      {
        array ret;
    
        switch (w_ptr - r_ptr) {
          case 0:
    	ret = ({});
    	break;
    
          case 1:
    	ret=buffer[r_ptr..r_ptr];
    	buffer[r_ptr++] = 0;	// Throw away any references.
    	break;
    
          default:
    	ret = buffer[r_ptr..w_ptr-1];
    	r_ptr = w_ptr = 0;
    	buffer=allocate(sizeof(buffer)); // Throw away any references.
    	break;
        }
    
        return ret;
      }
    
      //! This function returns all values in the queue as an array. The
      //! values in the array will be in the order they were written. If
      //! there are no values present in the queue the current thread will
      //! sleep until some other thread writes one.
      //!
      //! @seealso
      //!   @[read()], @[try_read_array()]
      //!
      array read_array()
      {
        object key=lock::lock();
        while (w_ptr == r_ptr) r_cond::wait(key);
        array ret = read_all_unlocked();
        key = 0;
        return ret;
      }
    
      //! This function returns all values in the queue as an array but
      //! doesn't wait if there are no values there. The values in the
      //! array will be in the order they were written.
      //!
      //! @seealso
      //!   @[read_array()]
      //!
      array try_read_array()
      {
        if (w_ptr == r_ptr) return ({});
        object key=lock::lock();
        array ret = read_all_unlocked();
        key = 0;
        return ret;
      }
    
      //! Returns a snapshot of all the values in the queue, in the order
      //! they were written. The values are still left in the queue, so if
      //! other threads are reading from it, the returned value should be
      //! considered stale already on return.
      array peek_array()
      {
        if (w_ptr == r_ptr) return ({});
        MutexKey key = lock::lock();
        array ret = buffer[r_ptr..w_ptr - 1];
        key = 0;
        return ret;
      }
    
      //! This function puts a @[value] last in the queue. If the queue is
      //! too small to hold the @[value] it will be expanded to make room.
      //! The number of items in the queue after the write is returned.
      //!
      //! @seealso
      //!   @[read()]
      //!
      int write(mixed value)
      {
        object key=lock::lock();
        if(w_ptr >= sizeof(buffer))
        {
          buffer=buffer[r_ptr..];
          buffer+=allocate(sizeof(buffer)+1);
          w_ptr-=r_ptr;
          r_ptr=0;
        }
        buffer[w_ptr] = value;
        w_ptr++;
        int items = w_ptr - r_ptr;
        r_cond::signal();
        key=0;
        return items;
      }
    
      protected string _sprintf( int f )
      {
        return f=='O' && sprintf( "%O(%d)", this_program, size() );
      }
    }
    
    
    //! A thread farm.
    optional class Farm
    {
      protected Mutex mutex = Mutex();
      protected Condition ft_cond = Condition();
      protected Queue job_queue = Queue();
    
      //! An asynchronous result.
      class Result
      {
        int ready;
        mixed value;
        function done_cb;
    
        //! @returns
        //!   @int
        //!     @value 1
        //!       Returns @expr{1@} when the result is available.
        //!     @value 0
        //!       Returns @expr{0@} (zero) when the result hasn't
        //!       arrived yet.
        //!     @value -1
        //!       Returns negative on failure.
        //!   @endint
        int status()
        {
          return ready;
        }
    
        //! @returns
        //!   Returns the result if available, a backtrace on failure,
        //!   and @expr{0@} (zero) otherwise.
        mixed result()
        {
          return value;
        }
    
        //! Wait for completion.
        mixed `()()
        {
          object key = mutex->lock();
          while(!ready)     ft_cond->wait(key);
          key = 0;
          if( ready < 0 )   throw( value );
          return value;
        }
    
        //! Register a callback to be called when
        //! the result is available.
        //!
        //! @param to
        //!   Callback to be called. The first
        //!   argument to the callback will be
        //!   the result or the failure backtrace,
        //!   and the second @expr{0@} (zero) on
        //!   success, and @expr{1@} on failure.
        void set_done_cb( function to )
        {
          if( ready )
            to( value, ready<0 );
          else
            done_cb = to;
        }
    
        //! Register a failure.
        //!
        //! @param what
        //!   The corresponding backtrace.
        void provide_error( mixed what )
        {
          value = what;
          ready = -1;
          if( done_cb )
            done_cb( what, 1 );
        }
    
        //! Register a completed result.
        //!
        //! @param what
        //!   The result to register.
        void provide( mixed what )
        {
          ready = 1;
          value = what;
          if( done_cb )
            done_cb( what, 0 );
        }
    
    
        protected string _sprintf( int f )
        {
          switch( f )
          {
    	case 't':
    	  return "Thread.Farm().Result";
    	case 'O':
    	  return sprintf( "%t(%d %O)", this, ready, value );
          }
        }
      }
    
      //! A worker thread.
      protected class Handler
      {
        Mutex job_mutex = Mutex();
        Condition cond = Condition();
        array(object|array(function|array)) job;
        object thread;
    
        float total_time;
        int handled, max_time;
    
        protected int ready;
    
        void handler()
        {
          array(object|array(function|array)) q;
          object key = job_mutex->lock();
          ready = 1;
          while( 1 )
          {
            cond->wait(key);
            if( q = job )
            {
              mixed res, err;
              int st = gethrtime();
              if( err = catch(res = q[1][0]( @q[1][1] )) && q[0])
                ([object]q[0])->provide_error( err );
              else if( q[0] )
                ([object]q[0])->provide( res );
              object lock = mutex->lock();
              free_threads += ({ this });
              lock = 0;
              st = gethrtime()-st;
              total_time += st/1000.0;
              handled++;
              job = 0;
              if( st > max_time )
                max_time = st;
              ft_cond->broadcast();
            } else  {
              object lock = mutex->lock();
              threads -= ({ this });
              free_threads -= ({ this });
              lock = 0;
              destruct();
              return;
            }
          }
        }
    
        void run( array(function|array) what, object|void resobj )
        {
          while(!ready) sleep(0.1);
          object key = job_mutex->lock();
          job = ({ resobj, what });
          cond->signal();
          key = 0;
        }
    
        //! Get some statistics about the worker thread.
        string debug_status()
        {
          return ("Thread:\n"
                  " Handled works: "+handled+"\n"+
                  (handled?
                   " Average time:  "+((int)(total_time / handled))+"ms\n"
                   " Max time:      "+sprintf("%2.2fms\n", max_time/1000.0):"")+
                  " Status:        "+(job?"Working":"Idle" )+"\n"+
                  (job?
                   ("    "+
                    replace( describe_backtrace(thread->backtrace()),
                             "\n",
                             "\n    ")):"")
                  +"\n\n");
        }
    
        protected void create()
        {
          thread = thread_create( handler );
        }
    
    
        protected string _sprintf( int f )
        {
          switch( f )
          {
    	case 't':
    	  return "Thread.Farm().Handler";
    	case 'O':
    	  return sprintf( "%t(%f / %d,  %d)", this,
    			  total_time, max_time, handled );
          }
        }
      }
    
      protected array(Handler) threads = ({});
      protected array(Handler) free_threads = ({});
      protected int max_num_threads = 20;
    
      protected Handler aquire_thread()
      {
        object lock = mutex->lock();
        while( !sizeof(free_threads) )
        {
          if( sizeof(threads) < max_num_threads )
          {
            threads += ({ Handler() });
            free_threads += ({ threads[-1] });
          } else {
            ft_cond->wait(mutex);
          }
        }
        object(Handler) t = free_threads[0];
        free_threads = free_threads[1..];
        return t;
      }
            
    
      protected void dispatcher()
      {
        while( array q = [array]job_queue->read() )
          aquire_thread()->run( q[1], q[0] );
      }
    
      protected class ValueAdjuster( object r, object r2, int i, mapping v )
      {
        void go(mixed vn, int err)
        {
          if (!r->status()) {
    	([array]r->value)[ i ] = vn;
    	if( err )
    	  r->provide_error( err );
    	if( !--v->num_left )
    	  r->provide( r->value );
          }
          destruct();
        }
      }
    
    
      //! Register multiple jobs.
      //!
      //! @param fun_args
      //!   An array of arrays where the first element
      //!   is a function to call, and the second is
      //!   a corresponding array of arguments.
      //!
      //! @returns
      //!   Returns a @[Result] object with an array
      //!   with one element for the result for each
      //!   of the functions in @[fun_args].
      //!
      //! @note
      //!   Do not modify the elements of @[fun_args]
      //!   before the result is available.
      //!
      //! @note
      //!   If any of the functions in @[fun_args] throws
      //!   and error, all of the accululated results
      //!   (if any) will be dropped from the result, and
      //!   the first backtrace be provided.
      //!
      //! @seealso
      //!   @[run_multiple_async()]
      Result run_multiple( array(array(function|array)) fun_args )
      {
        Result r = Result(); // private result..
        r->value = allocate( sizeof( fun_args ) );
        mapping nl = ([ "num_left":sizeof( fun_args ) ]);
        for( int i=0; i<sizeof( fun_args ); i++ )
        {
          Result r2 = Result();
          r2->set_done_cb( ValueAdjuster( r, r2, i, nl )->go );
          job_queue->write( ({ r2, fun_args[i] }) );
        }
        return r;
      }
    
    
      //! Register multiple jobs where the return values
      //! are to be ignored.
      //!
      //! @param fun_args
      //!   An array of arrays where the first element
      //!   is a function to call, and the second is
      //!   a corresponding array of arguments.
      //!
      //! @note
      //!   Do not modify the elements of @[fun_args]
      //!   before the result is available.
      //!
      //! @seealso
      //!   @[run_multiple()]
      void run_multiple_async( array fun_args )
      {
        for( int i=0; i<sizeof( fun_args ); i++ )
          job_queue->write( ({ 0, fun_args[i] }) );
      }
    
    
      //! Register a job for the thread farm.
      //!
      //! @param f
      //!   Function to call with @@@[args] to
      //!   perform the job.
      //!
      //! @param args
      //!   The parameters for @[f].
      //!
      //! @returns
      //!   Returns a @[Result] object for the job.
      //!
      //! @note
      //!   In Pike 7.8 and earlier this function
      //!   was broken and returned a @[Result]
      //!   object that wasn't connected to the job.
      //!
      //! @seealso
      //!   @[run_async()]
      Result run( function f, mixed ... args )
      {
        Result ro = Result();
        job_queue->write( ({ ro, ({f, args }) }) );
        return ro;
      }
    
      //! Register a job for the thread farm
      //! where the return value from @[f] is
      //! ignored.
      //!
      //! @param f
      //!   Function to call with @@@[args] to
      //!   perform the job.
      //!
      //! @param args
      //!   The parameters for @[f].
      //!
      //! @seealso
      //!   @[run()]
      void run_async( function f, mixed ... args )
      {
        job_queue->write( ({ 0, ({f, args }) }) );
      }
    
      //! Set the maximum number of worker threads
      //! that the thread farm may have.
      //!
      //! @param to
      //!   The new maximum number.
      //!
      //! If there are more worker threads than @[to],
      //! the function will wait until enough threads
      //! have finished, so that the total is @[to] or less.
      //!
      //! The default maximum number of worker threads is @expr{20@}.
      int set_max_num_threads( int(1..) to )
      {
        int omnt = max_num_threads;
        if( to <= 0 )
          error("Illegal argument 1 to set_max_num_threads,"
                "num_threads must be > 0\n");
    
        max_num_threads = to;
        while( sizeof( threads ) > max_num_threads )
        {
          object key = mutex->lock();
          while( sizeof( free_threads ) )
            free_threads[0]->cond->signal();
          if( sizeof( threads ) > max_num_threads)
            ft_cond->wait(key);
        }
        ft_cond->broadcast( );
        return omnt;
      }
    
      //! Get some statistics for the thread farm.
      string debug_status()
      {
        string res = sprintf("Thread farm\n"
                             "  Max threads     = %d\n"
                             "  Current threads = %d\n"
                             "  Working threads = %d\n"
                             "  Jobs in queue   = %d\n\n",
                             max_num_threads, sizeof(threads), 
                             (sizeof(threads)-sizeof(free_threads)),
                             job_queue->size() );
        
        foreach( threads, Handler t )
          res += t->debug_status();
        return res;
      }
    
      protected string _sprintf( int f )
      {
        return f=='O' && sprintf( "%O(/* %s */)", this_program, debug_status() );
      }
    
      protected void create()
      {
        thread_create( dispatcher );
      }
    }
    
    #else /* !constant(thread_create) */
    
    // Simulations of some of the classes for nonthreaded use.
    
    /* Fallback implementation of Thread.Local */
    optional class Local
    {
      protected mixed data;
      mixed get() {return data;}
      mixed set (mixed val) {return data = val;}
    }
    
    /* Fallback implementation of Thread.MutexKey */
    optional class MutexKey (protected function(:void) dec_locks)
    {
      int `!()
      {
        // Should be destructed when the mutex is, but we can't pull that
        // off. Try to simulate it as well as possible.
        if (dec_locks) return 0;
        destruct (this);
        return 1;
      }
    
      protected void destroy()
      {
        if (dec_locks) dec_locks();
      }
    }
    
    /* Fallback implementation of Thread.Mutex */
    optional class Mutex
    {
      protected int locks = 0;
      protected void dec_locks() {locks--;}
    
      MutexKey lock (int|void type)
      {
        switch (type) {
          default:
    	error ("Unknown mutex locking style: %d\n", type);
          case 0:
    	if (locks) error ("Recursive mutex locks.\n");
    	break;
          case 1:
    	if (locks)
    	  // To be really accurate we should hang now, but somehow
    	  // that doesn't seem too useful.
    	  error ("Deadlock detected.\n");
    	break;
          case 2:
    	if (locks) return 0;
        }
        locks++;
        return MutexKey (dec_locks);
      }
    
      MutexKey trylock (int|void type)
      {
        switch (type) {
          default:
    	error ("Unknown mutex locking style: %d\n", type);
          case 0:
    	if (locks) error ("Recursive mutex locks.\n");
    	break;
          case 1:
          case 2:
        }
        if (locks) return 0;
        locks++;
        return MutexKey (dec_locks);
      }
    }
    
    // Fallback implementation of Thread.Fifo.
    optional class Fifo
    {
      array buffer;
      int ptr, num;
      int read_tres, write_tres;
    
      int size() {  return num; }
    
      mixed read()
      {
        if (!num) error ("Deadlock detected - fifo empty.\n");
        return try_read();
      }
    
      mixed try_read()
      {
        if (!num) return UNDEFINED;
        mixed tmp=buffer[ptr];
        buffer[ptr++] = 0;	// Throw away any references.
        ptr%=sizeof(buffer);
        return tmp;
      }
    
      array read_array()
      {
        if (!num) error ("Deadlock detected - fifo empty.\n");
        return try_read_array();
      }
    
      array try_read_array()
      {
        array ret;
        switch (num) {
          case 0:
    	ret = ({});
    	break;
    
          case 1:
    	ret=buffer[ptr..ptr];
    	buffer[ptr++] = 0;	// Throw away any references.
    	ptr%=sizeof(buffer);
    	num = 0;
    	break;
    
          default:
    	if (ptr+num < sizeof(buffer)) {
    	  ret = buffer[ptr..ptr+num-1];
    	} else {
    	  ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1];
    	}
    	ptr=num=0;
    	buffer=allocate(sizeof(buffer)); // Throw away any references.
    	break;
        }
    
        return ret;
      }
    
      int try_write(mixed value)
      {
        if (num == sizeof (buffer)) return 0;
        buffer[(ptr + num) % sizeof(buffer)] = value;
        return ++num;
      }
    
      int write(mixed value)
      {
        if (!try_write(value)) error("Deadlock detected - fifo full.\n");
        return num;
      }
    
      protected void create(int|void size)
      {
        write_tres=0;
        buffer=allocate(read_tres=size || 128);
      }
    
      protected string _sprintf( int f )
      {
        return f=='O' && sprintf( "%O(%d / %d)", this_program,
    			      size(), read_tres );
      }
    }
    
    // Fallback implementation of Thread.Queue.
    optional class Queue
    {
      array buffer=allocate(16);
      int r_ptr, w_ptr;
      
      int size() {  return w_ptr - r_ptr;  }
    
      mixed read()
      {
        if (w_ptr == r_ptr) error ("Deadlock detected - queue empty.\n");
        return try_read();
      }
    
      mixed try_read()
      {
        mixed tmp=buffer[r_ptr];
        buffer[r_ptr++] = 0;	// Throw away any references.
        return tmp;
      }
    
      array read_array()
      {
        if (w_ptr == r_ptr) error ("Deadlock detected - queue empty.\n");
        return try_read_array();
      }
    
      array try_read_array()
      {
        array ret;
    
        switch (w_ptr - r_ptr) {
          case 0:
    	ret = ({});
    	break;
    
          case 1:
    	ret=buffer[r_ptr..r_ptr];
    	buffer[r_ptr++] = 0;	// Throw away any references.
    	break;
    
          default:
    	ret = buffer[r_ptr..w_ptr-1];
    	r_ptr = w_ptr = 0;
    	buffer=allocate(sizeof(buffer)); // Throw away any references.
    	break;
        }
    
        return ret;
      }
    
      int write(mixed value)
      {
        if(w_ptr >= sizeof(buffer))
        {
          buffer=buffer[r_ptr..];
          buffer+=allocate(sizeof(buffer)+1);
          w_ptr-=r_ptr;
          r_ptr=0;
        }
        buffer[w_ptr] = value;
        w_ptr++;
        return w_ptr - r_ptr;
      }
    
      protected string _sprintf( int f )
      {
        return f=='O' && sprintf( "%O(%d)", this_program, size() );
      }
    }
    
    #endif /* !constant(thread_create) */