[glib] Bug 623142 – Ensure ::new-connection runs before processing D-Bus messages
- From: David Zeuthen <davidz src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib] Bug 623142 – Ensure ::new-connection runs before processing D-Bus messages
- Date: Wed, 30 Jun 2010 15:56:51 +0000 (UTC)
commit 038d03cd08bdb42e6f83f6041ec01732476e900b
Author: David Zeuthen <davidz redhat com>
Date: Wed Jun 30 11:43:42 2010 -0400
Bug 623142 – Ensure ::new-connection runs before processing D-Bus messages
Without this guarantee, peer-to-peer connections are not very
useful. However, with this guarantee it's possible to export objects
in a handler for the GDBusServer::new-connection signal.
There are two caveats with this patch:
- it won't work on message bus connections
- we don't queue up messages to be written
Both can be addressed later if needed.
https://bugzilla.gnome.org/show_bug.cgi?id=623142
Signed-off-by: David Zeuthen <davidz redhat com>
docs/reference/gio/gio-sections.txt | 1 +
gio/gdbusconnection.c | 30 ++++++-
gio/gdbusconnection.h | 1 +
gio/gdbusprivate.c | 84 +++++++++++++++-
gio/gdbusprivate.h | 4 +
gio/gdbusserver.c | 11 ++-
gio/gio.symbols | 1 +
gio/gioenums.h | 5 +-
gio/tests/gdbus-peer.c | 187 +++++++++++++++++++++++++++++++++++
9 files changed, 316 insertions(+), 8 deletions(-)
---
diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt
index b4f3dd2..420111b 100644
--- a/docs/reference/gio/gio-sections.txt
+++ b/docs/reference/gio/gio-sections.txt
@@ -2390,6 +2390,7 @@ g_dbus_connection_new_sync
g_dbus_connection_new_for_address
g_dbus_connection_new_for_address_finish
g_dbus_connection_new_for_address_sync
+g_dbus_connection_start_message_processing
GDBusCapabilityFlags
g_dbus_connection_close
g_dbus_connection_is_closed
diff --git a/gio/gdbusconnection.c b/gio/gdbusconnection.c
index 98f705e..189541b 100644
--- a/gio/gdbusconnection.c
+++ b/gio/gdbusconnection.c
@@ -830,6 +830,23 @@ g_dbus_connection_get_stream (GDBusConnection *connection)
return connection->priv->stream;
}
+/**
+ * g_dbus_connection_start_message_processing:
+ * @connection: A #GDBusConnection.
+ *
+ * If @connection was created with
+ * %G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING, this method
+ * starts processing messages. Does nothing if @connection wasn't
+ * created with this flag or if the method has already been called.
+ *
+ * Since: 2.26
+ */
+void
+g_dbus_connection_start_message_processing (GDBusConnection *connection)
+{
+ g_return_if_fail (G_IS_DBUS_CONNECTION (connection));
+ _g_dbus_worker_unfreeze (connection->priv->worker);
+}
/**
* g_dbus_connection_is_closed:
@@ -1877,16 +1894,27 @@ initable_init (GInitable *initable,
connection->priv->worker = _g_dbus_worker_new (connection->priv->stream,
connection->priv->capabilities,
+ (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING),
on_worker_message_received,
on_worker_message_about_to_be_sent,
on_worker_closed,
connection);
- /* if a bus connection, invoke org.freedesktop.DBus.Hello - this is how we're getting a name */
+ /* if a bus connection, call org.freedesktop.DBus.Hello - this is how we're getting a name */
if (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION)
{
GVariant *hello_result;
+ /* we could lift this restriction by adding code in gdbusprivate.c */
+ if (connection->priv->flags & G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING)
+ {
+ g_set_error_literal (&connection->priv->initialization_error,
+ G_IO_ERROR,
+ G_IO_ERROR_FAILED,
+ "Cannot use DELAY_MESSAGE_PROCESSING with MESSAGE_BUS_CONNECTION");
+ goto out;
+ }
+
hello_result = g_dbus_connection_call_sync (connection,
"org.freedesktop.DBus", /* name */
"/org/freedesktop/DBus", /* path */
diff --git a/gio/gdbusconnection.h b/gio/gdbusconnection.h
index d80daf2..4f85159 100644
--- a/gio/gdbusconnection.h
+++ b/gio/gdbusconnection.h
@@ -128,6 +128,7 @@ GDBusConnection *g_dbus_connection_new_for_address_sync (const gchar
/* ---------------------------------------------------------------------------------------------------- */
+void g_dbus_connection_start_message_processing (GDBusConnection *connection);
gboolean g_dbus_connection_is_closed (GDBusConnection *connection);
void g_dbus_connection_close (GDBusConnection *connection);
GIOStream *g_dbus_connection_get_stream (GDBusConnection *connection);
diff --git a/gio/gdbusprivate.c b/gio/gdbusprivate.c
index 3a48d1d..5208ad9 100644
--- a/gio/gdbusprivate.c
+++ b/gio/gdbusprivate.c
@@ -350,7 +350,16 @@ _g_dbus_shared_thread_unref (void)
struct GDBusWorker
{
volatile gint ref_count;
+
gboolean stopped;
+
+ /* TODO: frozen (e.g. G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING) currently
+ * only affects messages received from the other peer (since GDBusServer is the
+ * only user) - we might want it to affect messages sent to the other peer too?
+ */
+ gboolean frozen;
+ GQueue *received_messages_while_frozen;
+
GIOStream *stream;
GDBusCapabilityFlags capabilities;
GCancellable *cancellable;
@@ -406,11 +415,13 @@ _g_dbus_worker_unref (GDBusWorker *worker)
if (worker->read_fd_list != NULL)
g_object_unref (worker->read_fd_list);
+ g_queue_foreach (worker->received_messages_while_frozen, (GFunc) g_object_unref, NULL);
+ g_queue_free (worker->received_messages_while_frozen);
+
g_mutex_free (worker->write_lock);
- g_queue_foreach (worker->write_queue,
- (GFunc) message_to_write_data_free,
- NULL);
+ g_queue_foreach (worker->write_queue, (GFunc) message_to_write_data_free, NULL);
g_queue_free (worker->write_queue);
+
g_free (worker);
}
}
@@ -443,6 +454,66 @@ _g_dbus_worker_emit_message_about_to_be_sent (GDBusWorker *worker,
return ret;
}
+/* can only be called from private thread with read-lock held - takes ownership of @message */
+static void
+_g_dbus_worker_queue_or_deliver_received_message (GDBusWorker *worker,
+ GDBusMessage *message)
+{
+ if (worker->frozen || g_queue_get_length (worker->received_messages_while_frozen) > 0)
+ {
+ /* queue up */
+ g_queue_push_tail (worker->received_messages_while_frozen, message);
+ }
+ else
+ {
+ /* not frozen, nor anything in queue */
+ _g_dbus_worker_emit_message_received (worker, message);
+ g_object_unref (message);
+ }
+}
+
+/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
+static gboolean
+unfreeze_in_idle_cb (gpointer user_data)
+{
+ GDBusWorker *worker = user_data;
+ GDBusMessage *message;
+
+ g_mutex_lock (worker->read_lock);
+ if (worker->frozen)
+ {
+ while ((message = g_queue_pop_head (worker->received_messages_while_frozen)) != NULL)
+ {
+ _g_dbus_worker_emit_message_received (worker, message);
+ g_object_unref (message);
+ }
+ worker->frozen = FALSE;
+ }
+ else
+ {
+ g_assert (g_queue_get_length (worker->received_messages_while_frozen) == 0);
+ }
+ g_mutex_unlock (worker->read_lock);
+ return FALSE;
+}
+
+/* can be called from any thread */
+void
+_g_dbus_worker_unfreeze (GDBusWorker *worker)
+{
+ GSource *idle_source;
+ idle_source = g_idle_source_new ();
+ g_source_set_priority (idle_source, G_PRIORITY_DEFAULT);
+ g_source_set_callback (idle_source,
+ unfreeze_in_idle_cb,
+ _g_dbus_worker_ref (worker),
+ (GDestroyNotify) _g_dbus_worker_unref);
+ g_source_attach (idle_source, shared_thread_data->context);
+ g_source_unref (idle_source);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
static void _g_dbus_worker_do_read_unlocked (GDBusWorker *worker);
/* called in private thread shared by all GDBusConnection instances (without read-lock held) */
@@ -639,8 +710,7 @@ _g_dbus_worker_do_read_cb (GInputStream *input_stream,
}
/* yay, got a message, go deliver it */
- _g_dbus_worker_emit_message_received (worker, message);
- g_object_unref (message);
+ _g_dbus_worker_queue_or_deliver_received_message (worker, message);
/* start reading another message! */
worker->read_buffer_bytes_wanted = 0;
@@ -952,6 +1022,7 @@ _g_dbus_worker_thread_begin_func (gpointer user_data)
GDBusWorker *
_g_dbus_worker_new (GIOStream *stream,
GDBusCapabilityFlags capabilities,
+ gboolean initially_frozen,
GDBusWorkerMessageReceivedCallback message_received_callback,
GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
GDBusWorkerDisconnectedCallback disconnected_callback,
@@ -976,6 +1047,9 @@ _g_dbus_worker_new (GIOStream *stream,
worker->capabilities = capabilities;
worker->cancellable = g_cancellable_new ();
+ worker->frozen = initially_frozen;
+ worker->received_messages_while_frozen = g_queue_new ();
+
worker->write_lock = g_mutex_new ();
worker->write_queue = g_queue_new ();
diff --git a/gio/gdbusprivate.h b/gio/gdbusprivate.h
index 0d9cb61..766bb98 100644
--- a/gio/gdbusprivate.h
+++ b/gio/gdbusprivate.h
@@ -53,6 +53,7 @@ typedef void (*GDBusWorkerDisconnectedCallback) (GDBusWorker *worker,
*/
GDBusWorker *_g_dbus_worker_new (GIOStream *stream,
GDBusCapabilityFlags capabilities,
+ gboolean initially_frozen,
GDBusWorkerMessageReceivedCallback message_received_callback,
GDBusWorkerMessageAboutToBeSentCallback message_about_to_be_sent_callback,
GDBusWorkerDisconnectedCallback disconnected_callback,
@@ -67,6 +68,9 @@ void _g_dbus_worker_send_message (GDBusWorker *worker,
/* can be called from any thread */
void _g_dbus_worker_stop (GDBusWorker *worker);
+/* can be called from any thread */
+void _g_dbus_worker_unfreeze (GDBusWorker *worker);
+
/* ---------------------------------------------------------------------------------------------------- */
void _g_dbus_initialize (void);
diff --git a/gio/gdbusserver.c b/gio/gdbusserver.c
index 85adccc..5195ba0 100644
--- a/gio/gdbusserver.c
+++ b/gio/gdbusserver.c
@@ -367,6 +367,11 @@ g_dbus_server_class_init (GDBusServerClass *klass)
* linkend="g-main-context-push-thread-default">thread-default main
* loop</link> of the thread that @server was constructed in.
*
+ * You are guaranteed that signal handlers for this signal run
+ * before incoming messages on @connection are processed. This means
+ * that it's suitable to call g_dbus_connection_register_object() or
+ * similar from the signal handler.
+ *
* Since: 2.26
*/
_signals[NEW_CONNECTION_SIGNAL] = g_signal_new ("new-connection",
@@ -889,6 +894,7 @@ emit_new_connection_in_idle (gpointer user_data)
_signals[NEW_CONNECTION_SIGNAL],
0,
data->connection);
+ g_dbus_connection_start_message_processing (data->connection);
g_object_unref (data->connection);
return FALSE;
@@ -925,7 +931,9 @@ on_run (GSocketService *service,
goto out;
}
- connection_flags = G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER;
+ connection_flags =
+ G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER |
+ G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING;
if (server->priv->flags & G_DBUS_SERVER_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS)
connection_flags |= G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS;
@@ -944,6 +952,7 @@ on_run (GSocketService *service,
_signals[NEW_CONNECTION_SIGNAL],
0,
connection);
+ g_dbus_connection_start_message_processing (connection);
g_object_unref (connection);
}
else
diff --git a/gio/gio.symbols b/gio/gio.symbols
index ebc3d15..0dd8b79 100644
--- a/gio/gio.symbols
+++ b/gio/gio.symbols
@@ -1529,6 +1529,7 @@ g_dbus_connection_new_for_address
g_dbus_connection_new_for_address_finish
g_dbus_connection_new_for_address_sync
g_dbus_connection_new_sync
+g_dbus_connection_start_message_processing
g_dbus_connection_get_capabilities
g_dbus_connection_get_exit_on_close
g_dbus_connection_get_guid
diff --git a/gio/gioenums.h b/gio/gioenums.h
index 9d1e126..feb5d03 100644
--- a/gio/gioenums.h
+++ b/gio/gioenums.h
@@ -969,6 +969,8 @@ typedef enum
* method.
* @G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION: Pass this flag if connecting to a peer that is a
* message bus. This means that the Hello() method will be invoked as part of the connection setup.
+ * @G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING: If set, processing of D-Bus messages is
+ * delayed until g_dbus_connection_start_message_processing() is called.
*
* Flags used when creating a new #GDBusConnection.
*
@@ -979,7 +981,8 @@ typedef enum {
G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT = (1<<0),
G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_SERVER = (1<<1),
G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_ALLOW_ANONYMOUS = (1<<2),
- G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION = (1<<3)
+ G_DBUS_CONNECTION_FLAGS_MESSAGE_BUS_CONNECTION = (1<<3),
+ G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING = (1<<4)
} GDBusConnectionFlags;
/**
diff --git a/gio/tests/gdbus-peer.c b/gio/tests/gdbus-peer.c
index 7d81372..71bad62 100644
--- a/gio/tests/gdbus-peer.c
+++ b/gio/tests/gdbus-peer.c
@@ -732,6 +732,192 @@ test_peer (void)
/* ---------------------------------------------------------------------------------------------------- */
+typedef struct
+{
+ GDBusServer *server;
+ GMainContext *context;
+ GMainLoop *loop;
+
+ GList *connections;
+} DmpData;
+
+static void
+dmp_data_free (DmpData *data)
+{
+ g_main_loop_unref (data->loop);
+ g_main_context_unref (data->context);
+ g_object_unref (data->server);
+ g_list_foreach (data->connections, (GFunc) g_object_unref, NULL);
+ g_list_free (data->connections);
+ g_free (data);
+}
+
+static void
+dmp_on_method_call (GDBusConnection *connection,
+ const gchar *sender,
+ const gchar *object_path,
+ const gchar *interface_name,
+ const gchar *method_name,
+ GVariant *parameters,
+ GDBusMethodInvocation *invocation,
+ gpointer user_data)
+{
+ //DmpData *data = user_data;
+ gint32 first;
+ gint32 second;
+ g_variant_get (parameters,
+ "(ii)",
+ &first,
+ &second);
+ g_dbus_method_invocation_return_value (invocation,
+ g_variant_new ("(i)", first + second));
+}
+
+static const GDBusInterfaceVTable dmp_interface_vtable =
+{
+ dmp_on_method_call,
+ NULL, /* get_property */
+ NULL /* set_property */
+};
+
+
+/* Runs in thread we created GDBusServer in (since we didn't pass G_DBUS_SERVER_FLAGS_RUN_IN_THREAD) */
+static void
+dmp_on_new_connection (GDBusServer *server,
+ GDBusConnection *connection,
+ gpointer user_data)
+{
+ DmpData *data = user_data;
+ GDBusNodeInfo *node;
+ GError *error;
+
+ /* accept the connection */
+ data->connections = g_list_prepend (data->connections, g_object_ref (connection));
+
+ error = NULL;
+ node = g_dbus_node_info_new_for_xml ("<node>"
+ " <interface name='org.gtk.GDBus.DmpInterface'>"
+ " <method name='AddPair'>"
+ " <arg type='i' name='first' direction='in'/>"
+ " <arg type='i' name='second' direction='in'/>"
+ " <arg type='i' name='sum' direction='out'/>"
+ " </method>"
+ " </interface>"
+ "</node>",
+ &error);
+ g_assert_no_error (error);
+
+ /* sleep 100ms before exporting an object - this is to test that
+ * G_DBUS_CONNECTION_FLAGS_DELAY_MESSAGE_PROCESSING really works
+ * (GDBusServer uses this feature).
+ */
+ usleep (100 * 1000);
+
+ /* export an object */
+ error = NULL;
+ g_dbus_connection_register_object (connection,
+ "/dmp/test",
+ node->interfaces[0],
+ &dmp_interface_vtable,
+ data,
+ NULL,
+ &error);
+ //g_dbus_node_info_unref (node);
+}
+
+static gpointer
+dmp_thread_func (gpointer user_data)
+{
+ DmpData *data = user_data;
+ GError *error;
+ gchar *guid;
+
+ data->context = g_main_context_new ();
+ g_main_context_push_thread_default (data->context);
+
+ error = NULL;
+ guid = g_dbus_generate_guid ();
+ data->server = g_dbus_server_new_sync ("nonce-tcp:",
+ G_DBUS_SERVER_FLAGS_NONE,
+ guid,
+ NULL, /* GDBusAuthObserver */
+ NULL, /* GCancellable */
+ &error);
+ g_assert_no_error (error);
+ g_signal_connect (data->server,
+ "new-connection",
+ G_CALLBACK (dmp_on_new_connection),
+ data);
+
+ g_dbus_server_start (data->server);
+
+ data->loop = g_main_loop_new (data->context, FALSE);
+ g_main_loop_run (data->loop);
+
+ g_main_context_pop_thread_default (data->context);
+
+ g_free (guid);
+ return NULL;
+}
+
+static void
+delayed_message_processing (void)
+{
+ GError *error;
+ DmpData *data;
+ GThread *service_thread;
+ guint n;
+
+ data = g_new0 (DmpData, 1);
+
+ error = NULL;
+ service_thread = g_thread_create (dmp_thread_func,
+ data,
+ TRUE,
+ &error);
+ while (data->server == NULL || !g_dbus_server_is_active (data->server))
+ g_thread_yield ();
+
+ for (n = 0; n < 5; n++)
+ {
+ GDBusConnection *c;
+ GVariant *res;
+ gint32 val;
+
+ error = NULL;
+ c = g_dbus_connection_new_for_address_sync (g_dbus_server_get_client_address (data->server),
+ G_DBUS_CONNECTION_FLAGS_AUTHENTICATION_CLIENT,
+ NULL, /* GDBusAuthObserver */
+ NULL, /* GCancellable */
+ &error);
+ g_assert_no_error (error);
+
+ error = NULL;
+ res = g_dbus_connection_call_sync (c,
+ NULL, /* bus name */
+ "/dmp/test",
+ "org.gtk.GDBus.DmpInterface",
+ "AddPair",
+ g_variant_new ("(ii)", 2, n),
+ G_VARIANT_TYPE ("(i)"),
+ G_DBUS_CALL_FLAGS_NONE,
+ -1, /* timeout_msec */
+ NULL, /* GCancellable */
+ &error);
+ g_assert_no_error (error);
+ g_variant_get (res, "(i)", &val);
+ g_assert_cmpint (val, ==, 2 + n);
+ g_variant_unref (res);
+ g_object_unref (c);
+ }
+
+ g_main_loop_quit (data->loop);
+ g_thread_join (service_thread);
+ dmp_data_free (data);
+}
+
+/* ---------------------------------------------------------------------------------------------------- */
+
int
main (int argc,
char *argv[])
@@ -753,6 +939,7 @@ main (int argc,
loop = g_main_loop_new (NULL, FALSE);
g_test_add_func ("/gdbus/peer-to-peer", test_peer);
+ g_test_add_func ("/gdbus/delayed-message-processing", delayed_message_processing);
ret = g_test_run();
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]