[glib] Bug 621945 – Filter outgoing messages in GDBusConnection
- From: David Zeuthen <davidz src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib] Bug 621945 – Filter outgoing messages in GDBusConnection
- Date: Mon, 21 Jun 2010 20:13:27 +0000 (UTC)
commit 45411ccbe3c9d1b08332942d1e7b594330688126
Author: David Zeuthen <davidz redhat com>
Date: Mon Jun 21 16:08:53 2010 -0400
Bug 621945 – Filter outgoing messages in GDBusConnection
This patch breaks some rarely-used public API (only known user is
dconf).
This patch is based on work from Peng Huang <shawn p huang gmail com>.
See https://bugzilla.gnome.org/show_bug.cgi?id=621945
Signed-off-by: David Zeuthen <davidz redhat com>
gio/gdbusconnection.c | 66 +++++++++++++++++++++++++++++++++++++---
gio/gdbusconnection.h | 3 ++
gio/gdbusprivate.c | 69 ++++++++++++++++++++++++++++-------------
gio/gdbusprivate.h | 5 +++
gio/tests/gdbus-connection.c | 16 ++++++++--
5 files changed, 129 insertions(+), 30 deletions(-)
---
diff --git a/gio/gdbusconnection.c b/gio/gdbusconnection.c
index a41ee85..d2ab2d4 100644
--- a/gio/gdbusconnection.c
+++ b/gio/gdbusconnection.c
@@ -1624,6 +1624,7 @@ on_worker_message_received (GDBusWorker *worker,
{
consumed_by_filter = filters[n].func (connection,
message,
+ TRUE,
filters[n].user_data);
if (consumed_by_filter)
break;
@@ -1673,6 +1674,52 @@ on_worker_message_received (GDBusWorker *worker,
g_free (filters);
}
+/* Called in worker's thread: runs each filter on an outgoing message; returns TRUE if a filter consumed it */
+static gboolean
+on_worker_message_about_to_be_sent (GDBusWorker *worker,
+ GDBusMessage *message,
+ gpointer user_data)
+{
+ GDBusConnection *connection = G_DBUS_CONNECTION (user_data);
+ FilterCallback *filters;
+ gboolean consumed_by_filter;
+ guint num_filters;
+ guint n;
+
+ //g_debug ("in on_worker_message_about_to_be_sent");
+
+ g_object_ref (connection);
+
+ /* First snapshot the filter callbacks and their user data while holding the connection lock */
+ CONNECTION_LOCK (connection);
+ num_filters = connection->priv->filters->len;
+ filters = g_new0 (FilterCallback, num_filters);
+ for (n = 0; n < num_filters; n++)
+ {
+ FilterData *data = connection->priv->filters->pdata[n];
+ filters[n].func = data->filter_function;
+ filters[n].user_data = data->user_data;
+ }
+ CONNECTION_UNLOCK (connection);
+
+ /* then call the filters in order (without holding the lock), stopping at the first one that consumes the message */
+ consumed_by_filter = FALSE;
+ for (n = 0; n < num_filters; n++)
+ {
+ consumed_by_filter = filters[n].func (connection,
+ message,
+ FALSE,
+ filters[n].user_data);
+ if (consumed_by_filter)
+ break;
+ }
+
+ g_object_unref (connection);
+ g_free (filters);
+
+ return consumed_by_filter;
+}
+
/* Called in worker's thread - we must not block */
static void
on_worker_closed (GDBusWorker *worker,
@@ -1831,6 +1878,7 @@ initable_init (GInitable *initable,
connection->priv->worker = _g_dbus_worker_new (connection->priv->stream,
connection->priv->capabilities,
on_worker_message_received,
+ on_worker_message_about_to_be_sent,
on_worker_closed,
connection);
@@ -2266,11 +2314,11 @@ static guint _global_filter_id = 1;
* is removed or %NULL.
*
* Adds a message filter. Filters are handlers that are run on all
- * incoming messages, prior to standard dispatch. Filters are run in
- * the order that they were added. The same handler can be added as a
- * filter more than once, in which case it will be run more than once.
- * Filters added during a filter callback won't be run on the message
- * being processed.
+ * incoming and outgoing messages, prior to standard dispatch. Filters
+ * are run in the order that they were added. The same handler can be
+ * added as a filter more than once, in which case it will be run more
+ * than once. Filters added during a filter callback won't be run on
+ * the message being processed.
*
* Note that filters are run in a dedicated message handling thread so
* they can't block and, generally, can't do anything but signal a
@@ -2279,6 +2327,14 @@ static guint _global_filter_id = 1;
* g_dbus_connection_signal_subscribe() or
* g_dbus_connection_call() instead.
*
+ * If a filter consumes an incoming message (by returning %TRUE), the
+ * message is not dispatched anywhere else - not even the standard
+ * dispatch machinery (that API such as
+ * g_dbus_connection_signal_subscribe() and
+ * g_dbus_connection_send_message_with_reply() relies on) will see the
+ * message. Similarly, if a filter consumes an outgoing message, the
+ * message will not be sent to the other peer.
+ *
* Returns: A filter identifier that can be used with
* g_dbus_connection_remove_filter().
*
diff --git a/gio/gdbusconnection.h b/gio/gdbusconnection.h
index b203196..7390dc5 100644
--- a/gio/gdbusconnection.h
+++ b/gio/gdbusconnection.h
@@ -469,6 +469,8 @@ void g_dbus_connection_signal_unsubscribe (GDBusConnection
* GDBusMessageFilterFunction:
* @connection: A #GDBusConnection.
* @message: A #GDBusMessage.
+ * @incoming: %TRUE if it is a message received from the other peer, %FALSE if it is
+ * a message to be sent to the other peer.
* @user_data: User data passed when adding the filter.
*
* Signature for function used in g_dbus_connection_add_filter().
@@ -480,6 +482,7 @@ void g_dbus_connection_signal_unsubscribe (GDBusConnection
*/
typedef gboolean (*GDBusMessageFilterFunction) (GDBusConnection *connection,
GDBusMessage *message,
+ gboolean incoming,
gpointer user_data);
guint g_dbus_connection_add_filter (GDBusConnection *connection,
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c
index 76809ee..3a48d1d 100644
--- a/gio/gdbusprivate.c
+++ b/gio/gdbusprivate.c
@@ -355,6 +355,7 @@ struct GDBusWorker
GDBusCapabilityFlags capabilities;
GCancellable *cancellable;
GDBusWorkerMessageReceivedCallback message_received_callback;
+ GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback;
GDBusWorkerDisconnectedCallback disconnected_callback;
gpointer user_data;
@@ -424,13 +425,24 @@ _g_dbus_worker_emit_disconnected (GDBusWorker *worker,
}
static void
-_g_dbus_worker_emit_message (GDBusWorker *worker,
- GDBusMessage *message)
+_g_dbus_worker_emit_message_received (GDBusWorker *worker,
+ GDBusMessage *message)
{
if (!worker->stopped)
worker->message_received_callback (worker, message, worker->user_data);
}
+static gboolean
+_g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker,
+ GDBusMessage *message)
+{
+ gboolean ret;
+ ret = FALSE; /* default result when the callback is not invoked */
+ if (!worker->stopped) /* the callback is only invoked while the worker is not stopped */
+ ret = worker->message_about_to_be_sent_callback (worker, message, worker->user_data);
+ return ret; /* the caller treats TRUE as "message was dropped" and skips the write */
+}
+
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
@@ -627,7 +639,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
}
/* yay, got a message, go deliver it */
- _g_dbus_worker_emit_message (worker, message);
+ _g_dbus_worker_emit_message_received (worker, message);
g_object_unref (message);
/* start reading another message! */
@@ -720,7 +732,7 @@ message_to_write_data_free (MessageToWriteData *data)
/* ---------------------------------------------------------------------------------------------------- */
-/* called in private thread shared by all GDBusConnection instances (with write-lock held) */
+/* called in private thread shared by all GDBusConnection instances (without write-lock held) */
static gboolean
write_message (GDBusWorker *worker,
MessageToWriteData *data,
@@ -848,29 +860,39 @@ write_message_in_idle_cb (gpointer user_data)
GDBusWorker *worker = user_data;
gboolean more_writes_are_pending;
MessageToWriteData *data;
+ gboolean message_was_dropped;
GError *error;
g_mutex_lock (worker->write_lock);
-
data = g_queue_pop_head (worker->write_queue);
g_assert (data != NULL);
+ more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
+ worker->write_is_pending = more_writes_are_pending;
+ g_mutex_unlock (worker->write_lock);
- error = NULL;
- if (!write_message (worker,
- data,
- &error))
+ /* Note that write_lock is only used for protecting the @write_queue
+ * and @write_is_pending fields of the GDBusWorker struct ... which we
+ * need to modify from arbitrary threads in _g_dbus_worker_send_message().
+ *
+ * Therefore, it's fine to drop it here when calling back into user
+ * code and then writing the message out onto the GIOStream since this
+ * function only runs on the worker thread.
+ */
+ message_was_dropped = _g_dbus_worker_emit_message_about_to_be_sent (worker, data->message);
+ if (G_LIKELY (!message_was_dropped))
{
- /* TODO: handle */
- _g_dbus_worker_emit_disconnected (worker, TRUE, error);
- g_error_free (error);
+ error = NULL;
+ if (!write_message (worker,
+ data,
+ &error))
+ {
+ /* TODO: handle */
+ _g_dbus_worker_emit_disconnected (worker, TRUE, error);
+ g_error_free (error);
+ }
}
message_to_write_data_free (data);
- more_writes_are_pending = (g_queue_get_length (worker->write_queue) > 0);
-
- worker->write_is_pending = more_writes_are_pending;
- g_mutex_unlock (worker->write_lock);
-
return more_writes_are_pending;
}
@@ -928,16 +950,18 @@ _g_dbus_worker_thread_begin_func (gpointer user_data)
}
GDBusWorker *
-_g_dbus_worker_new (GIOStream *stream,
- GDBusCapabilityFlags capabilities,
- GDBusWorkerMessageReceivedCallback message_received_callback,
- GDBusWorkerDisconnectedCallback disconnected_callback,
- gpointer user_data)
+_g_dbus_worker_new (GIOStream *stream,
+ GDBusCapabilityFlags capabilities,
+ GDBusWorkerMessageReceivedCallback message_received_callback,
+ GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
+ GDBusWorkerDisconnectedCallback disconnected_callback,
+ gpointer user_data)
{
GDBusWorker *worker;
g_return_val_if_fail (G_IS_IO_STREAM (stream), NULL);
g_return_val_if_fail (message_received_callback != NULL, NULL);
+ g_return_val_if_fail (message_about_to_be_sent_callback != NULL, NULL);
g_return_val_if_fail (disconnected_callback != NULL, NULL);
worker = g_new0 (GDBusWorker, 1);
@@ -945,6 +969,7 @@ _g_dbus_worker_new (GIOStream *stream,
worker->read_lock = g_mutex_new ();
worker->message_received_callback = message_received_callback;
+ worker->message_about_to_be_sent_callback = message_about_to_be_sent_callback;
worker->disconnected_callback = disconnected_callback;
worker->user_data = user_data;
worker->stream = g_object_ref (stream);
diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h
index 337c278..0d9cb61 100644
--- a/gio/gdbusprivate.h
+++ b/gio/gdbusprivate.h
@@ -39,6 +39,10 @@ typedef void (*GDBusWorkerMessageReceivedCallback) (GDBusWorker *worker,
GDBusMessage *message,
gpointer user_data);
+typedef gboolean (*GDBusWorkerMessageAboutToBeSentCallback) (GDBusWorker *worker,
+ GDBusMessage *message,
+ gpointer user_data);
+
typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker,
gboolean remote_peer_vanished,
GError *error,
@@ -50,6 +54,7 @@ typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker,
GDBusWorker *_g_dbus_worker_new (GIOStream *stream,
GDBusCapabilityFlags capabilities,
GDBusWorkerMessageReceivedCallback message_received_callback,
+ GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
GDBusWorkerDisconnectedCallback disconnected_callback,
gpointer user_data);
diff --git a/gio/tests/gdbus-connection.c b/gio/tests/gdbus-connection.c
index 15fca70..01113ee 100644
--- a/gio/tests/gdbus-connection.c
+++ b/gio/tests/gdbus-connection.c
@@ -555,20 +555,29 @@ test_connection_signals (void)
typedef struct
{
guint num_handled;
+ guint num_outgoing;
guint32 serial;
} FilterData;
static gboolean
filter_func (GDBusConnection *connection,
GDBusMessage *message,
+ gboolean incoming,
gpointer user_data)
{
FilterData *data = user_data;
guint32 reply_serial;
- reply_serial = g_dbus_message_get_reply_serial (message);
- if (reply_serial == data->serial)
- data->num_handled += 1;
+ if (incoming)
+ {
+ reply_serial = g_dbus_message_get_reply_serial (message);
+ if (reply_serial == data->serial)
+ data->num_handled += 1;
+ }
+ else
+ {
+ data->num_outgoing += 1;
+ }
return FALSE;
}
@@ -638,6 +647,7 @@ test_connection_filter (void)
g_assert (r != NULL);
g_object_unref (r);
g_assert_cmpint (data.num_handled, ==, 3);
+ g_assert_cmpint (data.num_outgoing, ==, 3);
_g_object_wait_for_single_ref (c);
g_object_unref (c);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]