Commit 2400cad4 authored by Niels Möller's avatar Niels Möller
Browse files

* src/io.c, src/io.h: Reorganized, and deleted all subclasses of

lsh_fd.
(io_connect): Take a continuation as argument.
(io_listen): Take an io_callback (typically of type
io_listen_callback) as argument.
(make_listen_callback): New function.

Rev: src/io.c:1.89
Rev: src/io.h:1.52
parent 56ee285e
......@@ -25,7 +25,6 @@
#include "format.h"
#include "werror.h"
#include "write_buffer.h"
#include "xalloc.h"
#include <assert.h>
......@@ -93,7 +92,7 @@ int io_iter(struct io_backend *b)
for(fd_p = &b->files; (fd = *fd_p); )
{
if (fd->super.alive && fd->prepare)
PREPARE_FD(fd);
FD_PREPARE(fd);
if (!fd->super.alive)
{
......@@ -104,8 +103,8 @@ int io_iter(struct io_backend *b)
{
/* Used by write fd:s to make sure that writing to its
* buffer fails. */
if (fd->really_close)
REALLY_CLOSE_FD(fd);
if (fd->write_close)
FD_WRITE_CLOSE(fd);
/* FIXME: The value returned from the close callback could be used
* to choose an exit code. */
......@@ -247,11 +246,11 @@ int io_iter(struct io_backend *b)
{
if (fd->want_write)
/* Will raise an i/o error */
WRITE_FD(fd);
FD_WRITE(fd);
else if (fd->want_read)
/* Ought to behave like EOF, but might raise an i/o
* error instead. */
READ_FD(fd);
FD_READ(fd);
else
{
werror("io.c: poll said POLLHUP on an inactive fd.\n");
......@@ -272,13 +271,13 @@ int io_iter(struct io_backend *b)
}
if (fds[i].revents & POLLOUT)
WRITE_FD(fd);
FD_WRITE(fd);
if (!fd->super.alive)
continue;
if (fds[i].revents & MY_POLLIN)
READ_FD(fd);
FD_READ(fd);
}
assert(i == nfds);
}
......@@ -287,8 +286,9 @@ int io_iter(struct io_backend *b)
}
static void do_buffered_read(struct io_read_callback *s,
struct lsh_fd *fd)
static void
do_buffered_read(struct io_callback *s,
struct lsh_fd *fd)
{
CAST(io_buffered_read, self, s);
UINT8 *buffer = alloca(self->buffer_size);
......@@ -357,21 +357,22 @@ static void do_buffered_read(struct io_read_callback *s,
}
struct io_read_callback *
struct io_callback *
make_buffered_read(UINT32 buffer_size,
struct read_handler *handler)
{
NEW(io_buffered_read, self);
self->super.read = do_buffered_read;
self->super.f = do_buffered_read;
self->buffer_size = buffer_size;
self->handler = handler;
return &self->super;
}
static void do_consuming_read(struct io_read_callback *c,
struct lsh_fd *fd)
static void
do_consuming_read(struct io_callback *c,
struct lsh_fd *fd)
{
CAST_SUBTYPE(io_consuming_read, self, c);
UINT32 wanted = READ_QUERY(self);
......@@ -423,23 +424,32 @@ static void do_consuming_read(struct io_read_callback *c,
void init_consuming_read(struct io_consuming_read *self,
struct abstract_write *consumer)
{
self->super.read = do_consuming_read;
self->super.f = do_consuming_read;
self->consumer = consumer;
}
static void write_callback(struct lsh_fd *fd)
/* ;; GABA:
(class
(name io_write_callback)
(super io_write_state)
(vars
(buffer object write_buffer)))
*/
static void
do_write_callback(struct io_callback *s UNUSED,
struct lsh_fd *fd)
{
CAST(io_fd, self, fd);
/* CAST(io_write_callback, self, s); */
UINT32 size;
int res;
size = MIN(self->write_buffer->end - self->write_buffer->start,
self->write_buffer->block_size);
size = MIN(fd->write_buffer->end - fd->write_buffer->start,
fd->write_buffer->block_size);
assert(size);
res = write(fd->fd,
self->write_buffer->buffer + self->write_buffer->start,
fd->write_buffer->buffer + fd->write_buffer->start,
size);
if (!res)
fatal("Closed?");
......@@ -462,14 +472,76 @@ static void write_callback(struct lsh_fd *fd)
break;
}
else
write_buffer_consume(self->write_buffer, res);
write_buffer_consume(fd->write_buffer, res);
}
static struct io_callback io_write_callback =
{ STATIC_HEADER, do_write_callback };
static void
do_write_prepare(struct lsh_fd *fd)
{
/* CAST(io_write_callback, self, s); */
assert(fd->write_buffer);
if (! (fd->want_write = write_buffer_pre_write(fd->write_buffer))
&& fd->write_buffer->closed)
close_fd(fd, CLOSE_EOF);
}
static void
do_write_close(struct lsh_fd *fd)
{
/* CAST(io_write_callback, self, s); */
assert(fd->write_buffer);
write_buffer_close(fd->write_buffer);
}
#if 0
static struct io_write_state *
make_io_write_callback(struct write_buffer *buffer)
{
NEW(io_write_callback, self);
self->super.super.f = do_write_callback;
self->super.prepare = do_write_prepare;
self->super.close = do_write_close;
self->buffer = buffer;
return &self->super;
}
#endif
struct listen_value *
make_listen_value(struct lsh_fd *fd,
struct address_info *peer)
{
NEW(listen_value, self);
self->fd = fd;
self->peer = peer;
return self;
}
/* GABA:
(class
(name io_listen_callback)
(super io_callback)
(vars
(backend object io_backend)
(c object command_continuation)
(e object exception_handler)))
*/
static void
do_listen_callback(struct io_read_callback *s UNUSED,
do_listen_callback(struct io_callback *s,
struct lsh_fd *fd)
{
CAST(listen_fd, self, fd);
CAST(io_listen_callback, self, s);
struct sockaddr_in peer;
size_t addr_len = sizeof(peer);
int conn;
......@@ -481,17 +553,40 @@ do_listen_callback(struct io_read_callback *s UNUSED,
werror("io.c: accept() failed, %z", STRERROR(errno));
return;
}
FD_LISTEN_CALLBACK(self->callback, conn,
sockaddr2info(addr_len,
(struct sockaddr *) &peer));
COMMAND_RETURN(self->c,
make_listen_value(make_lsh_fd(self->backend,
conn, self->e),
sockaddr2info(addr_len,
(struct sockaddr *) &peer)));
}
static struct io_read_callback listen_callback =
{ STATIC_HEADER, do_listen_callback };
struct io_callback *
make_listen_callback(struct io_backend *backend,
struct command_continuation *c,
struct exception_handler *e)
{
NEW(io_listen_callback, self);
self->super.f = do_listen_callback;
self->backend = backend;
self->c = c;
self->e = e;
return &self->super;
}
static void connect_callback(struct lsh_fd *fd)
/* GABA:
(class
(name io_connect_callback)
(super io_callback)
(vars
(c object command_continuation)))
*/
static void
do_connect_callback(struct io_callback *s,
struct lsh_fd *fd)
{
CAST(connect_fd, self, fd);
CAST(io_connect_callback, self, s);
int socket_error;
size_t len = sizeof(socket_error);
......@@ -503,15 +598,31 @@ static void connect_callback(struct lsh_fd *fd)
debug("io.c: connect_callback: Connect failed.\n");
EXCEPTION_RAISE(fd->e,
make_io_exception(EXC_IO_CONNECT, fd, 0, "connect() failed."));
kill_fd(fd);
}
else
{
FD_CALLBACK(self->callback, fd->fd);
/* To avoid actually closing the fd */
fd->fd = -1;
fd->write = NULL;
fd->want_write = 0;
COMMAND_RETURN(self->c, fd);
}
kill_fd(fd);
}
static struct io_callback *
make_connect_callback(struct command_continuation *c)
{
NEW(io_connect_callback, self);
self->super.f = do_connect_callback;
#if 0
self->super.prepare = NULL;
self->super.close = NULL;
#endif
self->c = c;
return &self->super;
}
/* FIXME: Perhaps this function should return a suitable exit code? */
......@@ -584,16 +695,16 @@ static void init_file(struct io_backend *b, struct lsh_fd *f, int fd,
f->close_callback = NULL;
f->prepare = NULL;
f->want_read = 0;
f->read = NULL;
f->want_write = 0;
f->write = NULL;
f->write_close = NULL;
f->super.alive = 1;
f->super.kill = do_kill_fd;
f->really_close = NULL;
f->next = b->files;
b->files = f;
......@@ -967,15 +1078,29 @@ void io_init_fd(int fd)
io_set_close_on_exec(fd);
}
struct lsh_fd *
make_lsh_fd(struct io_backend *b,
int fd,
struct exception_handler *e)
{
NEW(lsh_fd, f);
io_init_fd(fd);
init_file(b, f, fd, e);
return f;
}
/* Some code is taken from Thomas Bellman's tcputils. */
struct connect_fd *
struct lsh_fd *
io_connect(struct io_backend *b,
struct sockaddr_in *remote,
struct sockaddr_in *local,
struct fd_callback *f,
struct command_continuation *c,
struct exception_handler *e)
{
int s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
struct lsh_fd *fd;
if (s<0)
return NULL;
......@@ -1001,26 +1126,22 @@ io_connect(struct io_backend *b,
return NULL;
}
{
NEW(connect_fd, fd);
init_file(b, &fd->super, s, e);
fd->super.want_write = 1;
fd->super.write = connect_callback;
fd->callback = f;
fd = make_lsh_fd(b, s, e);
fd->want_write = 1;
fd->write = make_connect_callback(c);
return fd;
}
return fd;
}
struct listen_fd *
struct lsh_fd *
io_listen(struct io_backend *b,
struct sockaddr_in *local,
struct fd_listen_callback *callback,
struct io_callback *callback,
struct exception_handler *e)
{
int s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
struct lsh_fd *fd;
if (s<0)
return NULL;
......@@ -1046,112 +1167,74 @@ io_listen(struct io_backend *b,
return NULL;
}
{
NEW(listen_fd, fd);
/* FIXME: What handler to use? */
fd = make_lsh_fd(b, s, e);
/* FIXME: What handler to use? */
init_file(b, &fd->super, s, e);
fd->super.want_read = 1;
fd->super.read = &listen_callback;
fd->callback = callback;
return fd;
}
}
fd->want_read = 1;
fd->read = callback;
/* FIXME: Closing the write buffer should perhaps be done earlier,
* from kill_fd(). */
static void really_close(struct lsh_fd *fd)
{
CAST(io_fd, self, fd);
assert(self->write_buffer);
write_buffer_close(self->write_buffer);
}
static void prepare_write(struct lsh_fd *fd)
{
CAST(io_fd, self, fd);
assert(self->write_buffer);
if (! (fd->want_write = write_buffer_pre_write(self->write_buffer))
&& self->write_buffer->closed)
close_fd(fd, CLOSE_EOF);
return fd;
}
/* Constructors */
struct io_fd *make_io_fd(struct io_backend *b,
int fd,
struct exception_handler *e)
{
NEW(io_fd, f);
io_init_fd(fd);
init_file(b, &f->super, fd, e);
return f;
}
struct io_fd *io_read_write(struct io_fd *fd,
struct io_read_callback *read,
UINT32 block_size,
struct close_callback *close_callback)
struct lsh_fd *
io_read_write(struct lsh_fd *fd,
struct io_callback *read,
UINT32 block_size,
struct close_callback *close_callback)
{
struct write_buffer *buffer = write_buffer_alloc(block_size);
debug("io.c: Preparing fd %i for reading and writing\n",
fd->super.fd);
fd->fd);
/* Reading */
fd->super.read = read;
fd->super.want_read = !!read;
fd->read = read;
fd->want_read = !!read;
/* Writing */
fd->super.prepare = prepare_write;
fd->super.write = write_callback;
fd->write_buffer = buffer;
fd->write_buffer = write_buffer_alloc(block_size);
fd->write = &io_write_callback;
fd->prepare = do_write_prepare;
fd->write_close = do_write_close;
/* Closing */
fd->super.really_close = really_close;
fd->super.close_callback = close_callback;
fd->close_callback = close_callback;
return fd;
}
struct io_fd *io_read(struct io_fd *fd,
struct io_read_callback *read,
struct close_callback *close_callback)
struct lsh_fd *
io_read(struct lsh_fd *fd,
struct io_callback *read,
struct close_callback *close_callback)
{
debug("io.c: Preparing fd %i for reading\n", fd->super.fd);
debug("io.c: Preparing fd %i for reading\n", fd->fd);
/* Reading */
fd->super.want_read = !!read;
fd->super.read = read;
fd->want_read = !!read;
fd->read = read;
fd->super.close_callback = close_callback;
fd->close_callback = close_callback;
return fd;
}
struct io_fd *io_write(struct io_fd *fd,
UINT32 block_size,
struct close_callback *close_callback)
struct lsh_fd *
io_write(struct lsh_fd *fd,
UINT32 block_size,
struct close_callback *close_callback)
{
struct write_buffer *buffer = write_buffer_alloc(block_size);
debug("io.c: Preparing fd %i for writing\n", fd->super.fd);
debug("io.c: Preparing fd %i for writing\n", fd->fd);
/* Writing */
fd->super.prepare = prepare_write;
fd->super.write = write_callback;
fd->write_buffer = buffer;
fd->write_buffer = write_buffer_alloc(block_size);
fd->write = &io_write_callback;
fd->prepare = do_write_prepare;
fd->write_close = do_write_close;
fd->super.close_callback = close_callback;
fd->close_callback = close_callback;
return fd;
}
......@@ -1176,9 +1259,9 @@ void close_fd_nicely(struct lsh_fd *fd, int reason)
fd->want_read = 0;
fd->read = NULL;
if (fd->really_close)
if (fd->write_close)
/* Mark the write_buffer as closed */
REALLY_CLOSE_FD(fd);
FD_WRITE_CLOSE(fd);
else
/* There's no data buffered for write. */
kill_fd(fd);
......@@ -1255,7 +1338,7 @@ make_io_exception(UINT32 type, struct lsh_fd *fd, int error, const char *msg)
return &self->super;
}
struct io_fd *
struct lsh_fd *
io_write_file(struct io_backend *backend,
const char *fname, int flags, int mode,
UINT32 block_size,
......@@ -1266,10 +1349,10 @@ io_write_file(struct io_backend *backend,
if (fd < 0)
return NULL;
return io_write(make_io_fd(backend, fd, e), block_size, c);
return io_write(make_lsh_fd(backend, fd, e), block_size, c);
}
struct io_fd *
struct lsh_fd *
io_read_file(struct io_backend *backend,
const char *fname,
struct exception_handler *e)
......@@ -1278,5 +1361,5 @@ io_read_file(struct io_backend *backend,
if (fd < 0)
return NULL;
return make_io_fd(backend, fd, e);
return make_lsh_fd(backend, fd, e);
}
......@@ -27,6 +27,7 @@
#define LSH_IO_H_INCLUDED
#include "abstract_io.h"
#include "command.h"
#include "resource.h"
#include "write_buffer.h"
......@@ -41,9 +42,9 @@
#include "io.h.x"
#undef GABA_DECLARE
#if 0
/* A closed function with a file descriptor as argument */
/* GABA:
/* ;; GABA:
(class
(name fd_callback)
(vars
......@@ -51,6 +52,7 @@
*/
#define FD_CALLBACK(c, fd) ((c)->f(&(c), (fd)))
#endif
/* Close callbacks are called with a reason as argument. */
......@@ -78,17 +80,32 @@
#define CLOSE_CALLBACK(c, r) ((c)->f((c), (r)))
/* The fd read callback is a closure, in order to support different
* reading styles (buffered and consuming). */
/* The fd io callback is a closure, in order to support different
* reading styles (buffered and consuming). Also used for writing. */
/* GABA:
(class
(name io_read_callback)
(name io_callback)
(vars
(read method void "struct lsh_fd *fd")))
(f method void "struct lsh_fd *fd")))
*/
#define IO_READ_CALLBACK(c, fd) ((c)->read((c), (fd)))
#define IO_CALLBACK(c, fd) ((c)->f((c), (fd)))
#if 0
/* ;; GABA:
(class
(name io_write_state)
(super io_callback)
(vars
; Called before poll
(prepare method void "struct lsh_fd *fd")
; Called when fd is closed for writing.
(close method void)))
*/
#define IO_WRITE_PREPARE(s, f) (((s)->prepare)((s), (f)))
#define IO_WRITE_CLOSE(s) (((s)->close)((s)))
#endif
/* GABA:
(class
......@@ -110,27 +127,30 @@
; Called before poll
(prepare method void)
(want_read simple int)
; Called if poll indicates that data can be read.
;;(read method void)
(read object io_read_callback)
(read object io_callback)
(want_write simple in