From 529171a530876abf18bfd4c47f855eb97b53b4db Mon Sep 17 00:00:00 2001 From: Marcus Agehall <agehall@gmail.com> Date: Thu, 19 May 2016 17:37:16 +0200 Subject: [PATCH] Added client mode for WebSocket module. --- lib/modules/Protocols.pmod/WebSocket.pmod | 148 +++++++++++++++++++++- 1 file changed, 146 insertions(+), 2 deletions(-) diff --git a/lib/modules/Protocols.pmod/WebSocket.pmod b/lib/modules/Protocols.pmod/WebSocket.pmod index dc75c3cf62..140af116c8 100644 --- a/lib/modules/Protocols.pmod/WebSocket.pmod +++ b/lib/modules/Protocols.pmod/WebSocket.pmod @@ -257,10 +257,17 @@ class Connection { Parser parser; //! The actual client connection. - Stdio.File stream; + Stdio.File|SSL.File stream; + //! Output buffer Stdio.Buffer buf = Stdio.Buffer(); + //! Used in client mode to read server response. + protected Stdio.Buffer http_buffer = Stdio.Buffer(); + + //! Remote endpoint in client mode + Standards.URI endpoint; + protected int(0..1) will_write = 1; protected mixed id; @@ -294,7 +301,17 @@ class Connection { this::id = id; } - protected void create(Stdio.File f) { + //! Create a WebSocket client + protected void create() { + // Clients start in state CLOSED. + state = CLOSED; + masking = 1; + parser = Parser(); + } + + //! Create a WebSocket server out of the given Stdio.File-like object + protected variant void create(Stdio.File f) { + state = CONNECTING; parser = Parser(); stream = f; f->set_nonblocking(websocket_in, websocket_write, websocket_closed); @@ -302,6 +319,133 @@ class Connection { if (onopen) onopen(id || this); } + //! Read callback for HTTP answer when we are in client mode and + //! have requested an upgrade to WebSocket. + protected void http_read(mixed id, string data) { + http_buffer->add(data); + + array tmp = http_buffer->sscanf("%s\r\n\r\n"); + if (sizeof(tmp)) { + stream->set_blocking_keep_callbacks(); + + array lines = tmp[0]/"\r\n"; + string resp = lines[0]; + + int http_major, http_minor, status; + if (sscanf(resp, "HTTP/%d.%d %d", http_major, http_minor, status) != 3) { + websocket_closed(); + return; + } + + if (status != 101) { +#ifdef WEBSOCKET_DEBUG + werror("Failed to upgrade connection!\n"); +#endif + websocket_closed(); + return; + } + + mapping headers = ([]); + foreach(lines[1..];int i; string l) { + int res = sscanf(l, "%s: %s", string h, string v); + if (res != 2) { + break; + } + headers[h] = v; + } + + // At this point, we are upgraded, so let's set the + // websocket callback handlers + masking = 1; // RFC6455 dictates that clients always use masking! + state = OPEN; + if (onopen) onopen(id || this); + + if (sizeof(http_buffer)) + websocket_in(id, http_buffer->read()); + + stream->set_nonblocking(websocket_in, websocket_write, websocket_closed); + } + } + + //! Write callback during the upgrade of a socket. + protected void request_upgrade() { + int res = buf->output_to(stream); + + if (res < 0) { + websocket_closed(); + return; + } + + if (!sizeof(buf)) { + // Whole request written to server - we now rest until we + // have the reply! + stream->set_write_callback(0); + } + } + + //! Connect to a remote WebSocket. + //! This method will send the actual HTTP request to switch + //! protocols to the server and once a HTTP 101 response is + //! returned, switch the connection to WebSockets and call the + //! @[onopen] callback. + int connect(Standards.URI endpoint, void|mapping(string:string) extra_headers) { + this_program::endpoint = endpoint; + extra_headers = extra_headers || ([]); + + Stdio.File f = Stdio.File(); + state = CONNECTING; + + // FIXME: + // We should probably do an async connect here. + int res = f->connect(endpoint->host, endpoint->port); + if (!res) { + websocket_closed(); + return 0; + } + + if (endpoint->scheme == "wss") { + // If we are connecting a TLS endpoint, so let's turn our + // connection into a TLS one. + SSL.Context ctx = SSL.Context(); + stream = SSL.File(f, ctx); + object ssl_session = stream->connect(endpoint->host,0); + if (!ssl_session) { +#ifdef WEBSOCKET_DEBUG + werror("Handshake failed\n"); +#endif + websocket_closed(); + return 0; + } + } else { + stream = f; + } + + mapping headers = ([ + "Host" : endpoint->host, + "Connection" : "Upgrade", + "User-Agent" : "Pike/8.0", + "Accept": "*/*", + "Upgrade" : "websocket", + "Sec-WebSocket-Key" : "x4JJHMbDL1EzLkh9GBhXDw==", + "Sec-WebSocket-Version": "13", + ]); + + foreach(extra_headers; string idx; string val) { + headers[idx] = val; + } + + // We use our output buffer to generate the request. + buf->add("GET ", endpoint->path," HTTP/1.1\r\n"); + buf->add("Host: ", endpoint->host, "\r\n"); + foreach(headers; string h; string v) { + buf->add(h, ": ", v, "\r\n"); + } + buf->add("\r\n"); + + stream->set_nonblocking(http_read, request_upgrade, websocket_closed); + return res; + } + // Sorry guys... //! -- GitLab