[gtk+/wip/alexl/broadway4: 3/5] broadwayd: Read using socket API



commit 84ac4ae57efc08c0d35e39779d8df033059e8510
Author: Alexander Larsson <alexl redhat com>
Date:   Fri Nov 17 15:22:06 2017 +0100

    broadwayd: Read using socket API
    
    This changes nothing, but it allows us to later receive
    unix messages and thus do fd passing

 gdk/broadway/broadwayd.c |  104 ++++++++++++++++++++++++++-------------------
 1 files changed, 60 insertions(+), 44 deletions(-)
---
diff --git a/gdk/broadway/broadwayd.c b/gdk/broadway/broadwayd.c
index f9ca737..e36c8fc 100644
--- a/gdk/broadway/broadwayd.c
+++ b/gdk/broadway/broadwayd.c
@@ -48,7 +48,9 @@ typedef struct {
 typedef struct  {
   guint32 id;
   GSocketConnection *connection;
-  GBufferedInputStream *in;
+  GInputStream *in;
+  GString *buffer;
+  GSource *source;
   GSList *serial_mappings;
   GList *windows;
   guint disconnect_idle;
@@ -62,6 +64,7 @@ client_free (BroadwayClient *client)
   clients = g_list_remove (clients, client);
   g_object_unref (client->connection);
   g_object_unref (client->in);
+  g_string_free (client->buffer, TRUE);
   g_slist_free_full (client->serial_mappings, g_free);
   g_free (client);
 }
@@ -77,6 +80,12 @@ client_disconnected (BroadwayClient *client)
       client->disconnect_idle = 0;
     }
 
+  if (client->source != 0)
+    {
+      g_source_destroy (client->source);
+      client->source = 0;
+    }
+
   for (l = client->windows; l != NULL; l = l->next)
     broadway_server_destroy_window (server,
                                    GPOINTER_TO_UINT (l->data));
@@ -320,53 +329,61 @@ client_handle_request (BroadwayClient *client,
                               before_serial - 1);
 }
 
-static void
-client_fill_cb (GObject *source_object,
-               GAsyncResult *result,
-               gpointer user_data)
+#define INPUT_BUFFER_SIZE 8192
+
+static gboolean
+client_input_cb (GPollableInputStream *stream,
+                 gpointer              user_data)
 {
   BroadwayClient *client = user_data;
+  GSocket *socket = g_socket_connection_get_socket (client->connection);
   gssize res;
+  gsize old_len;
+  guchar *buffer;
+  gsize buffer_len;
 
-  res = g_buffered_input_stream_fill_finish (client->in, result, NULL);
-  
-  if (res > 0)
-    {
-      guint32 size;
-      gsize count, remaining;
-      guint8 *buffer;
-
-      buffer = (guint8 *)g_buffered_input_stream_peek_buffer (client->in, &count);
+  old_len = client->buffer->len;
 
-      remaining = count;
-      while (remaining >= sizeof (guint32))
-       {
-         memcpy (&size, buffer, sizeof (guint32));
+  /* Ensure we have at least INPUT_BUFFER_SIZE extra space */
+  g_string_set_size (client->buffer, old_len + INPUT_BUFFER_SIZE);
+  g_string_set_size (client->buffer, old_len);
 
-         if (size <= remaining)
-           {
-             client_handle_request (client, (BroadwayRequest *)buffer);
+  res  = g_socket_receive_with_blocking (socket, client->buffer->str + old_len,
+                                         client->buffer->allocated_len - client->buffer->len -1,
+                                         FALSE, NULL, NULL);
 
-             remaining -= size;
-             buffer += size;
-           }
-       }
-      
-      /* This is guaranteed not to block */
-      g_input_stream_skip (G_INPUT_STREAM (client->in), count - remaining, NULL, NULL);
-      
-      g_buffered_input_stream_fill_async (client->in,
-                                         4*1024,
-                                         0,
-                                         NULL,
-                                         client_fill_cb, client);
-    }
-  else
+  if (res <= 0)
     {
+      client->source = NULL;
       client_disconnected (client);
+      return G_SOURCE_REMOVE;
     }
-}
 
+  g_string_set_size (client->buffer, old_len + res);
+
+  buffer = (guchar *)client->buffer->str;
+  buffer_len = client->buffer->len;
+
+  while (buffer_len >= sizeof (guint32))
+    {
+      guint32 size;
+
+      memcpy (&size, buffer, sizeof (guint32));
+      if (size <= buffer_len)
+        {
+          client_handle_request (client, (BroadwayRequest *)buffer);
+
+          buffer_len -= size;
+          buffer += size;
+        }
+      else
+        break;
+    }
+
+  g_string_erase (client->buffer, 0, client->buffer->len - buffer_len);
+
+  return G_SOURCE_CONTINUE;
+}
 
 static gboolean
 incoming_client (GSocketService    *service,
@@ -382,15 +399,14 @@ incoming_client (GSocketService    *service,
   client->connection = g_object_ref (connection);
 
   input = g_io_stream_get_input_stream (G_IO_STREAM (client->connection));
-  client->in = (GBufferedInputStream *)g_buffered_input_stream_new (input);
+  client->in = input;
+  client->buffer = g_string_sized_new (INPUT_BUFFER_SIZE);
+  client->source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (input), NULL);
 
-  clients = g_list_prepend (clients, client);
+  g_source_set_callback (client->source, (GSourceFunc) client_input_cb, client, NULL);
+  g_source_attach (client->source, NULL);
 
-  g_buffered_input_stream_fill_async (client->in,
-                                     4*1024,
-                                     0,
-                                     NULL,
-                                     client_fill_cb, client);
+  clients = g_list_prepend (clients, client);
 
   /* Send initial resize notify */
   ev.base.type = BROADWAY_EVENT_SCREEN_SIZE_CHANGED;


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]