diff --git a/src/modules/spider/spider.c b/src/modules/spider/spider.c index e634227ec75f52c06d153b921b39e7462dfe0594..2973db22b2254e934bc9fc1309ea0022999442d2 100644 --- a/src/modules/spider/spider.c +++ b/src/modules/spider/spider.c @@ -37,10 +37,12 @@ #include "mapping.h" #include "array.h" #include "builtin_functions.h" +#include "module_support.h" +#include "backend.h" #include "threads.h" #include "operators.h" -RCSID("$Id: spider.c,v 1.52 1998/02/10 14:50:23 grubba Exp $"); +RCSID("$Id: spider.c,v 1.53 1998/02/10 15:44:57 per Exp $"); #ifdef HAVE_PWD_H #include <pwd.h> @@ -79,8 +81,9 @@ RCSID("$Id: spider.c,v 1.52 1998/02/10 14:50:23 grubba Exp $"); #include <errno.h> -#include "dmalloc.h" +/* #include <stdlib.h> */ +#include "dmalloc.h" #include "accesseddb.h" #define MAX_PARSE_RECURSE 102 @@ -1261,8 +1264,148 @@ static struct program *streamed_parser; extern void init_udp(void); + +/* Hohum. Here we go. This is try number three for a more optimized Roxen. */ + +#ifdef _REENTRANT +struct thread_args +{ + struct thread_args *next; + struct object *from; + struct object *to; + int to_fd, from_fd; + struct svalue cb; + struct svalue args; + int len; + int sent; + THREAD_T tid; +}; + +MUTEX_T done_lock; +struct thread_args *done; + +// WARNING! This function is running _without_ any stack etc. +void *do_shuffle(void *_a) +{ + struct thread_args *a = (struct thread_args *)_a; + int sent = 0; + int fail=0; + char buffer[8192]; + +#ifdef DIRECTIO_ON + if(a->len >= 65536) + directio(a->from_fd, DIRECTIO_ON); +#endif + + while(!fail && a->len) + { + int nread, ts=0; + nread = read(a->from_fd, buffer, 8192); + if(nread <= 0) + break; + + while(nread) + { + int nsent = write(a->to_fd, buffer, nread); + if(nsent < 0) + { + fail=1; + break; + } + ts+=nsent; + nread -= nsent; + } + a->len -= ts; + sent += ts; + } + a->sent = sent; + + // We are done. It is up to the backend callback to call the + // finish function + mt_lock(&done_lock); + a->next = done; + done = a; + mt_unlock(&done_lock); + return 0; +} + +int num_shuffles = 0; +struct callback *my_callback; + +void finished_p(struct callback *foo, void *b, void *c) +{ + while(done) + { + struct thread_args *d; + + mt_lock(&done_lock); + d = done; + done = d->next; + mt_unlock(&done_lock); + + num_shuffles--; + + push_int( d->len ); + *(sp++) = d->args; + push_object( d->from ); + push_object( d->to ); + apply_svalue( &d->cb, 4 ); + pop_stack(); + free(d); + } + + if(num_shuffles) + { + next_timeout = current_time; + next_timeout.tv_usec += 40000; + } else { + remove_callback( foo ); + my_callback = 0; + } +} + +void f_shuffle(INT32 args) +{ + struct thread_args *a = malloc(sizeof(struct thread_args)); + struct svalue *q, *w; + get_all_args("shuffle", args, "%o%o%*%*%d", &a->from, &a->to,&q,&w,&a->len); + a->sent = 0; + + num_shuffles++; + apply(a->to, "query_fd", 0); + apply(a->from, "query_fd", 0); + get_all_args("shuffle2", 2, "%d%d", &a->to_fd, &a->from_fd); + + a->from->refs++; + a->to->refs++; + + a->cb = *q; + a->args = *w; + if(a->cb.type <= MAX_REF_TYPE) a->cb.u.refs[0]++; + if(a->args.type <= MAX_REF_TYPE) a->args.u.refs[0]++; + + th_create_small(&a->tid, do_shuffle, (void *)a); + + if(!my_callback) + { +/* next_timeout = current_time; */ +/* next_timeout.tv_usec += 40000; */ + my_callback = add_backend_callback( finished_p, 0, 0 ); + wake_up_backend(); + } + + pop_n_elems(args+2); +} +#endif + + void pike_module_init(void) { +#ifdef _REENTRANT + add_function("shuffle", f_shuffle, + "function(object,object,function,mixed,int:void)", 0); +#endif + #if 0 add_efun("fcgi_create_listen_socket", f_fcgi_create_listen_socket, "function(int:int)", OPT_SIDE_EFFECT);