[libsoup/carlosgc/thread-safe: 11/19] http2: make message IO thread safe




commit 66519180d4d27feadff5bdbf11efa197818ed735
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Mon Apr 18 15:53:01 2022 +0200

    http2: make message IO thread safe
    
    nghttp2 session can't be used by multiple threads at the same time, so
    we need to ensure that only messages from the same thread share the
    connection. Connections in idle state can still be reused from other
    threads, but we need to ensure all the pending IO is completed before
    switching to another thread. When the connection switches to IN_USE
    state, the current thread becomes the owner of the connection IO. In the
    case of HTTP/2 there might be session IO not related to a particular
    message; in that case, a thread with no default context is considered
    synchronous and all IO that is not explicitly sync or async will be
    sync.

 libsoup/http2/soup-client-message-io-http2.c | 93 +++++++++++++++++++++-------
 libsoup/soup-client-message-io.c             |  7 +++
 libsoup/soup-client-message-io.h             |  2 +
 libsoup/soup-connection-manager.c            |  5 +-
 libsoup/soup-connection.c                    | 16 ++++-
 libsoup/soup-connection.h                    |  1 +
 6 files changed, 96 insertions(+), 28 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 67ef5006..6763d499 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -62,6 +62,8 @@ typedef enum {
 typedef struct {
         SoupClientMessageIO iface;
 
+        GThread *owner;
+        gboolean async;
         SoupConnection *conn;
         GIOStream *stream;
         GInputStream *istream;
@@ -429,7 +431,7 @@ io_try_write (SoupClientMessageIOHTTP2 *io,
                         io_write (io, blocking, NULL, &error);
         }
 
-        if (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+        if (!blocking && (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))) {
                 g_clear_error (&error);
                 io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM 
(io->ostream), NULL);
                 g_source_set_name (io->write_source, "Soup HTTP/2 write source");
@@ -546,7 +548,7 @@ soup_client_message_io_http2_terminate_session (SoupClientMessageIOHTTP2 *io)
 
         io->session_terminated = TRUE;
         NGCHECK (nghttp2_session_terminate_session (io->session, NGHTTP2_NO_ERROR));
-        io_try_write (io, FALSE);
+        io_try_write (io, !io->async);
 }
 
 /* HTTP2 read callbacks */
@@ -908,7 +910,11 @@ on_frame_send_callback (nghttp2_session     *session,
                 break;
         case NGHTTP2_RST_STREAM:
                 h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
-                g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, 
(gpointer)frame);
+                if (g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, 
(gpointer)frame)) {
+                        if (io->conn)
+                                soup_connection_set_in_use (io->conn, FALSE);
+                }
+
                 break;
         case NGHTTP2_GOAWAY:
                 h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
@@ -916,12 +922,17 @@ on_frame_send_callback (nghttp2_session     *session,
                 if (io->close_task) {
                         GSource *source;
 
-                        /* Close in idle to ensure all pending io is finished first */
-                        source = g_idle_source_new ();
-                        g_source_set_name (source, "Soup HTTP/2 close source");
-                        g_source_set_callback (source, (GSourceFunc)close_in_idle_cb, io, NULL);
-                        g_source_attach (source, g_main_context_get_thread_default ());
-                        g_source_unref (source);
+                        if (io->async) {
+                                /* Close in idle to ensure all pending io is finished first */
+                                source = g_idle_source_new ();
+                                g_source_set_name (source, "Soup HTTP/2 close source");
+                                g_source_set_callback (source, (GSourceFunc)close_in_idle_cb, io, NULL);
+                                g_source_attach (source, g_main_context_get_thread_default ());
+                                g_source_unref (source);
+                        } else {
+                                g_task_return_boolean (io->close_task, TRUE);
+                                g_clear_object (&io->close_task);
+                        }
                 }
                 break;
         default:
@@ -1395,6 +1406,11 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
                         g_warn_if_reached ();
                 if (!g_hash_table_add (io->closed_messages, data))
                         g_warn_if_reached ();
+
+                if (io->conn)
+                        soup_connection_set_in_use (io->conn, TRUE);
+
+                io_try_write (io, !io->async);
         } else {
                 if (!g_hash_table_remove (io->messages, msg))
                         g_warn_if_reached ();
@@ -1405,12 +1421,8 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
 
        g_object_unref (msg);
 
-        if (io->is_shutdown) {
+        if (io->is_shutdown)
                 soup_client_message_io_http2_terminate_session (io);
-                return;
-        }
-
-        io_try_write (io, FALSE);
 }
 
 static void
