From b5faf3dc681ed4e43f449db9bb30e3e27c801986 Mon Sep 17 00:00:00 2001 From: Martin Stjernholm <mast@lysator.liu.se> Date: Mon, 14 Apr 2003 20:59:37 +0200 Subject: [PATCH] Added try_* varieties of all functions that can block in Thread.Fifo and Thread.Queue. Added functions to make Thread.Fifo and Thread.Queue implement the same interface. Let write and try_write return the number of elements in the fifo/queue. Added fallbacks for Thread.Fifo and Thread.Queue in nonthreaded mode since it's now possible to use them in a meaningful way then. Rev: lib/modules/Thread.pmod:1.37 --- lib/modules/Thread.pmod | 424 ++++++++++++++++++++++++++++++++++------ 1 file changed, 366 insertions(+), 58 deletions(-) diff --git a/lib/modules/Thread.pmod b/lib/modules/Thread.pmod index 19047b50ac..6e84496fc5 100644 --- a/lib/modules/Thread.pmod +++ b/lib/modules/Thread.pmod @@ -38,9 +38,6 @@ optional constant all_threads = predef::all_threads; //! A fifo is a queue of values and is often used as a stream of data //! between two threads. //! -//! @note -//! Fifos are only available on systems with threads support. -//! //! @seealso //! @[Queue] //! @@ -60,20 +57,9 @@ optional class Fifo { //! int size() { return num; } - //! This function retrieves a value from the fifo. Values will be - //! returned in the order they were written. If there are no values - //! present in the fifo the current thread will sleep until some other - //! thread writes a value to the fifo. - //! - //! @seealso - //! @[write()], @[read_array()] - //! - mixed read() + static nomask mixed read_unlocked() { - mixed tmp; - object key=lock::lock(2); - while(!num) r_cond::wait(key); - tmp=buffer[ptr]; + mixed tmp=buffer[ptr]; buffer[ptr++] = 0; // Throw away any references. ptr%=sizeof(buffer); if(read_tres < sizeof(buffer)) @@ -84,54 +70,111 @@ optional class Fifo { num--; w_cond::broadcast(); } - key = 0; return tmp; } - //! This function returns all values currently in the fifo. Values in - //! the array will be in the order they were written. If there are no - //! values present in the fifo the current thread will sleep until - //! some other thread writes a value to the fifo. + //! This function retrieves a value from the fifo. Values will be + //! returned in the order they were written. If there are no values + //! present in the fifo the current thread will sleep until some other + //! thread writes one. //! //! @seealso - //! @[write()], @[read()] + //! @[try_read()], @[read_array()], @[write()] //! - array read_array() + mixed read() { - array ret; - object key=lock::lock(2); + object key=lock::lock(); while(!num) r_cond::wait(key); - if(num==1) - { - ret=buffer[ptr..ptr]; - buffer[ptr++] = 0; // Throw away any references. - ptr%=sizeof(buffer); - num--; - }else{ - if (ptr+num < sizeof(buffer)) { - ret = buffer[ptr..ptr+num-1]; - } else { - ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1]; - } - ptr=num=0; - buffer=allocate(sizeof(buffer)); // Throw away any references. + mixed res = read_unlocked(); + key = 0; + return res; + } + + //! This function retrieves a value from the fifo if there is any + //! there. Values will be returned in the order they were written. + //! If there are no values present in the fifo then @[UNDEFINED] + //! will be returned. + //! + //! @seealso + //! @[read()] + //! + mixed try_read() + { + if (!num) return UNDEFINED; + object key=lock::lock(); + if (!num) return UNDEFINED; + mixed res = read_unlocked(); + key = 0; + return res; + } + + static nomask array read_all_unlocked() + { + array ret; + + switch (num) { + case 0: + ret = ({}); + break; + + case 1: + ret=buffer[ptr..ptr]; + buffer[ptr++] = 0; // Throw away any references. + ptr%=sizeof(buffer); + num = 0; + w_cond::broadcast(); + break; + + default: + if (ptr+num < sizeof(buffer)) { + ret = buffer[ptr..ptr+num-1]; + } else { + ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1]; + } + ptr=num=0; + buffer=allocate(sizeof(buffer)); // Throw away any references. + w_cond::broadcast(); + break; } + + return ret; + } + + //! This function returns all values in the fifo as an array. The + //! values in the array will be in the order they were written. If + //! there are no values present in the fifo the current thread will + //! sleep until some other thread writes one. + //! + //! @seealso + //! @[read()], @[try_read_array()] + //! + array read_array() + { + object key=lock::lock(); + while(!num) r_cond::wait(key); + array ret = read_all_unlocked(); key = 0; - w_cond::broadcast(); return ret; } - //! Append a @[value] to the end of the fifo. If there is no more - //! room in the fifo the current thread will sleep until space is - //! available. + //! This function returns all values in the fifo as an array but + //! doesn't wait if there are no values there. The values in the + //! array will be in the order they were written. //! //! @seealso - //! @[read()] + //! @[read_array()] //! - void write(mixed value) + array try_read_array() + { + if (!num) return ({}); + object key=lock::lock(); + array ret = read_all_unlocked(); + key = 0; + return ret; + } + + static nomask void write_unlocked (mixed value) { - object key=lock::lock(2); - while(num == sizeof(buffer)) w_cond::wait(key); buffer[(ptr + num) % sizeof(buffer)] = value; if(write_tres) { @@ -141,7 +184,42 @@ optional class Fifo { num++; r_cond::broadcast(); } + } + + //! Append a @[value] to the end of the fifo. If there is no more + //! room in the fifo the current thread will sleep until space is + //! available. The number of items in the queue after the write is + //! returned. + //! + //! @seealso + //! @[read()] + //! + int write(mixed value) + { + object key=lock::lock(); + while(num == sizeof(buffer)) w_cond::wait(key); + write_unlocked (value); + int items = num; key = 0; + return items; + } + + //! Append a @[value] to the end of the fifo. If there is no more + //! room in the fifo then zero will be returned, otherwise the + //! number of items in the fifo after the write is returned. + //! + //! @seealso + //! @[read()] + //! + int try_write(mixed value) + { + if (num == sizeof (buffer)) return 0; + object key=lock::lock(); + if (num == sizeof (buffer)) return 0; + write_unlocked (value); + int items = num; + key = 0; + return items; } //! @decl void create() @@ -168,10 +246,6 @@ optional class Fifo { //! between @[Queue] and @[Fifo] is that @[Queue] //! will never block in write(), only allocate more memory. //! -//! @note -//! Queues are only available on systems with POSIX or UNIX or WIN32 -//! thread support. -//! //! @seealso //! @[Fifo] //! @@ -192,30 +266,106 @@ optional class Queue { //! This function retrieves a value from the queue. Values will be //! returned in the order they were written. If there are no values //! present in the queue the current thread will sleep until some other - //! thread writes a value to the queue. + //! thread writes one. //! //! @seealso - //! @[write()] + //! @[try_read()], @[write()] //! mixed read() { mixed tmp; object key=lock::lock(); - while(!size()) r_cond::wait(key); + while(w_ptr == r_ptr) r_cond::wait(key); tmp=buffer[r_ptr]; buffer[r_ptr++] = 0; // Throw away any references. key=0; return tmp; } + //! This function retrieves a value from the queue if there is any + //! there. Values will be returned in the order they were written. + //! If there are no values present in the fifo then @[UNDEFINED] + //! will be returned. + //! + //! @seealso + //! @[write()] + //! + mixed try_read() + { + if (w_ptr == r_ptr) return UNDEFINED; + object key=lock::lock(); + if (w_ptr == r_ptr) return UNDEFINED; + mixed tmp=buffer[r_ptr]; + buffer[r_ptr++] = 0; // Throw away any references. + key=0; + return tmp; + } + + static nomask array read_all_unlocked() + { + array ret; + + switch (w_ptr - r_ptr) { + case 0: + ret = ({}); + break; + + case 1: + ret=buffer[r_ptr..r_ptr]; + buffer[r_ptr++] = 0; // Throw away any references. + break; + + default: + ret = buffer[r_ptr..w_ptr]; + r_ptr = w_ptr = 0; + buffer=allocate(sizeof(buffer)); // Throw away any references. + break; + } + + return ret; + } + + //! This function returns all values in the queue as an array. The + //! values in the array will be in the order they were written. If + //! there are no values present in the queue the current thread will + //! sleep until some other thread writes one. + //! + //! @seealso + //! @[read()], @[try_read_array()] + //! + array read_array() + { + object key=lock::lock(); + while (w_ptr == r_ptr) r_cond::wait(key); + array ret = read_all_unlocked(); + key = 0; + return ret; + } + + //! This function returns all values in the queue as an array but + //! doesn't wait if there are no values there. The values in the + //! array will be in the order they were written. + //! + //! @seealso + //! @[read_array()] + //! + array try_read_array() + { + if (w_ptr == r_ptr) return ({}); + object key=lock::lock(); + array ret = read_all_unlocked(); + key = 0; + return ret; + } + //! This function puts a @[value] last in the queue. If the queue is - //! too small to hold the @[value] the queue will be expanded to make - //! room for it. + //! too small to hold the @[value] it will be expanded to make room. + //! The number of items in the queue after the write is returned. //! //! @seealso //! @[read()] //! - void write(mixed value) + int write(mixed value) { object key=lock::lock(); if(w_ptr >= sizeof(buffer)) @@ -227,8 +377,10 @@ optional class Queue { } buffer[w_ptr] = value; w_ptr++; - key=0; // Must free this one _before_ the signal... + int items = w_ptr - r_ptr; r_cond::signal(); + key=0; + return items; } static string _sprintf( int f ) @@ -597,4 +749,160 @@ class Mutex } } +// Fallback implementation of Thread.Fifo. +class Fifo +{ + array buffer; + int ptr, num; + int read_tres, write_tres; + + int size() { return num; } + + mixed read() + { + if (!num) error ("Deadlock detected - fifo empty.\n"); + return try_read(); + } + + mixed try_read() + { + if (!num) return UNDEFINED; + mixed tmp=buffer[ptr]; + buffer[ptr++] = 0; // Throw away any references. + ptr%=sizeof(buffer); + return tmp; + } + + array read_array() + { + if (!num) error ("Deadlock detected - fifo empty.\n"); + return try_read_array(); + } + + array try_read_array() + { + switch (num) { + case 0: + ret = ({}); + break; + + case 1: + ret=buffer[ptr..ptr]; + buffer[ptr++] = 0; // Throw away any references. + ptr%=sizeof(buffer); + num = 0; + break; + + default: + if (ptr+num < sizeof(buffer)) { + ret = buffer[ptr..ptr+num-1]; + } else { + ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1]; + } + ptr=num=0; + buffer=allocate(sizeof(buffer)); // Throw away any references. + break; + } + + return ret; + } + + int write(mixed value) + { + if (num == sizeof(buffer)) error ("Deadlock detected - fifo full.\n"); + write_unlocked (value); + return num; + } + + int try_write(mixed value) + { + if (num == sizeof (buffer)) return 0; + buffer[(ptr + num) % sizeof(buffer)] = value; + return ++num; + } + + static void create(int|void size) + { + write_tres=0; + buffer=allocate(read_tres=size || 128); + } + + static string _sprintf( int f ) + { + return f=='O' && sprintf( "%O(%d / %d)", this_program, + size(), read_tres ); + } +} + +// Fallback implementation of Thread.Queue. +class Queue +{ + array buffer=allocate(16); + int r_ptr, w_ptr; + + int size() { return w_ptr - r_ptr; } + + mixed read() + { + if (w_ptr == r_ptr) error ("Deadlock detected - queue empty.\n"); + return try_read(); + } + + mixed try_read() + { + mixed tmp=buffer[r_ptr]; + buffer[r_ptr++] = 0; // Throw away any references. + return tmp; + } + + array read_array() + { + if (w_ptr == r_ptr) error ("Deadlock detected - queue empty.\n"); + return try_read_array(); + } + + array try_read_array() + { + array ret; + + switch (w_ptr - r_ptr) { + case 0: + ret = ({}); + break; + + case 1: + ret=buffer[r_ptr..r_ptr]; + buffer[r_ptr++] = 0; // Throw away any references. + break; + + default: + ret = buffer[r_ptr..w_ptr]; + r_ptr = w_ptr = 0; + buffer=allocate(sizeof(buffer)); // Throw away any references. + break; + } + + return ret; + } + + int write(mixed value) + { + if(w_ptr >= sizeof(buffer)) + { + buffer=buffer[r_ptr..]; + buffer+=allocate(sizeof(buffer)+1); + w_ptr-=r_ptr; + r_ptr=0; + } + buffer[w_ptr] = value; + w_ptr++; + return w_ptr - r_ptr; + } + + static string _sprintf( int f ) + { + return f=='O' && sprintf( "%O(%d)", this_program, size() ); + } +} + #endif /* !constant(thread_create) */ -- GitLab