#include "config.h" #include "machine.h" #include <sys/types.h> #include <sys/stat.h> #ifdef HAVE_SYS_FILE_H #include <sys/file.h> #endif /* HAVE_SYS_FILE_H */ #include <errno.h> #ifdef HAVE_SYS_MMAN_H #include <sys/mman.h> #else #ifdef HAVE_LINUX_MMAN_H #include <linux/mman.h> #else #ifdef HAVE_MMAP /* sys/mman.h is _probably_ there anyway. */ #include <sys/mman.h> #endif #endif #endif #include <fcntl.h> #include "global.h" RCSID("$Id: pipe.c,v 1.23 1998/05/17 18:53:03 grubba Exp $"); #include "threads.h" #include "stralloc.h" #include "pike_macros.h" #include "object.h" #include "constants.h" #include "interpret.h" #include "svalue.h" #include "error.h" #include "builtin_functions.h" #ifndef S_ISREG #ifdef S_IFREG #define S_ISREG(mode) (((mode) & (S_IFMT)) == (S_IFREG)) #else #define S_ISREG(mode) (((mode) & (_S_IFMT)) == (_S_IFREG)) #endif #endif /* #define PIPE_STRING_DEBUG #define PIPE_MMAP_DEBUG #define PIPE_FILE_DEBUG #define BLOCKING_CLOSE */ #ifndef SEEK_SET #ifdef L_SET #define SEEK_SET L_SET #else /* !L_SET */ #define SEEK_SET 0 #endif /* L_SET */ #endif /* SEEK_SET */ #ifndef SEEK_CUR #ifdef L_INCR #define SEEK_SET L_INCR #else /* !L_INCR */ #define SEEK_CUR 1 #endif /* L_INCR */ #endif /* SEEK_CUR */ #ifndef SEEK_END #ifdef L_XTND #define SEEK_END L_XTND #else /* !L_XTND */ #define SEEK_END 2 #endif /* L_XTND */ #endif /* SEEK_END */ #if 0 #define INSISTANT_WRITE #endif #ifndef MAP_FILE # define MAP_FILE 0 #endif #define READ_BUFFER_SIZE 65536 #define MAX_BYTES_IN_BUFFER 65536 /* * usage: * single socket output * or regular file output and (multiple, adding) socket output * with no mmap input * * multiple socket output without regular file output illegal */ static struct program *pipe_program, *output_program; #ifdef THIS #undef THIS #endif #define THIS ((struct pipe *)(fp->current_storage)) #define THISOBJ (fp->current_object) struct input { enum { I_NONE,I_OBJ,I_BLOCKING_OBJ,I_STRING,I_MMAP } type; union { struct object *obj; struct pike_string *str; char *mmap; } u; unsigned long len; /* current input: string or mmap len */ int set_blocking_offset, set_nonblocking_offset; struct input *next; }; struct output { struct object *obj; int write_offset, set_blocking_offset, set_nonblocking_offset; int fd; enum { O_RUN, /* waiting for callback */ O_SLEEP /* sleeping; waiting for more data */ } mode; unsigned long pos; /* position in buffer */ struct object *next; struct pipe *the_pipe; }; struct buffer { struct pike_string *s; struct buffer *next; }; struct pipe { int living_outputs; /* number of output objects */ struct svalue done_callback; struct svalue output_closed_callback; struct svalue id; /* * if fd is -1: use fd * else firstinput's type is I_MMAP: use firstinput's mmap * else use buffer */ int fd; /* buffer fd or -1 */ unsigned long bytes_in_buffer; unsigned long pos; /* fd: size of buffer file */ /* current position of first element (buffer or mmap) */ struct buffer *firstbuffer,*lastbuffer; short sleeping; /* sleeping; buffer is full */ short done; struct input *firstinput,*lastinput; struct object *firstoutput; unsigned long sent; }; static int offset_input_read_callback; static int offset_input_close_callback; static int offset_output_write_callback; static int offset_output_close_callback; static int mmapped, nobjects, nstrings, noutputs, ninputs, nbuffers, sbuffers; void close_and_free_everything(struct object *o,struct pipe *); static INLINE void output_finish(struct object *obj); static INLINE void output_try_write_some(struct object *obj); /********** internal ********************************************************/ /* Push a callback to this object given the internal function number. */ static void push_callback(int no) { add_ref(sp->u.object=THISOBJ); sp->subtype=no+fp->context.identifier_level; sp->type=T_FUNCTION; sp++; } /* Allocate a new struct input, link it last in the linked list */ static INLINE struct input *new_input(void) { struct input *i; ninputs++; i=ALLOC_STRUCT(input); i->type=I_NONE; i->next=NULL; if (THIS->lastinput) THIS->lastinput->next=i; else THIS->firstinput=i; THIS->lastinput=i; return i; } /* Free an input struct and all that it stands for */ static INLINE void free_input(struct input *i) { ninputs--; switch (i->type) { case I_OBJ: case I_BLOCKING_OBJ: if (!i->u.obj) break; if (i->u.obj->prog) { #ifdef BLOCKING_CLOSE apply_low(i->u.obj,i->set_blocking_offset,0); pop_stack(); #endif apply(i->u.obj,"close",0); pop_stack(); destruct(i->u.obj); } free_object(i->u.obj); nobjects--; i->u.obj=0; break; case I_STRING: free_string(i->u.str); nstrings--; break; case I_MMAP: #if defined(HAVE_MMAP) && defined(HAVE_MUNMAP) munmap(i->u.mmap,i->len); mmapped -= i->len; #else error("I_MMAP input when MMAP is diabled!"); #endif break; case I_NONE: break; } free((char *)i); } /* do the done_callback, then close and free everything */ static INLINE void pipe_done(void) { if (THIS->done_callback.type!=T_INT) { assign_svalue_no_free(sp++,&THIS->id); apply_svalue(&(THIS->done_callback),1); pop_stack(); if(!THISOBJ->prog) /* We will not free anything in this case. */ return; /* error("Pipe done callback destructed pipe.\n"); */ } close_and_free_everything(THISOBJ,THIS); } static void finished_p(void) { if(THIS->done) return; if(THIS->fd != -1) { if(THIS->living_outputs > 1) return; if(THIS->firstinput) return; }else{ if(THIS->living_outputs) return; } pipe_done(); } /* Allocate a new buffer and put it at the end of the chain of buffers * scheduled for output. Return 1 if we have more bytes in buffers * than allowed afterwards. */ static INLINE int append_buffer(struct pike_string *s) /* 1=buffer full */ { struct buffer *b; if(THIS->fd!= -1) { lseek(THIS->fd, THIS->pos, SEEK_SET); write(THIS->fd, s->str, s->len); THIS->pos+=s->len; return 0; } else { nbuffers++; b=ALLOC_STRUCT(buffer); b->next=NULL; b->s=s; sbuffers += s->len; add_ref(s); if (THIS->lastbuffer) THIS->lastbuffer->next=b; else THIS->firstbuffer=b; THIS->lastbuffer=b; THIS->bytes_in_buffer+=s->len; } return THIS->bytes_in_buffer > MAX_BYTES_IN_BUFFER; } /* Wake up the sleepers */ static void low_start(void) { struct object *obj, *next; struct output *o; add_ref(THISOBJ); /* dont kill yourself now */ for(obj=THIS->firstoutput;obj;obj=next) { add_ref(obj); /* Hang on PLEASE!! /hubbe */ o=(struct output *)(obj->storage); if (o->obj && o->mode==O_SLEEP) { if (!o->obj->prog) { output_finish(obj); } else { #if 0 push_int(0); push_callback(offset_output_write_callback); push_callback(offset_output_close_callback); apply_low(o->obj,o->set_nonblocking_offset,3); #endif output_try_write_some(obj); o->mode=O_RUN; /* Hubbe */ } } next=o->next; free_object(obj); } free_object(THISOBJ); } /* Read some data from the blocking object. * */ static int read_some_data(void) { struct pipe *this = THIS; struct input * i = this->firstinput; if (!i || i->type != I_BLOCKING_OBJ) { fatal("PIPE: read_some_data(): Bad input type!\n"); return -1; } push_int(8192); push_int(1); /* We don't care if we don't get all 8192 bytes. */ apply(i->u.obj, "read", 2); if ((sp[-1].type == T_STRING) && (sp[-1].u.string->len > 0)) { append_buffer(sp[-1].u.string); pop_stack(); THIS->sleeping = 1; return(1); /* Success */ } /* FIXME: Should we check the return value here? */ pop_stack(); /* EOF */ return(0); /* EOF */ } /* Let's guess what this function does.... * */ static INLINE void input_finish(void) { struct input *i; while(1) { /* Get the next input from the queue */ i=THIS->firstinput->next; free_input(THIS->firstinput); THIS->firstinput=i; if(!i) break; switch(i->type) { case I_OBJ: THIS->sleeping=0; push_callback(offset_input_read_callback); push_int(0); push_callback(offset_input_close_callback); apply_low(i->u.obj,i->set_nonblocking_offset,3); pop_stack(); return; case I_BLOCKING_OBJ: if (read_some_data()) return; continue; case I_MMAP: if (THIS->fd==-1) return; continue; case I_STRING: append_buffer(i->u.str); case I_NONE: break; } } THIS->sleeping=0; low_start(); finished_p(); } /* This function reads some data from the file cache.. * Called when we want some data to send. */ static INLINE struct pike_string* gimme_some_data(unsigned long pos) { struct buffer *b; long len; struct pipe *this = THIS; /* We have a file cache, read from it */ if (this->fd!=-1) { char buffer[READ_BUFFER_SIZE]; if (this->pos<=pos) return NULL; /* no data */ len=this->pos-pos; if (len>READ_BUFFER_SIZE) len=READ_BUFFER_SIZE; THREADS_ALLOW(); lseek(this->fd, pos, SEEK_SET); THREADS_DISALLOW(); do { THREADS_ALLOW(); len = read(this->fd, buffer, len); THREADS_DISALLOW(); if (len < 0) { if (errno != EINTR) { return(NULL); } } } while(len < 0); return make_shared_binary_string(buffer,len); } if (pos<this->pos) return make_shared_string("buffer underflow"); /* shit */ /* We want something in the next buffer */ while (this->firstbuffer && pos>=this->pos+this->firstbuffer->s->len) { /* Free the first buffer, and update THIS->pos */ b=this->firstbuffer; this->pos+=b->s->len; this->bytes_in_buffer-=b->s->len; this->firstbuffer=b->next; if (!b->next) this->lastbuffer=NULL; sbuffers-=b->s->len; nbuffers--; free_string(b->s); free((char *)b); /* Wake up first input if it was sleeping and we * have room for more in the buffer. */ if (this->sleeping && this->firstinput && this->bytes_in_buffer<MAX_BYTES_IN_BUFFER) { if (this->firstinput->type == I_BLOCKING_OBJ) { if (!read_some_data()) { this->sleeping = 0; input_finish(); } } else { this->sleeping=0; push_callback(offset_input_read_callback); push_int(0); push_callback(offset_input_close_callback); apply(this->firstinput->u.obj, "set_nonblocking", 3); pop_stack(); } } } while (!this->firstbuffer) { if (this->firstinput) { #if defined(HAVE_MMAP) && defined(HAVE_MUNMAP) if (this->firstinput->type==I_MMAP) { char *src; struct pike_string *tmp; if (pos >= this->firstinput->len + this->pos) /* end of mmap */ { this->pos += this->firstinput->len; input_finish(); continue; } len = this->firstinput->len + this->pos - pos; if (len > READ_BUFFER_SIZE) len=READ_BUFFER_SIZE; tmp = begin_shared_string( len ); src = this->firstinput->u.mmap + pos - this->pos; /* This thread_allow/deny is at the cost of one extra memory copy */ THREADS_ALLOW(); MEMCPY(tmp->str, src, len); THREADS_DISALLOW(); return end_shared_string(tmp); } else #endif if (this->firstinput->type!=I_OBJ) { /* FIXME: What about I_BLOCKING_OBJ? */ input_finish(); /* shouldn't be anything else ... maybe a finished object */ } } return NULL; /* no data */ } if (pos==this->pos) { add_ref(this->firstbuffer->s); return this->firstbuffer->s; } return make_shared_binary_string(this->firstbuffer->s->str+ pos-this->pos, this->firstbuffer->s->len- pos+this->pos); } /* * close and free the contents of a struct output * Note that the output struct is not freed or unlinked here, * that is taken care of later. */ static INLINE void output_finish(struct object *obj) { struct output *o; o=(struct output *)(obj->storage); if (o->obj) { if(o->obj->prog) { #ifdef BLOCKING_CLOSE apply_low(o->obj,o->set_blocking_offset,0); pop_stack(); #endif push_int(0); apply(o->obj,"set_id",1); pop_stack(); apply(o->obj,"close",0); pop_stack(); if(!THISOBJ->prog) error("Pipe done callback destructed pipe.\n"); destruct(o->obj); } free_object(o->obj); noutputs--; o->obj=NULL; THIS->living_outputs--; finished_p(); /* Moved by per, one line down.. :) */ free_object(THISOBJ); /* What? /Hubbe */ } } /* * Try to write some data to our precious output */ static INLINE void output_try_write_some(struct object *obj) { struct output *out; struct pike_string *s; unsigned long len; INT32 ret; out=(struct output*)(obj->storage); #ifdef INSISTANT_WRITE do { #endif /* Get some data to write */ s=gimme_some_data(out->pos); if (!s) /* out of data */ { /* out of data, goto sleep */ if (!THIS->firstinput || !out->obj->prog) /* end of life */ { output_finish(obj); } else { #if 0 apply_low(out->obj, out->set_blocking_offset, 0); pop_stack(); /* from apply */ #endif out->mode=O_SLEEP; } return; } len=s->len; push_string(s); apply_low(out->obj,out->write_offset,1); out->mode=O_RUN; ret=-1; if(sp[-1].type == T_INT) ret=sp[-1].u.integer; pop_stack(); if (ret==-1) /* error, byebye */ { output_finish(obj); return; } out->pos+=ret; THIS->sent+=ret; #ifdef INSISTANT_WRITE } while(ret == len); #endif } /********** methods *********************************************************/ /* Add an input to this pipe */ static void pipe_input(INT32 args) { struct input *i; int fd=-1; /* Per, one less warning to worry about... */ char *m; struct stat s; struct object *obj; if (args<1 || sp[-args].type != T_OBJECT) error("Bad/missing argument 1 to pipe->input().\n"); obj=sp[-args].u.object; if(!obj || !obj->prog) error("pipe->input() on destructed object.\n"); push_int(0); apply(sp[-args-1].u.object,"set_id", 1); pop_stack(); i=new_input(); #if defined(HAVE_MMAP) && defined(HAVE_MUNMAP) /* We do not handle mmaps if we have a buffer */ if(THIS->fd == -1) { apply(obj, "query_fd", 0); if(sp[-1].type == T_INT) fd=sp[-1].u.integer; pop_stack(); if (fd != -1 && fstat(fd,&s)==0) { int filep=lseek(fd, 0L, SEEK_CUR); /* keep the file pointer */ if(S_ISREG(s.st_mode) /* regular file */ && ((long)(m=(char *)mmap(0,s.st_size - filep,PROT_READ, MAP_FILE|MAP_SHARED,fd,filep))!=-1)) { #ifdef HAVE_GETEUID int ou = 0; #endif mmapped += s.st_size; i->type=I_MMAP; i->len=s.st_size; i->u.mmap=m; #ifdef HAVE_MADVISE /* Mark the pages as sequential read only access... */ /* NOTE: * * Potential race-condition with other threads */ #ifdef HAVE_GETEUID if((ou=geteuid()) && !getuid()) { #ifdef HAVE_SETEUID seteuid(0); #else /* ! HAVE_SETEUID */ #ifdef HAVE_SETREUID setresuid(-1, 0, -1); #endif /* HAVE_SETRESUID */ #endif /* HAVE_SETEUID */ } #endif madvise(m, s.st_size, MADV_SEQUENTIAL); #ifdef HAVE_GETEUID if(ou) { #ifdef HAVE_SETEUID seteuid(ou); #else /* ! HAVE_SETEUID */ #ifdef HAVE_SETREUID setresuid(-1, ou, -1); #endif /* HAVE_SETRESUID */ #endif /* HAVE_SETEUID */ } #endif #endif pop_n_elems(args); push_int(0); return; } } } #endif i->u.obj=obj; nobjects++; i->type=I_OBJ; add_ref(i->u.obj); i->set_nonblocking_offset=find_identifier("set_nonblocking",i->u.obj->prog); i->set_blocking_offset=find_identifier("set_blocking",i->u.obj->prog); if (i->set_nonblocking_offset<0 || i->set_blocking_offset<0) { if (find_identifier("read", i->u.obj->prog) < 0) { /* Not even a read function */ free_object(i->u.obj); i->u.obj=NULL; i->type=I_NONE; nobjects--; error("illegal file object%s%s\n", ((i->set_nonblocking_offset<0)?"; no set_nonblocking":""), ((i->set_blocking_offset<0)?"; no set_blocking":"")); } else { /* Try blocking mode */ i->type = I_BLOCKING_OBJ; if (i==THIS->firstinput) { read_some_data(); } return; } } if (i==THIS->firstinput) { push_callback(offset_input_read_callback); push_int(0); push_callback(offset_input_close_callback); apply_low(i->u.obj,i->set_nonblocking_offset,3); pop_stack(); } else { /* DOESN'T WORK!!! */ push_int(0); push_int(0); push_callback(offset_input_close_callback); apply_low(i->u.obj,i->set_nonblocking_offset,3); pop_stack(); } pop_n_elems(args); push_int(0); } static void pipe_write(INT32 args) { struct input *i; if (args<1 || sp[-args].type!=T_STRING) error("illegal argument to pipe->write()\n"); if (!THIS->firstinput) { append_buffer(sp[-args].u.string); pop_n_elems(args); push_int(0); return; } i=new_input(); i->type=I_STRING; nstrings++; add_ref(i->u.str=sp[-args].u.string); pop_n_elems(args-1); } void f_mark_fd(INT32 args); static void pipe_output(INT32 args) { struct object *obj; struct output *o; int fd; struct stat s; struct buffer *b; if (args<1 || sp[-args].type != T_OBJECT || !sp[-args].u.object || !sp[-args].u.object->prog) error("Bad/missing argument 1 to pipe->output().\n"); if (THIS->fd==-1) /* no buffer */ { /* test if usable as buffer */ apply(sp[-args].u.object,"query_fd",0); if ((sp[-1].type==T_INT) && (fd=sp[-1].u.integer)>=0 && (fstat(fd,&s)==0) && S_ISREG(s.st_mode) && (THIS->fd=dup(fd))!=-1 ) { /* keep the file pointer of the duped fd */ THIS->pos=lseek(fd, 0L, SEEK_CUR); #if 0 /* This won't work if the spider-module is dynamically linked. */ push_int(THIS->fd); push_string(make_shared_string("pipe.c: file buffer")); f_mark_fd(2); pop_stack(); #endif /* 0 */ THIS->living_outputs++; while (THIS->firstbuffer) { b=THIS->firstbuffer; THIS->firstbuffer=b->next; lseek(THIS->fd, THIS->pos, SEEK_SET); write(THIS->fd,b->s->str,b->s->len); sbuffers-=b->s->len; nbuffers--; free_string(b->s); free((char *)b); } THIS->lastbuffer=NULL; /* keep the file pointer of the duped fd THIS->pos=0; */ push_int(0); apply(sp[-args-2].u.object,"set_id", 1); pop_n_elems(args+2); /* ... and from apply x 2 */ return; } pop_stack(); /* from apply */ } THIS->living_outputs++; add_ref(THISOBJ); /* Weird */ /* Allocate a new struct output */ obj=clone_object(output_program,0); o=(struct output *)(obj->storage); o->next=THIS->firstoutput; THIS->firstoutput=obj; noutputs++; o->obj=NULL; add_ref(o->obj=sp[-args].u.object); o->write_offset=find_identifier("write",o->obj->prog); o->set_nonblocking_offset=find_identifier("set_nonblocking",o->obj->prog); o->set_blocking_offset=find_identifier("set_blocking",o->obj->prog); if (o->write_offset<0 || o->set_nonblocking_offset<0 || o->set_blocking_offset<0) { free_object(o->obj); error("illegal file object%s%s%s\n", ((o->write_offset<0)?"; no write":""), ((o->set_nonblocking_offset<0)?"; no set_nonblocking":""), ((o->set_blocking_offset<0)?"; no set_blocking":"")); } o->mode=O_RUN; /* keep the file pointer of the duped fd o->pos=0; */ o->pos=THIS->pos; ref_push_object(obj); apply(o->obj,"set_id",1); pop_stack(); push_int(0); push_callback(offset_output_write_callback); push_callback(offset_output_close_callback); apply_low(o->obj,o->set_nonblocking_offset,3); pop_stack(); pop_n_elems(args-1); } static void pipe_set_done_callback(INT32 args) { if (args==0) { free_svalue(&THIS->done_callback); THIS->done_callback.type=T_INT; return; } if (args<1 || (sp[-args].type!=T_FUNCTION && sp[-args].type!=T_ARRAY)) error("Illegal argument to set_done_callback()\n"); if (args>1) { free_svalue(&THIS->id); assign_svalue_no_free(&(THIS->id),sp-args+1); } free_svalue(&THIS->done_callback); assign_svalue_no_free(&(THIS->done_callback),sp-args); pop_n_elems(args-1); } static void pipe_set_output_closed_callback(INT32 args) { if (args==0) { free_svalue(&THIS->done_callback); THIS->output_closed_callback.type=T_INT; return; } if (args<1 || (sp[-args].type!=T_FUNCTION && sp[-args].type!=T_ARRAY)) error("Illegal argument to set_output_closed_callback()\n"); if (args>1) { free_svalue(&THIS->id); assign_svalue_no_free(&(THIS->id),sp-args+1); } free_svalue(&THIS->output_closed_callback); assign_svalue_no_free(&(THIS->output_closed_callback),sp-args); pop_n_elems(args-1); } static void pipe_finish(INT32 args) { pop_n_elems(args); push_int(0); pipe_done(); } static void pipe_start(INT32 args) /* force start */ { low_start(); if(args) pop_n_elems(args-1); } static void f_bytes_sent(INT32 args) { pop_n_elems(args); push_int(THIS->sent); } /********** callbacks *******************************************************/ static void pipe_write_output_callback(INT32 args) { if (args<1 || sp[-args].type!=T_OBJECT) error("Illegal argument to pipe->write_output_callback\n"); if(!sp[-args].u.object->prog) return; if(sp[-args].u.object->prog != output_program) error("Illegal argument to pipe->write_output_callback\n"); output_try_write_some(sp[-args].u.object); pop_n_elems(args-1); } static void pipe_close_output_callback(INT32 args) { struct output *o; if (args<1 || sp[-args].type!=T_OBJECT) if(!sp[-args].u.object->prog) return; if(sp[-args].u.object->prog != output_program) error("Illegal argument to pipe->close_output_callback\n"); o=(struct output *)(sp[-args].u.object->storage); if (THIS->output_closed_callback.type!=T_INT) { assign_svalue_no_free(sp++,&THIS->id); push_object(o->obj); apply_svalue(&(THIS->output_closed_callback),2); pop_stack(); } output_finish(sp[-args].u.object); pop_n_elems(args-1); } static void pipe_read_input_callback(INT32 args) { struct input *i; struct pike_string *s; if (args<2 || sp[1-args].type!=T_STRING) error("Illegal argument to pipe->read_input_callback\n"); i=THIS->firstinput; if (!i) error("Pipe read callback without any inputs left.\n"); s=sp[1-args].u.string; if(append_buffer(s)) { /* THIS DOES NOT WORK */ push_int(0); push_int(0); push_callback(offset_input_close_callback); apply_low(i->u.obj,i->set_nonblocking_offset,3); pop_stack(); THIS->sleeping=1; } low_start(); pop_n_elems(args-1); } static void pipe_close_input_callback(INT32 args) { struct input *i; i=THIS->firstinput; if(!i) error("Input close callback without inputs!\n"); if(i->type != I_OBJ) error("Premature close callback on pipe!.\n"); if (i->u.obj->prog) { #ifdef BLOCKING_CLOSE apply_low(i->u.obj,i->set_blocking_offset,0); pop_stack(); #endif apply(i->u.obj,"close",0); pop_stack(); } nobjects--; free_object(i->u.obj); i->type=I_NONE; input_finish(); if(args) pop_n_elems(args-1); } static void pipe_version(INT32 args) { pop_n_elems(args); push_string(make_shared_string("PIPE ver 2.0")); } /********** init/exit *******************************************************/ void close_and_free_everything(struct object *thisobj,struct pipe *p) { struct buffer *b; struct input *i; struct output *o; struct object *obj; p->done=1; if (thisobj) add_ref(thisobj); /* don't kill object during this */ while (p->firstbuffer) { b=p->firstbuffer; p->firstbuffer=b->next; sbuffers-=b->s->len; nbuffers--; free_string(b->s); b->next=NULL; free((char *)b); /* Hubbe */ } p->lastbuffer=NULL; while (p->firstinput) { i=p->firstinput; p->firstinput=i->next; free_input(i); } p->lastinput=NULL; while (p->firstoutput) { obj=p->firstoutput; o=(struct output *)(obj->storage); p->firstoutput=o->next; output_finish(obj); free_object(obj); } if (p->fd!=-1) { close(p->fd); p->fd=-1; } p->living_outputs=0; if (thisobj) free_object(thisobj); free_svalue(& p->done_callback); free_svalue(& p->output_closed_callback); free_svalue(& p->id); p->done_callback.type=T_INT; p->output_closed_callback.type=T_INT; p->id.type=T_INT; p->done=0; } static void init_pipe_struct(struct object *o) { THIS->firstbuffer=THIS->lastbuffer=NULL; THIS->firstinput=THIS->lastinput=NULL; THIS->firstoutput=NULL; THIS->bytes_in_buffer=0; THIS->pos=0; THIS->sleeping=0; THIS->done=0; THIS->fd=-1; THIS->done_callback.type=T_INT; THIS->output_closed_callback.type=T_INT; THIS->id.type=T_INT; THIS->id.u.integer=0; THIS->living_outputs=0; THIS->sent=0; } static void exit_pipe_struct(struct object *o) { close_and_free_everything(NULL,THIS); } static void exit_output_struct(struct object *obj) { struct output *o; o=(struct output *)(fp->current_storage); if (o->obj) { if(o->obj->prog) { #ifdef BLOCKING_CLOSE apply_low(o->obj,o->set_blocking_offset,0); pop_stack(); #endif push_int(0); apply(o->obj,"set_id",1); pop_stack(); apply(o->obj,"close",0); pop_stack(); if(!THISOBJ->prog) error("Pipe done callback destructed pipe.\n"); } free_object(o->obj); noutputs--; o->obj=0; } } static void init_output_struct(struct object *ob) { struct output *o; o=(struct output *)(fp->current_storage); o->obj=0; } /********** Pike init *******************************************************/ void port_setup_program(void); void f__pipe_debug(INT32 args) { pop_n_elems(args); push_int(noutputs); push_int(ninputs); push_int(nstrings); push_int(nobjects); push_int(mmapped); push_int(nbuffers); push_int(sbuffers); f_aggregate(7); } void pike_module_init(void) { start_new_program(); add_storage(sizeof(struct pipe)); add_efun("_pipe_debug", f__pipe_debug, "function(:array)", 0); add_function("input",pipe_input,"function(object:void)",0); add_function("output",pipe_output,"function(object:void)",0); add_function("write",pipe_write,"function(string:void)",0); add_function("start",pipe_start,"function(:void)",0); add_function("finish",pipe_finish,"function(:void)",0); add_function("set_output_closed_callback",pipe_set_output_closed_callback, "function(void|function(mixed,object:mixed),void|mixed:void)",0); add_function("set_done_callback",pipe_set_done_callback, "function(void|function(mixed:mixed),void|mixed:void)",0); add_function("_output_close_callback",pipe_close_output_callback, "function(int:void)",0); add_function("_input_close_callback",pipe_close_input_callback, "function(int:void)",0); add_function("_output_write_callback",pipe_write_output_callback, "function(int:void)",0); add_function("_input_read_callback",pipe_read_input_callback, "function(int,string:void)",0); add_function("version",pipe_version,"function(:string)",0); add_function("bytes_sent",f_bytes_sent,"function(:int)",0); set_init_callback(init_pipe_struct); set_exit_callback(exit_pipe_struct); pipe_program=end_program(); add_program_constant("pipe",pipe_program, 0); offset_output_close_callback=find_identifier("_output_close_callback", pipe_program); offset_input_close_callback=find_identifier("_input_close_callback", pipe_program); offset_output_write_callback=find_identifier("_output_write_callback", pipe_program); offset_input_read_callback=find_identifier("_input_read_callback", pipe_program); start_new_program(); add_storage(sizeof(struct output)); set_init_callback(init_output_struct); set_exit_callback(exit_output_struct); output_program=end_program(); add_program_constant("__output",output_program, 0); } void pike_module_exit(void) { if(pipe_program) free_program(pipe_program); pipe_program=0; if(output_program) free_program(output_program); output_program=0; }