[libsoup/carlosgc/io-split: 5/5] Rename SoupClientMessageIOData as SoupClientMessageIOHTTP1




commit 5343de56e07591e22376960e8f0ee1aa60c01407
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Thu Apr 22 13:08:36 2021 +0200

    Rename SoupClientMessageIOData as SoupClientMessageIOHTTP1

 docs/reference/meson.build             |    1 +
 libsoup/meson.build                    |    2 +-
 libsoup/soup-client-message-io-http1.c | 1118 ++++++++++++++++++++++++++++++++
 libsoup/soup-client-message-io-http1.h |   10 +
 libsoup/soup-connection.c              |    3 +-
 libsoup/soup-message-io.c              | 1118 --------------------------------
 libsoup/soup-message-private.h         |    1 -
 7 files changed, 1132 insertions(+), 1121 deletions(-)
---
diff --git a/docs/reference/meson.build b/docs/reference/meson.build
index 38792607..3aea8764 100644
--- a/docs/reference/meson.build
+++ b/docs/reference/meson.build
@@ -40,6 +40,7 @@ ignore_headers = [
   'soup-message-metrics-private.h',
   'soup-client-message-io.h',
   'soup-message-io-completion.h',
+  'soup-client-message-io-http1.h',
 ]
 
 mkdb_args = [
diff --git a/libsoup/meson.build b/libsoup/meson.build
index ceafe5a1..1448f820 100644
--- a/libsoup/meson.build
+++ b/libsoup/meson.build
@@ -52,6 +52,7 @@ soup_sources = [
   'soup-body-output-stream.c',
   'soup-client-input-stream.c',
   'soup-client-message-io.c',
+  'soup-client-message-io-http1.c',
   'soup-connection.c',
   'soup-date-utils.c',
   'soup-filter-input-stream.c',
@@ -63,7 +64,6 @@ soup_sources = [
   'soup-logger-input-stream.c',
   'soup-message.c',
   'soup-message-headers.c',
-  'soup-message-io.c',
   'soup-message-io-data.c',
   'soup-message-metrics.c',
   'soup-message-queue-item.c',
diff --git a/libsoup/soup-client-message-io-http1.c b/libsoup/soup-client-message-io-http1.c
new file mode 100644
index 00000000..833a2c0a
--- /dev/null
+++ b/libsoup/soup-client-message-io-http1.c
@@ -0,0 +1,1118 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 8 -*- */
+/*
+ * soup-message-io.c: HTTP message I/O
+ *
+ * Copyright (C) 2000-2003, Ximian, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include <glib/gi18n-lib.h>
+
+#ifdef HAVE_SYSPROF
+#include <sysprof-capture.h>
+#endif
+
+#include "soup-client-message-io-http1.h"
+#include "soup.h"
+#include "soup-body-input-stream.h"
+#include "soup-body-output-stream.h"
+#include "soup-client-input-stream.h"
+#include "soup-connection.h"
+#include "soup-content-processor.h"
+#include "content-sniffer/soup-content-sniffer-stream.h"
+#include "soup-filter-input-stream.h"
+#include "soup-logger-private.h"
+#include "soup-message-private.h"
+#include "soup-message-metrics-private.h"
+#include "soup-message-queue-item.h"
+#include "soup-misc.h"
+#include "soup-uri-utils-private.h"
+
+typedef struct {
+        SoupClientMessageIO iface;
+        SoupMessageIOData base;
+
+        SoupMessageQueueItem *item;
+
+        SoupMessageMetrics *metrics;
+
+#ifdef HAVE_SYSPROF
+        gint64 begin_time_nsec;
+#endif
+} SoupClientMessageIOHTTP1;
+
+#define RESPONSE_BLOCK_SIZE 8192
+#define HEADER_SIZE_LIMIT (64 * 1024)
+
+static void
+soup_client_message_io_http1_destroy (SoupClientMessageIO *iface)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+
+        soup_message_io_data_cleanup (&io->base);
+        soup_message_queue_item_unref (io->item);
+
+        g_slice_free (SoupClientMessageIOHTTP1, io);
+}
+
+static int
+soup_client_message_io_http1_get_priority (SoupClientMessageIOHTTP1 *io)
+{
+        if (!io->item->task)
+                return G_PRIORITY_DEFAULT;
+
+        return g_task_get_priority (io->item->task);
+}
+
+static void
+soup_client_message_io_http1_finished (SoupClientMessageIO *iface)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+        SoupMessageIOCompletionFn completion_cb;
+        gpointer completion_data;
+        SoupMessageIOCompletion completion;
+        SoupMessage *msg;
+
+        completion_cb = io->base.completion_cb;
+        completion_data = io->base.completion_data;
+
+        if ((io->base.read_state >= SOUP_MESSAGE_IO_STATE_FINISHING &&
+             io->base.write_state >= SOUP_MESSAGE_IO_STATE_FINISHING))
+                completion = SOUP_MESSAGE_IO_COMPLETE;
+        else
+                completion = SOUP_MESSAGE_IO_INTERRUPTED;
+
+        msg = g_object_ref (io->item->msg);
+        soup_connection_message_io_finished (io->item->conn, msg);
+        if (completion_cb)
+                completion_cb (G_OBJECT (msg), completion, completion_data);
+        g_object_unref (msg);
+}
+
+static void
+soup_client_message_io_http1_stolen (SoupClientMessageIO *iface)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+        SoupMessageIOCompletionFn completion_cb;
+        gpointer completion_data;
+        SoupMessage *msg;
+
+        completion_cb = io->base.completion_cb;
+        completion_data = io->base.completion_data;
+
+        msg = g_object_ref (io->item->msg);
+        soup_connection_message_io_finished (io->item->conn, msg);
+        if (completion_cb)
+                completion_cb (G_OBJECT (msg), SOUP_MESSAGE_IO_STOLEN, completion_data);
+        g_object_unref (msg);
+}
+
+static gint
+processing_stage_cmp (gconstpointer a,
+                      gconstpointer b)
+{
+        SoupProcessingStage stage_a = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR 
((gpointer)a));
+        SoupProcessingStage stage_b = soup_content_processor_get_processing_stage (SOUP_CONTENT_PROCESSOR 
((gpointer)b));
+
+        if (stage_a > stage_b)
+                return 1;
+        if (stage_a == stage_b)
+                return 0;
+        return -1;
+}
+
+GInputStream *
+soup_message_setup_body_istream (GInputStream *body_stream,
+                                 SoupMessage *msg,
+                                 SoupSession *session,
+                                 SoupProcessingStage start_at_stage)
+{
+        GInputStream *istream;
+        GSList *p, *processors;
+
+        istream = g_object_ref (body_stream);
+
+        processors = soup_session_get_features (session, SOUP_TYPE_CONTENT_PROCESSOR);
+        processors = g_slist_sort (processors, processing_stage_cmp);
+
+        for (p = processors; p; p = p->next) {
+                GInputStream *wrapper;
+                SoupContentProcessor *processor;
+
+                processor = SOUP_CONTENT_PROCESSOR (p->data);
+                if (soup_message_disables_feature (msg, p->data) ||
+                    soup_content_processor_get_processing_stage (processor) < start_at_stage)
+                        continue;
+
+                wrapper = soup_content_processor_wrap_input (processor, istream, msg, NULL);
+                if (wrapper) {
+                        g_object_unref (istream);
+                        istream = wrapper;
+                }
+        }
+
+        g_slist_free (processors);
+
+        return istream;
+}
+
+static void
+request_body_stream_wrote_data_cb (SoupMessage *msg,
+                                   const void  *buffer,
+                                   guint        count,
+                                   gboolean     is_metadata)
+{
+        SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
+
+        if (client_io->metrics) {
+                client_io->metrics->request_body_bytes_sent += count;
+                if (!is_metadata)
+                        client_io->metrics->request_body_size += count;
+        }
+
+        if (!is_metadata)
+                soup_message_wrote_body_data (msg, count);
+}
+
+static void
+request_body_stream_wrote_cb (GOutputStream *ostream,
+                              GAsyncResult  *result,
+                              SoupMessage   *msg)
+{
+        SoupClientMessageIOHTTP1 *io;
+        gssize nwrote;
+        GCancellable *async_wait;
+        GError *error = NULL;
+
+        nwrote = g_output_stream_splice_finish (ostream, result, &error);
+
+        io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
+        if (!io || !io->base.async_wait || io->base.body_ostream != ostream) {
+                g_clear_error (&error);
+                g_object_unref (msg);
+                return;
+        }
+
+        if (nwrote != -1)
+                io->base.write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+
+        if (error)
+                g_propagate_error (&io->base.async_error, error);
+        async_wait = io->base.async_wait;
+        io->base.async_wait = NULL;
+        g_cancellable_cancel (async_wait);
+        g_object_unref (async_wait);
+
+        g_object_unref (msg);
+}
+
+static void
+closed_async (GObject      *source,
+              GAsyncResult *result,
+              gpointer      user_data)
+{
+        GOutputStream *body_ostream = G_OUTPUT_STREAM (source);
+        SoupMessage *msg = user_data;
+        SoupClientMessageIOHTTP1 *io;
+        GCancellable *async_wait;
+
+        io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
+        if (!io || !io->base.async_wait || io->base.body_ostream != body_ostream) {
+                g_object_unref (msg);
+                return;
+        }
+
+        g_output_stream_close_finish (body_ostream, result, &io->base.async_error);
+        g_clear_object (&io->base.body_ostream);
+
+        async_wait = io->base.async_wait;
+        io->base.async_wait = NULL;
+        g_cancellable_cancel (async_wait);
+        g_object_unref (async_wait);
+
+        g_object_unref (msg);
+}
+
+/*
+ * There are two request/response formats: the basic request/response,
+ * possibly with one or more unsolicited informational responses (such
+ * as the WebDAV "102 Processing" response):
+ *
+ *     Client                            Server
+ *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
+ *      W:BODY     / R:NOT_STARTED    ->  R:BODY     / W:NOT_STARTED
+ *     [W:DONE     / R:HEADERS (1xx)  <-  R:DONE     / W:HEADERS (1xx) ...]
+ *      W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS
+ *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
+ *      W:DONE     / R:DONE               R:DONE     / W:DONE
+ *     
+ * and the "Expect: 100-continue" request/response, with the client
+ * blocking halfway through its request, and then either continuing or
+ * aborting, depending on the server response:
+ *
+ *     Client                            Server
+ *      W:HEADERS  / R:NOT_STARTED    ->  R:HEADERS  / W:NOT_STARTED
+ *      W:BLOCKING / R:HEADERS        <-  R:BLOCKING / W:HEADERS
+ *     [W:BODY     / R:BLOCKING       ->  R:BODY     / W:BLOCKING]
+ *     [W:DONE     / R:HEADERS        <-  R:DONE     / W:HEADERS]
+ *      W:DONE     / R:BODY           <-  R:DONE     / W:BODY
+ *      W:DONE     / R:DONE               R:DONE     / W:DONE
+ */
+
+static void
+write_headers (SoupMessage          *msg,
+               GString              *header,
+               SoupConnection       *conn,
+               SoupEncoding         *encoding)
+{
+        GUri *uri = soup_message_get_uri (msg);
+        char *uri_string;
+        SoupMessageHeadersIter iter;
+        const char *name, *value;
+
+        if (soup_message_get_method (msg) == SOUP_METHOD_CONNECT) {
+                char *uri_host = soup_uri_get_host_for_headers (uri);
+
+                /* CONNECT URI is hostname:port for tunnel destination */
+                uri_string = g_strdup_printf ("%s:%d", uri_host, g_uri_get_port (uri));
+                g_free (uri_host);
+        } else {
+                gboolean proxy = soup_connection_is_via_proxy (conn);
+
+                /* Proxy expects full URI to destination. Otherwise
+                 * just the path.
+                 */
+                if (proxy)
+                        uri_string = g_uri_to_string (uri);
+                else if (soup_message_get_is_options_ping (msg))
+                        uri_string = g_strdup ("*");
+                else
+                        uri_string = soup_uri_get_path_and_query (uri);
+
+                if (proxy && g_uri_get_fragment (uri)) {
+                        /* Strip fragment */
+                        char *fragment = strchr (uri_string, '#');
+                        if (fragment)
+                                *fragment = '\0';
+                }
+        }
+
+        g_string_append_printf (header, "%s %s HTTP/1.%d\r\n",
+                                soup_message_get_method (msg), uri_string,
+                                (soup_message_get_http_version (msg) == SOUP_HTTP_1_0) ? 0 : 1);
+        g_free (uri_string);
+
+        *encoding = soup_message_headers_get_encoding (soup_message_get_request_headers (msg));
+
+        soup_message_headers_iter_init (&iter, soup_message_get_request_headers (msg));
+        while (soup_message_headers_iter_next (&iter, &name, &value))
+                g_string_append_printf (header, "%s: %s\r\n", name, value);
+        g_string_append (header, "\r\n");
+}
+
+/* Attempts to push forward the writing side of @msg's I/O. Returns
+ * %TRUE if it manages to make some progress, and it is likely that
+ * further progress can be made. Returns %FALSE if it has reached a
+ * stopping point of some sort (need input from the application,
+ * socket not writable, write is complete, etc).
+ */
+static gboolean
+io_write (SoupClientMessageIOHTTP1 *client_io,
+          gboolean                  blocking,
+          GCancellable             *cancellable,
+          GError                  **error)
+{
+        SoupMessageIOData *io = &client_io->base;
+        SoupMessage *msg = client_io->item->msg;
+        SoupSessionFeature *logger;
+        gssize nwrote;
+
+        if (io->async_error) {
+                g_propagate_error (error, io->async_error);
+                io->async_error = NULL;
+                return FALSE;
+        } else if (io->async_wait) {
+                g_set_error_literal (error, G_IO_ERROR,
+                                     G_IO_ERROR_WOULD_BLOCK,
+                                     _("Operation would block"));
+                return FALSE;
+        }
+
+        switch (io->write_state) {
+        case SOUP_MESSAGE_IO_STATE_HEADERS:
+                if (!io->write_buf->len)
+                        write_headers (msg, io->write_buf, client_io->item->conn, &io->write_encoding);
+
+                while (io->written < io->write_buf->len) {
+                        nwrote = g_pollable_stream_write (io->ostream,
+                                                          io->write_buf->str + io->written,
+                                                          io->write_buf->len - io->written,
+                                                          blocking,
+                                                          cancellable, error);
+                        if (nwrote == -1)
+                                return FALSE;
+                        io->written += nwrote;
+                        if (client_io->metrics)
+                                client_io->metrics->request_header_bytes_sent += nwrote;
+                }
+
+                io->written = 0;
+                g_string_truncate (io->write_buf, 0);
+
+                if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH)
+                        io->write_length = soup_message_headers_get_content_length 
(soup_message_get_request_headers (msg));
+
+                if (soup_message_headers_get_expectations (soup_message_get_request_headers (msg)) & 
SOUP_EXPECTATION_CONTINUE) {
+                        /* Need to wait for the Continue response */
+                        io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
+                        io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
+                } else
+                        io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
+
+                soup_message_wrote_headers (msg);
+                break;
+
+        case SOUP_MESSAGE_IO_STATE_BODY_START:
+                io->body_ostream = soup_body_output_stream_new (io->ostream,
+                                                                io->write_encoding,
+                                                                io->write_length);
+                io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
+                logger = soup_session_get_feature_for_message (client_io->item->session,
+                                                               SOUP_TYPE_LOGGER, msg);
+                if (logger) {
+                        soup_logger_request_body_setup (SOUP_LOGGER (logger), msg,
+                                                        SOUP_BODY_OUTPUT_STREAM (io->body_ostream));
+                }
+                break;
+
+        case SOUP_MESSAGE_IO_STATE_BODY:
+                if (!io->write_length &&
+                    io->write_encoding != SOUP_ENCODING_EOF &&
+                    io->write_encoding != SOUP_ENCODING_CHUNKED) {
+                        io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+                        break;
+                }
+
+                if (soup_message_get_request_body_stream (msg)) {
+                        g_signal_connect_object (io->body_ostream,
+                                                 "wrote-data",
+                                                 G_CALLBACK (request_body_stream_wrote_data_cb),
+                                                 msg, G_CONNECT_SWAPPED);
+                        if (blocking) {
+                                nwrote = g_output_stream_splice (io->body_ostream,
+                                                                 soup_message_get_request_body_stream (msg),
+                                                                 G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
+                                                                 cancellable,
+                                                                 error);
+                                if (nwrote == -1)
+                                        return FALSE;
+                                io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+                                break;
+                        } else {
+                                io->async_wait = g_cancellable_new ();
+                                g_output_stream_splice_async (io->body_ostream,
+                                                              soup_message_get_request_body_stream (msg),
+                                                              G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
+                                                              soup_client_message_io_http1_get_priority 
(client_io),
+                                                              cancellable,
+                                                              
(GAsyncReadyCallback)request_body_stream_wrote_cb,
+                                                              g_object_ref (msg));
+                                return FALSE;
+                        }
+                } else
+                        io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+                break;
+
+        case SOUP_MESSAGE_IO_STATE_BODY_FLUSH:
+                if (io->body_ostream) {
+                        if (blocking || io->write_encoding != SOUP_ENCODING_CHUNKED) {
+                                if (!g_output_stream_close (io->body_ostream, cancellable, error))
+                                        return FALSE;
+                                g_clear_object (&io->body_ostream);
+                        } else {
+                                io->async_wait = g_cancellable_new ();
+                                g_output_stream_close_async (io->body_ostream,
+                                                             soup_client_message_io_http1_get_priority 
(client_io),
+                                                             cancellable,
+                                                             closed_async, g_object_ref (msg));
+                        }
+                }
+
+                io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+                break;
+
+        case SOUP_MESSAGE_IO_STATE_BODY_DONE:
+                io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+                soup_message_wrote_body (msg);
+                break;
+
+        case SOUP_MESSAGE_IO_STATE_FINISHING:
+                io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
+                io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
+                break;
+
+        default:
+                g_return_val_if_reached (FALSE);
+        }
+
+        return TRUE;
+}
+
+static gboolean
+parse_headers (SoupMessage  *msg,
+               char         *headers,
+               guint         headers_len,
+               SoupEncoding *encoding,
+               GError      **error)
+{
+        SoupHTTPVersion version;
+        char *reason_phrase;
+        SoupStatus status;
+
+        soup_message_set_reason_phrase (msg, NULL);
+
+        if (!soup_headers_parse_response (headers, headers_len,
+                                          soup_message_get_response_headers (msg),
+                                          &version,
+                                          &status,
+                                          &reason_phrase)) {
+                g_set_error_literal (error, SOUP_SESSION_ERROR,
+                                     SOUP_SESSION_ERROR_PARSING,
+                                     _("Could not parse HTTP response"));
+                return FALSE;
+        }
+
+        soup_message_set_status (msg, status, reason_phrase);
+        g_free (reason_phrase);
+
+        if (version < soup_message_get_http_version (msg))
+                soup_message_set_http_version (msg, version);
+
+        if ((soup_message_get_method (msg) == SOUP_METHOD_HEAD ||
+             soup_message_get_status (msg)  == SOUP_STATUS_NO_CONTENT ||
+             soup_message_get_status (msg)  == SOUP_STATUS_NOT_MODIFIED ||
+             SOUP_STATUS_IS_INFORMATIONAL (soup_message_get_status (msg))) ||
+            (soup_message_get_method (msg) == SOUP_METHOD_CONNECT &&
+             SOUP_STATUS_IS_SUCCESSFUL (soup_message_get_status (msg))))
+                *encoding = SOUP_ENCODING_NONE;
+        else
+                *encoding = soup_message_headers_get_encoding (soup_message_get_response_headers (msg));
+
+        if (*encoding == SOUP_ENCODING_UNRECOGNIZED) {
+                g_set_error_literal (error, SOUP_SESSION_ERROR,
+                                     SOUP_SESSION_ERROR_ENCODING,
+                                     _("Unrecognized HTTP response encoding"));
+                return FALSE;
+        }
+
+        return TRUE;
+}
+
+static void
+response_network_stream_read_data_cb (SoupMessage *msg,
+                                      guint        count)
+{
+        SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
+
+        if (client_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_START)
+                client_io->metrics->response_header_bytes_received += count;
+        else
+                client_io->metrics->response_body_bytes_received += count;
+}
+
+/* Attempts to push forward the reading side of @msg's I/O. Returns
+ * %TRUE if it manages to make some progress, and it is likely that
+ * further progress can be made. Returns %FALSE if it has reached a
+ * stopping point of some sort (need input from the application,
+ * socket not readable, read is complete, etc).
+ */
+static gboolean
+io_read (SoupClientMessageIOHTTP1 *client_io,
+         gboolean                  blocking,
+         GCancellable             *cancellable,
+         GError                  **error)
+{
+        SoupMessageIOData *io = &client_io->base;
+        SoupMessage *msg = client_io->item->msg;
+        gboolean succeeded;
+        gboolean is_first_read;
+        gushort extra_bytes;
+
+        switch (io->read_state) {
+        case SOUP_MESSAGE_IO_STATE_HEADERS:
+                is_first_read = io->read_header_buf->len == 0 &&
+                        soup_message_get_status (msg) == SOUP_STATUS_NONE;
+
+                if (!soup_message_io_data_read_headers (io, blocking, cancellable, &extra_bytes, error))
+                        return FALSE;
+
+                if (client_io->metrics) {
+                        /* Adjust the header and body bytes received, since we might
+                         * have read part of the body already that is queued by the stream.
+                         */
+                        if (client_io->metrics->response_header_bytes_received > io->read_header_buf->len + 
extra_bytes) {
+                                client_io->metrics->response_body_bytes_received =
+                                        client_io->metrics->response_header_bytes_received - 
io->read_header_buf->len - extra_bytes;
+                                client_io->metrics->response_header_bytes_received -= 
client_io->metrics->response_body_bytes_received;
+                        }
+                }
+
+                if (is_first_read)
+                        soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_START);
+
+                succeeded = parse_headers (msg,
+                                           (char *)io->read_header_buf->data,
+                                           io->read_header_buf->len,
+                                           &io->read_encoding,
+                                           error);
+                g_byte_array_set_size (io->read_header_buf, 0);
+
+                if (!succeeded) {
+                        /* Either we couldn't parse the headers, or they
+                         * indicated something that would mean we wouldn't
+                         * be able to parse the body. (Eg, unknown
+                         * Transfer-Encoding.). Skip the rest of the
+                         * reading, and make sure the connection gets
+                         * closed when we're done.
+                         */
+                        soup_message_headers_append (soup_message_get_request_headers (msg),
+                                                     "Connection", "close");
+                        soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
+                        io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+                        break;
+                }
+
+                if (SOUP_STATUS_IS_INFORMATIONAL (soup_message_get_status (msg))) {
+                        if (soup_message_get_status (msg) == SOUP_STATUS_CONTINUE &&
+                            io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING) {
+                                /* Pause the reader, unpause the writer */
+                                io->read_state =
+                                        SOUP_MESSAGE_IO_STATE_BLOCKING;
+                                io->write_state =
+                                        SOUP_MESSAGE_IO_STATE_BODY_START;
+                        } else {
+                                /* Just stay in HEADERS */
+                                io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
+                        }
+
+                        /* Informational responses have no bodies, so
+                         * bail out here rather than parsing encoding, etc
+                         */
+                        soup_message_got_informational (msg);
+
+                        /* If this was "101 Switching Protocols", then
+                         * the session may have stolen the connection...
+                         */
+                        if (client_io != (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg))
+                                return FALSE;
+
+                        soup_message_cleanup_response (msg);
+                        break;
+                } else {
+                        io->read_state = SOUP_MESSAGE_IO_STATE_BODY_START;
+
+                        /* If the client was waiting for a Continue
+                         * but got something else, then it's done
+                         * writing.
+                         */
+                        if (io->write_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
+                                io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+                }
+
+                if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
+                        io->read_length = soup_message_headers_get_content_length 
(soup_message_get_response_headers (msg));
+
+                        if (!soup_message_is_keepalive (msg)) {
+                                /* Some servers suck and send
+                                 * incorrect Content-Length values, so
+                                 * allow EOF termination in this case
+                                 * (iff the message is too short) too.
+                                 */
+                                io->read_encoding = SOUP_ENCODING_EOF;
+                        }
+                } else
+                        io->read_length = -1;
+
+                soup_message_got_headers (msg);
+                break;
+
+        case SOUP_MESSAGE_IO_STATE_BODY_START:
+                if (!io->body_istream) {
+                        GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM 
(io->istream),
+                                                                                 io->read_encoding,
+                                                                                 io->read_length);
+
+                        io->body_istream = soup_message_setup_body_istream (body_istream, msg,
+                                                                            client_io->item->session,
+                                                                            SOUP_STAGE_MESSAGE_BODY);
+                        g_object_unref (body_istream);
+                }
+
+                if (soup_message_get_content_sniffer (msg)) {
+                        SoupContentSnifferStream *sniffer_stream = SOUP_CONTENT_SNIFFER_STREAM 
(io->body_istream);
+                        const char *content_type;
+                        GHashTable *params;
+
+                        if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking,
+                                                                   cancellable, error))
+                                return FALSE;
+
+                        content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
+                        soup_message_content_sniffed (msg, content_type, params);
+                }
+
+                io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
+                break;
+
+        case SOUP_MESSAGE_IO_STATE_BODY: {
+                guchar buf[RESPONSE_BLOCK_SIZE];
+                gssize nread;
+
+                nread = g_pollable_stream_read (io->body_istream,
+                                                buf,
+                                                RESPONSE_BLOCK_SIZE,
+                                                blocking,
+                                                cancellable, error);
+                if (nread == -1)
+                        return FALSE;
+
+                if (nread == 0)
+                        io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+
+                if (client_io->metrics)
+                        client_io->metrics->response_body_size += nread;
+
+                break;
+        }
+
+        case SOUP_MESSAGE_IO_STATE_BODY_DONE:
+                io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+                soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
+                soup_message_got_body (msg);
+                break;
+
+        case SOUP_MESSAGE_IO_STATE_FINISHING:
+                io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
+                break;
+
+        default:
+                g_return_val_if_reached (FALSE);
+        }
+
+        return TRUE;
+}
+
+static gboolean
+request_is_restartable (SoupMessage *msg, GError *error)
+{
+        SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
+        SoupMessageIOData *io;
+
+        if (!client_io)
+                return FALSE;
+
+        io = &client_io->base;
+
+        return (io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
+                io->read_header_buf->len == 0 &&
+                soup_connection_get_ever_used (client_io->item->conn) &&
+                !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
+                !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) &&
+                !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
+                error->domain != G_TLS_ERROR &&
+                SOUP_METHOD_IS_IDEMPOTENT (soup_message_get_method (msg)));
+}
+
+static gboolean
+io_run_until (SoupClientMessageIOHTTP1 *client_io,
+              gboolean                  blocking,
+              SoupMessageIOState        read_state,
+              SoupMessageIOState        write_state,
+              GCancellable             *cancellable,
+              GError                  **error)
+{
+        SoupMessageIOData *io = &client_io->base;
+        SoupMessage *msg = client_io->item->msg;
+        gboolean progress = TRUE, done;
+        GError *my_error = NULL;
+
+        if (g_cancellable_set_error_if_cancelled (cancellable, error))
+                return FALSE;
+        else if (!io) {
+                g_set_error_literal (error, G_IO_ERROR,
+                                     G_IO_ERROR_CANCELLED,
+                                     _("Operation was cancelled"));
+                return FALSE;
+        }
+
+        g_object_ref (msg);
+
+        while (progress && (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == client_io &&
+               !io->paused && !io->async_wait &&
+               (io->read_state < read_state || io->write_state < write_state)) {
+
+                if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
+                        progress = io_read (client_io, blocking, cancellable, &my_error);
+                else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
+                        progress = io_write (client_io, blocking, cancellable, &my_error);
+                else
+                        progress = FALSE;
+        }
+
+        if (my_error) {
+                g_propagate_error (error, my_error);
+                g_object_unref (msg);
+                return FALSE;
+        } else if ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) != client_io) {
+                g_set_error_literal (error, G_IO_ERROR,
+                                     G_IO_ERROR_CANCELLED,
+                                     _("Operation was cancelled"));
+                g_object_unref (msg);
+                return FALSE;
+        } else if (!io->async_wait &&
+                   g_cancellable_set_error_if_cancelled (cancellable, error)) {
+                g_object_unref (msg);
+                return FALSE;
+        }
+
+        done = (io->read_state >= read_state &&
+                io->write_state >= write_state);
+
+        if (!blocking && !done) {
+                g_set_error_literal (error, G_IO_ERROR,
+                                     G_IO_ERROR_WOULD_BLOCK,
+                                     _("Operation would block"));
+                g_object_unref (msg);
+                return FALSE;
+        }
+
+#ifdef HAVE_SYSPROF
+        /* Allow profiling of network requests. */
+        if (io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
+            io->write_state == SOUP_MESSAGE_IO_STATE_DONE) {
+                GUri *uri = soup_message_get_uri (msg);
+                char *uri_str = g_uri_to_string_partial (uri, G_URI_HIDE_PASSWORD);
+                const gchar *last_modified = soup_message_headers_get_one (soup_message_get_response_headers 
(msg), "Last-Modified");
+                const gchar *etag = soup_message_headers_get_one (soup_message_get_response_headers (msg), 
"ETag");
+                const gchar *if_modified_since = soup_message_headers_get_one 
(soup_message_get_request_headers (msg), "If-Modified-Since");
+                const gchar *if_none_match = soup_message_headers_get_one (soup_message_get_request_headers 
(msg), "If-None-Match");
+
+                /* FIXME: Expand and generalise sysprof support:
+                 * https://gitlab.gnome.org/GNOME/sysprof/-/issues/43 */
+                sysprof_collector_mark_printf (client_io->begin_time_nsec,
+                                               SYSPROF_CAPTURE_CURRENT_TIME - client_io->begin_time_nsec,
+                                               "libsoup", "message",
+                                               "%s request/response to %s: "
+                                               "read %" G_GOFFSET_FORMAT "B, "
+                                               "wrote %" G_GOFFSET_FORMAT "B, "
+                                               "If-Modified-Since: %s, "
+                                               "If-None-Match: %s, "
+                                               "Last-Modified: %s, "
+                                               "ETag: %s",
+                                               soup_message_get_tls_peer_certificate (msg) ? "HTTPS" : 
"HTTP",
+                                               uri_str, io->read_length, io->write_length,
+                                               (if_modified_since != NULL) ? if_modified_since : "(unset)",
+                                               (if_none_match != NULL) ? if_none_match : "(unset)",
+                                               (last_modified != NULL) ? last_modified : "(unset)",
+                                               (etag != NULL) ? etag : "(unset)");
+                g_free (uri_str);
+        }
+#endif  /* HAVE_SYSPROF */
+
+        g_object_unref (msg);
+        return done;
+}
+
+static void
+soup_message_io_finish (SoupMessage  *msg,
+                        GError       *error)
+{
+        if (request_is_restartable (msg, error)) {
+                SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
+
+                /* Connection got closed, but we can safely try again. */
+                io->item->state = SOUP_MESSAGE_RESTARTING;
+        } else if (error) {
+                soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
+        }
+
+        soup_message_io_finished (msg);
+}
+
+static void soup_client_message_io_http1_run (SoupClientMessageIO *iface, gboolean blocking);
+
+static gboolean
+io_run_ready (SoupMessage *msg, gpointer user_data)
+{
+        soup_client_message_io_http1_run (soup_message_get_io_data (msg), FALSE);
+        return FALSE;
+}
+
+static void
+soup_client_message_io_http1_run (SoupClientMessageIO *iface,
+                                  gboolean             blocking)
+{
+        SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)iface;
+        SoupMessageIOData *io = &client_io->base;
+        SoupMessage *msg = client_io->item->msg;
+        GError *error = NULL;
+
+        if (io->io_source) {
+                g_source_destroy (io->io_source);
+                g_source_unref (io->io_source);
+                io->io_source = NULL;
+        }
+
+        g_object_ref (msg);
+
+        if (io_run_until (client_io, blocking,
+                          SOUP_MESSAGE_IO_STATE_DONE,
+                          SOUP_MESSAGE_IO_STATE_DONE,
+                          client_io->item->cancellable, &error)) {
+                soup_message_io_finished (msg);
+        } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                g_clear_error (&error);
+                io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg),
+                                                                 client_io->item->cancellable,
+                                                                 (SoupMessageIOSourceFunc)io_run_ready,
+                                                                 NULL);
+                g_source_set_priority (io->io_source,
+                                       soup_client_message_io_http1_get_priority (client_io));
+                g_source_attach (io->io_source, g_main_context_get_thread_default ());
+        } else {
+                if ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == client_io)
+                        soup_message_io_finish (msg, error);
+                g_error_free (error);
+
+        }
+
+        g_object_unref (msg);
+}
+
+static gboolean
+soup_client_message_io_http1_run_until_read (SoupClientMessageIO *iface,
+                                             GCancellable        *cancellable,
+                                             GError             **error)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+        SoupMessage *msg = io->item->msg;
+
+        if (io_run_until (io, TRUE,
+                          SOUP_MESSAGE_IO_STATE_BODY,
+                          SOUP_MESSAGE_IO_STATE_ANY,
+                          cancellable, error))
+                return TRUE;
+
+        if ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == io)
+                soup_message_io_finish (msg, *error);
+
+        return FALSE;
+}
+
+static void io_run_until_read_async (SoupClientMessageIOHTTP1 *io, GTask *task);
+
+static gboolean
+io_run_until_read_ready (SoupMessage *msg,
+                         gpointer     user_data)
+{
+        GTask *task = user_data;
+
+        io_run_until_read_async ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg), task);
+        return FALSE;
+}
+
+static void
+io_run_until_read_async (SoupClientMessageIOHTTP1 *client_io,
+                         GTask                    *task)
+{
+        SoupMessageIOData *io = &client_io->base;
+        SoupMessage *msg = client_io->item->msg;
+        GError *error = NULL;
+
+        if (io->io_source) {
+                g_source_destroy (io->io_source);
+                g_source_unref (io->io_source);
+                io->io_source = NULL;
+        }
+
+        if (io_run_until (client_io, FALSE,
+                          SOUP_MESSAGE_IO_STATE_BODY,
+                          SOUP_MESSAGE_IO_STATE_ANY,
+                          g_task_get_cancellable (task),
+                          &error)) {
+                g_task_return_boolean (task, TRUE);
+                g_object_unref (task);
+                return;
+        }
+
+        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+                g_error_free (error);
+                io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), g_task_get_cancellable 
(task),
+                                                                 
(SoupMessageIOSourceFunc)io_run_until_read_ready,
+                                                                 task);
+                g_source_set_priority (io->io_source, g_task_get_priority (task));
+                g_source_attach (io->io_source, g_main_context_get_thread_default ());
+                return;
+        }
+
+        if ((SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == client_io)
+                soup_message_io_finish (msg, error);
+
+        g_task_return_error (task, error);
+        g_object_unref (task);
+}
+
+static void
+soup_client_message_io_http1_run_until_read_async (SoupClientMessageIO *iface,
+                                                   int                  io_priority,
+                                                   GCancellable        *cancellable,
+                                                   GAsyncReadyCallback  callback,
+                                                   gpointer             user_data)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+        SoupMessage *msg = io->item->msg;
+        GTask *task;
+
+        task = g_task_new (msg, cancellable, callback, user_data);
+        g_task_set_priority (task, io_priority);
+        io_run_until_read_async (io, task);
+}
+
+static gboolean
+soup_client_message_io_http1_run_until_finish (SoupClientMessageIO *iface,
+                                               gboolean             blocking,
+                                               GCancellable        *cancellable,
+                                               GError             **error)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+        SoupMessage *msg = io->item->msg;
+        gboolean success;
+
+        g_object_ref (msg);
+
+        if (io) {
+                if (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
+                        io->base.read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+        }
+
+        success = io_run_until (io, blocking,
+                                SOUP_MESSAGE_IO_STATE_DONE,
+                                SOUP_MESSAGE_IO_STATE_DONE,
+                                cancellable, error);
+
+        g_object_unref (msg);
+        return success;
+}
+
+static void
+client_stream_eof (SoupClientInputStream    *stream,
+                   SoupClientMessageIOHTTP1 *io)
+{
+        if (io && io->base.read_state == SOUP_MESSAGE_IO_STATE_BODY)
+                io->base.read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+}
+
+static GInputStream *
+soup_client_message_io_http1_get_response_stream (SoupClientMessageIO *iface,
+                                                  GError             **error)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+        SoupMessage *msg = io->item->msg;
+        GInputStream *client_stream;
+
+        client_stream = soup_client_input_stream_new (io->base.body_istream, msg);
+        g_signal_connect (client_stream, "eof",
+                          G_CALLBACK (client_stream_eof), io);
+
+        return client_stream;
+}
+
+static void
+soup_client_message_io_http1_send_item (SoupClientMessageIO       *iface,
+                                        SoupMessageQueueItem      *item,
+                                        SoupMessageIOCompletionFn  completion_cb,
+                                        gpointer                   user_data)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+
+        io->item = soup_message_queue_item_ref (item);
+        io->base.completion_cb = completion_cb;
+        io->base.completion_data = user_data;
+
+        io->metrics = soup_message_get_metrics (io->item->msg);
+        if (io->metrics) {
+                g_signal_connect_object (io->base.istream, "read-data",
+                                         G_CALLBACK (response_network_stream_read_data_cb),
+                                         io->item->msg, G_CONNECT_SWAPPED);
+        }
+
+#ifdef HAVE_SYSPROF
+        io->begin_time_nsec = SYSPROF_CAPTURE_CURRENT_TIME;
+#endif
+}
+
+static void
+soup_client_message_io_http1_pause (SoupClientMessageIO *iface)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+
+        g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
+
+        soup_message_io_data_pause (&io->base);
+}
+
+static void
+soup_client_message_io_http1_unpause (SoupClientMessageIO *iface)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+
+        g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
+        io->base.paused = FALSE;
+}
+
+static gboolean
+soup_client_message_io_http1_is_paused (SoupClientMessageIO *iface)
+{
+        SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+
+        return io->base.paused;
+}
+
+static const SoupClientMessageIOFuncs io_funcs = {
+        soup_client_message_io_http1_destroy,
+        soup_client_message_io_http1_finished,
+        soup_client_message_io_http1_stolen,
+        soup_client_message_io_http1_send_item,
+        soup_client_message_io_http1_get_response_stream,
+        soup_client_message_io_http1_pause,
+        soup_client_message_io_http1_unpause,
+        soup_client_message_io_http1_is_paused,
+        soup_client_message_io_http1_run,
+        soup_client_message_io_http1_run_until_read,
+        soup_client_message_io_http1_run_until_read_async,
+        soup_client_message_io_http1_run_until_finish
+};
+
+SoupClientMessageIO *
+soup_client_message_io_http1_new (GIOStream *stream)
+{
+        SoupClientMessageIOHTTP1 *io;
+
+        io = g_slice_new0 (SoupClientMessageIOHTTP1);
+        io->base.iostream = g_object_ref (stream);
+        io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream));
+        io->base.ostream = g_io_stream_get_output_stream (io->base.iostream);
+
+        io->base.read_header_buf = g_byte_array_new ();
+        io->base.write_buf = g_string_new (NULL);
+
+        io->base.read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
+        io->base.write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
+
+        io->iface.funcs = &io_funcs;
+
+        return (SoupClientMessageIO *)io;
+}
diff --git a/libsoup/soup-client-message-io-http1.h b/libsoup/soup-client-message-io-http1.h
new file mode 100644
index 00000000..e749360d
--- /dev/null
+++ b/libsoup/soup-client-message-io-http1.h
@@ -0,0 +1,10 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: nil; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2021 Igalia S.L.
+ */
+
+#pragma once
+
+#include "soup-client-message-io.h"
+
+SoupClientMessageIO *soup_client_message_io_http1_new (GIOStream *stream);
diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c
index 1c36fb89..00c1840e 100644
--- a/libsoup/soup-connection.c
+++ b/libsoup/soup-connection.c
@@ -13,6 +13,7 @@
 #include "soup.h"
 #include "soup-io-stream.h"
 #include "soup-message-queue-item.h"
+#include "soup-client-message-io-http1.h"
 #include "soup-socket-properties.h"
 #include "soup-private-enum-types.h"
 #include <gio/gnetworking.h>
@@ -1045,7 +1046,7 @@ soup_connection_setup_message_io (SoupConnection *conn,
                 priv->reusable = FALSE;
 
         g_assert (priv->io_data == NULL);
-        priv->io_data = soup_client_message_io_data_new (priv->iostream);
+        priv->io_data = soup_client_message_io_http1_new (priv->iostream);
 
         return priv->io_data;
 }
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index d8cfa0bb..4efba24e 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -45,7 +45,6 @@ SoupAuth      *soup_message_get_proxy_auth (SoupMessage *msg);
 GUri          *soup_message_get_uri_for_auth (SoupMessage *msg);
 
 /* I/O */
-SoupClientMessageIO *soup_client_message_io_data_new       (GIOStream                 *stream);
 void       soup_message_io_run         (SoupMessage *msg,
                                        gboolean     blocking);
 void       soup_message_io_finished    (SoupMessage *msg);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]