[glib/glib-2-30] GDBusWorker: move flush async op into continue_writing()



commit d0d04c091aa1453bad69e05fbdd95065965c2950
Author: Simon McVittie <simon mcvittie collabora co uk>
Date:   Tue Nov 1 19:09:19 2011 +0000

    GDBusWorker: move flush async op into continue_writing()
    
    This makes it easier to schedule a flush, by putting it on the same code
    path as writing and closing.
    
    Also change message_written to expect the lock to be held, since all
    that's left in that function either wants to hold the lock or doesn't
    care, and it's silly to release the lock immediately before calling
    message_written, which just takes it again.
    
    Bug: https://bugzilla.gnome.org/show_bug.cgi?id=662395
    Signed-off-by: Simon McVittie <simon mcvittie collabora co uk>
    Reviewed-by: Cosimo Alfarano <cosimo alfarano collabora co uk>

 gio/gdbusprivate.c |  101 +++++++++++++++++++++++++++++++++++-----------------
 1 files changed, 68 insertions(+), 33 deletions(-)
---
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c
index 28cfeac..955e282 100644
--- a/gio/gdbusprivate.c
+++ b/gio/gdbusprivate.c
@@ -1131,7 +1131,7 @@ write_message_async (GDBusWorker         *worker,
   write_message_continue_writing (data);
 }
 
-/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
+/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
 static gboolean
 write_message_finish (GAsyncResult   *res,
                       GError        **error)
@@ -1225,17 +1225,27 @@ ostream_flush_cb (GObject      *source_object,
 /* called in private thread shared by all GDBusConnection instances
  *
  * write-lock is not held on entry
- * output_pending is PENDING_NONE on entry
+ * output_pending is PENDING_FLUSH on entry
  */
 static void
-message_written (GDBusWorker *worker,
-                 MessageToWriteData *message_data)
+start_flush (FlushAsyncData *data)
 {
-  GList *l;
-  GList *ll;
-  GList *flushers;
+  g_output_stream_flush_async (g_io_stream_get_output_stream (data->worker->stream),
+                               G_PRIORITY_DEFAULT,
+                               data->worker->cancellable,
+                               ostream_flush_cb,
+                               data);
+}
 
-  /* first log the fact that we wrote a message */
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ */
+static void
+message_written_unlocked (GDBusWorker *worker,
+                          MessageToWriteData *message_data)
+{
   if (G_UNLIKELY (_g_dbus_debug_message ()))
     {
       gchar *s;
@@ -1256,10 +1266,24 @@ message_written (GDBusWorker *worker,
       _g_dbus_debug_print_unlock ();
     }
 
-  /* then first wake up pending flushes and, if needed, flush the stream */
-  flushers = NULL;
-  g_mutex_lock (worker->write_lock);
   worker->write_num_messages_written += 1;
+}
+
+/* called in private thread shared by all GDBusConnection instances
+ *
+ * write-lock is held on entry
+ * output_pending is PENDING_NONE on entry
+ *
+ * Returns: non-%NULL, setting @output_pending, if we need to flush now
+ */
+static FlushAsyncData *
+prepare_flush_unlocked (GDBusWorker *worker)
+{
+  GList *l;
+  GList *ll;
+  GList *flushers;
+
+  flushers = NULL;
   for (l = worker->write_pending_flushes; l != NULL; l = ll)
     {
       FlushData *f = l->data;
@@ -1276,26 +1300,18 @@ message_written (GDBusWorker *worker,
       g_assert (worker->output_pending == PENDING_NONE);
       worker->output_pending = PENDING_FLUSH;
     }
-  g_mutex_unlock (worker->write_lock);
 
   if (flushers != NULL)
     {
       FlushAsyncData *data;
+
       data = g_new0 (FlushAsyncData, 1);
       data->worker = _g_dbus_worker_ref (worker);
       data->flushers = flushers;
-      /* flush the stream before writing the next message */
-      g_output_stream_flush_async (g_io_stream_get_output_stream (worker->stream),
-                                   G_PRIORITY_DEFAULT,
-                                   worker->cancellable,
-                                   ostream_flush_cb,
-                                   data);
-    }
-  else
-    {
-      /* kick off the next write! */
-      continue_writing (worker);
+      return data;
     }
+
+  return NULL;
 }
 
 /* called in private thread shared by all GDBusConnection instances
@@ -1314,21 +1330,24 @@ write_message_cb (GObject       *source_object,
   g_mutex_lock (data->worker->write_lock);
   g_assert (data->worker->output_pending == PENDING_WRITE);
   data->worker->output_pending = PENDING_NONE;
-  g_mutex_unlock (data->worker->write_lock);
 
   error = NULL;
   if (!write_message_finish (res, &error))
     {
+      g_mutex_unlock (data->worker->write_lock);
+
       /* TODO: handle */
       _g_dbus_worker_emit_disconnected (data->worker, TRUE, error);
       g_error_free (error);
+
+      g_mutex_lock (data->worker->write_lock);
     }
 
-  /* this function will also kick of the next write (it might need to
-   * flush so writing the next message might happen much later
-   * e.g. async)
-   */
-  message_written (data->worker, data);
+  message_written_unlocked (data->worker, data);
+
+  g_mutex_unlock (data->worker->write_lock);
+
+  continue_writing (data->worker);
 
   message_to_write_data_free (data);
 }
@@ -1412,6 +1431,7 @@ static void
 continue_writing (GDBusWorker *worker)
 {
   MessageToWriteData *data;
+  FlushAsyncData *flush_async_data;
 
  write_next:
   /* we mustn't try to write two things at once */
@@ -1432,10 +1452,19 @@ continue_writing (GDBusWorker *worker)
     }
   else
     {
-      data = g_queue_pop_head (worker->write_queue);
+      flush_async_data = prepare_flush_unlocked (worker);
 
-      if (data != NULL)
-        worker->output_pending = PENDING_WRITE;
+      if (flush_async_data == NULL)
+        {
+          data = g_queue_pop_head (worker->write_queue);
+
+          if (data != NULL)
+            worker->output_pending = PENDING_WRITE;
+        }
+      else
+        {
+          data = NULL;
+        }
     }
 
   g_mutex_unlock (worker->write_lock);
@@ -1448,7 +1477,13 @@ continue_writing (GDBusWorker *worker)
    * code and then writing the message out onto the GIOStream since this
    * function only runs on the worker thread.
    */
-  if (data != NULL)
+
+  if (flush_async_data != NULL)
+    {
+      start_flush (flush_async_data);
+      g_assert (data == NULL);
+    }
+  else if (data != NULL)
     {
       GDBusMessage *old_message;
       guchar *new_blob;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]