Commit 8e8c3c2b authored by Niels Möller's avatar Niels Möller

More socket functions. New loop macros in main loop.

Rev: src/io.c:1.2
parent f5614022
......@@ -22,9 +22,25 @@ struct fd_read
static int do_read(struct fd_read *closure, UINT8 *buffer, UINT32 length)
{
return read(closure->fd, buffer, length);
int res;
do
res = read(closure->fd, buffer, length);
while ( (res < 0)
&& ( (errno == EINTR)
|| (errno == EAGAIN) ) );
return res;
};
#define FOR_FDS(type, fd, list, extra) \
{ \
type **(_fd); \
for(_fd = &(list); (fd) = *_fd; _fd = &(*_fd)->next, (extra))
#define END_FOR_FDS }
#define UNLINK_FD (*_fd = (*_fd)->next)
void io_run(struct io_backend *b)
{
while(1)
......@@ -62,49 +78,48 @@ void io_run(struct io_backend *b)
/* Handle fds in order: read, accept, connect, write, */
i = 0;
{
struct input_fd *fd = b->input;
for( ; fd; fd = fd->next, i++)
{
fds[i]->fd = fd->hold_on ? -1 : fd->fd;
fds[i]->events = POLLIN;
}
}
{
struct accept_fd *fd = b->accept;
for( ; fd; fd = fd->next, i++)
{
fds[i]->fd = fd->fd;
fds[i]->events = POLLIN;
}
}
{
struct connect_fd *fd = b->connect;
for( ; fd; fd = fd->next, i++)
{
fds[i]->fd = fd->fd;
fds[i]->events = POLLOUT;
}
}
{
struct output_fd *fd = b->output;
for( ; fd; fd = fd->next, i++)
{
write_buffer_pre_write(fd->buffer);
fds[i]->fd = fd->buffer->empty ? -1 : fd->fd;
FOR_FDS(struct input_fd, fd, b->input, i++)
{
fds[i]->fd = fd->hold_on ? -1 : fd->fd;
fds[i]->events = POLLIN;
}
END_FOR_FDS;
FOR_FDS(struct accept_fd, fd, b->accept, i++)
{
fds[i]->fd = fd->fd;
fds[i]->events = POLLIN;
}
END_FOR_FDS;
FOR_FDS(struct connect_fd, fd, b->connect, i++)
{
fds[i]->fd = fd->fd;
fds[i]->events = POLLOUT;
}
END_FOR_FDS;
FOR_FDS(struct output_fd, fd, b->output, i++)
{
/* pre_write returns 0 if the buffer is empty */
fds[i]->fd = write_buffer_pre_write(fd->buffer)
? fd->fd : -1
fds[i]->events = POLLOUT;
}
}
}
END_FOR_FDS;
res = poll(fds, nfds, timeout);
if (!res)
{
/* Timeout. Run the callout */
if (!CALLBACK(b->callouts->callout);)
struct callout *f = b->callouts;
if (!CALLBACK(f->callout);)
fatal("What now?");
b->callouts = b->callouts->next;
b->callouts = f->next;
free(f);
}
if (res<0)
{
......@@ -120,68 +135,160 @@ void io_run(struct io_backend *b)
else
{ /* Process files */
i = 0;
{
struct input_fd *fd = b->input;
for( ; fd; fd = fd->next, i++)
{
if (fds[i]->revents & POLLIN)
{
struct fd_read r =
{ { (abstract_read_f) do_read }, fd->fd };
if (!fd->callback->handler(fd->callback, &r))
/* FIXME: Remove fd, or close, or? */
fatal("What now?");
}
}
FOR_FDS(struct input_fd, fd, b->input, i++)
{
if (fds[i]->revents & POLLIN)
{
struct fd_read r =
{ { (abstract_read_f) do_read }, fd->fd };
/* The handler function returns a ew handler for the
* file, or NULL. */
if (!(fd->handler = READ_HANDLER(fd->handler, &r)))
{
/* FIXME: Should fd be closed here? */
UNLINK_FD;
free(fd);
}
}
}
END_FOR_FDS;
FOR_FDS(struct accept_fd, fd, b->accept, i++)
{
struct accept_fd *fd = b->accept;
for( ; fd; fd = fd->next, i++)
if (fds[i]->revents & POLLIN)
{
if (fds[i]->revents & POLLIN)
if (!CALLBACK(fd->callback))
fatal("What now?");
int conn = accept(fd->fd);
if (conn < 0)
{
werror("io.c: accept() failed, %s", strerror(errno));
continue;
}
if (!FD_CALLBACK(fd->callback, conn))
{
/* FIXME: Should fd be closed here? */
UNLINK_FD;
free(fd);
}
}
}
END_FOR_FDS;
FOR_FDS(struct connect_fd, fd, b->connect, i++)
{
struct connect_fd *fd = b->connect;
for( ; fd; fd = fd->next, i++)
if (fds[i]->revents & POLLOUT)
{
if (fds[i]->revents & POLLOUT)
if (!CALLBACK(fd->callback))
fatal("What now?");
if (!FD_CALLBACK(fd->callback, fd->fd))
fatal("What now?");
UNLINK_FD;
free(fd);
}
}
END_FOR_FDS;
FOR_FDS(struct output_fd, fd, b->output, i++)
{
struct output_fd *fd = b->output;
for( ; fd; fd = fd->next, i++)
if (fds[i]->revents & POLLOUT)
{
if (fds[i]->revents & POLLOUT)
{
UINT32 size = MIN(fd->buffer->end - fd->buffer->start,
fd->buffer->block_size);
int res = write(fd->fd, fd->buffer->data + fd->buffer->start,
size);
if (!res)
fatal("Closed?");
if (res < 0)
switch(errno)
{
case EINTR:
case EAGAIN:
break;
default:
CALLBACK(fd->close_Callback);
}
else
fd->buffer->start += res;
}
UINT32 size = MIN(fd->buffer->end - fd->buffer->start,
fd->buffer->block_size);
int res = write(fd->fd, fd->buffer->data + fd->buffer->start,
size);
if (!res)
fatal("Closed?");
if (res < 0)
switch(errno)
{
case EINTR:
case EAGAIN:
break;
default:
CALLBACK(fd->close_Callback);
UNLINK_FD;
free(fd->write_buffer);
free(fd);
break;
}
else
fd->buffer->start += res;
}
}
}
END_FOR_FDS;
}
}
}
/*
* Fill in ADDR from HOST, SERVICE and PROTOCOL.
* Supplying a null pointer for HOST means use INADDR_ANY.
* Otherwise HOST is an numbers-and-dits ip-number or a dns name.
*
* PROTOCOL can be tcp or udp.
*
* Supplying a null pointer for SERVICE, means use port 0, i.e. no port.
*
* Returns zero on errors, 1 if everything is ok.
*/
int
get_inaddr(struct sockaddr_in * addr,
const char * host,
const char * service,
const char * protocol)
{
memset(addr, 0, sizeof *addr);
addr->sin_family = AF_INET;
/*
* Set host part of ADDR
*/
if (host == NULL)
addr->sin_addr.s_addr = INADDR_ANY;
else
{
/* First check for numerical ip-number */
addr->sin_addr.s_addr = inet_addr(host);
if (addr->sin_addr.s_addr == (unsigned long)-1)
{
struct hostent * hp;
hp = gethostbyname(host);
if (hp == NULL)
return 0;
memcpy(&addr->sin_addr, hp->h_addr, hp->h_length);
addr->sin_family = hp->h_addrtype;
}
}
/*
* Set port part of ADDR
*/
if (service == NULL)
addr->sin_port = htons(0);
else
{
char * end;
long portno;
portno = strtol(service, &end, 10);
if (portno > 0 && portno <= 65535
&& end != service && *end == '\0')
{
addr->sin_port = htons(portno);
}
else
{
struct servent * serv;
serv = getservbyname(service, "tcp");
if (serv == NULL)
return 0;
addr->sin_port = serv->s_port;
}
}
return 1;
}
void io_set_nonblocking(int fd)
......@@ -190,30 +297,119 @@ void io_set_nonblocking(int fd)
fatal("io_set_nonblocking: fcntl() failed, %s", strerror(errno));
}
int io_connect(struct sockaddr *sa, struct callback *f)
/* Some code is taken from bellman's tcputils. */
struct connect_fd *io_connect(struct io_backend *b,
struct sockaddr_in *remote,
struct sockaddr_in *local,
struct fd_callback *f)
{
struct connect_fd *file;
int fd = socket(AF_INET, SOCK_STREAM, ...);
struct connect_fd *fd;
int s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (fd<0)
fatal("io_connect: socket() failed, %s", strerror(errno));
if (s<0)
return NULL;
io_set_nonblocking(fd);
io_set_nonblocking(s);
if (connect(fd, sa, ...) < 0)
...;
file = xalloc(sizeof(struct connect_fd));
info->next = ...;
info->fd = fd;
if (local && bind(s, (struct sockaddr *)local, sizeof *local) < 0)
{
int saved_errno = errno;
close(s);
errno = saved_errno;
return NULL;
}
/* FIXME: The fd must somehow be passed to the callback. */
info->callback = callback;
if ( (connect(s, (struct sockaddr *)remote, sizeof *remote) < 0)
&& (errno != EINPROGRESS) )
{
int saved_errno = errno;
close(s);
errno = saved_errno;
return NULL;
}
fd = xalloc(sizeof(struct connect_fd));
fd->fd = s;
fd->callback = callback;
fd->next = b->connect;
b->connect = fd;
return info;
b->nconnect++;
return fd;
}
int io_listen()
struct listen_fd *io_listen(struct io_backend *b,
struct sockaddr_in *local,
struct fd_callback *callback)
{
struct listen_fd *fd;
int s = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (s<0)
return NULL;
io_set_nonblocking(s);
{
int yes = 1;
setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, sizeof yes);
}
if (bind(s, (struct sockaddr *)local, sizeof *local) < 0)
{
close(s);
return NULL;
}
if (listen(s, 256) < 0)
{
close(s);
return NULL;
}
fd = xalloc(sizeof(struct connect_fd));
fd->fd = s;
fd->callback = callback;
fd->next = b->listen;
b->listen = fd;
b->nlisten++;
return fd;
}
void io_read(struct io_backend *b,
int fd,
struct read_callback *callback)
{
struct input_fd *fd = xalloc(sizeof(struct input_fd));
fd->fd == fd;
fd->callback = callback;
fd->next = b->input;
b->input = fd;
b->ninput++;
}
struct abstract_write *io_write(struct io_backend *b,
int fd,
UINT32 block_size,
struct callback *close_callback)
{
struct output_fd = xalloc(sizeof(struct output_fd));
struct write_buffer *buffer = write_buffer_alloc(block_size);
fd->fd = fd;
fd->close_callback = close_callback;
fd->buffer = buffer;
fd->next = b->output;
b->output = fd;
b->noutput++;
return (struct abstract_write *) buffer;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment