[libsoup/carlosgc/thread-safe: 9/19] http2: make message IO thread safe




commit fbba3e3adc90157ecd66681c0b79e084248af2ff
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Mon Apr 18 15:53:01 2022 +0200

    http2: make message IO thread safe
    
    nghttp2 session can't be used by multiple threads at the same time, so
    we need to ensure that only messages from the same thread share the
    connection. Connections in idle state can be reused from other threads,
    though but we need to ensure all the pending IO is completed before
    switching to another thread. When the connection switches to IN_USE
    state, the current thread becomes the owner of the connection IO. In the
    case of HTTP/2 there might be session IO not related to a particular
    message, in that case a thread with no default context is considered
    synchronous and all IO that is not explicitly sync or async will be
    sync.

 libsoup/http2/soup-client-message-io-http2.c | 90 +++++++++++++++++++++-------
 libsoup/soup-client-message-io.c             |  7 +++
 libsoup/soup-client-message-io.h             |  2 +
 libsoup/soup-connection-manager.c            |  5 +-
 libsoup/soup-connection.c                    | 16 ++++-
 libsoup/soup-connection.h                    |  1 +
 6 files changed, 96 insertions(+), 25 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 5216693b..0d401bc1 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -62,6 +62,8 @@ typedef enum {
 typedef struct {
         SoupClientMessageIO iface;
 
+        GThread *owner;
+        gboolean async;
         SoupConnection *conn;
         GIOStream *stream;
         GInputStream *istream;
@@ -429,7 +431,7 @@ io_try_write (SoupClientMessageIOHTTP2 *io,
                         io_write (io, blocking, NULL, &error);
         }
 
-        if (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+        if (!blocking && (io->in_callback || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))) {
                 g_clear_error (&error);
                 io->write_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM 
(io->ostream), NULL);
                 g_source_set_name (io->write_source, "Soup HTTP/2 write source");
@@ -546,7 +548,7 @@ soup_client_message_io_http2_terminate_session (SoupClientMessageIOHTTP2 *io)
 
         io->session_terminated = TRUE;
         NGCHECK (nghttp2_session_terminate_session (io->session, NGHTTP2_NO_ERROR));
-        io_try_write (io, FALSE);
+        io_try_write (io, !io->async);
 }
 
 /* HTTP2 read callbacks */
@@ -908,7 +910,11 @@ on_frame_send_callback (nghttp2_session     *session,
                 break;
         case NGHTTP2_RST_STREAM:
                 h2_debug (io, data, "[SEND] [RST_STREAM] stream_id=%u", frame->hd.stream_id);
-                g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, 
(gpointer)frame);
+                if (g_hash_table_foreach_remove (io->closed_messages, (GHRFunc)remove_closed_stream, 
(gpointer)frame)) {
+                        if (io->conn)
+                                soup_connection_set_in_use (io->conn, FALSE);
+                }
+
                 break;
         case NGHTTP2_GOAWAY:
                 h2_debug (io, data, "[SEND] [%s]", frame_type_to_string (frame->hd.type));
@@ -920,7 +926,7 @@ on_frame_send_callback (nghttp2_session     *session,
                         source = g_idle_source_new ();
                         g_source_set_name (source, "Soup HTTP/2 close source");
                         g_source_set_callback (source, (GSourceFunc)close_in_idle_cb, io, NULL);
-                        g_source_attach (source, g_main_context_get_thread_default ());
+                        g_source_attach (source, g_task_get_context (io->close_task));
                         g_source_unref (source);
                 }
                 break;
@@ -1403,6 +1409,11 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
                         g_warn_if_reached ();
                 if (!g_hash_table_add (io->closed_messages, data))
                         g_warn_if_reached ();
+
+                if (io->conn)
+                        soup_connection_set_in_use (io->conn, TRUE);
+
+                io_try_write (io, !io->async);
         } else {
                 if (!g_hash_table_remove (io->messages, msg))
                         g_warn_if_reached ();
@@ -1413,12 +1424,8 @@ soup_client_message_io_http2_finished (SoupClientMessageIO *iface,
 
        g_object_unref (msg);
 
-        if (io->is_shutdown) {
+        if (io->is_shutdown)
                 soup_client_message_io_http2_terminate_session (io);
-                return;
-        }
-
-        io_try_write (io, FALSE);
 }
 
 static void
@@ -1553,12 +1560,13 @@ io_run (SoupHTTP2MessageData *data,
         GCancellable         *cancellable,
         GError              **error)
 {
+        SoupClientMessageIOHTTP2 *io = data->io;
         gboolean progress = FALSE;
 
-        if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (data->io->session))
-                progress = io_write (data->io, TRUE, cancellable, error);
-        else if (data->state < STATE_READ_DONE && nghttp2_session_want_read (data->io->session))
-                progress = io_read (data->io, TRUE, cancellable, error);
+        if (data->state < STATE_WRITE_DONE && !io->in_callback && nghttp2_session_want_write (io->session))
+                progress = io_write (io, TRUE, cancellable, error);
+        else if (data->state < STATE_READ_DONE && !io->in_callback && nghttp2_session_want_read 
(io->session))
+                progress = io_read (io, TRUE, cancellable, error);
 
         return progress;
 }
@@ -1687,6 +1695,31 @@ soup_client_message_io_http2_run_until_read_async (SoupClientMessageIO *iface,
                 soup_http2_message_data_check_status (data);
 }
 
+static void
+soup_client_message_io_http2_set_owner (SoupClientMessageIOHTTP2 *io,
+                                        GThread                  *owner)
+{
+        if (owner == io->owner)
+                return;
+
+        io->owner = owner;
+        g_assert (!io->write_source);
+        if (io->read_source) {
+                g_source_destroy (io->read_source);
+                g_source_unref (io->read_source);
+                io->read_source = NULL;
+        }
+
+        io->async = g_main_context_is_owner (g_main_context_get_thread_default ());
+        if (!io->async)
+                return;
+
+        io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), 
NULL);
+        g_source_set_name (io->read_source, "Soup HTTP/2 read source");
+        g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
+        g_source_attach (io->read_source, g_main_context_get_thread_default ());
+}
+
 static gboolean
 soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
                                           SoupConnection      *conn,
@@ -1697,9 +1730,18 @@ soup_client_message_io_http2_close_async (SoupClientMessageIO *iface,
         if (io->goaway_sent)
                 return FALSE;
 
-        g_assert (!io->close_task);
-        io->close_task = g_task_new (conn, NULL, callback, NULL);
+        soup_client_message_io_http2_set_owner (io, g_thread_self ());
+        if (io->async) {
+                g_assert (!io->close_task);
+                io->close_task = g_task_new (conn, NULL, callback, NULL);
+        }
+
         soup_client_message_io_http2_terminate_session (io);
+        if (!io->async) {
+                g_assert (io->goaway_sent);
+                return FALSE;
+        }
+
         return TRUE;
 }
 
@@ -1730,6 +1772,14 @@ soup_client_message_io_http2_destroy (SoupClientMessageIO *iface)
         g_free (io);
 }
 
