Commit 3dd6e4e7 authored by Niels Möller's avatar Niels Möller

Make it possible to mark a write_buffer as closed.

Rev: src/io.c:1.13
Rev: src/write_buffer.c:1.8
Rev: src/write_buffer.h:1.8
parent 24592d5b
...@@ -137,6 +137,12 @@ static int io_iter(struct io_backend *b) ...@@ -137,6 +137,12 @@ static int io_iter(struct io_backend *b)
/* pre_write returns 0 if the buffer is empty */ /* pre_write returns 0 if the buffer is empty */
if (write_buffer_pre_write(fd->buffer)) if (write_buffer_pre_write(fd->buffer))
fds[i].events |= POLLOUT; fds[i].events |= POLLOUT;
else
/* Buffer is empty. Should we close? */
if (fd->buffer->closed)
{
fd->close_now = 1;
}
} }
END_FOR_FDS; END_FOR_FDS;
...@@ -201,14 +207,12 @@ static int io_iter(struct io_backend *b) ...@@ -201,14 +207,12 @@ static int io_iter(struct io_backend *b)
break; break;
default: default:
werror("io.c: write failed, %s\n", strerror(errno)); werror("io.c: write failed, %s\n", strerror(errno));
CALLBACK(fd->close_callback);
fd->please_close = 1; fd->close_reason = CLOSE_WRITE_FAILED;
fd->close_now = 1;
break; break;
} }
// else if (!res)
// fatal("What now?");
else else
fd->buffer->start += res; fd->buffer->start += res;
} }
...@@ -219,31 +223,49 @@ static int io_iter(struct io_backend *b) ...@@ -219,31 +223,49 @@ static int io_iter(struct io_backend *b)
i = 0; /* Start over */ i = 0; /* Start over */
FOR_FDS(struct io_fd, fd, b->io, i++) FOR_FDS(struct io_fd, fd, b->io, i++)
{ {
if (!fd->please_close if (!fd->close_now
&& (fds[i].revents & POLLIN)) && (fds[i].revents & POLLIN))
{ {
int res;
struct fd_read r = struct fd_read r =
{ { STATIC_HEADER do_read }, fd->fd }; { { STATIC_HEADER do_read }, fd->fd };
/* The handler function may install a new handler */ /* The handler function may install a new handler */
if (!READ_HANDLER(fd->handler, res = READ_HANDLER(fd->handler,
&r.super)) &r.super);
switch(LSH_GET_ACTION(res))
{ {
/* FIXME: Perhaps we should not close yet, but case LSH_GOON:
* stop reading and close as soon as the write if (LSH_FAILUREP(res))
* buffer is flushed? But in that case, we fatal("Internal error\n");
* probably also want to set some flag on the break;
* write buffer so that no more data can be case LSH_CLOSE:
* written into it. */ if (fd->buffer)
fd->please_close = 1; write_buffer_close(fd->buffer);
fd->close_reason
= LSH_FAILUREP(res) ? CLOSE_PROTOCOL_FAILURE : CLOSE_EOF;
break;
case LSH_DIE:
fd->close_reason = CLOSE_PROTOCOL_FAILURE;
fd->close_now = 1;
break;
default:
fatal("Internal error!\n");
} }
} }
if (fd->please_close) if (fd->close_now)
{ {
/* FIXME: Cleanup properly... /* FIXME: Cleanup properly...
* *
* After a write error, read state must be freed, * After a write error, read state must be freed,
* and vice versa. */ * and vice versa. */
/* FIXME: The value returned from the close callback could be used
* to choose an exit code. */
if (fd->close_callback)
CLOSE_CALLBACK(fd->close_callback, fd->close_reason);
UNLINK_FD; UNLINK_FD;
b->nio--; b->nio--;
if (fd->handler) if (fd->handler)
...@@ -262,7 +284,8 @@ static int io_iter(struct io_backend *b) ...@@ -262,7 +284,8 @@ static int io_iter(struct io_backend *b)
/* FIXME: Do something with the peer address? */ /* FIXME: Do something with the peer address? */
struct sockaddr_in peer; struct sockaddr_in peer;
size_t addr_len = sizeof(peer); size_t addr_len = sizeof(peer);
int res;
int conn = accept(fd->fd, int conn = accept(fd->fd,
(struct sockaddr *) &peer, &addr_len); (struct sockaddr *) &peer, &addr_len);
if (conn < 0) if (conn < 0)
...@@ -270,9 +293,12 @@ static int io_iter(struct io_backend *b) ...@@ -270,9 +293,12 @@ static int io_iter(struct io_backend *b)
werror("io.c: accept() failed, %s", strerror(errno)); werror("io.c: accept() failed, %s", strerror(errno));
continue; continue;
} }
if (!FD_CALLBACK(fd->callback, conn)) res = FD_CALLBACK(fd->callback, conn);
if (LSH_PROBLEMP(res))
{ {
/* FIXME: Should fd be closed here? */ werror("Strange: Accepted a connection, "
"but failed before writing anything.\n");
close(fd->fd);
UNLINK_FD; UNLINK_FD;
lsh_free(fd); lsh_free(fd);
continue; continue;
...@@ -285,8 +311,11 @@ static int io_iter(struct io_backend *b) ...@@ -285,8 +311,11 @@ static int io_iter(struct io_backend *b)
{ {
if (fds[i].revents & POLLOUT) if (fds[i].revents & POLLOUT)
{ {
if (!FD_CALLBACK(fd->callback, fd->fd)) int res = FD_CALLBACK(fd->callback, fd->fd);
fatal("What now?");
if (LSH_PROBLEMP(res))
werror("Strange: Connected, "
"but failed before writing anything.\n");
b->nconnect--; b->nconnect--;
UNLINK_FD; UNLINK_FD;
lsh_free(fd); lsh_free(fd);
...@@ -298,6 +327,7 @@ static int io_iter(struct io_backend *b) ...@@ -298,6 +327,7 @@ static int io_iter(struct io_backend *b)
return 1; return 1;
} }
/* FIXME: Prehaps this function should return a suitable exit code? */
void io_run(struct io_backend *b) void io_run(struct io_backend *b)
{ {
while(io_iter(b)) while(io_iter(b))
...@@ -469,13 +499,15 @@ struct abstract_write *io_read_write(struct io_backend *b, ...@@ -469,13 +499,15 @@ struct abstract_write *io_read_write(struct io_backend *b,
int fd, int fd,
struct read_handler *read_callback, struct read_handler *read_callback,
UINT32 block_size, UINT32 block_size,
struct callback *close_callback) struct close_callback *close_callback)
{ {
struct io_fd *f= xalloc(sizeof(struct io_fd)); struct io_fd *f= xalloc(sizeof(struct io_fd));
struct write_buffer *buffer = write_buffer_alloc(block_size); struct write_buffer *buffer = write_buffer_alloc(block_size);
f->fd = fd; f->fd = fd;
f->please_close = 0;
f->close_reason = -1; /* Invalid reason */
f->close_now = 0;
/* Reading */ /* Reading */
f->handler = read_callback; f->handler = read_callback;
......
...@@ -40,9 +40,15 @@ static int do_write(struct abstract_write **w, ...@@ -40,9 +40,15 @@ static int do_write(struct abstract_write **w,
if (!packet->length) if (!packet->length)
{ {
lsh_string_free(packet); lsh_string_free(packet);
return 1; return LSH_OK | LSH_GOON;
} }
if (closure->closed)
{
lsh_string_free(packet);
return LSH_FAIL | LSH_CLOSE;
}
/* Enqueue packet */ /* Enqueue packet */
new = xalloc(sizeof(struct node)); new = xalloc(sizeof(struct node));
new->packet = packet; new->packet = packet;
...@@ -70,7 +76,7 @@ static int do_write(struct abstract_write **w, ...@@ -70,7 +76,7 @@ static int do_write(struct abstract_write **w,
closure->empty = 0; closure->empty = 0;
return 1; return LSH_OK | LSH_GOON;
} }
/* Copy data as necessary, before writing. /* Copy data as necessary, before writing.
...@@ -150,6 +156,11 @@ int write_buffer_pre_write(struct write_buffer *buffer) ...@@ -150,6 +156,11 @@ int write_buffer_pre_write(struct write_buffer *buffer)
return !buffer->empty; return !buffer->empty;
} }
void write_buffer_close(struct write_buffer *buffer)
{
buffer->closed = 1;
}
struct write_buffer *write_buffer_alloc(UINT32 size) struct write_buffer *write_buffer_alloc(UINT32 size)
{ {
struct write_buffer *res = xalloc(sizeof(struct write_buffer) - 1 + size*2); struct write_buffer *res = xalloc(sizeof(struct write_buffer) - 1 + size*2);
...@@ -159,7 +170,8 @@ struct write_buffer *write_buffer_alloc(UINT32 size) ...@@ -159,7 +170,8 @@ struct write_buffer *write_buffer_alloc(UINT32 size)
res->block_size = size; res->block_size = size;
res->empty = 1; res->empty = 1;
res->closed = 0;
#if 0 #if 0
res->try_write = try; res->try_write = try;
#endif #endif
......
...@@ -45,7 +45,10 @@ struct write_buffer ...@@ -45,7 +45,10 @@ struct write_buffer
UINT32 block_size; UINT32 block_size;
int empty; int empty;
/* If non-zero, don't accept any more data. The i/o-channel shoudl be closed
* once the current buffers are flushed. */
int closed;
#if 0 #if 0
int try_write; int try_write;
#endif #endif
...@@ -63,5 +66,6 @@ struct write_buffer ...@@ -63,5 +66,6 @@ struct write_buffer
struct write_buffer *write_buffer_alloc(UINT32 size); struct write_buffer *write_buffer_alloc(UINT32 size);
int write_buffer_pre_write(struct write_buffer *buffer); int write_buffer_pre_write(struct write_buffer *buffer);
void write_buffer_close(struct write_buffer *buffer);
#endif /* LSH_WRITE_BUFFER_H_INCLUDED */ #endif /* LSH_WRITE_BUFFER_H_INCLUDED */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment