[gtk+] [broadway] Stream data over websocket



commit 0abd5e2767c83758d7305a3854428e715cf7af02
Author: Alexander Larsson <alexl redhat com>
Date:   Mon Apr 18 19:52:05 2011 +0200

    [broadway] Stream data over websocket
    
    The zlib compressed xmlhttprequest thing was a nice hack, but it doesn't
    really work in production. Its not portable, doesn't have enought API
    (missing notification for closed sockets) and having to synchronize
    between two different connections in a reliable way is a pain.
    
    So, we're going everything over the websocket. This is a pure switch,
    but after this we want to modify the protocol to work better over
    the uncompressed utf8 transport of websockets.

 gdk/broadway/broadway.c            |   99 +++--------------------------------
 gdk/broadway/broadway.h            |    3 +-
 gdk/broadway/broadway.js           |   21 ++------
 gdk/broadway/gdkdisplay-broadway.c |   43 ++++++---------
 4 files changed, 34 insertions(+), 132 deletions(-)
---
diff --git a/gdk/broadway/broadway.c b/gdk/broadway/broadway.c
index 3917145..710cb3a 100644
--- a/gdk/broadway/broadway.c
+++ b/gdk/broadway/broadway.c
@@ -4,10 +4,6 @@
 #include <assert.h>
 #include <errno.h>
 #include <zlib.h>
-#include <sys/types.h>
-#include <sys/socket.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
 
 #include "broadway.h"
 
@@ -450,126 +446,50 @@ to_png_a (int w, int h, int byte_stride, guint8 *data)
  ************************************************************************/
 
 struct BroadwayOutput {
-  int fd;
-  gzFile *zfd;
+  GOutputStream *out;
   int error;
   guint32 serial;
 };
 
 static void
-broadway_output_write_raw (BroadwayOutput *output,
-			   const void *buf, gsize count)
+broadway_output_write_header (BroadwayOutput *output)
 {
-  gssize res;
-  int errsave;
-  const char *ptr = (const char *)buf;
-
-  if (output->error)
-    return;
-
-  while (count > 0)
-    {
-      res = write(output->fd, ptr, count);
-      if (res == -1)
-	{
-	  errsave = errno;
-	  if (errsave == EINTR)
-	    continue;
-	  output->error = TRUE;
-	  return;
-	}
-      if (res == 0)
-	{
-	  output->error = TRUE;
-	  return;
-	}
-      count -= res;
-      ptr += res;
-    }
+  g_output_stream_write (output->out, "\0", 1, NULL, NULL);
 }
 
 static void
 broadway_output_write (BroadwayOutput *output,
 		       const void *buf, gsize count)
 {
-  gssize res;
-  const char *ptr = (const char *)buf;
-
-  if (output->error)
-    return;
-
-  while (count > 0)
-    {
-      res = gzwrite(output->zfd, ptr, count);
-      if (res == -1)
-	{
-	  output->error = TRUE;
-	  return;
-	}
-      if (res == 0)
-	{
-	  output->error = TRUE;
-	  return;
-	}
-      count -= res;
-      ptr += res;
-    }
-}
-
-static void
-broadway_output_write_header (BroadwayOutput *output)
-{
-  char *header;
-
-  header =
-    "HTTP/1.1 200 OK\r\n"
-    "Content-type: multipart/x-mixed-replace;boundary=x\r\n"
-    "Content-Encoding: gzip\r\n"
-    "\r\n";
-  broadway_output_write_raw (output,
-			     header, strlen (header));
+  g_output_stream_write_all (output->out, buf, count, NULL, NULL, NULL);
 }
 
 static void
 send_boundary (BroadwayOutput *output)
 {
-  char *boundary =
-    "--x\r\n"
-    "\r\n";
-
-  broadway_output_write (output, boundary, strlen (boundary));
+  broadway_output_write (output, "\xff", 1);
+  broadway_output_write (output, "\0", 1);
 }
 
 BroadwayOutput *
-broadway_output_new(int fd, guint32 serial)
+broadway_output_new(GOutputStream *out, guint32 serial)
 {
   BroadwayOutput *output;
-  int flag = 1;
 
   output = g_new0 (BroadwayOutput, 1);
 
-  output->fd = fd;
+  output->out = g_object_ref (out);
   output->serial = serial;
 
-  setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) &flag, sizeof(int));
-
   broadway_output_write_header (output);
 
-  output->zfd = gzdopen(fd, "wb");
-
-  /* Need an initial multipart boundary */
-  send_boundary (output);
-
   return output;
 }
 
 void
 broadway_output_free (BroadwayOutput *output)
 {
-  if (output->zfd)
-    gzclose (output->zfd);
-  else
-    close (output->fd);
+  g_object_unref (output->out);
   free (output);
 }
 
@@ -583,7 +503,6 @@ int
 broadway_output_flush (BroadwayOutput *output)
 {
   send_boundary (output);
-  gzflush (output->zfd, Z_SYNC_FLUSH);
   return !output->error;
 }
 
diff --git a/gdk/broadway/broadway.h b/gdk/broadway/broadway.h
index fce53a3..1258265 100644
--- a/gdk/broadway/broadway.h
+++ b/gdk/broadway/broadway.h
@@ -1,4 +1,5 @@
 #include <glib.h>
+#include <gio/gio.h>
 
 typedef struct BroadwayOutput BroadwayOutput;
 
@@ -7,7 +8,7 @@ typedef struct  {
     int width, height;
 } BroadwayRect;
 
-BroadwayOutput *broadway_output_new             (int             fd,
+BroadwayOutput *broadway_output_new             (GOutputStream  *out,
 						 guint32         serial);
 void            broadway_output_free            (BroadwayOutput *output);
 int             broadway_output_flush           (BroadwayOutput *output);
diff --git a/gdk/broadway/broadway.js b/gdk/broadway/broadway.js
index 7870eef..ba5b2fa 100644
--- a/gdk/broadway/broadway.js
+++ b/gdk/broadway/broadway.js
@@ -888,10 +888,10 @@ function handleOutstanding()
     }
 }
 
-function handleLoad(event)
+function handleMessage(message)
 {
     var cmdObj = {};
-    cmdObj.data = event.target.responseText;
+    cmdObj.data = message;
     cmdObj.pos = 0;
 
     outstandingCommands.push(cmdObj);
@@ -2768,22 +2768,10 @@ function connect()
 	if (params[0].indexOf("toplevel") != -1)
 	    useToplevelWindows = true;
     }
-    var xhr = createXHR();
-    if (xhr) {
-	if (typeof xhr.multipart == 'undefined') {
-	    alert("Sorry, this example only works in browsers that support multipart.");
-	    return;
-	}
-
-	xhr.multipart = true;
-	xhr.open("GET", "/output", true);
-	xhr.onload = handleLoad;
-	xhr.send(null);
-    }
 
     if ("WebSocket" in window) {
 	var loc = window.location.toString().replace("http:", "ws:");
-	loc = loc.substr(0, loc.lastIndexOf('/')) + "/input";
+	loc = loc.substr(0, loc.lastIndexOf('/')) + "/socket";
 	var ws = new WebSocket(loc, "broadway");
 	ws.onopen = function() {
 	    inputSocket = ws;
@@ -2806,6 +2794,9 @@ function connect()
 	ws.onclose = function() {
 	    inputSocket = null;
 	};
+	ws.onmessage = function(event) {
+	    handleMessage(event.data);
+	};
     } else {
 	alert("WebSocket not supported, input will not work!");
     }
diff --git a/gdk/broadway/gdkdisplay-broadway.c b/gdk/broadway/gdkdisplay-broadway.c
index cdab101..6a8ca21 100644
--- a/gdk/broadway/gdkdisplay-broadway.c
+++ b/gdk/broadway/gdkdisplay-broadway.c
@@ -40,6 +40,10 @@
 #include <string.h>
 #include <errno.h>
 #include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
 
 static void   gdk_broadway_display_dispose            (GObject            *object);
 static void   gdk_broadway_display_finalize           (GObject            *object);
@@ -124,6 +128,8 @@ typedef struct HttpRequest {
   GString *request;
 }  HttpRequest;
 
+static void start_output (HttpRequest *request);
+
 static void
 http_request_free (HttpRequest *request)
 {
@@ -495,21 +501,6 @@ _gdk_broadway_display_block_for_input (GdkDisplay *display, char op,
   }
 }
 
-#include <unistd.h>
-#include <fcntl.h>
-static void
-set_fd_blocking (int fd)
-{
-  glong arg;
-
-  if ((arg = fcntl (fd, F_GETFL, NULL)) < 0)
-    arg = 0;
-
-  arg = arg & ~O_NONBLOCK;
-
-  fcntl (fd, F_SETFL, arg);
-}
-
 static char *
 parse_line (char *line, char *key)
 {
@@ -657,7 +648,7 @@ start_input (HttpRequest *request)
 			 "Upgrade: WebSocket\r\n"
 			 "Connection: Upgrade\r\n"
 			 "Sec-WebSocket-Origin: %s\r\n"
-			 "Sec-WebSocket-Location: ws://%s/input\r\n"
+			 "Sec-WebSocket-Location: ws://%s/socket\r\n"
 			 "Sec-WebSocket-Protocol: broadway\r\n"
 			 "\r\n",
 			 origin, host);
@@ -679,6 +670,8 @@ start_input (HttpRequest *request)
 
   broadway_display->input = input;
 
+  start_output (request);
+
   /* This will free and close the data input stream, but we got all the buffered content already */
   http_request_free (request);
 
@@ -699,14 +692,13 @@ start_output (HttpRequest *request)
 {
   GSocket *socket;
   GdkBroadwayDisplay *broadway_display;
-  int fd;
+  int flag = 1;
 
   socket = g_socket_connection_get_socket (request->connection);
+  setsockopt(g_socket_get_fd (socket), IPPROTO_TCP,
+	     TCP_NODELAY, (char *) &flag, sizeof(int));
 
   broadway_display = GDK_BROADWAY_DISPLAY (request->display);
-  fd = g_socket_get_fd (socket);
-  set_fd_blocking (fd);
-  /* We dup this because otherwise it'll be closed with the request SocketConnection */
 
   if (broadway_display->output)
     {
@@ -714,15 +706,16 @@ start_output (HttpRequest *request)
       broadway_output_free (broadway_display->output);
     }
 
-  broadway_display->output = broadway_output_new (dup(fd), broadway_display->saved_serial);
+  broadway_display->output =
+    broadway_output_new (g_io_stream_get_output_stream (G_IO_STREAM (request->connection)),
+			 broadway_display->saved_serial);
+
   _gdk_broadway_resync_windows ();
 
   if (broadway_display->pointer_grab_window)
     broadway_output_grab_pointer (broadway_display->output,
 				  GDK_WINDOW_IMPL_BROADWAY (broadway_display->pointer_grab_window->impl)->id,
 				  broadway_display->pointer_grab_owner_events);
-
-  http_request_free (request);
 }
 
 static void
@@ -787,9 +780,7 @@ got_request (HttpRequest *request)
     send_data (request, "text/html", client_html, G_N_ELEMENTS(client_html) - 1);
   else if (strcmp (escaped, "/broadway.js") == 0)
     send_data (request, "text/javascript", broadway_js, G_N_ELEMENTS(broadway_js) - 1);
-  else if (strcmp (escaped, "/output") == 0)
-    start_output (request);
-  else if (strcmp (escaped, "/input") == 0)
+  else if (strcmp (escaped, "/socket") == 0)
     start_input (request);
   else
     send_error (request, 404, "File not found");



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]