+static void
+soup_client_message_io_http2_owner_changed (SoupClientMessageIO *iface)
+{
+        SoupClientMessageIOHTTP2 *io = (SoupClientMessageIOHTTP2 *)iface;
+
+        soup_client_message_io_http2_set_owner (io, g_thread_self ());
+}
+
 static const SoupClientMessageIOFuncs io_funcs = {
         soup_client_message_io_http2_destroy,
         soup_client_message_io_http2_finished,
@@ -1747,7 +1797,8 @@ static const SoupClientMessageIOFuncs io_funcs = {
         soup_client_message_io_http2_is_open,
         soup_client_message_io_http2_in_progress,
         soup_client_message_io_http2_is_reusable,
-        soup_client_message_io_http2_get_cancellable
+        soup_client_message_io_http2_get_cancellable,
+        soup_client_message_io_http2_owner_changed
 };
 
 G_GNUC_PRINTF(1, 0)
@@ -1815,10 +1866,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
         io->ostream = g_io_stream_get_output_stream (io->stream);
         io->connection_id = soup_connection_get_id (conn);
 
-        io->read_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (io->istream), 
NULL);
-        g_source_set_name (io->read_source, "Soup HTTP/2 read source");
-        g_source_set_callback (io->read_source, (GSourceFunc)io_read_ready, io, NULL);
-        g_source_attach (io->read_source, g_main_context_get_thread_default ());
+        soup_client_message_io_http2_set_owner (io, soup_connection_get_owner (conn));
 
         NGCHECK (nghttp2_session_set_local_window_size (io->session, NGHTTP2_FLAG_NONE, 0, 
INITIAL_WINDOW_SIZE));
 
@@ -1828,7 +1876,7 @@ soup_client_message_io_http2_new (SoupConnection *conn)
                 { NGHTTP2_SETTINGS_ENABLE_PUSH, 0 },
         };
         NGCHECK (nghttp2_submit_settings (io->session, NGHTTP2_FLAG_NONE, settings, G_N_ELEMENTS 
(settings)));
-        io_try_write (io, FALSE);
+        io_try_write (io, !io->async);
 
         return (SoupClientMessageIO *)io;
 }
diff --git a/libsoup/soup-client-message-io.c b/libsoup/soup-client-message-io.c
index 5fb2c38a..a6a62a55 100644
--- a/libsoup/soup-client-message-io.c
+++ b/libsoup/soup-client-message-io.c
@@ -140,3 +140,10 @@ soup_client_message_io_get_cancellable (SoupClientMessageIO *io,
 {
         return io->funcs->get_cancellable (io, msg);
 }
+
+void
+soup_client_message_io_owner_changed (SoupClientMessageIO *io)
+{
+        if (io->funcs->owner_changed)
+                io->funcs->owner_changed (io);
+}
diff --git a/libsoup/soup-client-message-io.h b/libsoup/soup-client-message-io.h
index ae41e6e8..25f67a2a 100644
--- a/libsoup/soup-client-message-io.h
+++ b/libsoup/soup-client-message-io.h
@@ -55,6 +55,7 @@ typedef struct {
         gboolean      (*is_reusable)          (SoupClientMessageIO       *io);
         GCancellable *(*get_cancellable)      (SoupClientMessageIO       *io,
                                                SoupMessage               *msg);
+        void          (*owner_changed)        (SoupClientMessageIO       *io);
 } SoupClientMessageIOFuncs;
 
 struct _SoupClientMessageIO {
@@ -105,3 +106,4 @@ gboolean      soup_client_message_io_in_progress          (SoupClientMessageIO
 gboolean      soup_client_message_io_is_reusable          (SoupClientMessageIO       *io);
 GCancellable *soup_client_message_io_get_cancellable      (SoupClientMessageIO       *io,
                                                            SoupMessage               *msg);
+void          soup_client_message_io_owner_changed        (SoupClientMessageIO       *io);
diff --git a/libsoup/soup-connection-manager.c b/libsoup/soup-connection-manager.c
index c64d7e31..bdf4d291 100644
--- a/libsoup/soup-connection-manager.c
+++ b/libsoup/soup-connection-manager.c
@@ -341,7 +341,6 @@ connection_state_changed (SoupConnection        *conn,
                           GParamSpec            *param,
                           SoupConnectionManager *manager)
 {
-
         if (soup_connection_get_state (conn) != SOUP_CONNECTION_IDLE)
                 return;
 
@@ -387,7 +386,7 @@ soup_connection_manager_get_connection_locked (SoupConnectionManager *manager,
 
                         switch (soup_connection_get_state (conn)) {
                         case SOUP_CONNECTION_IN_USE:
-                                if (!need_new_connection && http_version == SOUP_HTTP_2_0 && 
soup_connection_is_reusable (conn))
+                                if (!need_new_connection && http_version == SOUP_HTTP_2_0 && 
soup_connection_get_owner (conn) == g_thread_self () && soup_connection_is_reusable (conn))
                                         return conn;
                                 break;
                         case SOUP_CONNECTION_IDLE:
@@ -401,7 +400,7 @@ soup_connection_manager_get_connection_locked (SoupConnectionManager *manager,
                                 /* Always wait if we have a pending connection as it may be
                                  * an h2 connection which will be shared. http/1.x connections
                                  * will only be slightly delayed. */
-                                if (force_http_version > SOUP_HTTP_1_1 && !need_new_connection && 
!item->connect_only && item->async)
+                                if (force_http_version > SOUP_HTTP_1_1 && !need_new_connection && 
!item->connect_only && item->async && soup_connection_get_owner (conn) == g_thread_self ())
                                         return NULL;
                         default:
                                 break;
diff --git a/libsoup/soup-connection.c b/libsoup/soup-connection.c
index 0c9788b9..64ee4afa 100644
--- a/libsoup/soup-connection.c
+++ b/libsoup/soup-connection.c
@@ -47,6 +47,7 @@ typedef struct {
         GTlsCertificate *tls_client_cert;
 
        GCancellable *cancellable;
+        GThread *owner;
 } SoupConnectionPrivate;
 
 G_DEFINE_FINAL_TYPE_WITH_PRIVATE (SoupConnection, soup_connection, G_TYPE_OBJECT)
@@ -96,6 +97,7 @@ soup_connection_init (SoupConnection *conn)
 
         priv->http_version = SOUP_HTTP_1_1;
         priv->force_http_version = G_MAXUINT8;
+        priv->owner = g_thread_self ();
 }
 
 static void
@@ -1136,8 +1138,12 @@ soup_connection_set_in_use (SoupConnection *conn,
 
         if (in_use) {
                 g_atomic_int_inc (&priv->in_use);
-                if (g_atomic_int_compare_and_exchange (&priv->state, SOUP_CONNECTION_IDLE, 
SOUP_CONNECTION_IN_USE))
+                if (g_atomic_int_compare_and_exchange (&priv->state, SOUP_CONNECTION_IDLE, 
SOUP_CONNECTION_IN_USE)) {
+                        priv->owner = g_thread_self ();
+                        soup_client_message_io_owner_changed (priv->io_data);
                         g_object_notify_by_pspec (G_OBJECT (conn), properties[PROP_STATE]);
+                }
+
                 return;
         }
 
@@ -1350,3 +1356,11 @@ soup_connection_is_reusable (SoupConnection *conn)
 
         return priv->io_data && soup_client_message_io_is_reusable (priv->io_data);
 }
+
+GThread *
+soup_connection_get_owner (SoupConnection *conn)
+{
+        SoupConnectionPrivate *priv = soup_connection_get_instance_private (conn);
+
+        return priv->owner;
+}
diff --git a/libsoup/soup-connection.h b/libsoup/soup-connection.h
index e8e66d18..18a21749 100644
--- a/libsoup/soup-connection.h
+++ b/libsoup/soup-connection.h
@@ -85,6 +85,7 @@ guint64              soup_connection_get_id                     (SoupConnection
 GSocketAddress      *soup_connection_get_remote_address         (SoupConnection *conn);
 SoupHTTPVersion      soup_connection_get_negotiated_protocol    (SoupConnection *conn);
 gboolean             soup_connection_is_reusable                (SoupConnection *conn);
+GThread             *soup_connection_get_owner                  (SoupConnection *conn);
 
 G_END_DECLS
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]