[libsoup/carlosgc/requeue-private: 2/7] session: use gio async style for soup_message_io_run_until_read




commit 26962e90a703e0c69afde169eefd3fcbf246ff24
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Wed Sep 30 10:12:18 2020 +0200

    session: use gio async style for soup_message_io_run_until_read
    
    Make soup_message_io_run_until_read the sync one and add
    soup_message_io_run_until_read_async and
    soup_message_io_run_until_read_finish. The async version handles the
    would-block error itself, and all other error handling has been moved
    to soup-message-io too, since it's shared with io_run.

 libsoup/soup-message-io.c      | 129 +++++++++++++++++++++++++++++++++++------
 libsoup/soup-message-private.h |  31 ++++++----
 libsoup/soup-message-queue.c   |   4 --
 libsoup/soup-message-queue.h   |   1 -
 libsoup/soup-session.c         |  83 ++++++++------------------
 5 files changed, 152 insertions(+), 96 deletions(-)
---
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index f9da4a9b..26f6a58a 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -1035,6 +1035,26 @@ io_run_until (SoupMessage *msg, gboolean blocking,
        return done;
 }
 
+static void
+soup_message_io_update_status (SoupMessage  *msg,
+                              GError       *error)
+{
+       if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
+               SoupMessageIOData *io = soup_message_get_io_data (msg);
+
+               io->item->state = SOUP_MESSAGE_RESTARTING;
+       } else if (error->domain == G_TLS_ERROR) {
+               soup_message_set_status_full (msg,
+                                             SOUP_STATUS_SSL_FAILED,
+                                             error->message);
+       } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code) &&
+                  !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+               soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
+       }
+
+       soup_message_io_finished (msg);
+}
+
 static gboolean
 io_run_ready (SoupMessage *msg, gpointer user_data)
 {
@@ -1067,21 +1087,13 @@ io_run (SoupMessage *msg, gboolean blocking)
                g_clear_error (&error);
                io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg);
                g_source_attach (io->io_source, io->async_context);
-       } else if (error && soup_message_get_io_data (msg) == io) {
-               if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
-                       io->item->state = SOUP_MESSAGE_RESTARTING;
-               else if (error->domain == G_TLS_ERROR) {
-                       soup_message_set_status_full (msg,
-                                                     SOUP_STATUS_SSL_FAILED,
-                                                     error->message);
-               } else if (!SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
-                       soup_message_set_status (msg, SOUP_STATUS_IO_ERROR);
-
-               g_error_free (error);
-               soup_message_io_finished (msg);
-       } else if (error)
+       } else {
+               if (soup_message_get_io_data (msg) == io)
+                       soup_message_io_update_status (msg, error);
                g_error_free (error);
 
+       }
+
        g_object_unref (msg);
        g_clear_object (&cancellable);
 }
@@ -1097,13 +1109,92 @@ soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking,
 }
 
 gboolean
-soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking,
-                               GCancellable *cancellable, GError **error)
+soup_message_io_run_until_read (SoupMessage  *msg,
+                               GCancellable *cancellable,
+                               GError      **error)
 {
-       return io_run_until (msg, blocking,
-                            SOUP_MESSAGE_IO_STATE_BODY,
-                            SOUP_MESSAGE_IO_STATE_ANY,
-                            cancellable, error);
+       SoupMessageIOData *io = soup_message_get_io_data (msg);
+
+       if (io_run_until (msg, TRUE,
+                         SOUP_MESSAGE_IO_STATE_BODY,
+                         SOUP_MESSAGE_IO_STATE_ANY,
+                         cancellable, error))
+               return TRUE;
+
+       if (soup_message_get_io_data (msg) == io)
+               soup_message_io_update_status (msg, *error);
+
+       return FALSE;
+}
+
+static void io_run_until_read_async (SoupMessage *msg,
+                                     GTask       *task);
+
+static gboolean
+io_run_until_read_ready (SoupMessage *msg,
+                         gpointer     user_data)
+{
+        GTask *task = user_data;
+
+        io_run_until_read_async (msg, task);
+        return FALSE;
+}
+
+static void
+io_run_until_read_async (SoupMessage *msg,
+                         GTask       *task)
+{
+        SoupMessageIOData *io = soup_message_get_io_data (msg);
+        GError *error = NULL;
+
+        if (io->io_source) {
+                g_source_destroy (io->io_source);
+                g_source_unref (io->io_source);
+                io->io_source = NULL;
+        }
+
+        if (io_run_until (msg, FALSE,
+                          SOUP_MESSAGE_IO_STATE_BODY,
+                          SOUP_MESSAGE_IO_STATE_ANY,
+                          g_task_get_cancellable (task),
+                          &error)) {
+                g_task_return_boolean (task, TRUE);
+                g_object_unref (task);
+                return;
+        }
+
+        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                g_error_free (error);
+                io->io_source = soup_message_io_get_source (msg, NULL, io_run_until_read_ready, task);
+                g_source_attach (io->io_source, io->async_context);
+                return;
+        }
+
+        if (soup_message_get_io_data (msg) == io)
+                soup_message_io_update_status (msg, error);
+
+        g_task_return_error (task, error);
+        g_object_unref (task);
+}
+
+void
+soup_message_io_run_until_read_async (SoupMessage        *msg,
+                                      GCancellable       *cancellable,
+                                      GAsyncReadyCallback callback,
+                                      gpointer            user_data)
+{
+        GTask *task;
+
+        task = g_task_new (msg, cancellable, callback, user_data);
+        io_run_until_read_async (msg, task);
+}
+
+gboolean
+soup_message_io_run_until_read_finish (SoupMessage  *msg,
+                                       GAsyncResult *result,
+                                       GError      **error)
+{
+        return g_task_propagate_boolean (G_TASK (result), error);
 }
 
 gboolean
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index f9ed82d0..adaaf048 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -109,18 +109,25 @@ gboolean   soup_message_io_in_progress (SoupMessage *msg);
 GIOStream *soup_message_io_steal       (SoupMessage *msg);
 
 
-gboolean soup_message_io_run_until_write  (SoupMessage   *msg,
-                                          gboolean       blocking,
-                                          GCancellable  *cancellable,
-                                          GError       **error);
-gboolean soup_message_io_run_until_read   (SoupMessage   *msg,
-                                          gboolean       blocking,
-                                          GCancellable  *cancellable,
-                                          GError       **error);
-gboolean soup_message_io_run_until_finish (SoupMessage   *msg,
-                                          gboolean       blocking,
-                                          GCancellable  *cancellable,
-                                          GError       **error);
+gboolean soup_message_io_run_until_write       (SoupMessage        *msg,
+                                                gboolean            blocking,
+                                                GCancellable       *cancellable,
+                                                GError            **error);
+gboolean soup_message_io_run_until_finish      (SoupMessage        *msg,
+                                                gboolean            blocking,
+                                                GCancellable       *cancellable,
+                                                GError            **error);
+
+gboolean soup_message_io_run_until_read        (SoupMessage        *msg,
+                                                GCancellable       *cancellable,
+                                                GError            **error);
+void     soup_message_io_run_until_read_async  (SoupMessage        *msg,
+                                                GCancellable       *cancellable,
+                                                GAsyncReadyCallback callback,
+                                                gpointer            user_data);
+gboolean soup_message_io_run_until_read_finish (SoupMessage        *msg,
+                                                GAsyncResult       *result,
+                                                GError            **error);
 
 typedef gboolean (*SoupMessageSourceFunc) (SoupMessage *, gpointer);
 GSource *soup_message_io_get_source       (SoupMessage           *msg,
diff --git a/libsoup/soup-message-queue.c b/libsoup/soup-message-queue.c
index 815fbb79..663061a8 100644
--- a/libsoup/soup-message-queue.c
+++ b/libsoup/soup-message-queue.c
@@ -188,10 +188,6 @@ soup_message_queue_item_unref (SoupMessageQueueItem *item)
        g_clear_error (&item->error);
        g_clear_object (&item->task);
        g_clear_pointer (&item->async_context, g_main_context_unref);
-       if (item->io_source) {
-               g_source_destroy (item->io_source);
-               g_source_unref (item->io_source);
-       }
        g_slice_free (SoupMessageQueueItem, item);
 }
 
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h
index 2a002c6a..cc4c3ba8 100644
--- a/libsoup/soup-message-queue.h
+++ b/libsoup/soup-message-queue.h
@@ -46,7 +46,6 @@ struct _SoupMessageQueueItem {
 
        SoupConnection *conn;
        GTask *task;
-       GSource *io_source;
 
        guint paused            : 1;
        guint new_api           : 1;
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 7a71324b..df61b3bb 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -3175,11 +3175,6 @@ async_send_request_return_result (SoupMessageQueueItem *item,
        task = item->task;
        item->task = NULL;
 
-       if (item->io_source) {
-               g_source_destroy (item->io_source);
-               g_clear_pointer (&item->io_source, g_source_unref);
-       }
-
        if (error)
                g_task_return_error (task, error);
        else if (item->error) {
@@ -3312,67 +3307,43 @@ send_async_maybe_complete (SoupMessageQueueItem *item,
        async_send_request_return_result (item, stream, NULL);
 }
 
-static void try_run_until_read (SoupMessageQueueItem *item);
-
-static gboolean
-read_ready_cb (SoupMessage *msg, gpointer user_data)
-{
-       SoupMessageQueueItem *item = user_data;
-
-       g_clear_pointer (&item->io_source, g_source_unref);
-       try_run_until_read (item);
-       return FALSE;
-}
-
 static void
-try_run_until_read (SoupMessageQueueItem *item)
+run_until_read_done (SoupMessage          *msg,
+                    GAsyncResult         *result,
+                    SoupMessageQueueItem *item)
 {
-       GError *error = NULL;
        GInputStream *stream = NULL;
+       GError *error = NULL;
 
-       if (soup_message_io_run_until_read (item->msg, FALSE, item->cancellable, &error))
-               stream = soup_message_io_get_response_istream (item->msg, &error);
-       if (stream) {
-               send_async_maybe_complete (item, stream);
+       soup_message_io_run_until_read_finish (msg, result, &error);
+       if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
                return;
-       }
 
-       if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
-               item->state = SOUP_MESSAGE_RESTARTING;
-               soup_message_io_finished (item->msg);
-               g_error_free (error);
-               return;
-       }
+       if (!error)
+               stream = soup_message_io_get_response_istream (msg, &error);
 
-       if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-               if (error->domain == G_TLS_ERROR) {
-                        soup_message_set_status_full (item->msg,
-                                                      SOUP_STATUS_SSL_FAILED,
-                                                      error->message);
-               }
-               if (item->state != SOUP_MESSAGE_FINISHED) {
-                       if (soup_message_io_in_progress (item->msg))
-                               soup_message_io_finished (item->msg);
-                       item->state = SOUP_MESSAGE_FINISHING;
-                       soup_session_process_queue_item (item->session, item, NULL, FALSE);
-               }
-               async_send_request_return_result (item, NULL, error);
-               return;
+       if (stream) {
+               send_async_maybe_complete (item, stream);
+               return;
        }
 
-       g_clear_error (&error);
-       item->io_source = soup_message_io_get_source (item->msg, item->cancellable,
-                                                     read_ready_cb, item);
-
-        SoupSessionPrivate *priv = soup_session_get_instance_private (item->session);
-       g_source_attach (item->io_source, priv->async_context);
+       if (item->state != SOUP_MESSAGE_FINISHED) {
+               if (soup_message_io_in_progress (msg))
+                       soup_message_io_finished (msg);
+               item->state = SOUP_MESSAGE_FINISHING;
+               soup_session_process_queue_item (item->session, item, NULL, FALSE);
+       }
+       async_send_request_return_result (item, NULL, error);
 }
 
 static void
 async_send_request_running (SoupSession *session, SoupMessageQueueItem *item)
 {
        item->io_started = TRUE;
-       try_run_until_read (item);
+       soup_message_io_run_until_read_async (item->msg,
+                                             item->cancellable,
+                                             (GAsyncReadyCallback)run_until_read_done,
+                                             item);
 }
 
 static void
@@ -3718,19 +3689,11 @@ soup_session_send (SoupSession   *session,
                        break;
 
                /* Send request, read headers */
-               if (!soup_message_io_run_until_read (msg, TRUE, item->cancellable, &my_error)) {
+               if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) {
                        if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
-                               item->state = SOUP_MESSAGE_RESTARTING;
-                               soup_message_io_finished (item->msg);
                                g_clear_error (&my_error);
                                continue;
                        }
-
-                       if (my_error->domain == G_TLS_ERROR) {
-                               soup_message_set_status_full (msg,
-                                                             SOUP_STATUS_SSL_FAILED,
-                                                             my_error->message);
-                       }
                        break;
                }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]