[libsoup/carlosgc/http2-io: 1/2] io-http2: simplify async io handling




commit 3357288cec5471614351e9ef23bdf2fd98e2ab58
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Sun May 30 08:50:49 2021 +0200

    io-http2: simplify async io handling
    
    Use a global polling source for reading and process any pending io
    operations after every successful read. Every time we submit data to the
    session we try to write it immediately; if the operation would block, a
    single global polling source is used until everything is written. This
    simplifies the io handling and avoids the creation and destruction of a
    lot of polling sources.

 libsoup/http2/soup-body-input-stream-http2.c |  87 ++--
 libsoup/http2/soup-body-input-stream-http2.h |   5 +-
 libsoup/http2/soup-client-message-io-http2.c | 660 +++++++++++----------------
 libsoup/soup-message-private.h               |   1 +
 libsoup/soup-message.c                       |   8 +
 tests/http2-body-stream-test.c               |  12 +-
 tests/http2-test.c                           |  10 +-
 7 files changed, 320 insertions(+), 463 deletions(-)
---
diff --git a/libsoup/http2/soup-body-input-stream-http2.c b/libsoup/http2/soup-body-input-stream-http2.c
index 2dacb3e9..82f14ee5 100644
--- a/libsoup/http2/soup-body-input-stream-http2.c
+++ b/libsoup/http2/soup-body-input-stream-http2.c
@@ -43,11 +43,11 @@ struct _SoupBodyInputStreamHttp2 {
 
 typedef struct {
         GSList *chunks;
-        GPollableInputStream *parent_stream;
         gsize start_offset;
         gsize len;
         gsize pos;
         gboolean completed;
+        GCancellable *need_more_data_cancellable;
 } SoupBodyInputStreamHttp2Private;
 
 static void soup_body_input_stream_http2_pollable_iface_init (GPollableInputStreamInterface *iface);
@@ -72,24 +72,15 @@ static guint signals [LAST_SIGNAL] = { 0 };
  * Returns: a new #GInputStream
  */
 GInputStream *
-soup_body_input_stream_http2_new (GPollableInputStream *parent_stream)
+soup_body_input_stream_http2_new (void)
 {
-        GInputStream *stream;
-        SoupBodyInputStreamHttp2Private *priv;
-
-        g_assert (G_IS_POLLABLE_INPUT_STREAM (parent_stream));
-
-        stream = g_object_new (SOUP_TYPE_BODY_INPUT_STREAM_HTTP2, NULL);
-        priv = soup_body_input_stream_http2_get_instance_private (SOUP_BODY_INPUT_STREAM_HTTP2 (stream));
-        priv->parent_stream = g_object_ref (parent_stream);
-
-        return stream;
+        return G_INPUT_STREAM (g_object_new (SOUP_TYPE_BODY_INPUT_STREAM_HTTP2, NULL));
 }
 
 void
 soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2 *stream,
-                                       const guint8          *data,
-                                       gsize                  size)
+                                       const guint8             *data,
+                                       gsize                     size)
 {
         SoupBodyInputStreamHttp2Private *priv;
 
@@ -100,6 +91,21 @@ soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2 *stream,
 
         priv->chunks = g_slist_append (priv->chunks, g_bytes_new (data, size));
         priv->len += size;
+        if (priv->need_more_data_cancellable) {
+                g_cancellable_cancel (priv->need_more_data_cancellable);
+                g_clear_object (&priv->need_more_data_cancellable);
+        }
+}
+
+gboolean
+soup_body_input_stream_http2_is_blocked (SoupBodyInputStreamHttp2 *stream)
+{
+        SoupBodyInputStreamHttp2Private *priv;
+
+        g_return_val_if_fail (SOUP_IS_BODY_INPUT_STREAM_HTTP2 (stream), FALSE);
+
+        priv = soup_body_input_stream_http2_get_instance_private (stream);
+        return priv->need_more_data_cancellable != NULL;
 }
 
 static gssize
@@ -174,9 +180,7 @@ soup_body_input_stream_http2_read_real (GInputStream  *stream,
         if (count == 0 && blocking && !priv->completed) {
                 GError *read_error = NULL;
                 g_signal_emit (memory_stream, signals[NEED_MORE_DATA], 0,
-                               cancellable,
-                               TRUE,
-                               &read_error);
+                               cancellable, &read_error);
 
                 if (read_error) {
                         g_propagate_error (error, read_error);
@@ -214,28 +218,7 @@ soup_body_input_stream_http2_read_nonblocking (GPollableInputStream  *stream,
         gsize read = soup_body_input_stream_http2_read_real (G_INPUT_STREAM (stream), FALSE, buffer, count, 
NULL, &inner_error);
 
         if (read == 0 && !priv->completed && !inner_error) {
-                /* Try requesting more reads from the io backend */
-                GError *inner_error = NULL;
-
-                g_signal_emit (memory_stream, signals[NEED_MORE_DATA], 0,
-                               NULL, FALSE, &inner_error);
-
-                if (inner_error) {
-                        g_propagate_error (error, inner_error);
-
-                        return -1;
-                }
-
-                if (priv->completed)
-                        return soup_body_input_stream_http2_read_real (G_INPUT_STREAM (stream), FALSE, 
buffer, count, NULL, error);
-
-                if (priv->pos < priv->len) {
-                        read = soup_body_input_stream_http2_read_real (G_INPUT_STREAM (stream), FALSE, 
buffer, count, NULL, NULL);
-                        if (read > 0)
-                                return read;
-                }
-
-                g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, "Operation would block");
+                g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would block"));
                 return -1;
         }
 
@@ -250,6 +233,10 @@ soup_body_input_stream_http2_complete (SoupBodyInputStreamHttp2 *stream)
 {
         SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (stream);
         priv->completed = TRUE;
+        if (priv->need_more_data_cancellable) {
+                g_cancellable_cancel (priv->need_more_data_cancellable);
+                g_clear_object (&priv->need_more_data_cancellable);
+        }
 }
 
 static gssize
@@ -352,13 +339,9 @@ soup_body_input_stream_http2_close_finish (GInputStream  *stream,
 static gboolean
 soup_body_input_stream_http2_is_readable (GPollableInputStream *stream)
 {
-        SoupBodyInputStreamHttp2 *memory_stream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
-        SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private 
(memory_stream);
-
-        if (priv->pos < priv->len || priv->completed)
-                return TRUE;
+        SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private 
(SOUP_BODY_INPUT_STREAM_HTTP2 (stream));
 
-        return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (priv->parent_stream));
+        return priv->pos < priv->len || priv->completed;
 }
 
 static GSource *
@@ -368,10 +351,9 @@ soup_body_input_stream_http2_create_source (GPollableInputStream *stream,
         SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private 
(SOUP_BODY_INPUT_STREAM_HTTP2 (stream));
         GSource *base_source, *pollable_source;
 
-        if (g_pollable_input_stream_is_readable (stream))
-                base_source = g_timeout_source_new (0);
-        else
-                base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM 
(priv->parent_stream), NULL);
+        if (!priv->need_more_data_cancellable)
+                priv->need_more_data_cancellable = g_cancellable_new ();
+        base_source = g_cancellable_source_new (priv->need_more_data_cancellable);
 
         pollable_source = g_pollable_source_new_full (stream, base_source, cancellable);
         g_source_set_name (pollable_source, "SoupMemoryStreamSource");
@@ -387,6 +369,10 @@ soup_body_input_stream_http2_dispose (GObject *object)
         SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (stream);
 
         priv->completed = TRUE;
+        if (priv->need_more_data_cancellable) {
+               g_cancellable_cancel (priv->need_more_data_cancellable);
+                g_clear_object (&priv->need_more_data_cancellable);
+        }
 
         G_OBJECT_CLASS (soup_body_input_stream_http2_parent_class)->dispose (object);
 }
@@ -398,7 +384,6 @@ soup_body_input_stream_http2_finalize (GObject *object)
         SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (stream);
 
         g_slist_free_full (priv->chunks, (GDestroyNotify)g_bytes_unref);
-        g_clear_object (&priv->parent_stream);
 
         G_OBJECT_CLASS (soup_body_input_stream_http2_parent_class)->finalize (object);
 }
@@ -444,5 +429,5 @@ soup_body_input_stream_http2_class_init (SoupBodyInputStreamHttp2Class *klass)
                               NULL, NULL,
                               NULL,
                               G_TYPE_ERROR,
-                              2, G_TYPE_CANCELLABLE, G_TYPE_BOOLEAN);
+                              1, G_TYPE_CANCELLABLE);
 }
diff --git a/libsoup/http2/soup-body-input-stream-http2.h b/libsoup/http2/soup-body-input-stream-http2.h
index 9e25ed34..b55cd6f5 100644
--- a/libsoup/http2/soup-body-input-stream-http2.h
+++ b/libsoup/http2/soup-body-input-stream-http2.h
@@ -6,7 +6,7 @@
 #define SOUP_TYPE_BODY_INPUT_STREAM_HTTP2 (soup_body_input_stream_http2_get_type ())
 G_DECLARE_FINAL_TYPE (SoupBodyInputStreamHttp2, soup_body_input_stream_http2, SOUP, BODY_INPUT_STREAM_HTTP2, 
GInputStream)
 
-GInputStream * soup_body_input_stream_http2_new        (GPollableInputStream     *parent_stream);
+GInputStream * soup_body_input_stream_http2_new        (void);
 
 void           soup_body_input_stream_http2_add_data   (SoupBodyInputStreamHttp2 *stream,
                                                         const guint8             *data,
@@ -14,4 +14,7 @@ void           soup_body_input_stream_http2_add_data   (SoupBodyInputStreamHttp2
 
 void           soup_body_input_stream_http2_complete   (SoupBodyInputStreamHttp2 *stream);
 
+/* This is only used for tests */
+gboolean       soup_body_input_stream_http2_is_blocked (SoupBodyInputStreamHttp2 *stream);
+
 G_END_DECLS
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index fbb73f7f..6df9ff0a 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -66,10 +66,13 @@ typedef struct {
         GOutputStream *ostream;
         guint64 connection_id;
 
-        GMainContext *async_context;
+        GError *error;
+        GSource *read_source;
+        GSource *write_source;
 
         GHashTable *messages;
         GHashTable *closed_messages;
+        GList *pending_io_messages;
 
         nghttp2_session *session;
 
@@ -78,12 +81,9 @@ typedef struct {
         gssize write_buffer_size;
         gssize written_bytes;
 
-        GSource *reset_stream_source;
-        GSource *idle_read_source;
         gboolean is_shutdown;
-
         GTask *close_task;
-        GSource *close_source;
+        gboolean session_terminated;
         gboolean goaway_sent;
 } SoupClientMessageIOHTTP2;
 
@@ -95,14 +95,11 @@ typedef struct {
         GInputStream *decoded_data_istream;
         GInputStream *body_istream;
         GTask *task;
-        gboolean in_run_until_read_async;
+        gboolean in_io_try_sniff_content;
 
         /* Request body logger */
         SoupLogger *logger;
 
-        /* Both data sources */
-        GCancellable *data_source_cancellable;
-
         /* Pollable data sources */
         GSource *data_source_poll;
 
@@ -111,7 +108,6 @@ typedef struct {
         GError *data_source_error;
         gboolean data_source_eof;
 
-        GSource *io_source;
         SoupClientMessageIOHTTP2 *io; /* Unowned */
         SoupMessageIOCompletionFn completion_cb;
         gpointer completion_data;
@@ -122,12 +118,7 @@ typedef struct {
         gboolean can_be_restarted;
 } SoupHTTP2MessageData;
 
-static void io_run_until_read_async (SoupHTTP2MessageData *data);
-static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
-static void io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data);
-static void io_idle_read (SoupClientMessageIOHTTP2 *io);
-static void io_close (SoupClientMessageIOHTTP2 *io);
-static void io_poll (SoupHTTP2MessageData *data);
+static void soup_client_message_io_http2_finished (SoupClientMessageIO *iface, SoupMessage *msg);
 
 static void
 NGCHECK (int return_code)
@@ -255,6 +246,18 @@ set_error_for_data (SoupHTTP2MessageData *data,
                 g_error_free (error);
 }
 
+static void
+set_io_error (SoupClientMessageIOHTTP2 *io,
+              GError                   *error)
+{
+        h2_debug (io, NULL, "[SESSION] IO error: %s", error->message);
+
+        if (!io->error)
+                io->error = error; /* first error wins; io now owns it */
+        else
+                g_error_free (error); /* a later error is only logged above, then dropped */
+}
+
 static void
 advance_state_from (SoupHTTP2MessageData *data,
                     SoupHTTP2IOState      from,
@@ -279,18 +282,216 @@ advance_state_from (SoupHTTP2MessageData *data,
         data->state = to;
 }
 
+static void
+soup_http2_message_data_check_status (SoupHTTP2MessageData *data)
+{
+        SoupClientMessageIOHTTP2 *io = data->io;
+        SoupMessage *msg = data->msg;
+        GTask *task = data->task;
+        GError *error = NULL;
+
+        if (g_cancellable_set_error_if_cancelled (g_task_get_cancellable (task), &error)) {
+                io->pending_io_messages = g_list_remove (io->pending_io_messages, data);
+                data->task = NULL;
+                g_task_return_error (task, error);
+                g_object_unref (task);
+                return;
+        }
+
+        if (data->paused)
+                return;
+
+        if (io->error && !data->error)
+                data->error = g_error_copy (io->error); /* session error applies to this message too */
+
+        if (data->error) {
+                error = g_steal_pointer (&data->error); /* error is still NULL here: not cancelled */
+
+                if (data->can_be_restarted)
+                        data->item->state = SOUP_MESSAGE_RESTARTING;
+                else
+                        soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
+                io->pending_io_messages = g_list_remove (io->pending_io_messages, data);
+                data->task = NULL;
+                soup_client_message_io_http2_finished ((SoupClientMessageIO *)io, msg);
+
+                g_task_return_error (task, error);
+                g_object_unref (task);
+                return;
+        }
+
+        if (data->state == STATE_READ_DATA_START && !soup_message_has_content_sniffer (msg))
+                advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
+
+        if (data->state < STATE_READ_DATA)
+                return; /* not ready yet: stay in pending_io_messages */
+
+        io->pending_io_messages = g_list_remove (io->pending_io_messages, data);
+        data->task = NULL;
+        g_task_return_boolean (task, TRUE);
+        g_object_unref (task);
+}
+
+static gboolean
+io_write (SoupClientMessageIOHTTP2 *io,
+          gboolean                  blocking,
+          GCancellable             *cancellable,
+          GError                  **error)
+{
+        /* We must write all of nghttp2's buffer before we ask for more */
+        if (io->written_bytes == io->write_buffer_size)
+                io->write_buffer = NULL;
+
+        if (io->write_buffer == NULL) {
+                io->written_bytes = 0;
+                io->write_buffer_size = nghttp2_session_mem_send (io->session, (const guint8**)&io->write_buffer);
+                NGCHECK (io->write_buffer_size);
+                if (io->write_buffer_size == 0) {
+                        /* Done */
+                        io->write_buffer = NULL;
+                        return TRUE;
+                }
+        }
+
+        gssize ret = g_pollable_stream_write (io->ostream,
+                                              io->write_buffer + io->written_bytes,
+                                              io->write_buffer_size - io->written_bytes,
+                                              blocking, cancellable, error);
+        if (ret < 0)
+                return FALSE;
+
+        io->written_bytes += ret;
+        return TRUE;
+}
+
+static gboolean
+io_write_ready (GObject                  *stream,
+                SoupClientMessageIOHTTP2 *io)
+{
+        GError *error = NULL;
+
+        while (nghttp2_session_want_write (io->session) && !error)
+                io_write (io, FALSE, NULL, &error);
+
+        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                g_error_free (error);
+                return G_SOURCE_CONTINUE; /* output would block again: keep the source */
+        }
+
+        if (error)
+                set_io_error (io, error); /* hands the error over to set_io_error */
+
+        g_clear_pointer (&io->write_source, g_source_unref); /* lets io_try_write create a new one later */
+        return G_SOURCE_REMOVE;
+}
+
+static void
+io_try_write (SoupClientMessageIOHTTP2 *io)
+{
+        GError *error = NULL;
+
+        if (io->write_source)
+                return; /* a write source is already pending; io_write_ready will continue */
+
+        while (nghttp2_session_want_write (io->session) && !error)
+                io_write (io, FALSE, NULL, &error);
+
+        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                g_clear_error (&error);
+                io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), NULL);
+                g_source_set_name (io->write_source, "Soup HTTP/2 write source");
+                g_source_set_callback (io->write_source, (GSourceFunc)io_write_ready, io, NULL);
+                g_source_attach (io->write_source, g_main_context_get_thread_default ());
+        }
+
+        if (error)
+                set_io_error (io, error);
+}
+
+static gboolean
+io_read (SoupClientMessageIOHTTP2  *io,
+         gboolean                   blocking,
+         GCancellable              *cancellable,
+         GError                   **error)
+{
+        guint8 buffer[8192];
+        gssize read;
+        int ret;
+
+        if ((read = g_pollable_stream_read (io->istream, buffer, sizeof (buffer),
+                                            blocking, cancellable, error)) < 0)
+                return FALSE;
+
+        ret = nghttp2_session_mem_recv (io->session, buffer, read);
+        NGCHECK (ret);
+        return ret != 0; /* FALSE when nghttp2_session_mem_recv returned 0 */
+}
+
+static gboolean
+io_read_ready (GObject                  *stream,
+               SoupClientMessageIOHTTP2 *io)
+{
+        GError *error = NULL;
+        gboolean progress = TRUE;
+
+        while (nghttp2_session_want_read (io->session) && progress) {
+                progress = io_read (io, FALSE, NULL, &error);
+                if (progress) {
+                        /* After each successful read, re-check every message waiting on io */
+                        g_list_foreach (io->pending_io_messages,
+                                        (GFunc)soup_http2_message_data_check_status,
+                                        NULL);
+                }
+        }
+
+        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                g_error_free (error);
+                return G_SOURCE_CONTINUE; /* nothing more to read right now: keep polling */
+        }
+
+        if (error)
+                set_io_error (io, error); /* hands the error over to set_io_error */
+
+        g_clear_pointer (&io->read_source, g_source_unref);
+        return G_SOURCE_REMOVE;
+}
+
+static void
+io_try_sniff_content (SoupHTTP2MessageData *data,
+                      gboolean              blocking,
+                      GCancellable         *cancellable)
+{
+        GError *error = NULL;
+
+        /* This can re-enter in sync mode */
+        if (data->in_io_try_sniff_content)
+                return;
+
+        data->in_io_try_sniff_content = TRUE;
+
+        if (soup_message_try_sniff_content (data->msg, data->decoded_data_istream, blocking, cancellable, &error)) {
+                h2_debug (data->io, data, "[DATA] Sniffed content");
+                advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
+        } else {
+                h2_debug (data->io, data, "[DATA] Sniffer stream was not ready %s", error ? error->message : "(no error set)");
+
+                g_clear_error (&error);
+        }
+
+        data->in_io_try_sniff_content = FALSE;
+}
+
 static void
 soup_client_message_io_http2_terminate_session (SoupClientMessageIOHTTP2 *io)
 {
-        if (io->goaway_sent)
+        if (io->session_terminated)
                 return;
 
         if (g_hash_table_size (io->messages) != 0)
                 return;
 
-        io->goaway_sent = TRUE;
+        io->session_terminated = TRUE;
         NGCHECK (nghttp2_session_terminate_session (io->session, NGHTTP2_NO_ERROR));
-        io_close (io);
+        io_try_write (io);
 }
 
 /* HTTP2 read callbacks */
