[phodav: 12/14] spice: move OutputQueue to file



commit f10bda01842e2925b35952e5637b8e05f00ecc2c
Author: Jakub Janků <jjanku redhat com>
Date:   Thu May 16 11:51:59 2019 +0200

    spice: move OutputQueue to file
    
    OutputQueue is a self-contained unit and as such can be put in
    a separate file to make the spice-webdavd.c less cluttered.
    
    Also, as the current implementation defines output_queue_{ref, unref},
    turn OutputQueue into a GObject which can handle these for us.
    
    Signed-off-by: Jakub Janků <jjanku redhat com>

 spice/meson.build     |   8 ++-
 spice/output-queue.c  | 164 ++++++++++++++++++++++++++++++++++++++++++++++++++
 spice/output-queue.h  |  38 ++++++++++++
 spice/spice-webdavd.c | 162 ++-----------------------------------------------
 4 files changed, 214 insertions(+), 158 deletions(-)
---
diff --git a/spice/meson.build b/spice/meson.build
index 6db22cc..06d20e6 100644
--- a/spice/meson.build
+++ b/spice/meson.build
@@ -4,9 +4,15 @@ if host_machine.system() == 'windows'
   win32_deps += compiler.find_library('mpr')
 endif
 
+sources = [
+  'spice-webdavd.c',
+  'output-queue.c',
+  'output-queue.h'
+]
+
 executable(
   'spice-webdavd',
-  [ 'spice-webdavd.c' ],
+  sources,
   install_dir : sbindir,
   include_directories : incdir,
   dependencies : win32_deps + avahi_deps + deps,
diff --git a/spice/output-queue.c b/spice/output-queue.c
new file mode 100644
index 0000000..0c4034f
--- /dev/null
+++ b/spice/output-queue.c
@@ -0,0 +1,164 @@
+/*
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+#include <config.h>
+
+#include "output-queue.h"
+
+typedef struct _OutputQueueElem
+{
+  OutputQueue  *queue;
+  const guint8 *buf;
+  gsize         size;
+  PushedCb      cb;
+  gpointer      user_data;
+} OutputQueueElem;
+
+struct _OutputQueue
+{
+  GObject        parent_instance;
+  GOutputStream *output;
+  gboolean       flushing;
+  guint          idle_id;
+  GQueue        *queue;
+  GCancellable  *cancel;
+};
+
+G_DEFINE_TYPE (OutputQueue, output_queue, G_TYPE_OBJECT);
+
+static void output_queue_init (OutputQueue *self)
+{
+  self->queue = g_queue_new ();
+}
+
+static void output_queue_finalize (GObject *obj)
+{
+  OutputQueue *self = OUTPUT_QUEUE (obj);
+
+  g_warn_if_fail (g_queue_get_length (self->queue) == 0);
+  g_warn_if_fail (!self->flushing);
+  g_warn_if_fail (!self->idle_id);
+
+  g_queue_free_full (self->queue, g_free);
+  g_object_unref (self->output);
+  g_object_unref (self->cancel);
+
+  G_OBJECT_CLASS (output_queue_parent_class)->finalize (obj);
+}
+
+static void output_queue_class_init (OutputQueueClass *klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  gobject_class->finalize = output_queue_finalize;
+}
+
+OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel)
+{
+  OutputQueue *self = g_object_new (OUTPUT_TYPE_QUEUE, NULL);
+  self->output = g_object_ref (output);
+  self->cancel = g_object_ref (cancel);
+  return self;
+}
+
+static gboolean output_queue_idle (gpointer user_data);
+
+static void
+output_queue_flush_cb (GObject      *source_object,
+                       GAsyncResult *res,
+                       gpointer      user_data)
+{
+  GError *error = NULL;
+  OutputQueueElem *e = user_data;
+  OutputQueue *q = e->queue;
+
+  g_debug ("flushed");
+  q->flushing = FALSE;
+  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
+                                res, &error);
+  if (error)
+    g_warning ("error: %s", error->message);
+
+  g_clear_error (&error);
+
+  if (!q->idle_id)
+    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
+
+  g_free (e);
+  g_object_unref (q);
+}
+
+static gboolean
+output_queue_idle (gpointer user_data)
+{
+  OutputQueue *q = user_data;
+  OutputQueueElem *e = NULL;
+  GError *error = NULL;
+
+  if (q->flushing)
+    {
+      g_debug ("already flushing");
+      goto end;
+    }
+
+  e = g_queue_pop_head (q->queue);
+  if (!e)
+    {
+      g_debug ("No more data to flush");
+      goto end;
+    }
+
+  g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
+  g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error);
+  if (e->cb)
+    e->cb (q, e->user_data, error);
+
+  if (error)
+      goto end;
+
+  q->flushing = TRUE;
+  g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e);
+
+  q->idle_id = 0;
+  return FALSE;
+
+end:
+  g_clear_error (&error);
+  q->idle_id = 0;
+  g_free (e);
+  g_object_unref (q);
+
+  return FALSE;
+}
+
+void
+output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
+                   PushedCb pushed_cb, gpointer user_data)
+{
+  OutputQueueElem *e;
+
+  g_return_if_fail (q != NULL);
+
+  e = g_new (OutputQueueElem, 1);
+  e->buf = buf;
+  e->size = size;
+  e->cb = pushed_cb;
+  e->user_data = user_data;
+  e->queue = q;
+  g_queue_push_tail (q->queue, e);
+
+  if (!q->idle_id && !q->flushing)
+    q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
+}
diff --git a/spice/output-queue.h b/spice/output-queue.h
new file mode 100644
index 0000000..21ba7a9
--- /dev/null
+++ b/spice/output-queue.h
@@ -0,0 +1,38 @@
+/*
+ * Copyright (C) 2019 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __OUTPUT_QUEUE_H
+#define __OUTPUT_QUEUE_H
+
+#include <gio/gio.h>
+#include <glib-object.h>
+
+G_BEGIN_DECLS
+
+#define OUTPUT_TYPE_QUEUE output_queue_get_type ()
+G_DECLARE_FINAL_TYPE (OutputQueue, output_queue, OUTPUT, QUEUE, GObject);
+
+OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel);
+
+typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
+
+void output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
+                        PushedCb pushed_cb, gpointer user_data);
+
+G_END_DECLS
+
+#endif
diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c
index 61a3cc7..f814890 100644
--- a/spice/spice-webdavd.c
+++ b/spice/spice-webdavd.c
@@ -40,25 +40,7 @@
 #include <avahi-gobject/ga-entry-group.h>
 #endif
 
-typedef struct _OutputQueue
-{
-  guint          refs;
-  GOutputStream *output;
-  gboolean       flushing;
-  guint          idle_id;
-  GQueue        *queue;
-} OutputQueue;
-
-typedef void (*PushedCb) (OutputQueue *q, gpointer user_data, GError *error);
-
-typedef struct _OutputQueueElem
-{
-  OutputQueue  *queue;
-  const guint8 *buf;
-  gsize         size;
-  PushedCb      cb;
-  gpointer      user_data;
-} OutputQueueElem;
+#include "output-queue.h"
 
 typedef struct _ServiceData
 {
@@ -70,139 +52,6 @@ typedef struct _ServiceData
 
 static GCancellable *cancel;
 
-static OutputQueue*
-output_queue_new (GOutputStream *output)
-{
-  OutputQueue *queue = g_new0 (OutputQueue, 1);
-
-  queue->output = g_object_ref (output);
-  queue->queue = g_queue_new ();
-  queue->refs = 1;
-
-  return queue;
-}
-
-static
-void
-output_queue_free (OutputQueue *queue)
-{
-  g_warn_if_fail (g_queue_get_length (queue->queue) == 0);
-  g_warn_if_fail (!queue->flushing);
-  g_warn_if_fail (!queue->idle_id);
-
-  g_queue_free_full (queue->queue, g_free);
-  g_clear_object (&queue->output);
-  g_free (queue);
-}
-
-static OutputQueue*
-output_queue_ref (OutputQueue *q)
-{
-  q->refs++;
-  return q;
-}
-
-static void
-output_queue_unref (OutputQueue *q)
-{
-  g_return_if_fail (q != NULL);
-
-  q->refs--;
-  if (q->refs == 0)
-    output_queue_free (q);
-}
-
-static gboolean output_queue_idle (gpointer user_data);
-
-static void
-output_queue_flush_cb (GObject      *source_object,
-                       GAsyncResult *res,
-                       gpointer      user_data)
-{
-  GError *error = NULL;
-  OutputQueueElem *e = user_data;
-  OutputQueue *q = e->queue;
-
-  g_debug ("flushed");
-  q->flushing = FALSE;
-  g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
-                                res, &error);
-  if (error)
-    g_warning ("error: %s", error->message);
-
-  g_clear_error (&error);
-
-  if (!q->idle_id)
-    q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
-
-  g_free (e);
-  output_queue_unref (q);
-}
-
-static gboolean
-output_queue_idle (gpointer user_data)
-{
-  OutputQueue *q = user_data;
-  OutputQueueElem *e = NULL;
-  GError *error = NULL;
-
-  if (q->flushing)
-    {
-      g_debug ("already flushing");
-      goto end;
-    }
-
-  e = g_queue_pop_head (q->queue);
-  if (!e)
-    {
-      g_debug ("No more data to flush");
-      goto end;
-    }
-
-  g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
-  g_output_stream_write_all (q->output, e->buf, e->size, NULL, cancel, &error);
-  if (e->cb)
-    e->cb (q, e->user_data, error);
-
-  if (error)
-      goto end;
-
-  q->flushing = TRUE;
-  g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, cancel, output_queue_flush_cb, e);
-
-  q->idle_id = 0;
-  return FALSE;
-
-end:
-  g_clear_error (&error);
-  q->idle_id = 0;
-  g_free (e);
-  output_queue_unref (q);
-
-  return FALSE;
-}
-
-static void
-output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
-                   PushedCb pushed_cb, gpointer user_data)
-{
-  OutputQueueElem *e;
-
-  g_return_if_fail (q != NULL);
-
-  e = g_new (OutputQueueElem, 1);
-  e->buf = buf;
-  e->size = size;
-  e->cb = pushed_cb;
-  e->user_data = user_data;
-  e->queue = q;
-  g_queue_push_tail (q->queue, e);
-
-  if (!q->idle_id && !q->flushing)
-    q->idle_id = g_idle_add (output_queue_idle, output_queue_ref (q));
-}
-
-
 static struct _DemuxData
 {
   gint64  client;
@@ -273,7 +122,7 @@ add_client (GSocketConnection *client_connection)
   client->client_connection = g_object_ref (client_connection);
   // TODO: check if usage of this idiom is portable, or if we need to check collisions
   client->id = GPOINTER_TO_INT (client_connection);
-  client->queue = output_queue_new (bostream);
+  client->queue = output_queue_new (bostream, cancel);
   g_object_unref (bostream);
 
   g_hash_table_insert (clients, &client->id, client);
@@ -289,7 +138,7 @@ client_free (Client *c)
 
   g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL);
   g_object_unref (c->client_connection);
-  output_queue_unref (c->queue);
+  g_object_unref (c->queue);
   g_free (c);
 }
 
@@ -741,7 +590,7 @@ open_mux_path (const char *path)
   mux_istream = G_INPUT_STREAM (g_win32_input_stream_new (port_handle, TRUE));
 #endif
 
-  mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream));
+  mux_queue = output_queue_new (G_OUTPUT_STREAM (mux_ostream), cancel);
 }
 
 #ifdef G_OS_WIN32
@@ -1011,12 +860,11 @@ run_service (ServiceData *service_data)
   g_clear_object (&mux_istream);
   g_clear_object (&mux_ostream);
 
-  output_queue_unref (mux_queue);
+  g_clear_object (&mux_queue);
   g_hash_table_unref (clients);
 
   g_socket_service_stop (socket_service);
 
-  mux_queue = NULL;
   g_clear_object (&cancel);
 
 #ifdef G_OS_WIN32


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]