[libsoup/carlosgc/server-http2: 8/13] server: make server clients be the connections and not the messages




commit 29e11eec9d60f63d66001ce2e30bf99736fb3549
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Mon Jul 18 11:53:27 2022 +0200

    server: make server clients be the connections and not the messages
    
    Client list is now a list of connections, owned by the server. The
    connection creates the messages and notifies the server with
    request-started signal. The first message is created before the TLS
    handshake, and then request-started is emmitted to allow the user to
    connect to accept-certificate signal on the message. If the connection
    is persistent, the next request-started signal will be emitted if
    there's an actual request from the client. This is a change in behavior,
    before we always emitted the request-started when waiting for a new
    request even the request never actually started. There are two tests
    depending on this behavior that are skipped for now.

 .../server/http1/soup-server-message-io-http1.c    |  88 +++++++++++---
 .../server/http1/soup-server-message-io-http1.h    |   5 +-
 libsoup/server/soup-server-connection.c            |  42 ++++++-
 libsoup/server/soup-server-message-io.h            |   3 +
 libsoup/server/soup-server.c                       | 130 ++++++++++-----------
 tests/connection-test.c                            |   6 +
 6 files changed, 180 insertions(+), 94 deletions(-)
---
diff --git a/libsoup/server/http1/soup-server-message-io-http1.c 
b/libsoup/server/http1/soup-server-message-io-http1.c
index 36f37de0..f735ecb6 100644
--- a/libsoup/server/http1/soup-server-message-io-http1.c
+++ b/libsoup/server/http1/soup-server-message-io-http1.c
@@ -41,12 +41,37 @@ typedef struct {
         GInputStream *istream;
         GOutputStream *ostream;
 
+        SoupMessageIOStartedFn started_cb;
+        gpointer started_user_data;
+
+        gboolean in_io_run;
+
         SoupMessageIOHTTP1 *msg_io;
 } SoupServerMessageIOHTTP1;
 
 #define RESPONSE_BLOCK_SIZE 8192
 #define HEADER_SIZE_LIMIT (64 * 1024)
 
+static gboolean io_run_ready (SoupServerMessage *msg,
+                              gpointer           user_data);
+static void io_run (SoupServerMessageIOHTTP1 *server_io);
+
+static SoupMessageIOHTTP1 *
+soup_message_io_http1_new (SoupServerMessage *msg)
+{
+        SoupMessageIOHTTP1 *msg_io;
+
+        msg_io = g_new0 (SoupMessageIOHTTP1, 1);
+        msg_io->msg = msg;
+        msg_io->base.read_header_buf = g_byte_array_new ();
+        msg_io->base.write_buf = g_string_new (NULL);
+        msg_io->base.read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
+        msg_io->base.write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
+        msg_io->async_context = g_main_context_ref_thread_default ();
+
+        return msg_io;
+}
+
 static void
 soup_message_io_http1_free (SoupMessageIOHTTP1 *msg_io)
 {
@@ -84,6 +109,7 @@ soup_server_message_io_http1_finished (SoupServerMessageIO *iface,
         SoupMessageIOCompletionFn completion_cb;
         gpointer completion_data;
         SoupMessageIOCompletion completion;
+        SoupServerConnection *conn;
 
        completion_cb = io->msg_io->base.completion_cb;
         completion_data = io->msg_io->base.completion_data;
@@ -98,6 +124,20 @@ soup_server_message_io_http1_finished (SoupServerMessageIO *iface,
         g_clear_pointer (&io->msg_io, soup_message_io_http1_free);
        if (completion_cb)
                 completion_cb (G_OBJECT (msg), completion, completion_data);
+        conn = soup_server_message_get_connection (msg);
+        if (completion == SOUP_MESSAGE_IO_COMPLETE &&
+            soup_server_connection_is_connected (conn) &&
+            soup_server_message_is_keepalive (msg)) {
+                io->msg_io = soup_message_io_http1_new (soup_server_message_new (conn));
+                io->msg_io->base.io_source = soup_message_io_data_get_source (&io->msg_io->base,
+                                                                              G_OBJECT (io->msg_io->msg),
+                                                                              io->istream,
+                                                                              io->ostream,
+                                                                              NULL,
+                                                                              
(SoupMessageIOSourceFunc)io_run_ready,
+                                                                              NULL);
+                g_source_attach (io->msg_io->base.io_source, io->msg_io->async_context);
+        }
         g_object_unref (msg);
 }
 
@@ -687,12 +727,20 @@ io_read (SoupServerMessageIOHTTP1 *server_io,
         gssize nread;
         guint status;
        SoupMessageHeaders *request_headers;
+        gboolean succeeded;
+        gboolean is_first_read;
 
         switch (io->read_state) {
         case SOUP_MESSAGE_IO_STATE_HEADERS:
-                if (!soup_message_io_data_read_headers (io, SOUP_FILTER_INPUT_STREAM (server_io->istream), 
FALSE, NULL, NULL, error)) {
-                       if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT))
-                               soup_server_message_set_status (msg, SOUP_STATUS_BAD_REQUEST, NULL);
+                is_first_read = io->read_header_buf->len == 0 && !soup_server_message_get_method (msg);
+
+                succeeded = soup_message_io_data_read_headers (io, SOUP_FILTER_INPUT_STREAM 
(server_io->istream), FALSE, NULL, NULL, error);
+                if (is_first_read && io->read_header_buf->len > 0 && !io->completion_cb)
+                        server_io->started_cb (msg, server_io->started_user_data);
+
+                if (!succeeded) {
+                        if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT))
+                                soup_server_message_set_status (msg, SOUP_STATUS_BAD_REQUEST, NULL);
                         return FALSE;
                }
 
@@ -848,8 +896,6 @@ io_run_until (SoupServerMessageIOHTTP1 *server_io,
         return done;
 }
 
-static void io_run (SoupServerMessageIOHTTP1 *server_io);
-
 static gboolean
 io_run_ready (SoupServerMessage *msg,
               gpointer           user_data)
@@ -865,6 +911,9 @@ io_run (SoupServerMessageIOHTTP1 *server_io)
        SoupMessageIOData *io = &server_io->msg_io->base;
         GError *error = NULL;
 
+        g_assert (!server_io->in_io_run);
+        server_io->in_io_run = TRUE;
+
         if (io->io_source) {
                 g_source_destroy (io->io_source);
                 g_source_unref (io->io_source);
@@ -892,6 +941,8 @@ io_run (SoupServerMessageIOHTTP1 *server_io)
        }
        g_object_unref (msg);
        g_clear_error (&error);
+
+        server_io->in_io_run = FALSE;
 }
 
 static void
