[libsoup/carlosgc/message-requeue] Do not allow to queue the same message more twice




commit d9e616819826d61fc6a65f043628020a50b2b6f6
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Fri Jun 11 15:13:00 2021 +0200

    Do not allow to queue the same message more twice
    
    Add a new error to indicate the message is already in the session queue.

 libsoup/soup-message-queue-item.c |  3 +-
 libsoup/soup-session.c            | 62 +++++++++++++++++++++++++++++++++++----
 libsoup/soup-session.h            |  3 +-
 tests/misc-test.c                 | 53 +++++++++++++++++++++++++++++++++
 tests/test-utils.c                | 16 +++++-----
 5 files changed, 122 insertions(+), 15 deletions(-)
---
diff --git a/libsoup/soup-message-queue-item.c b/libsoup/soup-message-queue-item.c
index c6e28eff..d674799a 100644
--- a/libsoup/soup-message-queue-item.c
+++ b/libsoup/soup-message-queue-item.c
@@ -41,7 +41,8 @@ soup_message_queue_item_ref (SoupMessageQueueItem *item)
 static void
 soup_message_queue_item_destroy (SoupMessageQueueItem *item)
 {
-        g_warn_if_fail (soup_message_get_connection (item->msg) == NULL);
+        if (!g_error_matches (item->error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE))
+                g_warn_if_fail (soup_message_get_connection (item->msg) == NULL);
 
         g_object_unref (item->session);
         g_object_unref (item->msg);
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 7380ff0a..4789fc63 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -200,6 +200,8 @@ static GParamSpec *properties[LAST_PROPERTY] = { NULL, };
  *   Location header was missing or empty in response
  * @SOUP_SESSION_ERROR_REDIRECT_BAD_URI: failed to redirect message because
  *   Location header contains an invalid URI
+ * @SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE: the message is already in the
+ *   session queue. Messages can only be reused after unqueued.
  *
  * A #SoupSession error.
  */
@@ -3290,6 +3292,35 @@ async_respond_from_cache (SoupSession          *session,
                return FALSE;
 }
 
+static gboolean
+soup_session_return_error_if_message_already_in_queue (SoupSession         *session,
+                                                       SoupMessage         *msg,
+                                                       GCancellable        *cancellable,
+                                                       GAsyncReadyCallback  callback,
+                                                       gpointer             user_data)
+{
+        SoupMessageQueueItem *item;
+        GTask *task;
+
+        if (!soup_session_lookup_queue_item (session, msg))
+                return FALSE;
+
+        /* Set a new SoupMessageQueueItem in finished state as task data for
+         * soup_session_get_async_result_message() and soup_session_send_finish().
+         */
+        item = soup_message_queue_item_new (session, msg, TRUE, cancellable);
+        item->state = SOUP_MESSAGE_FINISHED;
+        item->error = g_error_new_literal (SOUP_SESSION_ERROR,
+                                           SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE,
+                                           _("Message is already in session queue"));
+        task = g_task_new (session, cancellable, callback, user_data);
+        g_task_set_task_data (task, item, (GDestroyNotify)soup_message_queue_item_unref);
+        g_task_return_error (task, g_error_copy (item->error));
+        g_object_unref (task);
+
+        return TRUE;
+}
+
 /**
  * soup_session_send_async:
  * @session: a #SoupSession
@@ -3320,6 +3351,9 @@ soup_session_send_async (SoupSession         *session,
 
        g_return_if_fail (SOUP_IS_SESSION (session));
 
+        if (soup_session_return_error_if_message_already_in_queue (session, msg, cancellable, callback, 
user_data))
+            return;
+
        item = soup_session_append_queue_item (session, msg, TRUE, cancellable);
        item->io_priority = io_priority;
        g_signal_connect (msg, "restarted",
@@ -3364,13 +3398,15 @@ soup_session_send_finish (SoupSession   *session,
        if (g_task_had_error (task)) {
                SoupMessageQueueItem *item = g_task_get_task_data (task);
 
-               if (soup_message_io_in_progress (item->msg))
-                       soup_message_io_finished (item->msg);
-               else if (item->state != SOUP_MESSAGE_FINISHED)
-                       item->state = SOUP_MESSAGE_FINISHING;
+                if (!g_error_matches (item->error, SOUP_SESSION_ERROR, 
SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE)) {
+                        if (soup_message_io_in_progress (item->msg))
+                                soup_message_io_finished (item->msg);
+                        else if (item->state != SOUP_MESSAGE_FINISHED)
+                                item->state = SOUP_MESSAGE_FINISHING;
 
-               if (item->state != SOUP_MESSAGE_FINISHED)
-                       soup_session_process_queue_item (session, item, NULL, FALSE);
+                        if (item->state != SOUP_MESSAGE_FINISHED)
+                                soup_session_process_queue_item (session, item, NULL, FALSE);
+                }
        }
 
        return g_task_propagate_pointer (task, error);
@@ -3421,6 +3457,14 @@ soup_session_send (SoupSession   *session,
 
        g_return_val_if_fail (SOUP_IS_SESSION (session), NULL);
 
+        if (soup_session_lookup_queue_item (session, msg)) {
+                g_set_error_literal (error,
+                                     SOUP_SESSION_ERROR,
+                                     SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE,
+                                     _("Message is already in session queue"));
+                return NULL;
+        }
+
        item = soup_session_append_queue_item (session, msg, FALSE, cancellable);
 
        while (!stream) {
@@ -3869,6 +3913,9 @@ soup_session_websocket_connect_async (SoupSession          *session,
        g_return_if_fail (SOUP_IS_SESSION (session));
        g_return_if_fail (SOUP_IS_MESSAGE (msg));
 
+        if (soup_session_return_error_if_message_already_in_queue (session, msg, cancellable, callback, 
user_data))
+                return;
+
        supported_extensions = soup_session_get_supported_websocket_extensions_for_message (session, msg);
        soup_websocket_client_prepare_handshake (msg, origin, protocols, supported_extensions);
 
@@ -3987,6 +4034,9 @@ soup_session_preconnect_async (SoupSession        *session,
         g_return_if_fail (SOUP_IS_SESSION (session));
         g_return_if_fail (SOUP_IS_MESSAGE (msg));
 
+        if (soup_session_return_error_if_message_already_in_queue (session, msg, cancellable, callback, 
user_data))
+                return;
+
         item = soup_session_append_queue_item (session, msg, TRUE, cancellable);
         item->connect_only = TRUE;
         item->io_priority = io_priority;
diff --git a/libsoup/soup-session.h b/libsoup/soup-session.h
index e8457aaa..053cc39c 100644
--- a/libsoup/soup-session.h
+++ b/libsoup/soup-session.h
@@ -43,7 +43,8 @@ typedef enum {
        SOUP_SESSION_ERROR_TOO_MANY_REDIRECTS,
        SOUP_SESSION_ERROR_TOO_MANY_RESTARTS,
        SOUP_SESSION_ERROR_REDIRECT_NO_LOCATION,
-       SOUP_SESSION_ERROR_REDIRECT_BAD_URI
+       SOUP_SESSION_ERROR_REDIRECT_BAD_URI,
+        SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE,
 } SoupSessionError;
 
 SOUP_AVAILABLE_IN_ALL
diff --git a/tests/misc-test.c b/tests/misc-test.c
index 3d2e6808..efbe2798 100644
--- a/tests/misc-test.c
+++ b/tests/misc-test.c
@@ -255,6 +255,22 @@ reuse_test_authenticate (SoupMessage *msg,
        return TRUE;
 }
 
+static void
+reuse_preconnect_finished (SoupSession   *session,
+                           GAsyncResult  *result,
+                           GError       **error)
+{
+        g_assert_false (soup_session_preconnect_finish (session, result, error));
+}
+
+static void
+reuse_websocket_connect_finished (SoupSession   *session,
+                                  GAsyncResult  *result,
+                                  GError       **error)
+{
+        g_assert_false (soup_session_websocket_connect_finish (session, result, error));
+}
+
 static void
 do_msg_reuse_test (void)
 {
@@ -263,6 +279,8 @@ do_msg_reuse_test (void)
         GBytes *body;
        GUri *uri;
        guint *signal_ids, n_signal_ids;
+        GInputStream *stream;
+        GError *error = NULL;
 
        g_test_bug ("559054");
 
@@ -303,6 +321,41 @@ do_msg_reuse_test (void)
        ensure_no_signal_handlers (msg, signal_ids, n_signal_ids);
         g_bytes_unref (body);
 
+        debug_printf (1, "  Reuse before finishing\n");
+        msg = soup_message_new_from_uri ("GET", base_uri);
+        stream = soup_test_request_send (session, msg, NULL, 0, &error);
+        g_assert_no_error (error);
+        g_assert_null (soup_test_request_send (session, msg, NULL, 0, &error));
+        g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
+        g_clear_error (&error);
+        g_assert_null (soup_test_session_async_send (session, msg, NULL, &error));
+        g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
+        g_clear_error (&error);
+        g_assert_null (soup_session_send (session, msg, NULL, &error));
+        g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
+        g_clear_error (&error);
+        g_assert_null (soup_session_send_and_read (session, msg, NULL, &error));
+        g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
+        g_clear_error (&error);
+        soup_session_preconnect_async (session, msg, G_PRIORITY_DEFAULT, NULL,
+                                       (GAsyncReadyCallback)reuse_preconnect_finished, &error);
+        while (error == NULL)
+                g_main_context_iteration (NULL, TRUE);
+        g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
+        g_clear_error (&error);
+        soup_session_websocket_connect_async (session, msg, NULL, NULL, G_PRIORITY_DEFAULT, NULL,
+                                              (GAsyncReadyCallback)reuse_websocket_connect_finished, &error);
+        while (error == NULL)
+                g_main_context_iteration (NULL, TRUE);
+        g_assert_error (error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE);
+        g_clear_error (&error);
+        g_object_unref (stream);
+
+        while (g_main_context_pending (NULL))
+                g_main_context_iteration (NULL, FALSE);
+
+        ensure_no_signal_handlers (msg, signal_ids, n_signal_ids);
+
        soup_test_session_abort_unref (session);
        g_object_unref (msg);
        g_free (signal_ids);
diff --git a/tests/test-utils.c b/tests/test-utils.c
index e57907ed..89782ef5 100644
--- a/tests/test-utils.c
+++ b/tests/test-utils.c
@@ -379,6 +379,7 @@ typedef struct {
        GBytes *body;
        GError *error;
        gboolean done;
+        gboolean message_finished;
 } SendAsyncData;
 
 static void
@@ -389,13 +390,15 @@ send_and_read_async_ready_cb (SoupSession   *session,
        data->done = TRUE;
        g_assert_true (soup_session_get_async_result_message (session, result) == data->msg);
        data->body = soup_session_send_and_read_finish (session, result, &data->error);
+        if (g_error_matches (data->error, SOUP_SESSION_ERROR, SOUP_SESSION_ERROR_MESSAGE_ALREADY_IN_QUEUE))
+                data->message_finished = TRUE;
 }
 
 static void
-on_message_finished (SoupMessage *msg,
-                    gboolean    *message_finished)
+on_message_finished (SoupMessage   *msg,
+                     SendAsyncData *data)
 {
-        *message_finished = TRUE;
+        data->message_finished = TRUE;
 }
 
 GBytes *
@@ -404,18 +407,17 @@ soup_test_session_async_send (SoupSession  *session,
                              GCancellable *cancellable,
                              GError      **error)
 {
-       gboolean message_finished = FALSE;
        GMainContext *async_context = g_main_context_ref_thread_default ();
        gulong signal_id;
-       SendAsyncData data = { msg, NULL, NULL, FALSE };
+       SendAsyncData data = { msg, NULL, NULL, FALSE, FALSE };
 
        signal_id = g_signal_connect (msg, "finished",
-                                     G_CALLBACK (on_message_finished), &message_finished);
+                                      G_CALLBACK (on_message_finished), &data);
 
        soup_session_send_and_read_async (session, msg, G_PRIORITY_DEFAULT, cancellable,
                                          (GAsyncReadyCallback)send_and_read_async_ready_cb, &data);
 
-       while (!data.done || !message_finished)
+       while (!data.done || !data.message_finished)
                g_main_context_iteration (async_context, TRUE);
 
        g_signal_handler_disconnect (msg, signal_id);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]