[libsoup/carlosgc/message-io: 1/2] io-http1: split SoupClientMessageIOHTTP1 in two
- From: Carlos Garcia Campos <carlosgc src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/carlosgc/message-io: 1/2] io-http1: split SoupClientMessageIOHTTP1 in two
- Date: Sat, 15 May 2021 15:00:25 +0000 (UTC)
commit add8d375ae6f8ec3016a554dd8a55615c6e06a45
Author: Carlos Garcia Campos <cgarcia igalia com>
Date: Sat May 15 16:22:59 2021 +0200
io-http1: split SoupClientMessageIOHTTP1 in two
Move the message I/O to the SoupMessageIOHTTP1 struct. This is how HTTP/2 I/O
will work, but in this case there is only one message I/O. This allows the
SoupClientMessageIO to be created earlier in SoupConnection, and we will
move HTTP/1-specific code to the interface implementation.
libsoup/server/soup-server-io.c | 29 +++--
libsoup/soup-client-input-stream.c | 4 +
libsoup/soup-client-message-io-http1.c | 223 +++++++++++++++++++--------------
libsoup/soup-connection.c | 25 ++--
libsoup/soup-connection.h | 2 -
libsoup/soup-message-io-data.c | 43 ++++---
libsoup/soup-message-io-data.h | 22 ++--
7 files changed, 199 insertions(+), 149 deletions(-)
---
diff --git a/libsoup/server/soup-server-io.c b/libsoup/server/soup-server-io.c
index a806d2f3..cc1cfc3b 100644
--- a/libsoup/server/soup-server-io.c
+++ b/libsoup/server/soup-server-io.c
@@ -22,6 +22,10 @@
struct _SoupServerMessageIOData {
SoupMessageIOData base;
+ GIOStream *iostream;
+ GInputStream *istream;
+ GOutputStream *ostream;
+
GBytes *write_chunk;
goffset write_body_offset;
@@ -39,6 +43,8 @@ soup_server_message_io_data_free (SoupServerMessageIOData *io)
if (!io)
return;
+ g_clear_object (&io->iostream);
+
soup_message_io_data_cleanup (&io->base);
if (io->unpause_source) {
@@ -90,10 +96,10 @@ soup_server_message_io_steal (SoupServerMessage *msg)
GIOStream *iostream;
io = soup_server_message_get_io_data (msg);
- if (!io || !io->base.iostream)
+ if (!io || !io->iostream)
return NULL;
- iostream = g_object_ref (io->base.iostream);
+ iostream = g_object_ref (io->iostream);
completion_cb = io->base.completion_cb;
completion_data = io->base.completion_data;
@@ -363,7 +369,7 @@ io_write (SoupServerMessage *msg,
write_headers (msg, io->write_buf, &io->write_encoding);
while (io->written < io->write_buf->len) {
- nwrote = g_pollable_stream_write (io->ostream,
+ nwrote = g_pollable_stream_write (server_io->ostream,
io->write_buf->str + io->written,
io->write_buf->len - io->written,
FALSE,
@@ -420,7 +426,7 @@ io_write (SoupServerMessage *msg,
break;
case SOUP_MESSAGE_IO_STATE_BODY_START:
- io->body_ostream = soup_body_output_stream_new (io->ostream,
+ io->body_ostream = soup_body_output_stream_new (server_io->ostream,
io->write_encoding,
io->write_length);
io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
@@ -661,7 +667,7 @@ io_read (SoupServerMessage *msg,
switch (io->read_state) {
case SOUP_MESSAGE_IO_STATE_HEADERS:
- if (!soup_message_io_data_read_headers (io, FALSE, NULL, NULL, error)) {
+ if (!soup_message_io_data_read_headers (io, SOUP_FILTER_INPUT_STREAM (server_io->istream),
FALSE, NULL, NULL, error)) {
if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT))
soup_server_message_set_status (msg, SOUP_STATUS_BAD_REQUEST, NULL);
return FALSE;
@@ -711,7 +717,7 @@ io_read (SoupServerMessage *msg,
case SOUP_MESSAGE_IO_STATE_BODY_START:
if (!io->body_istream) {
- io->body_istream = soup_body_input_stream_new (G_INPUT_STREAM (io->istream),
+ io->body_istream = soup_body_input_stream_new (server_io->istream,
io->read_encoding,
io->read_length);
@@ -850,7 +856,10 @@ io_run (SoupServerMessage *msg)
soup_server_message_io_finished (msg);
} else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&error);
- io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), NULL,
+ io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg),
+ server_io->istream,
+ server_io->ostream,
+ NULL,
(SoupMessageIOSourceFunc)io_run_ready,
NULL);
g_source_attach (io->io_source, server_io->async_context);
@@ -875,9 +884,9 @@ soup_server_message_read_request (SoupServerMessage *msg,
io->base.completion_data = user_data;
sock = soup_server_message_get_soup_socket (msg);
- io->base.iostream = g_object_ref (soup_socket_get_iostream (sock));
- io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream));
- io->base.ostream = g_io_stream_get_output_stream (io->base.iostream);
+ io->iostream = g_object_ref (soup_socket_get_iostream (sock));
+ io->istream = g_io_stream_get_input_stream (io->iostream);
+ io->ostream = g_io_stream_get_output_stream (io->iostream);
io->base.read_header_buf = g_byte_array_new ();
io->base.write_buf = g_string_new (NULL);
diff --git a/libsoup/soup-client-input-stream.c b/libsoup/soup-client-input-stream.c
index e32b90f8..098cda5e 100644
--- a/libsoup/soup-client-input-stream.c
+++ b/libsoup/soup-client-input-stream.c
@@ -241,8 +241,12 @@ soup_client_input_stream_close_async (GInputStream *stream,
g_task_set_priority (task, priority);
if (close_async_ready (priv->msg, task) == G_SOURCE_CONTINUE) {
+ /* When SoupClientInputStream is created we always have a body input stream,
+ * and we finished writing, so it's safe to pass NULL for the streams
+ */
source = soup_message_io_data_get_source ((SoupMessageIOData *)soup_message_get_io_data
(priv->msg),
G_OBJECT (priv->msg),
+ NULL, NULL,
cancellable, NULL, NULL);
g_task_attach_source (task, source, (GSourceFunc) close_async_ready);
diff --git a/libsoup/soup-client-message-io-http1.c b/libsoup/soup-client-message-io-http1.c
index 32dbf7bb..b8f8724e 100644
--- a/libsoup/soup-client-message-io-http1.c
+++ b/libsoup/soup-client-message-io-http1.c
@@ -31,7 +31,6 @@
#include "soup-uri-utils-private.h"
typedef struct {
- SoupClientMessageIO iface;
SoupMessageIOData base;
SoupMessageQueueItem *item;
@@ -41,18 +40,36 @@ typedef struct {
#ifdef HAVE_SYSPROF
gint64 begin_time_nsec;
#endif
+} SoupMessageIOHTTP1;
+
+typedef struct {
+ SoupClientMessageIO iface;
+
+ GIOStream *iostream;
+ GInputStream *istream;
+ GOutputStream *ostream;
+
+ SoupMessageIOHTTP1 *msg_io;
} SoupClientMessageIOHTTP1;
#define RESPONSE_BLOCK_SIZE 8192
#define HEADER_SIZE_LIMIT (64 * 1024)
+static void
+soup_message_io_http1_free (SoupMessageIOHTTP1 *msg_io)
+{
+ soup_message_io_data_cleanup (&msg_io->base);
+ soup_message_queue_item_unref (msg_io->item);
+ g_free (msg_io);
+}
+
static void
soup_client_message_io_http1_destroy (SoupClientMessageIO *iface)
{
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
- soup_message_io_data_cleanup (&io->base);
- soup_message_queue_item_unref (io->item);
+ g_clear_object (&io->iostream);
+ g_clear_pointer (&io->msg_io, soup_message_io_http1_free);
g_slice_free (SoupClientMessageIOHTTP1, io);
}
@@ -60,10 +77,10 @@ soup_client_message_io_http1_destroy (SoupClientMessageIO *iface)
static int
soup_client_message_io_http1_get_priority (SoupClientMessageIOHTTP1 *io)
{
- if (!io->item->task)
+ if (!io->msg_io->item->task)
return G_PRIORITY_DEFAULT;
- return g_task_get_priority (io->item->task);
+ return g_task_get_priority (io->msg_io->item->task);
}
static void
@@ -74,15 +91,15 @@ soup_client_message_io_complete (SoupClientMessageIOHTTP1 *io,
SoupMessageIOCompletionFn completion_cb;
gpointer completion_data;
- completion_cb = io->base.completion_cb;
- completion_data = io->base.completion_data;
+ completion_cb = io->msg_io->base.completion_cb;
+ completion_data = io->msg_io->base.completion_data;
- msg = g_object_ref (msg);
- if (io->base.istream)
- g_signal_handlers_disconnect_by_data (io->base.istream, msg);
- if (io->base.body_ostream)
- g_signal_handlers_disconnect_by_data (io->base.body_ostream, msg);
- soup_connection_message_io_finished (soup_message_get_connection (msg), msg);
+ g_object_ref (msg);
+ if (io->istream)
+ g_signal_handlers_disconnect_by_data (io->istream, msg);
+ if (io->msg_io->base.body_ostream)
+ g_signal_handlers_disconnect_by_data (io->msg_io->base.body_ostream, msg);
+ g_clear_pointer (&io->msg_io, soup_message_io_http1_free);
if (completion_cb)
completion_cb (G_OBJECT (msg), completion, completion_data);
g_object_unref (msg);
@@ -95,8 +112,8 @@ soup_client_message_io_http1_finished (SoupClientMessageIO *iface,
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
SoupMessageIOCompletion completion;
- if ((io->base.read_state >= SOUP_MESSAGE_IO_STATE_FINISHING &&
- io->base.write_state >= SOUP_MESSAGE_IO_STATE_FINISHING))
+ if ((io->msg_io->base.read_state >= SOUP_MESSAGE_IO_STATE_FINISHING &&
+ io->msg_io->base.write_state >= SOUP_MESSAGE_IO_STATE_FINISHING))
completion = SOUP_MESSAGE_IO_COMPLETE;
else
completion = SOUP_MESSAGE_IO_INTERRUPTED;
@@ -109,7 +126,7 @@ soup_client_message_io_http1_stolen (SoupClientMessageIO *iface)
{
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
- soup_client_message_io_complete (io, io->item->msg, SOUP_MESSAGE_IO_STOLEN);
+ soup_client_message_io_complete (io, io->msg_io->item->msg, SOUP_MESSAGE_IO_STOLEN);
}
static void
@@ -120,10 +137,10 @@ request_body_stream_wrote_data_cb (SoupMessage *msg,
{
SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
- if (client_io->metrics) {
- client_io->metrics->request_body_bytes_sent += count;
+ if (client_io->msg_io->metrics) {
+ client_io->msg_io->metrics->request_body_bytes_sent += count;
if (!is_metadata)
- client_io->metrics->request_body_size += count;
+ client_io->msg_io->metrics->request_body_size += count;
}
if (!is_metadata)
@@ -143,19 +160,19 @@ request_body_stream_wrote_cb (GOutputStream *ostream,
nwrote = g_output_stream_splice_finish (ostream, result, &error);
io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
- if (!io || !io->base.async_wait || io->base.body_ostream != ostream) {
+ if (!io || !io->msg_io || !io->msg_io->base.async_wait || io->msg_io->base.body_ostream != ostream) {
g_clear_error (&error);
g_object_unref (msg);
return;
}
if (nwrote != -1)
- io->base.write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+ io->msg_io->base.write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
if (error)
- g_propagate_error (&io->base.async_error, error);
- async_wait = io->base.async_wait;
- io->base.async_wait = NULL;
+ g_propagate_error (&io->msg_io->base.async_error, error);
+ async_wait = io->msg_io->base.async_wait;
+ io->msg_io->base.async_wait = NULL;
g_cancellable_cancel (async_wait);
g_object_unref (async_wait);
@@ -173,16 +190,16 @@ closed_async (GObject *source,
GCancellable *async_wait;
io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
- if (!io || !io->base.async_wait || io->base.body_ostream != body_ostream) {
+ if (!io || !io->msg_io || !io->msg_io->base.async_wait || io->msg_io->base.body_ostream !=
body_ostream) {
g_object_unref (msg);
return;
}
- g_output_stream_close_finish (body_ostream, result, &io->base.async_error);
- g_clear_object (&io->base.body_ostream);
+ g_output_stream_close_finish (body_ostream, result, &io->msg_io->base.async_error);
+ g_clear_object (&io->msg_io->base.body_ostream);
- async_wait = io->base.async_wait;
- io->base.async_wait = NULL;
+ async_wait = io->msg_io->base.async_wait;
+ io->msg_io->base.async_wait = NULL;
g_cancellable_cancel (async_wait);
g_object_unref (async_wait);
@@ -277,8 +294,8 @@ io_write (SoupClientMessageIOHTTP1 *client_io,
GCancellable *cancellable,
GError **error)
{
- SoupMessageIOData *io = &client_io->base;
- SoupMessage *msg = client_io->item->msg;
+ SoupMessageIOData *io = &client_io->msg_io->base;
+ SoupMessage *msg = client_io->msg_io->item->msg;
SoupSessionFeature *logger;
gssize nwrote;
@@ -299,7 +316,7 @@ io_write (SoupClientMessageIOHTTP1 *client_io,
write_headers (msg, io->write_buf, &io->write_encoding);
while (io->written < io->write_buf->len) {
- nwrote = g_pollable_stream_write (io->ostream,
+ nwrote = g_pollable_stream_write (client_io->ostream,
io->write_buf->str + io->written,
io->write_buf->len - io->written,
blocking,
@@ -307,8 +324,8 @@ io_write (SoupClientMessageIOHTTP1 *client_io,
if (nwrote == -1)
return FALSE;
io->written += nwrote;
- if (client_io->metrics)
- client_io->metrics->request_header_bytes_sent += nwrote;
+ if (client_io->msg_io->metrics)
+ client_io->msg_io->metrics->request_header_bytes_sent += nwrote;
}
io->written = 0;
@@ -328,11 +345,11 @@ io_write (SoupClientMessageIOHTTP1 *client_io,
break;
case SOUP_MESSAGE_IO_STATE_BODY_START:
- io->body_ostream = soup_body_output_stream_new (io->ostream,
+ io->body_ostream = soup_body_output_stream_new (client_io->ostream,
io->write_encoding,
io->write_length);
io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
- logger = soup_session_get_feature_for_message (client_io->item->session,
+ logger = soup_session_get_feature_for_message (client_io->msg_io->item->session,
SOUP_TYPE_LOGGER, msg);
if (logger) {
soup_logger_request_body_setup (SOUP_LOGGER (logger), msg,
@@ -469,10 +486,10 @@ response_network_stream_read_data_cb (SoupMessage *msg,
{
SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
- if (client_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_START)
- client_io->metrics->response_header_bytes_received += count;
+ if (client_io->msg_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_START)
+ client_io->msg_io->metrics->response_header_bytes_received += count;
else
- client_io->metrics->response_body_bytes_received += count;
+ client_io->msg_io->metrics->response_body_bytes_received += count;
}
/* Attempts to push forward the reading side of @msg's I/O. Returns
@@ -487,8 +504,8 @@ io_read (SoupClientMessageIOHTTP1 *client_io,
GCancellable *cancellable,
GError **error)
{
- SoupMessageIOData *io = &client_io->base;
- SoupMessage *msg = client_io->item->msg;
+ SoupMessageIOData *io = &client_io->msg_io->base;
+ SoupMessage *msg = client_io->msg_io->item->msg;
gboolean succeeded;
gboolean is_first_read;
gushort extra_bytes;
@@ -498,20 +515,21 @@ io_read (SoupClientMessageIOHTTP1 *client_io,
is_first_read = io->read_header_buf->len == 0 &&
soup_message_get_status (msg) == SOUP_STATUS_NONE;
- succeeded = soup_message_io_data_read_headers (io, blocking, cancellable, &extra_bytes,
error);
+ succeeded = soup_message_io_data_read_headers (io, SOUP_FILTER_INPUT_STREAM
(client_io->istream),
+ blocking, cancellable, &extra_bytes, error);
if (is_first_read && io->read_header_buf->len > 0)
soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_START);
if (!succeeded)
return FALSE;
- if (client_io->metrics) {
+ if (client_io->msg_io->metrics) {
/* Adjust the header and body bytes received, since we might
* have read part of the body already that is queued by the stream.
*/
- if (client_io->metrics->response_header_bytes_received > io->read_header_buf->len +
extra_bytes) {
- client_io->metrics->response_body_bytes_received =
- client_io->metrics->response_header_bytes_received -
io->read_header_buf->len - extra_bytes;
- client_io->metrics->response_header_bytes_received -=
client_io->metrics->response_body_bytes_received;
+ if (client_io->msg_io->metrics->response_header_bytes_received >
io->read_header_buf->len + extra_bytes) {
+ client_io->msg_io->metrics->response_body_bytes_received =
+ client_io->msg_io->metrics->response_header_bytes_received -
io->read_header_buf->len - extra_bytes;
+ client_io->msg_io->metrics->response_header_bytes_received -=
client_io->msg_io->metrics->response_body_bytes_received;
}
}
@@ -593,11 +611,11 @@ io_read (SoupClientMessageIOHTTP1 *client_io,
case SOUP_MESSAGE_IO_STATE_BODY_START:
if (!io->body_istream) {
- GInputStream *body_istream = soup_body_input_stream_new (G_INPUT_STREAM
(io->istream),
+ GInputStream *body_istream = soup_body_input_stream_new (client_io->istream,
io->read_encoding,
io->read_length);
- io->body_istream = soup_session_setup_message_body_input_stream
(client_io->item->session,
+ io->body_istream = soup_session_setup_message_body_input_stream
(client_io->msg_io->item->session,
msg, body_istream,
SOUP_STAGE_MESSAGE_BODY);
g_object_unref (body_istream);
@@ -624,8 +642,8 @@ io_read (SoupClientMessageIOHTTP1 *client_io,
if (nread == 0)
io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
- if (client_io->metrics)
- client_io->metrics->response_body_size += nread;
+ if (client_io->msg_io->metrics)
+ client_io->msg_io->metrics->response_body_size += nread;
break;
}
@@ -653,14 +671,14 @@ request_is_restartable (SoupMessage *msg, GError *error)
SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
SoupMessageIOData *io;
- if (!client_io)
+ if (!client_io || !client_io->msg_io)
return FALSE;
- io = &client_io->base;
+ io = &client_io->msg_io->base;
return (io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
io->read_header_buf->len == 0 &&
- soup_connection_get_ever_used (soup_message_get_connection (client_io->item->msg)) &&
+ soup_connection_get_ever_used (soup_message_get_connection (client_io->msg_io->item->msg)) &&
!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK) &&
!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED) &&
@@ -676,11 +694,14 @@ io_run_until (SoupClientMessageIOHTTP1 *client_io,
GCancellable *cancellable,
GError **error)
{
- SoupMessageIOData *io = &client_io->base;
+ SoupMessageIOData *io;
SoupMessage *msg;
gboolean progress = TRUE, done;
GError *my_error = NULL;
+ g_assert (client_io); // Silence clang static analysis
+ io = &client_io->msg_io->base;
+
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return FALSE;
else if (!io) {
@@ -690,8 +711,7 @@ io_run_until (SoupClientMessageIOHTTP1 *client_io,
return FALSE;
}
- g_assert (client_io); // Silence clang static analysis
- msg = client_io->item->msg;
+ msg = client_io->msg_io->item->msg;
g_object_ref (msg);
while (progress && (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg) == client_io &&
@@ -746,8 +766,8 @@ io_run_until (SoupClientMessageIOHTTP1 *client_io,
/* FIXME: Expand and generalise sysprof support:
* https://gitlab.gnome.org/GNOME/sysprof/-/issues/43 */
- sysprof_collector_mark_printf (client_io->begin_time_nsec,
- SYSPROF_CAPTURE_CURRENT_TIME - client_io->begin_time_nsec,
+ sysprof_collector_mark_printf (client_io->msg_io->begin_time_nsec,
+ SYSPROF_CAPTURE_CURRENT_TIME -
client_io->msg_io->begin_time_nsec,
"libsoup", "message",
"%s request/response to %s: "
"read %" G_GOFFSET_FORMAT "B, "
@@ -778,7 +798,7 @@ soup_message_io_finish (SoupMessage *msg,
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)soup_message_get_io_data (msg);
/* Connection got closed, but we can safely try again. */
- io->item->state = SOUP_MESSAGE_RESTARTING;
+ io->msg_io->item->state = SOUP_MESSAGE_RESTARTING;
} else if (error) {
soup_message_set_metrics_timestamp (msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
}
@@ -801,7 +821,7 @@ soup_client_message_io_http1_run (SoupClientMessageIO *iface,
gboolean blocking)
{
SoupClientMessageIOHTTP1 *client_io = (SoupClientMessageIOHTTP1 *)iface;
- SoupMessageIOData *io = &client_io->base;
+ SoupMessageIOData *io = &client_io->msg_io->base;
GError *error = NULL;
if (io->io_source) {
@@ -815,12 +835,14 @@ soup_client_message_io_http1_run (SoupClientMessageIO *iface,
if (io_run_until (client_io, blocking,
SOUP_MESSAGE_IO_STATE_DONE,
SOUP_MESSAGE_IO_STATE_DONE,
- client_io->item->cancellable, &error)) {
+ client_io->msg_io->item->cancellable, &error)) {
soup_message_io_finished (msg);
} else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&error);
io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg),
- client_io->item->cancellable,
+ client_io->istream,
+ client_io->ostream,
+ client_io->msg_io->item->cancellable,
(SoupMessageIOSourceFunc)io_run_ready,
NULL);
g_source_set_priority (io->io_source,
@@ -872,8 +894,8 @@ static void
io_run_until_read_async (SoupClientMessageIOHTTP1 *client_io,
GTask *task)
{
- SoupMessageIOData *io = &client_io->base;
- SoupMessage *msg = client_io->item->msg;
+ SoupMessageIOData *io = &client_io->msg_io->base;
+ SoupMessage *msg = client_io->msg_io->item->msg;
GError *error = NULL;
if (io->io_source) {
@@ -894,7 +916,10 @@ io_run_until_read_async (SoupClientMessageIOHTTP1 *client_io,
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_error_free (error);
- io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg), g_task_get_cancellable
(task),
+ io->io_source = soup_message_io_data_get_source (io, G_OBJECT (msg),
+ client_io->istream,
+ client_io->ostream,
+ g_task_get_cancellable (task),
(SoupMessageIOSourceFunc)io_run_until_read_ready,
task);
g_source_set_priority (io->io_source, g_task_get_priority (task));
@@ -937,9 +962,9 @@ soup_client_message_io_http1_run_until_finish (SoupClientMessageIO *iface,
g_object_ref (msg);
- if (io) {
- if (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
- io->base.read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+ if (io && io->msg_io) {
+ if (io->msg_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY_DONE)
+ io->msg_io->base.read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
}
success = io_run_until (io, blocking,
@@ -955,8 +980,8 @@ static void
client_stream_eof (SoupClientInputStream *stream,
SoupClientMessageIOHTTP1 *io)
{
- if (io && io->base.read_state == SOUP_MESSAGE_IO_STATE_BODY)
- io->base.read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+ if (io && io->msg_io && io->msg_io->base.read_state == SOUP_MESSAGE_IO_STATE_BODY)
+ io->msg_io->base.read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
}
static GInputStream *
@@ -967,7 +992,9 @@ soup_client_message_io_http1_get_response_stream (SoupClientMessageIO *iface,
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
GInputStream *client_stream;
- client_stream = soup_client_input_stream_new (io->base.body_istream, msg);
+ g_assert (io->msg_io && io->msg_io->item->msg == msg);
+
+ client_stream = soup_client_input_stream_new (io->msg_io->base.body_istream, msg);
g_signal_connect (client_stream, "eof",
G_CALLBACK (client_stream_eof), io);
@@ -981,21 +1008,30 @@ soup_client_message_io_http1_send_item (SoupClientMessageIO *iface,
gpointer user_data)
{
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
+ SoupMessageIOHTTP1 *msg_io;
+
+ msg_io = g_new0 (SoupMessageIOHTTP1, 1);
+ msg_io->item = soup_message_queue_item_ref (item);
+ msg_io->base.completion_cb = completion_cb;
+ msg_io->base.completion_data = user_data;
- io->item = soup_message_queue_item_ref (item);
- io->base.completion_cb = completion_cb;
- io->base.completion_data = user_data;
+ msg_io->base.read_header_buf = g_byte_array_new ();
+ msg_io->base.write_buf = g_string_new (NULL);
- io->metrics = soup_message_get_metrics (io->item->msg);
- if (io->metrics) {
- g_signal_connect_object (io->base.istream, "read-data",
+ msg_io->base.read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
+ msg_io->base.write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
+ msg_io->metrics = soup_message_get_metrics (msg_io->item->msg);
+ if (msg_io->metrics) {
+ g_signal_connect_object (io->istream, "read-data",
G_CALLBACK (response_network_stream_read_data_cb),
- io->item->msg, G_CONNECT_SWAPPED);
+ msg_io->item->msg, G_CONNECT_SWAPPED);
}
#ifdef HAVE_SYSPROF
- io->begin_time_nsec = SYSPROF_CAPTURE_CURRENT_TIME;
+ msg_io->begin_time_nsec = SYSPROF_CAPTURE_CURRENT_TIME;
#endif
+
+ io->msg_io = msg_io;
}
static void
@@ -1004,9 +1040,10 @@ soup_client_message_io_http1_pause (SoupClientMessageIO *iface,
{
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
- g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
+ g_assert (io->msg_io && io->msg_io->item->msg == msg);
+ g_assert (io->msg_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
- soup_message_io_data_pause (&io->base);
+ soup_message_io_data_pause (&io->msg_io->base);
}
static void
@@ -1015,8 +1052,10 @@ soup_client_message_io_http1_unpause (SoupClientMessageIO *iface,
{
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
- g_return_if_fail (io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
- io->base.paused = FALSE;
+ g_assert (io->msg_io && io->msg_io->item->msg == msg);
+ g_assert (io->msg_io->base.read_state < SOUP_MESSAGE_IO_STATE_BODY);
+
+ io->msg_io->base.paused = FALSE;
}
static gboolean
@@ -1025,7 +1064,9 @@ soup_client_message_io_http1_is_paused (SoupClientMessageIO *iface,
{
SoupClientMessageIOHTTP1 *io = (SoupClientMessageIOHTTP1 *)iface;
- return io->base.paused;
+ g_assert (io->msg_io && io->msg_io->item->msg == msg);
+
+ return io->msg_io->base.paused;
}
static const SoupClientMessageIOFuncs io_funcs = {
@@ -1049,15 +1090,9 @@ soup_client_message_io_http1_new (GIOStream *stream)
SoupClientMessageIOHTTP1 *io;
io = g_slice_new0 (SoupClientMessageIOHTTP1);
- io->base.iostream = g_object_ref (stream);
- io->base.istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (io->base.iostream));
- io->base.ostream = g_io_stream_get_output_stream (io->base.iostream);
-
- io->base.read_header_buf = g_byte_array_new ();
- io->base.write_buf = g_string_new (NULL);
-
- io->base.read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
- io->base.write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
+ io->iostream = g_object_ref (stream);
+ io->istream = g_io_stream_get_input_stream (io->iostream);
+ io->ostream = g_io_stream_get_output_stream (io->iostream);
io->iface.funcs = &io_funcs;
diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c
index c5283b6d..3b65cc37 100644
--- a/libsoup/soup-connection.c
+++ b/libsoup/soup-connection.c
@@ -410,6 +410,8 @@ soup_connection_set_connection (SoupConnection *conn,
{
SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
+ g_clear_pointer (&priv->io_data, soup_client_message_io_destroy);
+
g_clear_object (&priv->connection);
priv->connection = connection;
g_clear_object (&priv->iostream);
@@ -564,6 +566,9 @@ soup_connection_complete (SoupConnection *conn)
NULL);
}
+ g_assert (!priv->io_data);
+ priv->io_data = soup_client_message_io_http1_new (priv->iostream);
+
soup_connection_set_state (conn, SOUP_CONNECTION_IN_USE);
priv->unused_timeout = time (NULL) + SOUP_CONNECTION_UNUSED_TIMEOUT;
start_idle_timer (conn);
@@ -744,6 +749,10 @@ tunnel_handshake_ready_cb (GTlsConnection *tls_connection,
if (g_tls_connection_handshake_finish (tls_connection, result, &error)) {
soup_connection_event (conn, G_SOCKET_CLIENT_TLS_HANDSHAKED, NULL);
soup_connection_event (conn, G_SOCKET_CLIENT_COMPLETE, NULL);
+
+ g_assert (!priv->io_data);
+ priv->io_data = soup_client_message_io_http1_new (priv->iostream);
+
g_task_return_boolean (task, TRUE);
} else {
g_task_return_error (task, error);
@@ -832,6 +841,9 @@ soup_connection_tunnel_handshake (SoupConnection *conn,
soup_connection_event (conn, G_SOCKET_CLIENT_TLS_HANDSHAKED, NULL);
soup_connection_event (conn, G_SOCKET_CLIENT_COMPLETE, NULL);
+ g_assert (!priv->io_data);
+ priv->io_data = soup_client_message_io_http1_new (priv->iostream);
+
return TRUE;
}
@@ -1058,22 +1070,9 @@ soup_connection_setup_message_io (SoupConnection *conn,
else
priv->reusable = FALSE;
- g_assert (priv->io_data == NULL);
- priv->io_data = soup_client_message_io_http1_new (priv->iostream);
-
return priv->io_data;
}
-void
-soup_connection_message_io_finished (SoupConnection *conn,
- SoupMessage *msg)
-{
- SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
-
- g_assert (priv->current_msg == msg);
- g_clear_pointer (&priv->io_data, soup_client_message_io_destroy);
-}
-
GTlsCertificate *
soup_connection_get_tls_certificate (SoupConnection *conn)
{
diff --git a/libsoup/soup-connection.h b/libsoup/soup-connection.h
index e0c5ab18..991d001d 100644
--- a/libsoup/soup-connection.h
+++ b/libsoup/soup-connection.h
@@ -66,8 +66,6 @@ gboolean soup_connection_get_ever_used (SoupConnection *conn);
SoupClientMessageIO *soup_connection_setup_message_io (SoupConnection *conn,
SoupMessage *msg);
-void soup_connection_message_io_finished (SoupConnection *conn,
- SoupMessage *msg);
GTlsCertificate *soup_connection_get_tls_certificate (SoupConnection *conn);
GTlsCertificateFlags soup_connection_get_tls_certificate_errors (SoupConnection *conn);
diff --git a/libsoup/soup-message-io-data.c b/libsoup/soup-message-io-data.c
index bce43145..49f2e591 100644
--- a/libsoup/soup-message-io-data.c
+++ b/libsoup/soup-message-io-data.c
@@ -28,8 +28,6 @@ soup_message_io_data_cleanup (SoupMessageIOData *io)
io->io_source = NULL;
}
- if (io->iostream)
- g_object_unref (io->iostream);
if (io->body_istream)
g_object_unref (io->body_istream);
if (io->body_ostream)
@@ -47,11 +45,12 @@ soup_message_io_data_cleanup (SoupMessageIOData *io)
}
gboolean
-soup_message_io_data_read_headers (SoupMessageIOData *io,
- gboolean blocking,
- GCancellable *cancellable,
- gushort *extra_bytes,
- GError **error)
+soup_message_io_data_read_headers (SoupMessageIOData *io,
+ SoupFilterInputStream *istream,
+ gboolean blocking,
+ GCancellable *cancellable,
+ gushort *extra_bytes,
+ GError **error)
{
gssize nread, old_len;
gboolean got_lf;
@@ -59,7 +58,7 @@ soup_message_io_data_read_headers (SoupMessageIOData *io,
while (1) {
old_len = io->read_header_buf->len;
g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
- nread = soup_filter_input_stream_read_line (io->istream,
+ nread = soup_filter_input_stream_read_line (istream,
io->read_header_buf->data + old_len,
RESPONSE_BLOCK_SIZE,
blocking,
@@ -138,8 +137,10 @@ message_io_source_check (GSource *source)
}
GSource *
-soup_message_io_data_get_source (SoupMessageIOData *io,
+soup_message_io_data_get_source (SoupMessageIOData *io,
GObject *msg,
+ GInputStream *istream,
+ GOutputStream *ostream,
GCancellable *cancellable,
SoupMessageIOSourceFunc callback,
gpointer user_data)
@@ -153,21 +154,25 @@ soup_message_io_data_get_source (SoupMessageIOData *io,
} else if (io->async_wait) {
base_source = g_cancellable_source_new (io->async_wait);
} else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
- GPollableInputStream *istream;
+ GPollableInputStream *stream;
if (io->body_istream)
- istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
- else
- istream = G_POLLABLE_INPUT_STREAM (io->istream);
- base_source = g_pollable_input_stream_create_source (istream, cancellable);
+ stream = G_POLLABLE_INPUT_STREAM (io->body_istream);
+ else if (istream)
+ stream = G_POLLABLE_INPUT_STREAM (istream);
+ else
+ g_assert_not_reached ();
+ base_source = g_pollable_input_stream_create_source (stream, cancellable);
} else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
- GPollableOutputStream *ostream;
+ GPollableOutputStream *stream;
if (io->body_ostream)
- ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
- else
- ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
- base_source = g_pollable_output_stream_create_source (ostream, cancellable);
+ stream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
+ else if (ostream)
+ stream = G_POLLABLE_OUTPUT_STREAM (ostream);
+ else
+ g_assert_not_reached ();
+ base_source = g_pollable_output_stream_create_source (stream, cancellable);
} else
base_source = g_timeout_source_new (0);
diff --git a/libsoup/soup-message-io-data.h b/libsoup/soup-message-io-data.h
index 80722239..5cf255c8 100644
--- a/libsoup/soup-message-io-data.h
+++ b/libsoup/soup-message-io-data.h
@@ -34,11 +34,8 @@ typedef enum {
state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
typedef struct {
- GIOStream *iostream;
- SoupFilterInputStream *istream;
- GInputStream *body_istream;
- GOutputStream *ostream;
- GOutputStream *body_ostream;
+ GInputStream *body_istream;
+ GOutputStream *body_ostream;
SoupMessageIOState read_state;
SoupEncoding read_encoding;
@@ -67,16 +64,19 @@ typedef struct {
#endif
} SoupMessageIOData;
-void soup_message_io_data_cleanup (SoupMessageIOData *io);
+void soup_message_io_data_cleanup (SoupMessageIOData *io);
-gboolean soup_message_io_data_read_headers (SoupMessageIOData *io,
- gboolean blocking,
- GCancellable *cancellable,
- gushort *extra_bytes,
- GError **error);
+gboolean soup_message_io_data_read_headers (SoupMessageIOData *io,
+ SoupFilterInputStream *istream,
+ gboolean blocking,
+ GCancellable *cancellable,
+ gushort *extra_bytes,
+ GError **error);
GSource *soup_message_io_data_get_source (SoupMessageIOData *io,
GObject *msg,
+ GInputStream *istream,
+ GOutputStream *ostream,
GCancellable *cancellable,
SoupMessageIOSourceFunc callback,
gpointer user_data);
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]