[libsoup/carlosgc/thread-safe: 13/22] http2: make message IO thread safe
- From: Carlos Garcia Campos <carlosgc src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/carlosgc/thread-safe: 13/22] http2: make message IO thread safe
- Date: Sun, 24 Apr 2022 08:19:43 +0000 (UTC)
commit aa7c625ae1529badea5fe8fff3a1b8c5f7e2e844
Author: Carlos Garcia Campos <cgarcia igalia com>
Date: Mon Apr 18 15:53:01 2022 +0200
http2: make message IO thread safe
nghttp2 session can't be used by multiple threads at the same time, so
we need to ensure that only messages from the same thread share the
connection. Connections in idle state can be reused from other threads,
though but we need to ensure all the pending IO is completed before
switching to another thread. When the connection switches to IN_USE
state, the current thread becomes the owner of the connection IO. In the
case of HTTP/2 there might be session IO not related to a particular
message, in that case a thread with no default context is considered
synchronous and all IO that is not explicitly sync or async will be
sync.
libsoup/http2/soup-client-message-io-http2.c | 93 +++++++++++++++++++++-------
libsoup/soup-client-message-io.c | 7 +++
libsoup/soup-client-message-io.h | 2 +
libsoup/soup-connection-manager.c | 5 +-
libsoup/soup-connection.c | 16 ++++-
libsoup/soup-connection.h | 1 +
6 files changed, 96 insertions(+), 28 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 5216693b..7b5cf02d 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -62,6 +62,8 @@ typedef enum {
typedef struct {
SoupClientMessageIO iface;
+ GThread *owner;
+ gboolean async;
SoupConnection *conn;
GIOStream *stream;
GInputStream *istream;
@@ -429,7 +431,7 @@ io_try_write (SoupClientMessageIOHTTP2 *io,
io_write (io, blocking, NULL, &error);
}
- if (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ if (!blocking && (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))) {
g_clear_error (&error);
io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM
(io->ostream), NULL);
g_source_set_name (io->write_source, "Soup HTTP/2 write source");
@@ -546,7 +548,7 @@ soup_client_message_io_http2_terminate_session (SoupClientMessageIOHTTP2 *io)
io->session_terminated = TRUE;
NGCHECK (nghttp2_session_terminate_session (io->session, NGHTTP2_NO_ERROR));
- io_try_write (io, FALSE);
+ io_try_write (io, !io->async);
}
/* HTTP2 read callbacks */
@@ -908,7 +910,11 @@ on_frame_send_callback (nghttp2_session *session,
break;
case NGHTTP2_RST_STREAM:
h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
- g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream,
(gpointer)frame);
+ if (g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream,
(gpointer)frame)) {
+ if (io->conn)
+ soup_connection_set_in_use (io->conn, FALSE);
+ }
+
break;
case NGHTTP2_GOAWAY:
h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
@@ -916,12 +922,17 @@ on_frame_send_callback (nghttp2_session *session,
if (io->close_task) {
GSource *source;
- /* Close in idle to ensure all pending io is finished first */
- source = g_idle_source_new ();
- g_source_set_name (source, "Soup HTTP/2 close source");
- g_source_set_callback (source, (GSourceFunc)close_in_idle_cb, io, NULL);
- g_source_attach (source, g_main_context_get_thread_default ());
- g_source_unref (source);
+ if (io->async) {
+ /* Close in idle to ensure all pending io is finished first */
+ source = g_idle_source_new ();
+ g_source_set_name (source, "Soup HTTP/2 close source");
+ g_source_set_callback (source, (GSourceFunc)close_in_idle_cb, io, NULL);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+ } else {
+ g_task_return_boolean (io->close_task, TRUE);
+ g_clear_object (&io->close_task);
+ }
}
break;
default:
@@ -1403,6 +1414,11 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
g_warn_if_reached ();
if (!g_hash_table_add (io->closed_messages, data))
g_warn_if_reached ();
+
+ if (io->conn)
+ soup_connection_set_in_use (io->conn, TRUE);
+
+ io_try_write (io, !io->async);
} else {
if (!g_hash_table_remove (io->messages, msg))
g_warn_if_reached ();
@@ -1413,12 +1429,8 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
g_object_unref (msg);
- if (io->is_shutdown) {
+ if (io->is_shutdown)
soup_client_message_io_http2_terminate_session (io);
- return;
- }
-
- io_try_write (io, FALSE);
}
static void
@@ -1553,12 +1565,13 @@ io_run (SoupHTTP2MessageData *data,
GCancellable *cancellable,
GError **error)
{
+ SoupClientMessageIOHTTP2 *io = data->io;
gboolean progress = FALSE;
- if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
- progress = io_write (data->io, TRUE, cancellable, error);
- else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
- progress = io_read (data->io, TRUE, cancellable, error);
+ if (data->state < STATE_WRITE_DONE && !io->in_callback && nghttp2_session_want_write (io->session))
+ progress = io_write (io, TRUE, cancellable, error);
+ else if (data->state < STATE_READ_DONE && !io->in_callback && nghttp2_session_want_read
(io->session))
+ progress = io_read (io, TRUE, cancellable, error);
return progress;
}
@@ -1687,6 +1700,31 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
soup_http2_message_data_check_status (data);
}
+static void
+soup_client_message_io_http2_set_owner (SoupClientMessageIOHTTP2 *io,
+ GThread *owner)
+{
+ if (owner == io->owner)
+ return;
+
+ io->owner = owner;
+ g_assert (!io->write_source);
+ if (io->read_source) {
+ g_source_destroy (io->read_source);
+ g_source_unref (io->read_source);
+ io->read_source = NULL;
+ }
+
+ io->async = g_main_context_is_owner (g_main_context_get_thread_default ());
+ if (!io->async)
+ return;
+
+ io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream),
NULL);
+ g_source_set_name (io->read_source, "Soup HTTP/2 read source");
+ g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
+ g_source_attach (io->read_source, g_main_context_get_thread_default ());
+}
+
static gboolean
soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
SoupConnection *conn,
@@ -1697,6 +1735,7 @@ soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
if (io->goaway_sent)
return FALSE;
+ soup_client_message_io_http2_set_owner (io, g_thread_self ());
g_assert (!io->close_task);
io->close_task = g_task_new (conn, NULL, callback, NULL);
soup_client_message_io_http2_terminate_session (io);
@@ -1730,6 +1769,14 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface)
g_free (io);
}
+static void
+soup_client_message_io_http2_owner_changed (SoupClientMessageIO *iface)
+{
+ SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
+
+ soup_client_message_io_http2_set_owner (io, g_thread_self ());
+}
+
static const SoupClientMessageIOFuncs io_funcs = {
soup_client_message_io_http2_destroy,
soup_client_message_io_http2_finished,
@@ -1747,7 +1794,8 @@ static const SoupClientMessageIOFuncs io_funcs = {
soup_client_message_io_http2_is_open,
soup_client_message_io_http2_in_progress,
soup_client_message_io_http2_is_reusable,
- soup_client_message_io_http2_get_cancellable
+ soup_client_message_io_http2_get_cancellable,
+ soup_client_message_io_http2_owner_changed
};
G_GNUC_PRINTF(1, 0)
@@ -1815,10 +1863,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
io->ostream = g_io_stream_get_output_stream (io->stream);
io->connection_id = soup_connection_get_id (conn);
- io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream),
NULL);
- g_source_set_name (io->read_source, "Soup HTTP/2 read source");
- g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
- g_source_attach (io->read_source, g_main_context_get_thread_default ());
+ soup_client_message_io_http2_set_owner (io, soup_connection_get_owner (conn));
NGCHECK (nghttp2_session_set_local_window_size (io->session, NGHTTP2_FLAG_NONE, 0,
INITIAL_WINDOW_SIZE));
@@ -1828,7 +1873,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
{ NGHTTP2_SETTINGS_ENABLE_PUSH, 0 },
};
NGCHECK (nghttp2_submit_settings (io->session, NGHTTP2_FLAG_NONE, settings, G_N_ELEMENTS
(settings)));
- io_try_write (io, FALSE);
+ io_try_write (io, !io->async);
return (SoupClientMessageIO *)io;
}
diff --git a/libsoup/soup-client-message-io.c b/libsoup/soup-client-message-io.c
index 5fb2c38a..a6a62a55 100644
--- a/libsoup/soup-client-message-io.c
+++ b/libsoup/soup-client-message-io.c
@@ -140,3 +140,10 @@ soup_client_message_io_get_cancellable (SoupClientMessageIO *io,
{
return io->funcs->get_cancellable (io, msg);
}
+
+void
+soup_client_message_io_owner_changed (SoupClientMessageIO *io)
+{
+ if (io->funcs->owner_changed)
+ io->funcs->owner_changed (io);
+}
diff --git a/libsoup/soup-client-message-io.h b/libsoup/soup-client-message-io.h
index ae41e6e8..25f67a2a 100644
--- a/libsoup/soup-client-message-io.h
+++ b/libsoup/soup-client-message-io.h
@@ -55,6 +55,7 @@ typedef struct {
gboolean (*is_reusable) (SoupClientMessageIO *io);
GCancellable *(*get_cancellable) (SoupClientMessageIO *io,
SoupMessage *msg);
+ void (*owner_changed) (SoupClientMessageIO *io);
} SoupClientMessageIOFuncs;
struct _SoupClientMessageIO {
@@ -105,3 +106,4 @@ gboolean soup_client_message_io_in_progress (SoupClientMessageIO
gboolean soup_client_message_io_is_reusable (SoupClientMessageIO *io);
GCancellable *soup_client_message_io_get_cancellable (SoupClientMessageIO *io,
SoupMessage *msg);
+void soup_client_message_io_owner_changed (SoupClientMessageIO *io);
diff --git a/libsoup/soup-connection-manager.c b/libsoup/soup-connection-manager.c
index 0feeb398..cad51a51 100644
--- a/libsoup/soup-connection-manager.c
+++ b/libsoup/soup-connection-manager.c
@@ -341,7 +341,6 @@ connection_state_changed (SoupConnection *conn,
GParamSpec *param,
SoupConnectionManager *manager)
{
-
if (soup_connection_get_state (conn) != SOUP_CONNECTION_IDLE)
return;
@@ -387,7 +386,7 @@ soup_connection_manager_get_connection_locked (SoupConnectionManager *manager,
switch (soup_connection_get_state (conn)) {
case SOUP_CONNECTION_IN_USE:
- if (!need_new_connection && http_version == SOUP_HTTP_2_0 &&
soup_connection_is_reusable (conn))
+ if (!need_new_connection && http_version == SOUP_HTTP_2_0 &&
soup_connection_get_owner (conn) == g_thread_self () && soup_connection_is_reusable (conn))
return conn;
break;
case SOUP_CONNECTION_IDLE:
@@ -401,7 +400,7 @@ soup_connection_manager_get_connection_locked (SoupConnectionManager *manager,
/* Always wait if we have a pending connection as it may be
* an h2 connection which will be shared. http/1.x connections
* will only be slightly delayed. */
- if (force_http_version > SOUP_HTTP_1_1 && !need_new_connection &&
!item->connect_only && item->async)
+ if (force_http_version > SOUP_HTTP_1_1 && !need_new_connection &&
!item->connect_only && item->async && soup_connection_get_owner (conn) == g_thread_self ())
return NULL;
default:
break;
diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c
index 0c9788b9..64ee4afa 100644
--- a/libsoup/soup-connection.c
+++ b/libsoup/soup-connection.c
@@ -47,6 +47,7 @@ typedef struct {
GTlsCertificate *tls_client_cert;
GCancellable *cancellable;
+ GThread *owner;
} SoupConnectionPrivate;
G_DEFINE_FINAL_TYPE_WITH_PRIVATE (SoupConnection, soup_connection, G_TYPE_OBJECT)
@@ -96,6 +97,7 @@ soup_connection_init (SoupConnection *conn)
priv->http_version = SOUP_HTTP_1_1;
priv->force_http_version = G_MAXUINT8;
+ priv->owner = g_thread_self ();
}
static void
@@ -1136,8 +1138,12 @@ soup_connection_set_in_use (SoupConnection *conn,
if (in_use) {
g_atomic_int_inc (&priv->in_use);
- if (g_atomic_int_compare_and_exchange (&priv->state, SOUP_CONNECTION_IDLE,
SOUP_CONNECTION_IN_USE))
+ if (g_atomic_int_compare_and_exchange (&priv->state, SOUP_CONNECTION_IDLE,
SOUP_CONNECTION_IN_USE)) {
+ priv->owner = g_thread_self ();
+ soup_client_message_io_owner_changed (priv->io_data);
g_object_notify_by_pspec (G_OBJECT (conn), properties[PROP_STATE]);
+ }
+
return;
}
@@ -1350,3 +1356,11 @@ soup_connection_is_reusable (SoupConnection *conn)
return priv->io_data && soup_client_message_io_is_reusable (priv->io_data);
}
+
+GThread *
+soup_connection_get_owner (SoupConnection *conn)
+{
+ SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
+
+ return priv->owner;
+}
diff --git a/libsoup/soup-connection.h b/libsoup/soup-connection.h
index e8e66d18..18a21749 100644
--- a/libsoup/soup-connection.h
+++ b/libsoup/soup-connection.h
@@ -85,6 +85,7 @@ guint64 soup_connection_get_id (SoupConnection
GSocketAddress *soup_connection_get_remote_address (SoupConnection *conn);
SoupHTTPVersion soup_connection_get_negotiated_protocol (SoupConnection *conn);
gboolean soup_connection_is_reusable (SoupConnection *conn);
+GThread *soup_connection_get_owner (SoupConnection *conn);
G_END_DECLS
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]