@@ -329,15 +530,13 @@ on_header_callback (nghttp2_session     *session,
 static GError *
 memory_stream_need_more_data_callback (SoupBodyInputStreamHttp2 *stream,
                                        GCancellable             *cancellable,
-                                       gboolean                  blocking,
                                        gpointer                  user_data)
 {
         SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
         GError *error = NULL;
 
-        if (!nghttp2_session_want_read (data->io->session))
-                return blocking ? NULL : g_error_new_literal (G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, 
_("Operation would block"));
-        io_read (data->io, blocking, cancellable, &error);
+        if (nghttp2_session_want_read (data->io->session))
+                io_read (data->io, TRUE, cancellable, &error);
 
         return error;
 }
@@ -364,7 +563,7 @@ on_begin_frame_callback (nghttp2_session        *session,
         case NGHTTP2_DATA:
                 if (data->state < STATE_READ_DATA_START) {
                         g_assert (!data->body_istream);
-                        data->body_istream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM 
(data->io->istream));
+                        data->body_istream = soup_body_input_stream_http2_new ();
                         g_signal_connect (data->body_istream, "need-more-data",
                                           G_CALLBACK (memory_stream_need_more_data_callback), data);
 
@@ -455,8 +654,11 @@ on_frame_recv_callback (nghttp2_session     *session,
         case NGHTTP2_DATA:
                 if (data->metrics)
                         data->metrics->response_body_bytes_received += frame->data.hd.length + 
FRAME_HEADER_SIZE;
-                if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && data->body_istream)
+                if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && data->body_istream) {
                         soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 
(data->body_istream));
+                        if (data->state == STATE_READ_DATA_START)
+                                io_try_sniff_content (data, FALSE, data->cancellable);
+                }
                 break;
         case NGHTTP2_RST_STREAM:
                 if (frame->rst_stream.error_code != NGHTTP2_NO_ERROR) {
@@ -487,6 +689,8 @@ on_data_chunk_recv_callback (nghttp2_session *session,
 
         g_assert (msgdata->body_istream != NULL);
         soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (msgdata->body_istream), data, 
len);
+        if (msgdata->state == STATE_READ_DATA_START)
+                io_try_sniff_content (msgdata, FALSE, msgdata->cancellable);
 
         return 0;
 }
@@ -567,6 +771,14 @@ on_frame_send_callback (nghttp2_session     *session,
                 h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
                 g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, 
(gpointer)frame);
                 break;
+        case NGHTTP2_GOAWAY:
+                h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
+                io->goaway_sent = TRUE;
+                if (io->close_task) {
+                        g_task_return_boolean (io->close_task, TRUE);
+                        g_clear_object (&io->close_task);
+                }
+                break;
         default:
                 h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
                 break;
@@ -604,15 +816,6 @@ on_stream_close_callback (nghttp2_session *session,
         if (error_code == NGHTTP2_REFUSED_STREAM && data->state < STATE_READ_DATA)
                 data->can_be_restarted = TRUE;
 
-        if (data->state < STATE_READ_DATA && data->task && !data->in_run_until_read_async) {
-                /* Start polling the decoded data stream instead of the network input stream. */
-                if (data->io_source) {
-                        g_source_destroy (data->io_source);
-                        g_clear_pointer (&data->io_source, g_source_unref);
-                }
-                io_poll (data);
-        }
-
         return 0;
 }
 
@@ -622,10 +825,10 @@ on_data_readable (GInputStream *stream,
 {
         SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
 
-        g_cancellable_cancel (data->data_source_cancellable);
-        g_clear_object (&data->data_source_cancellable);
+        h2_debug (data->io, data, "on data readable");
 
         NGCHECK (nghttp2_session_resume_data (data->io->session, data->stream_id));
+        io_try_write (data->io);
 
         g_clear_pointer (&data->data_source_poll, g_source_unref);
         return G_SOURCE_REMOVE;
@@ -642,9 +845,6 @@ on_data_read (GInputStream *source,
 
         h2_debug (data->io, data, "[SEND_BODY] Read %zd", read);
 
-        g_cancellable_cancel (data->data_source_cancellable);
-        g_clear_object (&data->data_source_cancellable);
-
         /* This operation may have outlived the message data in which
            case this will have been cancelled. */
         if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
@@ -663,6 +863,7 @@ on_data_read (GInputStream *source,
 
         h2_debug (data->io, data, "[SEND_BODY] Resuming send");
         NGCHECK (nghttp2_session_resume_data (data->io->session, data->stream_id));
+        io_try_write (data->io);
 }
 
 static void
@@ -714,8 +915,6 @@ on_data_source_read_callback (nghttp2_session     *session,
                                 g_source_attach (data->data_source_poll, g_main_context_get_thread_default 
());
 
                                 g_error_free (error);
-                                g_assert (!data->data_source_cancellable);
-                                data->data_source_cancellable = g_cancellable_new ();
                                 return NGHTTP2_ERR_DEFERRED;
                         }
 
@@ -756,8 +955,6 @@ on_data_source_read_callback (nghttp2_session     *session,
                 } else {
                         h2_debug (data->io, data, "[SEND_BODY] Reading async");
                         g_byte_array_set_size (data->data_source_buffer, length);
-                        g_assert (!data->data_source_cancellable);
-                        data->data_source_cancellable = g_cancellable_new ();
                         g_input_stream_read_async (in_stream, data->data_source_buffer->data, length,
                                                    get_data_io_priority (data),
                                                    data->cancellable,
@@ -810,11 +1007,6 @@ soup_http2_message_data_close (SoupHTTP2MessageData *data)
         g_clear_pointer (&data->item, soup_message_queue_item_unref);
         g_clear_object (&data->decoded_data_istream);
 
-        if (data->io_source) {
-                g_source_destroy (data->io_source);
-                g_clear_pointer (&data->io_source, g_source_unref);
-        }
-
         if (data->data_source_poll) {
                 g_source_destroy (data->data_source_poll);
                 g_clear_pointer (&data->data_source_poll, g_source_unref);
@@ -823,8 +1015,6 @@ soup_http2_message_data_close (SoupHTTP2MessageData *data)
         g_clear_error (&data->data_source_error);
         g_clear_pointer (&data->data_source_buffer, g_byte_array_unref);
 
-        g_clear_object (&data->data_source_cancellable);
-
         g_clear_error (&data->error);
 
         data->completion_cb = NULL;
@@ -955,6 +1145,7 @@ send_message_request (SoupMessage          *msg,
         data->stream_id = nghttp2_submit_request (io->session, &priority_spec, (const nghttp2_nv 
*)headers->data, headers->len, body_stream ? &data_provider : NULL, data);
 
         h2_debug (io, data, "[SESSION] Request made for %s%s", authority_header, path_and_query);
+        io_try_write (io);
 
         g_array_free (headers, TRUE);
         g_free (authority);
@@ -962,8 +1153,6 @@ send_message_request (SoupMessage          *msg,
         g_free (path_and_query);
 }
 
-
-
 static void
 soup_client_message_io_http2_send_item (SoupClientMessageIO       *iface,
                                         SoupMessageQueueItem      *item,
@@ -973,10 +1162,6 @@ soup_client_message_io_http2_send_item (SoupClientMessageIO       *iface,
         SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
         SoupHTTP2MessageData *data = add_message_to_io_data (io, item, completion_cb, user_data);
 
-        if (io->idle_read_source) {
-                g_source_destroy (io->idle_read_source);
-                g_clear_pointer (&io->idle_read_source, g_source_unref);
-        }
         send_message_request (item->msg, io, data);
 }
 
@@ -1034,9 +1219,7 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
                 return;
         }
 
-        io_write_until_stream_reset_is_sent (data);
-        if (g_hash_table_size (io->messages) == 0)
-                io_idle_read (io);
+        io_try_write (io);
 }
 
 static void
@@ -1051,12 +1234,6 @@ soup_client_message_io_http2_pause (SoupClientMessageIO *iface,
         if (data->paused)
                 g_warn_if_reached ();
 
-        if (data->io_source) {
-                g_source_destroy (data->io_source);
-                g_source_unref (data->io_source);
-                data->io_source = NULL;
-        }
-
         data->paused = TRUE;
 }
 
@@ -1073,6 +1250,8 @@ soup_client_message_io_http2_unpause (SoupClientMessageIO *iface,
                 g_warn_if_reached ();
 
         data->paused = FALSE;
+
+        soup_http2_message_data_check_status (data);
 }
 
 static void
@@ -1117,66 +1296,6 @@ soup_client_message_io_http2_is_reusable (SoupClientMessageIO *iface)
         return soup_client_message_io_http2_is_open (iface);
 }
 
-static gboolean
-message_source_check (GSource *source)
-{
-        SoupMessageIOSource *message_source = (SoupMessageIOSource *)source;
-
-        if (message_source->paused) {
-                if (soup_message_is_io_paused (SOUP_MESSAGE (message_source->msg)))
-                        return FALSE;
-                return TRUE;
-        }
-
-        return FALSE;
-}
-
-static gboolean
-io_poll_ready (SoupMessage *msg,
-               gpointer     user_data)
-{
-        SoupHTTP2MessageData *data = user_data;
-
-        io_run_until_read_async (data);
-
-        return G_SOURCE_REMOVE;
-}
-
-static void
-io_poll (SoupHTTP2MessageData *data)
-{
-        GSource *base_source;
-        GCancellable *cancellable;
-
-        g_assert (data->task);
-        g_assert (!data->io_source);
-
-        cancellable = g_task_get_cancellable (data->task);
-
-        /* TODO: Handle mixing writes in? */
-        if (data->paused)
-                base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
-        else if (data->state < STATE_WRITE_DONE && data->data_source_cancellable)
-                base_source = g_cancellable_source_new (data->data_source_cancellable);
-        else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
-                base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM 
(data->io->ostream), cancellable);
-        else if (data->state < STATE_READ_DONE && data->decoded_data_istream)
-                base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM 
(data->decoded_data_istream), cancellable);
-        else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
-                base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM 
(data->io->istream), cancellable);
-        else {
-                g_warn_if_reached ();
-                base_source = g_timeout_source_new (0);
-        }
-
-        data->io_source = soup_message_io_source_new (base_source, G_OBJECT (data->msg),
-                                                      data->paused, message_source_check);
-        g_source_set_callback (data->io_source, (GSourceFunc)io_poll_ready, data, NULL);
-        g_source_set_priority (data->io_source, g_task_get_priority (data->task));
-        g_source_attach (data->io_source, data->io->async_context);
-}
-
-
 static void
 client_stream_eof (SoupClientInputStream *stream,
                    gpointer               user_data)
@@ -1219,94 +1338,17 @@ soup_client_message_io_http2_get_response_istream (SoupClientMessageIO  *iface,
         return client_stream;
 }
 
-static gboolean
-io_read (SoupClientMessageIOHTTP2  *io,
-         gboolean             blocking,
-        GCancellable        *cancellable,
-         GError             **error)
-{
-        guint8 buffer[8192];
-        gssize read;
-        int ret;
-
-        if ((read = g_pollable_stream_read (io->istream, buffer, sizeof (buffer),
-                                            blocking, cancellable, error)) < 0)
-            return FALSE;
-
-        ret = nghttp2_session_mem_recv (io->session, buffer, read);
-        NGCHECK (ret);
-        return ret != 0;
-}
-
-static gboolean
-io_write (SoupClientMessageIOHTTP2 *io,
-         gboolean                   blocking,
-        GCancellable              *cancellable,
-         GError                   **error)
-{
-        /* We must write all of nghttp2's buffer before we ask for more */
-
-        if (io->written_bytes == io->write_buffer_size)
-                io->write_buffer = NULL;
-
-        if (io->write_buffer == NULL) {
-                io->written_bytes = 0;
-                io->write_buffer_size = nghttp2_session_mem_send (io->session, (const guint8**)&io->write_buffer);
-                NGCHECK (io->write_buffer_size);
-                if (io->write_buffer_size == 0) {
-                        /* Done */
-                        io->write_buffer = NULL;
-                        return TRUE;
-                }
-        }
-
-        gssize ret = g_pollable_stream_write (io->ostream,
-                                              io->write_buffer + io->written_bytes,
-                                              io->write_buffer_size - io->written_bytes,
-                                              blocking, cancellable, error);
-        if (ret < 0)
-                return FALSE;
-
-        io->written_bytes += ret;
-        return TRUE;
-}
-
-static void
-io_try_sniff_content (SoupHTTP2MessageData *data,
-                      gboolean              blocking,
-                      GCancellable         *cancellable)
-{
-        GError *error = NULL;
-
-        if (soup_message_try_sniff_content (data->msg, data->decoded_data_istream, blocking, cancellable, &error)) {
-                h2_debug (data->io, data, "[DATA] Sniffed content");
-                advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
-        } else {
-                h2_debug (data->io, data, "[DATA] Sniffer stream was not ready %s", error->message);
-
-                g_clear_error (&error);
-        }
-}
-
 static gboolean
 io_run (SoupHTTP2MessageData *data,
-        gboolean              blocking,
         GCancellable         *cancellable,
         GError              **error)
 {
         gboolean progress = FALSE;
 
-        if (data->state == STATE_READ_DATA_START)
-                io_try_sniff_content (data, blocking, cancellable);
-
         if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
-                progress = io_write (data->io, blocking, cancellable, error);
-        else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session)) {
-                progress = io_read (data->io, blocking, cancellable, error);
-
-                if (progress && data->state == STATE_READ_DATA_START)
-                        io_try_sniff_content (data, blocking, cancellable);
-        }
+                progress = io_write (data->io, TRUE, cancellable, error);
+        else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
+                progress = io_read (data->io, TRUE, cancellable, error);
 
         return progress;
 }
@@ -1314,7 +1356,6 @@ io_run (SoupHTTP2MessageData *data,
 static gboolean
 io_run_until (SoupClientMessageIOHTTP2 *io,
               SoupMessage              *msg,
-              gboolean                  blocking,
               SoupHTTP2IOState          state,
               GCancellable             *cancellable,
               GError                  **error)
@@ -1334,8 +1375,8 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
 
        g_object_ref (msg);
 
-       while (progress && get_io_data (msg) == io && !data->paused && !data->data_source_cancellable && data->state < state)
-                progress = io_run (data, blocking, cancellable, &my_error);
+       while (progress && get_io_data (msg) == io && !data->paused && data->state < state)
+                progress = io_run (data, cancellable, &my_error);
 
         if (my_error) {
                 g_propagate_error (error, my_error);
@@ -1359,89 +1400,10 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
 
        done = data->state >= state;
 
-       if (data->paused || (!blocking && !done)) {
-               g_set_error_literal (error, G_IO_ERROR,
-                                    G_IO_ERROR_WOULD_BLOCK,
-                                    _("Operation would block"));
-               g_object_unref (msg);
-               return FALSE;
-       }
-
        g_object_unref (msg);
        return done;
 }
 
-static gboolean
-io_write_until_stream_reset_is_sent_ready (GObject              *stream,
-                                           SoupHTTP2MessageData *data)
-{
-        io_write_until_stream_reset_is_sent (data);
-
-        return G_SOURCE_REMOVE;
-}
-
-static void
-io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data)
-{
-        SoupClientMessageIOHTTP2 *io = data->io;
-        GError *error = NULL;
-
-        if (io->reset_stream_source) {
-                g_source_destroy (io->reset_stream_source);
-                g_clear_pointer (&io->reset_stream_source, g_source_unref);
-        }
-
-        while (g_hash_table_lookup (io->closed_messages, data)) {
-                if (!nghttp2_session_want_write (io->session)) {
-                        error = g_error_new_literal (G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would block"));
-                        break;
-                }
-                if (!io_write (io, FALSE, FALSE, &error))
-                        break;
-        }
-
-        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-                io->reset_stream_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), NULL);
-                g_source_set_callback (io->reset_stream_source, (GSourceFunc)io_write_until_stream_reset_is_sent_ready, data, NULL);
-                g_source_attach (io->reset_stream_source, g_main_context_get_thread_default ());
-        }
-
-        g_clear_error (&error);
-}
-
-static gboolean
-io_idle_read_ready (GObject                  *stream,
-                    SoupClientMessageIOHTTP2 *io)
-{
-        io_idle_read (io);
-
-        return G_SOURCE_REMOVE;
-}
-
-static void
-io_idle_read (SoupClientMessageIOHTTP2 *io)
-{
-        GError *error = NULL;
-
-        if (io->idle_read_source) {
-                g_source_destroy (io->idle_read_source);
-                g_clear_pointer (&io->idle_read_source, g_source_unref);
-        }
-
-        while (nghttp2_session_want_read (io->session)) {
-                if (!io_read (io, FALSE, FALSE, &error))
-                        break;
-        }
-
-        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-                io->idle_read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), NULL);
-                g_source_set_callback (io->idle_read_source, (GSourceFunc)io_idle_read_ready, io, NULL);
-                g_source_attach (io->idle_read_source, g_main_context_get_thread_default ());
-        }
-
-        g_clear_error (&error);
-}
-
 static gboolean
 soup_client_message_io_http2_run_until_read (SoupClientMessageIO  *iface,
                                              SoupMessage          *msg,
@@ -1451,7 +1413,7 @@ soup_client_message_io_http2_run_until_read (SoupClientMessageIO  *iface,
         SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
         SoupHTTP2MessageData *data = get_data_for_message (io, msg);
 
-        if (io_run_until (io, msg, TRUE, STATE_READ_DATA, cancellable, error))
+        if (io_run_until (io, msg, STATE_READ_DATA, cancellable, error))
                 return TRUE;
 
         if (get_io_data (msg) == io) {
@@ -1485,6 +1447,7 @@ soup_client_message_io_http2_skip (SoupClientMessageIO *iface,
 
         h2_debug (io, data, "Skip");
         NGCHECK (nghttp2_submit_rst_stream (io->session, NGHTTP2_FLAG_NONE, data->stream_id, NGHTTP2_STREAM_CLOSED));
+        io_try_write (io);
         return TRUE;
 }
 
@@ -1496,55 +1459,6 @@ soup_client_message_io_http2_run (SoupClientMessageIO *iface,
         g_assert_not_reached ();
 }
 
-static void
-io_run_until_read_async (SoupHTTP2MessageData *data)
-{
-        SoupClientMessageIOHTTP2 *io = data->io;
-        GTask *task = data->task;
-        GError *error = NULL;
-
-        if (data->io_source) {
-                g_source_destroy (data->io_source);
-                g_clear_pointer (&data->io_source, g_source_unref);
-        }
-
-        data->in_run_until_read_async = TRUE;
-
-        if (io_run_until (io, data->msg, FALSE,
-                          STATE_READ_DATA,
-                          g_task_get_cancellable (task),
-                          &error)) {
-                data->task = NULL;
-                data->in_run_until_read_async = FALSE;
-
-                g_task_return_boolean (task, TRUE);
-                g_object_unref (task);
-                return;
-        }
-
-        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-                g_error_free (error);
-                io_poll (data);
-                data->in_run_until_read_async = FALSE;
-                return;
-        }
-
-        if (get_io_data (data->msg) == io) {
-                if (data->can_be_restarted)
-                        data->item->state = SOUP_MESSAGE_RESTARTING;
-                else
-                        soup_message_set_metrics_timestamp (data->msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
-
-                soup_client_message_io_http2_finished ((SoupClientMessageIO *)data->io, data->msg);
-        }
-
-        data->task = NULL;
-        data->in_run_until_read_async = FALSE;
-
-        g_task_return_error (task, error);
-        g_object_unref (task);
-}
-
 static void
 soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
                                                    SoupMessage         *msg,
@@ -1558,51 +1472,7 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
 
         data->task = g_task_new (msg, cancellable, callback, user_data);
         g_task_set_priority (data->task, io_priority);
-        io_run_until_read_async (data);
-}
-
-static gboolean
-io_close_ready (GObject                  *stream,
-                SoupClientMessageIOHTTP2 *io)
-{
-        io_close (io);
-
-        return G_SOURCE_REMOVE;
-}
-
-static void
-io_close (SoupClientMessageIOHTTP2 *io)
-{
-        GError *error = NULL;
-
-        if (io->close_source) {
-                g_source_destroy (io->close_source);
-                g_clear_pointer (&io->close_source, g_source_unref);
-        }
-
-        while (nghttp2_session_want_write (io->session)) {
-                if (!io_write (io, FALSE, FALSE, &error))
-                        break;
-        }
-
-        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-                g_error_free (error);
-                io->close_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), NULL);
-                g_source_set_callback (io->close_source, (GSourceFunc)io_close_ready, io, NULL);
-                g_source_attach (io->close_source, g_main_context_get_thread_default ());
-                return;
-        }
-
-        if (io->close_task) {
-                if (error)
-                        g_task_return_error (io->close_task, error);
-                else
-                        g_task_return_boolean (io->close_task, TRUE);
-                g_clear_object (&io->close_task);
-                return;
-        }
-
-        g_clear_error (&error);
+        io->pending_io_messages = g_list_prepend (io->pending_io_messages, data);
 }
 
 static gboolean
@@ -1612,7 +1482,7 @@ soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
 {
         SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
 
-        if (io->goaway_sent && !io->close_source)
+        if (io->goaway_sent)
                 return FALSE;
 
         g_assert (!io->close_task);
@@ -1626,24 +1496,20 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface)
 {
         SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
 
-        if (io->reset_stream_source) {
-                g_source_destroy (io->reset_stream_source);
-                g_clear_pointer (&io->reset_stream_source, g_source_unref);
-        }
-        if (io->idle_read_source) {
-                g_source_destroy (io->idle_read_source);
-                g_clear_pointer (&io->idle_read_source, g_source_unref);
+        if (io->read_source) {
+                g_source_destroy (io->read_source);
+                g_source_unref (io->read_source);
         }
-        if (io->close_source) {
-                g_source_destroy (io->close_source);
-                g_source_unref (io->close_source);
+        if (io->write_source) {
+                g_source_destroy (io->write_source);
+                g_source_unref (io->write_source);
         }
         g_clear_object (&io->stream);
         g_clear_object (&io->close_task);
-        g_clear_pointer (&io->async_context, g_main_context_unref);
         g_clear_pointer (&io->session, nghttp2_session_del);
         g_clear_pointer (&io->messages, g_hash_table_unref);
         g_clear_pointer (&io->closed_messages, g_hash_table_unref);
+        g_clear_pointer (&io->pending_io_messages, g_list_free);
 
         g_free (io);
 }
@@ -1729,7 +1595,10 @@ soup_client_message_io_http2_new (GIOStream *stream, guint64 connection_id)
         io->ostream = g_io_stream_get_output_stream (io->stream);
         io->connection_id = connection_id;
 
-        io->async_context = g_main_context_ref_thread_default ();
+        io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), NULL);
+        g_source_set_name (io->read_source, "Soup HTTP/2 read source");
+        g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
+        g_source_attach (io->read_source, g_main_context_get_thread_default ());
 
         NGCHECK (nghttp2_session_set_local_window_size (io->session, NGHTTP2_FLAG_NONE, 0, INITIAL_WINDOW_SIZE));
 
