[glib] Bug 623142 – Ensure ::new-connection runs before processing D-Bus messages



commit 038d03cd08bdb42e6f83f6041ec01732476e900b
Author: David Zeuthen <davidz redhat com>
Date:   Wed Jun 30 11:43:42 2010 -0400

    Bug 623142 – Ensure ::new-connection runs before processing D-Bus messages
    
    Without this guarantee, peer-to-peer connections are not very
    useful. However, with this guarantee it's possible to export objects
    in a handler for the GDBusServer::new-connection signal.
    
    There are two caveats with this patch
    
     - it won't work on message bus connections
     - we don't queue up messages to be written
    
    that can be addressed later if needed.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=623142
    
    Signed-off-by: David Zeuthen <davidz redhat com>

 docs/reference/gio/gio-sections.txt |    1 +
 gio/gdbusconnection.c               |   30 ++++++-
 gio/gdbusconnection.h               |    1 +
 gio/gdbusprivate.c                  |   84 +++++++++++++++-
 gio/gdbusprivate.h                  |    4 +
 gio/gdbusserver.c                   |   11 ++-
 gio/gio.symbols                     |    1 +
 gio/gioenums.h                      |    5 +-
 gio/tests/gdbus-peer.c              |  187 +++++++++++++++++++++++++++++++++++
 9 files changed, 316 insertions(+), 8 deletions(-)
---
diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt
index b4f3dd2..420111b 100644
--- a/docs/reference/gio/gio-sections.txt
+++ b/docs/reference/gio/gio-sections.txt
@@ -2390,6 +2390,7 @@ g_dbus_connection_new_sync
 g_dbus_connection_new_for_address
 g_dbus_connection_new_for_address_finish
 g_dbus_connection_new_for_address_sync
+g_dbus_connection_start_message_processing
 GDBusCapabilityFlags
 g_dbus_connection_close
 g_dbus_connection_is_closed
diff --git a/gio/gdbusconnection.c b/gio/gdbusconnection.c
index 98f705e..189541b 100644
--- a/gio/gdbusconnection.c
+++ b/gio/gdbusconnection.c
@@ -830,6 +830,23 @@ g_dbus_connection_get_stream (GDBusConnection *connection)
   return connection->priv->stream;
 }
 
+/**
+ * g_dbus_connection_start_message_processing:
+ * @connection: A #GDBusConnection.
+ *
+ * If @connection was created with
+ * %G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING, this method
+ * starts processing messages. Does nothing if @connection wasn't
+ * created with this flag or if the method has already been called.
+ *
+ * Since: 2.26
+ */
+void
+g_dbus_connection_start_message_processing (GDBusConnection *connection)
+{
+  g_return_if_fail (G_IS_DBUS_CONNECTION (connection));
+  _g_dbus_worker_unfreeze (connection->priv->worker);
+}
 
 /**
  * g_dbus_connection_is_closed:
@@ -1877,16 +1894,27 @@ initable_init (GInitable     *initable,
 
   connection->priv->worker = _g_dbus_worker_new (connection->priv->stream,
                                                  connection->priv->capabilities,
+                                                 (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING),
                                                  on_worker_message_received,
                                                  on_worker_message_about_to_be_sent,
                                                  on_worker_closed,
                                                  connection);
 
-  /* if a bus connection, invoke org.freedesktop.DBus.Hello - this is how we're getting a name */
+  /* if a bus connection, call org.freedesktop.DBus.Hello - this is how we're getting a name */
   if (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION)
     {
       GVariant *hello_result;
 
+      /* we could lift this restriction by adding code in gdbusprivate.c */
+      if (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING)
+        {
+          g_set_error_literal (&connection->priv->initialization_error,
+                               G_IO_ERROR,
+                               G_IO_ERROR_FAILED,
+                               "Cannot use DELAY_MESSAGE_PROCESSING with MESSAGE_BUS_CONNECTION");
+          goto out;
+        }
+
       hello_result = g_dbus_connection_call_sync (connection,
                                                   "org.freedesktop.DBus", /* name */
                                                   "/org/freedesktop/DBus", /* path */
diff --git a/gio/gdbusconnection.h b/gio/gdbusconnection.h
index d80daf2..4f85159 100644
--- a/gio/gdbusconnection.h
+++ b/gio/gdbusconnection.h
@@ -128,6 +128,7 @@ GDBusConnection *g_dbus_connection_new_for_address_sync       (const gchar
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+void             g_dbus_connection_start_message_processing   (GDBusConnection    *connection);
 gboolean         g_dbus_connection_is_closed                  (GDBusConnection    *connection);
 void             g_dbus_connection_close                      (GDBusConnection    *connection);
 GIOStream       *g_dbus_connection_get_stream                 (GDBusConnection    *connection);
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c
index 3a48d1d..5208ad9 100644
--- a/gio/gdbusprivate.c
+++ b/gio/gdbusprivate.c
@@ -350,7 +350,16 @@ _g_dbus_shared_thread_unref (void)
 struct GDBusWorker
 {
   volatile gint                       ref_count;
+
   gboolean                            stopped;
+
+  /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
+   * only affects messages received from the other peer (since GDBusServer is the
+   * only user) - we might want it to affect messages sent to the other peer too?
+   */
+  gboolean                            frozen;
+  GQueue                             *received_messages_while_frozen;
+
   GIOStream                          *stream;
   GDBusCapabilityFlags                capabilities;
   GCancellable                       *cancellable;
@@ -406,11 +415,13 @@ _g_dbus_worker_unref (GDBusWorker *worker)
       if (worker->read_fd_list != NULL)
         g_object_unref (worker->read_fd_list);
 
+      g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
+      g_queue_free (worker->received_messages_while_frozen);
+
       g_mutex_free (worker->write_lock);
-      g_queue_foreach (worker->write_queue,
-                       (GFunc) message_to_write_data_free,
-                       NULL);
+      g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
       g_queue_free (worker->write_queue);
+
       g_free (worker);
     }
 }
@@ -443,6 +454,66 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
   return ret;
 }
 
+/* can only be called from private thread with read-lock held - takes ownership of @message */
+static void
+_g_dbus_worker_queue_or_deliver_received_message (GDBusWorker  *worker,
+                                                  GDBusMessage *message)
+{
+  if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
+    {
+      /* queue up */
+      g_queue_push_tail (worker->received_messages_while_frozen, message);
+    }
+  else
+    {
+      /* not frozen, nor anything in queue */
+      _g_dbus_worker_emit_message_received (worker, message);
+      g_object_unref (message);
+    }
+}
+
+/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
+static gboolean
+unfreeze_in_idle_cb (gpointer user_data)
+{
+  GDBusWorker *worker = user_data;
+  GDBusMessage *message;
+
+  g_mutex_lock (worker->read_lock);
+  if (worker->frozen)
+    {
+      while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
+        {
+          _g_dbus_worker_emit_message_received (worker, message);
+          g_object_unref (message);
+        }
+      worker->frozen = FALSE;
+    }
+  else
+    {
+      g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
+    }
+  g_mutex_unlock (worker->read_lock);
+  return FALSE;
+}
+
+/* can be called from any thread */
+void
+_g_dbus_worker_unfreeze (GDBusWorker *worker)
+{
+  GSource *idle_source;
+  idle_source = g_idle_source_new ();
+  g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+  g_source_set_callback (idle_source,
+                         unfreeze_in_idle_cb,
+                         _g_dbus_worker_ref (worker),
+                         (GDestroyNotify) _g_dbus_worker_unref);
+  g_source_attach (idle_source, shared_thread_data->context);
+  g_source_unref (idle_source);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
 
 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
@@ -639,8 +710,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
             }
 
           /* yay, got a message, go deliver it */
-          _g_dbus_worker_emit_message_received (worker, message);
-          g_object_unref (message);
+          _g_dbus_worker_queue_or_deliver_received_message (worker, message);
 
           /* start reading another message! */
           worker->read_buffer_bytes_wanted = 0;
@@ -952,6 +1022,7 @@ _g_dbus_worker_thread_begin_func (gpointer user_data)
 GDBusWorker *
 _g_dbus_worker_new (GIOStream                              *stream,
                     GDBusCapabilityFlags                    capabilities,
+                    gboolean                                initially_frozen,
                     GDBusWorkerMessageReceivedCallback      message_received_callback,
                     GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
                     GDBusWorkerDisconnectedCallback         disconnected_callback,
@@ -976,6 +1047,9 @@ _g_dbus_worker_new (GIOStream                              *stream,
   worker->capabilities = capabilities;
   worker->cancellable = g_cancellable_new ();
 
+  worker->frozen = initially_frozen;
+  worker->received_messages_while_frozen = g_queue_new ();
+
   worker->write_lock = g_mutex_new ();
   worker->write_queue = g_queue_new ();
 
diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h
index 0d9cb61..766bb98 100644
--- a/gio/gdbusprivate.h
+++ b/gio/gdbusprivate.h
@@ -53,6 +53,7 @@ typedef void (*GDBusWorkerDisconnectedCallback)    (GDBusWorker   *worker,
  */
 GDBusWorker *_g_dbus_worker_new          (GIOStream                          *stream,
                                           GDBusCapabilityFlags                capabilities,
+                                          gboolean                            initially_frozen,
                                           GDBusWorkerMessageReceivedCallback  message_received_callback,
                                           GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
                                           GDBusWorkerDisconnectedCallback     disconnected_callback,
@@ -67,6 +68,9 @@ void         _g_dbus_worker_send_message (GDBusWorker    *worker,
 /* can be called from any thread */
 void         _g_dbus_worker_stop         (GDBusWorker    *worker);
 
+/* can be called from any thread */
+void         _g_dbus_worker_unfreeze     (GDBusWorker    *worker);
+
 /* ---------------------------------------------------------------------------------------------------- */
 
 void _g_dbus_initialize (void);
diff --git a/gio/gdbusserver.c b/gio/gdbusserver.c
index 85adccc..5195ba0 100644
--- a/gio/gdbusserver.c
+++ b/gio/gdbusserver.c
@@ -367,6 +367,11 @@ g_dbus_server_class_init (GDBusServerClass *klass)
    * linkend="g-main-context-push-thread-default">thread-default main
    * loop</link> of the thread that @server was constructed in.
    *
+   * You are guaranteed that signal handlers for this signal run
+   * before incoming messages on @connection are processed. This means
+   * that it's suitable to call g_dbus_connection_register_object() or
+   * similar from the signal handler.
+   *
    * Since: 2.26
    */
   _signals[NEW_CONNECTION_SIGNAL] = g_signal_new ("new-connection",
@@ -889,6 +894,7 @@ emit_new_connection_in_idle (gpointer user_data)
                  _signals[NEW_CONNECTION_SIGNAL],
                  0,
                  data->connection);
+  g_dbus_connection_start_message_processing (data->connection);
   g_object_unref (data->connection);
 
   return FALSE;
@@ -925,7 +931,9 @@ on_run (GSocketService    *service,
         goto out;
     }
 
-  connection_flags = G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER;
+  connection_flags =
+    G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER |
+    G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING;
   if (server->priv->flags & G_DBUS_SERVER_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS)
     connection_flags |= G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS;
 
@@ -944,6 +952,7 @@ on_run (GSocketService    *service,
                      _signals[NEW_CONNECTION_SIGNAL],
                      0,
                      connection);
+      g_dbus_connection_start_message_processing (connection);
       g_object_unref (connection);
     }
   else
diff --git a/gio/gio.symbols b/gio/gio.symbols
index ebc3d15..0dd8b79 100644
--- a/gio/gio.symbols
+++ b/gio/gio.symbols
@@ -1529,6 +1529,7 @@ g_dbus_connection_new_for_address
 g_dbus_connection_new_for_address_finish
 g_dbus_connection_new_for_address_sync
 g_dbus_connection_new_sync
+g_dbus_connection_start_message_processing
 g_dbus_connection_get_capabilities
 g_dbus_connection_get_exit_on_close
 g_dbus_connection_get_guid
diff --git a/gio/gioenums.h b/gio/gioenums.h
index 9d1e126..feb5d03 100644
--- a/gio/gioenums.h
+++ b/gio/gioenums.h
@@ -969,6 +969,8 @@ typedef enum
  * method.
  * @G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION: Pass this flag if connecting to a peer that is a
  * message bus. This means that the Hello() method will be invoked as part of the connection setup.
+ * @G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING: If set, processing of D-Bus messages is
+ * delayed until g_dbus_connection_start_message_processing() is called.
  *
  * Flags used when creating a new #GDBusConnection.
  *
@@ -979,7 +981,8 @@ typedef enum {
   G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT = (1<<0),
   G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER = (1<<1),
   G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS = (1<<2),
-  G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION = (1<<3)
+  G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION = (1<<3),
+  G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING = (1<<4)
 } GDBusConnectionFlags;
 
 /**
diff --git a/gio/tests/gdbus-peer.c b/gio/tests/gdbus-peer.c
index 7d81372..71bad62 100644
--- a/gio/tests/gdbus-peer.c
+++ b/gio/tests/gdbus-peer.c
@@ -732,6 +732,192 @@ test_peer (void)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
+typedef struct
+{
+  GDBusServer *server;
+  GMainContext *context;
+  GMainLoop *loop;
+
+  GList *connections;
+} DmpData;
+
+static void
+dmp_data_free (DmpData *data)
+{
+  g_main_loop_unref (data->loop);
+  g_main_context_unref (data->context);
+  g_object_unref (data->server);
+  g_list_foreach (data->connections, (GFunc) g_object_unref, NULL);
+  g_list_free (data->connections);
+  g_free (data);
+}
+
+static void
+dmp_on_method_call (GDBusConnection       *connection,
+                    const gchar           *sender,
+                    const gchar           *object_path,
+                    const gchar           *interface_name,
+                    const gchar           *method_name,
+                    GVariant              *parameters,
+                    GDBusMethodInvocation *invocation,
+                    gpointer               user_data)
+{
+  //DmpData *data = user_data;
+  gint32 first;
+  gint32 second;
+  g_variant_get (parameters,
+                 "(ii)",
+                 &first,
+                 &second);
+  g_dbus_method_invocation_return_value (invocation,
+                                         g_variant_new ("(i)", first + second));
+}
+
+static const GDBusInterfaceVTable dmp_interface_vtable =
+{
+  dmp_on_method_call,
+  NULL,  /* get_property */
+  NULL   /* set_property */
+};
+
+
+/* Runs in thread we created GDBusServer in (since we didn't pass G_DBUS_SERVER_FLAGS_RUN_IN_THREAD) */
+static void
+dmp_on_new_connection (GDBusServer     *server,
+                       GDBusConnection *connection,
+                       gpointer         user_data)
+{
+  DmpData *data = user_data;
+  GDBusNodeInfo *node;
+  GError *error;
+
+  /* accept the connection */
+  data->connections = g_list_prepend (data->connections, g_object_ref (connection));
+
+  error = NULL;
+  node = g_dbus_node_info_new_for_xml ("<node>"
+                                       "  <interface name='org.gtk.GDBus.DmpInterface'>"
+                                       "    <method name='AddPair'>"
+                                       "      <arg type='i' name='first' direction='in'/>"
+                                       "      <arg type='i' name='second' direction='in'/>"
+                                       "      <arg type='i' name='sum' direction='out'/>"
+                                       "    </method>"
+                                       "  </interface>"
+                                       "</node>",
+                                       &error);
+  g_assert_no_error (error);
+
+  /* sleep 100ms before exporting an object - this is to test that
+   * G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING really works
+   * (GDBusServer uses this feature).
+   */
+  usleep (100 * 1000);
+
+  /* export an object */
+  error = NULL;
+  g_dbus_connection_register_object (connection,
+                                     "/dmp/test",
+                                     node->interfaces[0],
+                                     &dmp_interface_vtable,
+                                     data,
+                                     NULL,
+                                     &error);
+  //g_dbus_node_info_unref (node);
+}
+
+static gpointer
+dmp_thread_func (gpointer user_data)
+{
+  DmpData *data = user_data;
+  GError *error;
+  gchar *guid;
+
+  data->context = g_main_context_new ();
+  g_main_context_push_thread_default (data->context);
+
+  error = NULL;
+  guid = g_dbus_generate_guid ();
+  data->server = g_dbus_server_new_sync ("nonce-tcp:",
+                                         G_DBUS_SERVER_FLAGS_NONE,
+                                         guid,
+                                         NULL, /* GDBusAuthObserver */
+                                         NULL, /* GCancellable */
+                                         &error);
+  g_assert_no_error (error);
+  g_signal_connect (data->server,
+                    "new-connection",
+                    G_CALLBACK (dmp_on_new_connection),
+                    data);
+
+  g_dbus_server_start (data->server);
+
+  data->loop = g_main_loop_new (data->context, FALSE);
+  g_main_loop_run (data->loop);
+
+  g_main_context_pop_thread_default (data->context);
+
+  g_free (guid);
+  return NULL;
+}
+
+static void
+delayed_message_processing (void)
+{
+  GError *error;
+  DmpData *data;
+  GThread *service_thread;
+  guint n;
+
+  data = g_new0 (DmpData, 1);
+
+  error = NULL;
+  service_thread = g_thread_create (dmp_thread_func,
+                                    data,
+                                    TRUE,
+                                    &error);
+  while (data->server == NULL || !g_dbus_server_is_active (data->server))
+    g_thread_yield ();
+
+  for (n = 0; n < 5; n++)
+    {
+      GDBusConnection *c;
+      GVariant *res;
+      gint32 val;
+
+      error = NULL;
+      c = g_dbus_connection_new_for_address_sync (g_dbus_server_get_client_address (data->server),
+                                                  G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT,
+                                                  NULL, /* GDBusAuthObserver */
+                                                  NULL, /* GCancellable */
+                                                  &error);
+      g_assert_no_error (error);
+
+      error = NULL;
+      res = g_dbus_connection_call_sync (c,
+                                         NULL,    /* bus name */
+                                         "/dmp/test",
+                                         "org.gtk.GDBus.DmpInterface",
+                                         "AddPair",
+                                         g_variant_new ("(ii)", 2, n),
+                                         G_VARIANT_TYPE ("(i)"),
+                                         G_DBUS_CALL_FLAGS_NONE,
+                                         -1, /* timeout_msec */
+                                         NULL, /* GCancellable */
+                                         &error);
+      g_assert_no_error (error);
+      g_variant_get (res, "(i)", &val);
+      g_assert_cmpint (val, ==, 2 + n);
+      g_variant_unref (res);
+      g_object_unref (c);
+  }
+
+  g_main_loop_quit (data->loop);
+  g_thread_join (service_thread);
+  dmp_data_free (data);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
 int
 main (int   argc,
       char *argv[])
@@ -753,6 +939,7 @@ main (int   argc,
   loop = g_main_loop_new (NULL, FALSE);
 
   g_test_add_func ("/gdbus/peer-to-peer", test_peer);
+  g_test_add_func ("/gdbus/delayed-message-processing", delayed_message_processing);
 
   ret = g_test_run();
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]