[glib] Bug 618882 – No way to ensure that a message is sent



commit 62a1ccf526e7b23ac39cdf7251eac5706eef3f57
Author: David Zeuthen <davidz redhat com>
Date:   Wed Jul 7 15:00:23 2010 -0400

    Bug 618882 – No way to ensure that a message is sent
    
    Add g_dbus_connection_flush{_finish,sync}().
    
    https://bugzilla.gnome.org/show_bug.cgi?id=618882
    
    Signed-off-by: David Zeuthen <davidz redhat com>

 docs/reference/gio/gio-sections.txt       |    3 +
 gio/gdbusconnection.c                     |  157 ++++++++++++++++++++++++++++-
 gio/gdbusconnection.h                     |   14 +++
 gio/gdbusprivate.c                        |   81 +++++++++++++++
 gio/gdbusprivate.h                        |    5 +
 gio/gio.symbols                           |    3 +
 gio/tests/Makefile.am                     |    4 +
 gio/tests/gdbus-connection-flush-helper.c |   60 +++++++++++
 gio/tests/gdbus-connection.c              |   82 +++++++++++++++
 9 files changed, 406 insertions(+), 3 deletions(-)
---
diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt
index 4e10c7f..617b893 100644
--- a/docs/reference/gio/gio-sections.txt
+++ b/docs/reference/gio/gio-sections.txt
@@ -2396,6 +2396,9 @@ g_dbus_connection_start_message_processing
 GDBusCapabilityFlags
 g_dbus_connection_close
 g_dbus_connection_is_closed
+g_dbus_connection_flush
+g_dbus_connection_flush_finish
+g_dbus_connection_flush_sync
 g_dbus_connection_get_exit_on_close
 g_dbus_connection_set_exit_on_close
 g_dbus_connection_get_stream
diff --git a/gio/gdbusconnection.c b/gio/gdbusconnection.c
index 9b94d1d..1fa49c6 100644
--- a/gio/gdbusconnection.c
+++ b/gio/gdbusconnection.c
@@ -882,6 +882,151 @@ g_dbus_connection_get_capabilities (GDBusConnection *connection)
   return connection->priv->capabilities;
 }
 
+/* ---------------------------------------------------------------------------------------------------- */
+
+static void
+flush_in_thread_func (GSimpleAsyncResult *res,
+                      GObject            *object,
+                      GCancellable       *cancellable)
+{
+  GError *error;
+
+  error = NULL;
+  if (!g_dbus_connection_flush_sync (G_DBUS_CONNECTION (object),
+                                     cancellable,
+                                     &error))
+    {
+      g_simple_async_result_set_from_error (res, error);
+      g_error_free (error);
+    }
+}
+
+/**
+ * g_dbus_connection_flush:
+ * @connection: A #GDBusConnection.
+ * @cancellable: A #GCancellable or %NULL.
+ * @callback: A #GAsyncReadyCallback to call when the request is satisfied or %NULL if you don't
+ * care about the result.
+ * @user_data: The data to pass to @callback.
+ *
+ * Asynchronously flushes @connection, that is, writes all queued
+ * outgoing messages to the transport and then flushes the transport
+ * (using g_output_stream_flush() from a helper thread). This is useful
+ * in programs that want to emit a D-Bus signal and then exit
+ * immediately. Without flushing the connection, there is no guarantee
+ * that the message has been sent to the networking buffers in the OS
+ * kernel.
+ *
+ * This is an asynchronous method. When the operation is finished,
+ * @callback will be invoked in the <link
+ * linkend="g-main-context-push-thread-default">thread-default main
+ * loop</link> of the thread you are calling this method from. You can
+ * then call g_dbus_connection_flush_finish() to get the result of the
+ * operation.  See g_dbus_connection_flush_sync() for the synchronous
+ * version.
+ *
+ * Since: 2.26
+ */
+void
+g_dbus_connection_flush (GDBusConnection     *connection,
+                         GCancellable        *cancellable,
+                         GAsyncReadyCallback  callback,
+                         gpointer             user_data)
+{
+  GSimpleAsyncResult *simple;
+
+  g_return_if_fail (G_IS_DBUS_CONNECTION (connection));
+  /* the source object is what flush_in_thread_func() receives as @object */
+  simple = g_simple_async_result_new (G_OBJECT (connection),
+                                      callback,
+                                      user_data,
+                                      g_dbus_connection_flush);
+  g_simple_async_result_run_in_thread (simple,
+                                       flush_in_thread_func,
+                                       G_PRIORITY_DEFAULT,
+                                       cancellable);
+  g_object_unref (simple);
+}
+
+/**
+ * g_dbus_connection_flush_finish:
+ * @connection: A #GDBusConnection.
+ * @res: A #GAsyncResult obtained from the #GAsyncReadyCallback passed to g_dbus_connection_flush().
+ * @error: Return location for error or %NULL.
+ *
+ * Finishes an operation started with g_dbus_connection_flush().
+ *
+ * Returns: %TRUE if the operation succeeded, %FALSE if @error is set.
+ *
+ * Since: 2.26
+ */
+gboolean
+g_dbus_connection_flush_finish (GDBusConnection  *connection,
+                                GAsyncResult     *res,
+                                GError          **error)
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (res);
+  gboolean ret;
+
+  ret = FALSE;
+
+  g_return_val_if_fail (G_IS_DBUS_CONNECTION (connection), FALSE);
+  g_return_val_if_fail (G_IS_ASYNC_RESULT (res), FALSE);
+  g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_dbus_connection_flush);
+
+  if (g_simple_async_result_propagate_error (simple, error))
+    goto out;
+
+  ret = TRUE;
+
+ out:
+  return ret;
+}
+
+/**
+ * g_dbus_connection_flush_sync:
+ * @connection: A #GDBusConnection.
+ * @cancellable: A #GCancellable or %NULL.
+ * @error: Return location for error or %NULL.
+ *
+ * Synchronously flushes @connection. The calling thread is blocked
+ * until this is done. See g_dbus_connection_flush() for the
+ * asynchronous version of this method and more details about what it
+ * does.
+ *
+ * Returns: %TRUE if the operation succeeded, %FALSE if @error is set.
+ *
+ * Since: 2.26
+ */
+gboolean
+g_dbus_connection_flush_sync (GDBusConnection  *connection,
+                              GCancellable     *cancellable,
+                              GError          **error)
+{
+  gboolean ret;
+
+  g_return_val_if_fail (G_IS_DBUS_CONNECTION (connection), FALSE);
+
+  ret = FALSE;
+
+  if (connection->priv->closed)
+    {
+      g_set_error_literal (error,
+                           G_IO_ERROR,
+                           G_IO_ERROR_CLOSED,
+                           _("The connection is closed"));
+      goto out;
+    }
+
+  ret = _g_dbus_worker_flush_sync (connection->priv->worker,
+                                   cancellable,
+                                   error);
+
+ out:
+  return ret;
+}
 
 /* ---------------------------------------------------------------------------------------------------- */
 
