[libsoup/carlosgc/message-io: 1/2] io-http1: split SoupClientMessageIOHTTP1 in two




commit f59d259735295b1d35561aafdcf9b5c8df6b0aad
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Sat May 15 16:22:59 2021 +0200

    io-http1: split SoupClientMessageIOHTTP1 in two
    
    Move the message io to SoupMessageIOHTTP1 struct. This is how http2 IO
    will work, but in this case there's only one message io. This allows to
    create the SoupClientMessageIO earlier in SoupConnection and we will
    move http1 specific code to the interface implementation.

 libsoup/server/soup-server-io.c        |  29 +++--
 libsoup/soup-client-input-stream.c     |   4 +
 libsoup/soup-client-message-io-http1.c | 219 +++++++++++++++++++--------------
 libsoup/soup-connection.c              |  25 ++--
 libsoup/soup-connection.h              |   2 -
 libsoup/soup-message-io-data.c         |  43 ++++---
 libsoup/soup-message-io-data.h         |  22 ++--
 7 files changed, 196 insertions(+), 148 deletions(-)
---
diff --git a/libsoup/server/soup-server-io.c b/libsoup/server/soup-server-io.c
index a806d2f3..cc1cfc3b 100644
--- a/libsoup/server/soup-server-io.c
+++ b/libsoup/server/soup-server-io.c
@@ -22,6 +22,10 @@
 struct _SoupServerMessageIOData {
         SoupMessageIOData base;
 
+        GIOStream *iostream;
+        GInputStream *istream;
+        GOutputStream *ostream;
+
        GBytes  *write_chunk;
        goffset  write_body_offset;
 
@@ -39,6 +43,8 @@ soup_server_message_io_data_free (SoupServerMessageIOData *io)
         if (!io)
                 return;
 
+        g_clear_object (&io->iostream);
+
         soup_message_io_data_cleanup (&io->base);
 
        if (io->unpause_source) {
@@ -90,10 +96,10 @@ soup_server_message_io_steal (SoupServerMessage *msg)
        GIOStream *iostream;
 
         io = soup_server_message_get_io_data (msg);
-        if (!io || !io->base.iostream)
+        if (!io || !io->iostream)
                 return NULL;
 
-        iostream = g_object_ref (io->base.iostream);
+        iostream = g_object_ref (io->iostream);
         completion_cb = io->base.completion_cb;
        completion_data = io->base.completion_data;
 
@@ -363,7 +369,7 @@ io_write (SoupServerMessage *msg,
                         write_headers (msg, io->write_buf, &io->write_encoding);
 
                 while (io->written < io->write_buf->len) {
-                        nwrote = g_pollable_stream_write (io->ostream,
+                        nwrote = g_pollable_stream_write (server_io->ostream,
                                                           io->write_buf->str + io->written,
                                                           io->write_buf->len - io->written,
                                                           FALSE,
@@ -420,7 +426,7 @@ io_write (SoupServerMessage *msg,
                 break;
 
         case SOUP_MESSAGE_IO_STATE_BODY_START:
-                io->body_ostream = soup_body_output_stream_new (io->ostream,
+                io->body_ostream = soup_body_output_stream_new (server_io->ostream,
                                                                 io->write_encoding,
                                                                 io->write_length);
                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
@@ -661,7 +667,7 @@ io_read (SoupServerMessage *msg,
 
         switch (io->read_state) {
         case SOUP_MESSAGE_IO_STATE_HEADERS:
-                if (!soup_message_io_data_read_headers (io, FALSE, NULL, NULL, error)) {
+                if (!soup_message_io_data_read_headers (io, SOUP_FILTER_INPUT_STREAM (server_io->istream), 
FALSE, NULL, NULL, error)) {
                        if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT))
                                soup_server_message_set_status (msg, SOUP_STATUS_BAD_REQUEST, NULL);
                         return FALSE;
@@ -711,7 +717,7 @@ io_read (SoupServerMessage *msg,
 
         case SOUP_MESSAGE_IO_STATE_BODY_START:
                 if (!io->body_istream) {
-                        io->body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream),
+                        io->body_istream = soup_body_input_stream_new (server_io->istream,
                                                                        io->read_encoding,
                                                                        io->read_length);
 
@@ -850,7 +856,10 @@ io_run (SoupServerMessage *msg)
                 soup_server_message_io_finished (msg);
         } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
                 g_clear_error (&error);
-                io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), NULL,
+                io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg),
+                                                                 server_io->istream,
+                                                                 server_io->ostream,
+                                                                 NULL,
                                                                 (SoupMessageIOSourceFunc)io_run_ready,
                                                                 NULL);
                 g_source_attach (io->io_source, server_io->async_context);
@@ -875,9 +884,9 @@ soup_server_message_read_request (SoupServerMessage        *msg,
         io->base.completion_data = user_data;
 
        sock = soup_server_message_get_soup_socket (msg);
-        io->base.iostream = g_object_ref (soup_socket_get_iostream (sock));
-        io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream));
-        io->base.ostream = g_io_stream_get_output_stream (io->base.iostream);
+        io->iostream = g_object_ref (soup_socket_get_iostream (sock));
+        io->istream = g_io_stream_get_input_stream (io->iostream);
+        io->ostream = g_io_stream_get_output_stream (io->iostream);
 
         io->base.read_header_buf = g_byte_array_new ();
         io->base.write_buf = g_string_new (NULL);
diff --git a/libsoup/soup-client-input-stream.c b/libsoup/soup-client-input-stream.c
index e32b90f8..098cda5e 100644
--- a/libsoup/soup-client-input-stream.c
+++ b/libsoup/soup-client-input-stream.c
@@ -241,8 +241,12 @@ soup_client_input_stream_close_async (GInputStream        *stream,
        g_task_set_priority (task, priority);
 
        if (close_async_ready (priv->msg, task) == G_SOURCE_CONTINUE) {
+                /* When SoupClientInputStream is created we always have a body input stream,
+                 * and we finished writing, so it's safe to pass NULL for the streams
+                 */
                source = soup_message_io_data_get_source ((SoupMessageIOData *)soup_message_get_io_data 
(priv->msg),
                                                          G_OBJECT (priv->msg),
+                                                          NULL, NULL,
                                                          cancellable, NULL, NULL);
 
                g_task_attach_source (task, source, (GSourceFunc) close_async_ready);
diff --git a/libsoup/soup-client-message-io-http1.c b/libsoup/soup-client-message-io-http1.c
index 32dbf7bb..392dbb05 100644
--- a/libsoup/soup-client-message-io-http1.c
+++ b/libsoup/soup-client-message-io-http1.c
@@ -31,7 +31,6 @@
 #include "soup-uri-utils-private.h"
 
 typedef struct {
-        SoupClientMessageIO iface;
         SoupMessageIOData base;
 
         SoupMessageQueueItem *item;
@@ -41,18 +40,36 @@ typedef struct {
 #ifdef HAVE_SYSPROF
         gint64 begin_time_nsec;
 #endif
+} SoupMessageIOHTTP1;
+
+typedef struct {
+        SoupClientMessageIO iface;
+
+        GIOStream *iostream;
+        GInputStream *istream;
+        GOutputStream *ostream;
+
+        SoupMessageIOHTTP1 *msg_io;
 } SoupClientMessageIOHTTP1;
 
 #define RESPONSE_BLOCK_SIZE 8192
 #define HEADER_SIZE_LIMIT (64 * 1024)
 
+static void
+soup_message_io_http1_free (SoupMessageIOHTTP1 *msg_io)
+{
+        soup_message_io_data_cleanup (&msg_io->base);
+        soup_message_queue_item_unref (msg_io->item);
+        g_free (msg_io);
+}
+
 static void
 soup_client_message_io_http1_destroy (SoupClientMessageIO *iface)
 {
         SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
 
-        soup_message_io_data_cleanup (&io->base);
-        soup_message_queue_item_unref (io->item);
+        g_clear_object (&io->iostream);
+        g_clear_pointer (&io->msg_io, soup_message_io_http1_free);
 
         g_slice_free (SoupClientMessageIOHTTP1, io);
 }
@@ -60,10 +77,10 @@ soup_client_message_io_http1_destroy (SoupClientMessageIO *iface)
 static int
 soup_client_message_io_http1_get_priority (SoupClientMessageIOHTTP1 *io)
 {
-        if (!io->item->task)
+        if (!io->msg_io->item->task)
                 return G_PRIORITY_DEFAULT;
 
-        return g_task_get_priority (io->item->task);
+        return g_task_get_priority (io->msg_io->item->task);
 }
 
 static void
@@ -74,15 +91,15 @@ soup_client_message_io_complete (SoupClientMessageIOHTTP1 *io,
         SoupMessageIOCompletionFn completion_cb;
         gpointer completion_data;
 
-        completion_cb = io->base.completion_cb;
-        completion_data = io->base.completion_data;
+        completion_cb = io->msg_io->base.completion_cb;
+        completion_data = io->msg_io->base.completion_data;
 
-        msg = g_object_ref (msg);
-        if (io->base.istream)
-                g_signal_handlers_disconnect_by_data (io->base.istream, msg);
-        if (io->base.body_ostream)
-                g_signal_handlers_disconnect_by_data (io->base.body_ostream, msg);
-        soup_connection_message_io_finished (soup_message_get_connection (msg), msg);
+        g_object_ref (msg);
+        if (io->istream)
+                g_signal_handlers_disconnect_by_data (io->istream, msg);
+        if (io->msg_io->base.body_ostream)
+                g_signal_handlers_disconnect_by_data (io->msg_io->base.body_ostream, msg);
+        g_clear_pointer (&io->msg_io, soup_message_io_http1_free);
         if (completion_cb)
                 completion_cb (G_OBJECT (msg), completion, completion_data);
         g_object_unref (msg);
@@ -95,8 +112,8 @@ soup_client_message_io_http1_finished (SoupClientMessageIO *iface,
         SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
         SoupMessageIOCompletion completion;
 
-        if ((io->base.read_state >= SOUP_MESSAGE_IO_STATE_FINISHING &&
-             io->base.write_state >= SOUP_MESSAGE_IO_STATE_FINISHING))
+        if ((io->msg_io->base.read_state >= SOUP_MESSAGE_IO_STATE_FINISHING &&
+             io->msg_io->base.write_state >= SOUP_MESSAGE_IO_STATE_FINISHING))
                 completion = SOUP_MESSAGE_IO_COMPLETE;
         else
                 completion = SOUP_MESSAGE_IO_INTERRUPTED;
@@ -109,7 +126,7 @@ soup_client_message_io_http1_stolen (SoupClientMessageIO *iface)
 {
         SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
 
-        soup_client_message_io_complete (io, io->item->msg, SOUP_MESSAGE_IO_STOLEN);
+        soup_client_message_io_complete (io, io->msg_io->item->msg, SOUP_MESSAGE_IO_STOLEN);
 }
 
 static void
@@ -120,10 +137,10 @@ request_body_stream_wrote_data_cb (SoupMessage *msg,
 {
         SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
 
-        if (client_io->metrics) {
-                client_io->metrics->request_body_bytes_sent += count;
+        if (client_io->msg_io->metrics) {
+                client_io->msg_io->metrics->request_body_bytes_sent += count;
                 if (!is_metadata)
-                        client_io->metrics->request_body_size += count;
+                        client_io->msg_io->metrics->request_body_size += count;
         }
 
         if (!is_metadata)
@@ -143,19 +160,19 @@ request_body_stream_wrote_cb (GOutputStream *ostream,
         nwrote = g_output_stream_splice_finish (ostream, result, &error);
 
         io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
-        if (!io || !io->base.async_wait || io->base.body_ostream != ostream) {
+        if (!io || !io->msg_io || !io->msg_io->base.async_wait || io->msg_io->base.body_ostream != ostream) {
                 g_clear_error (&error);
                 g_object_unref (msg);
                 return;
         }
 
         if (nwrote != -1)
-                io->base.write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+                io->msg_io->base.write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
 
         if (error)
-                g_propagate_error (&io->base.async_error, error);
-        async_wait = io->base.async_wait;
-        io->base.async_wait = NULL;
+                g_propagate_error (&io->msg_io->base.async_error, error);
+        async_wait = io->msg_io->base.async_wait;
+        io->msg_io->base.async_wait = NULL;
         g_cancellable_cancel (async_wait);
         g_object_unref (async_wait);
 
@@ -173,16 +190,16 @@ closed_async (GObject      *source,
         GCancellable *async_wait;
 
         io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
-        if (!io || !io->base.async_wait || io->base.body_ostream != body_ostream) {
+        if (!io || !io->msg_io || !io->msg_io->base.async_wait || io->msg_io->base.body_ostream != 
body_ostream) {
                 g_object_unref (msg);
                 return;
         }
 
-        g_output_stream_close_finish (body_ostream, result, &io->base.async_error);
-        g_clear_object (&io->base.body_ostream);
+        g_output_stream_close_finish (body_ostream, result, &io->msg_io->base.async_error);
+        g_clear_object (&io->msg_io->base.body_ostream);
 
-        async_wait = io->base.async_wait;
-        io->base.async_wait = NULL;
+        async_wait = io->msg_io->base.async_wait;
+        io->msg_io->base.async_wait = NULL;
         g_cancellable_cancel (async_wait);
         g_object_unref (async_wait);
 
@@ -277,8 +294,8 @@ io_write (SoupClientMessageIOHTTP1 *client_io,
           GCancellable             *cancellable,
           GError                  **error)
 {
-        SoupMessageIOData *io = &client_io->base;
-        SoupMessage *msg = client_io->item->msg;
+        SoupMessageIOData *io = &client_io->msg_io->base;
+        SoupMessage *msg = client_io->msg_io->item->msg;
         SoupSessionFeature *logger;
         gssize nwrote;
 
@@ -299,7 +316,7 @@ io_write (SoupClientMessageIOHTTP1 *client_io,
                         write_headers (msg, io->write_buf, &io->write_encoding);
 
                 while (io->written < io->write_buf->len) {
-                        nwrote = g_pollable_stream_write (io->ostream,
+                        nwrote = g_pollable_stream_write (client_io->ostream,
                                                           io->write_buf->str + io->written,
                                                           io->write_buf->len - io->written,
                                                           blocking,
@@ -307,8 +324,8 @@ io_write (SoupClientMessageIOHTTP1 *client_io,
                         if (nwrote == -1)
                                 return FALSE;
                         io->written += nwrote;
-                        if (client_io->metrics)
-                                client_io->metrics->request_header_bytes_sent += nwrote;
+                        if (client_io->msg_io->metrics)
+                                client_io->msg_io->metrics->request_header_bytes_sent += nwrote;
                 }
 
                 io->written = 0;
@@ -328,11 +345,11 @@ io_write (SoupClientMessageIOHTTP1 *client_io,
                 break;
 
         case SOUP_MESSAGE_IO_STATE_BODY_START:
-                io->body_ostream = soup_body_output_stream_new (io->ostream,
+                io->body_ostream = soup_body_output_stream_new (client_io->ostream,
                                                                 io->write_encoding,
                                                                 io->write_length);
                 io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
-                logger = soup_session_get_feature_for_message (client_io->item->session,
+                logger = soup_session_get_feature_for_message (client_io->msg_io->item->session,
                                                                SOUP_TYPE_LOGGER, msg);
                 if (logger) {
                         soup_logger_request_body_setup (SOUP_LOGGER (logger), msg,
@@ -469,10 +486,10 @@ response_network_stream_read_data_cb (SoupMessage *msg,
 {
         SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
 
-        if (client_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_START)
-                client_io->metrics->response_header_bytes_received += count;
+        if (client_io->msg_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_START)
+                client_io->msg_io->metrics->response_header_bytes_received += count;
         else
-                client_io->metrics->response_body_bytes_received += count;
+                client_io->msg_io->metrics->response_body_bytes_received += count;
 }
 
 /* Attempts to push forward the reading side of @msg's I/O. Returns
@@ -487,8 +504,8 @@ io_read (SoupClientMessageIOHTTP1 *client_io,
          GCancellable             *cancellable,
          GError                  **error)
 {
-        SoupMessageIOData *io = &client_io->base;
-        SoupMessage *msg = client_io->item->msg;
+        SoupMessageIOData *io = &client_io->msg_io->base;
+        SoupMessage *msg = client_io->msg_io->item->msg;
         gboolean succeeded;
         gboolean is_first_read;
         gushort extra_bytes;
@@ -498,20 +515,21 @@ io_read (SoupClientMessageIOHTTP1 *client_io,
                 is_first_read = io->read_header_buf->len == 0 &&
                         soup_message_get_status (msg) == SOUP_STATUS_NONE;
 
-                succeeded = soup_message_io_data_read_headers (io, blocking, cancellable, &extra_bytes, 
error);
+                succeeded = soup_message_io_data_read_headers (io, SOUP_FILTER_INPUT_STREAM 
(client_io->istream),
+                                                               blocking, cancellable, &extra_bytes, error);
                 if (is_first_read && io->read_header_buf->len > 0)
                         soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_START);
                 if (!succeeded)
                         return FALSE;
 
-                if (client_io->metrics) {
+                if (client_io->msg_io->metrics) {
                         /* Adjust the header and body bytes received, since we might
                          * have read part of the body already that is queued by the stream.
                          */
-                        if (client_io->metrics->response_header_bytes_received > io->read_header_buf->len + 
extra_bytes) {
-                                client_io->metrics->response_body_bytes_received =
-                                        client_io->metrics->response_header_bytes_received - 
io->read_header_buf->len - extra_bytes;
-                                client_io->metrics->response_header_bytes_received -= 
client_io->metrics->response_body_bytes_received;
+                        if (client_io->msg_io->metrics->response_header_bytes_received > 
io->read_header_buf->len + extra_bytes) {
+                                client_io->msg_io->metrics->response_body_bytes_received =
+                                        client_io->msg_io->metrics->response_header_bytes_received - 
io->read_header_buf->len - extra_bytes;
+                                client_io->msg_io->metrics->response_header_bytes_received -= 
client_io->msg_io->metrics->response_body_bytes_received;
                         }
                 }
 
@@ -593,11 +611,11 @@ io_read (SoupClientMessageIOHTTP1 *client_io,
 
         case SOUP_MESSAGE_IO_STATE_BODY_START:
                 if (!io->body_istream) {
-                        GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM 
(io->istream),
+                        GInputStream *body_istream = soup_body_input_stream_new (client_io->istream,
                                                                                  io->read_encoding,
                                                                                  io->read_length);
 
-                        io->body_istream = soup_session_setup_message_body_input_stream 
(client_io->item->session,
+                        io->body_istream = soup_session_setup_message_body_input_stream 
(client_io->msg_io->item->session,
                                                                                          msg, body_istream,
                                                                                          
SOUP_STAGE_MESSAGE_BODY);
                         g_object_unref (body_istream);
@@ -624,8 +642,8 @@ io_read (SoupClientMessageIOHTTP1 *client_io,
                 if (nread == 0)
                         io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
 
-                if (client_io->metrics)
-                        client_io->metrics->response_body_size += nread;
+                if (client_io->msg_io->metrics)
+                        client_io->msg_io->metrics->response_body_size += nread;
 
                 break;
         }
@@ -653,14 +671,14 @@ request_is_restartable (SoupMessage *msg, GError *error)
         SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
         SoupMessageIOData *io;
 
-        if (!client_io)
+        if (!client_io || !client_io->msg_io)
                 return FALSE;
 
-        io = &client_io->base;
+        io = &client_io->msg_io->base;
 
         return (io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
                 io->read_header_buf->len == 0 &&
-                soup_connection_get_ever_used (soup_message_get_connection (client_io->item->msg)) &&
+                soup_connection_get_ever_used (soup_message_get_connection (client_io->msg_io->item->msg)) &&
                 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
                 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) &&
                 !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
@@ -676,7 +694,7 @@ io_run_until (SoupClientMessageIOHTTP1 *client_io,
               GCancellable             *cancellable,
               GError                  **error)
 {
-        SoupMessageIOData *io = &client_io->base;
+        SoupMessageIOData *io = &client_io->msg_io->base;
         SoupMessage *msg;
         gboolean progress = TRUE, done;
         GError *my_error = NULL;
@@ -691,7 +709,7 @@ io_run_until (SoupClientMessageIOHTTP1 *client_io,
         }
 
         g_assert (client_io); // Silence clang static analysis
-        msg = client_io->item->msg;
+        msg = client_io->msg_io->item->msg;
         g_object_ref (msg);
 
         while (progress && (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == client_io &&
@@ -746,8 +764,8 @@ io_run_until (SoupClientMessageIOHTTP1 *client_io,
 
                 /* FIXME: Expand and generalise sysprof support:
                  * https://gitlab.gnome.org/GNOME/sysprof/-/issues/43 */
-                sysprof_collector_mark_printf (client_io->begin_time_nsec,
-                                               SYSPROF_CAPTURE_CURRENT_TIME - client_io->begin_time_nsec,
+                sysprof_collector_mark_printf (client_io->msg_io->begin_time_nsec,
+                                               SYSPROF_CAPTURE_CURRENT_TIME - 
client_io->msg_io->begin_time_nsec,
                                                "libsoup", "message",
                                                "%s request/response to %s: "
                                                "read %" G_GOFFSET_FORMAT "B, "
@@ -778,7 +796,7 @@ soup_message_io_finish (SoupMessage  *msg,
                 SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
 
                 /* Connection got closed, but we can safely try again. */
-                io->item->state = SOUP_MESSAGE_RESTARTING;
+                io->msg_io->item->state = SOUP_MESSAGE_RESTARTING;
         } else if (error) {
                 soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
         }
@@ -801,7 +819,7 @@ soup_client_message_io_http1_run (SoupClientMessageIO *iface,
                                   gboolean             blocking)
 {
         SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)iface;
-        SoupMessageIOData *io = &client_io->base;
+        SoupMessageIOData *io = &client_io->msg_io->base;
         GError *error = NULL;
 
         if (io->io_source) {
@@ -815,12 +833,14 @@ soup_client_message_io_http1_run (SoupClientMessageIO *iface,
         if (io_run_until (client_io, blocking,
                           SOUP_MESSAGE_IO_STATE_DONE,
                           SOUP_MESSAGE_IO_STATE_DONE,
-                          client_io->item->cancellable, &error)) {
+                          client_io->msg_io->item->cancellable, &error)) {
                 soup_message_io_finished (msg);
         } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
                 g_clear_error (&error);
                 io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg),
-                                                                 client_io->item->cancellable,
+                                                                 client_io->istream,
+                                                                 client_io->ostream,
+                                                                 client_io->msg_io->item->cancellable,
                                                                  (SoupMessageIOSourceFunc)io_run_ready,
                                                                  NULL);
                 g_source_set_priority (io->io_source,
@@ -872,8 +892,8 @@ static void
 io_run_until_read_async (SoupClientMessageIOHTTP1 *client_io,
                          GTask                    *task)
 {
-        SoupMessageIOData *io = &client_io->base;
-        SoupMessage *msg = client_io->item->msg;
+        SoupMessageIOData *io = &client_io->msg_io->base;
+        SoupMessage *msg = client_io->msg_io->item->msg;
         GError *error = NULL;
 
         if (io->io_source) {
@@ -894,7 +914,10 @@ io_run_until_read_async (SoupClientMessageIOHTTP1 *client_io,
 
         if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
                 g_error_free (error);
-                io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), g_task_get_cancellable 
(task),
+                io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg),
+                                                                 client_io->istream,
+                                                                 client_io->ostream,
+                                                                 g_task_get_cancellable (task),
                                                                  
(SoupMessageIOSourceFunc)io_run_until_read_ready,
                                                                  task);
                 g_source_set_priority (io->io_source, g_task_get_priority (task));
@@ -937,9 +960,9 @@ soup_client_message_io_http1_run_until_finish (SoupClientMessageIO *iface,
 
         g_object_ref (msg);
 
-        if (io) {
-                if (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
-                        io->base.read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+        if (io && io->msg_io) {
+                if (io->msg_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
+                        io->msg_io->base.read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
         }
 
         success = io_run_until (io, blocking,
@@ -955,8 +978,8 @@ static void
 client_stream_eof (SoupClientInputStream    *stream,
                    SoupClientMessageIOHTTP1 *io)
 {
-        if (io && io->base.read_state == SOUP_MESSAGE_IO_STATE_BODY)
-                io->base.read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+        if (io && io->msg_io && io->msg_io->base.read_state == SOUP_MESSAGE_IO_STATE_BODY)
+                io->msg_io->base.read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
 }
 
 static GInputStream *
@@ -967,7 +990,9 @@ soup_client_message_io_http1_get_response_stream (SoupClientMessageIO *iface,
         SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
         GInputStream *client_stream;
 
-        client_stream = soup_client_input_stream_new (io->base.body_istream, msg);
+        g_assert (io->msg_io && io->msg_io->item->msg == msg);
+
+        client_stream = soup_client_input_stream_new (io->msg_io->base.body_istream, msg);
         g_signal_connect (client_stream, "eof",
                           G_CALLBACK (client_stream_eof), io);
 
@@ -981,21 +1006,30 @@ soup_client_message_io_http1_send_item (SoupClientMessageIO       *iface,
                                         gpointer                   user_data)
 {
         SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+        SoupMessageIOHTTP1 *msg_io;
 
-        io->item = soup_message_queue_item_ref (item);
-        io->base.completion_cb = completion_cb;
-        io->base.completion_data = user_data;
+        msg_io = g_new0 (SoupMessageIOHTTP1, 1);
+        msg_io->item = soup_message_queue_item_ref (item);
+        msg_io->base.completion_cb = completion_cb;
+        msg_io->base.completion_data = user_data;
 
-        io->metrics = soup_message_get_metrics (io->item->msg);
-        if (io->metrics) {
-                g_signal_connect_object (io->base.istream, "read-data",
+        msg_io->base.read_header_buf = g_byte_array_new ();
+        msg_io->base.write_buf = g_string_new (NULL);
+
+        msg_io->base.read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
+        msg_io->base.write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
+        msg_io->metrics = soup_message_get_metrics (msg_io->item->msg);
+        if (msg_io->metrics) {
+                g_signal_connect_object (io->istream, "read-data",
                                          G_CALLBACK (response_network_stream_read_data_cb),
-                                         io->item->msg, G_CONNECT_SWAPPED);
+                                         msg_io->item->msg, G_CONNECT_SWAPPED);
         }
 
 #ifdef HAVE_SYSPROF
-        io->begin_time_nsec = SYSPROF_CAPTURE_CURRENT_TIME;
+        msg_io->begin_time_nsec = SYSPROF_CAPTURE_CURRENT_TIME;
 #endif
+
+        io->msg_io = msg_io;
 }
 
 static void
@@ -1004,9 +1038,10 @@ soup_client_message_io_http1_pause (SoupClientMessageIO *iface,
 {
         SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
 
-        g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
+        g_assert (io->msg_io && io->msg_io->item->msg == msg);
+        g_assert (io->msg_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
 
-        soup_message_io_data_pause (&io->base);
+        soup_message_io_data_pause (&io->msg_io->base);
 }
 
 static void
@@ -1015,8 +1050,10 @@ soup_client_message_io_http1_unpause (SoupClientMessageIO *iface,
 {
         SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
 
-        g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
-        io->base.paused = FALSE;
+        g_assert (io->msg_io && io->msg_io->item->msg == msg);
+        g_assert (io->msg_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
+
+        io->msg_io->base.paused = FALSE;
 }
 
 static gboolean
@@ -1025,7 +1062,9 @@ soup_client_message_io_http1_is_paused (SoupClientMessageIO *iface,
 {
         SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
 
-        return io->base.paused;
+        g_assert (io->msg_io && io->msg_io->item->msg == msg);
+
+        return io->msg_io->base.paused;
 }
 
 static const SoupClientMessageIOFuncs io_funcs = {
@@ -1049,15 +1088,9 @@ soup_client_message_io_http1_new (GIOStream *stream)
         SoupClientMessageIOHTTP1 *io;
 
         io = g_slice_new0 (SoupClientMessageIOHTTP1);
-        io->base.iostream = g_object_ref (stream);
-        io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream));
-        io->base.ostream = g_io_stream_get_output_stream (io->base.iostream);
-
-        io->base.read_header_buf = g_byte_array_new ();
-        io->base.write_buf = g_string_new (NULL);
-
-        io->base.read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
-        io->base.write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
+        io->iostream = g_object_ref (stream);
+        io->istream = g_io_stream_get_input_stream (io->iostream);
+        io->ostream = g_io_stream_get_output_stream (io->iostream);
 
         io->iface.funcs = &io_funcs;
 
diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c
index c5283b6d..3b65cc37 100644
--- a/libsoup/soup-connection.c
+++ b/libsoup/soup-connection.c
@@ -410,6 +410,8 @@ soup_connection_set_connection (SoupConnection *conn,
 {
        SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
 
+        g_clear_pointer (&priv->io_data, soup_client_message_io_destroy);
+
        g_clear_object (&priv->connection);
        priv->connection = connection;
        g_clear_object (&priv->iostream);
@@ -564,6 +566,9 @@ soup_connection_complete (SoupConnection *conn)
                                        NULL);
         }
 
+        g_assert (!priv->io_data);
+        priv->io_data = soup_client_message_io_http1_new (priv->iostream);
+
         soup_connection_set_state (conn, SOUP_CONNECTION_IN_USE);
         priv->unused_timeout = time (NULL) + SOUP_CONNECTION_UNUSED_TIMEOUT;
         start_idle_timer (conn);
@@ -744,6 +749,10 @@ tunnel_handshake_ready_cb (GTlsConnection *tls_connection,
         if (g_tls_connection_handshake_finish (tls_connection, result, &error)) {
                 soup_connection_event (conn, G_SOCKET_CLIENT_TLS_HANDSHAKED, NULL);
                 soup_connection_event (conn, G_SOCKET_CLIENT_COMPLETE, NULL);
+
+                g_assert (!priv->io_data);
+                priv->io_data = soup_client_message_io_http1_new (priv->iostream);
+
                 g_task_return_boolean (task, TRUE);
         } else {
                 g_task_return_error (task, error);
@@ -832,6 +841,9 @@ soup_connection_tunnel_handshake (SoupConnection *conn,
         soup_connection_event (conn, G_SOCKET_CLIENT_TLS_HANDSHAKED, NULL);
         soup_connection_event (conn, G_SOCKET_CLIENT_COMPLETE, NULL);
 
+        g_assert (!priv->io_data);
+        priv->io_data = soup_client_message_io_http1_new (priv->iostream);
+
         return TRUE;
 }
 
@@ -1058,22 +1070,9 @@ soup_connection_setup_message_io (SoupConnection *conn,
         else
                 priv->reusable = FALSE;
 
-        g_assert (priv->io_data == NULL);
-        priv->io_data = soup_client_message_io_http1_new (priv->iostream);
-
         return priv->io_data;
 }
 
-void
-soup_connection_message_io_finished (SoupConnection *conn,
-                                     SoupMessage    *msg)
-{
-        SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
-
-        g_assert (priv->current_msg == msg);
-        g_clear_pointer (&priv->io_data, soup_client_message_io_destroy);
-}
-
 GTlsCertificate *
 soup_connection_get_tls_certificate (SoupConnection *conn)
 {
diff --git a/libsoup/soup-connection.h b/libsoup/soup-connection.h
index e0c5ab18..991d001d 100644
--- a/libsoup/soup-connection.h
+++ b/libsoup/soup-connection.h
@@ -66,8 +66,6 @@ gboolean        soup_connection_get_ever_used  (SoupConnection   *conn);
 
 SoupClientMessageIO *soup_connection_setup_message_io    (SoupConnection *conn,
                                                           SoupMessage    *msg);
-void                 soup_connection_message_io_finished (SoupConnection *conn,
-                                                          SoupMessage    *msg);
 
 GTlsCertificate     *soup_connection_get_tls_certificate        (SoupConnection *conn);
 GTlsCertificateFlags soup_connection_get_tls_certificate_errors (SoupConnection *conn);
diff --git a/libsoup/soup-message-io-data.c b/libsoup/soup-message-io-data.c
index bce43145..49f2e591 100644
--- a/libsoup/soup-message-io-data.c
+++ b/libsoup/soup-message-io-data.c
@@ -28,8 +28,6 @@ soup_message_io_data_cleanup (SoupMessageIOData *io)
                io->io_source = NULL;
        }
 
-       if (io->iostream)
-               g_object_unref (io->iostream);
        if (io->body_istream)
                g_object_unref (io->body_istream);
        if (io->body_ostream)
@@ -47,11 +45,12 @@ soup_message_io_data_cleanup (SoupMessageIOData *io)
 }
 
 gboolean
-soup_message_io_data_read_headers (SoupMessageIOData *io,
-                                  gboolean           blocking,
-                                  GCancellable      *cancellable,
-                                   gushort           *extra_bytes,
-                                  GError           **error)
+soup_message_io_data_read_headers (SoupMessageIOData     *io,
+                                   SoupFilterInputStream *istream,
+                                   gboolean               blocking,
+                                   GCancellable          *cancellable,
+                                   gushort               *extra_bytes,
+                                   GError               **error)
 {
        gssize nread, old_len;
        gboolean got_lf;
@@ -59,7 +58,7 @@ soup_message_io_data_read_headers (SoupMessageIOData *io,
        while (1) {
                old_len = io->read_header_buf->len;
                g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
-               nread = soup_filter_input_stream_read_line (io->istream,
+               nread = soup_filter_input_stream_read_line (istream,
                                                            io->read_header_buf->data + old_len,
                                                            RESPONSE_BLOCK_SIZE,
                                                            blocking,
@@ -138,8 +137,10 @@ message_io_source_check (GSource *source)
 }
 
 GSource *
-soup_message_io_data_get_source (SoupMessageIOData     *io,
+soup_message_io_data_get_source (SoupMessageIOData      *io,
                                 GObject                *msg,
+                                 GInputStream           *istream,
+                                 GOutputStream          *ostream,
                                 GCancellable           *cancellable,
                                 SoupMessageIOSourceFunc callback,
                                 gpointer                user_data)
@@ -153,21 +154,25 @@ soup_message_io_data_get_source (SoupMessageIOData     *io,
        } else if (io->async_wait) {
                base_source = g_cancellable_source_new (io->async_wait);
        } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
-               GPollableInputStream *istream;
+               GPollableInputStream *stream;
 
                if (io->body_istream)
-                       istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
-               else
-                       istream = G_POLLABLE_INPUT_STREAM (io->istream);
-               base_source = g_pollable_input_stream_create_source (istream, cancellable);
+                       stream = G_POLLABLE_INPUT_STREAM (io->body_istream);
+                else if (istream)
+                        stream = G_POLLABLE_INPUT_STREAM (istream);
+                else
+                        g_assert_not_reached ();
+               base_source = g_pollable_input_stream_create_source (stream, cancellable);
        } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
-               GPollableOutputStream *ostream;
+               GPollableOutputStream *stream;
 
                if (io->body_ostream)
-                       ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
-               else
-                       ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
-               base_source = g_pollable_output_stream_create_source (ostream, cancellable);
+                       stream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
+                else if (ostream)
+                        stream = G_POLLABLE_OUTPUT_STREAM (ostream);
+                else
+                        g_assert_not_reached ();
+               base_source = g_pollable_output_stream_create_source (stream, cancellable);
        } else
                base_source = g_timeout_source_new (0);
 
diff --git a/libsoup/soup-message-io-data.h b/libsoup/soup-message-io-data.h
index 80722239..5cf255c8 100644
--- a/libsoup/soup-message-io-data.h
+++ b/libsoup/soup-message-io-data.h
@@ -34,11 +34,8 @@ typedef enum {
         state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
 
 typedef struct {
-       GIOStream              *iostream;
-       SoupFilterInputStream  *istream;
-       GInputStream           *body_istream;
-       GOutputStream          *ostream;
-       GOutputStream          *body_ostream;
+       GInputStream         *body_istream;
+       GOutputStream        *body_ostream;
 
        SoupMessageIOState    read_state;
        SoupEncoding          read_encoding;
@@ -67,16 +64,19 @@ typedef struct {
 #endif
 } SoupMessageIOData;
 
-void     soup_message_io_data_cleanup      (SoupMessageIOData *io);
+void     soup_message_io_data_cleanup      (SoupMessageIOData      *io);
 
-gboolean soup_message_io_data_read_headers (SoupMessageIOData *io,
-                                           gboolean           blocking,
-                                           GCancellable      *cancellable,
-                                            gushort           *extra_bytes,
-                                           GError           **error);
+gboolean soup_message_io_data_read_headers (SoupMessageIOData      *io,
+                                            SoupFilterInputStream  *istream,
+                                            gboolean                blocking,
+                                            GCancellable           *cancellable,
+                                            gushort                *extra_bytes,
+                                            GError                **error);
 
 GSource *soup_message_io_data_get_source   (SoupMessageIOData      *io,
                                            GObject                *msg,
+                                            GInputStream           *istream,
+                                            GOutputStream          *ostream,
                                            GCancellable           *cancellable,
                                            SoupMessageIOSourceFunc callback,
                                            gpointer                user_data);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]