@@ -1545,12 +1557,13 @@ io_run (SoupHTTP2MessageData *data,
         GCancellable         *cancellable,
         GError              **error)
 {
+        SoupClientMessageIOHTTP2 *io = data->io;
         gboolean progress = FALSE;
 
-        if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
-                progress = io_write (data->io, TRUE, cancellable, error);
-        else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
-                progress = io_read (data->io, TRUE, cancellable, error);
+        if (data->state < STATE_WRITE_DONE && !io->in_callback && nghttp2_session_want_write (io->session))
+                progress = io_write (io, TRUE, cancellable, error);
+        else if (data->state < STATE_READ_DONE && !io->in_callback && nghttp2_session_want_read 
(io->session))
+                progress = io_read (io, TRUE, cancellable, error);
 
         return progress;
 }
@@ -1679,6 +1692,31 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
                 soup_http2_message_data_check_status (data);
 }
 
+static void
+soup_client_message_io_http2_set_owner (SoupClientMessageIOHTTP2 *io,
+                                        GThread                  *owner)
+{
+        if (owner == io->owner)
+                return;
+
+        io->owner = owner;
+        g_assert (!io->write_source);
+        if (io->read_source) {
+                g_source_destroy (io->read_source);
+                g_source_unref (io->read_source);
+                io->read_source = NULL;
+        }
+
+        io->async = g_main_context_is_owner (g_main_context_get_thread_default ());
+        if (!io->async)
+                return;
+
+        io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), 
NULL);
+        g_source_set_name (io->read_source, "Soup HTTP/2 read source");
+        g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
+        g_source_attach (io->read_source, g_main_context_get_thread_default ());
+}
+
 static gboolean
 soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
                                           SoupConnection      *conn,
@@ -1689,6 +1727,7 @@ soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
         if (io->goaway_sent)
                 return FALSE;
 
+        soup_client_message_io_http2_set_owner (io, g_thread_self ());
         g_assert (!io->close_task);
         io->close_task = g_task_new (conn, NULL, callback, NULL);
         soup_client_message_io_http2_terminate_session (io);
@@ -1722,6 +1761,14 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface)
         g_free (io);
 }
 
+static void
+soup_client_message_io_http2_owner_changed (SoupClientMessageIO *iface)
+{
+        SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
+
+        soup_client_message_io_http2_set_owner (io, g_thread_self ());
+}
+
 static const SoupClientMessageIOFuncs io_funcs = {
         soup_client_message_io_http2_destroy,
         soup_client_message_io_http2_finished,
@@ -1739,7 +1786,8 @@ static const SoupClientMessageIOFuncs io_funcs = {
         soup_client_message_io_http2_is_open,
         soup_client_message_io_http2_in_progress,
         soup_client_message_io_http2_is_reusable,
-        soup_client_message_io_http2_get_cancellable
+        soup_client_message_io_http2_get_cancellable,
+        soup_client_message_io_http2_owner_changed
 };
 
 G_GNUC_PRINTF(1, 0)
@@ -1807,10 +1855,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
         io->ostream = g_io_stream_get_output_stream (io->stream);
         io->connection_id = soup_connection_get_id (conn);
 
-        io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), 
NULL);
-        g_source_set_name (io->read_source, "Soup HTTP/2 read source");
-        g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
-        g_source_attach (io->read_source, g_main_context_get_thread_default ());
+        soup_client_message_io_http2_set_owner (io, soup_connection_get_owner (conn));
 
         NGCHECK (nghttp2_session_set_local_window_size (io->session, NGHTTP2_FLAG_NONE, 0, 
INITIAL_WINDOW_SIZE));
 
@@ -1820,7 +1865,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
                 { NGHTTP2_SETTINGS_ENABLE_PUSH, 0 },
         };
         NGCHECK (nghttp2_submit_settings (io->session, NGHTTP2_FLAG_NONE, settings, G_N_ELEMENTS 
(settings)));
-        io_try_write (io, FALSE);
+        io_try_write (io, !io->async);
 
         return (SoupClientMessageIO *)io;
 }