@@ -955,7 +1100,14 @@ set_closed_unlocked (GDBusConnection *connection,
  *
  * Closes @connection. Note that this never causes the process to
  * exit (this might only happen if the other end of a shared message
- * bus connection disconnects).
+ * bus connection disconnects, see #GDBusConnection:exit-on-close).
+ *
+ * Once the stream is closed, all operations will return
+ * %G_IO_ERROR_CLOSED.
+ *
+ * Note that closing a connection will not automatically flush the
+ * connection so queued messages may be lost. Use
+ * g_dbus_connection_flush() if you need such guarantees.
  *
  * If @connection is already closed, this method does nothing.
  *
@@ -1091,8 +1243,7 @@ g_dbus_connection_send_message_unlocked (GDBusConnection   *connection,
  * submitting the message to the underlying transport.
  *
  * If @connection is closed then the operation will fail with
- * %G_IO_ERROR_CLOSED. If @cancellable is canceled, the operation will
- * fail with %G_IO_ERROR_CANCELLED. If @message is not well-formed,
+ * %G_IO_ERROR_CLOSED. If @message is not well-formed,
  * the operation fails with %G_IO_ERROR_INVALID_ARGUMENT.
  *
  * See <xref linkend="gdbus-server"/> and <xref
diff --git a/gio/gdbusconnection.h b/gio/gdbusconnection.h
index 36c03c6..f422ccc 100644
--- a/gio/gdbusconnection.h
+++ b/gio/gdbusconnection.h
@@ -139,6 +139,20 @@ gboolean         g_dbus_connection_get_exit_on_close          (GDBusConnection
 void             g_dbus_connection_set_exit_on_close          (GDBusConnection    *connection,
                                                                gboolean            exit_on_close);
 GDBusCapabilityFlags  g_dbus_connection_get_capabilities      (GDBusConnection    *connection);
+
+/* ---------------------------------------------------------------------------------------------------- */
+
+void             g_dbus_connection_flush                          (GDBusConnection     *connection,
+                                                                   GCancellable        *cancellable,
+                                                                   GAsyncReadyCallback  callback,
+                                                                   gpointer             user_data);
+gboolean         g_dbus_connection_flush_finish                   (GDBusConnection     *connection,
+                                                                   GAsyncResult        *res,
+                                                                   GError             **error);
+gboolean         g_dbus_connection_flush_sync                     (GDBusConnection     *connection,
+                                                                   GCancellable        *cancellable,
+                                                                   GError             **error);
+
 /* ---------------------------------------------------------------------------------------------------- */
 
 gboolean         g_dbus_connection_send_message                   (GDBusConnection     *connection,
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c
index bcc031e..819aa34 100644
--- a/gio/gdbusprivate.c
+++ b/gio/gdbusprivate.c
@@ -388,8 +388,17 @@ struct GDBusWorker
   GMutex                             *write_lock;
   GQueue                             *write_queue;
   gboolean                            write_is_pending;
+  guint64                             write_num_messages_written;
+  GList                              *write_pending_flushes;
 };
 
+typedef struct /* one caller blocked in _g_dbus_worker_flush_sync() */
+{
+  GMutex *mutex;              /* locked by the waiter before it is queued; taken by write_message() to signal */
+  GCond *cond;                /* waited on in _g_dbus_worker_flush_sync(), signalled from write_message() */
+  guint64 number_to_wait_for; /* value of write_num_messages_written at which the waiter is woken */
+} FlushData;
+
 struct _MessageToWriteData ;
 typedef struct _MessageToWriteData MessageToWriteData;
 
@@ -407,6 +416,8 @@ _g_dbus_worker_unref (GDBusWorker *worker)
 {
   if (g_atomic_int_dec_and_test (&worker->ref_count))
     {
+      g_assert (worker->write_pending_flushes == NULL);
+
       _g_dbus_shared_thread_unref ();
 
       g_object_unref (worker->stream);
@@ -815,6 +826,8 @@ write_message (GDBusWorker         *worker,
                GError             **error)
 {
   gboolean ret;
+  GList *l;
+  GList *ll;
 
   g_return_val_if_fail (data->blob_size > 16, FALSE);
 
@@ -908,6 +921,24 @@ write_message (GDBusWorker         *worker,
 
   ret = TRUE;
 
+  /* wake up pending flushes */
+  g_mutex_lock (worker->write_lock);
+  worker->write_num_messages_written += 1;
+  for (l = worker->write_pending_flushes; l != NULL; l = ll)
+    {
+      FlushData *f = l->data;
+      ll = l->next;
+
+      if (f->number_to_wait_for == worker->write_num_messages_written)
+        {
+          g_mutex_lock (f->mutex);
+          g_cond_signal (f->cond);
+          g_mutex_unlock (f->mutex);
+          worker->write_pending_flushes = g_list_delete_link (worker->write_pending_flushes, l);
+        }
+    }
+  g_mutex_unlock (worker->write_lock);
+
   if (G_UNLIKELY (_g_dbus_debug_message ()))
     {
       gchar *s;
@@ -1072,6 +1103,8 @@ _g_dbus_worker_new (GIOStream                              *stream,
   return worker;
 }
 
+/* ---------------------------------------------------------------------------------------------------- */
+
 /* This can be called from any thread - frees worker - guarantees no callbacks
  * will ever be issued again
  */
@@ -1092,6 +1125,54 @@ _g_dbus_worker_stop (GDBusWorker *worker)
   _g_dbus_worker_unref (worker);
 }
 
+/* ---------------------------------------------------------------------------------------------------- */
+
+/* can be called from any thread (except the worker thread) - blocks until
+ * the messages queued at call time are written, then flushes the stream.
+ * NOTE(review): g_cond_wait() below is not in a predicate loop - confirm
+ * that an early (spurious) wakeup is acceptable here. */
+gboolean
+_g_dbus_worker_flush_sync (GDBusWorker    *worker,
+                           GCancellable   *cancellable,
+                           GError        **error)
+{
+  gboolean ret;
+  FlushData *data;
+
+  data = NULL;
+
+  /* only wait when messages are queued; an empty queue goes straight to the stream flush */
+  g_mutex_lock (worker->write_lock);
+  if (g_queue_get_length (worker->write_queue) > 0)
+    {
+      data = g_new0 (FlushData, 1);
+      data->mutex = g_mutex_new ();
+      data->cond = g_cond_new ();
+      data->number_to_wait_for = worker->write_num_messages_written + g_queue_get_length (worker->write_queue);
+      g_mutex_lock (data->mutex);
+      worker->write_pending_flushes = g_list_prepend (worker->write_pending_flushes, data);
+    }
+  g_mutex_unlock (worker->write_lock);
+
+  if (data != NULL)
+    {
+      g_cond_wait (data->cond, data->mutex);
+      g_mutex_unlock (data->mutex);
+
+      /* note: the list link was already removed from worker->write_pending_flushes by write_message(); only data itself is freed here */
+      g_cond_free (data->cond);
+      g_mutex_free (data->mutex);
+      g_free (data);
+    }
+
+  ret = g_output_stream_flush (g_io_stream_get_output_stream (worker->stream),
+                               cancellable,
+                               error);
+  return ret;
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
 #define G_DBUS_DEBUG_AUTHENTICATION (1<<0)
 #define G_DBUS_DEBUG_MESSAGE        (1<<1)
 #define G_DBUS_DEBUG_PAYLOAD        (1<<2)
diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h
index 91a78c4..a879fab 100644
--- a/gio/gdbusprivate.h
+++ b/gio/gdbusprivate.h
@@ -71,6 +71,11 @@ void         _g_dbus_worker_stop         (GDBusWorker    *worker);
 /* can be called from any thread */
 void         _g_dbus_worker_unfreeze     (GDBusWorker    *worker);
 
+/* can be called from any thread (except the worker thread) */
+gboolean     _g_dbus_worker_flush_sync   (GDBusWorker    *worker,
+                                          GCancellable   *cancellable,
+                                          GError        **error);
+
 /* ---------------------------------------------------------------------------------------------------- */
 
 void _g_dbus_initialize (void);
diff --git a/gio/gio.symbols b/gio/gio.symbols
index 8f43a54..abe8835 100644
--- a/gio/gio.symbols
+++ b/gio/gio.symbols
@@ -1541,6 +1541,9 @@ g_dbus_connection_get_unique_name
 g_dbus_connection_is_closed
 g_dbus_connection_set_exit_on_close
 g_dbus_connection_close
+g_dbus_connection_flush
+g_dbus_connection_flush_finish
+g_dbus_connection_flush_sync
 g_dbus_connection_emit_signal
 g_dbus_connection_call
 g_dbus_connection_call_finish
diff --git a/gio/tests/Makefile.am b/gio/tests/Makefile.am
index 8886f1b..b724fe2 100644
--- a/gio/tests/Makefile.am
+++ b/gio/tests/Makefile.am
@@ -81,6 +81,7 @@ SAMPLE_PROGS = 				\
 	gdbus-example-subtree		\
 	gdbus-example-peer		\
 	gdbus-example-proxy-subclass	\
+	gdbus-connection-flush-helper	\
 	testapp				\
 	appinfo-test			\
 	$(NULL)
@@ -264,6 +265,9 @@ gdbus_example_proxy_subclass_LDADD   = $(progs_ldadd)
 gdbus_example_export_SOURCES = gdbus-example-export.c
 gdbus_example_export_LDADD   = $(progs_ldadd)
 
+gdbus_connection_flush_helper_SOURCES = gdbus-connection-flush-helper.c
+gdbus_connection_flush_helper_LDADD = $(progs_ldadd)
+
 application_SOURCES = application.c gdbus-sessionbus.c gdbus-sessionbus.h
 application_LDADD   = $(progs_ldadd)
 
diff --git a/gio/tests/gdbus-connection-flush-helper.c b/gio/tests/gdbus-connection-flush-helper.c
new file mode 100644
index 0000000..5e36075
--- /dev/null
+++ b/gio/tests/gdbus-connection-flush-helper.c
@@ -0,0 +1,60 @@
+/* GLib testing framework examples and tests
+ *
+ * Copyright (C) 2008-2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: David Zeuthen <davidz redhat com>
+ */
+
+#include <gio/gio.h>
+
+int
+main (int   argc,
+      char *argv[])
+{
+  GDBusConnection *c;
+  GError *error;
+  gboolean ret;
+
+  g_type_init ();
+
+  error = NULL;
+  c = g_bus_get_sync (G_BUS_TYPE_SESSION,
+                      NULL, /* GCancellable* */
+                      &error);
+  g_assert_no_error (error);
+
+  error = NULL;
+  g_dbus_connection_emit_signal (c,
+                                 NULL, /* const gchar *destination_bus_name */
+                                 "/org/gtk/GDBus/FlushObject",
+                                 "org.gtk.GDBus.FlushInterface",
+                                 "SomeSignal",
+                                 NULL, /* GVariant *parameters */
+                                 &error);
+  g_assert_no_error (error);
+
+  error = NULL;
+  ret = g_dbus_connection_flush_sync (c,
+                                      NULL, /* GCancellable* */
+                                      &error);
+  g_assert_no_error (error);
+  g_assert (ret);
+
+  /* and now exit immediately! */
+  return 0;
+}
diff --git a/gio/tests/gdbus-connection.c b/gio/tests/gdbus-connection.c
index 3c63cae..cd6aa0d 100644
--- a/gio/tests/gdbus-connection.c
+++ b/gio/tests/gdbus-connection.c
@@ -24,6 +24,9 @@
 #include <unistd.h>
 #include <string.h>
 
+#include <sys/types.h>
+#include <sys/wait.h>
+
 #include "gdbus-tests.h"
 
 /* all tests rely on a shared mainloop */
@@ -661,6 +664,84 @@ test_connection_filter (void)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+static void
+test_connection_flush_signal_handler (GDBusConnection  *connection,
+                                      const gchar      *sender_name,
+                                      const gchar      *object_path,
+                                      const gchar      *interface_name,
+                                      const gchar      *signal_name,
+                                      GVariant         *parameters,
+                                      gpointer         user_data)
+{
+  g_main_loop_quit (loop);
+}
+
+static gboolean /* timeout callback: aborts the test when the signal did not arrive in time */
+test_connection_flush_on_timeout (gpointer user_data)
+{
+  guint iteration = GPOINTER_TO_UINT (user_data); /* loop index packed with GUINT_TO_POINTER() */
+  g_printerr ("Timeout waiting 1000 msec on iteration %u\n", iteration); /* %u: iteration is a guint */
+  g_assert_not_reached ();
+  return FALSE; /* follows g_assert_not_reached(); satisfies the gboolean return type */
+}
+
+static void
+test_connection_flush (void)
+{
+  GDBusConnection *connection;
+  GError *error;
+  guint n;
+  guint signal_handler_id;
+
+  session_bus_up ();
+
+  error = NULL;
+  connection = g_bus_get_sync (G_BUS_TYPE_SESSION, NULL, &error);
+  g_assert_no_error (error);
+  g_assert (connection != NULL);
+
+  signal_handler_id = g_dbus_connection_signal_subscribe (connection,
+                                                          NULL, /* sender */
+                                                          "org.gtk.GDBus.FlushInterface",
+                                                          "SomeSignal",
+                                                          "/org/gtk/GDBus/FlushObject",
+                                                          NULL,
+                                                          test_connection_flush_signal_handler,
+                                                          NULL,
+                                                          NULL);
+  g_assert_cmpint (signal_handler_id, !=, 0);
+
+  for (n = 0; n < 50; n++)
+    {
+      gboolean ret;
+      gint exit_status;
+      guint timeout_mainloop_id;
+
+      error = NULL;
+      ret = g_spawn_command_line_sync ("./gdbus-connection-flush-helper",
+                                       NULL, /* stdout */
+                                       NULL, /* stderr */
+                                       &exit_status,
+                                       &error);
+      g_assert_no_error (error);
+      g_assert (WIFEXITED (exit_status));
+      g_assert_cmpint (WEXITSTATUS (exit_status), ==, 0);
+      g_assert (ret);
+
+      timeout_mainloop_id = g_timeout_add (1000, test_connection_flush_on_timeout, GUINT_TO_POINTER (n));
+      g_main_loop_run (loop);
+      g_source_remove (timeout_mainloop_id);
+    }
+
+  g_dbus_connection_signal_unsubscribe (connection, signal_handler_id);
+  _g_object_wait_for_single_ref (connection);
+  g_object_unref (connection);
+
+  session_bus_down ();
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
 int
 main (int   argc,
       char *argv[])
@@ -681,5 +762,6 @@ main (int   argc,
   g_test_add_func ("/gdbus/connection-send", test_connection_send);
   g_test_add_func ("/gdbus/connection-signals", test_connection_signals);
   g_test_add_func ("/gdbus/connection-filter", test_connection_filter);
+  g_test_add_func ("/gdbus/connection-flush", test_connection_flush);
   return g_test_run();
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]