@@ -1739,6 +1608,7 @@ soup_client_message_io_http2_new (GIOStream *stream, guint64 connection_id)
                 { NGHTTP2_SETTINGS_ENABLE_PUSH, 0 },
         };
         NGCHECK (nghttp2_submit_settings (io->session, NGHTTP2_FLAG_NONE, settings, G_N_ELEMENTS (settings)));
+        io_try_write (io);
 
         return (SoupClientMessageIO *)io;
 }
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index 8513c48f..12fbfb42 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -103,6 +103,7 @@ SoupClientMessageIO *soup_message_get_io_data (SoupMessage             *msg);
 
 void                soup_message_set_content_sniffer    (SoupMessage        *msg,
                                                         SoupContentSniffer *sniffer);
+gboolean            soup_message_has_content_sniffer    (SoupMessage        *msg);
 gboolean            soup_message_try_sniff_content      (SoupMessage        *msg,
                                                          GInputStream       *stream,
                                                          gboolean            blocking,
diff --git a/libsoup/soup-message.c b/libsoup/soup-message.c
index bd5d34d5..79087e74 100644
--- a/libsoup/soup-message.c
+++ b/libsoup/soup-message.c
@@ -2323,6 +2323,14 @@ soup_message_set_content_sniffer (SoupMessage *msg, SoupContentSniffer *sniffer)
        priv->sniffer = sniffer ? g_object_ref (sniffer) : NULL;
 }
 
+gboolean
+soup_message_has_content_sniffer (SoupMessage *msg)
+{
+        SoupMessagePrivate *priv = soup_message_get_instance_private (msg);
+
+        return priv->sniffer != NULL;
+}
+
 gboolean
 soup_message_try_sniff_content (SoupMessage  *msg,
                                 GInputStream *stream,
diff --git a/tests/http2-body-stream-test.c b/tests/http2-body-stream-test.c
index e5aa3007..ee3218bc 100644
--- a/tests/http2-body-stream-test.c
+++ b/tests/http2-body-stream-test.c
@@ -26,8 +26,7 @@ do_large_data_test (void)
 #define CHUNK_SIZE (gsize)1024 * 1024 * 512 // 512 MiB
 #define TEST_SIZE CHUNK_SIZE * 20 // 10 GiB
 
-        GInputStream *parent_stream = g_memory_input_stream_new ();
-        GInputStream *stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
+        GInputStream *stream = soup_body_input_stream_http2_new ();
         SoupBodyInputStreamHttp2 *mem_stream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
         gsize data_needed = TEST_SIZE;
         guint8 *memory_chunk = g_new (guint8, CHUNK_SIZE); 
@@ -58,15 +57,13 @@ do_large_data_test (void)
 
         g_free (trash_buffer);
         g_free (memory_chunk);
-        g_object_unref (parent_stream);
         g_object_unref (stream);
 }
 
 static void
 do_multiple_chunk_test (void)
 {
-        GInputStream *parent_stream = g_memory_input_stream_new ();
-        GInputStream *stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
+        GInputStream *stream = soup_body_input_stream_http2_new ();
         SoupBodyInputStreamHttp2 *mem_stream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
         const char * const chunks[] = {
                 "1234", "5678", "9012", "hell", "owor", "ld..",
@@ -85,7 +82,6 @@ do_multiple_chunk_test (void)
                 g_assert_cmpstr (buffer, ==, chunks[i]);
         }
 
-        g_object_unref (parent_stream);
         g_object_unref (stream);
 }
 
@@ -104,8 +100,7 @@ on_skip_ready (GInputStream *stream, GAsyncResult *res, GMainLoop *loop)
 static void
 do_skip_async_test (void)
 {
-        GInputStream *parent_stream = g_memory_input_stream_new ();
-        GInputStream *stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
+        GInputStream *stream = soup_body_input_stream_http2_new ();
         SoupBodyInputStreamHttp2 *bistream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
         GMainLoop *loop = g_main_loop_new (NULL, FALSE);
 
@@ -114,7 +109,6 @@ do_skip_async_test (void)
         g_input_stream_skip_async (stream, 2, G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback)on_skip_ready, loop);
 
         g_main_loop_run (loop);
-        g_object_unref (parent_stream);
         g_object_unref (stream);
         g_main_loop_unref (loop);
 }
diff --git a/tests/http2-test.c b/tests/http2-test.c
index 161782e5..de3b25b2 100644
--- a/tests/http2-test.c
+++ b/tests/http2-test.c
@@ -297,8 +297,7 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
 {
         GMainContext *async_context = g_main_context_ref_thread_default ();
 
-        GInputStream *parent_stream = g_memory_input_stream_new ();
-        GInputStream *in_stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
+        GInputStream *in_stream = soup_body_input_stream_http2_new ();
         soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), (guint8*)"Part 1 -", 8);
 
         test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post");
@@ -307,10 +306,9 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
         GBytes *response = NULL;
         soup_session_send_async (test->session, test->msg, G_PRIORITY_DEFAULT, NULL, on_send_complete, &response);
 
-        int iteration_count = 20;
         while (!response) {
                 // Let it iterate for a bit waiting on blocked data
-                if (iteration_count-- == 0) {
+                if (soup_body_input_stream_http2_is_blocked (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream))) {
                         soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), (guint8*)" Part 2", 8);
                         soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream));
                 }
@@ -323,7 +321,6 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
                 g_main_context_iteration (async_context, FALSE);
 
         g_bytes_unref (response);
-        g_object_unref (parent_stream);
         g_object_unref (in_stream);
         g_main_context_unref (async_context);
         g_object_unref (test->msg);
@@ -869,8 +866,7 @@ do_sniffer_sync_test (Test *test, gconstpointer data)
         soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
 
         do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, NULL);
-        /* FIXME: large seems to be broken in sync mode */
-        /* do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL); */
+        do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL);
         do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, NULL);
 }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]