[libsoup/carlosgc/http2-sniffer] http2: add support for content sniffing




commit f938de717db191e249613f3815009428a3440ffc
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Tue May 18 18:16:09 2021 +0200

    http2: add support for content sniffing

 libsoup/http2/soup-client-message-io-http2.c |  81 ++++++++++++------
 tests/http2-test.c                           | 118 ++++++++++++++++++++++++++-
 2 files changed, 174 insertions(+), 25 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 90edcefa..068e1564 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -54,6 +54,7 @@ typedef enum {
         STATE_WRITE_DATA,
         STATE_WRITE_DONE,
         STATE_READ_HEADERS,
+        STATE_READ_DATA_START,
         STATE_READ_DATA,
         STATE_READ_DONE,
         STATE_ERROR,
@@ -114,7 +115,7 @@ typedef struct {
 } SoupHTTP2MessageData;
 
 static void soup_client_message_io_http2_finished (SoupClientMessageIO *, SoupMessage *);
-static gboolean io_read_or_write (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
+static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
 
 static void
 NGCHECK (int return_code)
@@ -175,6 +176,8 @@ state_to_string (SoupHTTP2IOState state)
                 return "WRITE_DONE";
         case STATE_READ_HEADERS:
                 return "READ_HEADERS";
+        case STATE_READ_DATA_START:
+                return "READ_DATA_START";
         case STATE_READ_DATA:
                 return "READ_DATA";
         case STATE_READ_DONE:
@@ -320,8 +323,13 @@ memory_stream_need_more_data_callback (SoupBodyInputStreamHttp2 *stream,
         SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
         GError *error = NULL;
 
-        io_read_or_write (data->io, blocking, cancellable, &error);
+        /* Do not attempt a read when nghttp2 wants no more input for this
+         * session: non-blocking callers get G_IO_ERROR_WOULD_BLOCK and
+         * blocking callers get no error. */
+        if (!nghttp2_session_want_read (data->io->session))
+                return blocking ? NULL : g_error_new_literal (G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would block"));
 
+        io_read (data->io, blocking, cancellable, &error);
         return error;
 }
 
@@ -343,19 +348,23 @@ on_begin_frame_callback (nghttp2_session        *session,
                         advance_state_from (data, STATE_ANY, STATE_READ_HEADERS);
                 break;
         case NGHTTP2_DATA: {
-                if (data->state < STATE_READ_DATA)
-                        advance_state_from (data, STATE_ANY, STATE_READ_DATA);
-
-                if (!data->body_istream) {
+                if (data->state < STATE_READ_DATA_START) {
+                        /* First DATA frame of the response: create the body
+                         * streams exactly once and enter READ_DATA_START,
+                         * where io_run () attempts content sniffing. */
+                        g_assert (!data->body_istream);
                         data->body_istream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (data->io->istream));
                         g_signal_connect (data->body_istream, "need-more-data",
                                           G_CALLBACK (memory_stream_need_more_data_callback), data);
-                }
-                if (!data->decoded_data_istream)
+
+                        g_assert (!data->decoded_data_istream);
                         data->decoded_data_istream = soup_session_setup_message_body_input_stream (data->item->session,
                                                                                                    data->msg,
                                                                                                    data->body_istream,
                                                                                                    SOUP_STAGE_MESSAGE_BODY);
+
+                        advance_state_from (data, STATE_READ_HEADERS, STATE_READ_DATA_START);
+                }
                 break;
         }
         }
@@ -434,7 +440,8 @@ on_frame_recv_callback (nghttp2_session     *session,
                         data->metrics->response_body_bytes_received += frame->data.hd.length + FRAME_HEADER_SIZE;
                 if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
                         soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (data->body_istream));
-                        soup_message_got_body (data->msg);
+                        /* got-body is emitted from client_stream_eof () once the
+                         * client input stream reaches EOF, not on END_STREAM. */
                 }
                 break;
         case NGHTTP2_RST_STREAM:
@@ -1016,12 +1021,17 @@ soup_client_message_io_http2_get_source (SoupMessage             *msg,
         /* TODO: Handle mixing writes in? */
         if (data->paused)
                 base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
-        else if (data->state < STATE_WRITE_DONE)
+        else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (io->session))
                 base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
-        else if (data->state < STATE_READ_DONE)
+        else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (io->session))
                 base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
-        else
-                g_assert_not_reached ();
+        else {
+                /* Nothing to wait on for this message right now, which is
+                 * not expected: warn instead of aborting and fall back to a
+                 * zero-delay timeout source. */
+                g_warn_if_reached ();
+                base_source = g_timeout_source_new (0);
+        }
 
         GSource *source = soup_message_io_source_new (base_source, G_OBJECT (msg), data->paused, message_source_check);
         g_source_set_callback (source, (GSourceFunc)callback, user_data, NULL);
@@ -1043,6 +1050,7 @@ client_stream_eof (SoupClientInputStream *stream,
         SoupHTTP2MessageData *data = get_data_for_message (io, msg);
         h2_debug (io, data, "Client stream EOF");
         advance_state_from (data, STATE_ANY, STATE_READ_DONE);
+        soup_message_got_body (data->msg);
 }
 
 static GInputStream *
@@ -1119,17 +1127,53 @@ io_write (SoupClientMessageIOHTTP2 *io,
         return TRUE;
 }
 
+/* Try to sniff the response content type from the decoded body stream.
+ * On success the message advances from READ_DATA_START to READ_DATA.
+ * Otherwise the sniffer error is only logged (presumably because not
+ * enough body data is available yet) and the state is left unchanged,
+ * so a later call can retry. */
+static void
+io_try_sniff_content (SoupHTTP2MessageData *data,
+                      gboolean              blocking,
+                      GCancellable         *cancellable)
+{
+        GError *error = NULL;
+
+        if (soup_message_try_sniff_content (data->msg, data->decoded_data_istream, blocking, cancellable, &error)) {
+                h2_debug (data->io, data, "[DATA] Sniffed content");
+                advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
+        } else {
+                h2_debug (data->io, data, "[DATA] Sniffer stream was not ready %s", error->message);
+
+                g_clear_error (&error);
+        }
+}
+
+/* Run one I/O step for @data: pending writes take priority over reads.
+ * While in READ_DATA_START, content sniffing is attempted before the step
+ * and again after a read that made progress. Returns TRUE if the write or
+ * read made progress, FALSE if nghttp2 wanted neither or the step failed. */
 static gboolean
-io_read_or_write (SoupClientMessageIOHTTP2 *io,
-                  gboolean                  blocking,
-                  GCancellable             *cancellable,
-                  GError                  **error)
+io_run (SoupHTTP2MessageData *data,
+        gboolean              blocking,
+        GCancellable         *cancellable,
+        GError              **error)
 {
-        /* TODO: This can possibly more inteligent about what actually needs
-           writing so we can prioritize better. */
-        if (nghttp2_session_want_write (io->session))
-                return io_write (io, blocking, cancellable, error);
-        return io_read (io, blocking, cancellable, error);
+        gboolean progress = FALSE;
+
+        if (data->state == STATE_READ_DATA_START)
+                io_try_sniff_content (data, blocking, cancellable);
+
+        if (nghttp2_session_want_write (data->io->session))
+                progress = io_write (data->io, blocking, cancellable, error);
+        else if (nghttp2_session_want_read (data->io->session)) {
+                progress = io_read (data->io, blocking, cancellable, error);
+
+                if (progress && data->state == STATE_READ_DATA_START)
+                        io_try_sniff_content (data, blocking, cancellable);
+        }
+
+        return progress;
 }
 
 static gboolean
@@ -1156,7 +1191,7 @@ io_run_until (SoupMessage      *msg,
        g_object_ref (msg);
 
        while (progress && get_io_data (msg) == io && !data->paused && data->state < state) {
-                progress = io_read_or_write (io, blocking, cancellable, &my_error);
+                progress = io_run (data, blocking, cancellable, &my_error);
        }
 
        if (my_error || (my_error = get_error_for_data (io, data))) {
diff --git a/tests/http2-test.c b/tests/http2-test.c
index d5c75c43..cc0bdaa9 100644
--- a/tests/http2-test.c
+++ b/tests/http2-test.c
@@ -111,7 +111,7 @@ read_stream_to_bytes_sync (GInputStream *stream)
                                               NULL, &error);
 
         g_assert_no_error (error);
-        g_assert_cmpint (read, >, 0);
+        g_assert_cmpint (read, >=, 0);
 
         GBytes *bytes = g_memory_output_stream_steal_as_bytes (G_MEMORY_OUTPUT_STREAM (out));
         g_object_unref (out);
@@ -549,6 +549,124 @@ do_preconnect_test (Test *test, gconstpointer data)
         g_main_context_unref (async_context);
 }
 
+/* "content-sniffed" handler: flags @msg as sniffed. The "got-chunk" check
+ * guards signal ordering, but do_one_sniffer_test () connects no handler
+ * that sets "got-chunk", so it cannot currently fail. */
+static void
+content_sniffed (SoupMessage *msg,
+                 char        *content_type,
+                 GHashTable  *params)
+{
+        soup_test_assert (g_object_get_data (G_OBJECT (msg), "got-chunk") == NULL,
+                          "got-chunk got emitted before content-sniffed");
+
+        g_object_set_data (G_OBJECT (msg), "content-sniffed", GINT_TO_POINTER (TRUE));
+}
+
+/* "got-headers" handler: content-sniffed must not have been emitted yet. */
+static void
+got_headers (SoupMessage *msg)
+{
+        soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") == NULL,
+                          "content-sniffed got emitted before got-headers");
+
+        g_object_set_data (G_OBJECT (msg), "got-headers", GINT_TO_POINTER (TRUE));
+}
+
+/* soup_session_send_async () callback: stores the response stream in @stream. */
+static void
+sniffer_test_send_ready_cb (SoupSession   *session,
+                            GAsyncResult  *result,
+                            GInputStream **stream)
+{
+        GError *error = NULL;
+
+        *stream = soup_session_send_finish (session, result, &error);
+        g_assert_no_error (error);
+        g_assert_nonnull (*stream);
+}
+
+/* Send a GET for @uri (asynchronously on @async_context when it is non-NULL,
+ * synchronously otherwise), check that content-sniffed was emitted exactly
+ * when @should_sniff is set, and that the body is @expected_size bytes long. */
+static void
+do_one_sniffer_test (SoupSession  *session,
+                     const char   *uri,
+                     gsize         expected_size,
+                     gboolean      should_sniff,
+                     GMainContext *async_context)
+{
+        SoupMessage *msg;
+        GInputStream *stream = NULL;
+        GBytes *bytes;
+
+        msg = soup_message_new (SOUP_METHOD_GET, uri);
+        g_object_connect (msg,
+                          "signal::got-headers", got_headers, NULL,
+                          "signal::content-sniffed", content_sniffed, NULL,
+                          NULL);
+        if (async_context) {
+                soup_session_send_async (session, msg, G_PRIORITY_DEFAULT, NULL,
+                                         (GAsyncReadyCallback)sniffer_test_send_ready_cb,
+                                         &stream);
+
+                while (!stream)
+                        g_main_context_iteration (async_context, TRUE);
+        } else {
+                GError *error = NULL;
+
+                stream = soup_session_send (session, msg, NULL, &error);
+                g_assert_no_error (error);
+                g_assert_nonnull (stream);
+        }
+
+        soup_test_assert (g_object_get_data (G_OBJECT (msg), "got-headers") != NULL,
+                          "got-headers did not get emitted");
+
+        if (should_sniff) {
+                soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") != NULL,
+                                  "content-sniffed did not get emitted");
+        } else {
+                soup_test_assert (g_object_get_data (G_OBJECT (msg), "content-sniffed") == NULL,
+                                  "content-sniffed got emitted without a sniffer");
+        }
+
+        bytes = read_stream_to_bytes_sync (stream);
+        g_assert_cmpuint (g_bytes_get_size (bytes), ==, expected_size);
+
+        g_object_unref (stream);
+        g_bytes_unref (bytes);
+        g_object_unref (msg);
+}
+
+static void
+do_sniffer_async_test (Test *test, gconstpointer data)
+{
+        GMainContext *async_context = g_main_context_ref_thread_default ();
+
+        soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
+
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, async_context);
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, async_context);
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, async_context);
+
+        while (g_main_context_pending (async_context))
+                g_main_context_iteration (async_context, FALSE);
+
+        g_main_context_unref (async_context);
+}
+
+static void
+do_sniffer_sync_test (Test *test, gconstpointer data)
+{
+        soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
+
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, NULL);
+        /* FIXME: large seems to be broken in sync mode */
+        /* do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL); */
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, NULL);
+}
+
 int
 main (int argc, char **argv)
 {
@@ -627,7 +734,14 @@ main (int argc, char **argv)
                     setup_session,
                     do_cancellation_test,
                     teardown_session);
-
+        g_test_add ("/http2/sniffer/async", Test, NULL,
+                    setup_session,
+                    do_sniffer_async_test,
+                    teardown_session);
+        g_test_add ("/http2/sniffer/sync", Test, NULL,
+                    setup_session,
+                    do_sniffer_sync_test,
+                    teardown_session);
 
 
        ret = g_test_run ();


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]