@@ -901,24 +952,15 @@ soup_server_message_io_http1_read_request (SoupServerMessageIO      *iface,
                                            gpointer                  user_data)
 {
         SoupServerMessageIOHTTP1 *io = (SoupServerMessageIOHTTP1 *)iface;
-        SoupMessageIOHTTP1 *msg_io;
+        SoupMessageIOHTTP1 *msg_io = io->msg_io;
 
-        msg_io = g_new0 (SoupMessageIOHTTP1, 1);
-        msg_io->msg = g_object_ref (msg);
+        g_assert (msg_io->msg == msg);
 
         msg_io->base.completion_cb = completion_cb;
         msg_io->base.completion_data = user_data;
 
-        msg_io->base.read_header_buf = g_byte_array_new ();
-        msg_io->base.write_buf = g_string_new (NULL);
-
-        msg_io->base.read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
-        msg_io->base.write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
-
-        msg_io->async_context = g_main_context_ref_thread_default ();
-        io->msg_io = msg_io;
-
-        io_run (io);
+        if (!io->in_io_run)
+                io_run (io);
 }
 
 static void
@@ -989,7 +1031,10 @@ static const SoupServerMessageIOFuncs io_funcs = {
 };
 
 SoupServerMessageIO *
-soup_server_message_io_http1_new (SoupServerConnection *conn)
+soup_server_message_io_http1_new (SoupServerConnection  *conn,
+                                  SoupServerMessage     *msg,
+                                  SoupMessageIOStartedFn started_cb,
+                                  gpointer               user_data)
 {
         SoupServerMessageIOHTTP1 *io;
 
@@ -998,7 +1043,12 @@ soup_server_message_io_http1_new (SoupServerConnection *conn)
         io->istream = g_io_stream_get_input_stream (io->iostream);
         io->ostream = g_io_stream_get_output_stream (io->iostream);
 
+        io->started_cb = started_cb;
+        io->started_user_data = user_data;
+
         io->iface.funcs = &io_funcs;
 
+        io->msg_io = soup_message_io_http1_new (msg);
+
         return (SoupServerMessageIO *)io;
 }
