Commit fc7818af authored by Niels Möller's avatar Niels Möller
Browse files

Made a new funtion, io_iter, for the contents of the io_run loop.

Rev: src/io.c:1.11
parent d072255f
......@@ -89,219 +89,223 @@ static int do_read(struct abstract_read **r, UINT32 length, UINT8 *buffer)
/* UNLINK_FD must be followed by a continue, to avoid updating _fd */
#define UNLINK_FD (*_fd = (*_fd)->next)
void io_run(struct io_backend *b)
static int io_iter(struct io_backend *b)
{
while(1)
struct pollfd *fds;
int i;
unsigned long nfds; /* FIXME: Should be nfds_t if that type is defined */
int timeout;
int res;
nfds = b->nio + b->nlisten + b->nconnect;
if (b->callouts)
{
struct pollfd *fds;
int i;
unsigned long nfds; /* FIXME: Should be nfds_t if that type is defined */
int timeout;
int res;
nfds = b->nio + b->nlisten + b->nconnect;
if (b->callouts)
{
time_t now = time(NULL);
if (now >= b->callouts->when)
timeout = 0;
else
{
if (b->callouts->when > now + MAX_TIMEOUT)
timeout = MAX_TIMEOUT * 1000;
else
timeout = (b->callouts->when - now) * 1000;
}
}
time_t now = time(NULL);
if (now >= b->callouts->when)
timeout = 0;
else
{
if (!nfds)
/* All done */
break;
timeout = -1;
}
{
if (b->callouts->when > now + MAX_TIMEOUT)
timeout = MAX_TIMEOUT * 1000;
else
timeout = (b->callouts->when - now) * 1000;
}
}
else
{
if (!nfds)
/* All done */
return 0;
timeout = -1;
}
fds = alloca(sizeof(struct pollfd) * nfds);
fds = alloca(sizeof(struct pollfd) * nfds);
/* Handle fds in order: write, read, accept, connect. */
i = 0;
/* Handle fds in order: write, read, accept, connect. */
i = 0;
FOR_FDS(struct io_fd, fd, b->io, i++)
{
fds[i].fd = fd->on_hold ? -1 : fd->fd;
fds[i].events = 0;
if (!fd->on_hold)
fds[i].events |= POLLIN;
/* pre_write returns 0 if the buffer is empty */
if (write_buffer_pre_write(fd->buffer))
fds[i].events |= POLLOUT;
}
END_FOR_FDS;
FOR_FDS(struct io_fd, fd, b->io, i++)
{
fds[i].fd = fd->fd;
fds[i].events = 0;
if (!fd->on_hold)
fds[i].events |= POLLIN;
/* pre_write returns 0 if the buffer is empty */
if (write_buffer_pre_write(fd->buffer))
fds[i].events |= POLLOUT;
}
END_FOR_FDS;
FOR_FDS(struct listen_fd, fd, b->listen, i++)
{
fds[i].fd = fd->fd;
fds[i].events = POLLIN;
}
END_FOR_FDS;
FOR_FDS(struct listen_fd, fd, b->listen, i++)
{
fds[i].fd = fd->fd;
fds[i].events = POLLIN;
}
END_FOR_FDS;
FOR_FDS(struct connect_fd, fd, b->connect, i++)
{
fds[i].fd = fd->fd;
fds[i].events = POLLOUT;
}
FOR_FDS(struct connect_fd, fd, b->connect, i++)
{
fds[i].fd = fd->fd;
fds[i].events = POLLOUT;
}
END_FOR_FDS;
res = poll(fds, nfds, timeout);
if (!res)
{
/* Timeout. Run the callout */
struct callout *f = b->callouts;
if (!CALLBACK(f->callout))
fatal("What now?");
b->callouts = f->next;
lsh_free(f);
}
if (res<0)
{
switch(errno)
{
case EAGAIN:
case EINTR:
return 1;
default:
fatal("io_run:poll failed: %s", strerror(errno));
}
}
else
{ /* Process files */
i = 0;
/* Handle writing first */
FOR_FDS(struct io_fd, fd, b->io, i++)
{
if (fds[i].revents & POLLOUT)
{
UINT32 size = MIN(fd->buffer->end - fd->buffer->start,
fd->buffer->block_size);
int res = write(fd->fd,
fd->buffer->buffer + fd->buffer->start,
size);
if (!res)
fatal("Closed?");
if (res < 0)
switch(errno)
{
case EINTR:
case EAGAIN:
break;
default:
werror("io.c: write failed, %s\n", strerror(errno));
CALLBACK(fd->close_callback);
fd->please_close = 1;
break;
}
else if (!res)
fatal("What now?");
else
fd->buffer->start += res;
}
}
END_FOR_FDS;
res = poll(fds, nfds, timeout);
/* Handle reading */
i = 0; /* Start over */
FOR_FDS(struct io_fd, fd, b->io, i++)
{
if (!fd->please_close
&& (fds[i].revents & POLLIN))
{
struct fd_read r =
{ { do_read }, fd->fd };
if (!res)
{
/* Timeout. Run the callout */
struct callout *f = b->callouts;
if (!CALLBACK(f->callout))
fatal("What now?");
b->callouts = f->next;
lsh_free(f);
}
if (res<0)
{
switch(errno)
{
case EAGAIN:
case EINTR:
continue;
default:
fatal("io_run:poll failed: %s", strerror(errno));
}
}
else
{ /* Process files */
i = 0;
/* Handle writing first */
FOR_FDS(struct io_fd, fd, b->io, i++)
{
if (fds[i].revents & POLLOUT)
{
UINT32 size = MIN(fd->buffer->end - fd->buffer->start,
fd->buffer->block_size);
int res = write(fd->fd,
fd->buffer->buffer + fd->buffer->start,
size);
if (!res)
fatal("Closed?");
if (res < 0)
switch(errno)
{
case EINTR:
case EAGAIN:
break;
default:
werror("io.c: write failed, %s\n", strerror(errno));
CALLBACK(fd->close_callback);
fd->please_close = 1;
break;
}
else if (!res)
fatal("What now?");
else
fd->buffer->start += res;
}
}
END_FOR_FDS;
/* Handle reading */
i = 0; /* Start over */
FOR_FDS(struct io_fd, fd, b->io, i++)
{
if (!fd->please_close
&& (fds[i].revents & POLLIN))
{
struct fd_read r =
{ { do_read }, fd->fd };
/* The handler function may install a new handler */
if (!READ_HANDLER(fd->handler,
&r.super))
{
/* FIXME: Perhaps we should not close yet, but
* stop reading and close as soon as the write
* buffer is flushed? But in that case, we
* probably also want to set some flag on the
* write buffer so that no more data can be
* written into it. */
fd->please_close = 1;
}
}
if (fd->please_close)
/* The handler function may install a new handler */
if (!READ_HANDLER(fd->handler,
&r.super))
{
/* FIXME: Cleanup properly...
*
* After a write error, read state must be freed,
* and vice versa. */
UNLINK_FD;
b->nio--;
if (fd->handler)
lsh_free(fd->handler);
lsh_free(fd->buffer);
lsh_free(fd);
continue;
/* FIXME: Perhaps we should not close yet, but
* stop reading and close as soon as the write
* buffer is flushed? But in that case, we
* probably also want to set some flag on the
* write buffer so that no more data can be
* written into it. */
fd->please_close = 1;
}
}
END_FOR_FDS;
if (fd->please_close)
{
/* FIXME: Cleanup properly...
*
* After a write error, read state must be freed,
* and vice versa. */
UNLINK_FD;
b->nio--;
if (fd->handler)
lsh_free(fd->handler);
lsh_free(fd->buffer);
lsh_free(fd);
continue;
}
}
END_FOR_FDS;
FOR_FDS(struct listen_fd, fd, b->listen, i++)
FOR_FDS(struct listen_fd, fd, b->listen, i++)
{
if (fds[i].revents & POLLIN)
{
if (fds[i].revents & POLLIN)
{
/* FIXME: Do something with the peer address? */
struct sockaddr_in peer;
size_t addr_len = sizeof(peer);
/* FIXME: Do something with the peer address? */
struct sockaddr_in peer;
size_t addr_len = sizeof(peer);
int conn = accept(fd->fd,
(struct sockaddr *) &peer, &addr_len);
if (conn < 0)
{
werror("io.c: accept() failed, %s", strerror(errno));
continue;
}
if (!FD_CALLBACK(fd->callback, conn))
{
/* FIXME: Should fd be closed here? */
UNLINK_FD;
lsh_free(fd);
continue;
}
int conn = accept(fd->fd,
(struct sockaddr *) &peer, &addr_len);
if (conn < 0)
{
werror("io.c: accept() failed, %s", strerror(errno));
continue;
}
}
END_FOR_FDS;
FOR_FDS(struct connect_fd, fd, b->connect, i++)
{
if (fds[i].revents & POLLOUT)
if (!FD_CALLBACK(fd->callback, conn))
{
if (!FD_CALLBACK(fd->callback, fd->fd))
fatal("What now?");
b->nconnect--;
/* FIXME: Should fd be closed here? */
UNLINK_FD;
lsh_free(fd);
continue;
}
}
END_FOR_FDS;
}
}
END_FOR_FDS;
FOR_FDS(struct connect_fd, fd, b->connect, i++)
{
if (fds[i].revents & POLLOUT)
{
if (!FD_CALLBACK(fd->callback, fd->fd))
fatal("What now?");
b->nconnect--;
UNLINK_FD;
lsh_free(fd);
continue;
}
}
END_FOR_FDS;
}
return 1;
}
void io_run(struct io_backend *b)
{
while(io_iter(b))
;
}
/*
* Fill in ADDR from HOST, SERVICE and PROTOCOL.
* Supplying a null pointer for HOST means use INADDR_ANY.
* Otherwise HOST is an numbers-and-dits ip-number or a dns name.
* Otherwise HOST is an numbers-and-dots ip-number or a dns name.
*
* PROTOCOL can be tcp or udp.
*
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment