[libsoup/websockets-fixes-2.64: 14/19] WebSockets: only poll IO stream when needed




commit f4326fed53295bd12ebc42a25f6f72785295c6cb
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Wed Aug 28 10:51:18 2019 +0200

    WebSockets: only poll IO stream when needed
    
    Instead of having two pollable sources constantly running, always try to
    read/write without blocking and start polling if the operation returns
    G_IO_ERROR_WOULD_BLOCK. This patch also fixes the test
    /websocket/direct/close-after-close, which was passing but not actually
    testing what we wanted because the client close was never sent: when
    the mutex is released, the frame has been queued but not yet sent.

 libsoup/soup-websocket-connection.c | 115 +++++++++++++++++++++++-------------
 tests/websocket-test.c              |   5 +-
 2 files changed, 77 insertions(+), 43 deletions(-)
---
diff --git a/libsoup/soup-websocket-connection.c b/libsoup/soup-websocket-connection.c
index 1053d52e..3825c717 100644
--- a/libsoup/soup-websocket-connection.c
+++ b/libsoup/soup-websocket-connection.c
@@ -148,6 +148,7 @@ struct _SoupWebsocketConnectionPrivate {
 };
 
 #define MAX_INCOMING_PAYLOAD_SIZE_DEFAULT   128 * 1024
+#define READ_BUFFER_SIZE 1024
 
 G_DEFINE_TYPE_WITH_PRIVATE (SoupWebsocketConnection, soup_websocket_connection, G_TYPE_OBJECT)
 
@@ -156,6 +157,11 @@ static void queue_frame (SoupWebsocketConnection *self, SoupWebsocketQueueFlags
 
 static void protocol_error_and_close (SoupWebsocketConnection *self);
 
+static gboolean on_web_socket_input (GObject *pollable_stream,
+                                    gpointer user_data);
+static gboolean on_web_socket_output (GObject *pollable_stream,
+                                     gpointer user_data);
+
 /* Code below is based on g_utf8_validate() implementation,
  * but handling NULL characters as valid, as expected by
  * WebSockets and compliant with RFC 3629.
@@ -284,7 +290,20 @@ on_iostream_closed (GObject *source,
 }
 
 static void
-stop_input (SoupWebsocketConnection *self)
+soup_websocket_connection_start_input_source (SoupWebsocketConnection *self)
+{
+       SoupWebsocketConnectionPrivate *pv = self->pv;
+
+       if (pv->input_source)
+               return;
+
+       pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
+       g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
+       g_source_attach (pv->input_source, pv->main_context);
+}
+
+static void
+soup_websocket_connection_stop_input_source (SoupWebsocketConnection *self)
 {
        SoupWebsocketConnectionPrivate *pv = self->pv;
 
@@ -297,7 +316,20 @@ stop_input (SoupWebsocketConnection *self)
 }
 
 static void
-stop_output (SoupWebsocketConnection *self)
+soup_websocket_connection_start_output_source (SoupWebsocketConnection *self)
+{
+       SoupWebsocketConnectionPrivate *pv = self->pv;
+
+       if (pv->output_source)
+               return;
+
+       pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
+       g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
+       g_source_attach (pv->output_source, pv->main_context);
+}
+
+static void
+soup_websocket_connection_stop_output_source (SoupWebsocketConnection *self)
 {
        SoupWebsocketConnectionPrivate *pv = self->pv;
 
@@ -342,8 +374,8 @@ close_io_stream (SoupWebsocketConnection *self)
        close_io_stop_timeout (self);
 
        if (!pv->io_closing) {
-               stop_input (self);
-               stop_output (self);
+               soup_websocket_connection_stop_input_source (self);
+               soup_websocket_connection_stop_output_source (self);
                pv->io_closing = TRUE;
                g_debug ("closing io stream");
                g_io_stream_close_async (pv->io_stream, G_PRIORITY_DEFAULT,
@@ -361,7 +393,7 @@ shutdown_wr_io_stream (SoupWebsocketConnection *self)
        GIOStream *base_iostream;
        GError *error = NULL;
 
-       stop_output (self);
+       soup_websocket_connection_stop_output_source (self);
 
        base_iostream = SOUP_IS_IO_STREAM (pv->io_stream) ?
                soup_io_stream_get_base_iostream (SOUP_IO_STREAM (pv->io_stream)) :
@@ -618,9 +650,6 @@ too_big_error_and_close (SoupWebsocketConnection *self,
                 self->pv->connection_type == SOUP_WEBSOCKET_CONNECTION_SERVER ? "server" : "client",
                 payload_len, self->pv->max_incoming_payload_size);
        emit_error_and_close (self, error, TRUE);
-
-       /* The input is in an invalid state now */
-       stop_input (self);
 }
 
 static void
@@ -1030,32 +1059,31 @@ process_incoming (SoupWebsocketConnection *self)
                ;
 }
 
-static gboolean
-on_web_socket_input (GObject *pollable_stream,
-                    gpointer user_data)
+static void
+soup_websocket_connection_read (SoupWebsocketConnection *self)
 {
-       SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
        SoupWebsocketConnectionPrivate *pv = self->pv;
        GError *error = NULL;
        gboolean end = FALSE;
        gssize count;
        gsize len;
 
+       soup_websocket_connection_stop_input_source (self);
+
        do {
                len = pv->incoming->len;
-               g_byte_array_set_size (pv->incoming, len + 1024);
+               g_byte_array_set_size (pv->incoming, len + READ_BUFFER_SIZE);
 
                count = g_pollable_input_stream_read_nonblocking (pv->input,
                                                                  pv->incoming->data + len,
-                                                                 1024, NULL, &error);
-
+                                                                 READ_BUFFER_SIZE, NULL, &error);
                if (count < 0) {
                        if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
                                g_error_free (error);
                                count = 0;
                        } else {
                                emit_error_and_close (self, error, TRUE);
-                               return TRUE;
+                               return;
                        }
                } else if (count == 0) {
                        end = TRUE;
@@ -1075,16 +1103,24 @@ on_web_socket_input (GObject *pollable_stream,
                }
 
                close_io_stream (self);
+               return;
        }
 
-       return TRUE;
+       soup_websocket_connection_start_input_source (self);
 }
 
 static gboolean
-on_web_socket_output (GObject *pollable_stream,
-                     gpointer user_data)
+on_web_socket_input (GObject *pollable_stream,
+                    gpointer user_data)
+{
+       soup_websocket_connection_read (SOUP_WEBSOCKET_CONNECTION (user_data));
+
+       return G_SOURCE_REMOVE;
+}
+
+static void
+soup_websocket_connection_write (SoupWebsocketConnection *self)
 {
-       SoupWebsocketConnection *self = SOUP_WEBSOCKET_CONNECTION (user_data);
        SoupWebsocketConnectionPrivate *pv = self->pv;
        const guint8 *data;
        GError *error = NULL;
@@ -1092,19 +1128,18 @@ on_web_socket_output (GObject *pollable_stream,
        gssize count;
        gsize len;
 
+       soup_websocket_connection_stop_output_source (self);
+
        if (soup_websocket_connection_get_state (self) == SOUP_WEBSOCKET_STATE_CLOSED) {
                g_debug ("Ignoring message since the connection is closed");
-               stop_output (self);
-               return TRUE;
+               return;
        }
 
        frame = g_queue_peek_head (&pv->outgoing);
 
        /* No more frames to send */
-       if (frame == NULL) {
-               stop_output (self);
-               return TRUE;
-       }
+       if (frame == NULL)
+               return;
 
        data = g_bytes_get_data (frame->data, &len);
        g_assert (len > 0);
@@ -1124,7 +1159,7 @@ on_web_socket_output (GObject *pollable_stream,
                        frame->pending = TRUE;
                } else {
                        emit_error_and_close (self, error, TRUE);
-                       return FALSE;
+                       return;
                }
        }
 
@@ -1142,23 +1177,21 @@ on_web_socket_output (GObject *pollable_stream,
                        }
                }
                frame_free (frame);
+
+               if (g_queue_is_empty (&pv->outgoing))
+                       return;
        }
 
-       return TRUE;
+       soup_websocket_connection_start_output_source (self);
 }
 
-static void
-start_output (SoupWebsocketConnection *self)
+static gboolean
+on_web_socket_output (GObject *pollable_stream,
+                     gpointer user_data)
 {
-       SoupWebsocketConnectionPrivate *pv = self->pv;
+       soup_websocket_connection_write (SOUP_WEBSOCKET_CONNECTION (user_data));
 
-       if (pv->output_source)
-               return;
-
-       g_debug ("starting output source");
-       pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
-       g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
-       g_source_attach (pv->output_source, pv->main_context);
+       return G_SOURCE_REMOVE;
 }
 
 static void
@@ -1199,7 +1232,7 @@ queue_frame (SoupWebsocketConnection *self,
                g_queue_push_tail (&pv->outgoing, frame);
        }
 
-       start_output (self);
+       soup_websocket_connection_write (self);
 }
 
 static void
@@ -1224,9 +1257,7 @@ soup_websocket_connection_constructed (GObject *object)
        pv->output = G_POLLABLE_OUTPUT_STREAM (os);
        g_return_if_fail (g_pollable_output_stream_can_poll (pv->output));
 
-       pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
-       g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
-       g_source_attach (pv->input_source, pv->main_context);
+       soup_websocket_connection_start_input_source (self);
 }
 
 static void
diff --git a/tests/websocket-test.c b/tests/websocket-test.c
index 3b5d6669..d9afd28c 100644
--- a/tests/websocket-test.c
+++ b/tests/websocket-test.c
@@ -922,6 +922,7 @@ close_after_close_server_thread (gpointer user_data)
        const char frames[] =
                "\x88\x09\x03\xe8""reason1"
                "\x88\x09\x03\xe8""reason2";
+       GSocket *socket;
        GError *error = NULL;
 
        g_mutex_lock (&test->mutex);
@@ -931,7 +932,8 @@ close_after_close_server_thread (gpointer user_data)
                                   frames, sizeof (frames) -1, &written, NULL, &error);
        g_assert_no_error (error);
        g_assert_cmpuint (written, ==, sizeof (frames) - 1);
-       g_io_stream_close (test->raw_server, NULL, &error);
+       socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (test->raw_server));
+       g_socket_shutdown (socket, FALSE, TRUE, &error);
        g_assert_no_error (error);
 
        return NULL;
@@ -955,6 +957,7 @@ test_close_after_close (Test *test,
        WAIT_UNTIL (soup_websocket_connection_get_state (test->client) == SOUP_WEBSOCKET_STATE_CLOSED);
        g_assert_cmpuint (soup_websocket_connection_get_close_code (test->client), ==, SOUP_WEBSOCKET_CLOSE_NORMAL);
        g_assert_cmpstr (soup_websocket_connection_get_close_data (test->client), ==, "reason1");
+       g_io_stream_close (test->raw_server, NULL, NULL);
 }
 
 static gpointer


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]