[libsoup] io-http2: ensure we actually send the reset stream frames



commit 846aef66f92b99d8d77f029764501a0146a1d95e
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Wed May 26 15:57:42 2021 +0200

    io-http2: ensure we actually send the reset stream frames
    
    In case there's no more IO after the message is finished, we don't
    really send the reset stream frame. We should ensure there's a valid
    write after calling nghttp2_submit_rst_stream.

 libsoup/http2/soup-client-message-io-http2.c | 113 ++++++++++++++++++++++++---
 1 file changed, 100 insertions(+), 13 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index dfa97033..573143b7 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -69,6 +69,7 @@ typedef struct {
         GMainContext *async_context;
 
         GHashTable *messages;
+        GHashTable *closed_messages;
 
         nghttp2_session *session;
 
@@ -77,6 +78,7 @@ typedef struct {
         gssize write_buffer_size;
         gssize written_bytes;
 
+        GSource *reset_stream_source;
         gboolean is_shutdown;
 } SoupClientMessageIOHTTP2;
 
@@ -110,9 +112,11 @@ typedef struct {
         GError *error;
         gboolean paused;
         guint32 stream_id;
+        gboolean can_be_restarted;
 } SoupHTTP2MessageData;
 
 static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
+static void io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data);
 
 static void
 NGCHECK (int return_code)
@@ -487,17 +491,26 @@ on_before_frame_send_callback (nghttp2_session     *session,
         return 0;
 }
 
+static gboolean
+remove_closed_stream (SoupHTTP2MessageData *data,
+                      gpointer              value,
+                      nghttp2_frame        *frame)
+{
+        return data->stream_id == frame->hd.stream_id;
+}
+
 static int
 on_frame_send_callback (nghttp2_session     *session,
                         const nghttp2_frame *frame,
                         void                *user_data)
 {
+        SoupClientMessageIOHTTP2 *io = user_data;
         SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
 
         switch (frame->hd.type) {
         case NGHTTP2_HEADERS:
                 g_assert (data);
-                h2_debug (user_data, data, "[SEND] [HEADERS] finished=%d",
+                h2_debug (io, data, "[SEND] [HEADERS] finished=%d",
                           (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS) ? 1 : 0);
 
                 if (data->metrics)
@@ -516,7 +529,7 @@ on_frame_send_callback (nghttp2_session     *session,
                 if (data->state < STATE_WRITE_DATA)
                         advance_state_from (data, STATE_WRITE_HEADERS, STATE_WRITE_DATA);
 
-                h2_debug (user_data, data, "[SEND] [DATA] bytes=%zu, finished=%d",
+                h2_debug (io, data, "[SEND] [DATA] bytes=%zu, finished=%d",
                           frame->data.hd.length, frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
                 if (data->metrics) {
                         data->metrics->request_body_bytes_sent += frame->hd.length + FRAME_HEADER_SIZE;
@@ -530,10 +543,11 @@ on_frame_send_callback (nghttp2_session     *session,
                 }
                 break;
         case NGHTTP2_RST_STREAM:
-                h2_debug (user_data, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
+                h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
+                g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, 
(gpointer)frame);
                 break;
         default:
-                h2_debug (user_data, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
+                h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
                 break;
         }
 
@@ -743,13 +757,21 @@ add_message_to_io_data (SoupClientMessageIOHTTP2        *io,
 }
 
 static void
-soup_http2_message_data_free (SoupHTTP2MessageData *data)
+soup_http2_message_data_close (SoupHTTP2MessageData *data)
 {
-        if (data->body_istream)
+        /* Message data in close state is just waiting for reset stream to be sent
+         * to be removed from the messages hash table. Everything is reset but
+         * stream_id and io.
+         */
+        if (data->body_istream) {
                 g_signal_handlers_disconnect_by_data (data->body_istream, data);
+                g_clear_object (&data->body_istream);
+        }
 
+        data->msg = NULL;
+        data->metrics = NULL;
+        data->cancellable = NULL;
         g_clear_pointer (&data->item, soup_message_queue_item_unref);
-        g_clear_object (&data->body_istream);
         g_clear_object (&data->decoded_data_istream);
 
         if (data->io_source) {
@@ -757,9 +779,10 @@ soup_http2_message_data_free (SoupHTTP2MessageData *data)
                 g_clear_pointer (&data->io_source, g_source_unref);
         }
 
-        if (data->data_source_poll)
+        if (data->data_source_poll) {
                 g_source_destroy (data->data_source_poll);
-        g_clear_pointer (&data->data_source_poll, g_source_unref);
+                g_clear_pointer (&data->data_source_poll, g_source_unref);
+        }
 
         g_clear_error (&data->data_source_error);
         g_clear_pointer (&data->data_source_buffer, g_byte_array_unref);
@@ -768,6 +791,14 @@ soup_http2_message_data_free (SoupHTTP2MessageData *data)
 
         g_clear_error (&data->error);
 
+        data->completion_cb = NULL;
+        data->completion_data = NULL;
+}
+
+static void
+soup_http2_message_data_free (SoupHTTP2MessageData *data)
+{
+        soup_http2_message_data_close (data);
         g_free (data);
 }
 
@@ -920,16 +951,29 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
 
        g_object_ref (msg);
 
-        NGCHECK (nghttp2_submit_rst_stream (io->session, NGHTTP2_FLAG_NONE, data->stream_id,
-                                            completion == SOUP_MESSAGE_IO_COMPLETE ? NGHTTP2_NO_ERROR : 
NGHTTP2_CANCEL));
         nghttp2_session_set_stream_user_data (io->session, data->stream_id, NULL);
-        if (!g_hash_table_remove (io->messages, msg))
-                g_warn_if_reached ();
+
+        if (!io->is_shutdown) {
+                NGCHECK (nghttp2_submit_rst_stream (io->session, NGHTTP2_FLAG_NONE, data->stream_id,
+                                                    completion == SOUP_MESSAGE_IO_COMPLETE ? 
NGHTTP2_NO_ERROR : NGHTTP2_CANCEL));
+                soup_http2_message_data_close (data);
+
+                if (!g_hash_table_steal (io->messages, msg))
+                        g_warn_if_reached ();
+                if (!g_hash_table_add (io->closed_messages, data))
+                        g_warn_if_reached ();
+        } else {
+                if (!g_hash_table_remove (io->messages, msg))
+                        g_warn_if_reached ();
+        }
 
        if (completion_cb)
                completion_cb (G_OBJECT (msg), SOUP_MESSAGE_IO_COMPLETE, completion_data);
 
        g_object_unref (msg);
+
+        if (!io->is_shutdown)
+                io_write_until_stream_reset_is_sent (data);
 }
 
 static void
@@ -1242,6 +1286,43 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
        return done;
 }
 
+static gboolean
+io_write_until_stream_reset_is_sent_ready (GObject              *stream,
+                                           SoupHTTP2MessageData *data)
+{
+        io_write_until_stream_reset_is_sent (data);
+
+        return G_SOURCE_REMOVE;
+}
+
+static void
+io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data)
+{
+        SoupClientMessageIOHTTP2 *io = data->io;
+        GError *error = NULL;
+
+        if (io->reset_stream_source) {
+                g_source_destroy (io->reset_stream_source);
+                g_clear_pointer (&io->reset_stream_source, g_source_unref);
+        }
+
+        while (g_hash_table_lookup (io->closed_messages, data)) {
+                if (!nghttp2_session_want_write (io->session)) {
+                        error = g_error_new_literal (G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would 
block"));
+                        break;
+                }
+                if (!io_write (io, FALSE, FALSE, &error))
+                        break;
+        }
+
+        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                io->reset_stream_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM 
(io->ostream), NULL);
+                g_source_set_callback (io->reset_stream_source, 
(GSourceFunc)io_write_until_stream_reset_is_sent_ready, data, NULL);
+                g_source_attach (io->reset_stream_source, g_main_context_get_thread_default ());
+        }
+
+        g_clear_error (&error);
+}
 
 static gboolean
 soup_client_message_io_http2_run_until_read (SoupClientMessageIO  *iface,
@@ -1379,10 +1460,15 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface)
 {
         SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
 
+        if (io->reset_stream_source) {
+                g_source_destroy (io->reset_stream_source);
+                g_clear_pointer (&io->reset_stream_source, g_source_unref);
+        }
         g_clear_object (&io->stream);
         g_clear_pointer (&io->async_context, g_main_context_unref);
         g_clear_pointer (&io->session, nghttp2_session_del);
         g_clear_pointer (&io->messages, g_hash_table_unref);
+        g_clear_pointer (&io->closed_messages, g_hash_table_unref);
 
         g_free (io);
 }
@@ -1448,6 +1534,7 @@ soup_client_message_io_http2_init (SoupClientMessageIOHTTP2 *io)
         nghttp2_session_callbacks_del (callbacks);
 
         io->messages = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, 
(GDestroyNotify)soup_http2_message_data_free);
+        io->closed_messages = g_hash_table_new_full (g_direct_hash, g_direct_equal, 
(GDestroyNotify)soup_http2_message_data_free, NULL);
 
         io->iface.funcs = &io_funcs;
 }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]