diff --git a/libsoup/soup-client-message-io.c b/libsoup/soup-client-message-io.c
index 5fb2c38a..a6a62a55 100644
--- a/libsoup/soup-client-message-io.c
+++ b/libsoup/soup-client-message-io.c
@@ -140,3 +140,10 @@ soup_client_message_io_get_cancellable (SoupClientMessageIO *io,
 {
         return io->funcs->get_cancellable (io, msg);
 }
+
+void
+soup_client_message_io_owner_changed (SoupClientMessageIO *io)
+{
+        if (io->funcs->owner_changed)
+                io->funcs->owner_changed (io);
+}
diff --git a/libsoup/soup-client-message-io.h b/libsoup/soup-client-message-io.h
index ae41e6e8..25f67a2a 100644
--- a/libsoup/soup-client-message-io.h
+++ b/libsoup/soup-client-message-io.h
@@ -55,6 +55,7 @@ typedef struct {
         gboolean      (*is_reusable)          (SoupClientMessageIO       *io);
         GCancellable *(*get_cancellable)      (SoupClientMessageIO       *io,
                                                SoupMessage               *msg);
+        void          (*owner_changed)        (SoupClientMessageIO       *io);
 } SoupClientMessageIOFuncs;
 
 struct _SoupClientMessageIO {
@@ -105,3 +106,4 @@ gboolean      soup_client_message_io_in_progress          (SoupClientMessageIO
 gboolean      soup_client_message_io_is_reusable          (SoupClientMessageIO       *io);
 GCancellable *soup_client_message_io_get_cancellable      (SoupClientMessageIO       *io,
                                                            SoupMessage               *msg);
+void          soup_client_message_io_owner_changed        (SoupClientMessageIO       *io);
diff --git a/libsoup/soup-connection-manager.c b/libsoup/soup-connection-manager.c
index 0feeb398..cad51a51 100644
--- a/libsoup/soup-connection-manager.c
+++ b/libsoup/soup-connection-manager.c
@@ -341,7 +341,6 @@ connection_state_changed (SoupConnection        *conn,
                           GParamSpec            *param,
                           SoupConnectionManager *manager)
 {
-
         if (soup_connection_get_state (conn) != SOUP_CONNECTION_IDLE)
                 return;
 
@@ -387,7 +386,7 @@ soup_connection_manager_get_connection_locked (SoupConnectionManager *manager,
 
                         switch (soup_connection_get_state (conn)) {
                         case SOUP_CONNECTION_IN_USE:
-                                if (!need_new_connection && http_version == SOUP_HTTP_2_0 && 
soup_connection_is_reusable (conn))
+                                if (!need_new_connection && http_version == SOUP_HTTP_2_0 && 
soup_connection_get_owner (conn) == g_thread_self () && soup_connection_is_reusable (conn))
                                         return conn;
                                 break;
                         case SOUP_CONNECTION_IDLE:
@@ -401,7 +400,7 @@ soup_connection_manager_get_connection_locked (SoupConnectionManager *manager,
                                 /* Always wait if we have a pending connection as it may be
                                  * an h2 connection which will be shared. http/1.x connections
                                  * will only be slightly delayed. */
-                                if (force_http_version > SOUP_HTTP_1_1 && !need_new_connection && 
!item->connect_only && item->async)
+                                if (force_http_version > SOUP_HTTP_1_1 && !need_new_connection && 
!item->connect_only && item->async && soup_connection_get_owner (conn) == g_thread_self ())
                                         return NULL;
                         default:
                                 break;
diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c
index 0c9788b9..64ee4afa 100644
--- a/libsoup/soup-connection.c
+++ b/libsoup/soup-connection.c
@@ -47,6 +47,7 @@ typedef struct {
         GTlsCertificate *tls_client_cert;
 
        GCancellable *cancellable;
+        GThread *owner;
 } SoupConnectionPrivate;
 
 G_DEFINE_FINAL_TYPE_WITH_PRIVATE (SoupConnection, soup_connection, G_TYPE_OBJECT)
@@ -96,6 +97,7 @@ soup_connection_init (SoupConnection *conn)
 
         priv->http_version = SOUP_HTTP_1_1;
         priv->force_http_version = G_MAXUINT8;
+        priv->owner = g_thread_self ();
 }
 
 static void
@@ -1136,8 +1138,12 @@ soup_connection_set_in_use (SoupConnection *conn,
 
         if (in_use) {
                 g_atomic_int_inc (&priv->in_use);
-                if (g_atomic_int_compare_and_exchange (&priv->state, SOUP_CONNECTION_IDLE, 
SOUP_CONNECTION_IN_USE))
+                if (g_atomic_int_compare_and_exchange (&priv->state, SOUP_CONNECTION_IDLE, 
SOUP_CONNECTION_IN_USE)) {
+                        priv->owner = g_thread_self ();
+                        soup_client_message_io_owner_changed (priv->io_data);
                         g_object_notify_by_pspec (G_OBJECT (conn), properties[PROP_STATE]);
+                }
+
                 return;
         }
 
@@ -1350,3 +1356,11 @@ soup_connection_is_reusable (SoupConnection *conn)
 
         return priv->io_data && soup_client_message_io_is_reusable (priv->io_data);
 }
+
+GThread *
+soup_connection_get_owner (SoupConnection *conn)
+{
+        SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
+
+        return priv->owner;
+}
diff --git a/libsoup/soup-connection.h b/libsoup/soup-connection.h
index e8e66d18..18a21749 100644
--- a/libsoup/soup-connection.h
+++ b/libsoup/soup-connection.h
@@ -85,6 +85,7 @@ guint64              soup_connection_get_id                     (SoupConnection
 GSocketAddress      *soup_connection_get_remote_address         (SoupConnection *conn);
 SoupHTTPVersion      soup_connection_get_negotiated_protocol    (SoupConnection *conn);
 gboolean             soup_connection_is_reusable                (SoupConnection *conn);
+GThread             *soup_connection_get_owner                  (SoupConnection *conn);
 
 G_END_DECLS
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]