Commit 980f0d48 authored by Niels Möller's avatar Niels Möller
Browse files

* src/connection.h (ssh_connection): New attribute socket,

replaces old attribute raw. New attributes hard_limit and
soft_limit.

Implemented flow control for the connections write_buffer. The
buffer is now of limited size. When it starts to get large, channel
i/o is disabled. If it hits the limit, the connection is closed.
* src/connection.c (WRITE_BUFFER_MAX, WRITE_BUFFER_MARGIN): New
constants.
(connection_flow_controlled): New class.
(do_connection_flow_controlled): Reenable channel i/o when we have
room in the write buffer.
(make_ssh_connection): Initialize ssh_connection attributes
wakeup, soft_limit and hard_limit.
(connection_init_io): Changed argument type from struct
abstract_write to struct lsh_fd. Install flow control on the
socket's write_buffer.
(connection_wakeup): New function, which installs a wakeup
callback on the connection.
(connection_send_kex): Compare the size of the write_buffer to the
soft and hard limits.
(connection_send_kex_end): Invoke wakeup callback, if it's
non-NULL.

Rev: src/connection.c:1.81
Rev: src/connection.h:1.72
parent ec70c327
......@@ -311,6 +311,46 @@ make_exc_connection_handler(struct ssh_connection *connection,
return &self->super;
}
/* About three max size packets */
#define WRITE_BUFFER_MAX 100000
/* Should be enough for a reasonable number of keyexchange packets or
* keepalive packets */
#define WRITE_BUFFER_MARGIN 10000
/* GABA:
(class
(name connection_flow_controlled)
(super flow_controlled)
(vars
(connection object ssh_connection)))
*/
static void
do_connection_flow_controlled(struct flow_controlled *s,
uint32_t written UNUSED)
{
CAST(connection_flow_controlled, self, s);
struct ssh_connection *connection = self->connection;
uint32_t length = connection->socket->write_buffer->length;
if (connection->hard_limit
&& length + WRITE_BUFFER_MARGIN < connection->soft_limit)
{
connection->hard_limit = 0;
if (connection->wakeup)
COMMAND_RETURN(connection->wakeup, connection);
}
}
static struct flow_controlled *
make_connection_flow_controlled(struct ssh_connection *connection)
{
NEW(connection_flow_controlled, self);
self->super.report = do_connection_flow_controlled;
self->connection = connection;
return &self->super;
}
struct ssh_connection *
make_ssh_connection(enum connection_flag flags,
struct address_info *peer,
......@@ -334,9 +374,8 @@ make_ssh_connection(enum connection_flag flags,
connection->e = make_exc_connection_handler(connection, e, HANDLER_CONTEXT);
connection->keyexchange_done = NULL;
connection->wakeup = NULL;
/* Initialize instance variables */
connection->versions[CONNECTION_SERVER]
= connection->versions[CONNECTION_CLIENT]
= connection->session_id = NULL;
......@@ -367,6 +406,8 @@ make_ssh_connection(enum connection_flag flags,
connection->send_kex_only = 0;
string_queue_init(&connection->send_queue);
connection->soft_limit = WRITE_BUFFER_MAX;
connection->hard_limit = 0;
connection->kexinits[CONNECTION_CLIENT]
= connection->kexinits[CONNECTION_SERVER] = NULL;
......@@ -419,16 +460,18 @@ make_ssh_connection(enum connection_flag flags,
void
connection_init_io(struct ssh_connection *connection,
struct abstract_write *raw,
struct lsh_fd *socket,
struct randomness *r)
{
/* Initialize i/o hooks */
connection->raw = raw;
connection->socket = socket;
connection->write_packet =
make_packet_debug(make_write_packet(connection, r, raw),
make_packet_debug(make_write_packet(connection, r, &socket->write_buffer->super),
(connection->debug_comment
? ssh_format("%lz sent", connection->debug_comment)
: ssh_format("Sent")));
connection->socket->write_buffer->report = make_connection_flow_controlled(connection);
}
void
......@@ -439,6 +482,14 @@ connection_after_keyexchange(struct ssh_connection *self,
self->keyexchange_done = c;
}
void
connection_wakeup(struct ssh_connection *self,
struct command_continuation *c)
{
assert(!self->wakeup);
self->wakeup = c;
}
/* GABA:
(class
(name connection_close_handler)
......@@ -473,7 +524,22 @@ void
connection_send_kex(struct ssh_connection *self,
struct lsh_string *message)
{
uint32_t length;
A_WRITE(self->write_packet, message);
length = self->socket->write_buffer->length;
if (self->hard_limit)
{
if (length > self->hard_limit)
{
static struct protocol_exception disconnect =
STATIC_PROTOCOL_EXCEPTION(0, "Write buffer full, peer not responding.");
EXCEPTION_RAISE(self->e, &disconnect.super);
}
}
else if (length > self->soft_limit)
self->hard_limit = length + WRITE_BUFFER_MARGIN;
}
/* Sends one ordinary (non keyexchange) packet */
......@@ -517,6 +583,8 @@ connection_send_kex_end(struct ssh_connection *self)
COMMAND_RETURN(c, self);
}
if (self->wakeup)
COMMAND_RETURN(self->wakeup, self);
}
/* Serialization. */
......
......@@ -181,7 +181,13 @@ do_##NAME(struct packet_handler *s UNUSED, \
(rec_compress object compress_instance)
; Sending
(raw object abstract_write) ; Socket connected to the other end
(socket object lsh_fd) ; Socket connected to the other end
; When crossing the soft_limit we stop reading data on user
; channels, and set hard_limit to the current size plus some
; margin. When crossing the hard_limit, the connection is closed.
(soft_limit . uint32_t)
(hard_limit . uint32_t)
(write_packet object abstract_write) ; Where to send packets
; through the pipeline
......@@ -211,7 +217,10 @@ do_##NAME(struct packet_handler *s UNUSED, \
; Automatically reset to zero after each invocation.
; Gets the connection as argument.
(keyexchange_done object command_continuation)
; Called when key(re)exchange is finished, and when our
; write buffer shrinks anough to allow more channel data.
(wakeup object command_continuation)
(kexinits array (object kexinit) 2)
(literal_kexinits array (string) 2)
......@@ -233,15 +242,19 @@ make_ssh_connection(enum connection_flag flags,
const char *id_comment,
struct exception_handler *e);
void connection_init_io(struct ssh_connection *connection,
struct abstract_write *raw,
struct randomness *r);
void
connection_init_io(struct ssh_connection *connection,
struct lsh_fd *socket,
struct randomness *r);
void
connection_after_keyexchange(struct ssh_connection *self,
struct command_continuation *c);
void
connection_wakeup(struct ssh_connection *self,
struct command_continuation *c);
struct lsh_callback *
make_connection_close_handler(struct ssh_connection *c);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment