[libsoup/carlosgc/http2-sniffer] http2: add support for content sniffing
- From: Carlos Garcia Campos <carlosgc src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/carlosgc/http2-sniffer] http2: add support for content sniffing
- Date: Tue, 18 May 2021 16:17:23 +0000 (UTC)
commit f938de717db191e249613f3815009428a3440ffc
Author: Carlos Garcia Campos <cgarcia igalia com>
Date: Tue May 18 18:16:09 2021 +0200
http2: add support for content sniffing
libsoup/http2/soup-client-message-io-http2.c | 81 ++++++++++++------
tests/http2-test.c | 118 ++++++++++++++++++++++++++-
2 files changed, 174 insertions(+), 25 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 90edcefa..068e1564 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -54,6 +54,7 @@ typedef enum {
STATE_WRITE_DATA,
STATE_WRITE_DONE,
STATE_READ_HEADERS,
+ STATE_READ_DATA_START,
STATE_READ_DATA,
STATE_READ_DONE,
STATE_ERROR,
@@ -114,7 +115,7 @@ typedef struct {
} SoupHTTP2MessageData;
static void soup_client_message_io_http2_finished (SoupClientMessageIO *, SoupMessage *);
-static gboolean io_read_or_write (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
+static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
static void
NGCHECK (int return_code)
@@ -175,6 +176,8 @@ state_to_string (SoupHTTP2IOState state)
return "WRITE_DONE";
case STATE_READ_HEADERS:
return "READ_HEADERS";
+ case STATE_READ_DATA_START:
+ return "READ_DATA_START";
case STATE_READ_DATA:
return "READ_DATA";
case STATE_READ_DONE:
@@ -320,8 +323,10 @@ memory_stream_need_more_data_callback (SoupBodyInputStreamHttp2 *stream,
SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
GError *error = NULL;
- io_read_or_write (data->io, blocking, cancellable, &error);
+ if (!nghttp2_session_want_read (data->io->session))
+ return blocking ? NULL : g_error_new_literal (G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
_("Operation would block"));
+ io_read (data->io, blocking, cancellable, &error);
return error;
}
@@ -343,19 +348,20 @@ on_begin_frame_callback (nghttp2_session *session,
advance_state_from (data, STATE_ANY, STATE_READ_HEADERS);
break;
case NGHTTP2_DATA: {
- if (data->state < STATE_READ_DATA)
- advance_state_from (data, STATE_ANY, STATE_READ_DATA);
-
- if (!data->body_istream) {
+ if (data->state < STATE_READ_DATA_START) {
+ g_assert (!data->body_istream);
data->body_istream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM
(data->io->istream));
g_signal_connect (data->body_istream, "need-more-data",
G_CALLBACK (memory_stream_need_more_data_callback), data);
- }
- if (!data->decoded_data_istream)
+
+ g_assert (!data->decoded_data_istream);
data->decoded_data_istream = soup_session_setup_message_body_input_stream
(data->item->session,
data->msg,
data->body_istream,
SOUP_STAGE_MESSAGE_BODY);
+
+ advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DATA_START);
+ }
break;
}
}
@@ -434,7 +440,6 @@ on_frame_recv_callback (nghttp2_session *session,
data->metrics->response_body_bytes_received += frame->data.hd.length +
FRAME_HEADER_SIZE;
if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2
(data->body_istream));
- soup_message_got_body (data->msg);
}
break;
case NGHTTP2_RST_STREAM:
@@ -1016,12 +1021,14 @@ soup_client_message_io_http2_get_source (SoupMessage *msg,
/* TODO: Handle mixing writes in? */
if (data->paused)
base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
- else if (data->state < STATE_WRITE_DONE)
+ else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (io->session))
base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
(io->ostream), cancellable);
- else if (data->state < STATE_READ_DONE)
+ else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (io->session))
base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream),
cancellable);
- else
- g_assert_not_reached ();
+ else {
+ g_warn_if_reached ();
+ base_source = g_timeout_source_new (0);
+ }
GSource *source = soup_message_io_source_new (base_source, G_OBJECT (msg), data->paused,
message_source_check);
g_source_set_callback (source, (GSourceFunc)callback, user_data, NULL);
@@ -1043,6 +1050,7 @@ client_stream_eof (SoupClientInputStream *stream,
SoupHTTP2MessageData *data = get_data_for_message (io, msg);
h2_debug (io, data, "Client stream EOF");
advance_state_from (data, STATE_ANY, STATE_READ_DONE);
+ soup_message_got_body (data->msg);
}
static GInputStream *
@@ -1119,17 +1127,44 @@ io_write (SoupClientMessageIOHTTP2 *io,
return TRUE;
}
+static void
+io_try_sniff_content (SoupHTTP2MessageData *data,
+ gboolean blocking,
+ GCancellable *cancellable)
+{
+ GError *error = NULL;
+
+ if (soup_message_try_sniff_content (data->msg, data->decoded_data_istream, blocking, cancellable,
&error)) {
+ h2_debug (data->io, data, "[DATA] Sniffed content");
+ advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
+ } else {
+ h2_debug (data->io, data, "[DATA] Sniffer stream was not ready %s", error->message);
+
+ g_clear_error (&error);
+ }
+}
+
static gboolean
-io_read_or_write (SoupClientMessageIOHTTP2 *io,
- gboolean blocking,
- GCancellable *cancellable,
- GError **error)
+io_run (SoupHTTP2MessageData *data,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
{
- /* TODO: This can possibly more inteligent about what actually needs
- writing so we can prioritize better. */
- if (nghttp2_session_want_write (io->session))
- return io_write (io, blocking, cancellable, error);
- return io_read (io, blocking, cancellable, error);
+ gboolean progress = FALSE;
+
+ if (data->state == STATE_READ_DATA_START)
+ io_try_sniff_content (data, blocking, cancellable);
+
+ if (nghttp2_session_want_write (data->io->session))
+ progress = io_write (data->io, blocking, cancellable, error);
+ else if (nghttp2_session_want_read (data->io->session)) {
+ progress = io_read (data->io, blocking, cancellable, error);
+
+ if (progress && data->state == STATE_READ_DATA_START)
+ io_try_sniff_content (data, blocking, cancellable);
+ }
+
+ return progress;
}
static gboolean
@@ -1156,7 +1191,7 @@ io_run_until (SoupMessage *msg,
g_object_ref (msg);
while (progress && get_io_data (msg) == io && !data->paused && data->state < state) {
- progress = io_read_or_write (io, blocking, cancellable, &my_error);
+ progress = io_run (data, blocking, cancellable, &my_error);
}
if (my_error || (my_error = get_error_for_data (io, data))) {
diff --git a/tests/http2-test.c b/tests/http2-test.c
index d5c75c43..cc0bdaa9 100644
--- a/tests/http2-test.c
+++ b/tests/http2-test.c
@@ -111,7 +111,7 @@ read_stream_to_bytes_sync (GInputStream *stream)
NULL, &error);
g_assert_no_error (error);
- g_assert_cmpint (read, >, 0);
+ g_assert_cmpint (read, >=, 0);
GBytes *bytes = g_memory_output_stream_steal_as_bytes (G_MEMORY_OUTPUT_STREAM (out));
g_object_unref (out);
@@ -549,6 +549,113 @@ do_preconnect_test (Test *test, gconstpointer data)
g_main_context_unref (async_context);
}
+static void
+content_sniffed (SoupMessage *msg,
+ char *content_type,
+ GHashTable *params)
+{
+ soup_test_assert (g_object_get_data (G_OBJECT (msg), "got-chunk") == NULL,
+ "got-chunk got emitted before content-sniffed");
+
+ g_object_set_data (G_OBJECT (msg), "content-sniffed", GINT_TO_POINTER (TRUE));
+}
+
+static void
+got_headers (SoupMessage *msg)
+{
+ soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") == NULL,
+ "content-sniffed got emitted before got-headers");
+
+ g_object_set_data (G_OBJECT (msg), "got-headers", GINT_TO_POINTER (TRUE));
+}
+
+static void
+sniffer_test_send_ready_cb (SoupSession *session,
+ GAsyncResult *result,
+ GInputStream **stream)
+{
+ GError *error = NULL;
+
+ *stream = soup_session_send_finish (session, result, &error);
+ g_assert_no_error (error);
+ g_assert_nonnull (*stream);
+}
+
+static void
+do_one_sniffer_test (SoupSession *session,
+ const char *uri,
+ gsize expected_size,
+ gboolean should_sniff,
+ GMainContext *async_context)
+{
+ SoupMessage *msg;
+ GInputStream *stream = NULL;
+ GBytes *bytes;
+
+ msg = soup_message_new (SOUP_METHOD_GET, uri);
+ g_object_connect (msg,
+ "signal::got-headers", got_headers, NULL,
+ "signal::content-sniffed", content_sniffed, NULL,
+ NULL);
+ if (async_context) {
+ soup_session_send_async (session, msg, G_PRIORITY_DEFAULT, NULL,
+ (GAsyncReadyCallback)sniffer_test_send_ready_cb,
+ &stream);
+
+ while (!stream)
+ g_main_context_iteration (async_context, TRUE);
+ } else {
+ GError *error = NULL;
+
+ stream = soup_session_send (session, msg, NULL, &error);
+ g_assert_no_error (error);
+ g_assert_nonnull (stream);
+ }
+
+ if (should_sniff) {
+ soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") != NULL,
+ "content-sniffed did not get emitted");
+ } else {
+ soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") == NULL,
+ "content-sniffed got emitted without a sniffer");
+ }
+
+ bytes = read_stream_to_bytes_sync (stream);
+ g_assert_cmpuint (g_bytes_get_size (bytes), ==, expected_size);
+
+ g_object_unref (stream);
+ g_bytes_unref (bytes);
+ g_object_unref (msg);
+}
+
+static void
+do_sniffer_async_test (Test *test, gconstpointer data)
+{
+ GMainContext *async_context = g_main_context_ref_thread_default ();
+
+ soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
+
+ do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, async_context);
+ do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE,
async_context);
+ do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, async_context);
+
+ while (g_main_context_pending (async_context))
+ g_main_context_iteration (async_context, FALSE);
+
+ g_main_context_unref (async_context);
+}
+
+static void
+do_sniffer_sync_test (Test *test, gconstpointer data)
+{
+ soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
+
+ do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, NULL);
+ /* FIXME: large seems to be broken in sync mode */
+ /* do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL);
*/
+ do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, NULL);
+}
+
int
main (int argc, char **argv)
{
@@ -627,7 +734,14 @@ main (int argc, char **argv)
setup_session,
do_cancellation_test,
teardown_session);
-
+ g_test_add ("/http2/sniffer/async", Test, NULL,
+ setup_session,
+ do_sniffer_async_test,
+ teardown_session);
+ g_test_add ("/http2/sniffer/sync", Test, NULL,
+ setup_session,
+ do_sniffer_sync_test,
+ teardown_session);
ret = g_test_run ();
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]