[libsoup/carlosgc/thread-safe: 20/32] http2: make message IO thread safe
- From: Carlos Garcia Campos <carlosgc src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/carlosgc/thread-safe: 20/32] http2: make message IO thread safe
- Date: Wed, 8 Jun 2022 10:38:14 +0000 (UTC)
commit e3333be150200eba59220ad1ff82c3e3507bf78c
Author: Carlos Garcia Campos <cgarcia igalia com>
Date: Mon Apr 18 15:53:01 2022 +0200
http2: make message IO thread safe
An nghttp2 session can't be used by multiple threads at the same time, so
we need to ensure that only messages from the same thread share the
connection. Connections in idle state can be reused from other threads,
though we need to ensure all the pending IO is completed before
switching to another thread. When the connection switches to IN_USE
state, the current thread becomes the owner of the connection IO. In the
case of HTTP/2 there might be session IO not related to a particular
message; in that case a thread with no default context is considered
synchronous and all IO that is not explicitly sync or async will be
sync.
libsoup/http2/soup-client-message-io-http2.c | 90 +++++++++++++++++++++-------
libsoup/soup-client-message-io.c | 7 +++
libsoup/soup-client-message-io.h | 2 +
libsoup/soup-connection-manager.c | 5 +-
libsoup/soup-connection.c | 16 ++++-
libsoup/soup-connection.h | 1 +
6 files changed, 96 insertions(+), 25 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 16e46b02..fe6aeaab 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -62,6 +62,8 @@ typedef enum {
typedef struct {
SoupClientMessageIO iface;
+ GThread *owner;
+ gboolean async;
SoupConnection *conn;
GIOStream *stream;
GInputStream *istream;
@@ -429,7 +431,7 @@ io_try_write (SoupClientMessageIOHTTP2 *io,
io_write (io, blocking, NULL, &error);
}
- if (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ if (!blocking && (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))) {
g_clear_error (&error);
io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
(io->ostream), NULL);
g_source_set_name (io->write_source, "Soup HTTP/2 write source");
@@ -546,7 +548,7 @@ soup_client_message_io_http2_terminate_session (SoupClientMessageIOHTTP2 *io)
io->session_terminated = TRUE;
NGCHECK (nghttp2_session_terminate_session (io->session, NGHTTP2_NO_ERROR));
- io_try_write (io, FALSE);
+ io_try_write (io, !io->async);
}
/* HTTP2 read callbacks */
@@ -908,7 +910,11 @@ on_frame_send_callback (nghttp2_session *session,
break;
case NGHTTP2_RST_STREAM:
h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
- g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream,
(gpointer)frame);
+ if (g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream,
(gpointer)frame)) {
+ if (io->conn)
+ soup_connection_set_in_use (io->conn, FALSE);
+ }
+
break;
case NGHTTP2_GOAWAY:
h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
@@ -920,7 +926,7 @@ on_frame_send_callback (nghttp2_session *session,
source = g_idle_source_new ();
g_source_set_name (source, "Soup HTTP/2 close source");
g_source_set_callback (source, (GSourceFunc)close_in_idle_cb, io, NULL);
- g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_attach (source, g_task_get_context (io->close_task));
g_source_unref (source);
}
break;
@@ -1428,6 +1434,11 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
g_warn_if_reached ();
if (!g_hash_table_add (io->closed_messages, data))
g_warn_if_reached ();
+
+ if (io->conn)
+ soup_connection_set_in_use (io->conn, TRUE);
+
+ io_try_write (io, !io->async);
} else {
if (!g_hash_table_remove (io->messages, msg))
g_warn_if_reached ();
@@ -1438,12 +1449,8 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
g_object_unref (msg);
- if (io->is_shutdown) {
+ if (io->is_shutdown)
soup_client_message_io_http2_terminate_session (io);
- return;
- }
-
- io_try_write (io, FALSE);
}
static void
@@ -1578,12 +1585,13 @@ io_run (SoupHTTP2MessageData *data,
GCancellable *cancellable,
GError **error)
{
+ SoupClientMessageIOHTTP2 *io = data->io;
gboolean progress = FALSE;
- if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
- progress = io_write (data->io, TRUE, cancellable, error);
- else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
- progress = io_read (data->io, TRUE, cancellable, error);
+ if (data->state < STATE_WRITE_DONE && !io->in_callback && nghttp2_session_want_write (io->session))
+ progress = io_write (io, TRUE, cancellable, error);
+ else if (data->state < STATE_READ_DONE && !io->in_callback && nghttp2_session_want_read
(io->session))
+ progress = io_read (io, TRUE, cancellable, error);
return progress;
}
@@ -1712,6 +1720,31 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
soup_http2_message_data_check_status (data);
}
+static void
+soup_client_message_io_http2_set_owner (SoupClientMessageIOHTTP2 *io,
+ GThread *owner)
+{
+ if (owner == io->owner)
+ return;
+
+ io->owner = owner;
+ g_assert (!io->write_source);
+ if (io->read_source) {
+ g_source_destroy (io->read_source);
+ g_source_unref (io->read_source);
+ io->read_source = NULL;
+ }
+
+ io->async = g_main_context_is_owner (g_main_context_get_thread_default ());
+ if (!io->async)
+ return;
+
+ io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream),
NULL);
+ g_source_set_name (io->read_source, "Soup HTTP/2 read source");
+ g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
+ g_source_attach (io->read_source, g_main_context_get_thread_default ());
+}
+
static gboolean
soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
SoupConnection *conn,
@@ -1722,9 +1755,18 @@ soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
if (io->goaway_sent)
return FALSE;
- g_assert (!io->close_task);
- io->close_task = g_task_new (conn, NULL, callback, NULL);
+ soup_client_message_io_http2_set_owner (io, g_thread_self ());
+ if (io->async) {
+ g_assert (!io->close_task);
+ io->close_task = g_task_new (conn, NULL, callback, NULL);
+ }
+
soup_client_message_io_http2_terminate_session (io);
+ if (!io->async) {
+ g_assert (io->goaway_sent);
+ return FALSE;
+ }
+
return TRUE;
}
@@ -1755,6 +1797,14 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface)
g_free (io);
}
+static void
+soup_client_message_io_http2_owner_changed (SoupClientMessageIO *iface)
+{
+ SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
+
+ soup_client_message_io_http2_set_owner (io, g_thread_self ());
+}
+
static const SoupClientMessageIOFuncs io_funcs = {
soup_client_message_io_http2_destroy,
soup_client_message_io_http2_finished,
@@ -1772,7 +1822,8 @@ static const SoupClientMessageIOFuncs io_funcs = {
soup_client_message_io_http2_is_open,
soup_client_message_io_http2_in_progress,
soup_client_message_io_http2_is_reusable,
- soup_client_message_io_http2_get_cancellable
+ soup_client_message_io_http2_get_cancellable,
+ soup_client_message_io_http2_owner_changed
};
G_GNUC_PRINTF(1, 0)
@@ -1840,10 +1891,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
io->ostream = g_io_stream_get_output_stream (io->stream);
io->connection_id = soup_connection_get_id (conn);
- io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream),
NULL);
- g_source_set_name (io->read_source, "Soup HTTP/2 read source");
- g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
- g_source_attach (io->read_source, g_main_context_get_thread_default ());
+ soup_client_message_io_http2_set_owner (io, soup_connection_get_owner (conn));
NGCHECK (nghttp2_session_set_local_window_size (io->session, NGHTTP2_FLAG_NONE, 0,
INITIAL_WINDOW_SIZE));
@@ -1853,7 +1901,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
{ NGHTTP2_SETTINGS_ENABLE_PUSH, 0 },
};
NGCHECK (nghttp2_submit_settings (io->session, NGHTTP2_FLAG_NONE, settings, G_N_ELEMENTS
(settings)));
- io_try_write (io, FALSE);
+ io_try_write (io, !io->async);
return (SoupClientMessageIO *)io;
}
diff --git a/libsoup/soup-client-message-io.c b/libsoup/soup-client-message-io.c
index 5fb2c38a..a6a62a55 100644
--- a/libsoup/soup-client-message-io.c
+++ b/libsoup/soup-client-message-io.c
@@ -140,3 +140,10 @@ soup_client_message_io_get_cancellable (SoupClientMessageIO *io,
{
return io->funcs->get_cancellable (io, msg);
}
+
+void
+soup_client_message_io_owner_changed (SoupClientMessageIO *io)
+{
+ if (io->funcs->owner_changed)
+ io->funcs->owner_changed (io);
+}
diff --git a/libsoup/soup-client-message-io.h b/libsoup/soup-client-message-io.h
index ae41e6e8..25f67a2a 100644
--- a/libsoup/soup-client-message-io.h
+++ b/libsoup/soup-client-message-io.h
@@ -55,6 +55,7 @@ typedef struct {
gboolean (*is_reusable) (SoupClientMessageIO *io);
GCancellable *(*get_cancellable) (SoupClientMessageIO *io,
SoupMessage *msg);
+ void (*owner_changed) (SoupClientMessageIO *io);
} SoupClientMessageIOFuncs;
struct _SoupClientMessageIO {
@@ -105,3 +106,4 @@ gboolean soup_client_message_io_in_progress (SoupClientMessageIO
gboolean soup_client_message_io_is_reusable (SoupClientMessageIO *io);
GCancellable *soup_client_message_io_get_cancellable (SoupClientMessageIO *io,
SoupMessage *msg);
+void soup_client_message_io_owner_changed (SoupClientMessageIO *io);
diff --git a/libsoup/soup-connection-manager.c b/libsoup/soup-connection-manager.c
index c64d7e31..bdf4d291 100644
--- a/libsoup/soup-connection-manager.c
+++ b/libsoup/soup-connection-manager.c
@@ -341,7 +341,6 @@ connection_state_changed (SoupConnection *conn,
GParamSpec *param,
SoupConnectionManager *manager)
{
-
if (soup_connection_get_state (conn) != SOUP_CONNECTION_IDLE)
return;
@@ -387,7 +386,7 @@ soup_connection_manager_get_connection_locked (SoupConnectionManager *manager,
switch (soup_connection_get_state (conn)) {
case SOUP_CONNECTION_IN_USE:
- if (!need_new_connection && http_version == SOUP_HTTP_2_0 &&
soup_connection_is_reusable (conn))
+ if (!need_new_connection && http_version == SOUP_HTTP_2_0 &&
soup_connection_get_owner (conn) == g_thread_self () && soup_connection_is_reusable (conn))
return conn;
break;
case SOUP_CONNECTION_IDLE:
@@ -401,7 +400,7 @@ soup_connection_manager_get_connection_locked (SoupConnectionManager *manager,
/* Always wait if we have a pending connection as it may be
* an h2 connection which will be shared. http/1.x connections
* will only be slightly delayed. */
- if (force_http_version > SOUP_HTTP_1_1 && !need_new_connection &&
!item->connect_only && item->async)
+ if (force_http_version > SOUP_HTTP_1_1 && !need_new_connection &&
!item->connect_only && item->async && soup_connection_get_owner (conn) == g_thread_self ())
return NULL;
default:
break;
diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c
index 0c9788b9..64ee4afa 100644
--- a/libsoup/soup-connection.c
+++ b/libsoup/soup-connection.c
@@ -47,6 +47,7 @@ typedef struct {
GTlsCertificate *tls_client_cert;
GCancellable *cancellable;
+ GThread *owner;
} SoupConnectionPrivate;
G_DEFINE_FINAL_TYPE_WITH_PRIVATE (SoupConnection, soup_connection, G_TYPE_OBJECT)
@@ -96,6 +97,7 @@ soup_connection_init (SoupConnection *conn)
priv->http_version = SOUP_HTTP_1_1;
priv->force_http_version = G_MAXUINT8;
+ priv->owner = g_thread_self ();
}
static void
@@ -1136,8 +1138,12 @@ soup_connection_set_in_use (SoupConnection *conn,
if (in_use) {
g_atomic_int_inc (&priv->in_use);
- if (g_atomic_int_compare_and_exchange (&priv->state, SOUP_CONNECTION_IDLE,
SOUP_CONNECTION_IN_USE))
+ if (g_atomic_int_compare_and_exchange (&priv->state, SOUP_CONNECTION_IDLE,
SOUP_CONNECTION_IN_USE)) {
+ priv->owner = g_thread_self ();
+ soup_client_message_io_owner_changed (priv->io_data);
g_object_notify_by_pspec (G_OBJECT (conn), properties[PROP_STATE]);
+ }
+
return;
}
@@ -1350,3 +1356,11 @@ soup_connection_is_reusable (SoupConnection *conn)
return priv->io_data && soup_client_message_io_is_reusable (priv->io_data);
}
+
+GThread *
+soup_connection_get_owner (SoupConnection *conn)
+{
+ SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
+
+ return priv->owner;
+}
diff --git a/libsoup/soup-connection.h b/libsoup/soup-connection.h
index e8e66d18..18a21749 100644
--- a/libsoup/soup-connection.h
+++ b/libsoup/soup-connection.h
@@ -85,6 +85,7 @@ guint64 soup_connection_get_id (SoupConnection
GSocketAddress *soup_connection_get_remote_address (SoupConnection *conn);
SoupHTTPVersion soup_connection_get_negotiated_protocol (SoupConnection *conn);
gboolean soup_connection_is_reusable (SoupConnection *conn);
+GThread *soup_connection_get_owner (SoupConnection *conn);
G_END_DECLS
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]