[phodav] spice: OutputQueue refactor



commit da0bfa1e921ef886262bbb718fff26dcb17a25f8
Author: Jakub Janků <jjanku redhat com>
Date:   Sat Aug 24 11:13:31 2019 +0200

    spice: OutputQueue refactor
    
    The main issue with the current implementation is that
    g_output_stream_write_all() in output_queue_idle()
    is called synchronously in the main context.
    This is fine for GBufferedOutputStream, since the data is just
    copied to a buffer, but not for GUnixOutputStream or
    GWin32OutputStream (class of mux_ostream) which might block
    the main thread.
    
    OutputQueue is also used for each client, where it doesn't make
    much sense because the queue has never more than 1 item at a time.
    So let's just use g_output_stream_write_all() instead.
    
    Signed-off-by: Jakub Janků <jjanku redhat com>

 spice/output-queue.c  | 93 +++++++++++++++------------------------------------
 spice/spice-webdavd.c | 50 +++++++++++----------------
 2 files changed, 47 insertions(+), 96 deletions(-)
---
diff --git a/spice/output-queue.c b/spice/output-queue.c
index 0c4034f..0e1dc76 100644
--- a/spice/output-queue.c
+++ b/spice/output-queue.c
@@ -20,7 +20,6 @@
 
 typedef struct _OutputQueueElem
 {
-  OutputQueue  *queue;
   const guint8 *buf;
   gsize         size;
   PushedCb      cb;
@@ -31,14 +30,15 @@ struct _OutputQueue
 {
   GObject        parent_instance;
   GOutputStream *output;
-  gboolean       flushing;
-  guint          idle_id;
+  gboolean       writing;
   GQueue        *queue;
   GCancellable  *cancel;
 };
 
 G_DEFINE_TYPE (OutputQueue, output_queue, G_TYPE_OBJECT);
 
+static void output_queue_kick (OutputQueue *q);
+
 static void output_queue_init (OutputQueue *self)
 {
   self->queue = g_queue_new ();
@@ -48,10 +48,6 @@ static void output_queue_finalize (GObject *obj)
 {
   OutputQueue *self = OUTPUT_QUEUE (obj);
 
-  g_warn_if_fail (g_queue_get_length (self->queue) == 0);
-  g_warn_if_fail (!self->flushing);
-  g_warn_if_fail (!self->idle_id);
-
   g_queue_free_full (self->queue, g_free);
   g_object_unref (self->output);
   g_object_unref (self->cancel);
@@ -73,74 +69,41 @@ OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel)
   return self;
 }
 
-static gboolean output_queue_idle (gpointer user_data);
-
 static void
-output_queue_flush_cb (GObject      *source_object,
-                       GAsyncResult *res,
-                       gpointer      user_data)
+write_cb (GObject *source_object,
+          GAsyncResult *res,
+          gpointer user_data)
 {
-  GError *error = NULL;
-  OutputQueueElem *e = user_data;
-  OutputQueue *q = e->queue;
-
-  g_debug ("flushed");
-  q->flushing = FALSE;
-  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
-                                res, &error);
-  if (error)
-    g_warning ("error: %s", error->message);
+  OutputQueue *q = user_data;
+  OutputQueueElem *e;
+  GError *err = NULL;
 
-  g_clear_error (&error);
+  e = g_queue_pop_head (q->queue);
+  g_output_stream_write_all_finish (G_OUTPUT_STREAM (source_object), res, NULL, &err);
 
-  if (!q->idle_id)
-    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
+  if (e->cb)
+    e->cb (q, e->user_data, err);
 
   g_free (e);
+  q->writing = FALSE;
+  if (!err)
+    output_queue_kick (q);
+  g_clear_error (&err);
   g_object_unref (q);
 }
 
-static gboolean
-output_queue_idle (gpointer user_data)
+static void
+output_queue_kick (OutputQueue *q)
 {
-  OutputQueue *q = user_data;
-  OutputQueueElem *e = NULL;
-  GError *error = NULL;
-
-  if (q->flushing)
-    {
-      g_debug ("already flushing");
-      goto end;
-    }
-
-  e = g_queue_pop_head (q->queue);
-  if (!e)
-    {
-      g_debug ("No more data to flush");
-      goto end;
-    }
-
-  g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
-  g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error);
-  if (e->cb)
-    e->cb (q, e->user_data, error);
-
-  if (error)
-      goto end;
-
-  q->flushing = TRUE;
-  g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e);
-
-  q->idle_id = 0;
-  return FALSE;
+  OutputQueueElem *e;
 
