Commit 4a5a5ab3 authored by Per Cederqvist's avatar Per Cederqvist

Initial revision

parent 997161ef
/*
** isc_event.c definitions of ISC subsystem routines
**
** Copyright (c) 1992 Peter Eriksson and Per Cederqvist of the
** Lysator Academic Computer Association.
**
** history:
** 900403 pen initial coding.
** 900612 pen rewrote the message buffering.
** 901102 pen fixed bug in isc_gethostname().
** 910303 ceder clean up. Removed everything that is lyskom-specific.
** 910304 pen really removed everything lyskom-specific.. :-)
** 920129 pen added support for "lazy" connect
*/
#include <stdio.h>
#include <stdarg.h>
#include <stdlib.h>
#include <time.h>
#include <sys/types.h>
#include <sys/time.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>
#include "isc.h"
#include "intern.h"
/*
* External function declarations.
*/
extern void *bzero(void *buf, unsigned length);
extern time_t time(time_t *buf);
extern int select(int setsize,
fd_set *rs, fd_set *rw, fd_set *es,
struct timeval *wait);
IscEvent *
isc_getnextevent(IscMaster *mcb, long timeout)
{
struct timeval wait;
fd_set read_set;
fd_set write_set;
fd_set error_set;
IscEvent * event;
IscEvent * event2;
IscSessionList * isl;
IscSession * scb;
IscSession * new_scb;
IscMessage * msg;
IscAddress * ia;
int nfds;
int numwaitmsgs;
/* Allocate an event */
ISC_XNEW(event);
event->msg = NULL;
event->session = NULL;
RETRY:
/* Set up the list of file descriptors to wait on */
FD_ZERO(&read_set);
FD_ZERO(&write_set);
FD_ZERO(&error_set);
/* Number of sessions with queued messages */
numwaitmsgs = 0;
/*
* Go thru the list of sessions, add to read_set and handle LOGOUT
*/
for (isl = mcb->sessions; isl != NULL; isl = isl->next)
{
scb = isl->scb;
if (scb->state == ISC_STATE_CLOSING)
{
/*
** If we are in state CLOSING, return all outstanding messages
** before returning event LOGOUT unless fd still open and outgoing
** messages exists.
*/
event->session = scb;
event->msg = isc_popqueue(scb->rd_msg_q);
if (event->msg)
{
event->event = ISC_EVENT_MESSAGE;
/* Move the session pointer to the next in the circular queue */
mcb->sessions = mcb->sessions->next;
return event;
}
else
if (scb->fd == -1 || !isc_pollqueue(scb->wr_msg_q))
{
event->event = ISC_EVENT_LOGOUT;
/* Move the session pointer to the next in the circular queue */
mcb->sessions = mcb->sessions->next;
return event;
}
}
/*
* Add to the write set if in state CONNECTING or RUNNING and there
* are messages in the outgoing queue.
*/
if (scb->state == ISC_STATE_CONNECTING ||
(scb->state == ISC_STATE_RUNNING &&
(scb->sendindex > 0 || isc_pollqueue(scb->wr_msg_q))))
FD_SET(scb->fd, &write_set);
/*
* Add to the read set if state RUNNING or LISTENING
*/
if (scb->state == ISC_STATE_RUNNING ||
scb->state == ISC_STATE_LISTENING)
FD_SET(scb->fd, &read_set);
/*
* Increment the count of sessions with waiting messages
*/
if (isc_pollqueue(scb->rd_msg_q))
numwaitmsgs++;
/*
* Terminate at end of circular ring
*/
if (isl->next == mcb->sessions)
break;
}
/*
* Set up the timeout structure: If there are queued messages,
* do not wait in select, else use user supplied timeout limit
*/
if (numwaitmsgs > 0)
{
wait.tv_sec = 0;
wait.tv_usec = 0;
}
else
{
wait.tv_sec = timeout / 1000;
wait.tv_usec = (timeout % 1000L) * 1000L;
}
/* Wait for event */
nfds = select(FD_SETSIZE, &read_set, &write_set, &error_set, &wait);
/* Error in select? */
if (nfds < 0)
{
event->event = ISC_EVENT_ERROR;
event->msg = isc_mkstrmsg("Error in select()");
return event;
}
/* Timeout? */
if (nfds == 0 && numwaitmsgs == 0)
{
event->event = ISC_EVENT_TIMEOUT;
return event;
}
for (isl = mcb->sessions; isl != NULL; isl = isl->next)
{
scb = isl->scb;
if (FD_ISSET(scb->fd, &write_set))
switch (scb->state)
{
case ISC_STATE_RUNNING:
isc_oflush(scb);
break;
case ISC_STATE_CONNECTING:
ia = isc_getraddress(scb);
if (ia == NULL)
{
/*
* Connect request failed
*/
scb->state = ISC_STATE_CLOSING;
scb->errno = errno;
event->event = ISC_EVENT_REJECTED;
event->session = scb;
}
else
{
/*
* Connect request acknowledged
*/
isc_free(ia);
scb->state = ISC_STATE_RUNNING;
event->event = ISC_EVENT_CONNECTED;
event->session = scb;
}
/* Move the session pointer to the next in the circular queue */
mcb->sessions = mcb->sessions->next;
return event;
default:
/* Major fuck-up somewhere, this should not happen... */
event->event = ISC_EVENT_ERROR;
event->session = scb;
event->msg = isc_mkstrmsg(
"Should not happen; illegal state for write FD_SET");
/* Move the session pointer to the next in the circular queue */
mcb->sessions = mcb->sessions->next;
return event;
}
if (isl->next == mcb->sessions)
break;
}
/* Check for messages from sessions */
for (isl = mcb->sessions; isl != NULL; isl = isl->next)
{
scb = isl->scb;
if (FD_ISSET(scb->fd, &read_set) &&
!(scb->type == ISC_TYPE_TCP && scb->state == ISC_STATE_LISTENING))
{
time(&scb->idlesince);
msg = ISC_SCALLFUN1(scb, read, scb);
if (msg)
{
scb->stats.rx.bytes += msg->length;
scb->stats.rx.packets++;
if (scb->fun.parse)
{
event2 = ISC_SCALLFUN2(scb, parse, scb, msg);
if (event2)
{
isc_free(event);
return event2;
}
}
isc_pushqueue(scb->rd_msg_q, msg);
}
else
{
scb->state = ISC_STATE_CLOSING;
scb->errno = errno;
}
}
if (isl->next == mcb->sessions)
break;
}
/* New connection? */
for (isl = mcb->sessions; isl != NULL; isl = isl->next)
{
scb = isl->scb;
if (scb->state == ISC_STATE_LISTENING &&
FD_ISSET(scb->fd, &read_set) &&
scb->fun.accept != NULL)
{
new_scb = ISC_SCALLFUN2(scb, accept, scb, NULL);
if (new_scb == NULL)
{
event->event = ISC_EVENT_ERROR;
event->session = NULL;
event->msg = isc_mkstrmsg("Failed isc_getnextevent() "
"in new session");
return event;
}
/* Fill in the event info structure */
event->session = new_scb;
event->event = ISC_EVENT_LOGIN;
/* Move the session pointer to the next in the circular queue */
mcb->sessions = mcb->sessions->next;
isc_insert(mcb, new_scb);
return event;
}
if (isl->next == mcb->sessions)
break;
}
/* Return one of the received messages from the clients */
for (isl = mcb->sessions; isl != NULL; isl = isl->next)
{
scb = isl->scb;
if (isc_pollqueue(scb->rd_msg_q))
{
event->session = scb;
event->msg = isc_popqueue(scb->rd_msg_q);
event->event = ISC_EVENT_MESSAGE;
/* Move the session pointer to the next in the circular queue */
mcb->sessions = mcb->sessions->next;
return event;
}
if (isl->next == mcb->sessions)
break;
}
goto RETRY; /* This happens if/when only transmits where pending.. */
}
/*
** Get rid of the specified event
*/
void
isc_dispose(IscEvent *ep)
{
if (!ep)
return;
if (ep->msg)
isc_freemsg(ep->msg);
isc_free(ep);
}
/*
** isc_master.c IscMaster control functions
**
** Copyright (c) 1992 Peter Eriksson and Per Cederqvist of the
** Lysator Academic Computer Association.
**
** history:
** 910305 pen moved to separate file
*/
#include <errno.h>
#include <stdlib.h>
#include <stddef.h>
#include <sys/file.h>
#include "isc.h"
#include "intern.h"
/*
** This routine will initialize the ISC subsystem
*/
IscMaster *
isc_initialize(IscConfig * cfg)
{
IscMaster * mcb;
IscMasterConfig * mcfg;
IscSessionConfig * scfg;
ISC_XNEW(mcb);
mcb->sessions = NULL;
/* Setup default values */
mcb->scfg.max.msgsize = ISC_DEFAULT_MAX_MSG_SIZE;
mcb->scfg.max.queuedsize = ISC_DEFAULT_MAX_QUEUED_SIZE;
mcb->scfg.max.dequeuelen = ISC_DEFAULT_MAX_DEQUEUE_LEN;
mcb->scfg.max.openretries = ISC_DEFAULT_MAX_OPEN_RETRIES;
mcb->scfg.max.backlog = ISC_DEFAULT_MAX_BACKLOG;
/* Handle user specified defaults */
if (cfg)
switch (cfg->version)
{
case 1005:
scfg = &cfg->session;
mcfg = &cfg->master;
switch (scfg->version)
{
case 0:
break;
case 1001:
if (scfg->max.msgsize >= 0)
mcb->scfg.max.msgsize = scfg->max.msgsize;
if (scfg->max.queuedsize >= 0)
mcb->scfg.max.queuedsize = scfg->max.queuedsize;
if (scfg->max.dequeuelen >= 0)
mcb->scfg.max.dequeuelen = scfg->max.dequeuelen;
if (scfg->max.openretries >= 0)
mcb->scfg.max.openretries = scfg->max.openretries;
if (scfg->max.backlog >= 0)
mcb->scfg.max.backlog = scfg->max.backlog;
break;
default:
isc_free(mcb);
errno = EINVAL;
return NULL;
}
switch (mcfg->version)
{
case 0:
break;
case 1001:
if (mcfg->memfn.alloc &&
mcfg->memfn.realloc &&
mcfg->memfn.free)
isc_setallocfn(mcfg->memfn.alloc,
mcfg->memfn.realloc,
mcfg->memfn.free);
if (mcfg->abortfn)
isc_setabortfn(mcfg->abortfn);
break;
default:
isc_free(mcb);
errno = EINVAL;
return NULL;
}
break;
default:
isc_free(mcb);
errno = EINVAL;
return NULL;
}
return mcb;
}
/*
** Close and destroy all sessions associated with a MCB, then
** destroy the MCB too.
*/
void
isc_shutdown(IscMaster * mcb)
{
while (mcb->sessions)
isc_destroy(mcb, mcb->sessions->scb);
isc_free(mcb);
}
/*
** isc_message.c Generic "IscMessage" handling functions
**
** Copyright (c) 1991 Peter Eriksson and Per Cederqvist of the
** Lysator Academic Computer Association.
**
** history:
** 910305 pen moved into separate file
** 920102 pen added isc_copymsg()
*/
#include <stdlib.h>
#include <stddef.h>
#include <string.h>
#include "isc.h"
#include "intern.h"
extern void *memcpy(void *, void *, int);
IscMessage *
isc_allocmsg(size_t size)
{
IscMessage * msg;
ISC_XNEW(msg);
msg->buffer = (char *) isc_malloc(size+1);
msg->size = size;
msg->length = 0;
msg->address = NULL;
return msg;
}
IscMessage *
isc_reallocmsg(IscMessage * msg,
size_t newsize)
{
msg->buffer = (char *) isc_realloc(msg->buffer, newsize+1);
msg->size = newsize;
return msg;
}
IscMessage *
isc_copymsg(IscMessage *msg)
{
IscMessage * newmsg;
newmsg = isc_allocmsg(msg->size);
if (msg->buffer && msg->length > 0)
memcpy(newmsg->buffer, msg->buffer, msg->length);
newmsg->length = msg->length;
newmsg->address = msg->address;
return newmsg;
}
void
isc_freemsg(IscMessage * msg)
{
if (msg->buffer)
isc_free(msg->buffer);
isc_free(msg);
}
IscMessage *
isc_mkstrmsg(const char *str)
{
IscMessage * msg;
msg = isc_allocmsg(strlen(str)+1);
strcpy(msg->buffer,str);
msg->length = strlen(str);
return msg;
}
/*
** isc_output.c Isc data output functions
**
** Copyright (c) 1991 Peter Eriksson and Per Cederqvist of the
** Lysator Academic Computer Association.
**
** history:
** 910305 pen moved into separate file
** 910310 pen added isc_send()
*/
#include <stdlib.h>
#include <stddef.h>
#include <errno.h>
#include <string.h>
#include <sys/file.h>
#include <errno.h>
#include "isc.h"
#include "intern.h"
/*
* External function declarations
*/
extern void *memcpy(void *, void *, int);
extern char *strerror(int);
void isc_flush(IscSession *scb)
{
switch (scb->state)
{
case ISC_STATE_CLOSING:
if (scb->wr_msg_q)
{
isc_killqueue(scb->wr_msg_q);
scb->wr_msg_q = NULL;
}
if (scb->rd_msg_q)
{
isc_killqueue(scb->rd_msg_q);
scb->rd_msg_q = NULL;
}
scb->sendindex = 0;
return;
case ISC_STATE_RUNNING:
case ISC_STATE_CONNECTING:
isc_oflush(scb);
return;
default:
return;
}
}
void isc_oflush(IscSession *scb)
{
IscMessage * msg;
int failed;
int wlen;
int loopcnt;
/* Data in the block buffer? */
if (scb->sendindex > 0)
{
/* Too many queued blocks? */
if (isc_sizequeue(scb->wr_msg_q) >= scb->cfg->max.queuedsize)
{
scb->state = ISC_STATE_CLOSING;
scb->errno = E2BIG;
scb->sendindex = 0; /* And lets fake a write */
return;
}
msg = isc_allocmsg(scb->sendindex);
memcpy(msg->buffer, scb->sendbuf, scb->sendindex);
msg->length = scb->sendindex;
isc_pushqueue(scb->wr_msg_q, msg);
scb->sendindex = 0;
}
/* We only try to transmit messages for RUNNING or CLOSING sessions */
if ((scb->state != ISC_STATE_RUNNING &&
scb->state != ISC_STATE_CLOSING) || scb->fd == -1)
return;