[phodav] spice: OutputQueue refactor
- From: Victor Toso <victortoso src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [phodav] spice: OutputQueue refactor
- Date: Tue, 3 Mar 2020 11:12:52 +0000 (UTC)
commit da0bfa1e921ef886262bbb718fff26dcb17a25f8
Author: Jakub Janků <jjanku redhat com>
Date: Sat Aug 24 11:13:31 2019 +0200
spice: OutputQueue refactor
The main issue with the current implementation is that
g_output_stream_write_all() in output_queue_idle()
is called synchronously in the main context.
This is fine for GBufferedOutputStream, since the data is just
copied to a buffer, but not for GUnixOutputStream or
GWin32OutputStream (class of mux_ostream) which might block
the main thread.
OutputQueue is also used for each client, where it doesn't make
much sense because the queue has never more than 1 item at a time.
So let's just use g_output_stream_write_all() instead.
Signed-off-by: Jakub Janků <jjanku redhat com>
spice/output-queue.c | 93 +++++++++++++++------------------------------------
spice/spice-webdavd.c | 50 +++++++++++----------------
2 files changed, 47 insertions(+), 96 deletions(-)
---
diff --git a/spice/output-queue.c b/spice/output-queue.c
index 0c4034f..0e1dc76 100644
--- a/spice/output-queue.c
+++ b/spice/output-queue.c
@@ -20,7 +20,6 @@
typedef struct _OutputQueueElem
{
- OutputQueue *queue;
const guint8 *buf;
gsize size;
PushedCb cb;
@@ -31,14 +30,15 @@ struct _OutputQueue
{
GObject parent_instance;
GOutputStream *output;
- gboolean flushing;
- guint idle_id;
+ gboolean writing;
GQueue *queue;
GCancellable *cancel;
};
G_DEFINE_TYPE (OutputQueue, output_queue, G_TYPE_OBJECT);
+static void output_queue_kick (OutputQueue *q);
+
static void output_queue_init (OutputQueue *self)
{
self->queue = g_queue_new ();
@@ -48,10 +48,6 @@ static void output_queue_finalize (GObject *obj)
{
OutputQueue *self = OUTPUT_QUEUE (obj);
- g_warn_if_fail (g_queue_get_length (self->queue) == 0);
- g_warn_if_fail (!self->flushing);
- g_warn_if_fail (!self->idle_id);
-
g_queue_free_full (self->queue, g_free);
g_object_unref (self->output);
g_object_unref (self->cancel);
@@ -73,74 +69,41 @@ OutputQueue* output_queue_new (GOutputStream *output, GCancellable *cancel)
return self;
}
-static gboolean output_queue_idle (gpointer user_data);
-
static void
-output_queue_flush_cb (GObject *source_object,
- GAsyncResult *res,
- gpointer user_data)
+write_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
{
- GError *error = NULL;
- OutputQueueElem *e = user_data;
- OutputQueue *q = e->queue;
-
- g_debug ("flushed");
- q->flushing = FALSE;
- g_output_stream_flush_finish (G_OUTPUT_STREAM (source_object),
- res, &error);
- if (error)
- g_warning ("error: %s", error->message);
+ OutputQueue *q = user_data;
+ OutputQueueElem *e;
+ GError *err = NULL;
- g_clear_error (&error);
+ e = g_queue_pop_head (q->queue);
+ g_output_stream_write_all_finish (G_OUTPUT_STREAM (source_object), res, NULL, &err);
- if (!q->idle_id)
- q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
+ if (e->cb)
+ e->cb (q, e->user_data, err);
g_free (e);
+ q->writing = FALSE;
+ if (!err)
+ output_queue_kick (q);
+ g_clear_error (&err);
g_object_unref (q);
}
-static gboolean
-output_queue_idle (gpointer user_data)
+static void
+output_queue_kick (OutputQueue *q)
{
- OutputQueue *q = user_data;
- OutputQueueElem *e = NULL;
- GError *error = NULL;
-
- if (q->flushing)
- {
- g_debug ("already flushing");
- goto end;
- }
-
- e = g_queue_pop_head (q->queue);
- if (!e)
- {
- g_debug ("No more data to flush");
- goto end;
- }
-
- g_debug ("flushing %" G_GSIZE_FORMAT, e->size);
- g_output_stream_write_all (q->output, e->buf, e->size, NULL, q->cancel, &error);
- if (e->cb)
- e->cb (q, e->user_data, error);
-
- if (error)
- goto end;
-
- q->flushing = TRUE;
- g_output_stream_flush_async (q->output, G_PRIORITY_DEFAULT, q->cancel, output_queue_flush_cb, e);
-
- q->idle_id = 0;
- return FALSE;
+ OutputQueueElem *e;
-end:
- g_clear_error (&error);
- q->idle_id = 0;
- g_free (e);
- g_object_unref (q);
+ if (!q || q->writing || g_queue_is_empty (q->queue))
+ return;
- return FALSE;
+ e = g_queue_peek_head (q->queue);
+ q->writing = TRUE;
+ g_output_stream_write_all_async (q->output, e->buf, e->size,
+ G_PRIORITY_DEFAULT, q->cancel, write_cb, g_object_ref (q));
}
void
@@ -156,9 +119,7 @@ output_queue_push (OutputQueue *q, const guint8 *buf, gsize size,
e->size = size;
e->cb = pushed_cb;
e->user_data = user_data;
- e->queue = q;
g_queue_push_tail (q->queue, e);
- if (!q->idle_id && !q->flushing)
- q->idle_id = g_idle_add (output_queue_idle, g_object_ref (q));
+ output_queue_kick (q);
}
diff --git a/spice/spice-webdavd.c b/spice/spice-webdavd.c
index 1b01158..10b62b6 100644
--- a/spice/spice-webdavd.c
+++ b/spice/spice-webdavd.c
@@ -68,7 +68,6 @@ typedef struct _Client
guint8 buf[G_MAXUINT16];
} mux;
GSocketConnection *client_connection;
- OutputQueue *queue;
} Client;
static volatile gboolean quit_service;
@@ -110,22 +109,12 @@ signal_handler (gpointer user_data)
static Client *
add_client (GSocketConnection *client_connection)
{
- GIOStream *iostream = G_IO_STREAM (client_connection);
- GOutputStream *ostream = g_io_stream_get_output_stream (iostream);
- GOutputStream *bostream;
Client *client;
-
- bostream = g_buffered_output_stream_new (ostream);
- g_buffered_output_stream_set_auto_grow (G_BUFFERED_OUTPUT_STREAM (bostream), TRUE);
-
client = g_new0 (Client, 1);
client->ref_count = 1;
client->client_connection = g_object_ref (client_connection);
// TODO: check if usage of this idiom is portable, or if we need to check collisions
client->mux.id = GPOINTER_TO_INT (client_connection);
- client->queue = output_queue_new (bostream, cancel);
- g_object_unref (bostream);
-
g_hash_table_insert (clients, &client->mux.id, client);
g_warn_if_fail (g_hash_table_lookup (clients, &client->mux.id));
@@ -150,7 +139,6 @@ client_unref (gpointer user_data)
g_io_stream_close (G_IO_STREAM (c->client_connection), NULL, NULL);
g_object_unref (c->client_connection);
- g_object_unref (c->queue);
g_free (c);
}
@@ -163,24 +151,21 @@ remove_client (Client *client)
}
static void
-handle_push_error (OutputQueue *q, gpointer user_data, GError *error)
+mux_pushed_client_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
{
Client *client = user_data;
+ GError *error = NULL;
+ g_output_stream_write_all_finish (G_OUTPUT_STREAM (source_object), res, NULL, &error);
- if (!error)
- return;
-
- g_warning ("push error: %s", error->message);
- remove_client (client);
-}
-
-static void
-mux_pushed_client_cb (OutputQueue *q, gpointer user_data, GError *error)
-{
- if (error) {
- handle_push_error (q, user_data, error);
- }
- client_unref (user_data);
+ if (error)
+ {
+ g_warning ("error pushing to client %p: %s", client, error->message);
+ g_error_free (error);
+ remove_client (client);
+ }
+ client_unref (client);
start_mux_read (mux_istream);
}
@@ -210,8 +195,12 @@ mux_data_read_cb (GObject *source_object,
g_warn_if_fail(c != NULL);
if (c)
- output_queue_push (c->queue, (guint8 *) demux.buf, demux.size,
- mux_pushed_client_cb, client_ref (c));
+ {
+ GOutputStream *out;
+ out = g_io_stream_get_output_stream (G_IO_STREAM (c->client_connection));
+ g_output_stream_write_all_async (out, demux.buf, demux.size,
+ G_PRIORITY_DEFAULT, cancel, mux_pushed_client_cb, client_ref (c));
+ }
else
start_mux_read (mux_istream);
}
@@ -289,7 +278,8 @@ mux_pushed_cb (OutputQueue *q, gpointer user_data, GError *error)
if (error)
{
- handle_push_error (q, client, error);
+ g_warning ("error pushing to mux from client %p: %s", client, error->message);
+ remove_client (client);
goto end;
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]