[libsoup/carlosgc/http2-io-from-callbacks: 2/2] http2: ensure mem_send and mem_recv functions are not called from nghttp2 callbacks
- From: Carlos Garcia Campos <carlosgc src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/carlosgc/http2-io-from-callbacks: 2/2] http2: ensure mem_send and mem_recv functions are not called from nghttp2 callbacks
- Date: Wed, 10 Nov 2021 12:21:46 +0000 (UTC)
commit 70816300521ca9ad714657edbcd57207cad9270d
Author: Carlos Garcia Campos <cgarcia igalia com>
Date: Wed Nov 10 13:19:36 2021 +0100
http2: ensure mem_send and mem_recv functions are not called from nghttp2 callbacks
libsoup/http2/soup-client-message-io-http2.c | 52 +++++++++++++++++++--
tests/http2-test.c | 70 ++++++++++++++++++++++++++++
2 files changed, 119 insertions(+), 3 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index d710ec2b..cc91f02c 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -87,6 +87,8 @@ typedef struct {
GTask *close_task;
gboolean session_terminated;
gboolean goaway_sent;
+
+ guint in_callback;
} SoupClientMessageIOHTTP2;
typedef struct {
@@ -345,6 +347,7 @@ io_write (SoupClientMessageIOHTTP2 *io,
if (io->write_buffer == NULL) {
io->written_bytes = 0;
+ g_assert (io->in_callback == 0);
io->write_buffer_size = nghttp2_session_mem_send (io->session, (const
guint8**)&io->write_buffer);
NGCHECK (io->write_buffer_size);
if (io->write_buffer_size == 0) {
@@ -400,10 +403,15 @@ io_try_write (SoupClientMessageIOHTTP2 *io,
if (io->write_source)
return;
- while (nghttp2_session_want_write (io->session) && !error)
- io_write (io, blocking, NULL, &error);
+ if (io->in_callback) {
+ if (blocking || !nghttp2_session_want_write (io->session))
+ return;
+ } else {
+ while (nghttp2_session_want_write (io->session) && !error)
+ io_write (io, blocking, NULL, &error);
+ }
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ if (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&error);
io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
(io->ostream), NULL);
g_source_set_name (io->write_source, "Soup HTTP/2 write source");
@@ -429,6 +437,7 @@ io_read (SoupClientMessageIOHTTP2 *io,
blocking, cancellable, error)) < 0)
return FALSE;
+ g_assert (io->in_callback == 0);
ret = nghttp2_session_mem_recv (io->session, buffer, read);
NGCHECK (ret);
return ret != 0;
@@ -535,19 +544,24 @@ on_header_callback (nghttp2_session *session,
if (!data)
return 0;
+ data->io->in_callback++;
+
SoupMessage *msg = data->msg;
if (name[0] == ':') {
if (strcmp ((char *)name, ":status") == 0) {
guint status_code = (guint)g_ascii_strtoull ((char *)value, NULL, 10);
soup_message_set_status (msg, status_code, NULL);
+ data->io->in_callback--;
return 0;
}
g_debug ("Unknown header: %s = %s", name, value);
+ data->io->in_callback--;
return 0;
}
soup_message_headers_append_untrusted_data (soup_message_get_response_headers (data->msg),
(const char*)name, (const char*)value);
+ data->io->in_callback--;
return 0;
}
@@ -577,6 +591,8 @@ on_begin_frame_callback (nghttp2_session *session,
if (!data)
return 0;
+ data->io->in_callback++;
+
switch (hd->type) {
case NGHTTP2_HEADERS:
if (data->state < STATE_READ_HEADERS) {
@@ -602,6 +618,7 @@ on_begin_frame_callback (nghttp2_session *session,
break;
}
+ data->io->in_callback--;
return 0;
}
@@ -634,6 +651,8 @@ on_frame_recv_callback (nghttp2_session *session,
SoupClientMessageIOHTTP2 *io = user_data;
SoupHTTP2MessageData *data;
+ io->in_callback++;
+
if (frame->hd.stream_id == 0) {
h2_debug (io, NULL, "[RECV] [%s] Recieved (%u)", frame_type_to_string (frame->hd.type),
frame->hd.flags);
@@ -653,6 +672,7 @@ on_frame_recv_callback (nghttp2_session *session,
break;
}
+ io->in_callback--;
return 0;
}
@@ -662,6 +682,7 @@ on_frame_recv_callback (nghttp2_session *session,
if (!data) {
if (!(frame->hd.flags & NGHTTP2_FLAG_END_STREAM))
g_warn_if_reached ();
+ io->in_callback--;
return 0;
}
@@ -676,6 +697,7 @@ on_frame_recv_callback (nghttp2_session *session,
soup_message_got_informational (data->msg);
soup_message_cleanup_response (data->msg);
advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DONE);
+ io->in_callback--;
return 0;
}
@@ -713,6 +735,7 @@ on_frame_recv_callback (nghttp2_session *session,
break;
};
+ io->in_callback--;
return 0;
}
@@ -730,6 +753,8 @@ on_data_chunk_recv_callback (nghttp2_session *session,
if (!msgdata)
return NGHTTP2_ERR_CALLBACK_FAILURE;
+ io->in_callback++;
+
h2_debug (io, msgdata, "[DATA] Recieved chunk, len=%zu, flags=%u, paused=%d", len, flags,
msgdata->paused);
g_assert (msgdata->body_istream != NULL);
@@ -737,6 +762,7 @@ on_data_chunk_recv_callback (nghttp2_session *session,
if (msgdata->state == STATE_READ_DATA_START)
io_try_sniff_content (msgdata, FALSE, msgdata->item->cancellable);
+ io->in_callback--;
return 0;
}
@@ -752,12 +778,15 @@ on_before_frame_send_callback (nghttp2_session *session,
if (!data)
return 0;
+ data->io->in_callback++;
+
switch (frame->hd.type) {
case NGHTTP2_HEADERS:
advance_state_from (data, STATE_NONE, STATE_WRITE_HEADERS);
break;
}
+ data->io->in_callback--;
return 0;
}
@@ -777,6 +806,8 @@ on_frame_send_callback (nghttp2_session *session,
SoupClientMessageIOHTTP2 *io = user_data;
SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
+ io->in_callback++;
+
switch (frame->hd.type) {
case NGHTTP2_HEADERS:
g_assert (data);
@@ -829,6 +860,7 @@ on_frame_send_callback (nghttp2_session *session,
break;
}
+ io->in_callback--;
return 0;
}
@@ -840,9 +872,12 @@ on_frame_not_send_callback (nghttp2_session *session,
{
SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
+ data->io->in_callback++;
+
h2_debug (user_data, data, "[SEND] [%s] Failed: %s", frame_type_to_string (frame->hd.type),
nghttp2_strerror (lib_error_code));
+ data->io->in_callback--;
return 0;
}
@@ -858,9 +893,12 @@ on_stream_close_callback (nghttp2_session *session,
if (!data)
return 0;
+ data->io->in_callback++;
+
if (error_code == NGHTTP2_REFUSED_STREAM && data->state < STATE_READ_DATA)
data->can_be_restarted = TRUE;
+ data->io->in_callback--;
return 0;
}
@@ -935,6 +973,7 @@ on_data_source_read_callback (nghttp2_session *session,
void *user_data)
{
SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id);
+ data->io->in_callback++;
/* We support pollable streams in the best case because they
* should perform better with one fewer copy of each buffer and no threading. */
@@ -960,10 +999,12 @@ on_data_source_read_callback (nghttp2_session *session,
g_source_attach (data->data_source_poll, g_main_context_get_thread_default
());
g_error_free (error);
+ data->io->in_callback--;
return NGHTTP2_ERR_DEFERRED;
}
set_error_for_data (data, g_steal_pointer (&error));
+ data->io->in_callback--;
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
else if (read == 0) {
@@ -971,6 +1012,7 @@ on_data_source_read_callback (nghttp2_session *session,
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
+ data->io->in_callback--;
return read;
} else {
GInputStream *in_stream = G_INPUT_STREAM (source->ptr);
@@ -989,13 +1031,16 @@ on_data_source_read_callback (nghttp2_session *session,
memcpy (buf, data->data_source_buffer->data, buffer_len);
log_request_data (data, buf, buffer_len);
g_byte_array_set_size (data->data_source_buffer, 0);
+ data->io->in_callback--;
return buffer_len;
} else if (data->data_source_eof) {
h2_debug (data->io, data, "[SEND_BODY] EOF");
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
+ data->io->in_callback--;
return 0;
} else if (data->data_source_error) {
set_error_for_data (data, g_steal_pointer (&data->data_source_error));
+ data->io->in_callback--;
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
} else {
h2_debug (data->io, data, "[SEND_BODY] Reading async");
@@ -1004,6 +1049,7 @@ on_data_source_read_callback (nghttp2_session *session,
get_data_io_priority (data),
data->item->cancellable,
(GAsyncReadyCallback)on_data_read, data);
+ data->io->in_callback--;
return NGHTTP2_ERR_DEFERRED;
}
}
diff --git a/tests/http2-test.c b/tests/http2-test.c
index 5eef10b2..c566505a 100644
--- a/tests/http2-test.c
+++ b/tests/http2-test.c
@@ -327,6 +327,35 @@ do_post_sync_test (Test *test, gconstpointer data)
}
+static void
+do_post_large_sync_test (Test *test, gconstpointer data)
+{
+ guint large_size = 1000000;
+ char *large_data;
+ unsigned int i;
+
+ large_data = g_malloc (large_size);
+ for (i = 0; i < large_size; i++)
+ large_data[i] = i & 0xFF;
+ GBytes *bytes = g_bytes_new_take (large_data, large_size);
+ test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post");
+ soup_message_set_request_body_from_bytes (test->msg, "text/plain", bytes);
+
+ GError *error = NULL;
+ GInputStream *response = soup_session_send (test->session, test->msg, NULL, &error);
+
+ g_assert_no_error (error);
+ g_assert_nonnull (response);
+
+ GBytes *response_bytes = read_stream_to_bytes_sync (response);
+ g_assert_true (g_bytes_equal (bytes, response_bytes));
+
+ g_bytes_unref (response_bytes);
+ g_object_unref (response);
+ g_bytes_unref (bytes);
+ g_object_unref (test->msg);
+}
+
static void
do_post_async_test (Test *test, gconstpointer data)
{
@@ -354,6 +383,39 @@ do_post_async_test (Test *test, gconstpointer data)
g_object_unref (test->msg);
}
+static void
+do_post_large_async_test (Test *test, gconstpointer data)
+{
+ GMainContext *async_context = g_main_context_ref_thread_default ();
+ guint large_size = 1000000;
+ char *large_data;
+ unsigned int i;
+
+ large_data = g_malloc (large_size);
+ for (i = 0; i < large_size; i++)
+ large_data[i] = i & 0xFF;
+ GBytes *bytes = g_bytes_new_take (large_data, large_size);
+ test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post");
+ soup_message_set_request_body_from_bytes (test->msg, "text/plain", bytes);
+
+ GBytes *response = NULL;
+ soup_session_send_async (test->session, test->msg, G_PRIORITY_DEFAULT, NULL, on_send_complete,
&response);
+
+ while (!response) {
+ g_main_context_iteration (async_context, TRUE);
+ }
+
+ g_assert_true (g_bytes_equal (bytes, response));
+
+ while (g_main_context_pending (async_context))
+ g_main_context_iteration (async_context, FALSE);
+
+ g_bytes_unref (response);
+ g_bytes_unref (bytes);
+ g_main_context_unref (async_context);
+ g_object_unref (test->msg);
+}
+
static void
do_post_blocked_async_test (Test *test, gconstpointer data)
{
@@ -976,6 +1038,14 @@ main (int argc, char **argv)
setup_session,
do_post_sync_test,
teardown_session);
+ g_test_add ("/http2/post/large/sync", Test, NULL,
+ setup_session,
+ do_post_large_sync_test,
+ teardown_session);
+ g_test_add ("/http2/post/large/async", Test, NULL,
+ setup_session,
+ do_post_large_async_test,
+ teardown_session);
g_test_add ("/http2/post/blocked/async", Test, NULL,
setup_session,
do_post_blocked_async_test,
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]