[glib] Bug 621945 – Filter outgoing messages in GDBusConnection



commit 45411ccbe3c9d1b08332942d1e7b594330688126
Author: David Zeuthen <davidz redhat com>
Date:   Mon Jun 21 16:08:53 2010 -0400

    Bug 621945 â?? Filter outgoing messages in GDBusConnection
    
    This patch breaks some rarely-used public API (only known user is
    dconf).
    
    This patch is based on work from Peng Huang <shawn p huang gmail com>.
    
    See https://bugzilla.gnome.org/show_bug.cgi?id=621945
    
    Signed-off-by: David Zeuthen <davidz redhat com>

 gio/gdbusconnection.c        |   66 +++++++++++++++++++++++++++++++++++++---
 gio/gdbusconnection.h        |    3 ++
 gio/gdbusprivate.c           |   69 ++++++++++++++++++++++++++++-------------
 gio/gdbusprivate.h           |    5 +++
 gio/tests/gdbus-connection.c |   16 ++++++++--
 5 files changed, 129 insertions(+), 30 deletions(-)
---
diff --git a/gio/gdbusconnection.c b/gio/gdbusconnection.c
index a41ee85..d2ab2d4 100644
--- a/gio/gdbusconnection.c
+++ b/gio/gdbusconnection.c
@@ -1624,6 +1624,7 @@ on_worker_message_received (GDBusWorker  *worker,
     {
       consumed_by_filter = filters[n].func (connection,
                                             message,
+                                            TRUE,
                                             filters[n].user_data);
       if (consumed_by_filter)
         break;
@@ -1673,6 +1674,52 @@ on_worker_message_received (GDBusWorker  *worker,
   g_free (filters);
 }
 
+/* Called in worker's thread */
+static gboolean
+on_worker_message_about_to_be_sent (GDBusWorker  *worker,
+                                    GDBusMessage *message,
+                                    gpointer      user_data)
+{
+  GDBusConnection *connection = G_DBUS_CONNECTION (user_data);
+  FilterCallback *filters;
+  gboolean consumed_by_filter;
+  guint num_filters;
+  guint n;
+
+  //g_debug ("in on_worker_message_about_to_be_sent");
+
+  g_object_ref (connection);
+
+  /* First collect the set of callback functions */
+  CONNECTION_LOCK (connection);
+  num_filters = connection->priv->filters->len;
+  filters = g_new0 (FilterCallback, num_filters);
+  for (n = 0; n < num_filters; n++)
+    {
+      FilterData *data = connection->priv->filters->pdata[n];
+      filters[n].func = data->filter_function;
+      filters[n].user_data = data->user_data;
+    }
+  CONNECTION_UNLOCK (connection);
+
+  /* the call the filters in order (without holding the lock) */
+  consumed_by_filter = FALSE;
+  for (n = 0; n < num_filters; n++)
+    {
+      consumed_by_filter = filters[n].func (connection,
+                                            message,
+                                            FALSE,
+                                            filters[n].user_data);
+      if (consumed_by_filter)
+        break;
+    }
+
+  g_object_unref (connection);
+  g_free (filters);
+
+  return consumed_by_filter;
+}
+
 /* Called in worker's thread - we must not block */
 static void
 on_worker_closed (GDBusWorker *worker,
@@ -1831,6 +1878,7 @@ initable_init (GInitable     *initable,
   connection->priv->worker = _g_dbus_worker_new (connection->priv->stream,
                                                  connection->priv->capabilities,
                                                  on_worker_message_received,
+                                                 on_worker_message_about_to_be_sent,
                                                  on_worker_closed,
                                                  connection);
 
@@ -2266,11 +2314,11 @@ static guint _global_filter_id = 1;
  * is removed or %NULL.
  *
  * Adds a message filter. Filters are handlers that are run on all
- * incoming messages, prior to standard dispatch. Filters are run in
- * the order that they were added.  The same handler can be added as a
- * filter more than once, in which case it will be run more than once.
- * Filters added during a filter callback won't be run on the message
- * being processed.
+ * incoming and outgoing messages, prior to standard dispatch. Filters
+ * are run in the order that they were added.  The same handler can be
+ * added as a filter more than once, in which case it will be run more
+ * than once.  Filters added during a filter callback won't be run on
+ * the message being processed.
  *
  * Note that filters are run in a dedicated message handling thread so
  * they can't block and, generally, can't do anything but signal a
@@ -2279,6 +2327,14 @@ static guint _global_filter_id = 1;
  * g_dbus_connection_signal_subscribe() or
  * g_dbus_connection_call() instead.
  *
+ * If a filter consumes an incoming message (by returning %TRUE), the
+ * message is not dispatched anywhere else - not even the standard
+ * dispatch machinery (that API such as
+ * g_dbus_connection_signal_subscribe() and
+ * g_dbus_connection_send_message_with_reply() relies on) will see the
+ * message. Similary, if a filter consumes an outgoing message, the
+ * message will not be sent to the other peer.
+ *
  * Returns: A filter identifier that can be used with
  * g_dbus_connection_remove_filter().
  *
diff --git a/gio/gdbusconnection.h b/gio/gdbusconnection.h
index b203196..7390dc5 100644
--- a/gio/gdbusconnection.h
+++ b/gio/gdbusconnection.h
@@ -469,6 +469,8 @@ void             g_dbus_connection_signal_unsubscribe         (GDBusConnection
  * GDBusMessageFilterFunction:
  * @connection: A #GDBusConnection.
  * @message: A #GDBusMessage.
+ * @incoming: %TRUE if it is a message received from the other peer, %FALSE if it is
+ * a message to be sent to the other peer.
  * @user_data: User data passed when adding the filter.
  *
  * Signature for function used in g_dbus_connection_add_filter().
@@ -480,6 +482,7 @@ void             g_dbus_connection_signal_unsubscribe         (GDBusConnection
  */
 typedef gboolean (*GDBusMessageFilterFunction) (GDBusConnection *connection,
                                                 GDBusMessage    *message,
+                                                gboolean         incoming,
                                                 gpointer         user_data);
 
 guint g_dbus_connection_add_filter (GDBusConnection            *connection,
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c
index 76809ee..3a48d1d 100644
--- a/gio/gdbusprivate.c
+++ b/gio/gdbusprivate.c
@@ -355,6 +355,7 @@ struct GDBusWorker
   GDBusCapabilityFlags                capabilities;
   GCancellable                       *cancellable;
   GDBusWorkerMessageReceivedCallback  message_received_callback;
+  GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
   GDBusWorkerDisconnectedCallback     disconnected_callback;
   gpointer                            user_data;
 
@@ -424,13 +425,24 @@ _g_dbus_worker_emit_disconnected (GDBusWorker  *worker,
 }
 
 static void
-_g_dbus_worker_emit_message (GDBusWorker  *worker,
-                             GDBusMessage *message)
+_g_dbus_worker_emit_message_received (GDBusWorker  *worker,
+                                      GDBusMessage *message)
 {
   if (!worker->stopped)
     worker->message_received_callback (worker, message, worker->user_data);
 }
 
+static gboolean
+_g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker  *worker,
+                                              GDBusMessage *message)
+{
+  gboolean ret;
+  ret = FALSE;
+  if (!worker->stopped)
+    ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
+  return ret;
+}
+
 static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
 
 /* called in private thread shared by all GDBusConnection instances (without read-lock held) */
@@ -627,7 +639,7 @@ _g_dbus_worker_do_read_cb (GInputStream  *input_stream,
             }
 
           /* yay, got a message, go deliver it */
-          _g_dbus_worker_emit_message (worker, message);
+          _g_dbus_worker_emit_message_received (worker, message);
           g_object_unref (message);
 
           /* start reading another message! */
@@ -720,7 +732,7 @@ message_to_write_data_free (MessageToWriteData *data)
 
 /* ---------------------------------------------------------------------------------------------------- */
 
-/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
 static gboolean
 write_message (GDBusWorker         *worker,
                MessageToWriteData  *data,
@@ -848,29 +860,39 @@ write_message_in_idle_cb (gpointer user_data)
   GDBusWorker *worker = user_data;
   gboolean more_writes_are_pending;
   MessageToWriteData *data;
+  gboolean message_was_dropped;
   GError *error;
 
   g_mutex_lock (worker->write_lock);
-
   data = g_queue_pop_head (worker->write_queue);
   g_assert (data != NULL);
+  more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
+  worker->write_is_pending = more_writes_are_pending;
+  g_mutex_unlock (worker->write_lock);
 
-  error = NULL;
-  if (!write_message (worker,
-                      data,
-                      &error))
+  /* Note that write_lock is only used for protecting the @write_queue
+   * and @write_is_pending fields of the GDBusWorker struct ... which we
+   * need to modify from arbitrary threads in _g_dbus_worker_send_message().
+   *
+   * Therefore, it's fine to drop it here when calling back into user
+   * code and then writing the message out onto the GIOStream since this
+   * function only runs on the worker thread.
+   */
+  message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+  if (G_LIKELY (!message_was_dropped))
     {
-      /* TODO: handle */
-      _g_dbus_worker_emit_disconnected (worker, TRUE, error);
-      g_error_free (error);
+      error = NULL;
+      if (!write_message (worker,
+                          data,
+                          &error))
+        {
+          /* TODO: handle */
+          _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+          g_error_free (error);
+        }
     }
   message_to_write_data_free (data);
 
-  more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
-
-  worker->write_is_pending = more_writes_are_pending;
-  g_mutex_unlock (worker->write_lock);
-
   return more_writes_are_pending;
 }
 
@@ -928,16 +950,18 @@ _g_dbus_worker_thread_begin_func (gpointer user_data)
 }
 
 GDBusWorker *
-_g_dbus_worker_new (GIOStream                          *stream,
-                    GDBusCapabilityFlags                capabilities,
-                    GDBusWorkerMessageReceivedCallback  message_received_callback,
-                    GDBusWorkerDisconnectedCallback     disconnected_callback,
-                    gpointer                            user_data)
+_g_dbus_worker_new (GIOStream                              *stream,
+                    GDBusCapabilityFlags                    capabilities,
+                    GDBusWorkerMessageReceivedCallback      message_received_callback,
+                    GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
+                    GDBusWorkerDisconnectedCallback         disconnected_callback,
+                    gpointer                                user_data)
 {
   GDBusWorker *worker;
 
   g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
   g_return_val_if_fail (message_received_callback != NULL, NULL);
+  g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
   g_return_val_if_fail (disconnected_callback != NULL, NULL);
 
   worker = g_new0 (GDBusWorker, 1);
@@ -945,6 +969,7 @@ _g_dbus_worker_new (GIOStream                          *stream,
 
   worker->read_lock = g_mutex_new ();
   worker->message_received_callback = message_received_callback;
+  worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
   worker->disconnected_callback = disconnected_callback;
   worker->user_data = user_data;
   worker->stream = g_object_ref (stream);
diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h
index 337c278..0d9cb61 100644
--- a/gio/gdbusprivate.h
+++ b/gio/gdbusprivate.h
@@ -39,6 +39,10 @@ typedef void (*GDBusWorkerMessageReceivedCallback) (GDBusWorker   *worker,
                                                     GDBusMessage  *message,
                                                     gpointer       user_data);
 
+typedef gboolean (*GDBusWorkerMessageAboutToBeSentCallback) (GDBusWorker   *worker,
+                                                             GDBusMessage  *message,
+                                                             gpointer       user_data);
+
 typedef void (*GDBusWorkerDisconnectedCallback)    (GDBusWorker   *worker,
                                                     gboolean       remote_peer_vanished,
                                                     GError        *error,
@@ -50,6 +54,7 @@ typedef void (*GDBusWorkerDisconnectedCallback)    (GDBusWorker   *worker,
 GDBusWorker *_g_dbus_worker_new          (GIOStream                          *stream,
                                           GDBusCapabilityFlags                capabilities,
                                           GDBusWorkerMessageReceivedCallback  message_received_callback,
+                                          GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
                                           GDBusWorkerDisconnectedCallback     disconnected_callback,
                                           gpointer                            user_data);
 
diff --git a/gio/tests/gdbus-connection.c b/gio/tests/gdbus-connection.c
index 15fca70..01113ee 100644
--- a/gio/tests/gdbus-connection.c
+++ b/gio/tests/gdbus-connection.c
@@ -555,20 +555,29 @@ test_connection_signals (void)
 typedef struct
 {
   guint num_handled;
+  guint num_outgoing;
   guint32 serial;
 } FilterData;
 
 static gboolean
 filter_func (GDBusConnection *connection,
              GDBusMessage    *message,
+             gboolean         incoming,
              gpointer         user_data)
 {
   FilterData *data = user_data;
   guint32 reply_serial;
 
-  reply_serial = g_dbus_message_get_reply_serial (message);
-  if (reply_serial == data->serial)
-    data->num_handled += 1;
+  if (incoming)
+    {
+      reply_serial = g_dbus_message_get_reply_serial (message);
+      if (reply_serial == data->serial)
+        data->num_handled += 1;
+    }
+  else
+    {
+      data->num_outgoing += 1;
+    }
 
   return FALSE;
 }
@@ -638,6 +647,7 @@ test_connection_filter (void)
   g_assert (r != NULL);
   g_object_unref (r);
   g_assert_cmpint (data.num_handled, ==, 3);
+  g_assert_cmpint (data.num_outgoing, ==, 3);
 
   _g_object_wait_for_single_ref (c);
   g_object_unref (c);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]