-end:
-  g_clear_error (&error);
-  q->idle_id = 0;
-  g_free (e);
-  g_object_unref (q);
+  if (!q || q->writing || g_queue_is_empty (q->queue))
+    return;
 
-  return FALSE;
+  e = g_queue_peek_head (q->queue);
+  q->writing = TRUE;
+  g_output_stream_write_all_async (q->output, e->buf, e->size,
+    G_PRIORITY_DEFAULT, q->cancel, write_cb, g_object_ref (q));
 }
 
 void
@@ -156,9 +119,7 @@ output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
   e->size = size;
   e->cb = pushed_cb;
   e->user_data = user_data;
-  e->queue = q;
   g_queue_push_tail (q->queue, e);
 
-  if (!q->idle_id && !q->flushing)
-    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
+  output_queue_kick (q);
 }
diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c
index 1b01158..10b62b6 100644
--- a/spice/spice-webdavd.c
+++ b/spice/spice-webdavd.c
@@ -68,7 +68,6 @@ typedef struct _Client
     guint8             buf[G_MAXUINT16];
   } mux;
   GSocketConnection *client_connection;
-  OutputQueue       *queue;
 } Client;
 
 static volatile gboolean quit_service;
@@ -110,22 +109,12 @@ signal_handler (gpointer user_data)
 static Client *
 add_client (GSocketConnection *client_connection)
 {
-  GIOStream *iostream = G_IO_STREAM (client_connection);
-  GOutputStream *ostream = g_io_stream_get_output_stream (iostream);
-  GOutputStream *bostream;
   Client *client;
-
-  bostream = g_buffered_output_stream_new (ostream);
-  g_buffered_output_stream_set_auto_grow (G_BUFFERED_OUTPUT_STREAM (bostream), TRUE);
-
   client = g_new0 (Client, 1);
   client->ref_count = 1;
   client->client_connection = g_object_ref (client_connection);
   // TODO: check if usage of this idiom is portable, or if we need to check collisions
   client->mux.id = GPOINTER_TO_INT (client_connection);
-  client->queue = output_queue_new (bostream, cancel);
-  g_object_unref (bostream);
-
   g_hash_table_insert (clients, &client->mux.id, client);
   g_warn_if_fail (g_hash_table_lookup (clients, &client->mux.id));
 
@@ -150,7 +139,6 @@ client_unref (gpointer user_data)
 
   g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL);
   g_object_unref (c->client_connection);
-  g_object_unref (c->queue);
   g_free (c);
 }
 
@@ -163,24 +151,21 @@ remove_client (Client *client)
 }
 
 static void
-handle_push_error (OutputQueue *q, gpointer user_data, GError *error)
+mux_pushed_client_cb (GObject *source_object,
+                      GAsyncResult *res,
+                      gpointer user_data)
 {
   Client *client = user_data;
+  GError *error = NULL;
+  g_output_stream_write_all_finish (G_OUTPUT_STREAM (source_object), res, NULL, &error);
 
-  if (!error)
-    return;
-
-  g_warning ("push error: %s", error->message);
-  remove_client (client);
-}
-
-static void
-mux_pushed_client_cb (OutputQueue *q, gpointer user_data, GError *error)
-{
-  if (error) {
-    handle_push_error (q, user_data, error);
-  }
-  client_unref (user_data);
+  if (error)
+    {
+      g_warning ("error pushing to client %p: %s", client, error->message);
+      g_error_free (error);
+      remove_client (client);
+    }
+  client_unref (client);
 
   start_mux_read (mux_istream);
 }
@@ -210,8 +195,12 @@ mux_data_read_cb (GObject      *source_object,
   g_warn_if_fail(c != NULL);
 
   if (c)
-    output_queue_push (c->queue, (guint8 *) demux.buf, demux.size,
-                       mux_pushed_client_cb, client_ref (c));
+    {
+      GOutputStream *out;
+      out = g_io_stream_get_output_stream (G_IO_STREAM (c->client_connection));
+      g_output_stream_write_all_async (out, demux.buf, demux.size,
+        G_PRIORITY_DEFAULT, cancel, mux_pushed_client_cb, client_ref (c));
+    }
   else
     start_mux_read (mux_istream);
 }
@@ -289,7 +278,8 @@ mux_pushed_cb (OutputQueue *q, gpointer user_data, GError *error)
 
   if (error)
     {
-      handle_push_error (q, client, error);
+      g_warning ("error pushing to mux from client %p: %s", client, error->message);
+      remove_client (client);
       goto end;
     }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]