diff --git a/src/modules/Pipe/pipe.c b/src/modules/Pipe/pipe.c index e3839f4ff38fa244c9dcd85ea542536792b092ad..388f503bf0cf5ef36a2ee886d5f86d65c30c337b 100644 --- a/src/modules/Pipe/pipe.c +++ b/src/modules/Pipe/pipe.c @@ -1,14 +1,9 @@ -#include "global.h" -#include "config.h" +#include <config.h> #include "machine.h" #include <sys/types.h> #include <sys/stat.h> -#ifdef HAVE_SYS_FILE_H -#include <sys/file.h> -#endif /* HAVE_SYS_FILE_H */ - #include <errno.h> #ifdef HAVE_SYS_MMAN_H @@ -26,9 +21,9 @@ #include <fcntl.h> -RCSID("$Id: pipe.c,v 1.25 1998/06/12 19:20:45 noring Exp $"); +#include "global.h" +RCSID("$Id: pipe.c,v 1.11 1998/07/19 03:57:13 hubbe Exp $"); -#include "threads.h" #include "stralloc.h" #include "pike_macros.h" #include "object.h" @@ -53,29 +48,6 @@ RCSID("$Id: pipe.c,v 1.25 1998/06/12 19:20:45 noring Exp $"); #define BLOCKING_CLOSE */ -#ifndef SEEK_SET -#ifdef L_SET -#define SEEK_SET L_SET -#else /* !L_SET */ -#define SEEK_SET 0 -#endif /* L_SET */ -#endif /* SEEK_SET */ -#ifndef SEEK_CUR -#ifdef L_INCR -#define SEEK_SET L_INCR -#else /* !L_INCR */ -#define SEEK_CUR 1 -#endif /* L_INCR */ -#endif /* SEEK_CUR */ -#ifndef SEEK_END -#ifdef L_XTND -#define SEEK_END L_XTND -#else /* !L_XTND */ -#define SEEK_END 2 -#endif /* L_XTND */ -#endif /* SEEK_END */ - - #if 0 #define INSISTANT_WRITE #endif @@ -84,7 +56,7 @@ RCSID("$Id: pipe.c,v 1.25 1998/06/12 19:20:45 noring Exp $"); # define MAP_FILE 0 #endif -#define READ_BUFFER_SIZE 65536 +#define READ_BUFFER_SIZE 32768 #define MAX_BYTES_IN_BUFFER 65536 /* @@ -98,15 +70,12 @@ RCSID("$Id: pipe.c,v 1.25 1998/06/12 19:20:45 noring Exp $"); static struct program *pipe_program, *output_program; -#ifdef THIS -#undef THIS -#endif #define THIS ((struct pipe *)(fp->current_storage)) #define THISOBJ (fp->current_object) struct input { - enum { I_NONE,I_OBJ,I_BLOCKING_OBJ,I_STRING,I_MMAP } type; + enum { I_NONE,I_OBJ,I_STRING,I_MMAP } type; union { struct object *obj; @@ -175,6 +144,8 @@ static int offset_output_write_callback; static int offset_output_close_callback; static int mmapped, nobjects, nstrings, noutputs, ninputs, nbuffers, sbuffers; +static char static_buffer[READ_BUFFER_SIZE]; + void close_and_free_everything(struct object *o,struct pipe *); static INLINE void output_finish(struct object *obj); static INLINE void output_try_write_some(struct object *obj); @@ -185,7 +156,8 @@ static INLINE void output_try_write_some(struct object *obj); */ static void push_callback(int no) { - add_ref(sp->u.object=THISOBJ); + sp->u.object=THISOBJ; + THISOBJ->refs++; sp->subtype=no+fp->context.identifier_level; sp->type=T_FUNCTION; sp++; @@ -214,7 +186,6 @@ static INLINE void free_input(struct input *i) switch (i->type) { case I_OBJ: - case I_BLOCKING_OBJ: if (!i->u.obj) break; if (i->u.obj->prog) { @@ -292,8 +263,8 @@ static INLINE int append_buffer(struct pike_string *s) if(THIS->fd!= -1) { - lseek(THIS->fd, THIS->pos, SEEK_SET); - write(THIS->fd, s->str, s->len); + lseek(THIS->fd,THIS->pos,0); + write(THIS->fd,s->str,s->len); THIS->pos+=s->len; return 0; } @@ -304,7 +275,7 @@ static INLINE int append_buffer(struct pike_string *s) b->next=NULL; b->s=s; sbuffers += s->len; - add_ref(s); + s->refs++; if (THIS->lastbuffer) THIS->lastbuffer->next=b; @@ -325,10 +296,10 @@ static void low_start(void) struct output *o; - add_ref(THISOBJ); /* dont kill yourself now */ + THISOBJ->refs++; /* dont kill yourself now */ for(obj=THIS->firstoutput;obj;obj=next) { - add_ref(obj); /* Hang on PLEASE!! /hubbe */ + obj->refs++; /* Hang on PLEASE!! /hubbe */ o=(struct output *)(obj->storage); if (o->obj && o->mode==O_SLEEP) { @@ -355,34 +326,6 @@ static void low_start(void) free_object(THISOBJ); } -/* Read some data from the blocking object. - * - */ -static int read_some_data(void) -{ - struct pipe *this = THIS; - struct input * i = this->firstinput; - - if (!i || i->type != I_BLOCKING_OBJ) { - fatal("PIPE: read_some_data(): Bad input type!\n"); - return -1; - } - push_int(8192); - push_int(1); /* We don't care if we don't get all 8192 bytes. */ - apply(i->u.obj, "read", 2); - if ((sp[-1].type == T_STRING) && (sp[-1].u.string->len > 0)) { - append_buffer(sp[-1].u.string); - pop_stack(); - THIS->sleeping = 1; - return(1); /* Success */ - } - - /* FIXME: Should we check the return value here? */ - pop_stack(); - /* EOF */ - return(0); /* EOF */ -} - /* Let's guess what this function does.... * */ @@ -392,7 +335,6 @@ static INLINE void input_finish(void) while(1) { - /* Get the next input from the queue */ i=THIS->firstinput->next; free_input(THIS->firstinput); THIS->firstinput=i; @@ -410,11 +352,6 @@ static INLINE void input_finish(void) pop_stack(); return; - case I_BLOCKING_OBJ: - if (read_some_data()) - return; - continue; - case I_MMAP: if (THIS->fd==-1) return; continue; @@ -438,45 +375,38 @@ static INLINE struct pike_string* gimme_some_data(unsigned long pos) { struct buffer *b; long len; - struct pipe *this = THIS; /* We have a file cache, read from it */ - if (this->fd!=-1) + if (THIS->fd!=-1) { - char buffer[READ_BUFFER_SIZE]; - - if (this->pos<=pos) return NULL; /* no data */ - len=this->pos-pos; + if (THIS->pos<=pos) return NULL; /* no data */ + len=THIS->pos-pos; if (len>READ_BUFFER_SIZE) len=READ_BUFFER_SIZE; - THREADS_ALLOW(); - lseek(this->fd, pos, SEEK_SET); - THREADS_DISALLOW(); + lseek(THIS->fd,pos,0); /* SEEK_SET */ do { - THREADS_ALLOW(); - len = read(this->fd, buffer, len); - THREADS_DISALLOW(); + len = read(THIS->fd,static_buffer,len); if (len < 0) { if (errno != EINTR) { return(NULL); } } } while(len < 0); - return make_shared_binary_string(buffer,len); + return make_shared_binary_string(static_buffer,len); } - if (pos<this->pos) + if (pos<THIS->pos) return make_shared_string("buffer underflow"); /* shit */ /* We want something in the next buffer */ - while (this->firstbuffer && pos>=this->pos+this->firstbuffer->s->len) + while (THIS->firstbuffer && pos>=THIS->pos+THIS->firstbuffer->s->len) { /* Free the first buffer, and update THIS->pos */ - b=this->firstbuffer; - this->pos+=b->s->len; - this->bytes_in_buffer-=b->s->len; - this->firstbuffer=b->next; + b=THIS->firstbuffer; + THIS->pos+=b->s->len; + THIS->bytes_in_buffer-=b->s->len; + THIS->firstbuffer=b->next; if (!b->next) - this->lastbuffer=NULL; + THIS->lastbuffer=NULL; sbuffers-=b->s->len; nbuffers--; free_string(b->s); @@ -485,72 +415,57 @@ static INLINE struct pike_string* gimme_some_data(unsigned long pos) /* Wake up first input if it was sleeping and we * have room for more in the buffer. */ - if (this->sleeping && - this->firstinput && - this->bytes_in_buffer<MAX_BYTES_IN_BUFFER) + if (THIS->sleeping && + THIS->firstinput && + THIS->bytes_in_buffer<MAX_BYTES_IN_BUFFER) { - if (this->firstinput->type == I_BLOCKING_OBJ) { - if (!read_some_data()) { - this->sleeping = 0; - input_finish(); - } - } else { - this->sleeping=0; - push_callback(offset_input_read_callback); - push_int(0); - push_callback(offset_input_close_callback); - apply(this->firstinput->u.obj, "set_nonblocking", 3); - pop_stack(); - } + THIS->sleeping=0; + push_callback(offset_input_read_callback); + push_int(0); + push_callback(offset_input_close_callback); + apply(THIS->firstinput->u.obj, "set_nonblocking", 3); + pop_stack(); } } - while (!this->firstbuffer) + while (!THIS->firstbuffer) { - if (this->firstinput) + if (THIS->firstinput) { #if defined(HAVE_MMAP) && defined(HAVE_MUNMAP) - if (this->firstinput->type==I_MMAP) + if (THIS->firstinput->type==I_MMAP) { - char *src; - struct pike_string *tmp; - - if (pos >= this->firstinput->len + this->pos) /* end of mmap */ + if (pos >= THIS->firstinput->len + THIS->pos) /* end of mmap */ { - this->pos += this->firstinput->len; + THIS->pos+=THIS->firstinput->len; input_finish(); continue; } - len = this->firstinput->len + this->pos - pos; - if (len > READ_BUFFER_SIZE) len=READ_BUFFER_SIZE; - tmp = begin_shared_string( len ); - src = this->firstinput->u.mmap + pos - this->pos; -/* This thread_allow/deny is at the cost of one extra memory copy */ - THREADS_ALLOW(); - MEMCPY(tmp->str, src, len); - THREADS_DISALLOW(); - return end_shared_string(tmp); + len=THIS->firstinput->len+THIS->pos-pos; + if (len>READ_BUFFER_SIZE) len=READ_BUFFER_SIZE; + return make_shared_binary_string(THIS->firstinput->u.mmap+ + pos-THIS->pos, + len); } else #endif - if (this->firstinput->type!=I_OBJ) + if (THIS->firstinput->type!=I_OBJ) { - /* FIXME: What about I_BLOCKING_OBJ? */ input_finish(); /* shouldn't be anything else ... maybe a finished object */ } } return NULL; /* no data */ } - if (pos==this->pos) + if (pos==THIS->pos) { - add_ref(this->firstbuffer->s); - return this->firstbuffer->s; + THIS->firstbuffer->s->refs++; + return THIS->firstbuffer->s; } - return make_shared_binary_string(this->firstbuffer->s->str+ - pos-this->pos, - this->firstbuffer->s->len- - pos+this->pos); + return make_shared_binary_string(THIS->firstbuffer->s->str+ + pos-THIS->pos, + THIS->firstbuffer->s->len- + pos+THIS->pos); } @@ -692,46 +607,12 @@ static void pipe_input(INT32 args) && ((long)(m=(char *)mmap(0,s.st_size - filep,PROT_READ, MAP_FILE|MAP_SHARED,fd,filep))!=-1)) { -#ifdef HAVE_GETEUID - int ou = 0; -#endif mmapped += s.st_size; i->type=I_MMAP; i->len=s.st_size; i->u.mmap=m; -#ifdef HAVE_MADVISE - /* Mark the pages as sequential read only access... */ - - /* NOTE: - * - * Potential race-condition with other threads - */ - -#ifdef HAVE_GETEUID - if((ou=geteuid()) && !getuid()) { -#ifdef HAVE_SETEUID - seteuid(0); -#else /* ! HAVE_SETEUID */ -#ifdef HAVE_SETRESUID - setresuid(-1, 0, -1); -#endif /* HAVE_SETRESUID */ -#endif /* HAVE_SETEUID */ - } -#endif - madvise(m, s.st_size, MADV_SEQUENTIAL); -#ifdef HAVE_GETEUID - if(ou) { -#ifdef HAVE_SETEUID - seteuid(ou); -#else /* ! HAVE_SETEUID */ -#ifdef HAVE_SETRESUID - setresuid(-1, ou, -1); -#endif /* HAVE_SETRESUID */ -#endif /* HAVE_SETEUID */ - } -#endif -#endif + pop_n_elems(args); push_int(0); return; @@ -743,31 +624,21 @@ static void pipe_input(INT32 args) i->u.obj=obj; nobjects++; i->type=I_OBJ; - add_ref(i->u.obj); + i->u.obj->refs++; i->set_nonblocking_offset=find_identifier("set_nonblocking",i->u.obj->prog); i->set_blocking_offset=find_identifier("set_blocking",i->u.obj->prog); if (i->set_nonblocking_offset<0 || i->set_blocking_offset<0) { - if (find_identifier("read", i->u.obj->prog) < 0) { - /* Not even a read function */ - free_object(i->u.obj); - i->u.obj=NULL; - i->type=I_NONE; - - nobjects--; - error("illegal file object%s%s\n", - ((i->set_nonblocking_offset<0)?"; no set_nonblocking":""), - ((i->set_blocking_offset<0)?"; no set_blocking":"")); - } else { - /* Try blocking mode */ - i->type = I_BLOCKING_OBJ; - if (i==THIS->firstinput) { - read_some_data(); - } - return; - } + free_object(i->u.obj); + i->u.obj=NULL; + i->type=I_NONE; + + nobjects--; + error("illegal file object%s%s\n", + ((i->set_nonblocking_offset<0)?"; no set_nonblocking":""), + ((i->set_blocking_offset<0)?"; no set_blocking":"")); } if (i==THIS->firstinput) @@ -810,7 +681,8 @@ static void pipe_write(INT32 args) i=new_input(); i->type=I_STRING; nstrings++; - add_ref(i->u.str=sp[-args].u.string); + i->u.str=sp[-args].u.string; + i->u.str->refs++; pop_n_elems(args-1); } @@ -858,7 +730,7 @@ static void pipe_output(INT32 args) { b=THIS->firstbuffer; THIS->firstbuffer=b->next; - lseek(THIS->fd, THIS->pos, SEEK_SET); + lseek(THIS->fd,THIS->pos,0); write(THIS->fd,b->s->str,b->s->len); sbuffers-=b->s->len; nbuffers--; @@ -878,7 +750,7 @@ static void pipe_output(INT32 args) } THIS->living_outputs++; - add_ref(THISOBJ); /* Weird */ + THISOBJ->refs++; /* Weird */ /* Allocate a new struct output */ obj=clone_object(output_program,0); @@ -888,7 +760,8 @@ static void pipe_output(INT32 args) noutputs++; o->obj=NULL; - add_ref(o->obj=sp[-args].u.object); + o->obj=sp[-args].u.object; + o->obj->refs++; o->write_offset=find_identifier("write",o->obj->prog); o->set_nonblocking_offset=find_identifier("set_nonblocking",o->obj->prog); @@ -909,7 +782,8 @@ static void pipe_output(INT32 args) o->pos=0; */ o->pos=THIS->pos; - ref_push_object(obj); + push_object(obj); + obj->refs++; apply(o->obj,"set_id",1); pop_stack(); @@ -1102,7 +976,7 @@ void close_and_free_everything(struct object *thisobj,struct pipe *p) p->done=1; if (thisobj) - add_ref(thisobj); /* don't kill object during this */ + thisobj->refs++; /* don't kill object during this */ while (p->firstbuffer) {