[libsoup/carlosgc/http2-io: 1/2] io-http2: simplify async io handling
- From: Carlos Garcia Campos <carlosgc src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/carlosgc/http2-io: 1/2] io-http2: simplify async io handling
- Date: Mon, 31 May 2021 10:54:51 +0000 (UTC)
commit 3357288cec5471614351e9ef23bdf2fd98e2ab58
Author: Carlos Garcia Campos <cgarcia igalia com>
Date: Sun May 30 08:50:49 2021 +0200
io-http2: simplify async io handling
Use a single global polling source for reading, and process any pending I/O
operations after every successful read. Every time we submit data to the
session we try to write it immediately; if the write would block, a single
global polling source is used until everything has been written. This
simplifies the I/O handling and avoids creating and destroying a lot of
polling sources.
libsoup/http2/soup-body-input-stream-http2.c | 87 ++--
libsoup/http2/soup-body-input-stream-http2.h | 5 +-
libsoup/http2/soup-client-message-io-http2.c | 660 +++++++++++----------------
libsoup/soup-message-private.h | 1 +
libsoup/soup-message.c | 8 +
tests/http2-body-stream-test.c | 12 +-
tests/http2-test.c | 10 +-
7 files changed, 320 insertions(+), 463 deletions(-)
---
diff --git a/libsoup/http2/soup-body-input-stream-http2.c b/libsoup/http2/soup-body-input-stream-http2.c
index 2dacb3e9..82f14ee5 100644
--- a/libsoup/http2/soup-body-input-stream-http2.c
+++ b/libsoup/http2/soup-body-input-stream-http2.c
@@ -43,11 +43,11 @@ struct _SoupBodyInputStreamHttp2 {
typedef struct {
GSList *chunks;
- GPollableInputStream *parent_stream;
gsize start_offset;
gsize len;
gsize pos;
gboolean completed;
+ GCancellable *need_more_data_cancellable;
} SoupBodyInputStreamHttp2Private;
static void soup_body_input_stream_http2_pollable_iface_init (GPollableInputStreamInterface *iface);
@@ -72,24 +72,15 @@ static guint signals [LAST_SIGNAL] = { 0 };
* Returns: a new #GInputStream
*/
GInputStream *
-soup_body_input_stream_http2_new (GPollableInputStream *parent_stream)
+soup_body_input_stream_http2_new ()
{
- GInputStream *stream;
- SoupBodyInputStreamHttp2Private *priv;
-
- g_assert (G_IS_POLLABLE_INPUT_STREAM (parent_stream));
-
- stream = g_object_new (SOUP_TYPE_BODY_INPUT_STREAM_HTTP2, NULL);
- priv = soup_body_input_stream_http2_get_instance_private (SOUP_BODY_INPUT_STREAM_HTTP2 (stream));
- priv->parent_stream = g_object_ref (parent_stream);
-
- return stream;
+ return G_INPUT_STREAM (g_object_new (SOUP_TYPE_BODY_INPUT_STREAM_HTTP2, NULL));
}
void
soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2 *stream,
- const guint8 *data,
- gsize size)
+ const guint8 *data,
+ gsize size)
{
SoupBodyInputStreamHttp2Private *priv;
@@ -100,6 +91,21 @@ soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2 *stream,
priv->chunks = g_slist_append (priv->chunks, g_bytes_new (data, size));
priv->len += size;
+ if (priv->need_more_data_cancellable) {
+ g_cancellable_cancel (priv->need_more_data_cancellable);
+ g_clear_object (&priv->need_more_data_cancellable);
+ }
+}
+
+gboolean
+soup_body_input_stream_http2_is_blocked (SoupBodyInputStreamHttp2 *stream)
+{
+ SoupBodyInputStreamHttp2Private *priv;
+
+ g_return_val_if_fail (SOUP_IS_BODY_INPUT_STREAM_HTTP2 (stream), FALSE);
+
+ priv = soup_body_input_stream_http2_get_instance_private (stream);
+ return priv->need_more_data_cancellable != NULL;
}
static gssize
@@ -174,9 +180,7 @@ soup_body_input_stream_http2_read_real (GInputStream *stream,
if (count == 0 && blocking && !priv->completed) {
GError *read_error = NULL;
g_signal_emit (memory_stream, signals[NEED_MORE_DATA], 0,
- cancellable,
- TRUE,
- &read_error);
+ cancellable, &read_error);
if (read_error) {
g_propagate_error (error, read_error);
@@ -214,28 +218,7 @@ soup_body_input_stream_http2_read_nonblocking (GPollableInputStream *stream,
gsize read = soup_body_input_stream_http2_read_real (G_INPUT_STREAM (stream), FALSE, buffer, count,
NULL, &inner_error);
if (read == 0 && !priv->completed && !inner_error) {
- /* Try requesting more reads from the io backend */
- GError *inner_error = NULL;
-
- g_signal_emit (memory_stream, signals[NEED_MORE_DATA], 0,
- NULL, FALSE, &inner_error);
-
- if (inner_error) {
- g_propagate_error (error, inner_error);
-
- return -1;
- }
-
- if (priv->completed)
- return soup_body_input_stream_http2_read_real (G_INPUT_STREAM (stream), FALSE,
buffer, count, NULL, error);
-
- if (priv->pos < priv->len) {
- read = soup_body_input_stream_http2_read_real (G_INPUT_STREAM (stream), FALSE,
buffer, count, NULL, NULL);
- if (read > 0)
- return read;
- }
-
- g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, "Operation would block");
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would block"));
return -1;
}
@@ -250,6 +233,10 @@ soup_body_input_stream_http2_complete (SoupBodyInputStreamHttp2 *stream)
{
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (stream);
priv->completed = TRUE;
+ if (priv->need_more_data_cancellable) {
+ g_cancellable_cancel (priv->need_more_data_cancellable);
+ g_clear_object (&priv->need_more_data_cancellable);
+ }
}
static gssize
@@ -352,13 +339,9 @@ soup_body_input_stream_http2_close_finish (GInputStream *stream,
static gboolean
soup_body_input_stream_http2_is_readable (GPollableInputStream *stream)
{
- SoupBodyInputStreamHttp2 *memory_stream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
- SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private
(memory_stream);
-
- if (priv->pos < priv->len || priv->completed)
- return TRUE;
+ SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private
(SOUP_BODY_INPUT_STREAM_HTTP2 (stream));
- return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (priv->parent_stream));
+ return priv->pos < priv->len || priv->completed;
}
static GSource *
@@ -368,10 +351,9 @@ soup_body_input_stream_http2_create_source (GPollableInputStream *stream,
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private
(SOUP_BODY_INPUT_STREAM_HTTP2 (stream));
GSource *base_source, *pollable_source;
- if (g_pollable_input_stream_is_readable (stream))
- base_source = g_timeout_source_new (0);
- else
- base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
(priv->parent_stream), NULL);
+ if (!priv->need_more_data_cancellable)
+ priv->need_more_data_cancellable = g_cancellable_new ();
+ base_source = g_cancellable_source_new (priv->need_more_data_cancellable);
pollable_source = g_pollable_source_new_full (stream, base_source, cancellable);
g_source_set_name (pollable_source, "SoupMemoryStreamSource");
@@ -387,6 +369,10 @@ soup_body_input_stream_http2_dispose (GObject *object)
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (stream);
priv->completed = TRUE;
+ if (priv->need_more_data_cancellable) {
+ g_cancellable_cancel (priv->need_more_data_cancellable);
+ g_clear_object (&priv->need_more_data_cancellable);
+ }
G_OBJECT_CLASS (soup_body_input_stream_http2_parent_class)->dispose (object);
}
@@ -398,7 +384,6 @@ soup_body_input_stream_http2_finalize (GObject *object)
SoupBodyInputStreamHttp2Private *priv = soup_body_input_stream_http2_get_instance_private (stream);
g_slist_free_full (priv->chunks, (GDestroyNotify)g_bytes_unref);
- g_clear_object (&priv->parent_stream);
G_OBJECT_CLASS (soup_body_input_stream_http2_parent_class)->finalize (object);
}
@@ -444,5 +429,5 @@ soup_body_input_stream_http2_class_init (SoupBodyInputStreamHttp2Class *klass)
NULL, NULL,
NULL,
G_TYPE_ERROR,
- 2, G_TYPE_CANCELLABLE, G_TYPE_BOOLEAN);
+ 1, G_TYPE_CANCELLABLE);
}
diff --git a/libsoup/http2/soup-body-input-stream-http2.h b/libsoup/http2/soup-body-input-stream-http2.h
index 9e25ed34..b55cd6f5 100644
--- a/libsoup/http2/soup-body-input-stream-http2.h
+++ b/libsoup/http2/soup-body-input-stream-http2.h
@@ -6,7 +6,7 @@
#define SOUP_TYPE_BODY_INPUT_STREAM_HTTP2 (soup_body_input_stream_http2_get_type ())
G_DECLARE_FINAL_TYPE (SoupBodyInputStreamHttp2, soup_body_input_stream_http2, SOUP, BODY_INPUT_STREAM_HTTP2,
GInputStream)
-GInputStream * soup_body_input_stream_http2_new (GPollableInputStream *parent_stream);
+GInputStream * soup_body_input_stream_http2_new (void);
void soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2 *stream,
const guint8 *data,
@@ -14,4 +14,7 @@ void soup_body_input_stream_http2_add_data (SoupBodyInputStreamHttp2
void soup_body_input_stream_http2_complete (SoupBodyInputStreamHttp2 *stream);
+/* This is only used for tests */
+gboolean soup_body_input_stream_http2_is_blocked (SoupBodyInputStreamHttp2 *stream);
+
G_END_DECLS
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index fbb73f7f..6df9ff0a 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -66,10 +66,13 @@ typedef struct {
GOutputStream *ostream;
guint64 connection_id;
- GMainContext *async_context;
+ GError *error;
+ GSource *read_source;
+ GSource *write_source;
GHashTable *messages;
GHashTable *closed_messages;
+ GList *pending_io_messages;
nghttp2_session *session;
@@ -78,12 +81,9 @@ typedef struct {
gssize write_buffer_size;
gssize written_bytes;
- GSource *reset_stream_source;
- GSource *idle_read_source;
gboolean is_shutdown;
-
GTask *close_task;
- GSource *close_source;
+ gboolean session_terminated;
gboolean goaway_sent;
} SoupClientMessageIOHTTP2;
@@ -95,14 +95,11 @@ typedef struct {
GInputStream *decoded_data_istream;
GInputStream *body_istream;
GTask *task;
- gboolean in_run_until_read_async;
+ gboolean in_io_try_sniff_content;
/* Request body logger */
SoupLogger *logger;
- /* Both data sources */
- GCancellable *data_source_cancellable;
-
/* Pollable data sources */
GSource *data_source_poll;
@@ -111,7 +108,6 @@ typedef struct {
GError *data_source_error;
gboolean data_source_eof;
- GSource *io_source;
SoupClientMessageIOHTTP2 *io; /* Unowned */
SoupMessageIOCompletionFn completion_cb;
gpointer completion_data;
@@ -122,12 +118,7 @@ typedef struct {
gboolean can_be_restarted;
} SoupHTTP2MessageData;
-static void io_run_until_read_async (SoupHTTP2MessageData *data);
-static gboolean io_read (SoupClientMessageIOHTTP2 *, gboolean, GCancellable *, GError **);
-static void io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data);
-static void io_idle_read (SoupClientMessageIOHTTP2 *io);
-static void io_close (SoupClientMessageIOHTTP2 *io);
-static void io_poll (SoupHTTP2MessageData *data);
+static void soup_client_message_io_http2_finished (SoupClientMessageIO *iface, SoupMessage *msg);
static void
NGCHECK (int return_code)
@@ -255,6 +246,18 @@ set_error_for_data (SoupHTTP2MessageData *data,
g_error_free (error);
}
+static void
+set_io_error (SoupClientMessageIOHTTP2 *io,
+ GError *error)
+{
+ h2_debug (io, NULL, "[SESSION] IO error: %s", error->message);
+
+ if (!io->error)
+ io->error = error;
+ else
+ g_error_free (error);
+}
+
static void
advance_state_from (SoupHTTP2MessageData *data,
SoupHTTP2IOState from,
@@ -279,18 +282,216 @@ advance_state_from (SoupHTTP2MessageData *data,
data->state = to;
}
+static void
+soup_http2_message_data_check_status (SoupHTTP2MessageData *data)
+{
+ SoupClientMessageIOHTTP2 *io = data->io;
+ SoupMessage *msg = data->msg;
+ GTask *task = data->task;
+ GError *error = NULL;
+
+ if (g_cancellable_set_error_if_cancelled (g_task_get_cancellable (task), &error)) {
+ io->pending_io_messages = g_list_remove (io->pending_io_messages, data);
+ data->task = NULL;
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ if (data->paused)
+ return;
+
+ if (io->error && !data->error)
+ data->error = g_error_copy (io->error);
+
+ if (data->error) {
+ GError *error = g_steal_pointer (&data->error);
+
+ if (data->can_be_restarted)
+ data->item->state = SOUP_MESSAGE_RESTARTING;
+ else
+ soup_message_set_metrics_timestamp (data->msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
+ io->pending_io_messages = g_list_remove (io->pending_io_messages, data);
+ data->task = NULL;
+ soup_client_message_io_http2_finished ((SoupClientMessageIO *)io, msg);
+
+ g_task_return_error (task, error);
+ g_object_unref (task);
+ return;
+ }
+
+ if (data->state == STATE_READ_DATA_START && !soup_message_has_content_sniffer (msg))
+ advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
+
+ if (data->state < STATE_READ_DATA)
+ return;
+
+ io->pending_io_messages = g_list_remove (io->pending_io_messages, data);
+ data->task = NULL;
+ g_task_return_boolean (task, TRUE);
+ g_object_unref (task);
+}
+
+static gboolean
+io_write (SoupClientMessageIOHTTP2 *io,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
+{
+ /* We must write all of nghttp2's buffer before we ask for more */
+ if (io->written_bytes == io->write_buffer_size)
+ io->write_buffer = NULL;
+
+ if (io->write_buffer == NULL) {
+ io->written_bytes = 0;
+ io->write_buffer_size = nghttp2_session_mem_send (io->session, (const
guint8**)&io->write_buffer);
+ NGCHECK (io->write_buffer_size);
+ if (io->write_buffer_size == 0) {
+ /* Done */
+ io->write_buffer = NULL;
+ return TRUE;
+ }
+ }
+
+ gssize ret = g_pollable_stream_write (io->ostream,
+ io->write_buffer + io->written_bytes,
+ io->write_buffer_size - io->written_bytes,
+ blocking, cancellable, error);
+ if (ret < 0)
+ return FALSE;
+
+ io->written_bytes += ret;
+ return TRUE;
+}
+
+static gboolean
+io_write_ready (GObject *stream,
+ SoupClientMessageIOHTTP2 *io)
+{
+ GError *error = NULL;
+
+ while (nghttp2_session_want_write (io->session) && !error)
+ io_write (io, FALSE, NULL, &error);
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_error_free (error);
+ return G_SOURCE_CONTINUE;
+ }
+
+ if (error)
+ set_io_error (io, error);
+
+ g_clear_pointer (&io->write_source, g_source_unref);
+ return G_SOURCE_REMOVE;
+}
+
+static void
+io_try_write (SoupClientMessageIOHTTP2 *io)
+{
+ GError *error = NULL;
+
+ if (io->write_source)
+ return;
+
+ while (nghttp2_session_want_write (io->session) && !error)
+ io_write (io, FALSE, NULL, &error);
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_clear_error (&error);
+ io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
(io->ostream), NULL);
+ g_source_set_name (io->write_source, "Soup HTTP/2 write source");
+ g_source_set_callback (io->write_source, (GSourceFunc)io_write_ready, io, NULL);
+ g_source_attach (io->write_source, g_main_context_get_thread_default ());
+ }
+
+ if (error)
+ set_io_error (io, error);
+}
+
+static gboolean
+io_read (SoupClientMessageIOHTTP2 *io,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
+{
+ guint8 buffer[8192];
+ gssize read;
+ int ret;
+
+ if ((read = g_pollable_stream_read (io->istream, buffer, sizeof (buffer),
+ blocking, cancellable, error)) < 0)
+ return FALSE;
+
+ ret = nghttp2_session_mem_recv (io->session, buffer, read);
+ NGCHECK (ret);
+ return ret != 0;
+}
+
+static gboolean
+io_read_ready (GObject *stream,
+ SoupClientMessageIOHTTP2 *io)
+{
+ GError *error = NULL;
+ gboolean progress = TRUE;
+
+ while (nghttp2_session_want_read (io->session) && progress) {
+ progress = io_read (io, FALSE, NULL, &error);
+ if (progress) {
+ g_list_foreach (io->pending_io_messages,
+ (GFunc)soup_http2_message_data_check_status,
+ NULL);
+ }
+ }
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_error_free (error);
+ return G_SOURCE_CONTINUE;
+ }
+
+ if (error)
+ set_io_error (io, error);
+
+ g_clear_pointer (&io->read_source, g_source_unref);
+ return G_SOURCE_REMOVE;
+}
+
+static void
+io_try_sniff_content (SoupHTTP2MessageData *data,
+ gboolean blocking,
+ GCancellable *cancellable)
+{
+ GError *error = NULL;
+
+ /* This can re-enter in sync mode */
+ if (data->in_io_try_sniff_content)
+ return;
+
+ data->in_io_try_sniff_content = TRUE;
+
+ if (soup_message_try_sniff_content (data->msg, data->decoded_data_istream, blocking, cancellable,
&error)) {
+ h2_debug (data->io, data, "[DATA] Sniffed content");
+ advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
+ } else {
+ h2_debug (data->io, data, "[DATA] Sniffer stream was not ready %s", error->message);
+
+ g_clear_error (&error);
+ }
+
+ data->in_io_try_sniff_content = FALSE;
+}
+
static void
soup_client_message_io_http2_terminate_session (SoupClientMessageIOHTTP2 *io)
{
- if (io->goaway_sent)
+ if (io->session_terminated)
return;
if (g_hash_table_size (io->messages) != 0)
return;
- io->goaway_sent = TRUE;
+ io->session_terminated = TRUE;
NGCHECK (nghttp2_session_terminate_session (io->session, NGHTTP2_NO_ERROR));
- io_close (io);
+ io_try_write (io);
}
/* HTTP2 read callbacks */
@@ -329,15 +530,13 @@ on_header_callback (nghttp2_session *session,
static GError *
memory_stream_need_more_data_callback (SoupBodyInputStreamHttp2 *stream,
GCancellable *cancellable,
- gboolean blocking,
gpointer user_data)
{
SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
GError *error = NULL;
- if (!nghttp2_session_want_read (data->io->session))
- return blocking ? NULL : g_error_new_literal (G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
_("Operation would block"));
- io_read (data->io, blocking, cancellable, &error);
+ if (nghttp2_session_want_read (data->io->session))
+ io_read (data->io, TRUE, cancellable, &error);
return error;
}
@@ -364,7 +563,7 @@ on_begin_frame_callback (nghttp2_session *session,
case NGHTTP2_DATA:
if (data->state < STATE_READ_DATA_START) {
g_assert (!data->body_istream);
- data->body_istream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM
(data->io->istream));
+ data->body_istream = soup_body_input_stream_http2_new ();
g_signal_connect (data->body_istream, "need-more-data",
G_CALLBACK (memory_stream_need_more_data_callback), data);
@@ -455,8 +654,11 @@ on_frame_recv_callback (nghttp2_session *session,
case NGHTTP2_DATA:
if (data->metrics)
data->metrics->response_body_bytes_received += frame->data.hd.length +
FRAME_HEADER_SIZE;
- if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && data->body_istream)
+ if (frame->hd.flags & NGHTTP2_FLAG_END_STREAM && data->body_istream) {
soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2
(data->body_istream));
+ if (data->state == STATE_READ_DATA_START)
+ io_try_sniff_content (data, FALSE, data->cancellable);
+ }
break;
case NGHTTP2_RST_STREAM:
if (frame->rst_stream.error_code != NGHTTP2_NO_ERROR) {
@@ -487,6 +689,8 @@ on_data_chunk_recv_callback (nghttp2_session *session,
g_assert (msgdata->body_istream != NULL);
soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (msgdata->body_istream), data,
len);
+ if (msgdata->state == STATE_READ_DATA_START)
+ io_try_sniff_content (msgdata, FALSE, msgdata->cancellable);
return 0;
}
@@ -567,6 +771,14 @@ on_frame_send_callback (nghttp2_session *session,
h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream,
(gpointer)frame);
break;
+ case NGHTTP2_GOAWAY:
+ h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
+ io->goaway_sent = TRUE;
+ if (io->close_task) {
+ g_task_return_boolean (io->close_task, TRUE);
+ g_clear_object (&io->close_task);
+ }
+ break;
default:
h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
break;
@@ -604,15 +816,6 @@ on_stream_close_callback (nghttp2_session *session,
if (error_code == NGHTTP2_REFUSED_STREAM && data->state < STATE_READ_DATA)
data->can_be_restarted = TRUE;
- if (data->state < STATE_READ_DATA && data->task && !data->in_run_until_read_async) {
- /* Start polling the decoded data stream instead of the network input stream. */
- if (data->io_source) {
- g_source_destroy (data->io_source);
- g_clear_pointer (&data->io_source, g_source_unref);
- }
- io_poll (data);
- }
-
return 0;
}
@@ -622,10 +825,10 @@ on_data_readable (GInputStream *stream,
{
SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
- g_cancellable_cancel (data->data_source_cancellable);
- g_clear_object (&data->data_source_cancellable);
+ h2_debug (data->io, data, "on data readable");
NGCHECK (nghttp2_session_resume_data (data->io->session, data->stream_id));
+ io_try_write (data->io);
g_clear_pointer (&data->data_source_poll, g_source_unref);
return G_SOURCE_REMOVE;
@@ -642,9 +845,6 @@ on_data_read (GInputStream *source,
h2_debug (data->io, data, "[SEND_BODY] Read %zd", read);
- g_cancellable_cancel (data->data_source_cancellable);
- g_clear_object (&data->data_source_cancellable);
-
/* This operation may have outlived the message data in which
case this will have been cancelled. */
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
@@ -663,6 +863,7 @@ on_data_read (GInputStream *source,
h2_debug (data->io, data, "[SEND_BODY] Resuming send");
NGCHECK (nghttp2_session_resume_data (data->io->session, data->stream_id));
+ io_try_write (data->io);
}
static void
@@ -714,8 +915,6 @@ on_data_source_read_callback (nghttp2_session *session,
g_source_attach (data->data_source_poll, g_main_context_get_thread_default
());
g_error_free (error);
- g_assert (!data->data_source_cancellable);
- data->data_source_cancellable = g_cancellable_new ();
return NGHTTP2_ERR_DEFERRED;
}
@@ -756,8 +955,6 @@ on_data_source_read_callback (nghttp2_session *session,
} else {
h2_debug (data->io, data, "[SEND_BODY] Reading async");
g_byte_array_set_size (data->data_source_buffer, length);
- g_assert (!data->data_source_cancellable);
- data->data_source_cancellable = g_cancellable_new ();
g_input_stream_read_async (in_stream, data->data_source_buffer->data, length,
get_data_io_priority (data),
data->cancellable,
@@ -810,11 +1007,6 @@ soup_http2_message_data_close (SoupHTTP2MessageData *data)
g_clear_pointer (&data->item, soup_message_queue_item_unref);
g_clear_object (&data->decoded_data_istream);
- if (data->io_source) {
- g_source_destroy (data->io_source);
- g_clear_pointer (&data->io_source, g_source_unref);
- }
-
if (data->data_source_poll) {
g_source_destroy (data->data_source_poll);
g_clear_pointer (&data->data_source_poll, g_source_unref);
@@ -823,8 +1015,6 @@ soup_http2_message_data_close (SoupHTTP2MessageData *data)
g_clear_error (&data->data_source_error);
g_clear_pointer (&data->data_source_buffer, g_byte_array_unref);
- g_clear_object (&data->data_source_cancellable);
-
g_clear_error (&data->error);
data->completion_cb = NULL;
@@ -955,6 +1145,7 @@ send_message_request (SoupMessage *msg,
data->stream_id = nghttp2_submit_request (io->session, &priority_spec, (const nghttp2_nv
*)headers->data, headers->len, body_stream ? &data_provider : NULL, data);
h2_debug (io, data, "[SESSION] Request made for %s%s", authority_header, path_and_query);
+ io_try_write (io);
g_array_free (headers, TRUE);
g_free (authority);
@@ -962,8 +1153,6 @@ send_message_request (SoupMessage *msg,
g_free (path_and_query);
}
-
-
static void
soup_client_message_io_http2_send_item (SoupClientMessageIO *iface,
SoupMessageQueueItem *item,
@@ -973,10 +1162,6 @@ soup_client_message_io_http2_send_item (SoupClientMessageIO *iface,
SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
SoupHTTP2MessageData *data = add_message_to_io_data (io, item, completion_cb, user_data);
- if (io->idle_read_source) {
- g_source_destroy (io->idle_read_source);
- g_clear_pointer (&io->idle_read_source, g_source_unref);
- }
send_message_request (item->msg, io, data);
}
@@ -1034,9 +1219,7 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
return;
}
- io_write_until_stream_reset_is_sent (data);
- if (g_hash_table_size (io->messages) == 0)
- io_idle_read (io);
+ io_try_write (io);
}
static void
@@ -1051,12 +1234,6 @@ soup_client_message_io_http2_pause (SoupClientMessageIO *iface,
if (data->paused)
g_warn_if_reached ();
- if (data->io_source) {
- g_source_destroy (data->io_source);
- g_source_unref (data->io_source);
- data->io_source = NULL;
- }
-
data->paused = TRUE;
}
@@ -1073,6 +1250,8 @@ soup_client_message_io_http2_unpause (SoupClientMessageIO *iface,
g_warn_if_reached ();
data->paused = FALSE;
+
+ soup_http2_message_data_check_status (data);
}
static void
@@ -1117,66 +1296,6 @@ soup_client_message_io_http2_is_reusable (SoupClientMessageIO *iface)
return soup_client_message_io_http2_is_open (iface);
}
-static gboolean
-message_source_check (GSource *source)
-{
- SoupMessageIOSource *message_source = (SoupMessageIOSource *)source;
-
- if (message_source->paused) {
- if (soup_message_is_io_paused (SOUP_MESSAGE (message_source->msg)))
- return FALSE;
- return TRUE;
- }
-
- return FALSE;
-}
-
-static gboolean
-io_poll_ready (SoupMessage *msg,
- gpointer user_data)
-{
- SoupHTTP2MessageData *data = user_data;
-
- io_run_until_read_async (data);
-
- return G_SOURCE_REMOVE;
-}
-
-static void
-io_poll (SoupHTTP2MessageData *data)
-{
- GSource *base_source;
- GCancellable *cancellable;
-
- g_assert (data->task);
- g_assert (!data->io_source);
-
- cancellable = g_task_get_cancellable (data->task);
-
- /* TODO: Handle mixing writes in? */
- if (data->paused)
- base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
- else if (data->state < STATE_WRITE_DONE && data->data_source_cancellable)
- base_source = g_cancellable_source_new (data->data_source_cancellable);
- else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
- base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
(data->io->ostream), cancellable);
- else if (data->state < STATE_READ_DONE && data->decoded_data_istream)
- base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
(data->decoded_data_istream), cancellable);
- else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
- base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
(data->io->istream), cancellable);
- else {
- g_warn_if_reached ();
- base_source = g_timeout_source_new (0);
- }
-
- data->io_source = soup_message_io_source_new (base_source, G_OBJECT (data->msg),
- data->paused, message_source_check);
- g_source_set_callback (data->io_source, (GSourceFunc)io_poll_ready, data, NULL);
- g_source_set_priority (data->io_source, g_task_get_priority (data->task));
- g_source_attach (data->io_source, data->io->async_context);
-}
-
-
static void
client_stream_eof (SoupClientInputStream *stream,
gpointer user_data)
@@ -1219,94 +1338,17 @@ soup_client_message_io_http2_get_response_istream (SoupClientMessageIO *iface,
return client_stream;
}
-static gboolean
-io_read (SoupClientMessageIOHTTP2 *io,
- gboolean blocking,
- GCancellable *cancellable,
- GError **error)
-{
- guint8 buffer[8192];
- gssize read;
- int ret;
-
- if ((read = g_pollable_stream_read (io->istream, buffer, sizeof (buffer),
- blocking, cancellable, error)) < 0)
- return FALSE;
-
- ret = nghttp2_session_mem_recv (io->session, buffer, read);
- NGCHECK (ret);
- return ret != 0;
-}
-
-static gboolean
-io_write (SoupClientMessageIOHTTP2 *io,
- gboolean blocking,
- GCancellable *cancellable,
- GError **error)
-{
- /* We must write all of nghttp2's buffer before we ask for more */
-
- if (io->written_bytes == io->write_buffer_size)
- io->write_buffer = NULL;
-
- if (io->write_buffer == NULL) {
- io->written_bytes = 0;
- io->write_buffer_size = nghttp2_session_mem_send (io->session, (const
guint8**)&io->write_buffer);
- NGCHECK (io->write_buffer_size);
- if (io->write_buffer_size == 0) {
- /* Done */
- io->write_buffer = NULL;
- return TRUE;
- }
- }
-
- gssize ret = g_pollable_stream_write (io->ostream,
- io->write_buffer + io->written_bytes,
- io->write_buffer_size - io->written_bytes,
- blocking, cancellable, error);
- if (ret < 0)
- return FALSE;
-
- io->written_bytes += ret;
- return TRUE;
-}
-
-static void
-io_try_sniff_content (SoupHTTP2MessageData *data,
- gboolean blocking,
- GCancellable *cancellable)
-{
- GError *error = NULL;
-
- if (soup_message_try_sniff_content (data->msg, data->decoded_data_istream, blocking, cancellable,
&error)) {
- h2_debug (data->io, data, "[DATA] Sniffed content");
- advance_state_from (data, STATE_READ_DATA_START, STATE_READ_DATA);
- } else {
- h2_debug (data->io, data, "[DATA] Sniffer stream was not ready %s", error->message);
-
- g_clear_error (&error);
- }
-}
-
static gboolean
io_run (SoupHTTP2MessageData *data,
- gboolean blocking,
GCancellable *cancellable,
GError **error)
{
gboolean progress = FALSE;
- if (data->state == STATE_READ_DATA_START)
- io_try_sniff_content (data, blocking, cancellable);
-
if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
- progress = io_write (data->io, blocking, cancellable, error);
- else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session)) {
- progress = io_read (data->io, blocking, cancellable, error);
-
- if (progress && data->state == STATE_READ_DATA_START)
- io_try_sniff_content (data, blocking, cancellable);
- }
+ progress = io_write (data->io, TRUE, cancellable, error);
+ else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
+ progress = io_read (data->io, TRUE, cancellable, error);
return progress;
}
@@ -1314,7 +1356,6 @@ io_run (SoupHTTP2MessageData *data,
static gboolean
io_run_until (SoupClientMessageIOHTTP2 *io,
SoupMessage *msg,
- gboolean blocking,
SoupHTTP2IOState state,
GCancellable *cancellable,
GError **error)
@@ -1334,8 +1375,8 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
g_object_ref (msg);
- while (progress && get_io_data (msg) == io && !data->paused && !data->data_source_cancellable &&
data->state < state)
- progress = io_run (data, blocking, cancellable, &my_error);
+ while (progress && get_io_data (msg) == io && !data->paused && data->state < state)
+ progress = io_run (data, cancellable, &my_error);
if (my_error) {
g_propagate_error (error, my_error);
@@ -1359,89 +1400,10 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
done = data->state >= state;
- if (data->paused || (!blocking && !done)) {
- g_set_error_literal (error, G_IO_ERROR,
- G_IO_ERROR_WOULD_BLOCK,
- _("Operation would block"));
- g_object_unref (msg);
- return FALSE;
- }
-
g_object_unref (msg);
return done;
}
-static gboolean
-io_write_until_stream_reset_is_sent_ready (GObject *stream,
- SoupHTTP2MessageData *data)
-{
- io_write_until_stream_reset_is_sent (data);
-
- return G_SOURCE_REMOVE;
-}
-
-static void
-io_write_until_stream_reset_is_sent (SoupHTTP2MessageData *data)
-{
- SoupClientMessageIOHTTP2 *io = data->io;
- GError *error = NULL;
-
- if (io->reset_stream_source) {
- g_source_destroy (io->reset_stream_source);
- g_clear_pointer (&io->reset_stream_source, g_source_unref);
- }
-
- while (g_hash_table_lookup (io->closed_messages, data)) {
- if (!nghttp2_session_want_write (io->session)) {
- error = g_error_new_literal (G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK, _("Operation would
block"));
- break;
- }
- if (!io_write (io, FALSE, FALSE, &error))
- break;
- }
-
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- io->reset_stream_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
(io->ostream), NULL);
- g_source_set_callback (io->reset_stream_source,
(GSourceFunc)io_write_until_stream_reset_is_sent_ready, data, NULL);
- g_source_attach (io->reset_stream_source, g_main_context_get_thread_default ());
- }
-
- g_clear_error (&error);
-}
-
-static gboolean
-io_idle_read_ready (GObject *stream,
- SoupClientMessageIOHTTP2 *io)
-{
- io_idle_read (io);
-
- return G_SOURCE_REMOVE;
-}
-
-static void
-io_idle_read (SoupClientMessageIOHTTP2 *io)
-{
- GError *error = NULL;
-
- if (io->idle_read_source) {
- g_source_destroy (io->idle_read_source);
- g_clear_pointer (&io->idle_read_source, g_source_unref);
- }
-
- while (nghttp2_session_want_read (io->session)) {
- if (!io_read (io, FALSE, FALSE, &error))
- break;
- }
-
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- io->idle_read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM
(io->istream), NULL);
- g_source_set_callback (io->idle_read_source, (GSourceFunc)io_idle_read_ready, io, NULL);
- g_source_attach (io->idle_read_source, g_main_context_get_thread_default ());
- }
-
- g_clear_error (&error);
-}
-
static gboolean
soup_client_message_io_http2_run_until_read (SoupClientMessageIO *iface,
SoupMessage *msg,
@@ -1451,7 +1413,7 @@ soup_client_message_io_http2_run_until_read (SoupClientMessageIO *iface,
SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
SoupHTTP2MessageData *data = get_data_for_message (io, msg);
- if (io_run_until (io, msg, TRUE, STATE_READ_DATA, cancellable, error))
+ if (io_run_until (io, msg, STATE_READ_DATA, cancellable, error))
return TRUE;
if (get_io_data (msg) == io) {
@@ -1485,6 +1447,7 @@ soup_client_message_io_http2_skip (SoupClientMessageIO *iface,
h2_debug (io, data, "Skip");
NGCHECK (nghttp2_submit_rst_stream (io->session, NGHTTP2_FLAG_NONE, data->stream_id, NGHTTP2_STREAM_CLOSED));
+ io_try_write (io);
return TRUE;
}
@@ -1496,55 +1459,6 @@ soup_client_message_io_http2_run (SoupClientMessageIO *iface,
g_assert_not_reached ();
}
-static void
-io_run_until_read_async (SoupHTTP2MessageData *data)
-{
- SoupClientMessageIOHTTP2 *io = data->io;
- GTask *task = data->task;
- GError *error = NULL;
-
- if (data->io_source) {
- g_source_destroy (data->io_source);
- g_clear_pointer (&data->io_source, g_source_unref);
- }
-
- data->in_run_until_read_async = TRUE;
-
- if (io_run_until (io, data->msg, FALSE,
- STATE_READ_DATA,
- g_task_get_cancellable (task),
- &error)) {
- data->task = NULL;
- data->in_run_until_read_async = FALSE;
-
- g_task_return_boolean (task, TRUE);
- g_object_unref (task);
- return;
- }
-
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_error_free (error);
- io_poll (data);
- data->in_run_until_read_async = FALSE;
- return;
- }
-
- if (get_io_data (data->msg) == io) {
- if (data->can_be_restarted)
- data->item->state = SOUP_MESSAGE_RESTARTING;
- else
- soup_message_set_metrics_timestamp (data->msg, SOUP_MESSAGE_METRICS_RESPONSE_END);
-
- soup_client_message_io_http2_finished ((SoupClientMessageIO *)data->io, data->msg);
- }
-
- data->task = NULL;
- data->in_run_until_read_async = FALSE;
-
- g_task_return_error (task, error);
- g_object_unref (task);
-}
-
static void
soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
SoupMessage *msg,
@@ -1558,51 +1472,7 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
data->task = g_task_new (msg, cancellable, callback, user_data);
g_task_set_priority (data->task, io_priority);
- io_run_until_read_async (data);
-}
-
-static gboolean
-io_close_ready (GObject *stream,
- SoupClientMessageIOHTTP2 *io)
-{
- io_close (io);
-
- return G_SOURCE_REMOVE;
-}
-
-static void
-io_close (SoupClientMessageIOHTTP2 *io)
-{
- GError *error = NULL;
-
- if (io->close_source) {
- g_source_destroy (io->close_source);
- g_clear_pointer (&io->close_source, g_source_unref);
- }
-
- while (nghttp2_session_want_write (io->session)) {
- if (!io_write (io, FALSE, FALSE, &error))
- break;
- }
-
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_error_free (error);
- io->close_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (io->ostream), NULL);
- g_source_set_callback (io->close_source, (GSourceFunc)io_close_ready, io, NULL);
- g_source_attach (io->close_source, g_main_context_get_thread_default ());
- return;
- }
-
- if (io->close_task) {
- if (error)
- g_task_return_error (io->close_task, error);
- else
- g_task_return_boolean (io->close_task, TRUE);
- g_clear_object (&io->close_task);
- return;
- }
-
- g_clear_error (&error);
+ io->pending_io_messages = g_list_prepend (io->pending_io_messages, data);
}
static gboolean
@@ -1612,7 +1482,7 @@ soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
{
SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
- if (io->goaway_sent && !io->close_source)
+ if (io->goaway_sent)
return FALSE;
g_assert (!io->close_task);
@@ -1626,24 +1496,20 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface)
{
SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
- if (io->reset_stream_source) {
- g_source_destroy (io->reset_stream_source);
- g_clear_pointer (&io->reset_stream_source, g_source_unref);
- }
- if (io->idle_read_source) {
- g_source_destroy (io->idle_read_source);
- g_clear_pointer (&io->idle_read_source, g_source_unref);
+ if (io->read_source) {
+ g_source_destroy (io->read_source);
+ g_source_unref (io->read_source);
}
- if (io->close_source) {
- g_source_destroy (io->close_source);
- g_source_unref (io->close_source);
+ if (io->write_source) {
+ g_source_destroy (io->write_source);
+ g_source_unref (io->write_source);
}
g_clear_object (&io->stream);
g_clear_object (&io->close_task);
- g_clear_pointer (&io->async_context, g_main_context_unref);
g_clear_pointer (&io->session, nghttp2_session_del);
g_clear_pointer (&io->messages, g_hash_table_unref);
g_clear_pointer (&io->closed_messages, g_hash_table_unref);
+ g_clear_pointer (&io->pending_io_messages, g_list_free);
g_free (io);
}
@@ -1729,7 +1595,10 @@ soup_client_message_io_http2_new (GIOStream *stream, guint64 connection_id)
io->ostream = g_io_stream_get_output_stream (io->stream);
io->connection_id = connection_id;
- io->async_context = g_main_context_ref_thread_default ();
+ io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), NULL);
+ g_source_set_name (io->read_source, "Soup HTTP/2 read source");
+ g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
+ g_source_attach (io->read_source, g_main_context_get_thread_default ());
NGCHECK (nghttp2_session_set_local_window_size (io->session, NGHTTP2_FLAG_NONE, 0, INITIAL_WINDOW_SIZE));
@@ -1739,6 +1608,7 @@ soup_client_message_io_http2_new (GIOStream *stream, guint64 connection_id)
{ NGHTTP2_SETTINGS_ENABLE_PUSH, 0 },
};
NGCHECK (nghttp2_submit_settings (io->session, NGHTTP2_FLAG_NONE, settings, G_N_ELEMENTS (settings)));
+ io_try_write (io);
return (SoupClientMessageIO *)io;
}
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index 8513c48f..12fbfb42 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -103,6 +103,7 @@ SoupClientMessageIO *soup_message_get_io_data (SoupMessage *msg);
void soup_message_set_content_sniffer (SoupMessage *msg,
SoupContentSniffer *sniffer);
+gboolean soup_message_has_content_sniffer (SoupMessage *msg);
gboolean soup_message_try_sniff_content (SoupMessage *msg,
GInputStream *stream,
gboolean blocking,
diff --git a/libsoup/soup-message.c b/libsoup/soup-message.c
index bd5d34d5..79087e74 100644
--- a/libsoup/soup-message.c
+++ b/libsoup/soup-message.c
@@ -2323,6 +2323,14 @@ soup_message_set_content_sniffer (SoupMessage *msg, SoupContentSniffer *sniffer)
priv->sniffer = sniffer ? g_object_ref (sniffer) : NULL;
}
+gboolean
+soup_message_has_content_sniffer (SoupMessage *msg)
+{
+ SoupMessagePrivate *priv = soup_message_get_instance_private (msg);
+
+ return priv->sniffer != NULL;
+}
+
gboolean
soup_message_try_sniff_content (SoupMessage *msg,
GInputStream *stream,
diff --git a/tests/http2-body-stream-test.c b/tests/http2-body-stream-test.c
index e5aa3007..ee3218bc 100644
--- a/tests/http2-body-stream-test.c
+++ b/tests/http2-body-stream-test.c
@@ -26,8 +26,7 @@ do_large_data_test (void)
#define CHUNK_SIZE (gsize)1024 * 1024 * 512 // 512 MiB
#define TEST_SIZE CHUNK_SIZE * 20 // 10 GiB
- GInputStream *parent_stream = g_memory_input_stream_new ();
- GInputStream *stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
+ GInputStream *stream = soup_body_input_stream_http2_new ();
SoupBodyInputStreamHttp2 *mem_stream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
gsize data_needed = TEST_SIZE;
guint8 *memory_chunk = g_new (guint8, CHUNK_SIZE);
@@ -58,15 +57,13 @@ do_large_data_test (void)
g_free (trash_buffer);
g_free (memory_chunk);
- g_object_unref (parent_stream);
g_object_unref (stream);
}
static void
do_multiple_chunk_test (void)
{
- GInputStream *parent_stream = g_memory_input_stream_new ();
- GInputStream *stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
+ GInputStream *stream = soup_body_input_stream_http2_new ();
SoupBodyInputStreamHttp2 *mem_stream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
const char * const chunks[] = {
"1234", "5678", "9012", "hell", "owor", "ld..",
@@ -85,7 +82,6 @@ do_multiple_chunk_test (void)
g_assert_cmpstr (buffer, ==, chunks[i]);
}
- g_object_unref (parent_stream);
g_object_unref (stream);
}
@@ -104,8 +100,7 @@ on_skip_ready (GInputStream *stream, GAsyncResult *res, GMainLoop *loop)
static void
do_skip_async_test (void)
{
- GInputStream *parent_stream = g_memory_input_stream_new ();
- GInputStream *stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
+ GInputStream *stream = soup_body_input_stream_http2_new ();
SoupBodyInputStreamHttp2 *bistream = SOUP_BODY_INPUT_STREAM_HTTP2 (stream);
GMainLoop *loop = g_main_loop_new (NULL, FALSE);
@@ -114,7 +109,6 @@ do_skip_async_test (void)
g_input_stream_skip_async (stream, 2, G_PRIORITY_DEFAULT, NULL, (GAsyncReadyCallback)on_skip_ready, loop);
g_main_loop_run (loop);
- g_object_unref (parent_stream);
g_object_unref (stream);
g_main_loop_unref (loop);
}
diff --git a/tests/http2-test.c b/tests/http2-test.c
index 161782e5..de3b25b2 100644
--- a/tests/http2-test.c
+++ b/tests/http2-test.c
@@ -297,8 +297,7 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
{
GMainContext *async_context = g_main_context_ref_thread_default ();
- GInputStream *parent_stream = g_memory_input_stream_new ();
- GInputStream *in_stream = soup_body_input_stream_http2_new (G_POLLABLE_INPUT_STREAM (parent_stream));
+ GInputStream *in_stream = soup_body_input_stream_http2_new ();
soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), (guint8*)"Part 1 -", 8);
test->msg = soup_message_new (SOUP_METHOD_POST, "https://127.0.0.1:5000/echo_post");
@@ -307,10 +306,9 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
GBytes *response = NULL;
soup_session_send_async (test->session, test->msg, G_PRIORITY_DEFAULT, NULL, on_send_complete, &response);
- int iteration_count = 20;
while (!response) {
// Let it iterate for a bit waiting on blocked data
- if (iteration_count-- == 0) {
+ if (soup_body_input_stream_http2_is_blocked (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream))) {
soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), (guint8*)" Part 2", 8);
soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream));
}
@@ -323,7 +321,6 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
g_main_context_iteration (async_context, FALSE);
g_bytes_unref (response);
- g_object_unref (parent_stream);
g_object_unref (in_stream);
g_main_context_unref (async_context);
g_object_unref (test->msg);
@@ -869,8 +866,7 @@ do_sniffer_sync_test (Test *test, gconstpointer data)
soup_session_add_feature_by_type (test->session, SOUP_TYPE_CONTENT_SNIFFER);
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/", 11, TRUE, NULL);
- /* FIXME: large seems to be broken in sync mode */
- /* do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL); */
+ do_one_sniffer_test (test->session, "https://127.0.0.1:5000/large", (1024 * 24) + 1, TRUE, NULL);
do_one_sniffer_test (test->session, "https://127.0.0.1:5000/no-content", 0, FALSE, NULL);
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]