[libsoup/wip/http2: 10/10] Add initial HTTP2 backend




commit f8d4a0dcf5c11433a7414ae43c15d7e448297098
Author: Patrick Griffis <pgriffis igalia com>
Date:   Sun May 2 13:20:12 2021 -0500

    Add initial HTTP2 backend

 docs/reference/meson.build                   |    2 +
 libsoup/http2/soup-client-message-io-http2.c | 1386 ++++++++++++++++++++++++++
 libsoup/http2/soup-client-message-io-http2.h |   14 +
 libsoup/http2/soup-memory-input-stream.c     |  611 ++++++++++++
 libsoup/http2/soup-memory-input-stream.h     |   16 +
 libsoup/meson.build                          |    5 +
 libsoup/soup-client-message-io-http1.c       |   37 +-
 libsoup/soup-client-message-io.c             |   26 +
 libsoup/soup-client-message-io.h             |   14 +-
 libsoup/soup-connection.c                    |  132 ++-
 libsoup/soup-connection.h                    |    2 +
 libsoup/soup-logger-private.h                |    2 +
 libsoup/soup-logger.c                        |    8 +
 libsoup/soup-message.c                       |    3 +-
 libsoup/soup-message.h                       |    3 +-
 libsoup/soup-session.c                       |   64 +-
 meson.build                                  |   18 +-
 tests/http2-server.py                        |   94 ++
 tests/http2-test.c                           |  531 ++++++++++
 tests/memory-stream-test.c                   |  101 ++
 tests/meson.build                            |    7 +
 tests/test-utils.c                           |   71 ++
 tests/test-utils.h                           |    3 +
 23 files changed, 3110 insertions(+), 40 deletions(-)
---
diff --git a/docs/reference/meson.build b/docs/reference/meson.build
index 3aea8764..22c9042a 100644
--- a/docs/reference/meson.build
+++ b/docs/reference/meson.build
@@ -41,6 +41,8 @@ ignore_headers = [
   'soup-client-message-io.h',
   'soup-message-io-completion.h',
   'soup-client-message-io-http1.h',
+  'soup-client-message-io-http2.h',
+  'soup-memory-input-stream.h',
 ]
 
 mkdb_args = [
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
new file mode 100644
index 00000000..1baaa3e8
--- /dev/null
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -0,0 +1,1386 @@
+/* soup-message-io-http2.c
+ *
+ * Copyright 2021 Igalia S.L.
+ *
+ * This file is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This file is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * SPDX-License-Identifier: LGPL-3.0-or-later
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#undef G_LOG_DOMAIN
+#define G_LOG_DOMAIN "libsoup-http2"
+
+#include <glib.h>
+#include <glib/gi18n-lib.h>
+
+#include "soup-client-message-io-http2.h"
+
+#include "soup-body-input-stream.h"
+#include "soup-message-metrics-private.h"
+#include "soup-message-private.h"
+#include "soup-message-io-source.h"
+#include "soup-message-queue-item.h"
+#include "content-sniffer/soup-content-sniffer-stream.h"
+#include "soup-client-input-stream.h"
+#include "soup-logger-private.h"
+#include "soup-uri-utils-private.h"
+
+#include "content-decoder/soup-content-decoder.h"
+#include "soup-memory-input-stream.h"
+
+#include <nghttp2/nghttp2.h>
+
+#define FRAME_HEADER_SIZE 9
+
+typedef enum {
+        STATE_NONE,
+        STATE_WRITE_HEADERS,
+        STATE_WRITE_DATA,
+        STATE_WRITE_DONE,
+        STATE_READ_HEADERS,
+        STATE_READ_DATA,
+        STATE_READ_DATA_SNIFFED,
+        STATE_READ_DONE,
+        STATE_ERROR,
+} SoupHTTP2IOState;
+
+typedef struct {
+        SoupClientMessageIO iface;
+
+        GIOStream *stream;
+        GInputStream *istream;
+        GOutputStream *ostream;
+
+        GMainContext *async_context;
+
+        GPtrArray *messages;
+        GHashTable *message_errors;
+
+        nghttp2_session *session;
+
+        // Owned by nghttp2
+        guint8 *write_buffer;
+        gssize write_buffer_size;
+        gssize written_bytes;
+
+        gboolean is_shutdown;
+} SoupMessageIOHTTP2;
+
+typedef struct {
+        SoupMessageQueueItem *item;
+        SoupMessage *msg;
+        SoupMessageMetrics *metrics;
+        GCancellable *cancellable;
+        GInputStream *decoded_data_istream;
+        GInputStream *memory_data_istream;
+
+        // Request body logger
+        SoupLogger *logger;
+
+        // Both data sources
+        GCancellable *data_source_cancellable;
+
+        // Pollable data sources
+        GSource *data_source_poll;
+
+        // Non-pollable data sources
+        GByteArray *data_source_buffer;
+        GError *data_source_error;
+        gboolean data_source_eof;
+
+        GSource *io_source;
+        SoupMessageIOHTTP2 *io; // Unowned
+        SoupMessageIOCompletionFn completion_cb;
+        gpointer completion_data;
+        SoupHTTP2IOState state;
+        gboolean paused;
+        guint32 stream_id;
+} SoupHTTP2MessageData;
+
+static void soup_message_io_http2_finished (SoupClientMessageIO *, SoupMessage *);
+static gboolean io_read_or_write (SoupMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
+
+#if 0
+static SoupHTTP2MessageData *
+get_message_by_stream_id (SoupMessageIOHTTP2 *io, guint32 stream_id)
+{
+        const guint len = io->messages->len;
+
+        for (uint i = 0; i < len; ++i) {
+                SoupHTTP2MessageData *data = io->messages->pdata[i];
+                if (data->stream_id == stream_id)
+                        return data;
+        }
+
+        if (stream_id != 0)
+                g_warning ("Recieved frame for unknown stream id %u!", stream_id);
+        return NULL;
+}
+#endif
+
+static const char *
+frame_type_to_string (nghttp2_frame_type type)
+{
+        switch (type) {
+        case NGHTTP2_DATA:
+                return "DATA";
+        case NGHTTP2_HEADERS:
+                return "HEADERS";
+        case NGHTTP2_PRIORITY:
+                return "PRIORITY";
+        case NGHTTP2_RST_STREAM:
+                return "RST_STREAM";
+        case NGHTTP2_SETTINGS:
+                return "SETTINGS";
+        case NGHTTP2_PUSH_PROMISE:
+                return "PUSH_PROMISE";
+        case NGHTTP2_PING:
+                return "PING";
+        case NGHTTP2_GOAWAY:
+                return "GOAWAY";
+        case NGHTTP2_WINDOW_UPDATE:
+                return "WINDOW_UPDATE";
+        case NGHTTP2_CONTINUATION:
+                return "CONTINUATION";
+        case NGHTTP2_ALTSVC:
+                return "ALTSVC";
+        case NGHTTP2_ORIGIN:
+                return "ORIGIN";
+        default:
+                g_warn_if_reached ();
+                return "UNKNOWN";
+        }
+}
+
+static void
+h2_debug (SoupMessageIOHTTP2 *io, SoupHTTP2MessageData *data, const char *format, ...)
+{
+        va_list args;
+        char *message;
+        guint32 stream_id = 0;
+
+       va_start (args, format);
+       message = g_strdup_vprintf (format, args);
+       va_end (args);
+
+        if (data)
+                stream_id = data->stream_id;
+
+        g_assert (io);
+        g_log (G_LOG_DOMAIN, G_LOG_LEVEL_DEBUG, "[C%p-S%u] %s", io->stream, stream_id, message);
+
+        g_free (message);
+}
+
+static SoupMessageIOHTTP2 *
+get_io_data (SoupMessage *msg)
+{
+        return (SoupMessageIOHTTP2 *)soup_message_get_io_data (msg);
+}
+
+static int
+get_data_io_priority (SoupHTTP2MessageData *data)
+{
+       if (!data->item->task)
+               return G_PRIORITY_DEFAULT;
+
+       return g_task_get_priority (data->item->task);
+}
+
+static void
+set_error_for_data (SoupMessageIOHTTP2 *io, SoupHTTP2MessageData *data, GError *error)
+{
+        g_debug ("set_error_for_data: %s", error->message);
+        g_hash_table_replace (io->message_errors, data, error);
+}
+
+static GError *
+get_error_for_data (SoupMessageIOHTTP2 *io, SoupHTTP2MessageData *data)
+{
+        GError *error = NULL;
+
+        g_hash_table_steal_extended (io->message_errors, data, NULL, (gpointer*)&error);
+
+        return error;
+}
+
+/* HTTP2 read callbacks */
+
+static int
+on_header_callback (nghttp2_session *session, const nghttp2_frame *frame, const uint8_t *name, size_t 
namelen, const uint8_t *value, size_t valuelen, uint8_t flags, void *user_data)
+{
+        SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
+
+        if (!data)
+                return 0;
+
+        SoupMessage *msg = data->msg;
+        if (name[0] == ':') {
+                if (strcmp ((char *)name, ":status") == 0) {
+                        guint status_code = (guint)g_ascii_strtoull ((char *)value, NULL, 10);
+                        soup_message_set_status (msg, status_code, NULL);
+                        return 0;
+                }
+                g_debug ("%s = %s", name, value);
+                return 0;
+        }
+
+        // FIXME: Encoding
+        char *name_utf8 = g_utf8_make_valid ((const char *)name, namelen);
+        char *value_utf8 = g_utf8_make_valid ((const char *)value, valuelen);
+        soup_message_headers_append (soup_message_get_response_headers (data->msg),
+                                     name_utf8, value_utf8);
+        g_free (name_utf8);
+        g_free (value_utf8);
+        return 0;
+}
+
+static GError *
+memory_stream_want_read_callback (SoupMemoryInputStream *stream, GCancellable *cancellable, gboolean 
blocking, gpointer user_data)
+{
+        SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
+        GError *error = NULL;
+
+        g_debug ("memory_stream_want_read_callback write=%d read=%d", nghttp2_session_want_write 
(data->io->session), nghttp2_session_want_read (data->io->session));
+
+        io_read_or_write (data->io, blocking, cancellable, &error);
+
+        return error;
+}
+
+static int
+on_begin_frame_callback (nghttp2_session *session, const nghttp2_frame_hd *hd, void *user_data)
+{
+        // SoupMessageIOHTTP2 *io = user_data;
+        SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, hd->stream_id);
+
+        h2_debug (user_data, data, "[RECV] [%s] Beginning", frame_type_to_string (hd->type));
+
+        if (!data)
+                return 0;
+
+        switch (hd->type) {
+        case NGHTTP2_HEADERS:
+                if (data->state < STATE_READ_HEADERS)
+                        data->state = STATE_READ_HEADERS;
+                break;
+        case NGHTTP2_DATA: {
+                // We may have sniffed a previous DATA frame
+                if (data->state < STATE_READ_DATA)
+                        data->state = STATE_READ_DATA;
+                if (!data->memory_data_istream) {
+                        data->memory_data_istream = soup_memory_input_stream_new (G_POLLABLE_INPUT_STREAM 
(data->io->istream));
+                        g_signal_connect (data->memory_data_istream, "want-read",
+                                          G_CALLBACK (memory_stream_want_read_callback), data);
+                }
+                if (!data->decoded_data_istream)
+                        data->decoded_data_istream = soup_message_setup_body_istream 
(data->memory_data_istream, data->msg,
+                                                                                      data->item->session, 
SOUP_STAGE_MESSAGE_BODY);
+                break;
+        }
+        }
+
+        return 0;
+}
+
+static void
+handle_goaway (SoupMessageIOHTTP2 *io, guint32 error_code, guint32 last_stream_id)
+{
+        const guint len = io->messages->len;
+
+        for (uint i = 0; i < len; ++i) {
+                SoupHTTP2MessageData *data = io->messages->pdata[i];
+                /* If there is no error it is a graceful shutdown and
+                 * existing messages can be handled otherwise it is a fatal error */
+                if ((error_code == 0 && data->stream_id > last_stream_id) ||
+                     data->state < STATE_READ_DONE) {
+                        h2_debug (io, data, "[GOAWAY] Error: %s", nghttp2_http2_strerror (error_code));
+                        data->state = STATE_ERROR;
+                        // TODO: We can restart unfinished messages
+                        set_error_for_data (io, data, g_error_new (G_IO_ERROR, G_IO_ERROR_FAILED,
+                                            "HTTP/2 Error: %s", nghttp2_http2_strerror (error_code)));
+                }
+        }
+}
+
+static int
+on_frame_recv_callback (nghttp2_session *session, const nghttp2_frame *frame, gpointer user_data)
+{
+        SoupMessageIOHTTP2 *io = user_data;
+        SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
+
+        h2_debug (io, data, "[RECV] [%s] Recieved (%u)", frame_type_to_string (frame->hd.type), 
frame->hd.flags);
+
+        if (frame->hd.type == NGHTTP2_GOAWAY) {
+                handle_goaway (io, frame->goaway.error_code, frame->goaway.last_stream_id);
+                io->is_shutdown = TRUE;
+                return 0;
+        }
+
+        if (!data) {
+                if (frame->hd.stream_id != 0 && frame->hd.flags != NGHTTP2_FLAG_END_STREAM)
+                        g_warn_if_reached ();
+                return 0;
+        }
+
+        switch (frame->hd.type) {
+        case NGHTTP2_HEADERS:
+                if (data->metrics)
+                        data->metrics->response_header_bytes_received += frame->hd.length + 
FRAME_HEADER_SIZE;
+
+                if (frame->headers.cat == NGHTTP2_HCAT_RESPONSE && frame->hd.flags & 
NGHTTP2_FLAG_END_HEADERS) {
+                        h2_debug (io, data, "[HEADERS] status %u", soup_message_get_status (data->msg));
+                        if (SOUP_STATUS_IS_INFORMATIONAL (soup_message_get_status (data->msg))) {
+                                soup_message_got_informational (data->msg);
+                                soup_message_cleanup_response (data->msg);
+                                soup_memory_input_stream_complete (SOUP_MEMORY_INPUT_STREAM 
(data->memory_data_istream));
+                                data->state = STATE_READ_DONE;
+                                break;
+                        } else if (soup_message_get_status (data->msg) == SOUP_STATUS_NO_CONTENT) {
+                                data->state = STATE_READ_DONE;
+                        }
+                        soup_message_got_headers (data->msg);
+                }
+                break;
+        case NGHTTP2_DATA:
+                if (data->metrics)
+                        data->metrics->response_body_bytes_received += frame->data.hd.length + 
FRAME_HEADER_SIZE;
+                break;
+        case NGHTTP2_RST_STREAM:
+                h2_debug (io, data, "[RST_STREAM] %s", nghttp2_http2_strerror 
(frame->rst_stream.error_code));
+                if (frame->rst_stream.error_code != NGHTTP2_NO_ERROR) {
+                        set_error_for_data (io, data, g_error_new_literal (G_IO_ERROR, G_IO_ERROR_FAILED,
+                                                                           nghttp2_http2_strerror 
(frame->rst_stream.error_code)));
+                        data->state = STATE_ERROR;
+                }
+                break;
+        };
+
+        if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+                h2_debug (io, data, "Stream done");
+                data->state = STATE_READ_DONE;
+                if (frame->hd.type == NGHTTP2_DATA) {
+                        soup_memory_input_stream_complete (SOUP_MEMORY_INPUT_STREAM 
(data->memory_data_istream));
+                        soup_message_got_body (data->msg);
+                }
+
+                // soup_message_io_http2_finished (data->msg);
+                // nghttp2_submit_rst_stream (session, NGHTTP2_FLAG_NONE, frame->hd.stream_id, 
NGHTTP2_STREAM_CLOSED);
+        }
+
+        return 0;
+}
+
+static int
+on_data_chunk_recv_callback (nghttp2_session *session, uint8_t flags, int32_t stream_id, const uint8_t 
*data, size_t len, void *user_data)
+{
+        SoupMessageIOHTTP2 *io = user_data;
+        SoupHTTP2MessageData *msgdata = nghttp2_session_get_stream_user_data (session, stream_id);
+
+        if (!msgdata)
+                return NGHTTP2_ERR_CALLBACK_FAILURE;
+
+        h2_debug (io, msgdata, "[DATA] Recieved chunk, len=%zu, flags=%u, paused=%d", len, flags, 
msgdata->paused);
+
+        if (msgdata->paused)
+                return NGHTTP2_ERR_PAUSE;
+
+        SoupMessage *msg = msgdata->msg;
+        GBytes *bytes = g_bytes_new (data, len);
+        g_assert (msgdata->memory_data_istream != NULL);
+        soup_memory_input_stream_add_bytes (SOUP_MEMORY_INPUT_STREAM (msgdata->memory_data_istream), bytes);
+        g_bytes_unref (bytes);
+
+        if (msgdata->state < STATE_READ_DATA_SNIFFED) {
+                if (soup_message_get_content_sniffer (msg)) {
+                        SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM 
(msgdata->decoded_data_istream);
+                        GError *error = NULL;
+                        if (soup_content_sniffer_stream_is_ready (sniffer_stream, FALSE, NULL, &error)) {
+                                GHashTable *params;
+                                const char *content_type = soup_content_sniffer_stream_sniff 
(sniffer_stream, &params);
+
+                                msgdata->state = STATE_READ_DATA_SNIFFED;
+                                soup_message_content_sniffed (msg, content_type, params);
+                                h2_debug (io, msgdata, "[DATA] Sniffed %s", content_type);
+                        } else {
+                                h2_debug (io, msgdata, "[DATA] Sniffer stream was not ready %s", 
error->message);
+                                g_clear_error (&error);
+                        }
+                }
+                else
+                        msgdata->state = STATE_READ_DATA_SNIFFED;
+        }
+
+        return 0;
+}
+
+/* HTTP2 write callbacks */
+
+static int
+on_before_frame_send_callback (nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
+{
+        SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
+
+        // h2_debug (user_data, data, "[SEND] [%s] Before", frame_type_to_string (frame->hd.type));
+
+        if (!data)
+                return 0;
+
+        switch (frame->hd.type) {
+        case NGHTTP2_HEADERS:
+                data->state = STATE_WRITE_HEADERS;
+                break;
+        case NGHTTP2_DATA:
+                data->state = STATE_WRITE_DATA;
+                break;
+        }
+
+        return 0;
+}
+
+static int
+on_frame_send_callback (nghttp2_session *session, const nghttp2_frame *frame, void *user_data)
+{
+        SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
+
+        if (!data) {
+                h2_debug (user_data, NULL, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
+                return 0;
+        }
+
+        switch (frame->hd.type) {
+        case NGHTTP2_HEADERS:
+                h2_debug (user_data, data, "[SEND] [HEADERS] finished=%d",
+                          (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS) ? 1 : 0);
+                if (data->metrics)
+                        data->metrics->request_header_bytes_sent += frame->hd.length + FRAME_HEADER_SIZE;
+
+                if (frame->hd.flags & NGHTTP2_FLAG_END_HEADERS) {
+                        soup_message_wrote_headers (data->msg);
+                        if (soup_message_get_request_body_stream (data->msg) == NULL) {
+                                data->state = STATE_WRITE_DONE;
+                                soup_message_wrote_body (data->msg);
+                        }
+                }
+                break;
+        case NGHTTP2_DATA:
+                h2_debug (user_data, data, "[SEND] [DATA] bytes=%zu, finished=%d",
+                          frame->data.hd.length, frame->hd.flags & NGHTTP2_FLAG_END_STREAM);
+                if (data->metrics) {
+                        data->metrics->request_body_bytes_sent += frame->hd.length + FRAME_HEADER_SIZE;
+                        data->metrics->request_body_size += frame->data.hd.length;
+                }
+                if (frame->data.hd.length)
+                        soup_message_wrote_body_data (data->msg, frame->data.hd.length);
+                if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM) {
+                        data->state = STATE_WRITE_DONE;
+                        soup_message_wrote_body (data->msg);
+                }
+                break;
+        default:
+                h2_debug (user_data, NULL, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
+                break;
+        }
+
+        return 0;
+}
+
+static int
+on_frame_not_send_callback (nghttp2_session *session, const nghttp2_frame *frame, int lib_error_code, void 
*user_data)
+{
+        SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, frame->hd.stream_id);
+
+        h2_debug (user_data, data, "[SEND] [%s] Failed", frame_type_to_string (frame->hd.type));
+
+        if (!data)
+                return 0;
+
+        data->state = STATE_ERROR;
+
+        return 0;
+}
+
+static int
+on_stream_close_callback (nghttp2_session *session, int32_t stream_id, uint32_t error_code, void *user_data)
+{
+        g_debug ("on_stream_close %d", stream_id);
+        return 0;
+}
+
+static gboolean
+on_data_readable (GInputStream *stream, gpointer user_data)
+{
+        SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
+
+        nghttp2_session_resume_data (data->io->session, data->stream_id);
+
+        g_clear_pointer (&data->data_source_poll, g_source_unref);
+        return G_SOURCE_REMOVE;
+}
+
+static void
+on_data_read (GInputStream *source, GAsyncResult *res, gpointer user_data)
+{
+        SoupHTTP2MessageData *data = user_data;
+        GError *error = NULL;
+        gssize read = g_input_stream_read_finish (source, res, &error);
+
+        g_debug ("[SEND_BODY] Read %zu", read);
+
+        // This operation may have outlived the message data in which
+        // case this will have been cancelled.
+        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
+                g_error_free (error);
+                return;
+        }
+
+        if (read < 0) {
+                g_byte_array_set_size (data->data_source_buffer, 0);
+                data->data_source_error = g_steal_pointer (&error);
+        } else if (read == 0)
+                data->data_source_eof = TRUE;
+        else
+                g_byte_array_set_size (data->data_source_buffer, read);
+
+        g_debug ("[SEND_BODY] Resuming send");
+        nghttp2_session_resume_data (data->io->session, data->stream_id);
+}
+
+static void
+log_request_data (SoupHTTP2MessageData *data, const guint8 *buffer, gsize len)
+{
+        if (!data->logger)
+                return;
+
+        // NOTE: This doesn't exactly log data as it hits the network but
+        // rather as soon as we read it from our source which is as good
+        // as we can do since nghttp handles the actual io.
+        soup_logger_log_request_data (data->logger, data->msg, (const char *)buffer, len);
+}
+
+static ssize_t
+on_data_source_read_callback (nghttp2_session *session, int32_t stream_id, uint8_t *buf, size_t length, 
uint32_t *data_flags, nghttp2_data_source *source, void *user_data)
+{
+        SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id);
+        SoupMessageIOHTTP2 *io = get_io_data (data->msg);
+
+        if (data->paused) {
+                g_debug ("[SEND_BODY] Paused");
+                return NGHTTP2_ERR_PAUSE;
+        }
+
+        /* This cancellable is only used for async data source operations,
+         * only exists while reading is happening, and will be cancelled
+         * at any point if the data is freed.
+         */
+        if (!data->data_source_cancellable)
+                data->data_source_cancellable = g_cancellable_new ();
+
+        /* We support pollable streams in the best case because they
+         * should perform better with one fewer copy of each buffer and no threading. */
+        if (G_IS_POLLABLE_INPUT_STREAM (source->ptr) && g_pollable_input_stream_can_poll 
(G_POLLABLE_INPUT_STREAM (source->ptr))) {
+                GPollableInputStream *in_stream = G_POLLABLE_INPUT_STREAM (source->ptr);
+                GError *error = NULL;
+
+                gssize read = g_pollable_input_stream_read_nonblocking  (in_stream, buf, length, 
data->cancellable, &error);
+
+                if (read) {
+                        g_debug ("[SEND_BODY] Read %zu", read);
+                        log_request_data (data, buf, read);
+                }
+
+                if (read < 0) {
+                        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                                g_assert (data->data_source_poll == NULL);
+
+                                g_debug ("[SEND_BODY] Polling");
+                                data->data_source_poll = g_pollable_input_stream_create_source (in_stream, 
data->data_source_cancellable);
+                                g_source_set_callback (data->data_source_poll, 
(GSourceFunc)on_data_readable, data, NULL);
+                                g_source_set_priority (data->data_source_poll, get_data_io_priority (data));
+                                g_source_attach (data->data_source_poll, g_main_context_get_thread_default 
());
+
+                                g_error_free (error);
+                                return NGHTTP2_ERR_DEFERRED;
+                        }
+
+                        g_debug ("[SEND_BODY] Error %s", error->message);
+                        set_error_for_data (io, data, g_steal_pointer (&error));
+                        data->state = STATE_ERROR;
+                        return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+                }
+                else if (read == 0) {
+                        g_debug ("[SEND_BODY] EOF");
+                        *data_flags |= NGHTTP2_DATA_FLAG_EOF;                
+                }
+
+                return read;
+        } else {
+                GInputStream *in_stream = G_INPUT_STREAM (source->ptr);
+        
+                /* To support non-pollable input streams we always deffer reads
+                * and read async into a local buffer. The next time around we will
+                * send that buffer or error.
+                */
+                if (!data->data_source_buffer)
+                        data->data_source_buffer = g_byte_array_new ();
+
+                gsize buffer_len = data->data_source_buffer->len;
+                if (buffer_len) {
+                        g_debug ("[SEND_BODY] Sending %zu", buffer_len);
+                        g_assert (buffer_len <= length); // QUESTION: Maybe not reliable
+                        memcpy (buf, data->data_source_buffer->data, buffer_len);
+                        log_request_data (data, buf, buffer_len);
+                        g_byte_array_set_size (data->data_source_buffer, 0);
+                        return buffer_len;
+                } else if (data->data_source_eof) {
+                        g_debug ("[SEND_BODY] EOF");
+                        g_clear_object (&data->data_source_cancellable);
+                        *data_flags |= NGHTTP2_DATA_FLAG_EOF;
+                        return 0;
+                } else if (data->data_source_error) {
+                        g_debug ("[SEND_BODY] Error %s", data->data_source_error->message);
+                        g_clear_object (&data->data_source_cancellable);
+                        set_error_for_data (io, data, g_steal_pointer (&data->data_source_error));
+                        data->state = STATE_ERROR;
+                        return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
+                } else {
+                        g_debug ("[SEND_BODY] Reading async");
+                        g_byte_array_set_size (data->data_source_buffer, length);
+                        g_input_stream_read_async (in_stream, data->data_source_buffer->data, length,
+                                                   get_data_io_priority (data),
+                                                   data->data_source_cancellable,
+                                                   (GAsyncReadyCallback)on_data_read, data);
+                        return NGHTTP2_ERR_DEFERRED;
+                }
+        }
+}
+
+/* HTTP2 IO functions */
+
+static gboolean
+data_compare (gconstpointer a, gconstpointer b)
+{
+        SoupHTTP2MessageData *data1 = (SoupHTTP2MessageData *)a, *data2 = (SoupHTTP2MessageData *)b;
+
+        return data1->msg == data2->msg;
+}
+
+static SoupHTTP2MessageData *
+add_message_to_io_data (SoupMessageIOHTTP2 *io,
+                        SoupMessageQueueItem *item,
+                        SoupMessageIOCompletionFn completion_cb,
+                        gpointer completion_data)
+{
+        SoupHTTP2MessageData *data = g_new0 (SoupHTTP2MessageData, 1);
+
+        data->item = soup_message_queue_item_ref (item);
+        data->msg = item->msg;
+        data->metrics = soup_message_get_metrics (data->msg);
+        data->cancellable = item->cancellable;
+        data->completion_cb = completion_cb;
+        data->completion_data = completion_data;
+        data->stream_id = 0; // Will be overwritten
+        data->io = io;
+
+        if (g_ptr_array_find_with_equal_func (io->messages, data, data_compare, NULL))
+                g_warn_if_reached ();
+        g_ptr_array_add (io->messages, data);
+
+        return data;
+}
+
+static void
+soup_http2_message_data_free (SoupHTTP2MessageData *data)
+{
+        g_clear_pointer (&data->item, soup_message_queue_item_unref);
+        g_clear_object (&data->memory_data_istream);
+        g_clear_object (&data->decoded_data_istream);
+
+        if (data->io_source) {
+                g_source_destroy (data->io_source);
+                g_clear_pointer (&data->io_source, g_source_unref);
+        }
+
+        if (data->data_source_poll)
+                g_source_destroy (data->data_source_poll);
+        g_clear_pointer (&data->data_source_poll, g_source_unref);
+
+        g_clear_error (&data->data_source_error);
+        g_clear_pointer (&data->data_source_buffer, g_byte_array_unref);
+
+        if (data->data_source_cancellable) {
+                g_cancellable_cancel (data->data_source_cancellable);
+                g_clear_object (&data->data_source_cancellable);
+        }
+
+        g_free (data);
+}
+
+#define MAKE_NV(NAME, VALUE, VALUELEN)                                      \
+        {                                                                   \
+                (uint8_t *)NAME, (uint8_t *)VALUE, strlen (NAME), VALUELEN, \
+                    NGHTTP2_NV_FLAG_NONE                                    \
+        }
+
+#define MAKE_NV2(NAME, VALUE)                                                     \
+        {                                                                         \
+                (uint8_t *)NAME, (uint8_t *)VALUE, strlen (NAME), strlen (VALUE), \
+                    NGHTTP2_NV_FLAG_NONE                                          \
+        }
+
+#define MAKE_NV3(NAME, VALUE, FLAGS)                                              \
+        {                                                                         \
+                (uint8_t *)NAME, (uint8_t *)VALUE, strlen (NAME), strlen (VALUE), \
+                    FLAGS                                                         \
+        }
+
+static void
+send_message_request (SoupMessage *msg, SoupMessageIOHTTP2 *io, SoupHTTP2MessageData *data)
+{
+        GArray *headers = g_array_new (FALSE, FALSE, sizeof (nghttp2_nv));
+
+        GUri *uri = soup_message_get_uri (msg);
+        char *host = soup_uri_get_host_for_headers (uri);
+        char *authority = g_strdup_printf ("%s:%u", host, g_uri_get_port (uri));
+
+        char *path_and_query;
+        if (soup_message_get_is_options_ping (msg))
+                path_and_query = g_strdup ("*");
+        else
+                path_and_query = g_strdup_printf ("%s%c%s", g_uri_get_path (uri), g_uri_get_query (uri) ? 
'?' : '\0', g_uri_get_query (uri));
+
+        const nghttp2_nv pseudo_headers[] = {
+                MAKE_NV3 (":method", soup_message_get_method (msg), NGHTTP2_NV_FLAG_NO_COPY_VALUE),
+                MAKE_NV2 (":scheme", g_uri_get_scheme (uri)),
+                MAKE_NV2 (":authority", authority),
+                MAKE_NV2 (":path", path_and_query),
+        };
+
+        for (guint i = 0; i < G_N_ELEMENTS (pseudo_headers); ++i) {
+                g_array_append_val (headers, pseudo_headers[i]);
+        }
+
+        SoupMessageHeadersIter iter;
+        const char *name, *value;
+        soup_message_headers_iter_init (&iter, soup_message_get_request_headers (msg));
+        while (soup_message_headers_iter_next (&iter, &name, &value)) {
+                /* Forbidden headers. TODO: Avoid setting this elsewhere? */
+                if (g_ascii_strcasecmp (name, "Transfer-Encoding") == 0)
+                        continue;
+                const nghttp2_nv nv = MAKE_NV2 (name, value);
+                g_array_append_val (headers, nv);
+        }
+        
+        GInputStream *body_stream = soup_message_get_request_body_stream (msg);
+        SoupSessionFeature *logger = soup_session_get_feature_for_message (data->item->session, 
SOUP_TYPE_LOGGER, data->msg);
+        if (logger && body_stream)
+                data->logger = SOUP_LOGGER (logger);
+
+        nghttp2_data_provider *data_provider = NULL;
+        if (body_stream) {
+                data_provider = g_new (nghttp2_data_provider, 1);
+                data_provider->source.ptr = body_stream;
+                data_provider->read_callback = on_data_source_read_callback;
+        }
+
+        data->stream_id = nghttp2_submit_request (io->session, NULL, (const nghttp2_nv *)headers->data, 
headers->len, data_provider, data);
+
+        h2_debug (io, data, "[SESSION] Request made for %s%s", authority, path_and_query);
+
+        g_array_free (headers, TRUE);
+        g_free (authority);
+        g_free (host);
+        g_free (path_and_query);
+        g_free (data_provider);
+}
+
+
+
+static void
+soup_message_io_http2_send_item (SoupClientMessageIO *iface,
+                                 SoupMessageQueueItem *item,
+                                 SoupMessageIOCompletionFn completion_cb,
+                                 gpointer user_data)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        SoupHTTP2MessageData *data = add_message_to_io_data (io, item, completion_cb, user_data);
+
+        send_message_request (item->msg, io, data);
+}
+
+static SoupHTTP2MessageData *
+get_data_for_message (SoupMessageIOHTTP2 *io, SoupMessage *msg)
+{
+        const guint len = io->messages->len;
+
+        for (uint i = 0; i < len; ++i) {
+                SoupHTTP2MessageData *data = io->messages->pdata[i];
+                if (data->msg == msg)
+                        return data;
+        }
+
+        g_warn_if_reached ();
+        return NULL;
+}
+
+static void
+soup_message_io_http2_finished (SoupClientMessageIO *iface,
+                                SoupMessage         *msg)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        SoupHTTP2MessageData *data;
+       SoupMessageIOCompletionFn completion_cb;
+       gpointer completion_data;
+       SoupMessageIOCompletion completion;
+
+        data = get_data_for_message (io, msg);
+
+        h2_debug (io, data, "Finished");
+
+        // int ret;
+        // ret = nghttp2_submit_rst_stream (io->session, NGHTTP2_FLAG_NONE, data->stream_id, 
NGHTTP2_STREAM_CLOSED);
+        // g_assert (ret == 0);
+        // ret = nghttp2_session_terminate_session (io->session, NGHTTP2_NO_ERROR);
+        // g_assert (ret == 0);
+
+       completion_cb = data->completion_cb;
+       completion_data = data->completion_data;
+
+       // TODO
+       completion = SOUP_MESSAGE_IO_COMPLETE;
+
+       g_object_ref (msg);
+
+        nghttp2_session_set_stream_user_data (io->session, data->stream_id, NULL);
+        if (!g_ptr_array_remove_fast (io->messages, data))
+                g_warn_if_reached ();
+
+        soup_connection_message_io_finished (soup_message_get_connection (msg), msg);
+
+       if (completion_cb)
+               completion_cb (G_OBJECT (msg), completion, completion_data);
+
+       g_object_unref (msg);
+}
+
+static void
+soup_message_io_http2_pause (SoupClientMessageIO *iface,
+                             SoupMessage         *msg)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+
+        g_debug ("soup_message_io_http2_pause");
+
+        if (data->paused)
+                g_warn_if_reached ();
+
+        data->paused = TRUE;
+}
+
+static void
+soup_message_io_http2_unpause (SoupClientMessageIO *iface,
+                               SoupMessage         *msg)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+
+        g_debug ("soup_message_io_http2_unpause");
+
+        if (!data->paused)
+                g_warn_if_reached ();
+
+        data->paused = FALSE;
+}
+
+static void
+soup_message_io_http2_stolen (SoupClientMessageIO *iface)
+{
+        g_assert_not_reached ();
+}
+
+static gboolean
+soup_message_io_http2_in_progress (SoupClientMessageIO *iface,
+                                   SoupMessage         *msg)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+
+        return data->state < STATE_WRITE_DONE;
+}
+
+static gboolean
+soup_message_io_http2_is_paused (SoupClientMessageIO *iface,
+                                 SoupMessage         *msg)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+
+        return data->paused;
+}
+
+static gboolean
+soup_message_io_http2_is_reusable (SoupClientMessageIO *iface)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+
+        // TODO: This logic is probably incomplete
+        return !io->is_shutdown;
+}
+
+static gboolean
+message_source_check (GSource *source)
+{
+       SoupMessageIOSource *message_source = (SoupMessageIOSource *)source;
+        SoupMessage *msg = SOUP_MESSAGE (message_source->msg);
+        SoupMessageIOHTTP2 *io = get_io_data (msg);
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+
+        //QUESTION: What is the point of message_source->paused
+
+        return !data->paused;
+}
+
+static GSource *
+soup_message_io_http2_get_source (SoupMessage *msg,
+                                  GCancellable *cancellable,
+                                  SoupMessageIOSourceFunc callback,
+                                  gpointer user_data)
+{
+        SoupMessageIOHTTP2 *io = get_io_data (msg);
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+        //GPollableInputStream *istream;
+
+        //g_debug ("soup_message_io_http2_get_source state=%u, paused=%d", data->state, data->paused);
+
+        //g_debug ("Queue %lu", nghttp2_session_get_outbound_queue_size (io->session));
+
+        GSource *base_source;
+        // TODO: Handle mixing writes in
+        if (data->paused)
+                base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
+        else if (data->state < STATE_WRITE_DONE)
+                base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM 
(io->ostream), cancellable);
+        else if (data->state < STATE_READ_DONE)
+                base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), 
cancellable);
+        else
+                g_assert_not_reached ();
+
+        GSource *source = soup_message_io_source_new (base_source, G_OBJECT (msg), data->paused, 
message_source_check);
+        g_source_set_callback (source, (GSourceFunc)callback, user_data, NULL);
+        return source;
+}
+
+static void
+soup_message_io_http2_skip_body (SoupClientMessageIO *iface,
+                                 SoupMessage         *msg)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+
+        g_debug ("soup_message_io_http2_skip_body");
+
+        g_assert (data->memory_data_istream);
+
+        soup_memory_input_stream_complete (SOUP_MEMORY_INPUT_STREAM (data->memory_data_istream));
+        data->state = STATE_READ_DONE;
+        soup_message_got_body (data->msg);
+}
+
+#if 0
+static int
+idle_finish (gpointer user_data)
+{
+        SoupMessage *msg = user_data;
+        soup_message_io_http2_finished (msg); // TODO: Smarter
+        return G_SOURCE_REMOVE;
+}
+#endif
+
+static void
+client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
+{
+       SoupMessage *msg = user_data;
+       SoupMessageIOHTTP2 *io = get_io_data (msg);
+
+        if (!io) {
+                g_warn_if_reached (); // QUESTION: Probably fine
+                return;
+        }
+
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+
+        g_debug ("client_stream_eof %d", SOUP_STATUS_IS_REDIRECTION (soup_message_get_status (data->msg)));
+
+        data->state = STATE_READ_DONE;
+        // data->item->state = SOUP_MESSAGE_FINISHED;
+        // soup_message_io_http2_finished (msg);
+        // g_idle_add (idle_finish, msg); // TODO
+}
+
+static GInputStream *
+soup_message_io_http2_get_response_istream (SoupClientMessageIO *iface,
+                                            SoupMessage         *msg,
+                                            GError **error)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+        GInputStream *client_stream, *base_stream;
+
+        g_debug ("soup_message_io_http2_get_response_istream paused=%d", data->paused);
+
+        if (data->decoded_data_istream)
+                base_stream = g_object_ref (data->decoded_data_istream);
+        else /* For example with status_code == SOUP_STATUS_NO_CONTENT */
+                base_stream = g_memory_input_stream_new ();
+
+        client_stream = soup_client_input_stream_new (base_stream, msg);
+        g_signal_connect (client_stream, "eof", G_CALLBACK (client_stream_eof), msg);
+
+        g_object_unref (base_stream);
+
+        return client_stream;
+}
+
+static gboolean
+io_read (SoupMessageIOHTTP2 *io,
+         gboolean blocking,
+        GCancellable *cancellable,
+         GError **error)
+{
+        guint8 buffer[8192];
+        gssize read;
+        int ret;
+
+        if ((read = g_pollable_stream_read (io->istream, buffer, sizeof (buffer),
+                                            blocking, cancellable, error)) < 0)
+            return FALSE;
+
+        ret = nghttp2_session_mem_recv (io->session, buffer, read);
+        return ret != 0;
+}
+
+static gboolean
+io_write (SoupMessageIOHTTP2 *io,
+         gboolean blocking,
+        GCancellable *cancellable,
+         GError **error)
+{
+        /* We must write all of nghttp2's buffer before we ask for more */
+
+        if (io->written_bytes == io->write_buffer_size)
+                io->write_buffer = NULL;
+
+        if (io->write_buffer == NULL) {
+                io->written_bytes = 0;
+                io->write_buffer_size = nghttp2_session_mem_send (io->session, (const 
guint8**)&io->write_buffer);
+                if (io->write_buffer_size == 0) {
+                        /* Done */
+                        io->write_buffer = NULL;
+                        return TRUE;
+                }
+        }
+
+        gssize ret = g_pollable_stream_write (io->ostream,
+                                              io->write_buffer + io->written_bytes,
+                                              io->write_buffer_size - io->written_bytes,
+                                              blocking, cancellable, error);
+        if (ret < 0)
+                return FALSE;
+
+        io->written_bytes += ret;
+        return TRUE;
+}
+
+static gboolean
+io_read_or_write (SoupMessageIOHTTP2 *io,
+                  gboolean blocking,
+                  GCancellable *cancellable,
+                  GError **error)
+{
+        // TODO: This can possibly more inteligent about what actually needs
+        // writing so we can prioritize better.
+        if (nghttp2_session_want_write (io->session))
+                return io_write (io, blocking, cancellable, error);
+        return io_read (io, blocking, cancellable, error);
+}
+
+static gboolean
+io_run_until (SoupMessage *msg,
+              gboolean blocking,
+             SoupHTTP2IOState state,
+             GCancellable *cancellable,
+              GError **error)
+{
+       SoupMessageIOHTTP2 *io = get_io_data (msg);
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+       gboolean progress = TRUE, done;
+       GError *my_error = NULL;
+
+       if (g_cancellable_set_error_if_cancelled (cancellable, error))
+               return FALSE;
+       else if (!io) {
+               g_set_error_literal (error, G_IO_ERROR,
+                                    G_IO_ERROR_CANCELLED,
+                                    _("Operation was cancelled"));
+               return FALSE;
+       }
+
+       g_object_ref (msg);
+
+       while (progress && get_io_data (msg) == io && !data->paused && data->state < state) {
+                progress = io_read_or_write (io, blocking, cancellable, &my_error);
+       }
+
+       if (my_error || (my_error = get_error_for_data (io, data))) {
+               g_propagate_error (error, my_error);
+               g_object_unref (msg);
+               return FALSE;
+        } else if (get_io_data (msg) != io) {
+               g_set_error_literal (error, G_IO_ERROR,
+                                    G_IO_ERROR_CANCELLED,
+                                    _("Operation was cancelled"));
+               g_object_unref (msg);
+               return FALSE;
+       }       
+
+       done = data->state >= state;
+
+       if (!blocking && !done) {
+               g_set_error_literal (error, G_IO_ERROR,
+                                    G_IO_ERROR_WOULD_BLOCK,
+                                    _("Operation would block"));
+               g_object_unref (msg);
+               return FALSE;
+       }
+
+       g_object_unref (msg);
+       return done;
+}
+
+
+static gboolean
+soup_message_io_http2_run_until_read (SoupClientMessageIO *iface,
+                                      SoupMessage         *msg,
+                                      GCancellable *cancellable,
+                                      GError **error)
+{
+        //SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        return io_run_until (msg, TRUE, STATE_READ_DATA, cancellable, error);
+}
+
+static gboolean
+soup_message_io_http2_run_until_finish (SoupClientMessageIO *iface,
+                                        SoupMessage         *msg,
+                                        gboolean             blocking,
+                                        GCancellable        *cancellable,
+                                        GError             **error)
+{
+
+        g_debug ("soup_message_io_http2_run_until_finish");
+
+        //QUESTION: Prematurely end the stream, we don't need more than what we are getting
+        //nghttp2_submit_rst_stream (io->session, NGHTTP2_FLAG_NONE, data->stream_id, NGHTTP2_STREAM_CLOSED);
+
+        return io_run_until (msg, blocking, STATE_READ_DONE, cancellable, error);
+}
+
+static void
+soup_message_io_http2_run (SoupClientMessageIO *iface,
+                           SoupMessage         *msg,
+                          gboolean             blocking)
+{
+        g_assert_not_reached ();
+}
+
+static void io_run_until_read_async (SoupMessage *msg,
+                                     GTask       *task);
+
+static gboolean
+io_run_until_read_ready (SoupMessage *msg,
+                         gpointer     user_data)
+{
+        GTask *task = user_data;
+
+        io_run_until_read_async (msg, task);
+
+        return G_SOURCE_REMOVE;
+}
+
+static void
+io_run_until_read_async (SoupMessage *msg,
+                         GTask       *task)
+{
+        SoupMessageIOHTTP2 *io = get_io_data (msg);
+        SoupHTTP2MessageData *data = get_data_for_message (io, msg);
+        GError *error = NULL;
+
+        if (data->io_source) {
+                g_source_destroy (data->io_source);
+                g_clear_pointer (&data->io_source, g_source_unref);
+        }
+
+        if (io_run_until (msg, FALSE,
+                          STATE_READ_DATA,
+                          g_task_get_cancellable (task),
+                          &error)) {
+                g_task_return_boolean (task, TRUE);
+                g_object_unref (task);
+                return;
+        }
+
+        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                g_error_free (error);
+                data->io_source = soup_message_io_http2_get_source (msg, g_task_get_cancellable (task),
+                                                                  
(SoupMessageIOSourceFunc)io_run_until_read_ready,
+                                                                  task);
+               g_source_set_priority (data->io_source, g_task_get_priority (task));
+                g_source_attach (data->io_source, io->async_context);
+                return;
+        }
+
+        if (get_io_data (msg) == io)
+                soup_message_io_http2_finished ((SoupClientMessageIO *)io, msg);
+        else
+                g_warn_if_reached ();
+
+        g_task_return_error (task, error);
+        g_object_unref (task);
+}
+
+static void
+soup_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
+                                            SoupMessage         *msg,
+                                            int                  io_priority,
+                                            GCancellable        *cancellable,
+                                            GAsyncReadyCallback  callback,
+                                            gpointer             user_data)
+{
+        GTask *task;
+
+        task = g_task_new (msg, cancellable, callback, user_data);
+       g_task_set_priority (task, io_priority);
+        io_run_until_read_async (msg, task);
+}
+
+static gboolean
+soup_message_io_http2_is_open (SoupClientMessageIO *iface)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+        gboolean ret = TRUE;
+
+        GError *error = NULL;
+        if (!io_read (io, FALSE, NULL, &error)) {
+                if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+                        ret = FALSE;
+
+                g_clear_error (&error);
+        }
+
+        h2_debug (io, NULL, "[SESSION] Open=%d", ret);
+
+        return ret;
+}
+
+static void
+soup_message_io_http2_destroy (SoupClientMessageIO *iface)
+{
+        SoupMessageIOHTTP2 *io = (SoupMessageIOHTTP2 *)iface;
+
+        g_debug ("soup_message_io_http2_destroy");
+
+        g_clear_object (&io->stream);
+        g_clear_pointer (&io->async_context, g_main_context_unref);
+        g_clear_pointer (&io->session, nghttp2_session_del);
+        g_clear_pointer (&io->messages, g_ptr_array_unref);
+        g_clear_pointer (&io->message_errors, g_hash_table_unref);
+
+        g_free (io);
+}
+
+static const SoupClientMessageIOFuncs io_funcs = {
+        soup_message_io_http2_destroy,
+        soup_message_io_http2_finished,
+        soup_message_io_http2_stolen,
+        soup_message_io_http2_send_item,
+        soup_message_io_http2_get_response_istream,
+        soup_message_io_http2_pause,
+        soup_message_io_http2_unpause,
+        soup_message_io_http2_is_paused,
+        soup_message_io_http2_run,
+        soup_message_io_http2_run_until_read,
+        soup_message_io_http2_run_until_read_async,
+        soup_message_io_http2_run_until_finish,
+        soup_message_io_http2_in_progress,
+        soup_message_io_http2_skip_body,
+        soup_message_io_http2_is_open,
+        soup_message_io_http2_is_reusable
+        // soup_message_io_http2_get_source
+};
+
+static void
+soup_client_message_io_http2_init (SoupMessageIOHTTP2 *io)
+{
+        // FIXME: Abort on out of memory errors
+        nghttp2_session_callbacks *callbacks;
+        nghttp2_session_callbacks_new (&callbacks);
+        nghttp2_session_callbacks_set_on_header_callback (callbacks, on_header_callback);
+        nghttp2_session_callbacks_set_on_frame_recv_callback (callbacks, on_frame_recv_callback);
+        nghttp2_session_callbacks_set_on_data_chunk_recv_callback (callbacks, on_data_chunk_recv_callback);
+        nghttp2_session_callbacks_set_on_begin_frame_callback (callbacks, on_begin_frame_callback);
+        nghttp2_session_callbacks_set_before_frame_send_callback (callbacks, on_before_frame_send_callback);
+        nghttp2_session_callbacks_set_on_frame_not_send_callback (callbacks, on_frame_not_send_callback);
+        nghttp2_session_callbacks_set_on_frame_send_callback (callbacks, on_frame_send_callback);
+        nghttp2_session_callbacks_set_on_stream_close_callback (callbacks, on_stream_close_callback);
+
+        nghttp2_session_client_new (&io->session, callbacks, io);
+        nghttp2_session_callbacks_del (callbacks);
+
+        io->messages = g_ptr_array_new_full (1, (GDestroyNotify)soup_http2_message_data_free);
+        /* Errors are stored separate as they have a longer lifetime than MessageData */
+        io->message_errors = g_hash_table_new_full (g_direct_hash, g_direct_equal, NULL, 
(GDestroyNotify)g_error_free);
+
+        io->iface.funcs = &io_funcs;
+}
+
+#define INITIAL_WINDOW_SIZE (32 * 1024 * 1024) // 32MB matches other implementations
+#define MAX_HEADER_TABLE_SIZE 65536 // Match size used by Chromium/Firefox
+
+SoupClientMessageIO *
+soup_client_message_io_http2_new (GIOStream *stream)
+{
+        SoupMessageIOHTTP2 *io = g_new0 (SoupMessageIOHTTP2, 1);
+        soup_client_message_io_http2_init (io);
+
+        io->stream = g_object_ref (stream);
+        io->istream = g_io_stream_get_input_stream (io->stream);
+        io->ostream = g_io_stream_get_output_stream (io->stream);
+
+        io->async_context = g_main_context_ref_thread_default ();
+
+        nghttp2_session_set_local_window_size (io->session, NGHTTP2_FLAG_NONE, 0, INITIAL_WINDOW_SIZE);
+
+        const nghttp2_settings_entry settings[] = {
+                { NGHTTP2_SETTINGS_INITIAL_WINDOW_SIZE, INITIAL_WINDOW_SIZE },
+                { NGHTTP2_SETTINGS_HEADER_TABLE_SIZE, MAX_HEADER_TABLE_SIZE },
+                { NGHTTP2_SETTINGS_ENABLE_PUSH, 0 },
+        };
+        nghttp2_submit_settings (io->session, NGHTTP2_FLAG_NONE, settings, G_N_ELEMENTS (settings));
+
+        return (SoupClientMessageIO *)io;
+}
\ No newline at end of file
diff --git a/libsoup/http2/soup-client-message-io-http2.h b/libsoup/http2/soup-client-message-io-http2.h
new file mode 100644
index 00000000..e96934ac
--- /dev/null
+++ b/libsoup/http2/soup-client-message-io-http2.h
@@ -0,0 +1,14 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2021 Igalia S.L.
+ */
+
+#pragma once
+
+#include "soup-client-message-io.h"
+
+G_BEGIN_DECLS
+
+SoupClientMessageIO *           soup_client_message_io_http2_new       (GIOStream                 *stream);
+
+G_END_DECLS
diff --git a/libsoup/http2/soup-memory-input-stream.c b/libsoup/http2/soup-memory-input-stream.c
new file mode 100644
index 00000000..88add784
--- /dev/null
+++ b/libsoup/http2/soup-memory-input-stream.c
@@ -0,0 +1,611 @@
+/* GIO - GLib Input, Output and Streaming Library
+ * 
+ * Copyright (C) 2006-2007 Red Hat, Inc.
+ * Copyright 2021 Igalia S.L.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Christian Kellner <gicmo gnome org> 
+ */
+
+#include "config.h"
+
+#include "soup-memory-input-stream.h"
+#include <glib/gi18n-lib.h>
+
+/**
+ * SECTION:SoupMemoryInputStream
+ * @short_description: Streaming input operations on memory chunks
+ * @include: gio/gio.h
+ * @see_also: #SoupMemoryOutputStream
+ *
+ * #SoupMemoryInputStream is a class for using arbitrary
+ * memory chunks as input for GIO streaming input operations.
+ *
+ * It differs from #GMemoryInputStream in that it frees older chunks
+ * after they have been read, returns #G_IO_ERROR_WOULDBLOCK at the end
+ * of data until soup_memory_input_stream_complete() is called, and implements
+ * g_pollable_input_stream_is_readable().
+ */
+
+struct _SoupMemoryInputStream {
+        GInputStream parent_instance;
+};
+
+typedef struct {
+        GSList *chunks;
+        GPollableInputStream *parent_stream;
+        gsize start_offset;
+        gsize len;
+        gsize pos;
+        gboolean completed;
+} SoupMemoryInputStreamPrivate;
+
+static gssize soup_memory_input_stream_read (GInputStream *stream,
+                                             void *buffer,
+                                             gsize count,
+                                             GCancellable *cancellable,
+                                             GError **error);
+static gssize soup_memory_input_stream_skip (GInputStream *stream,
+                                             gsize count,
+                                             GCancellable *cancellable,
+                                             GError **error);
+static gboolean soup_memory_input_stream_close (GInputStream *stream,
+                                                GCancellable *cancellable,
+                                                GError **error);
+static void soup_memory_input_stream_skip_async (GInputStream *stream,
+                                                 gsize count,
+                                                 int io_priority,
+                                                 GCancellable *cancellabl,
+                                                 GAsyncReadyCallback callback,
+                                                 gpointer datae);
+static gssize soup_memory_input_stream_skip_finish (GInputStream *stream,
+                                                    GAsyncResult *result,
+                                                    GError **error);
+static void soup_memory_input_stream_close_async (GInputStream *stream,
+                                                  int io_priority,
+                                                  GCancellable *cancellabl,
+                                                  GAsyncReadyCallback callback,
+                                                  gpointer data);
+static gboolean soup_memory_input_stream_close_finish (GInputStream *stream,
+                                                       GAsyncResult *result,
+                                                       GError **error);
+
+static void soup_memory_input_stream_seekable_iface_init (GSeekableIface *iface);
+static goffset soup_memory_input_stream_tell (GSeekable *seekable);
+static gboolean soup_memory_input_stream_can_seek (GSeekable *seekable);
+static gboolean soup_memory_input_stream_seek (GSeekable *seekable,
+                                               goffset offset,
+                                               GSeekType type,
+                                               GCancellable *cancellable,
+                                               GError **error);
+static gboolean soup_memory_input_stream_can_truncate (GSeekable *seekable);
+static gboolean soup_memory_input_stream_truncate (GSeekable *seekable,
+                                                   goffset offset,
+                                                   GCancellable *cancellable,
+                                                   GError **error);
+
+static void soup_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+static gboolean soup_memory_input_stream_is_readable (GPollableInputStream *stream);
+static GSource *soup_memory_input_stream_create_source (GPollableInputStream *stream,
+                                                        GCancellable *cancellable);
+static gssize soup_memory_input_stream_read_nonblocking (GPollableInputStream *stream,
+                                                         void *buffer,
+                                                         gsize count,
+                                                         GError **error);
+
+static void soup_memory_input_stream_dispose (GObject *object);
+static void soup_memory_input_stream_finalize (GObject *object);
+
+G_DEFINE_TYPE_WITH_CODE (SoupMemoryInputStream, soup_memory_input_stream, G_TYPE_INPUT_STREAM,
+                         G_ADD_PRIVATE (SoupMemoryInputStream)
+                             G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
+                                                    soup_memory_input_stream_seekable_iface_init);
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+                                                soup_memory_input_stream_pollable_iface_init);)
+
+enum {
+        WANT_READ,
+        LAST_SIGNAL
+};
+
+static guint signals [LAST_SIGNAL] = { 0 };
+
+static void
+soup_memory_input_stream_class_init (SoupMemoryInputStreamClass *klass)
+{
+        GObjectClass *object_class;
+        GInputStreamClass *istream_class;
+
+        object_class = G_OBJECT_CLASS (klass);
+        object_class->finalize = soup_memory_input_stream_finalize;
+        object_class->dispose = soup_memory_input_stream_dispose;
+
+        istream_class = G_INPUT_STREAM_CLASS (klass);
+        istream_class->read_fn = soup_memory_input_stream_read;
+        istream_class->skip = soup_memory_input_stream_skip;
+        istream_class->close_fn = soup_memory_input_stream_close;
+
+        istream_class->skip_async = soup_memory_input_stream_skip_async;
+        istream_class->skip_finish = soup_memory_input_stream_skip_finish;
+        istream_class->close_async = soup_memory_input_stream_close_async;
+        istream_class->close_finish = soup_memory_input_stream_close_finish;
+
+        signals[WANT_READ] =
+                g_signal_new ("want-read",
+                              G_OBJECT_CLASS_TYPE (object_class),
+                              G_SIGNAL_RUN_FIRST,
+                              0,
+                              NULL, NULL,
+                              NULL,
+                              G_TYPE_ERROR,
+                              2, G_TYPE_CANCELLABLE, G_TYPE_BOOLEAN);
+}
+
+static void
+soup_memory_input_stream_dispose (GObject *object)
+{
+        SoupMemoryInputStream *stream = SOUP_MEMORY_INPUT_STREAM (object);
+        SoupMemoryInputStreamPrivate *priv = soup_memory_input_stream_get_instance_private (stream);
+
+        priv->completed = TRUE;
+
+        G_OBJECT_CLASS (soup_memory_input_stream_parent_class)->dispose (object);
+}
+
+static void
+soup_memory_input_stream_finalize (GObject *object)
+{
+        SoupMemoryInputStream *stream = SOUP_MEMORY_INPUT_STREAM (object);
+        SoupMemoryInputStreamPrivate *priv = soup_memory_input_stream_get_instance_private (stream);
+
+        g_slist_free_full (priv->chunks, (GDestroyNotify)g_bytes_unref);
+        g_clear_object (&priv->parent_stream);
+
+        G_OBJECT_CLASS (soup_memory_input_stream_parent_class)->finalize (object);
+}
+
+static void
+soup_memory_input_stream_seekable_iface_init (GSeekableIface *iface)
+{
+        iface->tell = soup_memory_input_stream_tell;
+        iface->can_seek = soup_memory_input_stream_can_seek;
+        iface->seek = soup_memory_input_stream_seek;
+        iface->can_truncate = soup_memory_input_stream_can_truncate;
+        iface->truncate_fn = soup_memory_input_stream_truncate;
+}
+
+static void
+soup_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+        iface->is_readable = soup_memory_input_stream_is_readable;
+        iface->create_source = soup_memory_input_stream_create_source;
+        iface->read_nonblocking = soup_memory_input_stream_read_nonblocking;
+}
+
+static void
+soup_memory_input_stream_init (SoupMemoryInputStream *stream)
+{
+}
+
+/**
+ * soup_memory_input_stream_new:
+ *
+ * Creates a new empty #SoupMemoryInputStream. 
+ *
+ * Returns: a new #GInputStream
+ */
+GInputStream *
+soup_memory_input_stream_new (GPollableInputStream *parent_stream)
+{
+        GInputStream *stream;
+        SoupMemoryInputStreamPrivate *priv;
+
+        stream = g_object_new (SOUP_TYPE_MEMORY_INPUT_STREAM, NULL);
+        priv = soup_memory_input_stream_get_instance_private (SOUP_MEMORY_INPUT_STREAM (stream));
+        if (parent_stream)
+                priv->parent_stream = g_object_ref (parent_stream);
+
+        return stream;
+}
+
+void
+soup_memory_input_stream_add_bytes (SoupMemoryInputStream *stream,
+                                    GBytes *bytes)
+{
+        SoupMemoryInputStreamPrivate *priv;
+
+        g_return_if_fail (SOUP_IS_MEMORY_INPUT_STREAM (stream));
+        g_return_if_fail (bytes != NULL);
+
+        priv = soup_memory_input_stream_get_instance_private (stream);
+
+        priv->chunks = g_slist_append (priv->chunks, g_bytes_ref (bytes));
+        priv->len += g_bytes_get_size (bytes);
+}
+
+static gssize
+soup_memory_input_stream_read_real (GInputStream *stream,
+                                    gboolean blocking,
+                                    void *buffer,
+                                    gsize read_count,
+                                    GCancellable *cancellable,
+                                    GError **error)
+{
+        SoupMemoryInputStream *memory_stream;
+        SoupMemoryInputStreamPrivate *priv;
+        GSList *l;
+        GBytes *chunk;
+        gsize len;
+        gsize offset, start, rest, size;
+        gsize count;
+
+        memory_stream = SOUP_MEMORY_INPUT_STREAM (stream);
+        priv = soup_memory_input_stream_get_instance_private (memory_stream);
+
+        /* We have a list of chunked bytes that we continually read from.
+         * Once a chunk is fully read it is removed from our list and we
+         * keep the offset of where the chunks start.
+         */
+
+        count = MIN (read_count, priv->len - priv->pos);
+
+        offset = priv->start_offset;
+        for (l = priv->chunks; l; l = l->next) {
+                chunk = (GBytes *)l->data;
+                len = g_bytes_get_size (chunk);
+
+                if (offset + len > priv->pos)
+                        break;
+
+                offset += len;
+        }
+
+        priv->start_offset = offset;
+        start = priv->pos - offset;
+        rest = count;
+
+        while (l && rest > 0) {
+                GSList *next = l->next;
+
+                const guint8 *chunk_data;
+                chunk = (GBytes *)l->data;
+
+                chunk_data = g_bytes_get_data (chunk, &len);
+
+                size = MIN (rest, len - start);
+
+                memcpy ((guint8 *)buffer + (count - rest), chunk_data + start, size);
+                rest -= size;
+
+                // Remove fully read chunk from list, note that we are always near the start of the list
+                if (start + size == len) {
+                        priv->start_offset += len;
+                        priv->chunks = g_slist_delete_link (priv->chunks, l);
+                        g_bytes_unref (chunk);
+                }
+
+                start = 0;
+                l = next;
+        }
+
+        priv->pos += count;
+
+        // We need to block until the read is completed.
+        // So emit a signal saying we need more data.
+        if (count == 0 && blocking && !priv->completed) {
+                GError *read_error = NULL;
+                g_signal_emit (memory_stream, signals[WANT_READ], 0,
+                               cancellable,
+                               TRUE,
+                               &read_error);
+
+                if (read_error) {
+                        g_propagate_error (error, read_error);
+                        return -1;
+                }
+
+                return soup_memory_input_stream_read_real (
+                        stream, blocking, buffer, read_count, cancellable, error
+                );
+        }
+
+        return count;
+}
+
+static gssize
+soup_memory_input_stream_read (GInputStream *stream,
+                               void *buffer,
+                               gsize count,
+                               GCancellable *cancellable,
+                               GError **error)
+{
+        return soup_memory_input_stream_read_real (stream, TRUE, buffer, count, cancellable, error);
+}
+
+static gssize
+soup_memory_input_stream_read_nonblocking (GPollableInputStream *stream,
+                                           void *buffer,
+                                           gsize count,
+                                           GError **error)
+{
+        SoupMemoryInputStream *memory_stream = SOUP_MEMORY_INPUT_STREAM (stream);
+        SoupMemoryInputStreamPrivate *priv = soup_memory_input_stream_get_instance_private (memory_stream);
+        GError *inner_error = NULL;
+
+        gsize read = soup_memory_input_stream_read_real (G_INPUT_STREAM (stream), FALSE, buffer, count, 
NULL, &inner_error);
+
+        if (read == 0 && !priv->completed && !inner_error) {
+                g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, "Operation would block");
+
+                // Try requesting more reads from the io backend
+                GError *inner_error = NULL;
+                g_signal_emit (memory_stream, signals[WANT_READ], 0,
+                               NULL, FALSE, &inner_error);
+
+                // TODO: Do we care?
+                g_clear_error (&inner_error);
+
+                return -1;
+        }
+
+        if (inner_error)
+                g_propagate_error (error, inner_error);
+
+        return read;
+}
+
+void
+soup_memory_input_stream_complete (SoupMemoryInputStream *stream)
+{
+        SoupMemoryInputStreamPrivate *priv = soup_memory_input_stream_get_instance_private (stream);
+        priv->completed = TRUE;
+}
+
+static gssize
+soup_memory_input_stream_skip (GInputStream *stream,
+                               gsize count,
+                               GCancellable *cancellable,
+                               GError **error)
+{
+        SoupMemoryInputStream *memory_stream;
+        SoupMemoryInputStreamPrivate *priv;
+
+        memory_stream = SOUP_MEMORY_INPUT_STREAM (stream);
+        priv = soup_memory_input_stream_get_instance_private (memory_stream);
+
+        count = MIN (count, priv->len - priv->pos);
+        priv->pos += count;
+
+        return count;
+}
+
+static gboolean
+soup_memory_input_stream_close (GInputStream *stream,
+                                GCancellable *cancellable,
+                                GError **error)
+{
+        return TRUE;
+}
+
+static void
+soup_memory_input_stream_skip_async (GInputStream *stream,
+                                     gsize count,
+                                     int io_priority,
+                                     GCancellable *cancellable,
+                                     GAsyncReadyCallback callback,
+                                     gpointer user_data)
+{
+        GTask *task;
+        gssize nskipped;
+        GError *error = NULL;
+
+        nskipped = G_INPUT_STREAM_GET_CLASS (stream)->skip (stream, count, cancellable, &error);
+        task = g_task_new (stream, cancellable, callback, user_data);
+        g_task_set_source_tag (task, soup_memory_input_stream_skip_async);
+
+        if (error)
+                g_task_return_error (task, error);
+        else
+                g_task_return_int (task, nskipped);
+        g_object_unref (task);
+}
+
+static gssize
+soup_memory_input_stream_skip_finish (GInputStream *stream,
+                                      GAsyncResult *result,
+                                      GError **error)
+{
+        g_return_val_if_fail (g_task_is_valid (result, stream), -1);
+
+        return g_task_propagate_int (G_TASK (result), error);
+}
+
+static void
+soup_memory_input_stream_close_async (GInputStream *stream,
+                                      int io_priority,
+                                      GCancellable *cancellable,
+                                      GAsyncReadyCallback callback,
+                                      gpointer user_data)
+{
+        GTask *task;
+
+        task = g_task_new (stream, cancellable, callback, user_data);
+        g_task_set_source_tag (task, soup_memory_input_stream_close_async);
+        g_task_return_boolean (task, TRUE);
+        g_object_unref (task);
+}
+
+static gboolean
+soup_memory_input_stream_close_finish (GInputStream *stream,
+                                       GAsyncResult *result,
+                                       GError **error)
+{
+        return TRUE;
+}
+
+static goffset
+soup_memory_input_stream_tell (GSeekable *seekable)
+{
+        SoupMemoryInputStream *memory_stream;
+        SoupMemoryInputStreamPrivate *priv;
+
+        memory_stream = SOUP_MEMORY_INPUT_STREAM (seekable);
+        priv = soup_memory_input_stream_get_instance_private (memory_stream);
+
+        return priv->pos;
+}
+
+static gboolean soup_memory_input_stream_can_seek (GSeekable *seekable)
+{
+        return FALSE;
+}
+
+static gboolean
+soup_memory_input_stream_seek (GSeekable *seekable,
+                               goffset offset,
+                               GSeekType type,
+                               GCancellable *cancellable,
+                               GError **error)
+{
+        g_set_error_literal (error,
+                             G_IO_ERROR,
+                             G_IO_ERROR_NOT_SUPPORTED,
+                             _ ("Cannot seek SoupMemoryInputStream"));
+        return FALSE;
+}
+
+static gboolean
+soup_memory_input_stream_can_truncate (GSeekable *seekable)
+{
+        return FALSE;
+}
+
+static gboolean
+soup_memory_input_stream_truncate (GSeekable *seekable,
+                                   goffset offset,
+                                   GCancellable *cancellable,
+                                   GError **error)
+{
+        g_set_error_literal (error,
+                             G_IO_ERROR,
+                             G_IO_ERROR_NOT_SUPPORTED,
+                             _ ("Cannot truncate SoupMemoryInputStream"));
+        return FALSE;
+}
+
+static gboolean
+soup_memory_input_stream_is_readable (GPollableInputStream *stream)
+{
+        SoupMemoryInputStream *memory_stream = SOUP_MEMORY_INPUT_STREAM (stream);
+        SoupMemoryInputStreamPrivate *priv = soup_memory_input_stream_get_instance_private (memory_stream);
+
+        return priv->pos < priv->len || priv->completed;
+}
+
+/* Custom GSource */
+
+typedef struct {
+       GSource source;
+       SoupMemoryInputStream *stream;
+} SoupMemoryStreamSource;
+
+static gboolean
+memory_stream_source_prepare (GSource *source,
+                              gint    *timeout)
+{
+        SoupMemoryStreamSource *stream_source = (SoupMemoryStreamSource *)source;
+        return soup_memory_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (stream_source->stream));
+}
+
+static gboolean
+memory_stream_source_dispatch (GSource     *source,
+                               GSourceFunc  callback,
+                               gpointer     user_data)
+{
+       GPollableSourceFunc func = (GPollableSourceFunc )callback;
+       SoupMemoryStreamSource *memory_source = (SoupMemoryStreamSource *)source;
+
+        if (!func)
+                return FALSE;
+
+       return (*func) (G_OBJECT (memory_source->stream), user_data);
+}
+
+static gboolean
+memory_stream_source_closure_callback (GObject *pollable_stream,
+                                      gpointer data)
+{
+       GClosure *closure = data;
+       GValue param = G_VALUE_INIT;
+       GValue result_value = G_VALUE_INIT;
+       gboolean result;
+
+       g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+        g_assert (G_IS_POLLABLE_INPUT_STREAM (pollable_stream));
+       g_value_init (&param, G_TYPE_POLLABLE_INPUT_STREAM);
+       g_value_set_object (&param, pollable_stream);
+
+       g_closure_invoke (closure, &result_value, 1, &param, NULL);
+
+       result = g_value_get_boolean (&result_value);
+       g_value_unset (&result_value);
+       g_value_unset (&param);
+
+       return result;
+}
+
+static void
+memory_stream_source_finalize (GSource *source)
+{
+       SoupMemoryStreamSource *memory_source = (SoupMemoryStreamSource *)source;
+
+       g_object_unref (memory_source->stream);
+}
+
+static GSourceFuncs source_funcs =
+{
+       memory_stream_source_prepare,
+       NULL,
+       memory_stream_source_dispatch,
+       memory_stream_source_finalize,
+       (GSourceFunc)memory_stream_source_closure_callback,
+       NULL,
+};
+
+static GSource *
+soup_memory_input_stream_create_source (GPollableInputStream *stream,
+                                        GCancellable *cancellable)
+{
+        SoupMemoryInputStreamPrivate *priv = soup_memory_input_stream_get_instance_private 
(SOUP_MEMORY_INPUT_STREAM (stream));
+
+        GSource *source = g_source_new (&source_funcs, sizeof (SoupMemoryStreamSource));
+       g_source_set_name (source, "SoupMemoryStreamSource");
+
+       SoupMemoryStreamSource *stream_source = (SoupMemoryStreamSource *)source;
+        stream_source->stream = g_object_ref (SOUP_MEMORY_INPUT_STREAM (stream));
+
+        GSource *child_source;
+        if (priv->parent_stream)
+                child_source = g_pollable_input_stream_create_source (priv->parent_stream, cancellable);
+        else
+                child_source = g_cancellable_source_new (cancellable);
+
+        g_source_set_dummy_callback (child_source);
+        g_source_add_child_source (source, child_source);
+        g_source_unref (child_source);
+
+        return source;
+}
diff --git a/libsoup/http2/soup-memory-input-stream.h b/libsoup/http2/soup-memory-input-stream.h
new file mode 100644
index 00000000..3a26c1db
--- /dev/null
+++ b/libsoup/http2/soup-memory-input-stream.h
@@ -0,0 +1,16 @@
+
+#pragma once
+
+#include "soup-types.h"
+
+#define SOUP_TYPE_MEMORY_INPUT_STREAM (soup_memory_input_stream_get_type ())
+G_DECLARE_FINAL_TYPE (SoupMemoryInputStream, soup_memory_input_stream, SOUP, MEMORY_INPUT_STREAM, 
GInputStream)
+
+GInputStream * soup_memory_input_stream_new        (GPollableInputStream *parent_stream);
+
+void           soup_memory_input_stream_add_bytes (SoupMemoryInputStream *stream,
+                                                   GBytes                *bytes);
+
+void          soup_memory_input_stream_complete    (SoupMemoryInputStream *stream);
+
+G_END_DECLS
diff --git a/libsoup/meson.build b/libsoup/meson.build
index 1448f820..68de84cc 100644
--- a/libsoup/meson.build
+++ b/libsoup/meson.build
@@ -32,6 +32,9 @@ soup_sources = [
   'hsts/soup-hsts-enforcer-db.c',
   'hsts/soup-hsts-policy.c',
 
+  'http2/soup-client-message-io-http2.c',
+  'http2/soup-memory-input-stream.c',
+
   'server/soup-auth-domain.c',
   'server/soup-auth-domain-basic.c',
   'server/soup-auth-domain-digest.c',
@@ -180,6 +183,7 @@ deps = [
   brotlidec_dep,
   platform_deps,
   libz_dep,
+  libnghttp2_dep,
 ]
 
 libsoup_includes = [
@@ -189,6 +193,7 @@ libsoup_includes = [
     'cookies',
     'content-decoder',
     'hsts',
+    'http2',
     'server',
     'websocket',
     '.'
diff --git a/libsoup/soup-client-message-io-http1.c b/libsoup/soup-client-message-io-http1.c
index b2279a7c..2443a51d 100644
--- a/libsoup/soup-client-message-io-http1.c
+++ b/libsoup/soup-client-message-io-http1.c
@@ -1085,6 +1085,37 @@ soup_client_message_io_http1_is_paused (SoupClientMessageIO *iface,
         return io->base.paused;
 }
 
+static gboolean
+soup_client_message_io_http1_in_progress (SoupClientMessageIO *iface,
+                                          SoupMessage         *msg)
+{
+        // In progress as long as object is alive
+        return TRUE;
+}
+
+static gboolean
+soup_client_message_io_http1_is_open (SoupClientMessageIO *iface)
+{
+        /* This is handled in SoupConnection */
+        g_assert_not_reached ();
+        return FALSE;
+}
+
+static gboolean
+soup_client_message_io_http1_is_reusable (SoupClientMessageIO *iface)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+        return soup_message_is_keepalive (io->item->msg);
+}
+
+static void
+soup_client_message_io_http1_skip_body (SoupClientMessageIO *iface,
+                                       SoupMessage         *msg)
+{
+        /* This is already handled by SoupSession reading the rest
+         * of the stream */
+}
+
 static const SoupClientMessageIOFuncs io_funcs = {
         soup_client_message_io_http1_destroy,
         soup_client_message_io_http1_finished,
@@ -1097,7 +1128,11 @@ static const SoupClientMessageIOFuncs io_funcs = {
         soup_client_message_io_http1_run,
         soup_client_message_io_http1_run_until_read,
         soup_client_message_io_http1_run_until_read_async,
-        soup_client_message_io_http1_run_until_finish
+        soup_client_message_io_http1_run_until_finish,
+        soup_client_message_io_http1_in_progress,
+        soup_client_message_io_http1_skip_body,
+        soup_client_message_io_http1_is_open,
+        soup_client_message_io_http1_is_reusable,
 };
 
 SoupClientMessageIO *
diff --git a/libsoup/soup-client-message-io.c b/libsoup/soup-client-message-io.c
index 7a48006a..3537469d 100644
--- a/libsoup/soup-client-message-io.c
+++ b/libsoup/soup-client-message-io.c
@@ -106,3 +106,29 @@ soup_client_message_io_get_response_stream (SoupClientMessageIO *io,
 {
         return io->funcs->get_response_stream (io, msg, error);
 }
+
+gboolean
+soup_client_message_io_in_progress (SoupClientMessageIO       *io,
+                                    SoupMessage               *msg)
+{
+        return io->funcs->in_progress (io, msg);
+}
+
+gboolean
+soup_client_message_io_is_open (SoupClientMessageIO       *io)
+{
+        return io->funcs->is_open (io);
+}
+
+gboolean
+soup_client_message_io_is_reusable (SoupClientMessageIO       *io)
+{
+        return io->funcs->is_reusable (io);
+}
+
+void
+soup_client_message_io_skip_body (SoupClientMessageIO       *io,
+                                  SoupMessage               *msg)
+{
+        io->funcs->skip_body (io, msg);
+}
diff --git a/libsoup/soup-client-message-io.h b/libsoup/soup-client-message-io.h
index dfdb8690..ad358d9f 100644
--- a/libsoup/soup-client-message-io.h
+++ b/libsoup/soup-client-message-io.h
@@ -46,6 +46,12 @@ typedef struct {
                                                gboolean                   blocking,
                                                GCancellable              *cancellable,
                                                GError                   **error);
+        gboolean      (*in_progress)          (SoupClientMessageIO       *io,
+                                               SoupMessage               *msg);
+        void          (*skip_body)            (SoupClientMessageIO       *io,
+                                               SoupMessage               *msg);
+        gboolean      (*is_open)              (SoupClientMessageIO       *io);
+        gboolean      (*is_reusable)          (SoupClientMessageIO       *io);
 } SoupClientMessageIOFuncs;
 
 struct _SoupClientMessageIO {
@@ -86,4 +92,10 @@ gboolean      soup_client_message_io_run_until_finish     (SoupClientMessageIO
                                                            GError                   **error);
 GInputStream *soup_client_message_io_get_response_stream  (SoupClientMessageIO       *io,
                                                            SoupMessage               *msg,
-                                                           GError                   **error);
\ No newline at end of file
+                                                           GError                   **error);
+gboolean      soup_client_message_io_in_progress           (SoupClientMessageIO       *io,
+                                                           SoupMessage               *msg);
+gboolean      soup_client_message_io_is_open              (SoupClientMessageIO       *io);
+gboolean      soup_client_message_io_is_reusable          (SoupClientMessageIO       *io);
+void          soup_client_message_io_skip_body            (SoupClientMessageIO       *io,
+                                                           SoupMessage               *msg);
diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c
index a7666246..b13738dc 100644
--- a/libsoup/soup-connection.c
+++ b/libsoup/soup-connection.c
@@ -14,6 +14,7 @@
 #include "soup-io-stream.h"
 #include "soup-message-queue-item.h"
 #include "soup-client-message-io-http1.h"
+#include "soup-client-message-io-http2.h"
 #include "soup-socket-properties.h"
 #include "soup-private-enum-types.h"
 #include <gio/gnetworking.h>
@@ -29,6 +30,7 @@ typedef struct {
        SoupSocketProperties *socket_props;
         guint64 id;
         GSocketAddress *remote_address;
+        gboolean force_http1;
 
        GUri *proxy_uri;
        gboolean ssl;
@@ -39,6 +41,7 @@ typedef struct {
        time_t       unused_timeout;
        GSource     *idle_timeout_src;
        gboolean     reusable;
+        SoupHTTPVersion http_version;
 
        GCancellable *cancellable;
 } SoupConnectionPrivate;
@@ -65,6 +68,7 @@ enum {
        PROP_SSL,
        PROP_TLS_CERTIFICATE,
        PROP_TLS_CERTIFICATE_ERRORS,
+        PROP_FORCE_HTTP1,
 
        LAST_PROPERTY
 };
@@ -81,6 +85,9 @@ static void stop_idle_timer (SoupConnectionPrivate *priv);
 static void
 soup_connection_init (SoupConnection *conn)
 {
+        SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
+
+        priv->http_version = SOUP_HTTP_1_1;
 }
 
 static void
@@ -144,6 +151,9 @@ soup_connection_set_property (GObject *object, guint prop_id,
        case PROP_ID:
                priv->id = g_value_get_uint64 (value);
                break;
+       case PROP_FORCE_HTTP1:
+               priv->force_http1 = g_value_get_boolean (value);
+               break;
        default:
                G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
                break;
@@ -181,6 +191,9 @@ soup_connection_get_property (GObject *object, guint prop_id,
        case PROP_TLS_CERTIFICATE_ERRORS:
                g_value_set_flags (value, soup_connection_get_tls_certificate_errors (SOUP_CONNECTION 
(object)));
                break;
+       case PROP_FORCE_HTTP1:
+               g_value_set_boolean (value, priv->force_http1);
+               break;
        default:
                G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
                break;
@@ -284,6 +297,12 @@ soup_connection_class_init (SoupConnectionClass *connection_class)
                                     G_TYPE_TLS_CERTIFICATE_FLAGS, 0,
                                     G_PARAM_READABLE |
                                     G_PARAM_STATIC_STRINGS);
+        properties[PROP_FORCE_HTTP1] =
+                g_param_spec_boolean ("force-http1",
+                                      "Force HTTP 1.x",
+                                      "Force connection to use HTTP 1.x",
+                                      FALSE, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY |
+                                      G_PARAM_STATIC_STRINGS);
 
         g_object_class_install_properties (object_class, LAST_PROPERTY, properties);
 }
@@ -345,7 +364,7 @@ current_msg_got_body (SoupMessage *msg, gpointer user_data)
                g_clear_pointer (&priv->proxy_uri, g_uri_unref);
        }
 
-       priv->reusable = soup_message_is_keepalive (msg);
+       priv->reusable = soup_client_message_io_is_reusable (priv->io_data);
 }
 
 static void
@@ -368,9 +387,20 @@ set_current_msg (SoupConnection *conn, SoupMessage *msg)
 
        g_return_if_fail (priv->state == SOUP_CONNECTION_IN_USE);
 
+        /* With HTTP/1.x we keep track of the current message both for
+         * proxying and to find out later if the connection is reusable
+         * with keep-alive. With HTTP/2 we don't support proxying and
+         * we assume its reusable by default and detect a closed connection
+         * elsewhere */
+        if (priv->http_version >= SOUP_HTTP_2_0) {
+                priv->reusable = TRUE;
+                return;
+        }
+
        g_object_freeze_notify (G_OBJECT (conn));
 
        if (priv->current_msg) {
+                // FIXME
                g_return_if_fail (soup_message_get_method (priv->current_msg) == SOUP_METHOD_CONNECT);
                clear_current_msg (conn);
        }
@@ -467,6 +497,15 @@ new_tls_connection (SoupConnection    *conn,
 {
         SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
         GTlsClientConnection *tls_connection;
+        GPtrArray *advertised_protocols = g_ptr_array_sized_new (4);
+
+        // https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml
+        if (!priv->force_http1)
+                g_ptr_array_add (advertised_protocols, "h2");
+
+        g_ptr_array_add (advertised_protocols, "http/1.1");
+        g_ptr_array_add (advertised_protocols, "http/1.0");
+        g_ptr_array_add (advertised_protocols, NULL);
 
         tls_connection = g_initable_new (g_tls_backend_get_client_connection_type (g_tls_backend_get_default 
()),
                                          priv->cancellable, error,
@@ -474,7 +513,11 @@ new_tls_connection (SoupConnection    *conn,
                                          "server-identity", priv->remote_connectable,
                                          "require-close-notify", FALSE,
                                          "interaction", priv->socket_props->tls_interaction,
+                                         "advertised-protocols", advertised_protocols->pdata,
                                          NULL);
+
+        g_ptr_array_unref (advertised_protocols);
+
         if (!tls_connection)
                 return NULL;
 
@@ -543,6 +586,15 @@ soup_connection_complete (SoupConnection *conn)
 
         g_clear_object (&priv->cancellable);
 
+        if (G_IS_TLS_CONNECTION (priv->connection)) {
+                const char *protocol = g_tls_connection_get_negotiated_protocol (G_TLS_CONNECTION 
(priv->connection));
+                if (g_strcmp0 (protocol, "h2") == 0)
+                        priv->http_version = SOUP_HTTP_2_0;
+                else if (g_strcmp0 (protocol, "http/1.0") == 0)
+                        priv->http_version = SOUP_HTTP_1_0;
+                // Default is 1.1
+        }
+
         if (!priv->ssl || !priv->proxy_uri) {
                 soup_connection_event (conn,
                                        G_SOCKET_CLIENT_COMPLETE,
@@ -937,9 +989,7 @@ static gboolean
 is_idle_connection_disconnected (SoupConnection *conn)
 {
        SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
-       GInputStream *istream;
        char buffer[1];
-       GError *error = NULL;
 
        if (!g_socket_is_connected (soup_connection_get_socket (conn)))
                return TRUE;
@@ -947,26 +997,33 @@ is_idle_connection_disconnected (SoupConnection *conn)
        if (priv->unused_timeout && priv->unused_timeout < time (NULL))
                return TRUE;
 
-       istream = g_io_stream_get_input_stream (priv->iostream);
-
-       /* This is tricky. The goal is to check if the socket is readable. If
-        * so, that means either the server has disconnected or it's broken (it
-        * should not send any data while the connection is in idle state). But
-        * we can't just check the readability of the SoupSocket because there
-        * could be non-application layer TLS data that is readable, but which
-        * we don't want to consider. So instead, just read and see if the read
-        * succeeds. This is OK to do here because if the read does succeed, we
-        * just disconnect and ignore the data anyway.
-        */
-       g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (istream),
-                                                 &buffer, sizeof (buffer),
-                                                 NULL, &error);
-       if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-               g_clear_error (&error);
-               return TRUE;
-       }
+        if (priv->http_version == SOUP_HTTP_2_0) {
+                /* We need the HTTP2 backend to maintain its read state.
+                 * It does exactly what below does in practice */
+                return !soup_client_message_io_is_open (priv->io_data);
+        } else {
+                GInputStream *istream = g_io_stream_get_input_stream (priv->iostream);
+                GError *error = NULL;
+
+                /* This is tricky. The goal is to check if the socket is readable. If
+                * so, that means either the server has disconnected or it's broken (it
+                * should not send any data while the connection is in idle state). But
+                * we can't just check the readability of the SoupSocket because there
+                * could be non-application layer TLS data that is readable, but which
+                * we don't want to consider. So instead, just read and see if the read
+                * succeeds. This is OK to do here because if the read does succeed, we
+                * just disconnect and ignore the data anyway.
+                */
+                g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (istream),
+                                                        &buffer, sizeof (buffer),
+                                                        NULL, &error);
+                if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                        g_clear_error (&error);
+                        return TRUE;
+                }
 
-       g_error_free (error);
+                g_error_free (error);
+        }
 
        return FALSE;
 }
@@ -1058,8 +1115,13 @@ soup_connection_setup_message_io (SoupConnection *conn,
         else
                 priv->reusable = FALSE;
 
-        g_assert (priv->io_data == NULL);
-        priv->io_data = soup_client_message_io_http1_new (priv->iostream);
+        if (priv->http_version <= SOUP_HTTP_1_1) {
+                g_assert (priv->io_data == NULL);
+                priv->io_data = soup_client_message_io_http1_new (priv->iostream);
+        } else {
+                if (!priv->io_data || !soup_client_message_io_is_reusable (priv->io_data))
+                        priv->io_data = soup_client_message_io_http2_new (priv->iostream);
+        }
 
         return priv->io_data;
 }
@@ -1070,8 +1132,10 @@ soup_connection_message_io_finished (SoupConnection *conn,
 {
         SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
 
-        g_assert (priv->current_msg == msg);
-        g_clear_pointer (&priv->io_data, soup_client_message_io_destroy);
+        if (priv->http_version < SOUP_HTTP_2_0) {
+                g_assert (priv->current_msg == msg);
+                g_clear_pointer (&priv->io_data, soup_client_message_io_destroy);
+        }
 }
 
 GTlsCertificate *
@@ -1119,3 +1183,19 @@ soup_connection_get_remote_address (SoupConnection *conn)
 
         return priv->remote_address;
 }
+
+SoupHTTPVersion
+soup_connection_get_negotiated_protocol (SoupConnection *conn)
+{
+        SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
+
+        return priv->http_version;
+}
+
+gboolean
+soup_connection_is_reusable (SoupConnection *conn)
+{
+        SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
+
+        return priv->reusable;
+}
diff --git a/libsoup/soup-connection.h b/libsoup/soup-connection.h
index 9c556029..bb076037 100644
--- a/libsoup/soup-connection.h
+++ b/libsoup/soup-connection.h
@@ -75,6 +75,8 @@ GTlsCertificateFlags soup_connection_get_tls_certificate_errors (SoupConnection
 
 guint64              soup_connection_get_id                     (SoupConnection *conn);
 GSocketAddress      *soup_connection_get_remote_address         (SoupConnection *conn);
+SoupHTTPVersion      soup_connection_get_negotiated_protocol    (SoupConnection *conn);
+gboolean             soup_connection_is_reusable                (SoupConnection *conn);
 
 G_END_DECLS
 
diff --git a/libsoup/soup-logger-private.h b/libsoup/soup-logger-private.h
index a30eb396..e48d16c0 100644
--- a/libsoup/soup-logger-private.h
+++ b/libsoup/soup-logger-private.h
@@ -13,4 +13,6 @@ void soup_logger_request_body_setup (SoupLogger           *logger,
                                      SoupMessage          *msg,
                                      SoupBodyOutputStream *stream);
 
+void soup_logger_log_request_data (SoupLogger *logger, SoupMessage *msg, const char *buffer, gsize len);
+
 G_END_DECLS
diff --git a/libsoup/soup-logger.c b/libsoup/soup-logger.c
index 6690c706..e3963c52 100644
--- a/libsoup/soup-logger.c
+++ b/libsoup/soup-logger.c
@@ -178,6 +178,14 @@ write_body (SoupLogger *logger, const char *buffer, gsize nread,
         }
 }
 
+void
+soup_logger_log_request_data (SoupLogger *logger, SoupMessage *msg, const char *buffer, gsize len)
+{
+        SoupLoggerPrivate *priv = soup_logger_get_instance_private (logger);
+
+        write_body (logger, buffer, len, msg, priv->request_bodies);
+}
+
 static void
 write_response_body (SoupLoggerInputStream *stream, char *buffer, gsize nread,
                      gpointer user_data)
diff --git a/libsoup/soup-message.c b/libsoup/soup-message.c
index ef4d782a..848ff2eb 100644
--- a/libsoup/soup-message.c
+++ b/libsoup/soup-message.c
@@ -1499,6 +1499,7 @@ soup_message_cleanup_response (SoupMessage *msg)
  *   be emitted, if you want to disable authentication for a message use
  *   soup_message_disable_feature() passing #SOUP_TYPE_AUTH_MANAGER instead.
  * @SOUP_MESSAGE_COLLECT_METRICS: Metrics will be collected for this message.
+ * @SOUP_MESSAGE_FORCE_HTTP1: Require an HTTP/1.x connection.
  *
  * Various flags that can be set on a #SoupMessage to alter its
  * behavior.
@@ -2234,7 +2235,7 @@ soup_message_io_in_progress (SoupMessage *msg)
 {
         SoupMessagePrivate *priv = soup_message_get_instance_private (msg);
 
-        return priv->io_data != NULL;
+        return priv->io_data && soup_client_message_io_in_progress (priv->io_data, msg);
 }
 
 void
diff --git a/libsoup/soup-message.h b/libsoup/soup-message.h
index 93fd9af0..e11ec2d8 100644
--- a/libsoup/soup-message.h
+++ b/libsoup/soup-message.h
@@ -78,7 +78,8 @@ typedef enum {
        SOUP_MESSAGE_NEW_CONNECTION           = (1 << 2),
        SOUP_MESSAGE_IDEMPOTENT               = (1 << 3),
        SOUP_MESSAGE_DO_NOT_USE_AUTH_CACHE    = (1 << 4),
-        SOUP_MESSAGE_COLLECT_METRICS          = (1 << 5)
+        SOUP_MESSAGE_COLLECT_METRICS          = (1 << 5),
+        SOUP_MESSAGE_FORCE_HTTP1              = (1 << 6)
 } SoupMessageFlags;
 
 SOUP_AVAILABLE_IN_ALL
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index b027a19b..0759e036 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -1286,6 +1286,23 @@ redirect_handler (SoupMessage *msg,
        soup_session_redirect_message (session, msg, &item->error);
 }
 
+static void
+misdirected_handler (SoupMessage *msg,
+                    gpointer     user_data)
+{
+       SoupMessageQueueItem *item = user_data;
+       SoupSession *session = item->session;
+
+        /* HTTP/2 messages may get the misdirected request status and MAY
+         * try a new connection */
+        if (!soup_message_query_flags (msg, SOUP_MESSAGE_NEW_CONNECTION)) {
+                soup_message_add_flags (msg, SOUP_MESSAGE_NEW_CONNECTION);
+                soup_session_requeue_item (session,
+                                         item,
+                                         &item->error);
+        }
+}
+
 static void
 message_restarted (SoupMessage *msg, gpointer user_data)
 {
@@ -1339,6 +1356,9 @@ soup_session_append_queue_item (SoupSession        *session,
                        msg, "got_body", "Location",
                        G_CALLBACK (redirect_handler), item);
        }
+        soup_message_add_status_code_handler (msg, "got-body",
+                                              SOUP_STATUS_MISDIRECTED_REQUEST,
+                                              G_CALLBACK (misdirected_handler), item);
        g_signal_connect (msg, "restarted",
                          G_CALLBACK (message_restarted), item);
 
@@ -1369,9 +1389,12 @@ soup_session_send_queue_item (SoupSession *session,
        if (priv->accept_language && !soup_message_headers_get_list (request_headers, "Accept-Language"))
                soup_message_headers_append (request_headers, "Accept-Language", priv->accept_language);
 
+        soup_message_set_http_version (item->msg, soup_connection_get_negotiated_protocol 
(soup_message_get_connection (item->msg)));
+
         soup_message_force_keep_alive_if_needed (item->msg);
         soup_message_update_request_host_if_needed (item->msg);
 
+
        /* A user agent SHOULD send a Content-Length in a request message when
         * no Transfer-Encoding is sent and the request method defines a meaning
         * for an enclosed payload body. For example, a Content-Length header
@@ -1760,9 +1783,9 @@ get_connection_for_host (SoupSession *session,
 {
        SoupSessionPrivate *priv = soup_session_get_instance_private (session);
        GSocketConnectable *remote_connectable;
+        gboolean force_http1;
        SoupConnection *conn;
        GSList *conns;
-       guint num_pending = 0;
 
        if (priv->disposed)
                return NULL;
@@ -1773,10 +1796,22 @@ get_connection_for_host (SoupSession *session,
                return conn;
        }
 
+        force_http1 = soup_message_query_flags (item->msg, SOUP_MESSAGE_FORCE_HTTP1);
+        if (g_getenv ("SOUP_FORCE_HTTP1"))
+                force_http1 = TRUE;
+
        for (conns = host->connections; conns; conns = conns->next) {
                conn = conns->data;
 
+                if (force_http1 && soup_connection_get_negotiated_protocol (conn) > SOUP_HTTP_1_1)
+                        continue;
+
                switch (soup_connection_get_state (conn)) {
+                case SOUP_CONNECTION_IN_USE:
+                        if (!need_new_connection && soup_connection_is_reusable (conn)) {
+                                return conn;
+                        }
+                        break;
                case SOUP_CONNECTION_IDLE:
                        if (!need_new_connection) {
                                soup_connection_set_state (conn, SOUP_CONNECTION_IN_USE);
@@ -1787,19 +1822,15 @@ get_connection_for_host (SoupSession *session,
                        if (steal_preconnection (session, item, conn))
                                return conn;
 
-                       num_pending++;
-                       break;
+                        /* Always wait if we have a pending connection as it may be
+                         * an h2 connection which will be shared. http/1.x connections
+                         * will only be slightly delayed. */
+                       return NULL;
                default:
                        break;
                }
        }
 
-       /* Limit the number of pending connections; num_messages / 2
-        * is somewhat arbitrary...
-        */
-       if (num_pending > host->num_messages / 2)
-               return NULL;
-
        if (host->num_conns >= priv->max_conns_per_host) {
                if (need_new_connection)
                        *try_cleanup = TRUE;
@@ -1828,6 +1859,7 @@ get_connection_for_host (SoupSession *session,
                             "remote-connectable", remote_connectable,
                             "ssl", soup_uri_is_https (host->uri),
                             "socket-properties", priv->socket_props,
+                             "force-http1", force_http1,
                             NULL);
        g_object_unref (remote_connectable);
 
@@ -2790,6 +2822,9 @@ expected_to_be_requeued (SoupSession *session, SoupMessage *msg)
                return !feature || !soup_message_disables_feature (msg, feature);
        }
 
+        if (soup_message_get_status (msg) == SOUP_STATUS_MISDIRECTED_REQUEST)
+                return TRUE;
+
        if (!soup_message_query_flags (msg, SOUP_MESSAGE_NO_REDIRECT)) {
                return SOUP_SESSION_WOULD_REDIRECT_AS_GET (session, msg) ||
                        SOUP_SESSION_WOULD_REDIRECT_AS_SAFE (session, msg);
@@ -2916,6 +2951,10 @@ send_async_maybe_complete (SoupMessageQueueItem *item,
        if (expected_to_be_requeued (item->session, item->msg)) {
                GOutputStream *ostream;
 
+                /* Tell the backend to expect no more body */
+                soup_client_message_io_skip_body (soup_message_get_io_data (item->msg),
+                                                  item->msg);
+
                /* Gather the current message body... */
                ostream = g_memory_output_stream_new_resizable ();
                g_object_set_data_full (G_OBJECT (item->task), "SoupSession:ostream",
@@ -3318,6 +3357,8 @@ soup_session_send (SoupSession   *session,
                if (!expected_to_be_requeued (session, msg))
                        break;
 
+                soup_client_message_io_skip_body (soup_message_get_io_data (msg), msg);
+
                /* Gather the current message body... */
                ostream = g_memory_output_stream_new_resizable ();
                if (g_output_stream_splice (ostream, stream,
@@ -3748,6 +3789,11 @@ soup_session_websocket_connect_async (SoupSession          *session,
         */
        soup_message_add_flags (msg, SOUP_MESSAGE_NEW_CONNECTION);
 
+        /* WebSocket negotiation over HTTP/2 is not currently supported
+         * and in practice all websocket servers support HTTP1.x with
+         * HTTP/2 not providing a tangible benefit */
+        soup_message_add_flags (msg, SOUP_MESSAGE_FORCE_HTTP1);
+
        item = soup_session_append_queue_item (session, msg, TRUE, cancellable);
        item->io_priority = io_priority;
 
diff --git a/meson.build b/meson.build
index 157da832..3daca77b 100644
--- a/meson.build
+++ b/meson.build
@@ -100,6 +100,8 @@ gio_dep = dependency('gio-2.0', version : glib_required_version,
 
 glib_deps = [glib_dep, gobject_dep, gio_dep]
 
+libnghttp2_dep = dependency('libnghttp2')
+
 sqlite_dep = dependency('sqlite3', required: false)
 
 # Fallback check for sqlite, not all platforms ship pkg-config file
@@ -289,7 +291,20 @@ else
   have_autobahn = find_program('wstest', required: get_option('autobahn')).found()
 endif
 
-if not have_apache or not have_autobahn
+# Quart server used for HTTP/2 tests
+pymod = import('python')
+quart_found = false
+
+python = pymod.find_installation('python3')
+if python.found()
+  ret = run_command(python, '-c', 'import importlib\nassert(importlib.find_loader("quart"))')
+  if ret.returncode() == 0
+    quart_found = true
+  endif
+endif
+message('Python module quart found: @0@'.format(quart_found.to_string('YES', 'NO')))
+
+if not have_apache or not quart_found or not have_autobahn
   warning('Some regression tests will not be compiled due to missing libraries or modules. Please check the 
logs for more details.')
 endif
 
@@ -415,6 +430,7 @@ summary({
 summary({
     'All tests' : get_option('tests'),
     'Tests requiring Apache' : have_apache,
+    'Tests requiring Quart' : quart_found,
     'Fuzzing tests' : get_option('fuzzing').enabled(),
     'Autobahn tests' : have_autobahn,
     'Install tests': get_option('installed_tests'),
diff --git a/tests/http2-server.py b/tests/http2-server.py
new file mode 100755
index 00000000..d091a611
--- /dev/null
+++ b/tests/http2-server.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+
+import asyncio
+from functools import wraps
+from secrets import compare_digest
+import sys
+from urllib.parse import urlparse
+
+from quart import (
+    request,
+    make_response,
+    Quart,
+)
+
+app = Quart(__name__)
+
+@app.route('/')
+async def index():
+    return 'Hello world'
+
+@app.route('/slow')
+async def slow():
+    await asyncio.sleep(1)
+    return 'Hello world'
+
+@app.route('/no-content')
+async def no_content():
+    return await make_response('', 204)
+
+@app.route('/large')
+async def large():
+
+    async def generate_data():
+        # Send increasing letters just to aid debugging
+        letter = ord('A')
+        bytes_pending = 1024 * 24
+        while bytes_pending:
+            await asyncio.sleep(0.1)
+            bytes_pending -= 1024
+            string = chr(letter) * 1024
+            letter += 1
+            yield bytes(string, 'UTF-8')
+        yield b'\0'
+
+    return generate_data()
+
+@app.route('/echo_query')
+async def echo_query():
+    url = urlparse(request.url)
+    return url.query
+
+@app.route('/echo_post', methods=['POST'])
+async def echo_post():
+    data = await request.get_data()
+    return data
+
+@app.route('/auth')
+async def auth():
+    auth = request.authorization
+
+    if (
+        auth is not None and 
+        auth.type == "basic" and
+        auth.username == 'username' and
+        compare_digest(auth.password, 'password')
+    ):
+        return 'Authenticated'
+
+    response = await make_response('Authentication Required')
+    response.status_code = 401
+    response.headers['WWW-Authenticate'] = 'Basic'
+    return response
+
+has_been_misdirected = False
+
+@app.route('/misdirected_request')
+async def misdirected_request():
+    global has_been_misdirected
+
+    if not has_been_misdirected:
+        has_been_misdirected = True
+        response = await make_response('', 421)
+        return response
+
+    return 'Success!'
+
+if __name__ == '__main__':
+    # Always close so this is never left running by accident
+    loop = asyncio.get_event_loop()
+    loop.call_later(10, lambda: sys.exit(0))
+
+    app.run(use_reloader=False, loop=loop,
+        certfile='test-cert.pem',
+        keyfile='test-key.pem')
diff --git a/tests/http2-test.c b/tests/http2-test.c
new file mode 100644
index 00000000..4f4e69db
--- /dev/null
+++ b/tests/http2-test.c
@@ -0,0 +1,531 @@
+/*
+ * Copyright 2021 Igalia S.L.
+ *
+ * This file is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This file is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * SPDX-License-Identifier: LGPL-2.0-or-later
+ */
+
+#include "test-utils.h"
+#include "soup-connection.h"
+#include "soup-message-private.h"
+#include "soup-memory-input-stream.h"
+
+typedef struct {
+        SoupSession *session;
+        SoupMessage *msg;
+} Test;
+
+static void
+setup_session (Test *test, gconstpointer data)
+{
+        test->session = soup_test_session_new (NULL);
+}
+
+static void
+teardown_session (Test *test, gconstpointer data)
+{
+        if (test->msg) {
+                g_assert_cmpuint (soup_message_get_http_version (test->msg), ==, SOUP_HTTP_2_0);
+                g_object_unref (test->msg);
+        }
+
+        soup_test_session_abort_unref (test->session);
+}
+
+static void
+do_basic_async_test (Test *test, gconstpointer data)
+{
+        test->msg = soup_message_new (SOUP_METHOD_GET, "https://127.0.0.1:5000/";);
+        GError *error = NULL;
+        GBytes *response = soup_test_session_async_send (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Hello world");
+
+        g_bytes_unref (response);
+}
+
+static void
+do_basic_sync_test (Test *test, gconstpointer data)
+{
+        test->msg = soup_message_new (SOUP_METHOD_GET, "https://127.0.0.1:5000/";);
+        GError *error = NULL;
+
+        GBytes *response = soup_session_send_and_read (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Hello world");
+
+        g_bytes_unref (response);
+}
+
+static void
+do_no_content_async_test (Test *test, gconstpointer data)
+{
+        test->msg = soup_message_new (SOUP_METHOD_GET, "https://127.0.0.1:5000/no-content";);
+        GError *error = NULL;
+
+        GBytes *response = soup_test_session_async_send (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_cmpuint (soup_message_get_status (test->msg), ==, 204);
+        g_assert_cmpuint (g_bytes_get_size (response), ==, 0);
+
+        g_bytes_unref (response);
+}
+
+static void
+do_large_async_test (Test *test, gconstpointer data)
+{
+        test->msg = soup_message_new (SOUP_METHOD_GET, "https://127.0.0.1:5000/large";);
+        GError *error = NULL;
+
+        /* This is both large and read in chunks */
+        GBytes *response = soup_test_session_async_send (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        // Size hardcoded to match http2-server.py's response
+        g_assert_cmpuint (g_bytes_get_size (response), ==, (1024 * 24) + 1);
+
+        g_bytes_unref (response);
+}
+
+static GBytes *
+read_stream_to_bytes_sync (GInputStream *stream)
+{
+        GOutputStream *out = g_memory_output_stream_new_resizable ();
+        GError *error = NULL;
+
+        gssize read = g_output_stream_splice (out, stream,  G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | 
G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+                                              NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_cmpint (read, >, 0);
+
+        GBytes *bytes = g_memory_output_stream_steal_as_bytes (G_MEMORY_OUTPUT_STREAM (out));
+        g_object_unref (out);
+        return bytes;
+}
+
+static void
+on_send_complete (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+        SoupSession *sess = SOUP_SESSION (source);
+        GError *error = NULL;
+        GInputStream *stream;
+        GBytes **bytes_out = user_data;
+
+        stream = soup_session_send_finish (sess, res, &error);
+
+        g_assert_no_error (error);
+        g_assert_nonnull (stream);
+
+        *bytes_out = read_stream_to_bytes_sync (stream);
+        g_object_unref (stream);
+}
+
+static void
+do_multi_message_async_test (Test *test, gconstpointer data)
+{
+        GMainContext *async_context = g_main_context_ref_thread_default ();
+
+        SoupMessage *msg1 = soup_message_new (SOUP_METHOD_GET, "https://127.0.0.1:5000/echo_query?body%201";);
+        soup_message_set_http_version (msg1, SOUP_HTTP_2_0);
+
+        SoupMessage *msg2 = soup_message_new (SOUP_METHOD_GET, "https://127.0.0.1:5000/echo_query?body%202";);
+        soup_message_set_http_version (msg2, SOUP_HTTP_2_0);
+
+        GBytes *response1 = NULL;
+        GBytes *response2 = NULL;
+        soup_session_send_async (test->session, msg1, G_PRIORITY_DEFAULT, NULL, on_send_complete, 
&response1);
+        soup_session_send_async (test->session, msg2, G_PRIORITY_DEFAULT, NULL, on_send_complete, 
&response2);
+
+        while (!response1 || !response2) {
+                g_main_context_iteration (async_context, TRUE);
+        }
+
+        g_assert_cmpuint (soup_message_get_http_version (msg1), ==, SOUP_HTTP_2_0);
+        g_assert_cmpuint (soup_message_get_http_version (msg2), ==, SOUP_HTTP_2_0);
+
+        g_assert_cmpstr (g_bytes_get_data (response1, NULL), ==, "body%201");
+        g_assert_cmpstr (g_bytes_get_data (response2, NULL), ==, "body%202");
+
+        g_bytes_unref (response1);
+        g_bytes_unref (response2);
+        g_object_unref (msg1);
+        g_object_unref (msg2);
+        g_main_context_unref (async_context);
+}
+
+static void
+do_post_sync_test (Test *test, gconstpointer data)
+{
+        GBytes *bytes = g_bytes_new_static ("body 1", sizeof ("body 1"));
+        test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post";);
+        soup_message_set_request_body_from_bytes (test->msg, "text/plain", bytes);
+
+        GError *error = NULL;
+        GInputStream *response = soup_session_send (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_nonnull (response);
+
+        GBytes *response_bytes = read_stream_to_bytes_sync (response);
+        g_assert_cmpstr (g_bytes_get_data (response_bytes, NULL), ==, "body 1");
+
+        g_bytes_unref (response_bytes);
+        g_object_unref (response);
+        g_bytes_unref (bytes);
+
+}
+
+static void
+do_post_async_test (Test *test, gconstpointer data)
+{
+        GMainContext *async_context = g_main_context_ref_thread_default ();
+
+        GBytes *bytes = g_bytes_new_static ("body 1", sizeof ("body 1"));
+        test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post";);
+        soup_message_set_request_body_from_bytes (test->msg, "text/plain", bytes);
+
+        GBytes *response = NULL;
+        soup_session_send_async (test->session, test->msg, G_PRIORITY_DEFAULT, NULL, on_send_complete, 
&response);
+
+        while (!response) {
+                g_main_context_iteration (async_context, TRUE);
+        }
+
+        g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "body 1");
+
+        g_bytes_unref (response);
+        g_bytes_unref (bytes);
+        g_main_context_unref (async_context);
+}
+
+static void
+do_post_blocked_async_test (Test *test, gconstpointer data)
+{
+        GMainContext *async_context = g_main_context_ref_thread_default ();
+
+        GInputStream *in_stream = soup_memory_input_stream_new (NULL);
+        GBytes *bytes = g_bytes_new_static ("Part 1 -", 8);
+        soup_memory_input_stream_add_bytes (SOUP_MEMORY_INPUT_STREAM (in_stream), bytes);
+        g_bytes_unref (bytes);
+
+        test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post";);
+        soup_message_set_request_body (test->msg, "text/plain", in_stream, 8 + 8);
+
+        GBytes *response = NULL;
+        soup_session_send_async (test->session, test->msg, G_PRIORITY_DEFAULT, NULL, on_send_complete, 
&response);
+
+        int iteration_count = 20;
+        while (!response) {
+                // Let it iterate for a bit waiting on blocked data
+                if (iteration_count-- == 0) {
+                        bytes = g_bytes_new_static (" Part 2", 8);
+                        soup_memory_input_stream_add_bytes (SOUP_MEMORY_INPUT_STREAM (in_stream), bytes);
+                        g_bytes_unref (bytes);
+                        soup_memory_input_stream_complete (SOUP_MEMORY_INPUT_STREAM (in_stream));
+                }
+                g_main_context_iteration (async_context, TRUE);
+        }
+
+        g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Part 1 - Part 2");
+
+        g_bytes_unref (response);
+        g_object_unref (in_stream);
+        g_main_context_unref (async_context);
+}
+
+static void
+do_post_file_async_test (Test *test, gconstpointer data)
+{
+        GMainContext *async_context = g_main_context_ref_thread_default ();
+
+        GFile *in_file = g_file_new_for_path (g_test_get_filename (G_TEST_DIST, "test-cert.pem", NULL));
+        GFileInputStream *in_stream = g_file_read (in_file, NULL, NULL);
+        g_assert_nonnull (in_stream);
+
+        test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post";);
+        soup_message_set_request_body (test->msg, "application/x-x509-ca-cert", G_INPUT_STREAM (in_stream), 
-1);
+
+        GBytes *response = NULL;
+        soup_session_send_async (test->session, test->msg, G_PRIORITY_DEFAULT, NULL, on_send_complete, 
&response);
+
+        while (!response)
+                g_main_context_iteration (async_context, TRUE);
+
+        g_assert_true (g_str_has_prefix (g_bytes_get_data (response, NULL), "-----BEGIN CERTIFICATE-----"));
+
+        g_bytes_unref (response);
+        g_object_unref (in_stream);
+        g_object_unref (in_file);
+        g_main_context_unref (async_context);
+}
+
+static gboolean
+on_delayed_auth (SoupAuth *auth)
+{
+        g_test_message ("Authenticating");
+        soup_auth_authenticate (auth, "username", "password");
+        return G_SOURCE_REMOVE;
+}
+
+static gboolean
+on_authenticate (SoupMessage *msg, SoupAuth *auth, gboolean retrying, gpointer user_data)
+{
+        g_test_message ("Authenticate request");
+        /* Force it to pause the message by delaying auth */
+        g_timeout_add (500, (GSourceFunc)on_delayed_auth, auth);
+        return TRUE;
+}
+
+static void
+do_paused_async_test (Test *test, gconstpointer data)
+{
+
+        test->msg = soup_message_new (SOUP_METHOD_GET, "https://127.0.0.1:5000/auth";);
+        g_signal_connect (test->msg, "authenticate", G_CALLBACK (on_authenticate), NULL);
+
+        GError *error = NULL;
+        GBytes *response = soup_test_session_async_send (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Authenticated");
+
+        g_bytes_unref (response);
+}
+
+static SoupConnection *last_connection;
+
+static void
+on_send_ready (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+        SoupSession *sess = SOUP_SESSION (source);
+        SoupMessage *msg = soup_session_get_async_result_message (sess, res);
+        guint *complete_count = user_data;
+        SoupConnection *conn;
+        GError *error = NULL;
+        GInputStream *stream;
+
+        stream = soup_session_send_finish (sess, res, &error);
+
+        g_assert_no_error (error);
+        g_assert_nonnull (stream);
+
+        GBytes *result = read_stream_to_bytes_sync (stream);
+        g_object_unref (stream);
+        g_assert_nonnull (result);
+        g_assert_cmpstr (g_bytes_get_data (result, NULL), ==, "Hello world");
+        g_bytes_unref (result);
+
+        g_assert_nonnull (msg);
+        g_assert_cmpuint (soup_message_get_http_version (msg), ==, SOUP_HTTP_2_0);
+        conn = soup_message_get_connection (msg);
+
+        if (last_connection)
+                g_assert (last_connection == conn);
+        else
+                last_connection = conn;
+        
+        g_test_message ("Conn (%u) = %p", *complete_count, conn);
+
+        *complete_count += 1;
+}
+
+static void
+do_connections_test (Test *test, gconstpointer data)
+{
+        GMainContext *async_context = g_main_context_ref_thread_default ();
+        guint complete_count = 0;
+
+#define N_TESTS 100
+
+        for (uint i = 0; i < N_TESTS; ++i) {
+                SoupMessage *msg = soup_message_new ("GET", "https://127.0.0.1:5000/slow";);
+                soup_session_send_async (test->session, msg, G_PRIORITY_DEFAULT, NULL, on_send_ready, 
&complete_count);
+                g_object_unref (msg);
+        }
+
+        while (complete_count != N_TESTS) {
+                g_main_context_iteration (async_context, TRUE);
+        }
+
+        // After no messages reference the connection we should still be able to re-use the same connection
+        SoupMessage *msg = soup_message_new ("GET", "https://127.0.0.1:5000/slow";);
+        soup_session_send_async (test->session, msg, G_PRIORITY_DEFAULT, NULL, on_send_ready, 
&complete_count);
+        g_object_unref (msg);
+
+        g_main_context_unref (async_context);
+}
+
+static void
+do_misdirected_request_test (Test *test, gconstpointer data)
+{
+        test->msg = soup_message_new (SOUP_METHOD_GET, "https://127.0.0.1:5000/misdirected_request";);
+        GError *error = NULL;
+
+        GBytes *response = soup_test_session_async_send (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Success!");
+
+        g_bytes_unref (response);
+}
+
+static void
+log_printer (SoupLogger *logger,
+             SoupLoggerLogLevel level,
+             char direction,
+             const char *data,
+             gpointer user_data)
+{
+        gboolean *has_logged_body = user_data;
+
+        // We are testing that the request body is logged
+        // which is backend specific for now
+        if (direction == '>' && g_strcmp0 (data, "Test") == 0)
+                *has_logged_body = TRUE;
+}
+
+static void
+do_logging_test (Test *test, gconstpointer data)
+{
+        gboolean has_logged_body = FALSE;
+
+        SoupLogger *logger = soup_logger_new (SOUP_LOGGER_LOG_BODY);
+        soup_logger_set_printer (logger, log_printer, &has_logged_body, NULL);
+        soup_session_add_feature (test->session, SOUP_SESSION_FEATURE (logger));
+
+        GBytes *bytes = g_bytes_new_static ("Test", sizeof ("Test"));
+        test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post";);
+        soup_message_set_request_body_from_bytes (test->msg, "text/plain", bytes);
+        GError *error = NULL;
+
+        GBytes *response = soup_test_session_async_send (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Test");
+        g_assert_true (has_logged_body);
+
+        g_bytes_unref (response);
+}
+
+static void
+do_metrics_test (Test *test, gconstpointer data)
+{
+        GBytes *bytes = g_bytes_new_static ("Test", sizeof ("Test"));
+        test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post";);
+        soup_message_set_request_body_from_bytes (test->msg, "text/plain", bytes);
+        soup_message_add_flags (test->msg, SOUP_MESSAGE_COLLECT_METRICS);
+
+        GError *error = NULL;
+        GBytes *response = soup_test_session_async_send (test->session, test->msg, NULL, &error);
+
+        g_assert_no_error (error);
+        g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Test");
+
+        SoupMessageMetrics *metrics = soup_message_get_metrics (test->msg);
+        g_assert_nonnull (metrics);
+
+        g_assert_cmpuint (soup_message_metrics_get_request_header_bytes_sent (metrics), >, 0);
+        g_assert_cmpuint (soup_message_metrics_get_request_body_size (metrics), ==, g_bytes_get_size 
(bytes));
+        g_assert_cmpuint (soup_message_metrics_get_request_body_bytes_sent (metrics), >, 
soup_message_metrics_get_request_body_size (metrics));
+
+        g_assert_cmpuint (soup_message_metrics_get_response_header_bytes_received (metrics), >, 0);
+        g_assert_cmpuint (soup_message_metrics_get_response_body_size (metrics), ==, g_bytes_get_size 
(response));
+        g_assert_cmpuint (soup_message_metrics_get_response_body_bytes_received (metrics), >, 
soup_message_metrics_get_response_body_size (metrics));
+
+        g_bytes_unref (response);
+        g_bytes_unref (bytes);     
+}
+
+
+int
+main (int argc, char **argv)
+{
+       int ret;
+
+       test_init (argc, argv, NULL);
+
+        if (!quart_init ()) {
+                test_cleanup ();
+                return 1;
+        }
+
+        g_test_add ("/http2/basic/async", Test, NULL,
+                    setup_session,
+                    do_basic_async_test,
+                    teardown_session);
+        g_test_add ("/http2/basic/sync", Test, NULL,
+                    setup_session,
+                    do_basic_sync_test,
+                    teardown_session);
+        g_test_add ("/http2/no_content/async", Test, NULL,
+                    setup_session,
+                    do_no_content_async_test,
+                    teardown_session);
+        g_test_add ("/http2/large/async", Test, NULL,
+                    setup_session,
+                    do_large_async_test,
+                    teardown_session);
+        g_test_add ("/http2/multiplexing/async", Test, NULL,
+                    setup_session,
+                    do_multi_message_async_test,
+                    teardown_session);
+        g_test_add ("/http2/post/async", Test, NULL,
+                    setup_session,
+                    do_post_async_test,
+                    teardown_session);
+        g_test_add ("/http2/post/sync", Test, NULL,
+                    setup_session,
+                    do_post_sync_test,
+                    teardown_session);
+        g_test_add ("/http2/post/blocked/async", Test, NULL,
+                    setup_session,
+                    do_post_blocked_async_test,
+                    teardown_session);
+        g_test_add ("/http2/post/file/async", Test, NULL,
+                    setup_session,
+                    do_post_file_async_test,
+                    teardown_session);
+        g_test_add ("/http2/paused/async", Test, NULL,
+                    setup_session,
+                    do_paused_async_test,
+                    teardown_session);
+        g_test_add ("/http2/connections", Test, NULL,
+                    setup_session,
+                    do_connections_test,
+                    teardown_session);
+        g_test_add ("/http2/misdirected_request", Test, NULL,
+                    setup_session,
+                    do_misdirected_request_test,
+                    teardown_session);
+        g_test_add ("/http2/logging", Test, NULL,
+                    setup_session,
+                    do_logging_test,
+                    teardown_session);
+        g_test_add ("/http2/metrics", Test, NULL,
+                    setup_session,
+                    do_metrics_test,
+                    teardown_session);
+
+
+       ret = g_test_run ();
+
+        test_cleanup ();
+
+       return ret;
+}
diff --git a/tests/memory-stream-test.c b/tests/memory-stream-test.c
new file mode 100644
index 00000000..b49e7eed
--- /dev/null
+++ b/tests/memory-stream-test.c
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2021 Igalia S.L.
+ *
+ * This file is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2 of the
+ * License, or (at your option) any later version.
+ *
+ * This file is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * SPDX-License-Identifier: LGPL-2.0-or-later
+ */
+
+#include "test-utils.h"
+#include "soup-memory-input-stream.h"
+
+static void
+do_large_data_test (void)
+{
+#define CHUNK_SIZE (gsize)1024 * 1024 * 512 // 512 MiB
+#define TEST_SIZE CHUNK_SIZE * 20 // 10 GiB
+
+        GInputStream *stream = soup_memory_input_stream_new (NULL);
+        SoupMemoryInputStream *mem_stream = SOUP_MEMORY_INPUT_STREAM (stream);
+        gsize data_needed = TEST_SIZE;
+        char *memory_chunk = g_new (char, CHUNK_SIZE); 
+        char *trash_buffer = g_new (char, CHUNK_SIZE);
+
+        // We can add unlimited data and as long as its read the data will
+        // be freed, so this should work fine even though its reading GB of data
+
+        while (data_needed > 0) {
+                // Copy chunk
+                GBytes *bytes = g_bytes_new (memory_chunk, CHUNK_SIZE);
+                soup_memory_input_stream_add_bytes (mem_stream, bytes);
+                g_bytes_unref (bytes);
+
+                // This should free the copy
+                gssize read = g_input_stream_read (stream, trash_buffer, CHUNK_SIZE, NULL, NULL);
+                g_assert_cmpint (read, ==, CHUNK_SIZE);
+
+                data_needed -= CHUNK_SIZE;
+                
+        }
+
+        g_free (trash_buffer);
+        g_free (memory_chunk);
+        g_object_unref (stream);
+}
+
+static void
+do_multiple_chunk_test (void)
+{
+        GInputStream *stream = soup_memory_input_stream_new (NULL);
+        SoupMemoryInputStream *mem_stream = SOUP_MEMORY_INPUT_STREAM (stream);
+        const char * const chunks[] = {
+                "1234", "5678", "9012", "hell", "owor", "ld..",
+        };
+
+        for (guint i = 0; i < G_N_ELEMENTS (chunks); ++i) {
+                GBytes *bytes = g_bytes_new_static (chunks[i], 4);
+                g_assert (g_bytes_get_size (bytes) == 4);
+                soup_memory_input_stream_add_bytes (mem_stream, bytes);
+                g_bytes_unref (bytes);
+        }
+
+        // Do partial reads of chunks to ensure it always comes out as expected
+        for (guint i = 0; i < G_N_ELEMENTS (chunks); ++i) {
+                char buffer[5] = { 0 };
+                gssize read = g_input_stream_read (stream, buffer, 2, NULL, NULL);
+                g_assert_cmpint (read, ==, 2);
+                read = g_input_stream_read (stream, buffer + 2, 2, NULL, NULL);
+                g_assert_cmpint (read, ==, 2);
+                g_assert_cmpstr (buffer, ==, chunks[i]);
+        }
+
+        g_object_unref (stream);
+}
+
+int
+main (int argc, char **argv)
+{
+       int ret;
+
+       test_init (argc, argv, NULL);
+
+       g_test_add_func ("/memory_stream/large_data", do_large_data_test);
+        g_test_add_func ("/memory_stream/multiple_chunks", do_multiple_chunk_test);
+
+       ret = g_test_run ();
+
+        test_cleanup ();
+
+       return ret;
+}
diff --git a/tests/meson.build b/tests/meson.build
index 88c2813d..bb7f13c2 100644
--- a/tests/meson.build
+++ b/tests/meson.build
@@ -47,6 +47,7 @@ tests = [
   ['hsts-db', true, []],
   ['logger', true, []],
   ['misc', true, []],
+  ['memory-stream', true, []],
   ['multipart', true, []],
   ['no-ssl', true, []],
   ['ntlm', true, []],
@@ -66,6 +67,12 @@ tests = [
   ['websocket', true, [libz_dep]]
 ]
 
+if quart_found
+  tests += [
+    ['http2', true, []],
+  ]
+endif
+
 if brotlidec_dep.found()
   tests += [
     ['brotli-decompressor', true, []],
diff --git a/tests/test-utils.c b/tests/test-utils.c
index d8b9c68d..d4b299ba 100644
--- a/tests/test-utils.c
+++ b/tests/test-utils.c
@@ -112,6 +112,8 @@ test_cleanup (void)
                apache_cleanup ();
 #endif
 
+        quart_cleanup ();
+
        if (logger)
                g_object_unref (logger);
        if (index_buffer)
@@ -256,6 +258,75 @@ apache_cleanup (void)
 
 #endif /* HAVE_APACHE */
 
+static GSubprocess *quart_proc;
+
+gboolean
+quart_init (void)
+{
+        if (quart_proc)
+                return TRUE;
+
+        GSubprocessLauncher *launcher = g_subprocess_launcher_new (G_SUBPROCESS_FLAGS_STDOUT_PIPE); // | 
G_SUBPROCESS_FLAGS_STDERR_SILENCE
+        g_subprocess_launcher_set_cwd (launcher, g_test_get_dir (G_TEST_DIST));
+
+        GError *error = NULL;
+        char *script = g_test_build_filename (G_TEST_DIST, "http2-server.py", NULL);
+        quart_proc = g_subprocess_launcher_spawn (launcher, &error, script, NULL);
+        g_free (script);
+        g_object_unref (launcher);
+
+        if (error) {
+                g_test_message ("Failed to start quart server: %s", error->message);
+                g_error_free (error);
+                return FALSE;
+        }
+
+        GDataInputStream *in_stream = g_data_input_stream_new (g_subprocess_get_stdout_pipe (quart_proc));
+
+        // We don't own the stream, don't break the pipe
+        g_filter_input_stream_set_close_base_stream (G_FILTER_INPUT_STREAM (in_stream), FALSE);
+
+        // Read stdout until the server says it is running
+        while (TRUE) {
+                char *line = g_data_input_stream_read_line_utf8 (in_stream, NULL, NULL, &error);
+
+                if (error) {
+                        g_test_message ("Failed to start quart server: %s", error->message);
+                        g_error_free (error);
+                        g_object_unref (in_stream);
+                        return FALSE;
+                } else if (line == NULL) {
+                        g_test_message ("Failed to start quart server (not installed?)");
+                        g_object_unref (in_stream);
+                        return FALSE;
+                }
+
+                if (g_str_has_prefix (line, " * Running")) {
+                        g_test_message ("Started quart server: %s", line + 3);
+                        g_free (line);
+                        g_object_unref (in_stream);
+                        return TRUE;
+                }
+                g_free (line);
+        }
+}
+
+void
+quart_cleanup (void)
+{
+        if (quart_proc) {
+                GError *error = NULL;
+                g_subprocess_force_exit (quart_proc);
+                g_subprocess_wait (quart_proc, NULL, &error);
+                if (error) {
+                        g_test_message ("Failed to stop quart server: %s", error->message);
+                        g_error_free (error);
+                }
+        }
+
+        g_clear_object (&quart_proc);
+}
+
 SoupSession *
 soup_test_session_new (const char *propname, ...)
 {
diff --git a/tests/test-utils.h b/tests/test-utils.h
index 1b4cfa75..6a9a37c6 100644
--- a/tests/test-utils.h
+++ b/tests/test-utils.h
@@ -47,6 +47,9 @@ void apache_cleanup (void);
        } G_STMT_END
 #endif
 
+gboolean quart_init    (void);
+void     quart_cleanup (void);
+
 gboolean have_curl (void);
 
 typedef enum {


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]