[libsoup/carlosgc/http2-io-from-callbacks: 2/2] http2: ensure mem_send and mem_recv functions are not called from nghttp2 callbacks




commit 70816300521ca9ad714657edbcd57207cad9270d
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Wed Nov 10 13:19:36 2021 +0100

    http2: ensure mem_send and mem_recv functions are not called from nghttp2 callbacks

 libsoup/http2/soup-client-message-io-http2.c | 52 +++++++++++++++++++--
 tests/http2-test.c                           | 70 ++++++++++++++++++++++++++++
 2 files changed, 119 insertions(+), 3 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index d710ec2b..cc91f02c 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -87,6 +87,8 @@ typedef struct {
         GTask *close_task;
         gboolean session_terminated;
         gboolean goaway_sent;
+
+        guint in_callback;
 } SoupClientMessageIOHTTP2;
 
 typedef struct {
@@ -345,6 +347,7 @@ io_write (SoupClientMessageIOHTTP2 *io,
 
         if (io->write_buffer == NULL) {
                 io->written_bytes = 0;
+                g_assert (io->in_callback == 0);
                 io->write_buffer_size = nghttp2_session_mem_send (io->session, (const 
guint8**)&io->write_buffer);
                 NGCHECK (io->write_buffer_size);
                 if (io->write_buffer_size == 0) {
@@ -400,10 +403,15 @@ io_try_write (SoupClientMessageIOHTTP2 *io,
         if (io->write_source)
                 return;
 
-        while (nghttp2_session_want_write (io->session) && !error)
-                io_write (io, blocking, NULL, &error);
+        if (io->in_callback) {
+                if (blocking || !nghttp2_session_want_write (io->session))
+                        return;
+        } else {
+                while (nghttp2_session_want_write (io->session) && !error)
+                        io_write (io, blocking, NULL, &error);
+        }
 
-        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+        if (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
                 g_clear_error (&error);
                 io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM 
(io->ostream), NULL);
                 g_source_set_name (io->write_source, "Soup HTTP/2 write source");
@@ -429,6 +437,7 @@ io_read (SoupClientMessageIOHTTP2  *io,
                                             blocking, cancellable, error)) < 0)
             return FALSE;
 
+        g_assert (io->in_callback == 0);
         ret = nghttp2_session_mem_recv (io->session, buffer, read);
         NGCHECK (ret);
         return ret != 0;
@@ -535,19 +544,24 @@ on_header_callback (nghttp2_session     *session,
         if (!data)
                 return 0;
 
+        data->io->in_callback++;
+
         SoupMessage *msg = data->msg;
         if (name[0] == ':') {
                 if (strcmp ((char *)name, ":status") == 0) {
                         guint status_code = (guint)g_ascii_strtoull ((char *)value, NULL, 10);
                         soup_message_set_status (msg, status_code, NULL);
+                        data->io->in_callback--;
                         return 0;
                 }
                 g_debug ("Unknown header: %s = %s", name, value);
+                data->io->in_callback--;
                 return 0;
         }
 
         soup_message_headers_append_untrusted_data (soup_message_get_response_headers (data->msg),
                                                     (const char*)name, (const char*)value);
+        data->io->in_callback--;
         return 0;
 }
 
@@ -577,6 +591,8 @@ on_begin_frame_callback (nghttp2_session        *session,
         if (!data)
                 return 0;
 
+        data->io->in_callback++;
+
         switch (hd->type) {
         case NGHTTP2_HEADERS:
                 if (data->state < STATE_READ_HEADERS) {
@@ -602,6 +618,7 @@ on_begin_frame_callback (nghttp2_session        *session,
                 break;
         }
 
+        data->io->in_callback--;
         return 0;
 }
 
@@ -634,6 +651,8 @@ on_frame_recv_callback (nghttp2_session     *session,
         SoupClientMessageIOHTTP2 *io = user_data;
         SoupHTTP2MessageData *data;
 
+        io->in_callback++;
+
         if (frame->hd.stream_id == 0) {
                 h2_debug (io, NULL, "[RECV] [%s] Recieved (%u)", frame_type_to_string (frame->hd.type), 
frame->hd.flags);
 
@@ -653,6 +672,7 @@ on_frame_recv_callback (nghttp2_session     *session,
                         break;
                 }
 
+                io->in_callback--;
                 return 0;
         }
 
@@ -662,6 +682,7 @@ on_frame_recv_callback (nghttp2_session     *session,
         if (!data) {
                 if (!(frame->hd.flags & NGHTTP2_FLAG_END_STREAM))
                         g_warn_if_reached ();
+                io->in_callback--;
                 return 0;
         }
 
@@ -676,6 +697,7 @@ on_frame_recv_callback (nghttp2_session     *session,
                                 soup_message_got_informational (data->msg);
                                 soup_message_cleanup_response (data->msg);
                                 advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DONE);
+                                io->in_callback--;
                                 return 0;
                         }
 
@@ -713,6 +735,7 @@ on_frame_recv_callback (nghttp2_session     *session,
                 break;
         };
 
+        io->in_callback--;
         return 0;
 }
 
@@ -730,6 +753,8 @@ on_data_chunk_recv_callback (nghttp2_session *session,
         if (!msgdata)
                 return NGHTTP2_ERR_CALLBACK_FAILURE;
 
+        io->in_callback++;
+
         h2_debug (io, msgdata, "[DATA] Recieved chunk, len=%zu, flags=%u, paused=%d", len, flags, 
msgdata->paused);
 
         g_assert (msgdata->body_istream != NULL);
@@ -737,6 +762,7 @@ on_data_chunk_recv_callback (nghttp2_session *session,
         if (msgdata->state == STATE_READ_DATA_START)
                 io_try_sniff_content (msgdata, FALSE, msgdata->item->cancellable);
 
+        io->in_callback--;
         return 0;
 }
 
@@ -752,12 +778,15 @@ on_before_frame_send_callback (nghttp2_session     *session,
         if (!data)
                 return 0;
 
+        data->io->in_callback++;
+
         switch (frame->hd.type) {
         case NGHTTP2_HEADERS:
                 advance_state_from (data, STATE_NONE, STATE_WRITE_HEADERS);
                 break;
         }
 
+        data->io->in_callback--;
         return 0;
 }
 
@@ -777,6 +806,8 @@ on_frame_send_callback (nghttp2_session     *session,
         SoupClientMessageIOHTTP2 *io = user_data;
         SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
 
+        io->in_callback++;
+
         switch (frame->hd.type) {
         case NGHTTP2_HEADERS:
                 g_assert (data);
@@ -829,6 +860,7 @@ on_frame_send_callback (nghttp2_session     *session,
                 break;
         }
 
+        io->in_callback--;
         return 0;
 }
 
@@ -840,9 +872,12 @@ on_frame_not_send_callback (nghttp2_session     *session,
 {
         SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
 
+        data->io->in_callback++;
+
         h2_debug (user_data, data, "[SEND] [%s] Failed: %s", frame_type_to_string (frame->hd.type),
                   nghttp2_strerror (lib_error_code));
 
+        data->io->in_callback--;
         return 0;
 }
 
@@ -858,9 +893,12 @@ on_stream_close_callback (nghttp2_session *session,
         if (!data)
                 return 0;
 
+        data->io->in_callback++;
+
         if (error_code == NGHTTP2_REFUSED_STREAM && data->state < STATE_READ_DATA)
                 data->can_be_restarted = TRUE;
 
+        data->io->in_callback--;
         return 0;
 }
 
@@ -935,6 +973,7 @@ on_data_source_read_callback (nghttp2_session     *session,
                               void                *user_data)
 {
         SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id);
+        data->io->in_callback++;
 
         /* We support pollable streams in the best case because they
          * should perform better with one fewer copy of each buffer and no threading. */
@@ -960,10 +999,12 @@ on_data_source_read_callback (nghttp2_session     *session,
                                 g_source_attach (data->data_source_poll, g_main_context_get_thread_default 
());
 
                                 g_error_free (error);
+                                data->io->in_callback--;
                                 return NGHTTP2_ERR_DEFERRED;
                         }
 
                         set_error_for_data (data, g_steal_pointer (&error));
+                        data->io->in_callback--;
                         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
                 }
                 else if (read == 0) {
@@ -971,6 +1012,7 @@ on_data_source_read_callback (nghttp2_session     *session,
                         *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                 }
 
+                data->io->in_callback--;
                 return read;
         } else {
                 GInputStream *in_stream = G_INPUT_STREAM (source->ptr);
@@ -989,13 +1031,16 @@ on_data_source_read_callback (nghttp2_session     *session,
                         memcpy (buf, data->data_source_buffer->data, buffer_len);
                         log_request_data (data, buf, buffer_len);
                         g_byte_array_set_size (data->data_source_buffer, 0);
+                        data->io->in_callback--;
                         return buffer_len;
                 } else if (data->data_source_eof) {
                         h2_debug (data->io, data, "[SEND_BODY] EOF");
                         *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+                        data->io->in_callback--;
                         return 0;
                 } else if (data->data_source_error) {
                         set_error_for_data (data, g_steal_pointer (&data->data_source_error));
+                        data->io->in_callback--;
                         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
                 } else {
                         h2_debug (data->io, data, "[SEND_BODY] Reading async");
@@ -1004,6 +1049,7 @@ on_data_source_read_callback (nghttp2_session     *session,
                                                    get_data_io_priority (data),
                                                    data->item->cancellable,
                                                    (GAsyncReadyCallback)on_data_read, data);
+                        data->io->in_callback--;
                         return NGHTTP2_ERR_DEFERRED;
                 }
         }
diff --git a/tests/http2-test.c b/tests/http2-test.c
index 5eef10b2..c566505a 100644
--- a/tests/http2-test.c
+++ b/tests/http2-test.c
@@ -327,6 +327,35 @@ do_post_sync_test (Test *test, gconstpointer data)
 
 }
 
+static void
+do_post_large_sync_test (Test *test, gconstpointer data)
+{
+        guint large_size = 1000000;
+        char *large_data;
+        unsigned int i;
+
+        large_data = g_malloc (large_size);
+        for (i = 0; i < large_size; i++)
+                large_data[i] = i & 0xFF;
+        GBytes *bytes = g_bytes_new_take (large_data, large_size);
+        test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post";);
+        soup_message_set_request_body_from_bytes (test->msg, "text/plain", bytes);
+
+        GError *error = NULL;
+        GInputStream *response = soup_session_send (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_nonnull (response);
+
+        GBytes *response_bytes = read_stream_to_bytes_sync (response);
+        g_assert_true (g_bytes_equal (bytes, response_bytes));
+
+        g_bytes_unref (response_bytes);
+        g_object_unref (response);
+        g_bytes_unref (bytes);
+        g_object_unref (test->msg);
+}
+
 static void
 do_post_async_test (Test *test, gconstpointer data)
 {
@@ -354,6 +383,39 @@ do_post_async_test (Test *test, gconstpointer data)
         g_object_unref (test->msg);
 }
 
+static void
+do_post_large_async_test (Test *test, gconstpointer data)
+{
+        GMainContext *async_context = g_main_context_ref_thread_default ();
+        guint large_size = 1000000;
+        char *large_data;
+        unsigned int i;
+
+        large_data = g_malloc (large_size);
+        for (i = 0; i < large_size; i++)
+                large_data[i] = i & 0xFF;
+        GBytes *bytes = g_bytes_new_take (large_data, large_size);
+        test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post";);
+        soup_message_set_request_body_from_bytes (test->msg, "text/plain", bytes);
+
+        GBytes *response = NULL;
+        soup_session_send_async (test->session, test->msg, G_PRIORITY_DEFAULT, NULL, on_send_complete, 
&response);
+
+        while (!response) {
+                g_main_context_iteration (async_context, TRUE);
+        }
+
+        g_assert_true (g_bytes_equal (bytes, response));
+
+        while (g_main_context_pending (async_context))
+                g_main_context_iteration (async_context, FALSE);
+
+        g_bytes_unref (response);
+        g_bytes_unref (bytes);
+        g_main_context_unref (async_context);
+        g_object_unref (test->msg);
+}
+
 static void
 do_post_blocked_async_test (Test *test, gconstpointer data)
 {
@@ -976,6 +1038,14 @@ main (int argc, char **argv)
                     setup_session,
                     do_post_sync_test,
                     teardown_session);
+        g_test_add ("/http2/post/large/sync", Test, NULL,
+                    setup_session,
+                    do_post_large_sync_test,
+                    teardown_session);
+        g_test_add ("/http2/post/large/async", Test, NULL,
+                    setup_session,
+                    do_post_large_async_test,
+                    teardown_session);
         g_test_add ("/http2/post/blocked/async", Test, NULL,
                     setup_session,
                     do_post_blocked_async_test,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]