Commit 82a1977f authored by Niels Möller's avatar Niels Möller

Rewrite of the backend.

Rev: src/io.c:1.26
Rev: src/io.h:1.18
parent 1a4b0e2b
......@@ -40,8 +40,147 @@
#include "write_buffer.h"
#include "xalloc.h"
/* A little more than an hour */
#define MAX_TIMEOUT 4000
int io_iter(struct io_backend *b)
{
unsigned long nfds; /* FIXME: Should be nfds_t if that type is defined */
struct pollfd *fds;
int timeout;
int res;
nfds = 0;
{
/* Prepare fd:s. This fase calls the prepare-methods, also closes
* and unlinks any fd:s that should be closed, and also counts how
* many fd:s there are. */
struct lsh_fd **_fd;
struct lsh_fd *fd;
for(_fd = &b->files; (fd = *_fd); )
{
if (!fd->close_now && fd->prepare)
PREPARE_FD(fd);
if (fd->close_now)
{
if (fd->fd < 0)
/* Unlink the file object, but don't close any underlying file. */
;
else
{
/* Used by write fd:s to make sure that writing to its
* buffer fails. */
if (fd->really_close)
REALLY_CLOSE_FD(fd);
/* FIXME: The value returned from the close callback could be used
* to choose an exit code. */
if (fd->close_callback && fd->close_reason)
(void) CLOSE_CALLBACK(fd->close_callback, fd->close_reason);
debug("Closing fd %d.\n", fd->fd);
close(fd->fd);
}
/* Unlink this fd */
*_fd = fd->next;
continue;
}
nfds++;
_fd = &fd->next;
}
}
if (!nfds)
/* Nothing more to do.
*
* NOTE: There might be some callouts left, but we won't wait for them. */
return 0;
/* FIXME: Callouts not implemented */
timeout = -1;
fds = alloca(sizeof(struct pollfd) * nfds);
/* Fill out fds-array */
{
struct lsh_fd *fd;
int i;
int all_events = 0;
for (fd = b->files, i = 0; fd; fd = fd->next, i++)
{
assert(i < nfds);
fds[i].fd = fd->fd;
fds[i].events = 0;
if (fd->want_read)
fds[i].events |= POLLIN;
if (fd->want_write)
fds[i].events |= POLLOUT;
all_events |= fds[i].events;
}
assert(i == nfds);
if (!all_events)
{
/* Nothing happens */
/* NOTE: There might be some callouts left, but we don't wait */
return 0;
}
}
res = poll(fds, nfds, timeout);
if (!res)
{
/* Callouts are not implemented */
fatal("Unexpected timeout\n");
}
if (res < 0)
switch(errno)
{
case EAGAIN:
case EINTR:
return 1;
default:
fatal("io_iter: poll failed: %s", strerror(errno));
}
{
/* Do io. Note that the callback functions may add new fds to the
* head of the list, or set the close_now flag on any fd. */
struct lsh_fd *fd;
int i;
for(fd = b->files, i=0; fd; fd = fd->next, i++)
{
assert(i<nfds);
if (fd->close_now)
continue;
if (fds[i].revents & POLLOUT)
WRITE_FD(fd);
if (fd->close_now)
continue;
if (fds[i].revents & POLLIN)
READ_FD(fd);
}
assert(i == nfds);
}
return 1;
}
struct fd_read
{
......@@ -84,426 +223,155 @@ static int do_read(struct abstract_read **r, UINT32 length, UINT8 *buffer)
}
}
#define FOR_FDS(type, fd, list, extra) \
{ \
type **(_fd); \
type *(fd); \
for(_fd = &(list); ((fd) = *_fd); (extra)) {
#define END_FOR_FDS _fd = &(*_fd)->next; } }
/* UNLINK_FD must be followed by a continue, to avoid updating _fd */
#define UNLINK_FD (*_fd = (*_fd)->next)
static void really_close_fd(struct io_fd *fd)
static void read_callback(struct lsh_fd *fd)
{
/* FIXME: The value returned from the close callback could be used
* to choose an exit code. */
if (fd->close_callback && fd->close_reason)
(void) CLOSE_CALLBACK(fd->close_callback, fd->close_reason);
struct io_fd *self = (struct io_fd *) fd;
int res;
debug("Closing fd %d.\n", fd->fd);
struct fd_read r =
{ { STACK_HEADER, do_read }, fd->fd };
close(fd->fd);
MDEBUG(self);
/* The handler function may install a new handler */
res = READ_HANDLER(self->handler,
&r.super);
/* Make sure writing to the buffer fails. */
if (fd->buffer)
write_buffer_close(fd->buffer);
/* NOTE: These flags are not mutually exclusive. All combination
* must be handled correctly. */
/* There can be other objects around that may still
* attempt to write to the buffer. So let gc handle it
* instead of freeing it explicitly */
#if 0
lsh_object_free(fd->buffer);
#endif
/* NOTE: (i) If LSH_DIE is set, LSH_CLOSE is ignored. (ii) If the fd
* is read_only, LSH_CLOSE is the same as LSH_DIE. */
/* There may be pointers to fd objects. So don't free them here. */
#if 0
/* Handlers are not shared, so it should be ok to free them. */
lsh_object_free(fd->handler);
lsh_object_free(fd);
#endif
}
/* This condition must be taken care of earlier. */
assert(!(res & LSH_CHANNEL_FINISHED));
/* FIXME: This code breaks horribly if new files are created by a
* callback function. */
int io_iter(struct io_backend *b)
{
struct pollfd *fds;
int i;
unsigned long nfds; /* FIXME: Should be nfds_t if that type is defined */
int timeout;
int res;
/* We must first look at the write descriptors, to see if any of
* them should be closed. */
/* Not implemented */
assert(!(res & LSH_KILL_OTHERS));
i = 0;
FOR_FDS(struct io_fd, fd, b->io, i++)
if (res & LSH_HOLD)
{
/* pre_write returns 0 if the buffer is empty */
if (fd->buffer)
{
/* Ignores return value. Look at the empty attribute instead- */
(void) write_buffer_pre_write(fd->buffer);
if (fd->buffer->empty && fd->buffer->closed)
{
really_close_fd(fd);
UNLINK_FD;
b->nio--;
continue;
}
}
/* This flag should not be combined with anything else */
assert(res == LSH_HOLD);
fd->want_read = 0;
}
END_FOR_FDS;
nfds = b->nio + b->nlisten + b->nconnect;
if (b->callouts)
if (res & LSH_DIE)
{
time_t now = time(NULL);
if (now >= b->callouts->when)
timeout = 0;
else
{
if (b->callouts->when > now + MAX_TIMEOUT)
timeout = MAX_TIMEOUT * 1000;
else
timeout = (b->callouts->when - now) * 1000;
}
if (self->buffer)
write_buffer_close(self->buffer);
fd->close_reason = LSH_FAILUREP(res)
? CLOSE_PROTOCOL_FAILURE : 0;
fd->close_now = 1;
}
else
else if (res & LSH_CLOSE)
{
if (!nfds)
/* All done */
return 0;
timeout = -1;
if (self->buffer)
{
write_buffer_close(self->buffer);
/* Don't attempt to read any further. */
/* FIXME: Is it safe to free the handler here? */
self->super.want_read = 0;
self->handler = NULL;
}
else
fd->close_now = 1;
fd->close_reason
= LSH_FAILUREP(res) ? CLOSE_PROTOCOL_FAILURE : CLOSE_EOF;
}
}
fds = alloca(sizeof(struct pollfd) * nfds);
static void write_callback(struct lsh_fd *fd)
{
struct io_fd *self = (struct io_fd *) fd;
UINT32 size;
int res;
MDEBUG(self);
/* Handle fds in order: write, read, accept, connect. */
i = 0;
size = MIN(self->buffer->end - self->buffer->start,
self->buffer->block_size);
assert(size);
res = write(fd->fd,
self->buffer->buffer + self->buffer->start,
size);
if (!res)
fatal("Closed?");
if (res < 0)
switch(errno)
{
case EINTR:
case EAGAIN:
break;
case EPIPE:
werror("Broken pipe\n");
fd->close_reason = CLOSE_WRITE_FAILED;
fd->close_now = 1;
break;
default:
werror("io.c: write failed, %s\n", strerror(errno));
fd->close_reason = CLOSE_WRITE_FAILED;
fd->close_now = 1;
break;
}
else
self->buffer->start += res;
}
FOR_FDS(struct io_fd, fd, b->io, i++)
{
fds[i].fd = fd->fd;
fds[i].events = 0;
if (fd->handler && !fd->on_hold)
fds[i].events |= POLLIN;
static void listen_callback(struct lsh_fd *fd)
{
struct listen_fd *self = (struct listen_fd *) fd;
struct sockaddr_in peer;
size_t addr_len = sizeof(peer);
int res;
int conn;
if (fd->buffer && !fd->buffer->empty)
fds[i].events |= POLLOUT;
}
END_FOR_FDS;
MDEBUG(self);
/* FIXME: Do something with the peer address? */
FOR_FDS(struct listen_fd, fd, b->listen, i++)
conn = accept(fd->fd,
(struct sockaddr *) &peer, &addr_len);
if (conn < 0)
{
fds[i].fd = fd->fd;
fds[i].events = POLLIN;
werror("io.c: accept() failed, %s", strerror(errno));
return;
}
END_FOR_FDS;
FOR_FDS(struct connect_fd, fd, b->connect, i++)
res = FD_CALLBACK(self->callback, conn);
if (LSH_ACTIONP(res))
{
fds[i].fd = fd->fd;
fds[i].events = POLLOUT;
werror("Strange: Accepted a connection, "
"but failed before writing anything.\n");
fd->close_now = 1;
fd->close_reason = LSH_FAILUREP(res) ? CLOSE_PROTOCOL_FAILURE
: CLOSE_EOF;
}
END_FOR_FDS;
}
res = poll(fds, nfds, timeout);
static void connect_callback(struct lsh_fd *fd)
{
struct connect_fd *self = (struct connect_fd *) fd;
int res;
MDEBUG(self);
if (!res)
{
/* Timeout. Run the callout */
struct callout *f = b->callouts;
res = FD_CALLBACK(self->callback, fd->fd);
if (!CALLBACK(f->callout))
fatal("What now?");
b->callouts = f->next;
lsh_object_free(f);
}
if (res<0)
if (LSH_ACTIONP(res))
{
switch(errno)
{
case EAGAIN:
case EINTR:
return 1;
default:
fatal("io_iter:poll failed: %s", strerror(errno));
}
werror("Strange: Connected, "
"but failed before writing anything.\n");
}
else
{ /* Process files */
i = 0;
/* Handle writing first */
FOR_FDS(struct io_fd, fd, b->io, i++)
{
if (fds[i].revents & POLLOUT)
{
UINT32 size = MIN(fd->buffer->end - fd->buffer->start,
fd->buffer->block_size);
int res = write(fd->fd,
fd->buffer->buffer + fd->buffer->start,
size);
if (!res)
fatal("Closed?");
if (res < 0)
switch(errno)
{
case EINTR:
case EAGAIN:
break;
case EPIPE:
werror("Broken pipe\n");
fd->close_reason = CLOSE_WRITE_FAILED;
fd->close_now = 1;
break;
default:
werror("io.c: write failed, %s\n", strerror(errno));
fd->close_reason = CLOSE_WRITE_FAILED;
fd->close_now = 1;
break;
}
else
fd->buffer->start += res;
}
}
END_FOR_FDS;
/* Handle reading */
i = 0; /* Start over */
FOR_FDS(struct io_fd, fd, b->io, i++)
{
if (!fd->close_now
&& (fds[i].revents & POLLIN))
{
int res;
struct fd_read r =
{ { STACK_HEADER, do_read }, fd->fd };
/* The handler function may install a new handler */
res = READ_HANDLER(fd->handler,
&r.super);
/* NOTE: These flags are not mutually exclusive. All
* combination must be handled correctly. */
/* NOTE: (i) If LSH_DIE is set, LSH_CLOSE is ignored.
* (ii) If the fd is read_only, LSH_CLOSE is the same as LSH_DIE.
*/
#if 0
if ( (res & (LSH_CLOSE | LSH_DIE)) == (LSH_CLOSE | LSH_DIE) )
{
debug("return code %x, both LSH_CLOSE and LSH_DIE set.\n",
res);
/* LSH_DIE takes precedence */
res &= ~LSH_CLOSE;
/* FIXME: Perhaps we should always set LSH_FAIL in
* this case? */
}
#endif
/* This condition must be taken care of earlier. */
assert(!(res & LSH_CHANNEL_FINISHED));
if (res & LSH_HOLD)
{
/* This flag should not be combined with anything else */
assert(res == LSH_HOLD);
fd->on_hold = 1;
}
if (res & LSH_DIE)
{
if (fd->buffer)
write_buffer_close(fd->buffer);
fd->close_reason = LSH_FAILUREP(res)
? CLOSE_PROTOCOL_FAILURE : 0;
fd->close_now = 1;
}
else if (res & LSH_CLOSE)
{
if (fd->buffer)
{
write_buffer_close(fd->buffer);
/* Don't attempt to read any further. */
/* FIXME: Is it safe to free the handler here? */
fd->handler = NULL;
}
else
fd->close_now = 1;
fd->close_reason
= LSH_FAILUREP(res) ? CLOSE_PROTOCOL_FAILURE : CLOSE_EOF;
}
if (res & LSH_KILL_OTHERS)
{
/* Close all other files. We have probably fork()ed. */
{
struct io_fd *p;
struct io_fd *next;
for (p = b->io; p; p = next)
{
next = p->next;
if (p->fd != fd->fd)
{
p->close_reason = 0;
/* In this case, it should be safe to
* deallocate the buffer immediately */
lsh_object_free(p->buffer);
really_close_fd(p);
}
}
if (fd->close_now)
{
/* Some error occured. So close this fd too! */
really_close_fd(fd);
b->io = NULL;
b->nio = 0;
}
else
{ /* Keep this single descriptor open */
fd->next = NULL;
b->io = fd;
b->nio = 1;
}
}{
struct listen_fd *p;
struct listen_fd *next;
for (p = b->listen; p; p = next)
{
next = p->next;
close(p->fd);
lsh_space_free(p);
}
b->listen = NULL;
b->nlisten = 0;
}{
struct connect_fd *p;
struct connect_fd *next;
for (p = b->connect; p; p = next)
{
next = p->next;
close(p->fd);
lsh_space_free(p);
}
b->connect = NULL;
b->nconnect = 0;
}{
struct callout *p;
struct callout *next;
for (p = b->callouts; p; p = next)
{
next = p->next;
lsh_space_free(p);
}
b->callouts = NULL;
}
/* Skip the rest of this iteration */
return 1;
}
}
#if 0
if (fd->close_now)
{
/* FIXME: Cleanup properly...
*
* After a write error, read state must be freed,
* and vice versa. */
really_close_fd(fd);
UNLINK_FD;
b->nio--;
continue;
}
#endif
}
END_FOR_FDS;
/* Close files */
i = 0; /* Start over */
FOR_FDS(struct io_fd, fd, b->io, i++)
if (fd->close_now)
{
/* FIXME: Cleanup properly...
*
* After a write error, read state must be freed,
* and vice versa. */
really_close_fd(fd);
UNLINK_FD;
b->nio--;
continue;
}
END_FOR_FDS;
FOR_FDS(struct listen_fd, fd, b->listen, i++)
{
if (fds[i].revents & POLLIN)
{