Skip to content
Snippets Groups Projects
Commit b5faf3dc authored by Martin Stjernholm's avatar Martin Stjernholm
Browse files

Added try_* varieties of all functions that can block in Thread.Fifo

and Thread.Queue. Added functions to make Thread.Fifo and Thread.Queue
implement the same interface. Let write and try_write return the
number of elements in the fifo/queue. Added fallbacks for Thread.Fifo
and Thread.Queue in nonthreaded mode since it's now possible to use
them in a meaningful way then.

Rev: lib/modules/Thread.pmod:1.37
parent a2a934dd
No related branches found
No related tags found
No related merge requests found
......@@ -38,9 +38,6 @@ optional constant all_threads = predef::all_threads;
//! A fifo is a queue of values and is often used as a stream of data
//! between two threads.
//!
//! @note
//! Fifos are only available on systems with threads support.
//!
//! @seealso
//! @[Queue]
//!
......@@ -60,20 +57,9 @@ optional class Fifo {
//!
int size() { return num; }
//! This function retrieves a value from the fifo. Values will be
//! returned in the order they were written. If there are no values
//! present in the fifo the current thread will sleep until some other
//! thread writes a value to the fifo.
//!
//! @seealso
//! @[write()], @[read_array()]
//!
mixed read()
static nomask mixed read_unlocked()
{
mixed tmp;
object key=lock::lock(2);
while(!num) r_cond::wait(key);
tmp=buffer[ptr];
mixed tmp=buffer[ptr];
buffer[ptr++] = 0; // Throw away any references.
ptr%=sizeof(buffer);
if(read_tres < sizeof(buffer))
......@@ -84,54 +70,111 @@ optional class Fifo {
num--;
w_cond::broadcast();
}
key = 0;
return tmp;
}
//! This function returns all values currently in the fifo. Values in
//! the array will be in the order they were written. If there are no
//! values present in the fifo the current thread will sleep until
//! some other thread writes a value to the fifo.
//! This function retrieves a value from the fifo. Values will be
//! returned in the order they were written. If there are no values
//! present in the fifo the current thread will sleep until some other
//! thread writes one.
//!
//! @seealso
//! @[write()], @[read()]
//! @[try_read()], @[read_array()], @[write()]
//!
array read_array()
mixed read()
{
array ret;
object key=lock::lock(2);
object key=lock::lock();
while(!num) r_cond::wait(key);
if(num==1)
{
ret=buffer[ptr..ptr];
buffer[ptr++] = 0; // Throw away any references.
ptr%=sizeof(buffer);
num--;
}else{
if (ptr+num < sizeof(buffer)) {
ret = buffer[ptr..ptr+num-1];
} else {
ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1];
}
ptr=num=0;
buffer=allocate(sizeof(buffer)); // Throw away any references.
mixed res = read_unlocked();
key = 0;
return res;
}
//! This function retrieves a value from the fifo if there is any
//! there. Values will be returned in the order they were written.
//! If there are no values present in the fifo then @[UNDEFINED]
//! will be returned.
//!
//! @seealso
//! @[read()]
//!
mixed try_read()
{
if (!num) return UNDEFINED;
object key=lock::lock();
if (!num) return UNDEFINED;
mixed res = read_unlocked();
key = 0;
return res;
}
static nomask array read_all_unlocked()
{
array ret;
switch (num) {
case 0:
ret = ({});
break;
case 1:
ret=buffer[ptr..ptr];
buffer[ptr++] = 0; // Throw away any references.
ptr%=sizeof(buffer);
num = 0;
w_cond::broadcast();
break;
default:
if (ptr+num < sizeof(buffer)) {
ret = buffer[ptr..ptr+num-1];
} else {
ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1];
}
ptr=num=0;
buffer=allocate(sizeof(buffer)); // Throw away any references.
w_cond::broadcast();
break;
}
return ret;
}
//! This function returns all values in the fifo as an array. The
//! values in the array will be in the order they were written. If
//! there are no values present in the fifo the current thread will
//! sleep until some other thread writes one.
//!
//! @seealso
//! @[read()], @[try_read_array()]
//!
array read_array()
{
object key=lock::lock();
while(!num) r_cond::wait(key);
array ret = read_all_unlocked();
key = 0;
w_cond::broadcast();
return ret;
}
//! Append a @[value] to the end of the fifo. If there is no more
//! room in the fifo the current thread will sleep until space is
//! available.
//! This function returns all values in the fifo as an array but
//! doesn't wait if there are no values there. The values in the
//! array will be in the order they were written.
//!
//! @seealso
//! @[read()]
//! @[read_array()]
//!
void write(mixed value)
array try_read_array()
{
if (!num) return ({});
object key=lock::lock();
array ret = read_all_unlocked();
key = 0;
return ret;
}
static nomask void write_unlocked (mixed value)
{
object key=lock::lock(2);
while(num == sizeof(buffer)) w_cond::wait(key);
buffer[(ptr + num) % sizeof(buffer)] = value;
if(write_tres)
{
......@@ -141,7 +184,42 @@ optional class Fifo {
num++;
r_cond::broadcast();
}
}
//! Append a @[value] to the end of the fifo. If there is no more
//! room in the fifo the current thread will sleep until space is
//! available. The number of items in the queue after the write is
//! returned.
//!
//! @seealso
//! @[read()]
//!
int write(mixed value)
{
object key=lock::lock();
while(num == sizeof(buffer)) w_cond::wait(key);
write_unlocked (value);
int items = num;
key = 0;
return items;
}
//! Append a @[value] to the end of the fifo. If there is no more
//! room in the fifo then zero will be returned, otherwise the
//! number of items in the fifo after the write is returned.
//!
//! @seealso
//! @[read()]
//!
int try_write(mixed value)
{
if (num == sizeof (buffer)) return 0;
object key=lock::lock();
if (num == sizeof (buffer)) return 0;
write_unlocked (value);
int items = num;
key = 0;
return items;
}
//! @decl void create()
......@@ -168,10 +246,6 @@ optional class Fifo {
//! between @[Queue] and @[Fifo] is that @[Queue]
//! will never block in write(), only allocate more memory.
//!
//! @note
//! Queues are only available on systems with POSIX or UNIX or WIN32
//! thread support.
//!
//! @seealso
//! @[Fifo]
//!
......@@ -192,30 +266,106 @@ optional class Queue {
//! This function retrieves a value from the queue. Values will be
//! returned in the order they were written. If there are no values
//! present in the queue the current thread will sleep until some other
//! thread writes a value to the queue.
//! thread writes one.
//!
//! @seealso
//! @[write()]
//! @[try_read()], @[write()]
//!
mixed read()
{
mixed tmp;
object key=lock::lock();
while(!size()) r_cond::wait(key);
while(w_ptr == r_ptr) r_cond::wait(key);
tmp=buffer[r_ptr];
buffer[r_ptr++] = 0; // Throw away any references.
key=0;
return tmp;
}
//! This function retrieves a value from the queue if there is any
//! there. Values will be returned in the order they were written.
//! If there are no values present in the fifo then @[UNDEFINED]
//! will be returned.
//!
//! @seealso
//! @[write()]
//!
mixed try_read()
{
if (w_ptr == r_ptr) return UNDEFINED;
object key=lock::lock();
if (w_ptr == r_ptr) return UNDEFINED;
mixed tmp=buffer[r_ptr];
buffer[r_ptr++] = 0; // Throw away any references.
key=0;
return tmp;
}
static nomask array read_all_unlocked()
{
array ret;
switch (w_ptr - r_ptr) {
case 0:
ret = ({});
break;
case 1:
ret=buffer[r_ptr..r_ptr];
buffer[r_ptr++] = 0; // Throw away any references.
break;
default:
ret = buffer[r_ptr..w_ptr];
r_ptr = w_ptr = 0;
buffer=allocate(sizeof(buffer)); // Throw away any references.
break;
}
return ret;
}
//! This function returns all values in the queue as an array. The
//! values in the array will be in the order they were written. If
//! there are no values present in the queue the current thread will
//! sleep until some other thread writes one.
//!
//! @seealso
//! @[read()], @[try_read_array()]
//!
array read_array()
{
object key=lock::lock();
while (w_ptr == r_ptr) r_cond::wait(key);
array ret = read_all_unlocked();
key = 0;
return ret;
}
//! This function returns all values in the queue as an array but
//! doesn't wait if there are no values there. The values in the
//! array will be in the order they were written.
//!
//! @seealso
//! @[read_array()]
//!
array try_read_array()
{
if (w_ptr == r_ptr) return ({});
object key=lock::lock();
array ret = read_all_unlocked();
key = 0;
return ret;
}
//! This function puts a @[value] last in the queue. If the queue is
//! too small to hold the @[value] the queue will be expanded to make
//! room for it.
//! too small to hold the @[value] it will be expanded to make room.
//! The number of items in the queue after the write is returned.
//!
//! @seealso
//! @[read()]
//!
void write(mixed value)
int write(mixed value)
{
object key=lock::lock();
if(w_ptr >= sizeof(buffer))
......@@ -227,8 +377,10 @@ optional class Queue {
}
buffer[w_ptr] = value;
w_ptr++;
key=0; // Must free this one _before_ the signal...
int items = w_ptr - r_ptr;
r_cond::signal();
key=0;
return items;
}
static string _sprintf( int f )
......@@ -597,4 +749,160 @@ class Mutex
}
}
// Fallback implementation of Thread.Fifo.
class Fifo
{
array buffer;
int ptr, num;
int read_tres, write_tres;
int size() { return num; }
mixed read()
{
if (!num) error ("Deadlock detected - fifo empty.\n");
return try_read();
}
mixed try_read()
{
if (!num) return UNDEFINED;
mixed tmp=buffer[ptr];
buffer[ptr++] = 0; // Throw away any references.
ptr%=sizeof(buffer);
return tmp;
}
array read_array()
{
if (!num) error ("Deadlock detected - fifo empty.\n");
return try_read_array();
}
array try_read_array()
{
switch (num) {
case 0:
ret = ({});
break;
case 1:
ret=buffer[ptr..ptr];
buffer[ptr++] = 0; // Throw away any references.
ptr%=sizeof(buffer);
num = 0;
break;
default:
if (ptr+num < sizeof(buffer)) {
ret = buffer[ptr..ptr+num-1];
} else {
ret = buffer[ptr..]+buffer[..num-(sizeof(buffer)-ptr)-1];
}
ptr=num=0;
buffer=allocate(sizeof(buffer)); // Throw away any references.
break;
}
return ret;
}
int write(mixed value)
{
if (num == sizeof(buffer)) error ("Deadlock detected - fifo full.\n");
write_unlocked (value);
return num;
}
int try_write(mixed value)
{
if (num == sizeof (buffer)) return 0;
buffer[(ptr + num) % sizeof(buffer)] = value;
return ++num;
}
static void create(int|void size)
{
write_tres=0;
buffer=allocate(read_tres=size || 128);
}
static string _sprintf( int f )
{
return f=='O' && sprintf( "%O(%d / %d)", this_program,
size(), read_tres );
}
}
// Fallback implementation of Thread.Queue.
class Queue
{
array buffer=allocate(16);
int r_ptr, w_ptr;
int size() { return w_ptr - r_ptr; }
mixed read()
{
if (w_ptr == r_ptr) error ("Deadlock detected - queue empty.\n");
return try_read();
}
mixed try_read()
{
mixed tmp=buffer[r_ptr];
buffer[r_ptr++] = 0; // Throw away any references.
return tmp;
}
array read_array()
{
if (w_ptr == r_ptr) error ("Deadlock detected - queue empty.\n");
return try_read_array();
}
array try_read_array()
{
array ret;
switch (w_ptr - r_ptr) {
case 0:
ret = ({});
break;
case 1:
ret=buffer[r_ptr..r_ptr];
buffer[r_ptr++] = 0; // Throw away any references.
break;
default:
ret = buffer[r_ptr..w_ptr];
r_ptr = w_ptr = 0;
buffer=allocate(sizeof(buffer)); // Throw away any references.
break;
}
return ret;
}
int write(mixed value)
{
if(w_ptr >= sizeof(buffer))
{
buffer=buffer[r_ptr..];
buffer+=allocate(sizeof(buffer)+1);
w_ptr-=r_ptr;
r_ptr=0;
}
buffer[w_ptr] = value;
w_ptr++;
return w_ptr - r_ptr;
}
static string _sprintf( int f )
{
return f=='O' && sprintf( "%O(%d)", this_program, size() );
}
}
#endif /* !constant(thread_create) */
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment