[libsoup/carlosgc/websockets-pollable-sources] WebSockets: only poll IO stream when needed



commit 35f1bac5ff9ec694e64b65e51f0e7a3226aa3aaf
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Wed Aug 28 10:51:18 2019 +0200

    WebSockets: only poll IO stream when needed
    
    Instead of having two pollable sources constantly running, always try to
    read/write without blocking and start polling if the operation returns
    G_IO_ERROR_WOULD_BLOCK. This patch also fixes the test
    /websocket/direct/close-after-close, which was passing but not actually
    testing what we wanted because the client close was never sent: when
    the mutex is released, the frame has been queued, but not yet sent.

 libsoup/soup-websocket-connection.c | 115 +++++++++++++++++++++++-------------
 tests/websocket-test.c              |   5 +-
 2 files changed, 77 insertions(+), 43 deletions(-)
---
diff --git a/libsoup/soup-websocket-connection.c b/libsoup/soup-websocket-connection.c
index 345040fe..6afbbe67 100644
--- a/libsoup/soup-websocket-connection.c
+++ b/libsoup/soup-websocket-connection.c
@@ -152,6 +152,7 @@ struct _SoupWebsocketConnectionPrivate {
 };
 
 #define MAX_INCOMING_PAYLOAD_SIZE_DEFAULT   128 * 1024
+#define READ_BUFFER_SIZE 1024
 
 G_DEFINE_TYPE_WITH_PRIVATE (SoupWebsocketConnection, soup_websocket_connection, G_TYPE_OBJECT)
 
@@ -163,6 +164,11 @@ static void emit_error_and_close (SoupWebsocketConnection *self,
 
 static void protocol_error_and_close (SoupWebsocketConnection *self);
 
+static gboolean on_web_socket_input (GObject *pollable_stream,
+                                    gpointer user_data);
+static gboolean on_web_socket_output (GObject *pollable_stream,
+                                     gpointer user_data);
+
 /* Code below is based on g_utf8_validate() implementation,
  * but handling NULL characters as valid, as expected by
  * WebSockets and compliant with RFC 3629.
@@ -291,7 +297,20 @@ on_iostream_closed (GObject *source,
 }
 
 static void
-stop_input (SoupWebsocketConnection *self)
+soup_websocket_connection_start_input_source (SoupWebsocketConnection *self)
+{
+       SoupWebsocketConnectionPrivate *pv = self->pv;
+
+       if (pv->input_source)
+               return;
+
+       pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
+       g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
+       g_source_attach (pv->input_source, pv->main_context);
+}
+
+static void
+soup_websocket_connection_stop_input_source (SoupWebsocketConnection *self)
 {
        SoupWebsocketConnectionPrivate *pv = self->pv;
 
@@ -304,7 +323,20 @@ stop_input (SoupWebsocketConnection *self)
 }
 
 static void
-stop_output (SoupWebsocketConnection *self)
+soup_websocket_connection_start_output_source (SoupWebsocketConnection *self)
+{
+       SoupWebsocketConnectionPrivate *pv = self->pv;
+
+       if (pv->output_source)
+               return;
+
+       pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
+       g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
+       g_source_attach (pv->output_source, pv->main_context);
+}
+
+static void
+soup_websocket_connection_stop_output_source (SoupWebsocketConnection *self)
 {
        SoupWebsocketConnectionPrivate *pv = self->pv;
 
@@ -349,8 +381,8 @@ close_io_stream (SoupWebsocketConnection *self)
        close_io_stop_timeout (self);
 
        if (!pv->io_closing) {
-               stop_input (self);
-               stop_output (self);
+               soup_websocket_connection_stop_input_source (self);
+               soup_websocket_connection_stop_output_source (self);
                pv->io_closing = TRUE;
                g_debug ("closing io stream");
                g_io_stream_close_async (pv->io_stream, G_PRIORITY_DEFAULT,
@@ -368,7 +400,7 @@ shutdown_wr_io_stream (SoupWebsocketConnection *self)
        GIOStream *base_iostream;
        GError *error = NULL;
 
-       stop_output (self);
+       soup_websocket_connection_stop_output_source (self);
 
        base_iostream = SOUP_IS_IO_STREAM (pv->io_stream) ?
                soup_io_stream_get_base_iostream (SOUP_IO_STREAM (pv->io_stream)) :
@@ -644,9 +676,6 @@ too_big_error_and_close (SoupWebsocketConnection *self,
                 self->pv->connection_type == SOUP_WEBSOCKET_CONNECTION_SERVER ? "server" : "client",
                 payload_len, self->pv->max_incoming_payload_size);
        emit_error_and_close (self, error, TRUE);
-
-       /* The input is in an invalid state now */
-       stop_input (self);
 }
 
 static void
@@ -1080,32 +1109,31 @@ process_incoming (SoupWebsocketConnection *self)
                ;
 }
 
-static gboolean
-on_web_socket_input (GObject *pollable_stream,
-                    gpointer user_data)
+static void
+soup_websocket_connection_read (SoupWebsocketConnection *self)
 {
-       SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
        SoupWebsocketConnectionPrivate *pv = self->pv;
        GError *error = NULL;
        gboolean end = FALSE;
        gssize count;
        gsize len;
 
+       soup_websocket_connection_stop_input_source (self);
+
        do {
                len = pv->incoming->len;
-               g_byte_array_set_size (pv->incoming, len + 1024);
+               g_byte_array_set_size (pv->incoming, len + READ_BUFFER_SIZE);
 
                count = g_pollable_input_stream_read_nonblocking (pv->input,
                                                                  pv->incoming->data + len,
-                                                                 1024, NULL, &error);
-
+                                                                 READ_BUFFER_SIZE, NULL, &error);
                if (count < 0) {
                        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
                                g_error_free (error);
                                count = 0;
                        } else {
                                emit_error_and_close (self, error, TRUE);
-                               return TRUE;
+                               return;
                        }
                } else if (count == 0) {
                        end = TRUE;
@@ -1125,16 +1153,24 @@ on_web_socket_input (GObject *pollable_stream,
                }
 
                close_io_stream (self);
+               return;
        }
 
-       return TRUE;
+       soup_websocket_connection_start_input_source (self);
 }
 
 static gboolean
-on_web_socket_output (GObject *pollable_stream,
-                     gpointer user_data)
+on_web_socket_input (GObject *pollable_stream,
+                    gpointer user_data)
+{
+       soup_websocket_connection_read (SOUP_WEBSOCKET_CONNECTION (user_data));
+
+       return G_SOURCE_REMOVE;
+}
+
+static void
+soup_websocket_connection_write (SoupWebsocketConnection *self)
 {
-       SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
        SoupWebsocketConnectionPrivate *pv = self->pv;
        const guint8 *data;
        GError *error = NULL;
@@ -1142,19 +1178,18 @@ on_web_socket_output (GObject *pollable_stream,
        gssize count;
        gsize len;
 
+       soup_websocket_connection_stop_output_source (self);
+
        if (soup_websocket_connection_get_state (self) == SOUP_WEBSOCKET_STATE_CLOSED) {
                g_debug ("Ignoring message since the connection is closed");
-               stop_output (self);
-               return TRUE;
+               return;
        }
 
        frame = g_queue_peek_head (&pv->outgoing);
 
        /* No more frames to send */
-       if (frame == NULL) {
-               stop_output (self);
-               return TRUE;
-       }
+       if (frame == NULL)
+               return;
 
        data = g_bytes_get_data (frame->data, &len);
        g_assert (len > 0);
@@ -1174,7 +1209,7 @@ on_web_socket_output (GObject *pollable_stream,
                        frame->pending = TRUE;
                } else {
                        emit_error_and_close (self, error, TRUE);
-                       return FALSE;
+                       return;
                }
        }
 
@@ -1192,23 +1227,21 @@ on_web_socket_output (GObject *pollable_stream,
                        }
                }
                frame_free (frame);
+
+               if (g_queue_is_empty (&pv->outgoing))
+                       return;
        }
 
-       return TRUE;
+       soup_websocket_connection_start_output_source (self);
 }
 
-static void
-start_output (SoupWebsocketConnection *self)
+static gboolean
+on_web_socket_output (GObject *pollable_stream,
+                     gpointer user_data)
 {
-       SoupWebsocketConnectionPrivate *pv = self->pv;
+       soup_websocket_connection_write (SOUP_WEBSOCKET_CONNECTION (user_data));
 
-       if (pv->output_source)
-               return;
-
-       g_debug ("starting output source");
-       pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
-       g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
-       g_source_attach (pv->output_source, pv->main_context);
+       return G_SOURCE_REMOVE;
 }
 
 static void
@@ -1249,7 +1282,7 @@ queue_frame (SoupWebsocketConnection *self,
                g_queue_push_tail (&pv->outgoing, frame);
        }
 
-       start_output (self);
+       soup_websocket_connection_write (self);
 }
 
 static void
@@ -1274,9 +1307,7 @@ soup_websocket_connection_constructed (GObject *object)
        pv->output = G_POLLABLE_OUTPUT_STREAM (os);
        g_return_if_fail (g_pollable_output_stream_can_poll (pv->output));
 
-       pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
-       g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
-       g_source_attach (pv->input_source, pv->main_context);
+       soup_websocket_connection_start_input_source (self);
 }
 
 static void
diff --git a/tests/websocket-test.c b/tests/websocket-test.c
index 146fdf82..26d064df 100644
--- a/tests/websocket-test.c
+++ b/tests/websocket-test.c
@@ -1017,6 +1017,7 @@ close_after_close_server_thread (gpointer user_data)
        const char frames[] =
                "\x88\x09\x03\xe8""reason1"
                "\x88\x09\x03\xe8""reason2";
+       GSocket *socket;
        GError *error = NULL;
 
        g_mutex_lock (&test->mutex);
@@ -1026,7 +1027,8 @@ close_after_close_server_thread (gpointer user_data)
                                   frames, sizeof (frames) -1, &written, NULL, &error);
        g_assert_no_error (error);
        g_assert_cmpuint (written, ==, sizeof (frames) - 1);
-       g_io_stream_close (test->raw_server, NULL, &error);
+       socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (test->raw_server));
+       g_socket_shutdown (socket, FALSE, TRUE, &error);
        g_assert_no_error (error);
 
        return NULL;
@@ -1050,6 +1052,7 @@ test_close_after_close (Test *test,
        WAIT_UNTIL (soup_websocket_connection_get_state (test->client) == SOUP_WEBSOCKET_STATE_CLOSED);
        g_assert_cmpuint (soup_websocket_connection_get_close_code (test->client), ==, SOUP_WEBSOCKET_CLOSE_NORMAL);
        g_assert_cmpstr (soup_websocket_connection_get_close_data (test->client), ==, "reason1");
+       g_io_stream_close (test->raw_server, NULL, NULL);
 }
 
 static gpointer


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]