[libsoup/carlosgc/session-queue] Remove SoupMessageQueue




commit 077d518d2c993ea5c605453731399dc1d556a0f3
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Fri Feb 26 13:27:28 2021 +0100

    Remove SoupMessageQueue
    
    We can just use a GQueue instead now that we don't try to support
    multiple threads

 docs/reference/meson.build        |   2 +-
 libsoup/meson.build               |   2 +-
 libsoup/soup-connection.c         |   2 +-
 libsoup/soup-message-io.c         |   5 +-
 libsoup/soup-message-queue-item.c |  73 ++++++++++
 libsoup/soup-message-queue-item.h |  63 +++++++++
 libsoup/soup-message-queue.c      | 275 --------------------------------------
 libsoup/soup-message-queue.h      |  90 -------------
 libsoup/soup-session-private.h    |   4 +
 libsoup/soup-session.c            | 126 ++++++++---------
 10 files changed, 203 insertions(+), 439 deletions(-)
---
diff --git a/docs/reference/meson.build b/docs/reference/meson.build
index de514e29..812042fa 100644
--- a/docs/reference/meson.build
+++ b/docs/reference/meson.build
@@ -8,7 +8,7 @@ ignore_headers = [
   'soup-brotli-decompressor.h',
   'soup-connection.h',
   'soup-connection-auth.h',
-  'soup-message-queue.h',
+  'soup-message-queue-item.h',
   'soup-path-map.h',
   'soup-http-input-stream.h',
   'soup-converter-wrapper.h',
diff --git a/libsoup/meson.build b/libsoup/meson.build
index e7950317..48b8e596 100644
--- a/libsoup/meson.build
+++ b/libsoup/meson.build
@@ -63,7 +63,7 @@ soup_sources = [
   'soup-message-headers.c',
   'soup-message-io.c',
   'soup-message-io-data.c',
-  'soup-message-queue.c',
+  'soup-message-queue-item.c',
   'soup-method.c',
   'soup-misc.c',
   'soup-multipart.c',
diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c
index ecbd8693..62a48211 100644
--- a/libsoup/soup-connection.c
+++ b/libsoup/soup-connection.c
@@ -12,7 +12,7 @@
 #include "soup-connection.h"
 #include "soup.h"
 #include "soup-io-stream.h"
-#include "soup-message-queue.h"
+#include "soup-message-queue-item.h"
 #include "soup-socket-properties.h"
 #include "soup-private-enum-types.h"
 #include <gio/gnetworking.h>
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 6503892c..c4999df0 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -24,7 +24,7 @@
 #include "content-sniffer/soup-content-sniffer-stream.h"
 #include "soup-filter-input-stream.h"
 #include "soup-message-private.h"
-#include "soup-message-queue.h"
+#include "soup-message-queue-item.h"
 #include "soup-misc.h"
 #include "soup-uri-utils-private.h"
 
@@ -971,8 +971,7 @@ soup_message_send_request (SoupMessageQueueItem      *item,
        io->base.completion_cb = completion_cb;
        io->base.completion_data = user_data;
 
-       io->item = item;
-       soup_message_queue_item_ref (item);
+       io->item = soup_message_queue_item_ref (item);
        io->base.iostream = g_object_ref (soup_connection_get_iostream (io->item->conn));
        io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream));
        io->base.ostream = g_io_stream_get_output_stream (io->base.iostream);
diff --git a/libsoup/soup-message-queue-item.c b/libsoup/soup-message-queue-item.c
new file mode 100644
index 00000000..4097dd2b
--- /dev/null
+++ b/libsoup/soup-message-queue-item.c
@@ -0,0 +1,73 @@
+/*
+ * soup-message-queue-item.c: Message queue item
+ *
+ * Copyright (C) 2003 Novell, Inc.
+ * Copyright (C) 2008 Red Hat, Inc.
+ * Copyright (C) 2021 Igalia S.L.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "soup-message-queue-item.h"
+#include "soup.h"
+
+SoupMessageQueueItem *
+soup_message_queue_item_new (SoupSession        *session,
+                             SoupMessage        *msg,
+                             gboolean            async,
+                             GCancellable       *cancellable,
+                             SoupSessionCallback callback,
+                             gpointer            user_data)
+{
+        SoupMessageQueueItem *item;
+
+        item = g_atomic_rc_box_new0 (SoupMessageQueueItem);
+        item->session = g_object_ref (session);
+        item->msg = g_object_ref (msg);
+        item->async = async;
+        item->callback = callback;
+        item->callback_data = user_data;
+        item->cancellable = cancellable ? g_object_ref (cancellable) : g_cancellable_new ();
+        item->priority = soup_message_get_priority (msg);
+
+        g_signal_connect_swapped (msg, "restarted",
+                                  G_CALLBACK (g_cancellable_reset),
+                                  item->cancellable);
+        return item;
+}
+
+SoupMessageQueueItem *
+soup_message_queue_item_ref (SoupMessageQueueItem *item)
+{
+        g_atomic_rc_box_acquire (item);
+
+        return item;
+}
+
+static void
+soup_message_queue_item_destroy (SoupMessageQueueItem *item)
+{
+        g_warn_if_fail (item->conn == NULL);
+
+        g_signal_handlers_disconnect_by_data (item->msg, item->cancellable);
+
+        g_object_unref (item->session);
+        g_object_unref (item->msg);
+        g_object_unref (item->cancellable);
+        g_clear_error (&item->error);
+        g_clear_object (&item->task);
+}
+
+void
+soup_message_queue_item_unref (SoupMessageQueueItem *item)
+{
+        g_atomic_rc_box_release_full (item, (GDestroyNotify)soup_message_queue_item_destroy);
+}
+
+void
+soup_message_queue_item_cancel (SoupMessageQueueItem *item)
+{
+        g_cancellable_cancel (item->cancellable);
+}
diff --git a/libsoup/soup-message-queue-item.h b/libsoup/soup-message-queue-item.h
new file mode 100644
index 00000000..4f7bf055
--- /dev/null
+++ b/libsoup/soup-message-queue-item.h
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2003 Novell, Inc.
+ * Copyright (C) 2008 Red Hat, Inc.
+ * Copyright (C) 2021 Igalia S.L.
+ */
+
+#pragma once
+
+#include "soup-connection.h"
+#include "soup-message.h"
+#include "soup-session-private.h"
+
+G_BEGIN_DECLS
+
+typedef enum {
+        SOUP_MESSAGE_STARTING,
+        SOUP_MESSAGE_CONNECTING,
+        SOUP_MESSAGE_CONNECTED,
+        SOUP_MESSAGE_TUNNELING,
+        SOUP_MESSAGE_READY,
+        SOUP_MESSAGE_RUNNING,
+        SOUP_MESSAGE_CACHED,
+        SOUP_MESSAGE_RESTARTING,
+        SOUP_MESSAGE_FINISHING,
+        SOUP_MESSAGE_FINISHED
+} SoupMessageQueueItemState;
+
+struct _SoupMessageQueueItem {
+        SoupSession *session;
+        SoupMessage *msg;
+        SoupSessionCallback callback;
+        gpointer callback_data;
+
+        GCancellable *cancellable;
+        GError *error;
+
+        SoupConnection *conn;
+        GTask *task;
+
+        guint paused       : 1;
+        guint io_started   : 1;
+        guint async        : 1;
+        guint connect_only : 1;
+        guint priority     : 3;
+        guint resend_count : 5;
+        int io_priority;
+
+        SoupMessageQueueItemState state;
+        SoupMessageQueueItem *related;
+};
+
+SoupMessageQueueItem *soup_message_queue_item_new    (SoupSession          *session,
+                                                      SoupMessage          *msg,
+                                                      gboolean              async,
+                                                      GCancellable         *cancellable,
+                                                      SoupSessionCallback   callback,
+                                                      gpointer              user_data);
+
+SoupMessageQueueItem *soup_message_queue_item_ref    (SoupMessageQueueItem *item);
+void                  soup_message_queue_item_unref  (SoupMessageQueueItem *item);
+void                  soup_message_queue_item_cancel (SoupMessageQueueItem *item);
+
+G_END_DECLS
diff --git a/libsoup/soup-session-private.h b/libsoup/soup-session-private.h
index 69ec8d07..b7f15b9c 100644
--- a/libsoup/soup-session-private.h
+++ b/libsoup/soup-session-private.h
@@ -10,6 +10,10 @@
 
 G_BEGIN_DECLS
 
+typedef void (*SoupSessionCallback) (SoupSession *session,
+                                     SoupMessage *msg,
+                                     gpointer     user_data);
+
 void     soup_session_requeue_message       (SoupSession *session,
                                             SoupMessage *msg);
 void     soup_session_pause_message         (SoupSession *session,
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 77c1f81c..a2605838 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -19,7 +19,7 @@
 #include "soup-connection.h"
 #include "soup-message-private.h"
 #include "soup-misc.h"
-#include "soup-message-queue.h"
+#include "soup-message-queue-item.h"
 #include "soup-session-private.h"
 #include "soup-session-feature-private.h"
 #include "soup-socket-properties.h"
@@ -105,7 +105,7 @@ typedef struct {
 
        SoupSocketProperties *socket_props;
 
-       SoupMessageQueue *queue;
+       GQueue *queue;
        GSource *queue_source;
 
        char *user_agent;
@@ -234,7 +234,7 @@ soup_session_init (SoupSession *session)
        SoupAuthManager *auth_manager;
        SoupMessageQueueSource *source;
 
-       priv->queue = soup_message_queue_new (session);
+       priv->queue = g_queue_new ();
        priv->queue_source = g_source_new (&queue_source_funcs, sizeof (SoupMessageQueueSource));
        source = (SoupMessageQueueSource *)priv->queue_source;
        source->session = session;
@@ -298,7 +298,8 @@ soup_session_finalize (GObject *object)
        SoupSession *session = SOUP_SESSION (object);
        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
 
-       soup_message_queue_destroy (priv->queue);
+       g_warn_if_fail (g_queue_is_empty (priv->queue));
+       g_queue_free (priv->queue);
        g_source_unref (priv->queue_source);
 
        g_hash_table_destroy (priv->http_hosts);
@@ -1063,6 +1064,24 @@ free_host (SoupSessionHost *host)
        g_slice_free (SoupSessionHost, host);
 }
 
+static int
+lookup_message (SoupMessageQueueItem *item,
+               SoupMessage          *msg)
+{
+       return item->msg == msg ? 0 : 1;
+}
+
+static SoupMessageQueueItem *
+soup_session_lookup_queue_item (SoupSession *session,
+                               SoupMessage *msg)
+{
+       SoupSessionPrivate *priv = soup_session_get_instance_private (session);
+       GList *link;
+
+       link = g_queue_find_custom (priv->queue, msg, (GCompareFunc)lookup_message);
+       return link ? (SoupMessageQueueItem *)link->data : NULL;
+}
+
 #define SOUP_SESSION_WOULD_REDIRECT_AS_GET(session, msg) \
        (soup_message_get_status (msg) == SOUP_STATUS_SEE_OTHER || \
         (soup_message_get_status (msg) == SOUP_STATUS_FOUND && \
@@ -1171,11 +1190,8 @@ soup_session_redirect_message (SoupSession *session,
                               SoupMessage *msg,
                               GError     **error)
 {
-       SoupSessionPrivate *priv;
        GUri *new_uri;
        char *host;
-       SoupMessageQueueItem *item;
-       gboolean retval;
 
        g_return_val_if_fail (SOUP_IS_SESSION (session), FALSE);
        g_return_val_if_fail (SOUP_IS_MESSAGE (msg), FALSE);
@@ -1211,12 +1227,9 @@ soup_session_redirect_message (SoupSession *session,
        soup_message_set_uri (msg, new_uri);
        g_uri_unref (new_uri);
 
-       priv = soup_session_get_instance_private (session);
-       item = soup_message_queue_lookup (priv->queue, msg);
-       retval = soup_session_requeue_item (session, item, error);
-       soup_message_queue_item_unref (item);
-
-       return retval;
+       return soup_session_requeue_item (session,
+                                         soup_session_lookup_queue_item (session, msg),
+                                         error);
 }
 
 static void
@@ -1259,6 +1272,13 @@ message_restarted (SoupMessage *msg, gpointer user_data)
        soup_message_cleanup_response (msg);
 }
 
+static int
+compare_queue_item (SoupMessageQueueItem *a,
+                   SoupMessageQueueItem *b)
+{
+       return b->priority - a->priority;
+}
+
 static SoupMessageQueueItem *
 soup_session_append_queue_item (SoupSession        *session,
                                SoupMessage        *msg,
@@ -1274,8 +1294,10 @@ soup_session_append_queue_item (SoupSession        *session,
 
        soup_message_cleanup_response (msg);
 
-       item = soup_message_queue_append (priv->queue, msg, cancellable, callback, user_data);
-       item->async = async;
+       item = soup_message_queue_item_new (session, msg, async, cancellable, callback, user_data);
+       g_queue_insert_sorted (priv->queue,
+                              soup_message_queue_item_ref (item),
+                              (GCompareDataFunc)compare_queue_item, NULL);
 
        host = get_host_for_message (session, item->msg);
        host->num_messages++;
@@ -1296,7 +1318,6 @@ soup_session_append_queue_item (SoupSession        *session,
        }
        g_signal_emit (session, signals[REQUEST_QUEUED], 0, msg);
 
-       soup_message_queue_item_ref (item);
        return item;
 }
 
@@ -1490,7 +1511,7 @@ soup_session_unqueue_item (SoupSession          *session,
                return;
        }
 
-       soup_message_queue_remove (priv->queue, item);
+       g_queue_remove (priv->queue, item);
 
        host = get_host_for_message (session, item->msg);
        host->num_messages--;
@@ -1637,8 +1658,7 @@ tunnel_connect (SoupMessageQueueItem *item)
                                                      item->cancellable,
                                                      NULL, NULL);
        tunnel_item->io_priority = item->io_priority;
-       tunnel_item->related = item;
-       soup_message_queue_item_ref (item);
+       tunnel_item->related = soup_message_queue_item_ref (item);
        soup_session_set_item_connection (session, tunnel_item, item->conn);
        tunnel_item->state = SOUP_MESSAGE_RUNNING;
 
@@ -1810,11 +1830,11 @@ get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup)
        item->state = SOUP_MESSAGE_CONNECTING;
 
        if (item->async) {
-               soup_message_queue_item_ref (item);
                soup_connection_connect_async (item->conn,
                                               item->io_priority,
                                               item->cancellable,
-                                              connect_async_complete, item);
+                                              connect_async_complete,
+                                              soup_message_queue_item_ref (item));
                return FALSE;
        } else {
                GError *error = NULL;
@@ -1915,14 +1935,14 @@ async_run_queue (SoupSession *session)
        SoupMessageQueueItem *item;
        SoupMessage *msg;
        gboolean try_cleanup = TRUE, should_cleanup = FALSE;
+       GList *l;
 
        g_object_ref (session);
        soup_session_cleanup_connections (session, FALSE);
 
  try_again:
-       for (item = soup_message_queue_first (priv->queue);
-            item;
-            item = soup_message_queue_next (priv->queue, item)) {
+       for (l = priv->queue->head; l && l->data; l = g_list_next (l)) {
+               item = (SoupMessageQueueItem *)l->data;
                msg = item->msg;
 
                /* CONNECT messages are handled specially */
@@ -1961,12 +1981,9 @@ void
 soup_session_requeue_message (SoupSession *session,
                              SoupMessage *msg)
 {
-       SoupSessionPrivate *priv = soup_session_get_instance_private (session);
-       SoupMessageQueueItem *item;
+       SoupMessageQueueItem *item = soup_session_lookup_queue_item (session, msg);
 
-       item = soup_message_queue_lookup (priv->queue, msg);
        soup_session_requeue_item (session, item, &item->error);
-       soup_message_queue_item_unref (item);
 }
 
 /**
@@ -1981,21 +1998,18 @@ void
 soup_session_pause_message (SoupSession *session,
                            SoupMessage *msg)
 {
-       SoupSessionPrivate *priv;
        SoupMessageQueueItem *item;
 
        g_return_if_fail (SOUP_IS_SESSION (session));
        g_return_if_fail (SOUP_IS_MESSAGE (msg));
 
-       priv = soup_session_get_instance_private (session);
-       item = soup_message_queue_lookup (priv->queue, msg);
+       item = soup_session_lookup_queue_item (session, msg);
        g_return_if_fail (item != NULL);
        g_return_if_fail (item->async);
 
        item->paused = TRUE;
        if (item->state == SOUP_MESSAGE_RUNNING)
                soup_message_io_pause (msg);
-       soup_message_queue_item_unref (item);
 }
 
 static void
@@ -2022,21 +2036,18 @@ void
 soup_session_unpause_message (SoupSession *session,
                              SoupMessage *msg)
 {
-       SoupSessionPrivate *priv;
        SoupMessageQueueItem *item;
 
        g_return_if_fail (SOUP_IS_SESSION (session));
        g_return_if_fail (SOUP_IS_MESSAGE (msg));
 
-       priv = soup_session_get_instance_private (session);
-       item = soup_message_queue_lookup (priv->queue, msg);
+       item = soup_session_lookup_queue_item (session, msg);
        g_return_if_fail (item != NULL);
        g_return_if_fail (item->async);
 
        item->paused = FALSE;
        if (item->state == SOUP_MESSAGE_RUNNING)
                soup_message_io_unpause (msg);
-       soup_message_queue_item_unref (item);
 
        soup_session_kick_queue (session);
 }
@@ -2045,16 +2056,11 @@ void
 soup_session_cancel_message (SoupSession *session,
                             SoupMessage *msg)
 {
-       SoupSessionPrivate *priv = soup_session_get_instance_private (session);
-       SoupMessageQueueItem *item;
+       SoupMessageQueueItem *item = soup_session_lookup_queue_item (session, msg);
 
-       item = soup_message_queue_lookup (priv->queue, msg);
         /* If the message is already ending, don't do anything */
-       if (!item)
-                return;
-
-       g_cancellable_cancel (item->cancellable);
-       soup_message_queue_item_unref (item);
+       if (item)
+               soup_message_queue_item_cancel (item);
 }
 
 /**
@@ -2072,17 +2078,12 @@ soup_session_abort (SoupSession *session)
        GSList *conns, *c;
        GHashTableIter iter;
        gpointer conn, host;
-        SoupMessageQueueItem *item;
 
        g_return_if_fail (SOUP_IS_SESSION (session));
        priv = soup_session_get_instance_private (session);
 
        /* Cancel everything */
-       for (item = soup_message_queue_first (priv->queue);
-            item;
-            item = soup_message_queue_next (priv->queue, item)) {
-               g_cancellable_cancel (item->cancellable);
-       }
+       g_queue_foreach (priv->queue, (GFunc)soup_message_queue_item_cancel, NULL);
 
        /* Close all idle connections */
        conns = NULL;
@@ -2831,8 +2832,6 @@ send_async_maybe_complete (SoupMessageQueueItem *item,
 
                g_object_set_data (G_OBJECT (ostream), "istream", stream);
 
-               /* Give the splice op its own ref on item */
-               soup_message_queue_item_ref (item);
                /* We don't use CLOSE_SOURCE because we need to control when the
                 * side effects of closing the SoupClientInputStream happen.
                 */
@@ -2840,7 +2839,8 @@ send_async_maybe_complete (SoupMessageQueueItem *item,
                                              G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
                                              item->io_priority,
                                              item->cancellable,
-                                             send_async_spliced, item);
+                                             send_async_spliced,
+                                             soup_message_queue_item_ref (item));
                return;
        }
 
@@ -3055,8 +3055,7 @@ async_respond_from_cache (SoupSession          *session,
                data = g_slice_new0 (AsyncCacheConditionalData);
                data->cache = g_object_ref (cache);
                data->conditional_msg = conditional_msg;
-               data->item = item;
-               soup_message_queue_item_ref (item);
+               data->item = soup_message_queue_item_ref (item);
                soup_message_disable_feature (conditional_msg, SOUP_TYPE_CACHE);
                soup_session_send_async (session, conditional_msg,
                                         item->io_priority,
@@ -3816,19 +3815,16 @@ static GIOStream *
 soup_session_steal_connection (SoupSession *session,
                               SoupMessage *msg)
 {
-       SoupSessionPrivate *priv = soup_session_get_instance_private (session);
        SoupMessageQueueItem *item;
        GIOStream *stream = NULL;
 
-       item = soup_message_queue_lookup (priv->queue, msg);
+       item = soup_session_lookup_queue_item (session, msg);
        if (!item)
                return NULL;
 
        if (item->conn && soup_connection_get_state (item->conn) == SOUP_CONNECTION_IN_USE)
                stream = steal_connection (session, item);
 
-       soup_message_queue_item_unref (item);
-
        return stream;
 }
 
@@ -4009,20 +4005,14 @@ SoupMessage *
 soup_session_get_original_message_for_authentication (SoupSession *session,
                                                      SoupMessage *msg)
 {
-       SoupSessionPrivate *priv = soup_session_get_instance_private (session);
        SoupMessageQueueItem *item;
-       SoupMessage *original_msg;
 
-       item = soup_message_queue_lookup (priv->queue, msg);
+       item = soup_session_lookup_queue_item (session, msg);
        if (!item)
                 return msg;
 
-       if (soup_message_get_method (msg) != SOUP_METHOD_CONNECT) {
-               soup_message_queue_item_unref (item);
+       if (soup_message_get_method (msg) != SOUP_METHOD_CONNECT)
                return msg;
-       }
 
-       original_msg = item->related ? item->related->msg : msg;
-       soup_message_queue_item_unref (item);
-       return original_msg;
+       return item->related ? item->related->msg : msg;
 }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]