[libsoup/carlosgc/thread-safe: 4/19] session: make message queue handling thread safe




commit 07430d6b3866391c06194a37ac6a8bef37de16fe
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Fri Apr 1 13:47:39 2022 +0200

    session: make message queue handling thread safe
    
    Ensure that queue items are only processed in the same thread they were
    created.

 libsoup/soup-message-queue-item.c |   2 +
 libsoup/soup-message-queue-item.h |   1 +
 libsoup/soup-misc.c               |  12 +++
 libsoup/soup-misc.h               |   1 +
 libsoup/soup-session.c            | 178 +++++++++++++++++++++++++++++++++-----
 5 files changed, 172 insertions(+), 22 deletions(-)
---
diff --git a/libsoup/soup-message-queue-item.c b/libsoup/soup-message-queue-item.c
index d674799a..825fc022 100644
--- a/libsoup/soup-message-queue-item.c
+++ b/libsoup/soup-message-queue-item.c
@@ -24,6 +24,7 @@ soup_message_queue_item_new (SoupSession  *session,
         item = g_atomic_rc_box_new0 (SoupMessageQueueItem);
         item->session = g_object_ref (session);
         item->msg = g_object_ref (msg);
+        item->context = g_main_context_ref_thread_default ();
         item->async = async;
         item->cancellable = cancellable ? g_object_ref (cancellable) : g_cancellable_new ();
 
@@ -46,6 +47,7 @@ soup_message_queue_item_destroy (SoupMessageQueueItem *item)
 
         g_object_unref (item->session);
         g_object_unref (item->msg);
+        g_main_context_unref (item->context);
         g_object_unref (item->cancellable);
         g_clear_error (&item->error);
         g_clear_object (&item->task);
diff --git a/libsoup/soup-message-queue-item.h b/libsoup/soup-message-queue-item.h
index 9a958342..49974928 100644
--- a/libsoup/soup-message-queue-item.h
+++ b/libsoup/soup-message-queue-item.h
@@ -29,6 +29,7 @@ typedef enum {
 struct _SoupMessageQueueItem {
         SoupSession *session;
         SoupMessage *msg;
+        GMainContext *context;
 
         GCancellable *cancellable;
         GError *error;
diff --git a/libsoup/soup-misc.c b/libsoup/soup-misc.c
index 7f7fa290..21018075 100644
--- a/libsoup/soup-misc.c
+++ b/libsoup/soup-misc.c
@@ -114,6 +114,18 @@ soup_add_timeout (GMainContext *async_context,
        return source;
 }
 
+GMainContext *
+soup_thread_default_context (void)
+{
+        GMainContext *context;
+
+        context = g_main_context_get_thread_default ();
+        if (!context)
+                context = g_main_context_default ();
+
+        return context;
+}
+
 /* 00 URI_UNRESERVED
  * 01 URI_PCT_ENCODED
  * 02 URI_GEN_DELIMS
diff --git a/libsoup/soup-misc.h b/libsoup/soup-misc.h
index 656f05b5..cb2986b3 100644
--- a/libsoup/soup-misc.h
+++ b/libsoup/soup-misc.h
@@ -24,6 +24,7 @@ GSource           *soup_add_timeout          (GMainContext *async_context,
                                              guint         interval,
                                              GSourceFunc   function,
                                              gpointer      data);
+GMainContext      *soup_thread_default_context (void);
 
 /* Misc utils */
 
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 9f40f8f8..349e563b 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -83,9 +83,12 @@ typedef struct {
 
        SoupSocketProperties *socket_props;
 
+        GMainContext *context;
+        GMutex queue_mutex;
        GQueue *queue;
-       GSource *queue_source;
-        guint16 in_async_run_queue;
+        GMutex queue_sources_mutex;
+       GHashTable *queue_sources;
+        guint in_async_run_queue;
         gboolean needs_queue_sort;
 
        char *user_agent;
@@ -168,6 +171,7 @@ G_DEFINE_QUARK (soup-session-error-quark, soup_session_error)
 typedef struct {
        GSource source;
        SoupSession* session;
+        guint num_items;
 } SoupMessageQueueSource;
 
 static gboolean
@@ -189,20 +193,90 @@ static GSourceFuncs queue_source_funcs = {
        NULL, NULL, NULL
 };
 
+static void
+soup_session_add_queue_source (SoupSession  *session,
+                               GMainContext *context)
+{
+        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+        SoupMessageQueueSource *queue_source;
+
+        queue_source = g_hash_table_lookup (priv->queue_sources, context);
+        if (!queue_source) {
+                GSource *source;
+
+                source = g_source_new (&queue_source_funcs, sizeof (SoupMessageQueueSource));
+                queue_source = (SoupMessageQueueSource *)source;
+                queue_source->session = session;
+                queue_source->num_items = 0;
+                g_source_set_name (source, "SoupMessageQueue");
+                g_source_set_can_recurse (source, TRUE);
+                g_source_attach (source, context);
+                g_hash_table_insert (priv->queue_sources, context, source);
+        }
+
+        queue_source->num_items++;
+
+}
+
+static void
+soup_session_add_queue_source_for_item (SoupSession          *session,
+                                        SoupMessageQueueItem *item)
+{
+        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+
+        if (item->context == priv->context)
+                return;
+
+        g_mutex_lock (&priv->queue_sources_mutex);
+        soup_session_add_queue_source (session, item->context);
+        g_mutex_unlock (&priv->queue_sources_mutex);
+}
+
+static void
+soup_session_remove_queue_source (SoupSession  *session,
+                                  GMainContext *context)
+{
+        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+        SoupMessageQueueSource *queue_source;
+
+        queue_source = g_hash_table_lookup (priv->queue_sources, context);
+        if (!queue_source)
+                return;
+
+        if (--queue_source->num_items > 0)
+                return;
+
+        g_source_destroy ((GSource *)queue_source);
+        g_hash_table_remove (priv->queue_sources, context);
+}
+
+static void
+soup_session_remove_queue_source_for_item (SoupSession          *session,
+                                           SoupMessageQueueItem *item)
+{
+        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+
+        if (item->context == priv->context)
+                return;
+
+        g_mutex_lock (&priv->queue_sources_mutex);
+        soup_session_remove_queue_source (session, item->context);
+        g_mutex_unlock (&priv->queue_sources_mutex);
+}
+
 static void
 soup_session_init (SoupSession *session)
 {
        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
        SoupAuthManager *auth_manager;
-       SoupMessageQueueSource *source;
 
+        priv->context = g_main_context_ref_thread_default ();
+        g_mutex_init (&priv->queue_mutex);
        priv->queue = g_queue_new ();
-       priv->queue_source = g_source_new (&queue_source_funcs, sizeof (SoupMessageQueueSource));
-       source = (SoupMessageQueueSource *)priv->queue_source;
-       source->session = session;
-       g_source_set_name (priv->queue_source, "SoupMessageQueue");
-       g_source_set_can_recurse (priv->queue_source, TRUE);
-       g_source_attach (priv->queue_source, g_main_context_get_thread_default ());
+        g_mutex_init (&priv->queue_sources_mutex);
+        priv->queue_sources = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)g_source_unref);
+        soup_session_add_queue_source (session, priv->context);
+
         priv->io_timeout = priv->idle_timeout = 60;
 
         priv->conn_manager = soup_connection_manager_new (session,
@@ -229,12 +303,21 @@ soup_session_init (SoupSession *session)
         priv->tlsdb_use_default = TRUE;
 }
 
+static void
+destroy_queue_source (gpointer key,
+                      GSource *source)
+{
+        g_source_destroy (source);
+}
+
 static void
 soup_session_dispose (GObject *object)
 {
        SoupSession *session = SOUP_SESSION (object);
        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
 
+        g_assert (priv->context == soup_thread_default_context ());
+
        priv->disposed = TRUE;
        soup_session_abort (session);
        g_warn_if_fail (soup_connection_manager_get_num_conns (priv->conn_manager) == 0);
@@ -242,7 +325,7 @@ soup_session_dispose (GObject *object)
        while (priv->features)
                soup_session_remove_feature (session, priv->features->data);
 
-       g_source_destroy (priv->queue_source);
+        g_hash_table_foreach (priv->queue_sources, (GHFunc)destroy_queue_source, NULL);
 
        G_OBJECT_CLASS (soup_session_parent_class)->dispose (object);
 }
@@ -253,9 +336,14 @@ soup_session_finalize (GObject *object)
        SoupSession *session = SOUP_SESSION (object);
        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
 
+        g_assert (priv->context == soup_thread_default_context ());
+
        g_warn_if_fail (g_queue_is_empty (priv->queue));
        g_queue_free (priv->queue);
-       g_source_unref (priv->queue_source);
+        g_mutex_clear (&priv->queue_mutex);
+        g_hash_table_destroy (priv->queue_sources);
+        g_mutex_clear (&priv->queue_sources_mutex);
+        g_main_context_unref (priv->context);
 
         g_clear_pointer (&priv->conn_manager, soup_connection_manager_free);
 
@@ -958,7 +1046,9 @@ soup_session_lookup_queue (SoupSession *session,
        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
        GList *link;
 
+        g_mutex_lock (&priv->queue_mutex);
        link = g_queue_find_custom (priv->queue, data, compare_func);
+        g_mutex_unlock (&priv->queue_mutex);
        return link ? (SoupMessageQueueItem *)link->data : NULL;
 }
 
@@ -1192,13 +1282,15 @@ message_priority_changed (SoupMessage          *msg,
 {
         SoupSessionPrivate *priv = soup_session_get_instance_private (item->session);
 
-        if (priv->in_async_run_queue) {
-                priv->needs_queue_sort = TRUE;
+        if (g_atomic_int_get (&priv->in_async_run_queue)) {
+                g_atomic_int_set (&priv->needs_queue_sort, TRUE);
                 return;
         }
 
+        g_mutex_lock (&priv->queue_mutex);
         g_queue_sort (priv->queue, (GCompareDataFunc)compare_queue_item, NULL);
-        priv->needs_queue_sort = FALSE;
+        g_mutex_unlock (&priv->queue_mutex);
+        g_atomic_int_set (&priv->needs_queue_sort, FALSE);
 }
 
 static SoupMessageQueueItem *
@@ -1216,9 +1308,13 @@ soup_session_append_queue_item (SoupSession        *session,
         soup_message_set_is_preconnect (msg, FALSE);
 
        item = soup_message_queue_item_new (session, msg, async, cancellable);
+        g_mutex_lock (&priv->queue_mutex);
        g_queue_insert_sorted (priv->queue,
                               soup_message_queue_item_ref (item),
                               (GCompareDataFunc)compare_queue_item, NULL);
+        g_mutex_unlock (&priv->queue_mutex);
+
+        soup_session_add_queue_source_for_item (session, item);
 
        if (!soup_message_query_flags (msg, SOUP_MESSAGE_NO_REDIRECT)) {
                soup_message_add_header_handler (
@@ -1253,6 +1349,8 @@ soup_session_send_queue_item (SoupSession *session,
        SoupMessageHeaders *request_headers;
        const char *method;
 
+        g_assert (item->context == soup_thread_default_context ());
+
        request_headers = soup_message_get_request_headers (item->msg);
        if (priv->user_agent)
                soup_message_headers_replace_common (request_headers, SOUP_HEADER_USER_AGENT, 
priv->user_agent);
@@ -1297,7 +1395,11 @@ soup_session_unqueue_item (SoupSession          *session,
                return;
        }
 
+        g_mutex_lock (&priv->queue_mutex);
        g_queue_remove (priv->queue, item);
+        g_mutex_unlock (&priv->queue_mutex);
+
+        soup_session_remove_queue_source_for_item (session, item);
 
        /* g_signal_handlers_disconnect_by_func doesn't work if you
         * have a metamarshal, meaning it doesn't work with
@@ -1321,6 +1423,8 @@ message_completed (SoupMessage *msg, SoupMessageIOCompletion completion, gpointe
 {
        SoupMessageQueueItem *item = user_data;
 
+        g_assert (item->context == soup_thread_default_context ());
+
        if (item->async)
                soup_session_kick_queue (item->session);
 
@@ -1389,6 +1493,8 @@ tunnel_message_completed (SoupMessage *msg, SoupMessageIOCompletion completion,
        SoupSession *session = tunnel_item->session;
        guint status;
 
+        g_assert (tunnel_item->context == soup_thread_default_context ());
+
         if (tunnel_item->state == SOUP_MESSAGE_REQUEUED)
                 tunnel_item->state = SOUP_MESSAGE_RESTARTING;
 
@@ -1583,6 +1689,7 @@ soup_session_process_queue_item (SoupSession          *session,
                                 gboolean              loop)
 {
        g_assert (item->session == session);
+        g_assert (item->context == soup_thread_default_context ());
 
        do {
                if (item->paused)
@@ -1660,33 +1767,49 @@ soup_session_process_queue_item (SoupSession          *session,
 }
 
 static void
-process_queue_item (SoupMessageQueueItem *item)
+collect_queue_item (SoupMessageQueueItem *item,
+                    GList               **items)
 {
         if (!item->async)
                 return;
 
+        if (item->context != soup_thread_default_context ())
+                return;
+
         /* CONNECT messages are handled specially */
         if (soup_message_get_method (item->msg) == SOUP_METHOD_CONNECT)
                 return;
 
-        soup_session_process_queue_item (item->session, item, TRUE);
+        *items = g_list_prepend (*items, item);
 }
 
 static void
 async_run_queue (SoupSession *session)
 {
        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+        GList *items = NULL;
+        GList *i;
 
        g_object_ref (session);
-        priv->in_async_run_queue++;
+        g_atomic_int_inc (&priv->in_async_run_queue);
        soup_connection_manager_cleanup (priv->conn_manager, FALSE);
 
-        g_queue_foreach (priv->queue, (GFunc)process_queue_item, NULL);
+        g_mutex_lock (&priv->queue_mutex);
+        g_queue_foreach (priv->queue, (GFunc)collect_queue_item, &items);
+        g_mutex_unlock (&priv->queue_mutex);
 
-        priv->in_async_run_queue--;
-        if (!priv->in_async_run_queue && priv->needs_queue_sort) {
+        for (i = g_list_reverse (items); i != NULL; i = g_list_next (i)) {
+                SoupMessageQueueItem *item = (SoupMessageQueueItem *)i->data;
+                soup_session_process_queue_item (item->session, item, TRUE);
+        }
+
+        g_list_free (items);
+
+        if (g_atomic_int_dec_and_test (&priv->in_async_run_queue) && g_atomic_int_get 
(&priv->needs_queue_sort)) {
+                g_mutex_lock (&priv->queue_mutex);
                 g_queue_sort (priv->queue, (GCompareDataFunc)compare_queue_item, NULL);
-                priv->needs_queue_sort = FALSE;
+                g_mutex_unlock (&priv->queue_mutex);
+                g_atomic_int_set (&priv->needs_queue_sort, FALSE);
         }
 
        g_object_unref (session);
@@ -1734,12 +1857,21 @@ soup_session_pause_message (SoupSession *session,
                soup_message_io_pause (msg);
 }
 
+static void
+kick_queue_source (gpointer key,
+                   GSource *source)
+{
+        g_source_set_ready_time (source, 0);
+}
+
 void
 soup_session_kick_queue (SoupSession *session)
 {
        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
 
-       g_source_set_ready_time (priv->queue_source, 0);
+        g_mutex_lock (&priv->queue_sources_mutex);
+        g_hash_table_foreach (priv->queue_sources, (GHFunc)kick_queue_source, NULL);
+        g_mutex_unlock (&priv->queue_sources_mutex);
 }
 
 /**
@@ -1804,7 +1936,9 @@ soup_session_abort (SoupSession *session)
        priv = soup_session_get_instance_private (session);
 
        /* Cancel everything */
+        g_mutex_lock (&priv->queue_mutex);
        g_queue_foreach (priv->queue, (GFunc)soup_message_queue_item_cancel, NULL);
+        g_mutex_unlock (&priv->queue_mutex);
 
        /* Close all idle connections */
         soup_connection_manager_cleanup (priv->conn_manager, TRUE);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]