Skip to content
Snippets Groups Projects
Commit 529171a5 authored by Marcus Agehall's avatar Marcus Agehall
Browse files

Added client mode for WebSocket module.

parent af7d4fbd
No related branches found
No related tags found
No related merge requests found
......@@ -257,10 +257,17 @@ class Connection {
Parser parser;
//! The actual client connection.
Stdio.File stream;
Stdio.File|SSL.File stream;
//! Output buffer
Stdio.Buffer buf = Stdio.Buffer();
//! Used in client mode to read server response.
protected Stdio.Buffer http_buffer = Stdio.Buffer();
//! Remote endpoint in client mode
Standards.URI endpoint;
protected int(0..1) will_write = 1;
protected mixed id;
......@@ -294,7 +301,17 @@ class Connection {
this::id = id;
}
protected void create(Stdio.File f) {
//! Create a WebSocket client
protected void create() {
// Clients start in state CLOSED.
state = CLOSED;
masking = 1;
parser = Parser();
}
//! Create a WebSocket server out of the given Stdio.File-like object
protected variant void create(Stdio.File f) {
state = CONNECTING;
parser = Parser();
stream = f;
f->set_nonblocking(websocket_in, websocket_write, websocket_closed);
......@@ -302,6 +319,133 @@ class Connection {
if (onopen) onopen(id || this);
}
//! Read callback for HTTP answer when we are in client mode and
//! have requested an upgrade to WebSocket.
protected void http_read(mixed id, string data) {
http_buffer->add(data);
array tmp = http_buffer->sscanf("%s\r\n\r\n");
if (sizeof(tmp)) {
stream->set_blocking_keep_callbacks();
array lines = tmp[0]/"\r\n";
string resp = lines[0];
int http_major, http_minor, status;
if (sscanf(resp, "HTTP/%d.%d %d", http_major, http_minor, status) != 3) {
websocket_closed();
return;
}
if (status != 101) {
#ifdef WEBSOCKET_DEBUG
werror("Failed to upgrade connection!\n");
#endif
websocket_closed();
return;
}
mapping headers = ([]);
foreach(lines[1..];int i; string l) {
int res = sscanf(l, "%s: %s", string h, string v);
if (res != 2) {
break;
}
headers[h] = v;
}
// At this point, we are upgraded, so let's set the
// websocket callback handlers
masking = 1; // RFC6455 dictates that clients always use masking!
state = OPEN;
if (onopen) onopen(id || this);
if (sizeof(http_buffer))
websocket_in(id, http_buffer->read());
stream->set_nonblocking(websocket_in, websocket_write, websocket_closed);
}
}
//! Write callback during the upgrade of a socket.
protected void request_upgrade() {
int res = buf->output_to(stream);
if (res < 0) {
websocket_closed();
return;
}
if (!sizeof(buf)) {
// Whole request written to server - we now rest until we
// have the reply!
stream->set_write_callback(0);
}
}
//! Connect to a remote WebSocket.
//! This method will send the actual HTTP request to switch
//! protocols to the server and once a HTTP 101 response is
//! returned, switch the connection to WebSockets and call the
//! @[onopen] callback.
int connect(Standards.URI endpoint, void|mapping(string:string) extra_headers) {
this_program::endpoint = endpoint;
extra_headers = extra_headers || ([]);
Stdio.File f = Stdio.File();
state = CONNECTING;
// FIXME:
// We should probably do an async connect here.
int res = f->connect(endpoint->host, endpoint->port);
if (!res) {
websocket_closed();
return 0;
}
if (endpoint->scheme == "wss") {
// If we are connecting a TLS endpoint, so let's turn our
// connection into a TLS one.
SSL.Context ctx = SSL.Context();
stream = SSL.File(f, ctx);
object ssl_session = stream->connect(endpoint->host,0);
if (!ssl_session) {
#ifdef WEBSOCKET_DEBUG
werror("Handshake failed\n");
#endif
websocket_closed();
return 0;
}
} else {
stream = f;
}
mapping headers = ([
"Host" : endpoint->host,
"Connection" : "Upgrade",
"User-Agent" : "Pike/8.0",
"Accept": "*/*",
"Upgrade" : "websocket",
"Sec-WebSocket-Key" : "x4JJHMbDL1EzLkh9GBhXDw==",
"Sec-WebSocket-Version": "13",
]);
foreach(extra_headers; string idx; string val) {
headers[idx] = val;
}
// We use our output buffer to generate the request.
buf->add("GET ", endpoint->path," HTTP/1.1\r\n");
buf->add("Host: ", endpoint->host, "\r\n");
foreach(headers; string h; string v) {
buf->add(h, ": ", v, "\r\n");
}
buf->add("\r\n");
stream->set_nonblocking(http_read, request_upgrade, websocket_closed);
return res;
}
// Sorry guys...
//!
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment