[libsoup] http2: add support for content sniffing



commit 1a4c60ae6aef7c02debc627460a520c99dfd7952
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Tue May 18 18:16:09 2021 +0200

    http2: add support for content sniffing

 libsoup/http2/soup-client-message-io-http2.c |  65 +++++++++++----
 tests/http2-test.c                           | 119 ++++++++++++++++++++++++++-
 2 files changed, 163 insertions(+), 21 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 8c6d4d19..175454a8 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -53,6 +53,7 @@ typedef enum {
         STATE_WRITE_DATA,
         STATE_WRITE_DONE,
         STATE_READ_HEADERS,
+        STATE_READ_DATA_START,
         STATE_READ_DATA,
         STATE_READ_DONE,
 } SoupHTTP2IOState;
@@ -172,6 +173,8 @@ state_to_string (SoupHTTP2IOState state)
                 return "WRITE_DONE";
         case STATE_READ_HEADERS:
                 return "READ_HEADERS";
+        case STATE_READ_DATA_START:
+                return "READ_DATA_START";
         case STATE_READ_DATA:
                 return "READ_DATA";
         case STATE_READ_DONE:
@@ -332,23 +335,23 @@ on_begin_frame_callback (nghttp2_session        *session,
                 if (data->state < STATE_READ_HEADERS)
                         advance_state_from (data, STATE_WRITE_DONE, STATE_READ_HEADERS);
                 break;
-        case NGHTTP2_DATA: {
-                if (data->state < STATE_READ_DATA)
-                        advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DATA);
-
-                if (!data->body_istream) {
+        case NGHTTP2_DATA:
+                if (data->state < STATE_READ_DATA_START) {
+                        g_assert (!data->body_istream);
                         data->body_istream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (data->io->istream));
                         g_signal_connect (data->body_istream, "need-more-data",
                                           G_CALLBACK (memory_stream_need_more_data_callback), data);
-                }
-                if (!data->decoded_data_istream)
+
+                        g_assert (!data->decoded_data_istream);
                         data->decoded_data_istream = soup_session_setup_message_body_input_stream (data->item->session,
                                                                                                    data->msg,
                                                                                                    data->body_istream,
                                                                                                    SOUP_STAGE_MESSAGE_BODY);
+
+                        advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DATA_START);
+                }
                 break;
         }
-        }
 
         return 0;
 }
@@ -406,7 +409,6 @@ on_frame_recv_callback (nghttp2_session     *session,
                         if (SOUP_STATUS_IS_INFORMATIONAL (soup_message_get_status (data->msg))) {
                                 soup_message_got_informational (data->msg);
                                 soup_message_cleanup_response (data->msg);
-                                soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (data->body_istream));
                                 advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DONE);
                                 return 0;
                         }
@@ -422,7 +424,7 @@ on_frame_recv_callback (nghttp2_session     *session,
         case NGHTTP2_DATA:
                 if (data->metrics)
                         data->metrics->response_body_bytes_received += frame->data.hd.length + FRAME_HEADER_SIZE;
-                if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM)
+                if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && data->body_istream)
                         soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (data->body_istream));
                 break;
         case NGHTTP2_RST_STREAM:
@@ -1021,6 +1023,8 @@ soup_client_message_io_http2_get_source (SoupMessage             *msg,
                 base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
         else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (io->session))
                 base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
+        else if (data->state < STATE_READ_DONE && data->decoded_data_istream)
+                base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (data->decoded_data_istream), cancellable);
         else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (io->session))
                 base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
         else {
@@ -1126,17 +1130,43 @@ io_write (SoupClientMessageIOHTTP2 *io,
         return TRUE;
 }
 
+static void
+io_try_sniff_content (SoupHTTP2MessageData *data,
+                      gboolean              blocking,
+                      GCancellable         *cancellable)
+{
+        GError *error = NULL;
+
+        if (soup_message_try_sniff_content (data->msg, data->decoded_data_istream, blocking, cancellable, &error)) {
+                h2_debug (data->io, data, "[DATA] Sniffed content");
+                advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
+        } else {
+                h2_debug (data->io, data, "[DATA] Sniffer stream was not ready %s", error->message);
+
+                g_clear_error (&error);
+        }
+}
+
 static gboolean
-io_read_or_write (SoupHTTP2MessageData *data,
-                  gboolean              blocking,
-                  GCancellable         *cancellable,
-                  GError              **error)
+io_run (SoupHTTP2MessageData *data,
+        gboolean              blocking,
+        GCancellable         *cancellable,
+        GError              **error)
 {
         gboolean progress = FALSE;
+
+        if (data->state == STATE_READ_DATA_START)
+                io_try_sniff_content (data, blocking, cancellable);
+
         if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
                 progress = io_write (data->io, blocking, cancellable, error);
-        else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
+        else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session)) {
                 progress = io_read (data->io, blocking, cancellable, error);
+
+                if (progress && data->state == STATE_READ_DATA_START)
+                        io_try_sniff_content (data, blocking, cancellable);
+        }
+
         return progress;
 }
 
@@ -1163,9 +1193,8 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
 
        g_object_ref (msg);
 
-       while (progress && get_io_data (msg) == io && !data->paused && data->state < state) {
-                progress = io_read_or_write (data, blocking, cancellable, &my_error);
-       }
+       while (progress && get_io_data (msg) == io && !data->paused && data->state < state)
+                progress = io_run (data, blocking, cancellable, &my_error);
 
         if (my_error) {
                 g_propagate_error (error, my_error);
diff --git a/tests/http2-test.c b/tests/http2-test.c
index dc0a5538..7a48ce88 100644
--- a/tests/http2-test.c
+++ b/tests/http2-test.c
@@ -116,7 +116,7 @@ read_stream_to_bytes_sync (GInputStream *stream)
                                               NULL, &error);
 
         g_assert_no_error (error);
-        g_assert_cmpint (read, >, 0);
+        g_assert_cmpint (read, >=, 0);
 
         GBytes *bytes = g_memory_output_stream_steal_as_bytes (G_MEMORY_OUTPUT_STREAM (out));
         g_object_unref (out);
@@ -605,6 +605,113 @@ do_invalid_header_test (Test *test, gconstpointer data)
         }
 }
 
+static void
+content_sniffed (SoupMessage *msg,
+                 char        *content_type,
+                 GHashTable  *params)
+{
+        soup_test_assert (g_object_get_data (G_OBJECT (msg), "got-chunk") == NULL,
+                          "got-chunk got emitted before content-sniffed");
+
+        g_object_set_data (G_OBJECT (msg), "content-sniffed", GINT_TO_POINTER (TRUE));
+}
+
+static void
+got_headers (SoupMessage *msg)
+{
+        soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") == NULL,
+                          "content-sniffed got emitted before got-headers");
+
+        g_object_set_data (G_OBJECT (msg), "got-headers", GINT_TO_POINTER (TRUE));
+}
+
+static void
+sniffer_test_send_ready_cb (SoupSession   *session,
+                            GAsyncResult  *result,
+                            GInputStream **stream)
+{
+        GError *error = NULL;
+
+        *stream = soup_session_send_finish (session, result, &error);
+        g_assert_no_error (error);
+        g_assert_nonnull (*stream);
+}
+
+static void
+do_one_sniffer_test (SoupSession  *session,
+                     const char   *uri,
+                     gsize         expected_size,
+                     gboolean      should_sniff,
+                     GMainContext *async_context)
+{
+        SoupMessage *msg;
+        GInputStream *stream = NULL;
+        GBytes *bytes;
+
+        msg = soup_message_new (SOUP_METHOD_GET, uri);
+        g_object_connect (msg,
+                          "signal::got-headers", got_headers, NULL,
+                          "signal::content-sniffed", content_sniffed, NULL,
+                          NULL);
+        if (async_context) {
+                soup_session_send_async (session, msg, G_PRIORITY_DEFAULT, NULL,
+                                         (GAsyncReadyCallback)sniffer_test_send_ready_cb,
+                                         &stream);
+
+                while (!stream)
+                        g_main_context_iteration (async_context, TRUE);
+        } else {
+                GError *error = NULL;
+
+                stream = soup_session_send (session, msg, NULL, &error);
+                g_assert_no_error (error);
+                g_assert_nonnull (stream);
+        }
+
+        if (should_sniff) {
+                soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") != NULL,
+                                  "content-sniffed did not get emitted");
+        } else {
+                soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") == NULL,
+                                  "content-sniffed got emitted without a sniffer");
+        }
+
+        bytes = read_stream_to_bytes_sync (stream);
+        g_assert_cmpuint (g_bytes_get_size (bytes), ==, expected_size);
+
+        g_object_unref (stream);
+        g_bytes_unref (bytes);
+        g_object_unref (msg);
+}
+
+static void
+do_sniffer_async_test (Test *test, gconstpointer data)
+{
+        GMainContext *async_context = g_main_context_ref_thread_default ();
+
+        soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
+
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, async_context);
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, async_context);
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, async_context);
+
+        while (g_main_context_pending (async_context))
+                g_main_context_iteration (async_context, FALSE);
+
+        g_main_context_unref (async_context);
+}
+
+static void
+do_sniffer_sync_test (Test *test, gconstpointer data)
+{
+        soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
+
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, NULL);
+        /* FIXME: large seems to be broken in sync mode */
+        /* do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL); */
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, NULL);
+}
+
 int
 main (int argc, char **argv)
 {
@@ -691,8 +798,14 @@ main (int argc, char **argv)
                     setup_session,
                     do_invalid_header_test,
                     teardown_session);
-
-
+        g_test_add ("/http2/sniffer/async", Test, NULL,
+                    setup_session,
+                    do_sniffer_async_test,
+                    teardown_session);
+        g_test_add ("/http2/sniffer/sync", Test, NULL,
+                    setup_session,
+                    do_sniffer_sync_test,
+                    teardown_session);
 
        ret = g_test_run ();
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]