[gtk+] [broadway] Stream data over websocket
- From: Alexander Larsson <alexl src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gtk+] [broadway] Stream data over websocket
- Date: Mon, 18 Apr 2011 18:52:14 +0000 (UTC)
commit 0abd5e2767c83758d7305a3854428e715cf7af02
Author: Alexander Larsson <alexl redhat com>
Date: Mon Apr 18 19:52:05 2011 +0200
[broadway] Stream data over websocket
The zlib compressed xmlhttprequest thing was a nice hack, but it doesn't
really work in production. It's not portable, doesn't have enough API
(missing notification for closed sockets) and having to synchronize
between two different connections in a reliable way is a pain.
So, we're doing everything over the websocket. This is a pure switch,
but after this we want to modify the protocol to work better over
the uncompressed utf8 transport of websockets.
gdk/broadway/broadway.c | 99 +++--------------------------------
gdk/broadway/broadway.h | 3 +-
gdk/broadway/broadway.js | 21 ++------
gdk/broadway/gdkdisplay-broadway.c | 43 ++++++---------
4 files changed, 34 insertions(+), 132 deletions(-)
---
diff --git a/gdk/broadway/broadway.c b/gdk/broadway/broadway.c
index 3917145..710cb3a 100644
--- a/gdk/broadway/broadway.c
+++ b/gdk/broadway/broadway.c
@@ -4,10 +4,6 @@
#include <assert.h>
#include <errno.h>
#include <zlib.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
#include "broadway.h"
@@ -450,126 +446,50 @@ to_png_a (int w, int h, int byte_stride, guint8 *data)
************************************************************************/
struct BroadwayOutput {
- int fd;
- gzFile *zfd;
+ GOutputStream *out;
int error;
guint32 serial;
};
static void
-broadway_output_write_raw (BroadwayOutput *output,
- const void *buf, gsize count)
+broadway_output_write_header (BroadwayOutput *output)
{
- gssize res;
- int errsave;
- const char *ptr = (const char *)buf;
-
- if (output->error)
- return;
-
- while (count > 0)
- {
- res = write(output->fd, ptr, count);
- if (res == -1)
- {
- errsave = errno;
- if (errsave == EINTR)
- continue;
- output->error = TRUE;
- return;
- }
- if (res == 0)
- {
- output->error = TRUE;
- return;
- }
- count -= res;
- ptr += res;
- }
+ g_output_stream_write (output->out, "\0", 1, NULL, NULL);
}
static void
broadway_output_write (BroadwayOutput *output,
const void *buf, gsize count)
{
- gssize res;
- const char *ptr = (const char *)buf;
-
- if (output->error)
- return;
-
- while (count > 0)
- {
- res = gzwrite(output->zfd, ptr, count);
- if (res == -1)
- {
- output->error = TRUE;
- return;
- }
- if (res == 0)
- {
- output->error = TRUE;
- return;
- }
- count -= res;
- ptr += res;
- }
-}
-
-static void
-broadway_output_write_header (BroadwayOutput *output)
-{
- char *header;
-
- header =
- "HTTP/1.1 200 OK\r\n"
- "Content-type: multipart/x-mixed-replace;boundary=x\r\n"
- "Content-Encoding: gzip\r\n"
- "\r\n";
- broadway_output_write_raw (output,
- header, strlen (header));
+ g_output_stream_write_all (output->out, buf, count, NULL, NULL, NULL);
}
static void
send_boundary (BroadwayOutput *output)
{
- char *boundary =
- "--x\r\n"
- "\r\n";
-
- broadway_output_write (output, boundary, strlen (boundary));
+ broadway_output_write (output, "\xff", 1);
+ broadway_output_write (output, "\0", 1);
}
BroadwayOutput *
-broadway_output_new(int fd, guint32 serial)
+broadway_output_new(GOutputStream *out, guint32 serial)
{
BroadwayOutput *output;
- int flag = 1;
output = g_new0 (BroadwayOutput, 1);
- output->fd = fd;
+ output->out = g_object_ref (out);
output->serial = serial;
- setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
-
broadway_output_write_header (output);
- output->zfd = gzdopen(fd, "wb");
-
- /* Need an initial multipart boundary */
- send_boundary (output);
-
return output;
}
void
broadway_output_free (BroadwayOutput *output)
{
- if (output->zfd)
- gzclose (output->zfd);
- else
- close (output->fd);
+ g_object_unref (output->out);
free (output);
}
@@ -583,7 +503,6 @@ int
broadway_output_flush (BroadwayOutput *output)
{
send_boundary (output);
- gzflush (output->zfd, Z_SYNC_FLUSH);
return !output->error;
}
diff --git a/gdk/broadway/broadway.h b/gdk/broadway/broadway.h
index fce53a3..1258265 100644
--- a/gdk/broadway/broadway.h
+++ b/gdk/broadway/broadway.h
@@ -1,4 +1,5 @@
#include <glib.h>
+#include <gio/gio.h>
typedef struct BroadwayOutput BroadwayOutput;
@@ -7,7 +8,7 @@ typedef struct {
int width, height;
} BroadwayRect;
-BroadwayOutput *broadway_output_new (int fd,
+BroadwayOutput *broadway_output_new (GOutputStream *out,
guint32 serial);
void broadway_output_free (BroadwayOutput *output);
int broadway_output_flush (BroadwayOutput *output);
diff --git a/gdk/broadway/broadway.js b/gdk/broadway/broadway.js
index 7870eef..ba5b2fa 100644
--- a/gdk/broadway/broadway.js
+++ b/gdk/broadway/broadway.js
@@ -888,10 +888,10 @@ function handleOutstanding()
}
}
-function handleLoad(event)
+function handleMessage(message)
{
var cmdObj = {};
- cmdObj.data = event.target.responseText;
+ cmdObj.data = message;
cmdObj.pos = 0;
outstandingCommands.push(cmdObj);
@@ -2768,22 +2768,10 @@ function connect()
if (params[0].indexOf("toplevel") != -1)
useToplevelWindows = true;
}
- var xhr = createXHR();
- if (xhr) {
- if (typeof xhr.multipart == 'undefined') {
- alert("Sorry, this example only works in browsers that support multipart.");
- return;
- }
-
- xhr.multipart = true;
- xhr.open("GET", "/output", true);
- xhr.onload = handleLoad;
- xhr.send(null);
- }
if ("WebSocket" in window) {
var loc = window.location.toString().replace("http:", "ws:");
- loc = loc.substr(0, loc.lastIndexOf('/')) + "/input";
+ loc = loc.substr(0, loc.lastIndexOf('/')) + "/socket";
var ws = new WebSocket(loc, "broadway");
ws.onopen = function() {
inputSocket = ws;
@@ -2806,6 +2794,9 @@ function connect()
ws.onclose = function() {
inputSocket = null;
};
+ ws.onmessage = function(event) {
+ handleMessage(event.data);
+ };
} else {
alert("WebSocket not supported, input will not work!");
}
diff --git a/gdk/broadway/gdkdisplay-broadway.c b/gdk/broadway/gdkdisplay-broadway.c
index cdab101..6a8ca21 100644
--- a/gdk/broadway/gdkdisplay-broadway.c
+++ b/gdk/broadway/gdkdisplay-broadway.c
@@ -40,6 +40,10 @@
#include <string.h>
#include <errno.h>
#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
static void gdk_broadway_display_dispose (GObject *object);
static void gdk_broadway_display_finalize (GObject *object);
@@ -124,6 +128,8 @@ typedef struct HttpRequest {
GString *request;
} HttpRequest;
+static void start_output (HttpRequest *request);
+
static void
http_request_free (HttpRequest *request)
{
@@ -495,21 +501,6 @@ _gdk_broadway_display_block_for_input (GdkDisplay *display, char op,
}
}
-#include <unistd.h>
-#include <fcntl.h>
-static void
-set_fd_blocking (int fd)
-{
- glong arg;
-
- if ((arg = fcntl (fd, F_GETFL, NULL)) < 0)
- arg = 0;
-
- arg = arg & ~O_NONBLOCK;
-
- fcntl (fd, F_SETFL, arg);
-}
-
static char *
parse_line (char *line, char *key)
{
@@ -657,7 +648,7 @@ start_input (HttpRequest *request)
"Upgrade: WebSocket\r\n"
"Connection: Upgrade\r\n"
"Sec-WebSocket-Origin: %s\r\n"
- "Sec-WebSocket-Location: ws://%s/input\r\n"
+ "Sec-WebSocket-Location: ws://%s/socket\r\n"
"Sec-WebSocket-Protocol: broadway\r\n"
"\r\n",
origin, host);
@@ -679,6 +670,8 @@ start_input (HttpRequest *request)
broadway_display->input = input;
+ start_output (request);
+
/* This will free and close the data input stream, but we got all the buffered content already */
http_request_free (request);
@@ -699,14 +692,13 @@ start_output (HttpRequest *request)
{
GSocket *socket;
GdkBroadwayDisplay *broadway_display;
- int fd;
+ int flag = 1;
socket = g_socket_connection_get_socket (request->connection);
+ setsockopt(g_socket_get_fd (socket), IPPROTO_TCP,
+ TCP_NODELAY, (char *) &flag, sizeof(int));
broadway_display = GDK_BROADWAY_DISPLAY (request->display);
- fd = g_socket_get_fd (socket);
- set_fd_blocking (fd);
- /* We dup this because otherwise it'll be closed with the request SocketConnection */
if (broadway_display->output)
{
@@ -714,15 +706,16 @@ start_output (HttpRequest *request)
broadway_output_free (broadway_display->output);
}
- broadway_display->output = broadway_output_new (dup(fd), broadway_display->saved_serial);
+ broadway_display->output =
+ broadway_output_new (g_io_stream_get_output_stream (G_IO_STREAM (request->connection)),
+ broadway_display->saved_serial);
+
_gdk_broadway_resync_windows ();
if (broadway_display->pointer_grab_window)
broadway_output_grab_pointer (broadway_display->output,
GDK_WINDOW_IMPL_BROADWAY (broadway_display->pointer_grab_window->impl)->id,
broadway_display->pointer_grab_owner_events);
-
- http_request_free (request);
}
static void
@@ -787,9 +780,7 @@ got_request (HttpRequest *request)
send_data (request, "text/html", client_html, G_N_ELEMENTS(client_html) - 1);
else if (strcmp (escaped, "/broadway.js") == 0)
send_data (request, "text/javascript", broadway_js, G_N_ELEMENTS(broadway_js) - 1);
- else if (strcmp (escaped, "/output") == 0)
- start_output (request);
- else if (strcmp (escaped, "/input") == 0)
+ else if (strcmp (escaped, "/socket") == 0)
start_input (request);
else
send_error (request, 404, "File not found");
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]