diff --git a/libsoup/server/http1/soup-server-message-io-http1.h 
b/libsoup/server/http1/soup-server-message-io-http1.h
index a9d34da9..e934f688 100644
--- a/libsoup/server/http1/soup-server-message-io-http1.h
+++ b/libsoup/server/http1/soup-server-message-io-http1.h
@@ -8,4 +8,7 @@
 #include "soup-server-connection.h"
 #include "soup-server-message-io.h"
 
-SoupServerMessageIO *soup_server_message_io_http1_new (SoupServerConnection *conn);
+SoupServerMessageIO *soup_server_message_io_http1_new (SoupServerConnection  *conn,
+                                                       SoupServerMessage     *msg,
+                                                       SoupMessageIOStartedFn started_cb,
+                                                       gpointer               user_data);
diff --git a/libsoup/server/soup-server-connection.c b/libsoup/server/soup-server-connection.c
index 4880a324..6421443f 100644
--- a/libsoup/server/soup-server-connection.c
+++ b/libsoup/server/soup-server-connection.c
@@ -25,6 +25,7 @@ enum {
         CONNECTED,
         DISCONNECTED,
         ACCEPT_CERTIFICATE,
+        REQUEST_STARTED,
         LAST_SIGNAL
 };
 
@@ -56,6 +57,7 @@ typedef struct {
         GSocket *socket;
         GIOStream *conn;
         GIOStream *iostream;
+        SoupServerMessage *initial_msg;
         SoupServerMessageIO *io_data;
 
         GSocketAddress *local_addr;
@@ -68,6 +70,13 @@ typedef struct {
 
 G_DEFINE_FINAL_TYPE_WITH_PRIVATE (SoupServerConnection, soup_server_connection, G_TYPE_OBJECT)
 
+static void
+request_started_cb (SoupServerMessage    *msg,
+                    SoupServerConnection *conn)
+{
+        g_signal_emit (conn, signals[REQUEST_STARTED], 0, msg);
+}
+
 static void
 soup_server_connection_init (SoupServerConnection *conn)
 {
@@ -124,10 +133,8 @@ soup_server_connection_set_property (GObject      *object,
                 break;
         case PROP_CONNECTION:
                 priv->conn = g_value_dup_object (value);
-                if (priv->conn) {
+                if (priv->conn)
                         priv->iostream = soup_io_stream_new (priv->conn, FALSE);
-                        priv->io_data = soup_server_message_io_http1_new (conn);
-                }
                 break;
         case PROP_LOCAL_ADDRESS:
                 priv->local_addr = g_value_dup_object (value);
@@ -236,6 +243,15 @@ soup_server_connection_class_init (SoupServerConnectionClass *conn_class)
                               G_TYPE_BOOLEAN, 2,
                               G_TYPE_TLS_CERTIFICATE,
                               G_TYPE_TLS_CERTIFICATE_FLAGS);
+        signals[REQUEST_STARTED] =
+                g_signal_new ("request-started",
+                              G_OBJECT_CLASS_TYPE (object_class),
+                              G_SIGNAL_RUN_LAST,
+                              0,
+                              NULL, NULL,
+                              NULL,
+                              G_TYPE_NONE, 1,
+                              SOUP_TYPE_SERVER_MESSAGE);
 
         /* properties */
         properties[PROP_SOCKET] =
@@ -361,7 +377,10 @@ soup_server_connection_create_io_data (SoupServerConnection *conn)
         SoupServerConnectionPrivate *priv = soup_server_connection_get_instance_private (conn);
 
         g_assert (!priv->io_data);
-        priv->io_data = soup_server_message_io_http1_new (conn);
+        priv->io_data = soup_server_message_io_http1_new (conn,
+                                                          g_steal_pointer (&priv->initial_msg),
+                                                          (SoupMessageIOStartedFn)request_started_cb,
+                                                          conn);
 }
 
 static gboolean
@@ -413,12 +432,27 @@ soup_server_connection_setup_async (SoupServerConnection *conn,
 
         task = g_task_new (conn, cancellable, callback, user_data);
         if (priv->conn || !priv->socket) {
+                SoupServerMessage *msg;
+
+                msg = soup_server_message_new (conn);
+                g_signal_emit (conn, signals[REQUEST_STARTED], 0, msg);
+                priv->io_data = soup_server_message_io_http1_new (conn, msg,
+                                                                  (SoupMessageIOStartedFn)request_started_cb,
+                                                                  conn);
+                g_signal_emit (conn, signals[CONNECTED], 0);
                 g_task_return_boolean (task, TRUE);
                 g_object_unref (task);
 
                 return;
         }
 
+        /* We need to create the first message earlier here because SoupServerMessage is used
+         * to accept the TLS certificate.
+         */
+        g_assert (!priv->initial_msg);
+        priv->initial_msg = soup_server_message_new (conn);
+        g_signal_emit (conn, signals[REQUEST_STARTED], 0, priv->initial_msg);
+
         connection = (GIOStream *)g_socket_connection_factory_create_connection (priv->socket);
         g_socket_set_option (priv->socket, IPPROTO_TCP, TCP_NODELAY, TRUE, NULL);
 
diff --git a/libsoup/server/soup-server-message-io.h b/libsoup/server/soup-server-message-io.h
index 78b9c7d4..02a0b23b 100644
--- a/libsoup/server/soup-server-message-io.h
+++ b/libsoup/server/soup-server-message-io.h
@@ -31,6 +31,9 @@ struct _SoupServerMessageIO {
         const SoupServerMessageIOFuncs *funcs;
 };
 
+typedef void (* SoupMessageIOStartedFn) (SoupServerMessage *msg,
+                                         gpointer           user_data);
+
 void       soup_server_message_io_destroy      (SoupServerMessageIO       *io);
 void       soup_server_message_io_finished     (SoupServerMessageIO       *io,
                                                 SoupServerMessage         *msg);
diff --git a/libsoup/server/soup-server.c b/libsoup/server/soup-server.c
index 99ea192b..7e45ca08 100644
--- a/libsoup/server/soup-server.c
+++ b/libsoup/server/soup-server.c
@@ -987,39 +987,65 @@ got_body (SoupServer        *server,
 }
 
 static void
-client_disconnected (SoupServer        *server,
-                    SoupServerMessage *msg)
+message_connected (SoupServer        *server,
+                   SoupServerMessage *msg)
+{
+        soup_server_message_read_request (msg,
+                                          (SoupMessageIOCompletionFn)request_finished,
+                                          server);
+}
+
+static void
+client_disconnected (SoupServer           *server,
+                    SoupServerConnection *conn)
 {
        SoupServerPrivate *priv = soup_server_get_instance_private (server);
 
-       priv->clients = g_slist_remove (priv->clients, msg);
+       priv->clients = g_slist_remove (priv->clients, conn);
+        g_object_unref (conn);
 }
 
-typedef struct {
-        SoupServer *server;
-        SoupServerMessage *msg;
-} SetupConnectionData;
+static void
+request_started_cb (SoupServer           *server,
+                    SoupServerMessage    *msg,
+                    SoupServerConnection *conn)
+{
+        SoupServerPrivate *priv = soup_server_get_instance_private (server);
+
+        g_signal_connect_object (msg, "got-headers",
+                                 G_CALLBACK (got_headers),
+                                 server, G_CONNECT_SWAPPED);
+        g_signal_connect_object (msg, "got-body",
+                                 G_CALLBACK (got_body),
+                                 server, G_CONNECT_SWAPPED);
+
+        if (priv->server_header) {
+                SoupMessageHeaders *headers;
+
+                headers = soup_server_message_get_response_headers (msg);
+                soup_message_headers_append_common (headers, SOUP_HEADER_SERVER,
+                                                    priv->server_header);
+        }
+
+        g_signal_emit (server, signals[REQUEST_STARTED], 0, msg);
+
+        if (soup_server_message_get_io_data (msg)) {
+                message_connected (server, msg);
+                return;
+        }
+
+        g_signal_connect_object (msg, "connected",
+                                 G_CALLBACK (message_connected),
+                                 server, G_CONNECT_SWAPPED);
+}
 
 static void
 connection_setup_ready (SoupServerConnection *conn,
                         GAsyncResult         *result,
-                        SetupConnectionData  *data)
+                        gpointer              user_data)
 {
-        SoupServerPrivate *priv = soup_server_get_instance_private (data->server);
-
-        if (soup_server_connection_setup_finish (conn, result, NULL)) {
-                if (g_slist_find (priv->clients, data->msg)) {
-                        soup_server_message_read_request (data->msg,
-                                                          (SoupMessageIOCompletionFn)request_finished,
-                                                          data->server);
-                }
-       } else {
+        if (!soup_server_connection_setup_finish (conn, result, NULL))
                 soup_server_connection_disconnect (conn);
-       }
-
-        g_object_unref (data->msg);
-        g_object_unref (data->server);
-        g_free (data);
 }
 
 static void
@@ -1027,42 +1053,16 @@ soup_server_accept_connection (SoupServer           *server,
                                SoupServerConnection *conn)
 {
        SoupServerPrivate *priv = soup_server_get_instance_private (server);
-       SoupServerMessage *msg;
-        SetupConnectionData *data;
-
-       msg = soup_server_message_new (conn);
-       g_signal_connect_object (msg, "disconnected",
-                                G_CALLBACK (client_disconnected),
-                                server, G_CONNECT_SWAPPED);
-       g_signal_connect_object (msg, "got-headers",
-                                G_CALLBACK (got_headers),
-                                server, G_CONNECT_SWAPPED);
-       g_signal_connect_object (msg, "got-body",
-                                G_CALLBACK (got_body),
-                                server, G_CONNECT_SWAPPED);
-       if (priv->server_header) {
-               SoupMessageHeaders *headers;
-
-               headers = soup_server_message_get_response_headers (msg);
-               soup_message_headers_append_common (headers, SOUP_HEADER_SERVER,
-                                                    priv->server_header);
-       }
 
-       priv->clients = g_slist_prepend (priv->clients, msg);
+        priv->clients = g_slist_prepend (priv->clients, g_object_ref (conn));
+        g_signal_connect_object (conn, "disconnected",
+                                 G_CALLBACK (client_disconnected),
+                                 server, G_CONNECT_SWAPPED);
+        g_signal_connect_object (conn, "request-started",
+                                 G_CALLBACK (request_started_cb),
+                                 server, G_CONNECT_SWAPPED);
 
-        g_signal_emit (server, signals[REQUEST_STARTED], 0, msg);
-
-        if (soup_server_connection_is_connected (conn)) {
-                soup_server_message_read_request (msg,
-                                                  (SoupMessageIOCompletionFn)request_finished,
-                                                  server);
-                return;
-        }
-
-        data = g_new (SetupConnectionData, 1);
-        data->server = g_object_ref (server);
-        data->msg = g_object_ref (msg);
-        soup_server_connection_setup_async (conn, NULL, (GAsyncReadyCallback)connection_setup_ready, data);
+        soup_server_connection_setup_async (conn, NULL, (GAsyncReadyCallback)connection_setup_ready, NULL);
 }
 
 static void
@@ -1074,10 +1074,8 @@ request_finished (SoupServerMessage      *msg,
        SoupServerConnection *conn = soup_server_message_get_connection (msg);
        gboolean failed;
 
-       if (completion == SOUP_MESSAGE_IO_STOLEN) {
-               g_object_unref (msg);
+       if (completion == SOUP_MESSAGE_IO_STOLEN)
                return;
-       }
 
        /* Complete the message, assuming it actually really started. */
        if (soup_server_message_get_method (msg)) {
@@ -1093,18 +1091,10 @@ request_finished (SoupServerMessage      *msg,
        if (completion == SOUP_MESSAGE_IO_COMPLETE &&
            soup_server_connection_is_connected (conn) &&
            soup_server_message_is_keepalive (msg) &&
-           priv->listeners) {
-               g_object_ref (conn);
-               priv->clients = g_slist_remove (priv->clients, msg);
-               g_object_unref (msg);
-
-               soup_server_accept_connection (server, conn);
-               g_object_unref (conn);
+           priv->listeners)
                return;
-       }
 
        soup_server_connection_disconnect (conn);
-       g_object_unref (msg);
 }
 
 /**
@@ -1176,9 +1166,9 @@ soup_server_disconnect (SoupServer *server)
        priv->listeners = NULL;
 
        for (iter = clients; iter; iter = iter->next) {
-               SoupServerMessage *msg = iter->data;
+               SoupServerConnection *conn = iter->data;
 
-               soup_server_connection_disconnect (soup_server_message_get_connection (msg));
+               soup_server_connection_disconnect (conn);
        }
        g_slist_free (clients);
 
diff --git a/tests/connection-test.c b/tests/connection-test.c
index 6b07d6bd..2269cc84 100644
--- a/tests/connection-test.c
+++ b/tests/connection-test.c
@@ -318,6 +318,9 @@ do_persistent_connection_timeout_test (void)
 {
        SoupSession *session;
 
+        g_test_skip ("Investigate how to rewrite this test");
+        return;
+
        g_test_bug ("631525");
 
        debug_printf (1, "  Normal session, message API\n");
@@ -338,6 +341,9 @@ do_persistent_connection_timeout_test_with_cancellation (void)
        int i;
        char buf[8192];
 
+        g_test_skip ("Investigate how to rewrite this test");
+        return;
+
        session = soup_test_session_new (NULL);
 
        g_signal_connect (session, "request-queued",


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]