[libsoup] io-http2: ensure we stop polling the network input stream after the decoded data stream is created



commit 69258b5739fea9ae165d1911999525b9e39efe14
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Thu May 27 18:10:04 2021 +0200

    io-http2: ensure we stop polling the network input stream after the decoded data stream is created
    
    In some cases we might end up polling the network input forever, if
    a message read iteration handles the end of another message and there's
    nothing more to read from the network. In those cases we should make
    sure we stop polling the network to poll the decoded data stream
    instead.

 libsoup/http2/soup-client-message-io-http2.c | 106 ++++++++++++++++-----------
 1 file changed, 65 insertions(+), 41 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 15794c89..686ab864 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -94,6 +94,8 @@ typedef struct {
         GCancellable *cancellable;
         GInputStream *decoded_data_istream;
         GInputStream *body_istream;
+        GTask *task;
+        gboolean in_run_until_read_async;
 
         /* Request body logger */
         SoupLogger *logger;
@@ -120,10 +122,12 @@ typedef struct {
         gboolean can_be_restarted;
 } SoupHTTP2MessageData;
 
+static void io_run_until_read_async (SoupHTTP2MessageData *data);
 static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
 static void io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data);
 static void io_idle_read (SoupClientMessageIOHTTP2 *io);
 static void io_close (SoupClientMessageIOHTTP2 *io);
+static void io_poll (SoupHTTP2MessageData *data);
 
 static void
 NGCHECK (int return_code)
@@ -592,10 +596,23 @@ on_stream_close_callback (nghttp2_session *session,
                           void            *user_data)
 {
         SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id);
+
         h2_debug (user_data, data, "[SESSION] Closed: %s", nghttp2_http2_strerror (error_code));
-        if (error_code == NGHTTP2_REFUSED_STREAM && data && data->state < STATE_READ_DATA)
+        if (!data)
+                return 0;
+
+        if (error_code == NGHTTP2_REFUSED_STREAM && data->state < STATE_READ_DATA)
                 data->can_be_restarted = TRUE;
 
+        if (data->state < STATE_READ_DATA && !data->in_run_until_read_async) {
+                /* Start polling the decoded data stream instead of the network input stream. */
+                if (data->io_source) {
+                        g_source_destroy (data->io_source);
+                        g_clear_pointer (&data->io_source, g_source_unref);
+                }
+                io_poll (data);
+        }
+
         return 0;
 }
 
@@ -1114,14 +1131,27 @@ message_source_check (GSource *source)
         return FALSE;
 }
 
-static GSource *
-soup_client_message_io_http2_get_source (SoupHTTP2MessageData    *data,
-                                         SoupMessage             *msg,
-                                         GCancellable            *cancellable,
-                                         SoupMessageIOSourceFunc  callback,
-                                         gpointer                 user_data)
+static gboolean
+io_poll_ready (SoupMessage *msg,
+               gpointer     user_data)
+{
+        SoupHTTP2MessageData *data = user_data;
+
+        io_run_until_read_async (data);
+
+        return G_SOURCE_REMOVE;
+}
+
+static void
+io_poll (SoupHTTP2MessageData *data)
 {
         GSource *base_source;
+        GCancellable *cancellable;
+
+        g_assert (data->task);
+        g_assert (!data->io_source);
+
+        cancellable = g_task_get_cancellable (data->task);
 
         /* TODO: Handle mixing writes in? */
         if (data->paused)
@@ -1139,11 +1169,14 @@ soup_client_message_io_http2_get_source (SoupHTTP2MessageData    *data,
                 base_source = g_timeout_source_new (0);
         }
 
-        GSource *source = soup_message_io_source_new (base_source, G_OBJECT (msg), data->paused, 
message_source_check);
-        g_source_set_callback (source, (GSourceFunc)callback, user_data, NULL);
-        return source;
+        data->io_source = soup_message_io_source_new (base_source, G_OBJECT (data->msg),
+                                                      data->paused, message_source_check);
+        g_source_set_callback (data->io_source, (GSourceFunc)io_poll_ready, data, NULL);
+        g_source_set_priority (data->io_source, g_task_get_priority (data->task));
+        g_source_attach (data->io_source, data->io->async_context);
 }
 
+
 static void
 client_stream_eof (SoupClientInputStream *stream,
                    gpointer               user_data)
@@ -1461,26 +1494,11 @@ soup_client_message_io_http2_run (SoupClientMessageIO *iface,
         g_assert_not_reached ();
 }
 
-static void io_run_until_read_async (SoupMessage *msg,
-                                     GTask       *task);
-
-static gboolean
-io_run_until_read_ready (SoupMessage *msg,
-                         gpointer     user_data)
-{
-        GTask *task = user_data;
-
-        io_run_until_read_async (msg, task);
-
-        return G_SOURCE_REMOVE;
-}
-
 static void
-io_run_until_read_async (SoupMessage *msg,
-                         GTask       *task)
+io_run_until_read_async (SoupHTTP2MessageData *data)
 {
-        SoupClientMessageIOHTTP2 *io = get_io_data (msg);
-        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+        SoupClientMessageIOHTTP2 *io = data->io;
+        GTask *task = data->task;
         GError *error = NULL;
 
         if (data->io_source) {
@@ -1488,10 +1506,15 @@ io_run_until_read_async (SoupMessage *msg,
                 g_clear_pointer (&data->io_source, g_source_unref);
         }
 
-        if (io_run_until (io, msg, FALSE,
+        data->in_run_until_read_async = TRUE;
+
+        if (io_run_until (io, data->msg, FALSE,
                           STATE_READ_DATA,
                           g_task_get_cancellable (task),
                           &error)) {
+                data->task = NULL;
+                data->in_run_until_read_async = FALSE;
+
                 g_task_return_boolean (task, TRUE);
                 g_object_unref (task);
                 return;
@@ -1499,23 +1522,23 @@ io_run_until_read_async (SoupMessage *msg,
 
         if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
                 g_error_free (error);
-                data->io_source = soup_client_message_io_http2_get_source (data, msg, g_task_get_cancellable 
(task),
-                                                                           
(SoupMessageIOSourceFunc)io_run_until_read_ready,
-                                                                           task);
-               g_source_set_priority (data->io_source, g_task_get_priority (task));
-                g_source_attach (data->io_source, io->async_context);
+                io_poll (data);
+                data->in_run_until_read_async = FALSE;
                 return;
         }
 
-        if (get_io_data (msg) == io) {
+        if (get_io_data (data->msg) == io) {
                 if (data->can_be_restarted)
                         data->item->state = SOUP_MESSAGE_RESTARTING;
                 else
-                        soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
+                        soup_message_set_metrics_timestamp (data->msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
 
-                soup_client_message_io_http2_finished ((SoupClientMessageIO *)io, msg);
+                soup_client_message_io_http2_finished ((SoupClientMessageIO *)data->io, data->msg);
         }
 
+        data->task = NULL;
+        data->in_run_until_read_async = FALSE;
+
         g_task_return_error (task, error);
         g_object_unref (task);
 }
@@ -1528,11 +1551,12 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
                                                    GAsyncReadyCallback  callback,
                                                    gpointer             user_data)
 {
-        GTask *task;
+        SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
 
-        task = g_task_new (msg, cancellable, callback, user_data);
-       g_task_set_priority (task, io_priority);
-        io_run_until_read_async (msg, task);
+        data->task = g_task_new (msg, cancellable, callback, user_data);
+        g_task_set_priority (data->task, io_priority);
+        io_run_until_read_async (data);
 }
 
 static gboolean


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]