[glib] GOutputStream: Use async read/write of streams in splice_async()
- From: Dan Winship <danw src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib] GOutputStream: Use async read/write of streams in splice_async()
- Date: Sun, 29 Sep 2013 21:49:38 +0000 (UTC)
commit 4e9e7d0cba53a711bd650e9a5e28452b93f0d849
Author: Mike Ruprecht <mike ruprecht collabora co uk>
Date: Mon Feb 18 08:12:50 2013 -0600
GOutputStream: Use async read/write of streams in splice_async()
There are some corner cases where using the sync version of read/write
in a thread could cause thread-safety issues. In these cases it's
possible to override the output stream's splice_async() function,
but for input streams one would need to do some acrobatics to
stay thread-safe. Alternatively, some implementations may not even
override their sync read/write functions.
This patch refactors the default splice_async() implementation to
call the sync read and write functions in a thread only when both
async versions are thread-based. When one or both are non-threaded,
it calls the virtual write_async() and read_async() functions of the
involved streams within the same thread.
https://bugzilla.gnome.org/show_bug.cgi?id=691581
gio/goutputstream.c | 180 +++++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 175 insertions(+), 5 deletions(-)
---
diff --git a/gio/goutputstream.c b/gio/goutputstream.c
index 1b85e9e..7e03df4 100644
--- a/gio/goutputstream.c
+++ b/gio/goutputstream.c
@@ -1559,16 +1559,177 @@ g_output_stream_real_write_finish (GOutputStream *stream,
typedef struct {
GInputStream *source;
GOutputStreamSpliceFlags flags;
+ gssize n_read;
+ gssize n_written;
+ gsize bytes_copied;
+ GError *error;
+ guint8 *buffer;
} SpliceData;
static void
free_splice_data (SpliceData *op)
{
+ g_clear_pointer (&op->buffer, g_free);
g_object_unref (op->source);
+ g_clear_error (&op->error);
g_free (op);
}
static void
+real_splice_async_complete_cb (GTask *task)
+{
+ SpliceData *op = g_task_get_task_data (task);
+
+ if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
+ !g_input_stream_is_closed (op->source))
+ return;
+
+ if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
+ !g_output_stream_is_closed (g_task_get_source_object (task)))
+ return;
+
+ if (op->error != NULL)
+ {
+ g_task_return_error (task, op->error);
+ op->error = NULL;
+ }
+ else
+ {
+ g_task_return_int (task, op->bytes_copied);
+ }
+
+ g_object_unref (task);
+}
+
+static void
+real_splice_async_close_input_cb (GObject *source,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GTask *task = user_data;
+
+ g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
+
+ real_splice_async_complete_cb (task);
+}
+
+static void
+real_splice_async_close_output_cb (GObject *source,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GTask *task = G_TASK (user_data);
+ SpliceData *op = g_task_get_task_data (task);
+ GError **error = (op->error == NULL) ? &op->error : NULL;
+
+ g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
+
+ real_splice_async_complete_cb (task);
+}
+
+static void
+real_splice_async_complete (GTask *task)
+{
+ SpliceData *op = g_task_get_task_data (task);
+ gboolean done = TRUE;
+
+ if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
+ {
+ done = FALSE;
+ g_input_stream_close_async (op->source, g_task_get_priority (task),
+ g_task_get_cancellable (task),
+ real_splice_async_close_input_cb, task);
+ }
+
+ if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
+ {
+ done = FALSE;
+ g_output_stream_internal_close_async (g_task_get_source_object (task),
+ g_task_get_priority (task),
+ g_task_get_cancellable (task),
+ real_splice_async_close_output_cb,
+ task);
+ }
+
+ if (done)
+ real_splice_async_complete_cb (task);
+}
+
+static void real_splice_async_read_cb (GObject *source,
+ GAsyncResult *res,
+ gpointer user_data);
+
+static void
+real_splice_async_write_cb (GObject *source,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GOutputStreamClass *class;
+ GTask *task = G_TASK (user_data);
+ SpliceData *op = g_task_get_task_data (task);
+ gssize ret;
+
+ class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
+
+ ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
+
+ if (ret == -1)
+ {
+ real_splice_async_complete (task);
+ return;
+ }
+
+ op->n_written += ret;
+ op->bytes_copied += ret;
+ if (op->bytes_copied > G_MAXSSIZE)
+ op->bytes_copied = G_MAXSSIZE;
+
+ if (op->n_written < op->n_read)
+ {
+ class->write_async (g_task_get_source_object (task),
+ op->buffer + op->n_written,
+ op->n_read - op->n_written,
+ g_task_get_priority (task),
+ g_task_get_cancellable (task),
+ real_splice_async_write_cb, task);
+ return;
+ }
+
+ g_input_stream_read_async (op->source, op->buffer, 8192,
+ g_task_get_priority (task),
+ g_task_get_cancellable (task),
+ real_splice_async_read_cb, task);
+}
+
+static void
+real_splice_async_read_cb (GObject *source,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ GOutputStreamClass *class;
+ GTask *task = G_TASK (user_data);
+ SpliceData *op = g_task_get_task_data (task);
+ gssize ret;
+
+ class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
+
+ ret = g_input_stream_read_finish (op->source, res, &op->error);
+ if (ret == -1 || ret == 0)
+ {
+ real_splice_async_complete (task);
+ return;
+ }
+
+ op->n_read = ret;
+ op->n_written = 0;
+
+ class->write_async (g_task_get_source_object (task), op->buffer,
+ op->n_read, g_task_get_priority (task),
+ g_task_get_cancellable (task),
+ real_splice_async_write_cb, task);
+}
+
+static void
splice_async_thread (GTask *task,
gpointer source_object,
gpointer task_data,
@@ -1611,11 +1772,20 @@ g_output_stream_real_splice_async (GOutputStream *stream,
op->flags = flags;
op->source = g_object_ref (source);
- /* TODO: In the case where both source and destintion have
- non-threadbased async calls we can use a true async copy here */
-
- g_task_run_in_thread (task, splice_async_thread);
- g_object_unref (task);
+ if (g_input_stream_async_read_is_via_threads (source) &&
+ g_output_stream_async_write_is_via_threads (stream))
+ {
+ g_task_run_in_thread (task, splice_async_thread);
+ g_object_unref (task);
+ }
+ else
+ {
+ op->buffer = g_malloc (8192);
+ g_input_stream_read_async (op->source, op->buffer, 8192,
+ g_task_get_priority (task),
+ g_task_get_cancellable (task),
+ real_splice_async_read_cb, task);
+ }
}
static gssize
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]