Skip to content
Snippets Groups Projects
Commit fbb41489 authored by Per Hedbor's avatar Per Hedbor
Browse files

Fixed spider.shuffle

Rev: src/modules/spider/spider.c:1.53
parent eef116a2
No related branches found
No related tags found
No related merge requests found
...@@ -37,10 +37,12 @@ ...@@ -37,10 +37,12 @@
#include "mapping.h" #include "mapping.h"
#include "array.h" #include "array.h"
#include "builtin_functions.h" #include "builtin_functions.h"
#include "module_support.h"
#include "backend.h"
#include "threads.h" #include "threads.h"
#include "operators.h" #include "operators.h"
RCSID("$Id: spider.c,v 1.52 1998/02/10 14:50:23 grubba Exp $"); RCSID("$Id: spider.c,v 1.53 1998/02/10 15:44:57 per Exp $");
#ifdef HAVE_PWD_H #ifdef HAVE_PWD_H
#include <pwd.h> #include <pwd.h>
...@@ -79,8 +81,9 @@ RCSID("$Id: spider.c,v 1.52 1998/02/10 14:50:23 grubba Exp $"); ...@@ -79,8 +81,9 @@ RCSID("$Id: spider.c,v 1.52 1998/02/10 14:50:23 grubba Exp $");
#include <errno.h> #include <errno.h>
#include "dmalloc.h" /* #include <stdlib.h> */
#include "dmalloc.h"
#include "accesseddb.h" #include "accesseddb.h"
#define MAX_PARSE_RECURSE 102 #define MAX_PARSE_RECURSE 102
...@@ -1261,8 +1264,148 @@ static struct program *streamed_parser; ...@@ -1261,8 +1264,148 @@ static struct program *streamed_parser;
extern void init_udp(void); extern void init_udp(void);
/* Hohum. Here we go. This is try number three for a more optimized Roxen. */
#ifdef _REENTRANT
struct thread_args
{
struct thread_args *next;
struct object *from;
struct object *to;
int to_fd, from_fd;
struct svalue cb;
struct svalue args;
int len;
int sent;
THREAD_T tid;
};
MUTEX_T done_lock;
struct thread_args *done;
// WARNING! This function is running _without_ any stack etc.
void *do_shuffle(void *_a)
{
struct thread_args *a = (struct thread_args *)_a;
int sent = 0;
int fail=0;
char buffer[8192];
#ifdef DIRECTIO_ON
if(a->len >= 65536)
directio(a->from_fd, DIRECTIO_ON);
#endif
while(!fail && a->len)
{
int nread, ts=0;
nread = read(a->from_fd, buffer, 8192);
if(nread <= 0)
break;
while(nread)
{
int nsent = write(a->to_fd, buffer, nread);
if(nsent < 0)
{
fail=1;
break;
}
ts+=nsent;
nread -= nsent;
}
a->len -= ts;
sent += ts;
}
a->sent = sent;
// We are done. It is up to the backend callback to call the
// finish function
mt_lock(&done_lock);
a->next = done;
done = a;
mt_unlock(&done_lock);
return 0;
}
int num_shuffles = 0;
struct callback *my_callback;
void finished_p(struct callback *foo, void *b, void *c)
{
while(done)
{
struct thread_args *d;
mt_lock(&done_lock);
d = done;
done = d->next;
mt_unlock(&done_lock);
num_shuffles--;
push_int( d->len );
*(sp++) = d->args;
push_object( d->from );
push_object( d->to );
apply_svalue( &d->cb, 4 );
pop_stack();
free(d);
}
if(num_shuffles)
{
next_timeout = current_time;
next_timeout.tv_usec += 40000;
} else {
remove_callback( foo );
my_callback = 0;
}
}
void f_shuffle(INT32 args)
{
struct thread_args *a = malloc(sizeof(struct thread_args));
struct svalue *q, *w;
get_all_args("shuffle", args, "%o%o%*%*%d", &a->from, &a->to,&q,&w,&a->len);
a->sent = 0;
num_shuffles++;
apply(a->to, "query_fd", 0);
apply(a->from, "query_fd", 0);
get_all_args("shuffle2", 2, "%d%d", &a->to_fd, &a->from_fd);
a->from->refs++;
a->to->refs++;
a->cb = *q;
a->args = *w;
if(a->cb.type <= MAX_REF_TYPE) a->cb.u.refs[0]++;
if(a->args.type <= MAX_REF_TYPE) a->args.u.refs[0]++;
th_create_small(&a->tid, do_shuffle, (void *)a);
if(!my_callback)
{
/* next_timeout = current_time; */
/* next_timeout.tv_usec += 40000; */
my_callback = add_backend_callback( finished_p, 0, 0 );
wake_up_backend();
}
pop_n_elems(args+2);
}
#endif
void pike_module_init(void) void pike_module_init(void)
{ {
#ifdef _REENTRANT
add_function("shuffle", f_shuffle,
"function(object,object,function,mixed,int:void)", 0);
#endif
#if 0 #if 0
add_efun("fcgi_create_listen_socket", f_fcgi_create_listen_socket, add_efun("fcgi_create_listen_socket", f_fcgi_create_listen_socket,
"function(int:int)", OPT_SIDE_EFFECT); "function(int:int)", OPT_SIDE_EFFECT);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment