diff --git a/src/Makefile.src b/src/Makefile.src index 9e0ae0db44334a073c415532618e4881086ba242..374e6ca126e31e5f3811007d13cbc4041de796ec 100644 --- a/src/Makefile.src +++ b/src/Makefile.src @@ -100,6 +100,7 @@ OBJ= \ rusage.o \ stralloc.o \ stuff.o \ + threads.o \ svalue.o @EXTRA_OBJS@ # diff --git a/src/backend.c b/src/backend.c index b0017f348942f7a3c86b77ca9794a349efcd9e40..391c8a314779450c26e86c4f0770599aecbc9d77 100644 --- a/src/backend.c +++ b/src/backend.c @@ -18,6 +18,7 @@ #include "fd_control.h" #include "main.h" #include "callback.h" +#include "threads.h" #ifdef HAVE_SYS_SELECT_H #include <sys/select.h> @@ -214,9 +215,10 @@ void backend() next_timeout.tv_sec = 0; } + THREADS_ALLOW(); i=select(max_fd+1, &sets.read, &sets.write, 0, &next_timeout); - GETTIMEOFDAY(¤t_time); + THREADS_DISALLOW(); check_threads_etc(); diff --git a/src/builtin_functions.c b/src/builtin_functions.c index f422cf214c426fbce4440643cc0553b41df02843..65a0fd635255d42966e5bc3fed4a743e26b21907 100644 --- a/src/builtin_functions.c +++ b/src/builtin_functions.c @@ -25,6 +25,7 @@ #include "backend.h" #include "main.h" #include "memory.h" +#include "threads.h" #include "time_stuff.h" #include <math.h> #include <ctype.h> @@ -1178,7 +1179,9 @@ void f_sleep(INT32 args) t3=t1; my_subtract_timeval(&t3, &t2); + THREADS_ALLOW(); select(0,0,0,0,&t3); + THREADS_DISALLOW(); check_threads_etc(); } } diff --git a/src/configure.in b/src/configure.in index 6ed6c20eaf96f10729bbefd6683c01dc291405c1..611db64d847f96ec0d9ab882abc7527f27de6121 100644 --- a/src/configure.in +++ b/src/configure.in @@ -245,7 +245,7 @@ AC_HEADER_TIME AC_HEADER_STDC AC_CHECK_HEADERS(sys/rusage.h time.h sys/time.h sys/types.h unistd.h stdlib.h \ memory.h values.h string.h fcntl.h sys/filio.h sys/sockio.h crypt.h locale.h \ -sys/resource.h sys/select.h sys/mman.h setjmp.h limits.h) +sys/resource.h sys/select.h sys/mman.h setjmp.h limits.h pthread.h) AC_SIZEOF_TYPE(char *) AC_SIZEOF_TYPE(long) @@ -265,6 +265,7 @@ dnl AC_CHECK_LIB(PW, alloca) dnl AC_CHECK_LIB(crypt, crypt) AC_CHECK_LIB(m, floor) + if test "${ac_cv_lib_m}" = "no" -a "${pike_cv_sys_os}" = "Linux"; then AC_MSG_WARN(I will compensate for this by adding -lc -lm) LIBS="${LIBS} -lc -lm" @@ -272,7 +273,53 @@ fi AC_CHECK_LIB(crypt, crypt) -OLD_LIBOBJS="${LIBOBJS}" + +######################################################################## + +AC_MSG_CHECKING(posix threads) + +OLDLIBS="$LIBS" + +case "x$pike_cv_sys_os" in + xOSF1) + LIBS="${LIBS} -lpthread -lmach -lexc -lc" + ;; + + *) + LIBS="${LIBS} -lpthread" + ;; +esac + +AC_CACHE_VAL(pike_cv_posix_threads, +[ +AC_TRY_RUN([ +#include <pthread.h> + +void *foo(void *bar) { return ; } +pthread_mutex_t tmp; +pthread_t gazonk; + +int main() +{ + void *sune; + pthread_mutex_lock(& tmp); + pthread_create(&gazonk,0,foo,0); + pthread_join(gazonk,&sune); + exit(0); +} +],pike_cv_posix_threads=yes,pike_cv_posix_threads=no) +]) + +if test $pike_cv_posix_threads = yes ; then + AC_MSG_RESULT(yes) + AC_DEFINE(_REENTRANT) + AC_DEFINE(_THREAD_SAFE) +else + AC_MSG_RESULT(no) + LIBS="$OLDLIBS" +fi + +######################################################################## AC_FUNC_MEMCMP @@ -553,6 +600,10 @@ case "$LIBOBJS" in ;; esac +OLD_LIBOBJS="${LIBOBJS}" + +######################################################################## + AC_MSG_CHECKING(byteorder) AC_CACHE_VAL(pike_cv_hardware_byteorder, [ @@ -591,6 +642,8 @@ pike_cv_hardware_byteorder=0 AC_MSG_RESULT($pike_cv_hardware_byteorder) AC_DEFINE_UNQUOTED(BYTEORDER,$pike_cv_hardware_byteorder) +######################################################################## + AC_MSG_CHECKING(for working memmem) AC_CACHE_VAL(pike_cv_func_memmem, [ @@ -646,6 +699,8 @@ else AC_MSG_RESULT(no) fi +######################################################################## + AC_MSG_CHECKING(how to extract an unsigned char) AC_CACHE_VAL(pike_cv_method_extract_uchar, [ @@ -668,6 +723,8 @@ else AC_MSG_RESULT(not by cast) fi +######################################################################## + AC_MSG_CHECKING(how to extract a signed char) AC_CACHE_VAL(pike_cv_method_extract_char, [ @@ -690,6 +747,7 @@ else AC_MSG_RESULT(not by cast) fi +######################################################################## AC_MSG_CHECKING(if signal handlers reset automatically) AC_CACHE_VAL(pike_cv_sys_signal_oneshot, @@ -746,7 +804,7 @@ else AC_MSG_RESULT(no) fi - +######################################################################## AC_MSG_CHECKING(available file descriptors) AC_CACHE_VAL(pike_cv_max_open_fd, @@ -849,6 +907,8 @@ else AC_DEFINE(GETRUSAGE_RESTRICTED) fi +######################################################################## + AC_MSG_CHECKING(getrusage() through procfs) AC_CACHE_VAL(pike_cv_getrusage_procfs, [ diff --git a/src/constants.c b/src/constants.c index de0976b88b52cb44bca4aecf210aa7afee9f3f48..8f329c6738aaead60007bf9263c3370989fc2d2c 100644 --- a/src/constants.c +++ b/src/constants.c @@ -3,6 +3,7 @@ ||| Pike is distributed as GPL (General Public License) ||| See the files COPYING and DISCLAIMER for more information. \*/ +#include "global.h" #include "constants.h" #include "macros.h" #include "program.h" diff --git a/src/error.c b/src/error.c index aeda41fefe442ec3254cbb1bd1381d0c788a5f47..b4bd3c5cef7193b1eacbaec6ee56f320d4e9b85b 100644 --- a/src/error.c +++ b/src/error.c @@ -17,7 +17,6 @@ char *automatic_fatal, *exit_on_error; JMP_BUF *recoveries=0; -ONERROR *onerror_stack=0; JMP_BUF *init_recovery(JMP_BUF *r) { @@ -25,7 +24,7 @@ JMP_BUF *init_recovery(JMP_BUF *r) r->sp=sp-evaluator_stack; r->mark_sp=mark_sp - mark_stack; r->previous=recoveries; - r->onerror=onerror_stack; + r->onerror=0; recoveries=r; return r; } @@ -55,14 +54,10 @@ void throw() ATTRIBUTE((noreturn)) pop_n_elems(sp - evaluator_stack - recoveries->sp); mark_sp = mark_stack + recoveries->mark_sp; - while(recoveries->onerror != onerror_stack) + while(recoveries->onerror) { -#ifdef DEBUG - if(!onerror_stack) - fatal("Popped out of onerror stack!\n"); -#endif - (*onerror_stack->func)(onerror_stack->arg); - onerror_stack=onerror_stack->previous; + (*recoveries->onerror->func)(recoveries->onerror->arg); + recoveries->onerror=recoveries->onerror->previous; } longjmp(recoveries->recovery,1); diff --git a/src/error.h b/src/error.h index 527b7b8fbc335c251520b1aa1cfbcecfa136ae30..900a783513cf9c4db429703c9982d5050d42df93 100644 --- a/src/error.h +++ b/src/error.h @@ -39,7 +39,6 @@ typedef struct JMP_BUF ONERROR *onerror; } JMP_BUF; -extern ONERROR *onerror_stack; extern JMP_BUF *recoveries; extern struct svalue throw_value; extern char *automatic_fatal, *exit_on_error; @@ -51,11 +50,11 @@ extern char *automatic_fatal, *exit_on_error; do{ \ X.func=(error_call)(Y); \ X.arg=(void *)(Z); \ - X.previous=onerror_stack; \ - onerror_stack=&X; \ + X.previous=recoveries->onerror; \ + recoveries->onerror=&X; \ }while(0) -#define UNSET_ONERROR(X) onerror_stack=X.previous +#define UNSET_ONERROR(X) recoveries->onerror=X.previous /* Prototypes begin here */ JMP_BUF *init_recovery(JMP_BUF *r); diff --git a/src/interpret.c b/src/interpret.c index 0501236b4845faef5422219972b195ae574b40aa..eda985b07e4408ea27ad18ee9a21f409e791bf29 100644 --- a/src/interpret.c +++ b/src/interpret.c @@ -24,6 +24,7 @@ #include "builtin_functions.h" #include "signal_handler.h" #include "gc.h" +#include "threads.h" #ifdef HAVE_MMAP #ifdef HAVE_SYS_TYPES_H @@ -84,6 +85,8 @@ void init_interpreter() #define MMALLOC(X,Y) (Y *)mmap(0,X*sizeof(Y),PROT_READ|PROT_WRITE, MAP_NORESERVE | MAP_PRIVATE | MAP_ANONYMOUS, fd, 0) + evaluator_stack_malloced=0; + mark_stack_malloced=0; evaluator_stack=MMALLOC(stack_size,struct svalue); mark_stack=MMALLOC(stack_size, struct svalue *); @@ -91,22 +94,26 @@ void init_interpreter() if((char *)MAP_FAILED == (char *)evaluator_stack) evaluator_stack=0; if((char *)MAP_FAILED == (char *)mark_stack) mark_stack=0; - +#else + evaluator_stack=0; + mark_stack=0; #endif + if(!evaluator_stack) { - evaluator_stack=(struct svalue *)malloc(stack_size*sizeof(struct svalue)); + evaluator_stack=(struct svalue *)xalloc(stack_size*sizeof(struct svalue)); evaluator_stack_malloced=1; } if(!mark_stack) { - mark_stack=(struct svalue **)malloc(stack_size*sizeof(struct svalue *)); + mark_stack=(struct svalue **)xalloc(stack_size*sizeof(struct svalue *)); mark_stack_malloced=1; } sp=evaluator_stack; mark_sp=mark_stack; + fp=0; } void check_stack(INT32 size) @@ -291,6 +298,12 @@ void pop_n_elems(INT32 x) */ void check_threads_etc() { + THREADS_ALLOW(); + + /* Allow other threads to run */ + + THREADS_DISALLOW(); + check_signals(); if(objects_to_destruct) destruct_objects_to_destruct(); CHECK_FOR_GC(); @@ -437,6 +450,11 @@ static void eval_instruction(unsigned char *pc) again: #ifdef DEBUG +#ifdef _REENTRANT + if(!mt_trylock(& interpreter_lock)) + fatal("Interpreter running unlocked!\n"); +#endif + sp[0].type=99; /* an invalid type */ sp[1].type=99; sp[2].type=99; diff --git a/src/interpret.h b/src/interpret.h index 4ce6f2f20a1e17cce44849687709f95d9fe07baf..6669cdb15ff2a52362984950c3a0c0622c43fedc 100644 --- a/src/interpret.h +++ b/src/interpret.h @@ -101,5 +101,6 @@ extern struct svalue *evaluator_stack; extern struct svalue **mark_stack; extern struct frame *fp; /* frame pointer */ extern int stack_size; +extern int evaluator_stack_malloced, mark_stack_malloced; #endif diff --git a/src/machine.h.in b/src/machine.h.in index ad6a7e67441d0c1b0ee98f2b4f4325d7022526d8..d05f671ebfbe6030918cbf1d529084f58105d50c 100644 --- a/src/machine.h.in +++ b/src/machine.h.in @@ -328,4 +328,11 @@ /* Define this to the max value of an unsigned short unles <limits.h> does.. */ #undef USHRT_MAX +/* define this if you have pthreads.h */ +#undef HAVE_PTHREAD_H + +/* Define this if you are going to use threads */ +#undef _REENTRANT +#undef _THREAD_SAFE + #endif /* MACHINE_H */ diff --git a/src/main.c b/src/main.c index 9834a4ad6794fc80fb49433bbe81f2b1dd8d44c1..d1afd4c6afc357be93d908fdc8a00ef7685e5788 100644 --- a/src/main.c +++ b/src/main.c @@ -18,6 +18,7 @@ #include "macros.h" #include "callback.h" #include "signal_handler.h" +#include "threads.h" #ifdef HAVE_LOCALE_H #include <locale.h> @@ -251,10 +252,12 @@ void init_main_efuns() init_types(); init_builtin_efuns(); init_signals(); + th_init(); } void init_main_programs() { + th_init_programs(); } diff --git a/src/modules/files/configure.in b/src/modules/files/configure.in index 126da74149be02c07e65ffaeca8f724522ac033e..880f89bdd969030fffabfd8743d34703e6f6c403 100644 --- a/src/modules/files/configure.in +++ b/src/modules/files/configure.in @@ -11,7 +11,7 @@ AC_HEADER_DIRENT AC_CHECK_LIB(socket, socket) AC_CHECK_LIB(nsl, gethostbyname) -AC_HAVE_FUNCS(socketpair getwd perror) +AC_HAVE_FUNCS(socketpair getwd perror fork1) AC_MSG_CHECKING(for h_addr_list) AC_CACHE_VAL(pike_cv_struct_has_h_addr_list, diff --git a/src/modules/files/efuns.c b/src/modules/files/efuns.c index 338d88e80bdd59cbd19102eb4e42e4b2ad8aecce..f0744966acdaf930a0e2a38cd6fe2ffd351e2fff 100644 --- a/src/modules/files/efuns.c +++ b/src/modules/files/efuns.c @@ -12,6 +12,7 @@ #include "mapping.h" #include "macros.h" #include "fd_control.h" +#include "threads.h" #include "file_machine.h" @@ -229,7 +230,11 @@ void f_getcwd(INT32 args) void f_fork(INT32 args) { pop_n_elems(args); +#if defined(HAVE_FORK1) && defined(_REENTRANT) + push_int(fork1()); +#else push_int(fork()); +#endif } diff --git a/src/modules/files/file.c b/src/modules/files/file.c index 00efb82e466bbcdab4f36bd26eaa57870775baa5..2075d4c8504708cea6bf87da72deed230c51d7cd 100644 --- a/src/modules/files/file.c +++ b/src/modules/files/file.c @@ -21,6 +21,7 @@ #include "error.h" #include "signal_handler.h" #include "pike_types.h" +#include "threads.h" #ifdef HAVE_SYS_TYPE_H #include <sys/types.h> @@ -64,11 +65,11 @@ struct file_struct { short fd; - short errno; + short my_errno; }; #define FD (((struct file_struct *)(fp->current_storage))->fd) -#define ERRNO (((struct file_struct *)(fp->current_storage))->errno) +#define ERRNO (((struct file_struct *)(fp->current_storage))->my_errno) #define THIS (files + FD) static struct my_file files[MAX_OPEN_FILEDESCRIPTORS]; @@ -107,7 +108,12 @@ static int close_fd(int fd) { while(1) { - if(close(fd) < 0) + int i; + THREADS_ALLOW(); + i=close(fd); + THREADS_DISALLOW(); + + if(i < 0) { switch(errno) { @@ -248,7 +254,10 @@ static void file_read(INT32 args) SET_ONERROR(ebuf, call_free, str); do{ - i=read(FD, str->str+bytes_read, r); + int fd=FD; + THREADS_ALLOW(); + i=read(fd, str->str+bytes_read, r); + THREADS_DISALLOW(); check_signals(); @@ -287,7 +296,7 @@ static void file_read(INT32 args) } }else{ -#define CHUNK 16384 +#define CHUNK 65536 INT32 try_read; dynamic_buffer b; @@ -295,9 +304,12 @@ static void file_read(INT32 args) low_init_buf(&b); SET_ONERROR(ebuf, free_dynamic_buffer, &b); do{ + int fd=FD; try_read=MINIMUM(CHUNK,r); - i=read(FD, low_make_buf_space(try_read, &b), try_read); + THREADS_ALLOW(); + i=read(fd, low_make_buf_space(try_read, &b), try_read); + THREADS_DISALLOW(); check_signals(); @@ -353,6 +365,7 @@ static void file_write_callback(int fd, void *data) static void file_write(INT32 args) { INT32 written,i; + struct pike_string *str; if(args<1 || sp[-args].type != T_STRING) error("Bad argument 1 to file->write().\n"); @@ -361,9 +374,14 @@ static void file_write(INT32 args) error("File not open for write.\n"); written=0; - while(written < sp[-args].u.string->len) + str=sp[-args].u.string; + + while(written < str->len) { - i=write(FD, sp[-args].u.string->str + written, sp[-args].u.string->len - written); + int fd=FD; + THREADS_ALLOW(); + i=write(fd, str->str + written, str->len - written); + THREADS_DISALLOW(); if(i<0) { @@ -452,6 +470,7 @@ static void file_close(INT32 args) static void file_open(INT32 args) { int flags,fd; + struct pike_string *str; do_close(FD, FILE_READ | FILE_WRITE); FD=-1; @@ -463,6 +482,8 @@ static void file_open(INT32 args) if(sp[1-args].type != T_STRING) error("Bad argument 2 to file->open()\n"); + + str=sp[-args].u.string; flags=parse(sp[1-args].u.string->str); @@ -470,7 +491,12 @@ static void file_open(INT32 args) error("Must open file for at least one of read and write.\n"); retry: - fd=open(sp[-args].u.string->str,map(flags), 00666); + THREADS_ALLOW(); + fd=open(str->str,map(flags), 00666); + THREADS_DISALLOW(); + + if(!fp->current_object->prog) + error("Object destructed in file->open()\n"); if(fd >= MAX_OPEN_FILEDESCRIPTORS) { @@ -540,15 +566,23 @@ struct array *encode_stat(struct stat *); static void file_stat(INT32 args) { + int fd; struct stat s; + int tmp; if(FD < 0) error("File not open.\n"); pop_n_elems(args); + fd=FD; + retry: - if(fstat(FD, &s) < 0) + THREADS_ALLOW(); + tmp=fstat(fd, &s); + THREADS_DISALLOW(); + + if(tmp < 0) { if(errno == EINTR) goto retry; ERRNO=errno; @@ -575,7 +609,9 @@ static struct pike_string *do_read(INT32 *amount,int fd) { char buffer[READ_BUFFER]; + THREADS_ALLOW(); *amount = read(fd, buffer, READ_BUFFER); + THREADS_DISALLOW(); if(*amount>0) return make_shared_binary_string(buffer,*amount); return 0; @@ -756,7 +792,7 @@ struct object *file_make_object_from_fd(int fd, int mode) init_fd(fd, mode); o=clone(file_program,0); ((struct file_struct *)(o->storage))->fd=fd; - ((struct file_struct *)(o->storage))->errno=0; + ((struct file_struct *)(o->storage))->my_errno=0; return o; } @@ -949,7 +985,7 @@ static void file_dup(INT32 args) o=clone(file_program,0); ((struct file_struct *)o->storage)->fd=FD; - ((struct file_struct *)o->storage)->errno=0; + ((struct file_struct *)o->storage)->my_errno=0; ERRNO=0; files[FD].refs++; push_object(o); @@ -1079,6 +1115,7 @@ static void file_open_socket(INT32 args) static void file_connect(INT32 args) { struct sockaddr_in addr; + int tmp; if(args < 2) error("Too few arguments to file->connect()\n"); @@ -1095,7 +1132,12 @@ static void file_connect(INT32 args) get_inet_addr(&addr, sp[-args].u.string->str); addr.sin_port = htons(((u_short)sp[1-args].u.integer)); - if(connect(FD, (struct sockaddr *)&addr, sizeof(addr)) < 0) + tmp=FD; + THREADS_ALLOW(); + tmp=connect(tmp, (struct sockaddr *)&addr, sizeof(addr)); + THREADS_DISALLOW(); + + if(tmp < 0) { /* something went wrong */ ERRNO=errno; @@ -1128,7 +1170,20 @@ void get_inet_addr(struct sockaddr_in *addr,char *name) else { struct hostent *ret; + +#ifdef _REENTRANT + static MUTEX_T l; + + THREADS_ALLOW(); + + mt_lock(&l); ret=gethostbyname(name); + mt_unlock(&l); + + THREADS_DISALLOW(); +#else + ret=gethostbyname(name); +#endif if(!ret) error("Invalid address '%s'\n",name); diff --git a/src/modules/files/file_machine.h.in b/src/modules/files/file_machine.h.in index 2c22d2e21ebbbd5df1a985bb5a3ddd3981c30846..5c9482478e6f8818a3f4dd73121c50358e1730d9 100644 --- a/src/modules/files/file_machine.h.in +++ b/src/modules/files/file_machine.h.in @@ -40,6 +40,9 @@ /* Define if you have getwd. */ #undef HAVE_GETWD +/* Define if you have fork1. */ +#undef HAVE_FORK1 + /* Define if you have strerror. */ #undef HAVE_STRERROR diff --git a/src/modules/files/socket.c b/src/modules/files/socket.c index 803f785b2106f27017821266d6e47aa8e15e20f8..6ab53144aeaf28fa8e2cf5394fac458b261d5dec 100644 --- a/src/modules/files/socket.c +++ b/src/modules/files/socket.c @@ -13,6 +13,7 @@ #include "macros.h" #include "backend.h" #include "fd_control.h" +#include "threads.h" #include "file_machine.h" #include "file.h" @@ -40,17 +41,10 @@ #include <sys/socketvar.h> #endif -#ifdef SOLARIS -#include <synch.h> -#include <sys/mman.h> - -mutex_t *locks; -#endif - struct port { int fd; - int errno; + int my_errno; struct svalue accept_callback; struct svalue id; }; @@ -90,7 +84,7 @@ static void port_query_id(INT32 args) static void port_errno(INT32 args) { pop_n_elems(args); - push_int(THIS->errno); + push_int(THIS->my_errno); } static void port_accept_callback(int fd,void *data) @@ -122,7 +116,7 @@ static void port_listen_fd(INT32 args) if(fd<0 || fd >MAX_OPEN_FILEDESCRIPTORS) { - THIS->errno=EBADF; + THIS->my_errno=EBADF; pop_n_elems(args); push_int(0); return; @@ -130,7 +124,7 @@ static void port_listen_fd(INT32 args) if(listen(fd, 16384) < 0) { - THIS->errno=errno; + THIS->my_errno=errno; pop_n_elems(args); push_int(0); return; @@ -145,7 +139,7 @@ static void port_listen_fd(INT32 args) } THIS->fd=fd; - THIS->errno=0; + THIS->my_errno=0; pop_n_elems(args); push_int(1); } @@ -154,7 +148,7 @@ static void port_bind(INT32 args) { struct sockaddr_in addr; int o; - int fd; + int fd,tmp; do_close(THIS); @@ -168,14 +162,14 @@ static void port_bind(INT32 args) if(fd < 0) { - THIS->errno=errno; + THIS->my_errno=errno; pop_n_elems(args); push_int(0); return; } if(fd >= MAX_OPEN_FILEDESCRIPTORS) { - THIS->errno=EBADF; + THIS->my_errno=EBADF; close(fd); pop_n_elems(args); push_int(0); @@ -185,7 +179,7 @@ static void port_bind(INT32 args) o=1; if(setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&o, sizeof(int)) < 0) { - THIS->errno=errno; + THIS->my_errno=errno; close(fd); push_int(0); return; @@ -204,10 +198,13 @@ static void port_bind(INT32 args) addr.sin_port = htons( ((u_short)sp[-args].u.integer) ); - if(bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 || - listen(fd, 16384) < 0 ) + THREADS_ALLOW(); + tmp=bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0 || listen(fd, 16384) < 0; + THREADS_DISALLOW(); + + if(tmp) { - THIS->errno=errno; + THIS->my_errno=errno; close(fd); pop_n_elems(args); push_int(0); @@ -224,7 +221,7 @@ static void port_bind(INT32 args) } THIS->fd=fd; - THIS->errno=0; + THIS->my_errno=0; pop_n_elems(args); push_int(1); } @@ -247,7 +244,7 @@ static void port_create(INT32 args) if(listen(THIS->fd, 16384) < 0) { - THIS->errno=errno; + THIS->my_errno=errno; }else{ if(args > 1) { @@ -266,6 +263,7 @@ extern struct program *file_program; static void port_accept(INT32 args) { + struct port *this=THIS; int fd,tmp; int len=0; struct object *o; @@ -273,24 +271,21 @@ static void port_accept(INT32 args) if(THIS->fd < 0) error("port->accept(): Port not open.\n"); -#ifdef SOLARIS - mutex_lock(locks+THIS->fd); - fd=accept(THIS->fd, 0, &len); - mutex_unlock(locks+THIS->fd); -#else - fd=accept(THIS->fd, 0, &len); -#endif + + THREADS_ALLOW(); + fd=accept(this->fd, 0, &len); + THREADS_DISALLOW(); if(fd < 0) { - THIS->errno=errno; + THIS->my_errno=errno; pop_n_elems(args); push_int(0); return; } if(fd >= MAX_OPEN_FILEDESCRIPTORS) { - THIS->errno=EBADF; + THIS->my_errno=EBADF; pop_n_elems(args); push_int(0); close(fd); @@ -317,7 +312,7 @@ static void init_port_struct(struct object *o) THIS->id.u.object=o; o->refs++; THIS->accept_callback.type=T_INT; - THIS->errno=0; + THIS->my_errno=0; } static void exit_port_struct(struct object *o) @@ -331,41 +326,6 @@ static void exit_port_struct(struct object *o) void port_setup_program() { -#ifdef SOLARIS - /* Note: - * This hack is because solaris accept() isn't atomic and problems arise - * when several processes do accept() on the same fd IF that fd is - * nonblocking. This patch puts an atomic mutex lock around accept(). - * We allocate one lock / filedescriptor with mmap, and use MAP_SHARED - * so they are shared across fork()... For people who doesn't use fork() - * or nonblocking sockets this patch won't do anything. - * - * This is an ugly workaround (tm) /Fredrik Hubinette - */ - int i; - i=open("/dev/zero",O_RDWR); - if(i<0) - { - perror("Failed to open /dev/zero"); - exit(5); - } - locks=(mutex_t *)mmap(0, sizeof(mutex_t)*MAX_OPEN_FILEDESCRIPTORS, - PROT_READ | PROT_WRITE, MAP_SHARED, i, 0); - if(!locks) - { - perror("Failed to mmap /dev/zero"); - exit(5); - } - close(i); - for(i=0;i<MAX_OPEN_FILEDESCRIPTORS;i++) - { - if(mutex_init(locks+i,USYNC_PROCESS,0)) - { - perror("Mutex init failed"); - exit(5); - } - } -#endif start_new_program(); add_storage(sizeof(struct port)); add_function("bind",port_bind,"function(int,void|mixed:int)",0); diff --git a/src/modules/gdbmmod/gdbmmod.c b/src/modules/gdbmmod/gdbmmod.c index ab1525b02b61de654725498ee28adee2e7be3b9a..f9778ec83bac5c31955596bd55fa6e6e34395b63 100644 --- a/src/modules/gdbmmod/gdbmmod.c +++ b/src/modules/gdbmmod/gdbmmod.c @@ -6,6 +6,9 @@ #include "global.h" #include "gdbm_machine.h" #include "types.h" +#include "threads.h" + +/* Todo: make sure only one thread accesses the same gdbmmod */ #if defined(HAVE_GDBM_H) && defined(HAVE_LIBGDBM) @@ -18,6 +21,10 @@ #include <gdbm.h> +#ifdef _REENTRANT +static MUTEX_T gdbm_lock; +#endif + struct gdbm_glue { GDBM_FILE dbf; @@ -29,8 +36,15 @@ static void do_free() { if(THIS->dbf) { - gdbm_close(THIS->dbf); + GDBM_FILE dbf; + dbf=THIS->dbf; THIS->dbf=0; + + THREADS_ALLOW(); + mt_lock(& gdbm_lock); + gdbm_close(dbf); + mt_unlock(& gdbm_lock); + THREADS_DISALLOW(); } } @@ -76,10 +90,14 @@ void gdbmmod_fatal(char *err) static void gdbmmod_create(INT32 args) { + struct gdbm_glue *this=THIS; do_free(); if(args) { + GDBM_FILE tmp; + struct pike_string *tmp2; int rwmode = GDBM_WRCREAT; + if(sp[-args].type != T_STRING) error("Bad argument 1 to gdbm->create()\n"); @@ -91,7 +109,21 @@ static void gdbmmod_create(INT32 args) rwmode=fixmods(sp[1-args].u.string->str); } - THIS->dbf=gdbm_open(sp[-args].u.string->str, 512, rwmode, 00666, gdbmmod_fatal); + tmp2=sp[-args].u.string; + + THREADS_ALLOW(); + mt_lock(& gdbm_lock); + tmp=gdbm_open(tmp2->str, 512, rwmode, 00666, gdbmmod_fatal); + mt_unlock(& gdbm_lock); + THREADS_DISALLOW(); + + if(!fp->current_object->prog) + { + if(tmp) gdbm_close(tmp); + error("Object destructed in gdbm->open()n"); + } + THIS->dbf=tmp; + pop_n_elems(args); if(!THIS->dbf) error("Failed to open GDBM database.\n"); @@ -103,7 +135,9 @@ static void gdbmmod_create(INT32 args) static void gdbmmod_fetch(INT32 args) { + struct gdbm_glue *this=THIS; datum key,ret; + if(!args) error("Too few arguments to gdbm->fetch()\n"); @@ -115,7 +149,12 @@ static void gdbmmod_fetch(INT32 args) STRING_TO_DATUM(key, sp[-args].u.string); - ret=gdbm_fetch(THIS->dbf, key); + THREADS_ALLOW(); + mt_lock(& gdbm_lock); + ret=gdbm_fetch(this->dbf, key); + mt_unlock(& gdbm_lock); + THREADS_DISALLOW(); + pop_n_elems(args); if(ret.dptr) { @@ -128,6 +167,7 @@ static void gdbmmod_fetch(INT32 args) static void gdbmmod_delete(INT32 args) { + struct gdbm_glue *this=THIS; datum key; int ret; if(!args) @@ -136,24 +176,35 @@ static void gdbmmod_delete(INT32 args) if(sp[-args].type != T_STRING) error("Bad argument 1 to gdbm->delete()\n"); - if(!THIS->dbf) + if(!this->dbf) error("GDBM database not open.\n"); STRING_TO_DATUM(key, sp[-args].u.string); - ret=gdbm_delete(THIS->dbf, key); + THREADS_ALLOW(); + mt_lock(& gdbm_lock); + ret=gdbm_delete(this->dbf, key); + mt_unlock(& gdbm_lock); + THREADS_DISALLOW(); + pop_n_elems(args); push_int(0); } static void gdbmmod_firstkey(INT32 args) { + struct gdbm_glue *this=THIS; datum ret; pop_n_elems(args); - if(!THIS->dbf) error("GDBM database not open.\n"); + if(!this->dbf) error("GDBM database not open.\n"); + + THREADS_ALLOW(); + mt_lock(& gdbm_lock); + ret=gdbm_firstkey(this->dbf); + mt_unlock(& gdbm_lock); + THREADS_DISALLOW(); - ret=gdbm_firstkey(THIS->dbf); if(ret.dptr) { push_string(DATUM_TO_STRING(ret)); @@ -165,6 +216,7 @@ static void gdbmmod_firstkey(INT32 args) static void gdbmmod_nextkey(INT32 args) { + struct gdbm_glue *this=THIS; datum key,ret; if(!args) error("Too few arguments to gdbm->nextkey()\n"); @@ -177,7 +229,12 @@ static void gdbmmod_nextkey(INT32 args) STRING_TO_DATUM(key, sp[-args].u.string); - ret=gdbm_nextkey(THIS->dbf, key); + THREADS_ALLOW(); + mt_lock(& gdbm_lock); + ret=gdbm_nextkey(this->dbf, key); + mt_unlock(& gdbm_lock); + THREADS_DISALLOW(); + pop_n_elems(args); if(ret.dptr) { @@ -190,6 +247,7 @@ static void gdbmmod_nextkey(INT32 args) static void gdbmmod_store(INT32 args) { + struct gdbm_glue *this=THIS; datum key,data; int ret; if(args<2) @@ -207,7 +265,12 @@ static void gdbmmod_store(INT32 args) STRING_TO_DATUM(key, sp[-args].u.string); STRING_TO_DATUM(data, sp[1-args].u.string); - ret=gdbm_store(THIS->dbf, key, data, GDBM_REPLACE); + THREADS_ALLOW(); + mt_lock(& gdbm_lock); + ret=gdbm_store(this->dbf, key, data, GDBM_REPLACE); + mt_unlock(& gdbm_lock); + THREADS_DISALLOW(); + if(ret == -1) error("GDBM database not open for writing.\n"); @@ -217,27 +280,31 @@ static void gdbmmod_store(INT32 args) static void gdbmmod_reorganize(INT32 args) { - datum ret; + struct gdbm_glue *this=THIS; + int ret; pop_n_elems(args); if(!THIS->dbf) error("GDBM database not open.\n"); - ret=gdbm_firstkey(THIS->dbf); + THREADS_ALLOW(); + mt_lock(& gdbm_lock); + ret=gdbm_reorganize(this->dbf); + mt_unlock(& gdbm_lock); + THREADS_DISALLOW(); pop_n_elems(args); - if(ret.dptr) - { - push_string(DATUM_TO_STRING(ret)); - free(ret.dptr); - }else{ - push_int(0); - } + push_int(ret); } static void gdbmmod_sync(INT32 args) { + struct gdbm_glue *this=THIS; pop_n_elems(args); if(!THIS->dbf) error("GDBM database not open.\n"); - gdbm_sync(THIS->dbf); + THREADS_ALLOW(); + mt_lock(& gdbm_lock); + gdbm_sync(this->dbf); + mt_unlock(& gdbm_lock); + THREADS_DISALLOW(); push_int(0); } diff --git a/src/modules/readlinemod/readlinemod.c b/src/modules/readlinemod/readlinemod.c index 786aef5c0a7e74c7161e2a99e005cc4da2ca9ee5..7e4a40a5eb51da74cf1e6605ff83c90d75fc9e56 100644 --- a/src/modules/readlinemod/readlinemod.c +++ b/src/modules/readlinemod/readlinemod.c @@ -12,6 +12,7 @@ #include "array.h" #include "object.h" #include "macros.h" +#include "threads.h" #ifndef HAVE_LIBTERMCAP #undef HAVE_LIBREADLINE @@ -50,13 +51,18 @@ static void f_readline(INT32 args) { char *r; + struct pike_string *str; if(args < 1) error("Too few arguments to readline().\n"); if(sp[-args].type != T_STRING) error("Bad argument 1 to readline()\n"); - r=readline(sp[-args].u.string->str); + str=sp[-args].u.string; + THREADS_ALLOW(); + r=readline(str->str); + THREADS_DISALLOW(); + pop_n_elems(args); if(r) { @@ -84,6 +90,8 @@ static void f_readline(INT32 args) { char line[BLOCK]; char *r; + int tmp; + if(args < 1) error("Too few arguments to readline().\n"); @@ -94,7 +102,11 @@ static void f_readline(INT32 args) fflush(stdout); pop_n_elems(args); - if(fgets(line,BLOCK,stdin)) + THREADS_ALLOW(); + r=fgets(line,BLOCK,stdin); + THREADS_DISALLOW(); + + if(r) { INT32 len; if(len=strlen(line)) diff --git a/src/modules/spider/configure.in b/src/modules/spider/configure.in index 1d198ad58c05a67988bf39387a89f5f5fe948285..784f0c2a725e20be9ddd0a6072ffe6139e5f5566 100644 --- a/src/modules/spider/configure.in +++ b/src/modules/spider/configure.in @@ -21,7 +21,7 @@ AC_HAVE_FUNCS(syslog perror getppid getpgrp strdup\ uname gethostbyname getpwnam getspnam initgroups \ mutex_unlock mmap sendmsg) -if test x$ac_cv_lib_thread_mutex_inlock = xno ; then +if test x$ac_cv_lib_thread_mutex_unlock = xno ; then AC_HAVE_HEADERS( pthread.h ) AC_HAVE_FUNCS(pthread_mutex_unlock) fi diff --git a/src/modules/spider/spider.c b/src/modules/spider/spider.c index be65a0089e2231cc2721ef22e724301f699c6b15..3d051f75f3bcdd82e0c5b38b742d0fac1b333f14 100644 --- a/src/modules/spider/spider.c +++ b/src/modules/spider/spider.c @@ -31,6 +31,7 @@ #include "array.h" #include "builtin_functions.h" #include "lock.h" +#include "threads.h" #ifdef HAVE_PWD_H #include <pwd.h> @@ -163,7 +164,7 @@ void f_parse_accessed_database(INT32 args) #define HAVE_SEND_FD void f_send_fd(INT32 args) { - int sock_fd, fd; + int sock_fd, fd, tmp; if(args != 2) error("RTSL\n"); @@ -172,7 +173,11 @@ void f_send_fd(INT32 args) pop_stack(); pop_stack(); - while(ioctl(sock_fd, I_SENDFD, fd) == -1) + THREADS_ALLOW(); + tmp=ioctl(sock_fd, I_SENDFD, fd); + THREADS_DISALLOW(); + + while(tmp == -1) { switch(errno) { @@ -220,7 +225,7 @@ void f_send_fd(INT32 args) { struct iovec iov; struct msghdr msg; - int sock_fd, fd; + int sock_fd, fd, tmp; if(args != 2) error("RTSL\n"); sock_fd = sp[-args].u.integer; @@ -236,7 +241,12 @@ void f_send_fd(INT32 args) msg.msg_accrights = (caddr_t)&fd; msg.msg_accrightslen = sizeof(fd); - while(sendmsg(sock_fd, &msg, 0) == -1) + + THREADS_ALLOW(); + tmp=sendmsg(sock_fd, &msg, 0); + THREADS_DISALLOW(); + + while(tmp == -1) { switch(errno) { diff --git a/src/program.c b/src/program.c index 1c280c3caa34141af7a4d692baf2b690ef6f2c92..b90d6b1cb1106d75b0ac1bdea7abd45ad1a788b7 100644 --- a/src/program.c +++ b/src/program.c @@ -20,6 +20,7 @@ #include "hashtable.h" #include "main.h" #include "gc.h" +#include "threads.h" #include <stdio.h> #include <fcntl.h> @@ -124,6 +125,7 @@ void start_new_program() struct inherit inherit; struct pike_string *name; + threads_disabled++; #define PROGRAM_STATE #define PUSH #include "compilation.h" @@ -528,6 +530,7 @@ struct program *end_program() #include "compilation.h" #undef POP #undef PROGRAM_STATE + threads_disabled--; return prog; } diff --git a/src/threads.c b/src/threads.c new file mode 100644 index 0000000000000000000000000000000000000000..766fdb6ed4d65f9070960c7f1ee7d62686d0b74d --- /dev/null +++ b/src/threads.c @@ -0,0 +1,178 @@ +#include "global.h" +#include "threads.h" +#include "array.h" +#include "object.h" + +int num_threads = 1; +int threads_disabled = 0; + +#ifdef _REENTRANT + +MUTEX_T interpreter_lock, compiler_lock; +struct program *mutex_key = 0; +pthread_attr_t pattr; + +void *new_thread_func(void * data) +{ + JMP_BUF back; + struct array *foo; + INT32 args; + foo=(struct array *)data; + args=foo->size; + mt_lock( & interpreter_lock); + init_interpreter(); + + + if(SETJMP(back)) + { + exit_on_error="Error in handle_error in master object!\nPrevious error:"; + assign_svalue_no_free(sp++, & throw_value); + APPLY_MASTER("handle_error", 1); + pop_stack(); + automatic_fatal=0; + } else { + push_array_items(foo); + f_call_function(args); + } + + UNSETJMP(back); + + cleanup_interpret(); + mt_unlock(& interpreter_lock); + num_threads--; + th_exit(0); +} + +void f_thread_create(INT32 args) +{ + pthread_t dummy; + int tmp; + struct array *a=aggregate_array(args); + num_threads++; + tmp=th_create(&dummy,new_thread_func,a); + if(tmp) num_threads--; + push_int(tmp); +} + +void th_init() +{ + mt_lock( & interpreter_lock); + pthread_attr_init(&pattr); + pthread_attr_setstacksize(&pattr, 2 * 1024 * 1204); + pthread_attr_setdetachstate(&pattr, PTHREAD_CREATE_DETACHED); + + add_efun("thread_create",f_thread_create,"function(mixed ...:int)",OPT_SIDE_EFFECT); +} + + +#define THIS_MUTEX ((MUTEX_T *)(fp->current_storage)) + +void f_mutex_lock(INT32 args) +{ + MUTEX_T *m; + struct object *o; + pop_n_elems(args); + m=THIS_MUTEX; + THREADS_ALLOW(); + mt_lock(m); + THREADS_DISALLOW(); + o=clone(mutex_key,0); + ((struct object **)(o->storage))[0]=fp->current_object; + fp->current_object->refs++; + push_object(o); +} + +void f_mutex_trylock(INT32 args) +{ + MUTEX_T *m; + int i; + pop_n_elems(args); + + m=THIS_MUTEX; + THREADS_ALLOW(); + i=mt_lock(m); + THREADS_DISALLOW(); + + if(i) + { + struct object *o; + o=clone(mutex_key,0); + ((struct object **)o->storage)[0]=fp->current_object; + fp->current_object->refs++; + push_object(o); + } else { + push_int(0); + } +} + +void init_mutex_obj(struct object *o) { mt_init(THIS_MUTEX); } +void exit_mutex_obj(struct object *o) { mt_destroy(THIS_MUTEX); } + +#define THIS_KEY (*(struct object **)fp->current_storage) +void init_mutex_key_obj(struct object *o) { THIS_KEY=0; } + +void exit_mutex_key_obj(struct object *o) +{ + if(THIS_KEY) + { + mt_unlock((MUTEX_T *)THIS_KEY->storage); + init_mutex_key_obj(o); + } +} + +#define THIS_COND ((COND_T *)(fp->current_storage)) +void f_cond_wait(INT32 args) +{ + COND_T *c; + + if(args > 1) pop_n_elems(args - 1); + + if(sp[-1].type != T_OBJECT) + error("Bad argument 1 to condition->wait()\n"); + + if(sp[-1].u.object->prog) + { + if(sp[-1].u.object->prog != mutex_key) + error("Bad argument 1 to condition->wait()\n"); + + destruct(sp[-1].u.object); + pop_stack(); + } + + c=THIS_COND; + THREADS_ALLOW(); + co_wait(c,0); + THREADS_DISALLOW(); +} + +void f_cond_signal(INT32 args) { pop_n_elems(args); co_signal(THIS_COND); } +void f_cond_broadcast(INT32 args) { pop_n_elems(args); co_broadcast(THIS_COND); } +void init_cond_obj(struct object *o) { co_init(THIS_COND); } +void exit_cond_obj(struct object *o) { co_destroy(THIS_COND); } + +void th_init_programs() +{ + start_new_program(); + add_storage(sizeof(MUTEX_T)); + add_function("lock",f_mutex_lock,"function(:object)",0); + add_function("trylock",f_mutex_trylock,"function(:object)",0); + set_init_callback(init_mutex_obj); + set_exit_callback(exit_mutex_obj); + end_c_program("/precompiled/mutex"); + + start_new_program(); + add_storage(sizeof(struct object *)); + set_exit_callback(exit_mutex_key_obj); + mutex_key=end_c_program("/precompiled/mutex_key"); + + start_new_program(); + add_storage(sizeof(COND_T)); + add_function("wait",f_cond_wait,"function(void|object:void)",0); + add_function("signal",f_cond_signal,"function(:void)",0); + add_function("broadcast",f_cond_broadcast,"function(:void)",0); + set_init_callback(init_cond_obj); + set_exit_callback(exit_cond_obj); + end_c_program("/precompiled/condition"); +} + +#endif diff --git a/src/threads.h b/src/threads.h new file mode 100644 index 0000000000000000000000000000000000000000..133b05d305f85cc701e5f467cdcf0b20d7638bc8 --- /dev/null +++ b/src/threads.h @@ -0,0 +1,110 @@ +#ifndef THREADS_H +#define THREADS_H + +#include "machine.h" +#include "interpret.h" +#include "error.h" + +#ifdef _REENTRANT + +#ifdef HAVE_PTHREAD_H +#include <pthread.h> +#undef HAVE_PTHREAD_H +#endif + +extern int num_threads; + +#define MUTEX_T pthread_mutex_t +#define mt_init(X) pthread_mutex_init((X),0) +#define mt_lock(X) pthread_mutex_lock(X) +#define mt_trylock(X) pthread_mutex_trylock(X) +#define mt_unlock(X) pthread_mutex_unlock(X) +#define mt_destroy(X) pthread_mutex_destroy(X) + +extern MUTEX_T interpreter_lock, compiler_lock; + +#define th_create(ID,fun,arg) pthread_create(ID,&pattr,fun,arg) +#define th_exit(foo) pthread_exit(foo) + +#define COND_T pthread_cond_t +#define co_init(X) pthread_cond_init((X), 0) +#define co_wait(COND, MUTEX) pthread_cond_wait((COND), (MUTEX)) +#define co_signal(X) pthread_cond_signal(X) +#define co_broadcast(X) pthread_cond_broadcast(X) +#define co_destroy(X) pthread_cond_destroy(X) + +struct svalue; +struct frame; + +struct thread_state { + int swapped; + struct svalue *sp,*evaluator_stack; + struct svalue **mark_sp,**mark_stack; + struct frame *fp; + int evaluator_stack_malloced; + int mark_stack_malloced; + JMP_BUF *recoveries; +}; + +#define THREADS_ALLOW() \ + do {\ + struct thread_state _tmp; \ + _tmp.swapped=0; \ + if(num_threads > 1 && !threads_disabled) { \ + _tmp.swapped=1; \ + _tmp.sp=sp; \ + _tmp.evaluator_stack=evaluator_stack; \ + _tmp.mark_sp=mark_sp; \ + _tmp.mark_stack=mark_stack; \ + _tmp.fp=fp; \ + _tmp.recoveries=recoveries; \ + _tmp.evaluator_stack_malloced=evaluator_stack_malloced; \ + _tmp.mark_stack_malloced=mark_stack_malloced; \ + mt_unlock(& interpreter_lock); \ + } + +#define THREADS_DISALLOW() \ + if(_tmp.swapped) { \ + mt_lock(& interpreter_lock); \ + sp=_tmp.sp; \ + evaluator_stack=_tmp.evaluator_stack; \ + mark_sp=_tmp.mark_sp; \ + mark_stack=_tmp.mark_stack; \ + fp=_tmp.fp; \ + recoveries=_tmp.recoveries; \ + evaluator_stack_malloced=_tmp.evaluator_stack_malloced; \ + mark_stack_malloced=_tmp.mark_stack_malloced; \ + } \ + } while(0) + +/* Prototypes begin here */ +void *new_thread_func(void * data); +void f_thread_create(INT32 args); +void th_init(); +void f_mutex_lock(INT32 args); +void f_mutex_trylock(INT32 args); +void init_mutex_obj(struct object *o); +void exit_mutex_obj(struct object *o); +void init_mutex_key_obj(struct object *o); +void exit_mutex_key_obj(struct object *o); +void f_cond_wait(INT32 args); +void f_cond_signal(INT32 args); +void f_cond_broadcast(INT32 args); +void init_cond_obj(struct object *o); +void exit_cond_obj(struct object *o); +void th_init_programs(); +/* Prototypes end here */ + + +#else +#define mt_init() +#define mt_lock() +#define mt_unlock() +#define THREADS_ALLOW() +#define THREADS_DISALLOW() +#define th_init() +#define th_init_programs() +#endif + +extern int threads_disabled; +#endif