Commit b49e4d3c authored by Niels Möller's avatar Niels Möller
Browse files

New files

Rev: src/abstract_io.h:1.1
Rev: src/io.c:1.1
Rev: src/io.h:1.1
Rev: src/write_buffer.c:1.1
Rev: src/write_buffer.h:1.1
parent 0cc7ad81
/* abstract_io.h
*
* This is the layer separating protocol processing from actual io.
*/
#ifndef LSH_ABSTRACT_IO_H_INCLUDED
#define LSH_ABSTRACT_IO_H_INCLUDED
#include "lsh_types.h"
struct abstract_read;
typedef int (*abstract_read_f)(struct abstract_read *closure,
UINT8 *buffer, UINT32 length);
struct abstract_read
{
abstract_read_f read;
};
#define A_READ(f, buffer, length) (f)->read((f), (buffer), (length))
struct read_handler
{
int (*handler)(struct read_handler *closure,
struct abstract_read *read);
};
struct abstract_write;
typedef int (*abstract_write_f)(struct abstract_write *closure,
struct lsh_string *packet);
struct abstract_write
{
abstract_write_f write;
};
#endif /*LSH_ABSTRACT_IO_H_INCLUDED */
/* io.c
*
*/
#include "io.h"
#include <unistd.h>
#include <poll.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/socket.h>
/* A little more than an hour */
#define MAX_TIMEOUT 4000
struct fd_read
{
struct abstract_read a;
int fd;
};
static int do_read(struct fd_read *closure, UINT8 *buffer, UINT32 length)
{
return read(closure->fd, buffer, length);
};
void io_run(struct io_backend *b)
{
while(1)
{
struct pollfd *fds;
int i;
nfds_t nfds;
int timeout;
int res;
nfds = b->ninput + b->noutput + b->nlisten + n->nconnect;
if (b->callouts)
{
time_t now = time();
if (now >= b->callout->when)
timeout = 0;
else
{
if (b->callout->when > now + MAX_TIMEOUT)
timeout = MAX_TIMEOUT * 1000;
else
timeout = (b->callout->when - now) * 1000;
}
}
else
{
if (!nfds)
/* All done */
break;
timeout = -1;
}
fds = alloca(sizeof(struct pollfd) * nfds);
/* Handle fds in order: read, accept, connect, write, */
i = 0;
{
struct input_fd *fd = b->input;
for( ; fd; fd = fd->next, i++)
{
fds[i]->fd = fd->hold_on ? -1 : fd->fd;
fds[i]->events = POLLIN;
}
}
{
struct accept_fd *fd = b->accept;
for( ; fd; fd = fd->next, i++)
{
fds[i]->fd = fd->fd;
fds[i]->events = POLLIN;
}
}
{
struct connect_fd *fd = b->connect;
for( ; fd; fd = fd->next, i++)
{
fds[i]->fd = fd->fd;
fds[i]->events = POLLOUT;
}
}
{
struct output_fd *fd = b->output;
for( ; fd; fd = fd->next, i++)
{
write_buffer_pre_write(fd->buffer);
fds[i]->fd = fd->buffer->empty ? -1 : fd->fd;
fds[i]->events = POLLOUT;
}
}
res = poll(fds, nfds, timeout);
if (!res)
{
/* Timeout. Run the callout */
if (!CALLBACK(b->callouts->callout);)
fatal("What now?");
b->callouts = b->callouts->next;
}
if (res<0)
{
switch(errno)
{
case EAGAIN:
case EINTR:
continue;
default:
fatal("io_run:poll failed: %s", strerror(errno));
}
}
else
{ /* Process files */
i = 0;
{
struct input_fd *fd = b->input;
for( ; fd; fd = fd->next, i++)
{
if (fds[i]->revents & POLLIN)
{
struct fd_read r =
{ { (abstract_read_f) do_read }, fd->fd };
if (!fd->callback->handler(fd->callback, &r))
/* FIXME: Remove fd, or close, or? */
fatal("What now?");
}
}
{
struct accept_fd *fd = b->accept;
for( ; fd; fd = fd->next, i++)
{
if (fds[i]->revents & POLLIN)
if (!CALLBACK(fd->callback))
fatal("What now?");
}
}
{
struct connect_fd *fd = b->connect;
for( ; fd; fd = fd->next, i++)
{
if (fds[i]->revents & POLLOUT)
if (!CALLBACK(fd->callback))
fatal("What now?");
}
}
{
struct output_fd *fd = b->output;
for( ; fd; fd = fd->next, i++)
{
if (fds[i]->revents & POLLOUT)
{
UINT32 size = MIN(fd->buffer->end - fd->buffer->start,
fd->buffer->block_size);
int res = write(fd->fd, fd->buffer->data + fd->buffer->start,
size);
if (!res)
fatal("Closed?");
if (res < 0)
switch(errno)
{
case EINTR:
case EAGAIN:
break;
default:
CALLBACK(fd->close_Callback);
}
else
fd->buffer->start += res;
}
}
}
}
}
}
}
void io_set_nonblocking(int fd)
{
if (fcntl(fd, F_SETFL, O_NONBLOCK) < 0)
fatal("io_set_nonblocking: fcntl() failed, %s", strerror(errno));
}
int io_connect(struct sockaddr *sa, struct callback *f)
{
struct connect_fd *file;
int fd = socket(AF_INET, SOCK_STREAM, ...);
if (fd<0)
fatal("io_connect: socket() failed, %s", strerror(errno));
io_set_nonblocking(fd);
if (connect(fd, sa, ...) < 0)
...;
file = xalloc(sizeof(struct connect_fd));
info->next = ...;
info->fd = fd;
/* FIXME: The fd must somehow be passed to the callback. */
info->callback = callback;
return info;
}
int io_listen()
/* io.h
*
*/
#ifndef LSH_IO_H_INCLUDED
#define LSH_IO_H_INCLUDED
#include "abstract_io.h"
#include "write_buffer"
struct input_fd
{
struct input_fd *next;
int fd;
struct read_handler *callback;
int on_hold; /* For flow control */
};
struct output_fd
{
struct output_fd *next;
int fd;
struct write_buffer *buffer;
struct callback close_callback;
};
struct listen_fd
{
struct listen_fd *next;
int fd;
struct callback *callback;
};
struct connect_fd
{
struct connect_fd *next;
int fd;
struct callback *callback;
};
struct callout
{
struct callout_info *next;
struct callback *callout;
time_t when;
/* callback */
};
struct io_backend
{
unsigned ninput;
struct input_fd *input;
unsigned noutput;
struct output_fd *output;
unsigned nlisten;
struct listen_fd *listen;
unsigned nconnect;
struct connect_fd *connect;
struct callout *callouts;
};
#endif /* LSH_IO_H_INCLUDED */
/* write_buffer.c
*
*/
#include "xalloc.h"
static int do_write(struct write_buffer *closure,
struct lsh_string *packet)
{
struct node *new;
if (!packet->length)
{
lsh_string_free(packet);
return;
}
/* Enqueue packet */
new = xalloc(sizeof(struct node));
new->next = 0;
if (closure->tail)
{
new->prev = closure->tail;
closure->tail->next = new;
}
else
{
new->prev = NULL;
closure->head = new;
}
closure->tail = new;
#if 0
if (closure->try_write)
{
/* Attempt writing to the corresponding fd. */
}
#endif
return 1;
}
/* Copy data as necessary, before writing.
*
* FIXME: Writing of large packets could probably be optimized by
* avoiding copying it into the buffer. */
void write_buffer_pre_write(struct write_buffer *buffer)
{
UINT32 length = buffer->end - buffer->start;
if (buffer->start > buffer->block_size)
{
/* Copy contents to the start of the buffer */
memcpy(buffer->data, buffer->data + buffer->start, length);
buffer->start = 0;
buffer->end = length;
}
while (length < buffer->block_size)
{
/* Copy more data into buffer */
if (buffer->partial)
{
UINT32 partial_left = buffer->partial->length - buffer->pos;
UINT32 buffer_left = 2*buffer->block_size - length;
if (partial_left <= buffer_left)
{
/* The rest of the partial packet fits in the buffer */
memcpy(buffer->data + length,
buffer->partial->data + buffer->pos,
partial_left);
buffer->end += partial_left;
length += partial_left;
lsh_string_free(buffer->partial);
buffer->partial = NULL;
}
else
{
memcpy(buffer->data + length,
buffer->partial->data + buffer->pos,
buffer_left);
buffer->end += buffer_left;
length += buffer_left;
buffer->pos += buffer_left;
}
}
else
{
/* Dequeue a packet, if possible */
struct node *n = buffer->head;
if (n)
{
buffer->partial = n->packet;
buffer->pos = 0;
buffer->head = n->next;
if (buffer->head)
buffer->head->->next = 0;
else
buffer->tail = 0;
}
else
break;
}
}
buffer->empty = !length;
}
struct write_buffer *write_buffer_alloc(UINT32 size)
{
struct write_buffer *res = xalloc(sizeof(write_callback) - 1 + size*2);
res->a.write = (abstract_write_f) do_write;
res->block_size = size;
res->empty = 1;
#if 0
res->try_write = try;
#endif
res->head = res->tail = 0;
res->pos = 0;
res->packet = NULL;
res->start = res->end = 0;
return res;
}
/* write_buffer.h
*
*/
#ifndef LSH_WRITE_BUFFER_H_INCLUDED
#define LSH_WRITE_BUFFER_H_INCLUDED
#include "abstract_io.h"
/* For the packet queue */
struct node
{
struct node *next;
struct node *prev;
struct lsh_string *packet;
};
struct write_buffer
{
struct abstract_write a;
UINT32 block_size;
int empty;
#if 0
int try_write;
#endif
struct node *head;
struct node *tail;
UINT32 pos; /* Partial packet */
struct lsh_string *partial;
UINT32 start;
UINT32 end;
UINT8 buffer[1]; /* Real size is twice the blocksize */
};
struct write_callback
{
struct callback c;
struct write_buffer buffer;
};
struct write_callback *write_buffer_alloc(UINT32 size);
#endif /* LSH_WRITE_BUFFER_H_INCLUDED */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment