diff --git a/src/abstract_io.h b/src/abstract_io.h new file mode 100644 index 0000000000000000000000000000000000000000..ef961d7922743992159b0a845b1d666038de55e2 --- /dev/null +++ b/src/abstract_io.h @@ -0,0 +1,36 @@ +/* abstract_io.h + * + * This is the layer separating protocol processing from actual io. + */ + +#ifndef LSH_ABSTRACT_IO_H_INCLUDED +#define LSH_ABSTRACT_IO_H_INCLUDED + +#include "lsh_types.h" + +struct abstract_read; +typedef int (*abstract_read_f)(struct abstract_read *closure, + UINT8 *buffer, UINT32 length); +struct abstract_read +{ + abstract_read_f read; +}; + +#define A_READ(f, buffer, length) (f)->read((f), (buffer), (length)) + +struct read_handler +{ + int (*handler)(struct read_handler *closure, + struct abstract_read *read); +}; + +struct abstract_write; +typedef int (*abstract_write_f)(struct abstract_write *closure, + struct lsh_string *packet); + +struct abstract_write +{ + abstract_write_f write; +}; + +#endif /*LSH_ABSTRACT_IO_H_INCLUDED */ diff --git a/src/io.c b/src/io.c new file mode 100644 index 0000000000000000000000000000000000000000..183bb2adf1ff3140ed978f79128b16540e3239da --- /dev/null +++ b/src/io.c @@ -0,0 +1,219 @@ +/* io.c + * + */ + +#include "io.h" +#include <unistd.h> +#include <poll.h> +#include <errno.h> + +#include <fcntl.h> +#include <sys/types.h> +#include <sys/socket.h> + +/* A little more than an hour */ +#define MAX_TIMEOUT 4000 + +struct fd_read +{ + struct abstract_read a; + int fd; +}; + +static int do_read(struct fd_read *closure, UINT8 *buffer, UINT32 length) +{ + return read(closure->fd, buffer, length); +}; + +void io_run(struct io_backend *b) +{ + while(1) + { + struct pollfd *fds; + int i; + nfds_t nfds; + int timeout; + int res; + + nfds = b->ninput + b->noutput + b->nlisten + n->nconnect; + + if (b->callouts) + { + time_t now = time(); + if (now >= b->callout->when) + timeout = 0; + else + { + if (b->callout->when > now + MAX_TIMEOUT) + timeout = MAX_TIMEOUT * 1000; + else + timeout = (b->callout->when - now) * 1000; + } + } + else + { + if (!nfds) + /* All done */ + break; + timeout = -1; + } + + fds = alloca(sizeof(struct pollfd) * nfds); + + /* Handle fds in order: read, accept, connect, write, */ + i = 0; + { + struct input_fd *fd = b->input; + for( ; fd; fd = fd->next, i++) + { + fds[i]->fd = fd->hold_on ? -1 : fd->fd; + fds[i]->events = POLLIN; + } + } + { + struct accept_fd *fd = b->accept; + for( ; fd; fd = fd->next, i++) + { + fds[i]->fd = fd->fd; + fds[i]->events = POLLIN; + } + } + { + struct connect_fd *fd = b->connect; + for( ; fd; fd = fd->next, i++) + { + fds[i]->fd = fd->fd; + fds[i]->events = POLLOUT; + } + } + { + struct output_fd *fd = b->output; + for( ; fd; fd = fd->next, i++) + { + write_buffer_pre_write(fd->buffer); + + fds[i]->fd = fd->buffer->empty ? -1 : fd->fd; + fds[i]->events = POLLOUT; + } + } + + res = poll(fds, nfds, timeout); + + if (!res) + { + /* Timeout. Run the callout */ + if (!CALLBACK(b->callouts->callout);) + fatal("What now?"); + b->callouts = b->callouts->next; + } + if (res<0) + { + switch(errno) + { + case EAGAIN: + case EINTR: + continue; + default: + fatal("io_run:poll failed: %s", strerror(errno)); + } + } + else + { /* Process files */ + i = 0; + { + struct input_fd *fd = b->input; + for( ; fd; fd = fd->next, i++) + { + if (fds[i]->revents & POLLIN) + { + struct fd_read r = + { { (abstract_read_f) do_read }, fd->fd }; + + if (!fd->callback->handler(fd->callback, &r)) + /* FIXME: Remove fd, or close, or? */ + fatal("What now?"); + } + } + + { + struct accept_fd *fd = b->accept; + for( ; fd; fd = fd->next, i++) + { + if (fds[i]->revents & POLLIN) + if (!CALLBACK(fd->callback)) + fatal("What now?"); + } + } + { + struct connect_fd *fd = b->connect; + for( ; fd; fd = fd->next, i++) + { + if (fds[i]->revents & POLLOUT) + if (!CALLBACK(fd->callback)) + fatal("What now?"); + } + } + { + struct output_fd *fd = b->output; + for( ; fd; fd = fd->next, i++) + { + if (fds[i]->revents & POLLOUT) + { + UINT32 size = MIN(fd->buffer->end - fd->buffer->start, + fd->buffer->block_size); + int res = write(fd->fd, fd->buffer->data + fd->buffer->start, + size); + if (!res) + fatal("Closed?"); + if (res < 0) + switch(errno) + { + case EINTR: + case EAGAIN: + break; + default: + CALLBACK(fd->close_Callback); + } + else + fd->buffer->start += res; + } + } + } + } + } + } +} + +void io_set_nonblocking(int fd) +{ + if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0) + fatal("io_set_nonblocking: fcntl() failed, %s", strerror(errno)); +} + +int io_connect(struct sockaddr *sa, struct callback *f) +{ + struct connect_fd *file; + int fd = socket(AF_INET, SOCK_STREAM, ...); + + if (fd<0) + fatal("io_connect: socket() failed, %s", strerror(errno)); + + io_set_nonblocking(fd); + + if (connect(fd, sa, ...) < 0) + ...; + + file = xalloc(sizeof(struct connect_fd)); + info->next = ...; + info->fd = fd; + + /* FIXME: The fd must somehow be passed to the callback. */ + info->callback = callback; + + return info; +} + +int io_listen() + + + diff --git a/src/io.h b/src/io.h new file mode 100644 index 0000000000000000000000000000000000000000..120fa2535b47919c048c990d0ca2d022c2470b26 --- /dev/null +++ b/src/io.h @@ -0,0 +1,63 @@ +/* io.h + * + */ + +#ifndef LSH_IO_H_INCLUDED +#define LSH_IO_H_INCLUDED + +#include "abstract_io.h" +#include "write_buffer" + +struct input_fd +{ + struct input_fd *next; + int fd; + struct read_handler *callback; + int on_hold; /* For flow control */ +}; + +struct output_fd +{ + struct output_fd *next; + int fd; + struct write_buffer *buffer; + struct callback close_callback; +}; + +struct listen_fd +{ + struct listen_fd *next; + int fd; + struct callback *callback; +}; + +struct connect_fd +{ + struct connect_fd *next; + int fd; + struct callback *callback; + +}; + +struct callout +{ + struct callout_info *next; + struct callback *callout; + time_t when; + /* callback */ +}; + +struct io_backend +{ + unsigned ninput; + struct input_fd *input; + unsigned noutput; + struct output_fd *output; + unsigned nlisten; + struct listen_fd *listen; + unsigned nconnect; + struct connect_fd *connect; + struct callout *callouts; +}; + +#endif /* LSH_IO_H_INCLUDED */ diff --git a/src/write_buffer.c b/src/write_buffer.c new file mode 100644 index 0000000000000000000000000000000000000000..7ac1b8ab78077e020b8f9425185a05531da2b7ee --- /dev/null +++ b/src/write_buffer.c @@ -0,0 +1,133 @@ +/* write_buffer.c + * + */ + +#include "xalloc.h" + +static int do_write(struct write_buffer *closure, + struct lsh_string *packet) +{ + struct node *new; + if (!packet->length) + { + lsh_string_free(packet); + return; + } + + /* Enqueue packet */ + new = xalloc(sizeof(struct node)); + new->next = 0; + + if (closure->tail) + { + new->prev = closure->tail; + closure->tail->next = new; + } + else + { + new->prev = NULL; + closure->head = new; + } + closure->tail = new; + +#if 0 + if (closure->try_write) + { + /* Attempt writing to the corresponding fd. */ + } +#endif + + return 1; +} + +/* Copy data as necessary, before writing. + * + * FIXME: Writing of large packets could probably be optimized by + * avoiding copying it into the buffer. */ +void write_buffer_pre_write(struct write_buffer *buffer) +{ + UINT32 length = buffer->end - buffer->start; + + if (buffer->start > buffer->block_size) + { + /* Copy contents to the start of the buffer */ + memcpy(buffer->data, buffer->data + buffer->start, length); + buffer->start = 0; + buffer->end = length; + } + + while (length < buffer->block_size) + { + /* Copy more data into buffer */ + if (buffer->partial) + { + UINT32 partial_left = buffer->partial->length - buffer->pos; + UINT32 buffer_left = 2*buffer->block_size - length; + if (partial_left <= buffer_left) + { + /* The rest of the partial packet fits in the buffer */ + memcpy(buffer->data + length, + buffer->partial->data + buffer->pos, + partial_left); + + buffer->end += partial_left; + length += partial_left; + + lsh_string_free(buffer->partial); + buffer->partial = NULL; + } + else + { + memcpy(buffer->data + length, + buffer->partial->data + buffer->pos, + buffer_left); + + buffer->end += buffer_left; + length += buffer_left; + buffer->pos += buffer_left; + } + } + else + { + /* Dequeue a packet, if possible */ + struct node *n = buffer->head; + if (n) + { + buffer->partial = n->packet; + buffer->pos = 0; + buffer->head = n->next; + if (buffer->head) + buffer->head->->next = 0; + else + buffer->tail = 0; + } + else + break; + } + } + buffer->empty = !length; +} + +struct write_buffer *write_buffer_alloc(UINT32 size) +{ + struct write_buffer *res = xalloc(sizeof(write_callback) - 1 + size*2); + + res->a.write = (abstract_write_f) do_write; + + res->block_size = size; + + res->empty = 1; + +#if 0 + res->try_write = try; +#endif + + res->head = res->tail = 0; + + res->pos = 0; + res->packet = NULL; + + res->start = res->end = 0; + + return res; +} diff --git a/src/write_buffer.h b/src/write_buffer.h new file mode 100644 index 0000000000000000000000000000000000000000..0d8a26a92026afe25febfa222b0c9886f3be9cdb --- /dev/null +++ b/src/write_buffer.h @@ -0,0 +1,49 @@ +/* write_buffer.h + * + */ + +#ifndef LSH_WRITE_BUFFER_H_INCLUDED +#define LSH_WRITE_BUFFER_H_INCLUDED + +#include "abstract_io.h" + +/* For the packet queue */ +struct node +{ + struct node *next; + struct node *prev; + struct lsh_string *packet; +}; + +struct write_buffer +{ + struct abstract_write a; + + UINT32 block_size; + + int empty; + +#if 0 + int try_write; +#endif + + struct node *head; + struct node *tail; + + UINT32 pos; /* Partial packet */ + struct lsh_string *partial; + + UINT32 start; + UINT32 end; + UINT8 buffer[1]; /* Real size is twice the blocksize */ +}; + +struct write_callback +{ + struct callback c; + struct write_buffer buffer; +}; + +struct write_callback *write_buffer_alloc(UINT32 size); + +#endif /* LSH_WRITE_BUFFER_H_INCLUDED */