[glib] GOutputStream: Use async read/write of streams in splice_async()



commit 4e9e7d0cba53a711bd650e9a5e28452b93f0d849
Author: Mike Ruprecht <mike ruprecht collabora co uk>
Date:   Mon Feb 18 08:12:50 2013 -0600

    GOutputStream: Use async read/write of streams in splice_async()
    
    There are some corner cases where using the sync version of read/write
    in a thread could cause thread-safety issues. In these cases it's
    possible to override the output stream's splice_async() function,
    but for input streams one would need to do some acrobatics to
    stay thread-safe. Alternatively, some implementations may not even
    override their sync read/write functions.
    
    This patch refactors the default splice_async() implementation to
    call the sync read and write functions in a thread only when both
    async versions are thread-based. When one or both are non-threaded,
    it calls the virtual write_async() and read_async() functions of the
    involved streams within the same thread.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=691581

 gio/goutputstream.c |  180 +++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 175 insertions(+), 5 deletions(-)
---
diff --git a/gio/goutputstream.c b/gio/goutputstream.c
index 1b85e9e..7e03df4 100644
--- a/gio/goutputstream.c
+++ b/gio/goutputstream.c
@@ -1559,16 +1559,177 @@ g_output_stream_real_write_finish (GOutputStream  *stream,
 typedef struct {
   GInputStream *source;
   GOutputStreamSpliceFlags flags;
+  gssize n_read;
+  gssize n_written;
+  gsize bytes_copied;
+  GError *error;
+  guint8 *buffer;
 } SpliceData;
 
 static void
 free_splice_data (SpliceData *op)
 {
+  g_clear_pointer (&op->buffer, g_free);
   g_object_unref (op->source);
+  g_clear_error (&op->error);
   g_free (op);
 }
 
 static void
+real_splice_async_complete_cb (GTask *task)
+{
+  SpliceData *op = g_task_get_task_data (task);
+
+  if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE &&
+      !g_input_stream_is_closed (op->source))
+    return;
+
+  if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET &&
+      !g_output_stream_is_closed (g_task_get_source_object (task)))
+    return;
+
+  if (op->error != NULL)
+    {
+      g_task_return_error (task, op->error);
+      op->error = NULL;
+    }
+  else
+    {
+      g_task_return_int (task, op->bytes_copied);
+    }
+
+  g_object_unref (task);
+}
+
+static void
+real_splice_async_close_input_cb (GObject      *source,
+                                  GAsyncResult *res,
+                                  gpointer      user_data)
+{
+  GTask *task = user_data;
+
+  g_input_stream_close_finish (G_INPUT_STREAM (source), res, NULL);
+
+  real_splice_async_complete_cb (task);
+}
+
+static void
+real_splice_async_close_output_cb (GObject      *source,
+                                   GAsyncResult *res,
+                                   gpointer      user_data)
+{
+  GTask *task = G_TASK (user_data);
+  SpliceData *op = g_task_get_task_data (task);
+  GError **error = (op->error == NULL) ? &op->error : NULL;
+
+  g_output_stream_internal_close_finish (G_OUTPUT_STREAM (source), res, error);
+
+  real_splice_async_complete_cb (task);
+}
+
+static void
+real_splice_async_complete (GTask *task)
+{
+  SpliceData *op = g_task_get_task_data (task);
+  gboolean done = TRUE;
+
+  if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE)
+    {
+      done = FALSE;
+      g_input_stream_close_async (op->source, g_task_get_priority (task),
+                                  g_task_get_cancellable (task),
+                                  real_splice_async_close_input_cb, task);
+    }
+
+  if (op->flags & G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET)
+    {
+      done = FALSE;
+      g_output_stream_internal_close_async (g_task_get_source_object (task),
+                                            g_task_get_priority (task),
+                                            g_task_get_cancellable (task),
+                                            real_splice_async_close_output_cb,
+                                            task);
+    }
+
+  if (done)
+    real_splice_async_complete_cb (task);
+}
+
+static void real_splice_async_read_cb (GObject      *source,
+                                       GAsyncResult *res,
+                                       gpointer      user_data);
+
+static void
+real_splice_async_write_cb (GObject      *source,
+                            GAsyncResult *res,
+                            gpointer      user_data)
+{
+  GOutputStreamClass *class;
+  GTask *task = G_TASK (user_data);
+  SpliceData *op = g_task_get_task_data (task);
+  gssize ret;
+
+  class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
+
+  ret = class->write_finish (G_OUTPUT_STREAM (source), res, &op->error);
+
+  if (ret == -1)
+    {
+      real_splice_async_complete (task);
+      return;
+    }
+
+  op->n_written += ret;
+  op->bytes_copied += ret;
+  if (op->bytes_copied > G_MAXSSIZE)
+    op->bytes_copied = G_MAXSSIZE;
+
+  if (op->n_written < op->n_read)
+    {
+      class->write_async (g_task_get_source_object (task),
+                          op->buffer + op->n_written,
+                          op->n_read - op->n_written,
+                          g_task_get_priority (task),
+                          g_task_get_cancellable (task),
+                          real_splice_async_write_cb, task);
+      return;
+    }
+
+  g_input_stream_read_async (op->source, op->buffer, 8192,
+                             g_task_get_priority (task),
+                             g_task_get_cancellable (task),
+                             real_splice_async_read_cb, task);
+}
+
+static void
+real_splice_async_read_cb (GObject      *source,
+                           GAsyncResult *res,
+                           gpointer      user_data)
+{
+  GOutputStreamClass *class;
+  GTask *task = G_TASK (user_data);
+  SpliceData *op = g_task_get_task_data (task);
+  gssize ret;
+
+  class = G_OUTPUT_STREAM_GET_CLASS (g_task_get_source_object (task));
+
+  ret = g_input_stream_read_finish (op->source, res, &op->error);
+  if (ret == -1 || ret == 0)
+    {
+      real_splice_async_complete (task);
+      return;
+    }
+
+  op->n_read = ret;
+  op->n_written = 0;
+
+  class->write_async (g_task_get_source_object (task), op->buffer,
+                      op->n_read, g_task_get_priority (task),
+                      g_task_get_cancellable (task),
+                      real_splice_async_write_cb, task);
+}
+
+static void
 splice_async_thread (GTask        *task,
                      gpointer      source_object,
                      gpointer      task_data,
@@ -1611,11 +1772,20 @@ g_output_stream_real_splice_async (GOutputStream             *stream,
   op->flags = flags;
   op->source = g_object_ref (source);
 
-  /* TODO: In the case where both source and destintion have
-     non-threadbased async calls we can use a true async copy here */
-  
-  g_task_run_in_thread (task, splice_async_thread);
-  g_object_unref (task);
+  if (g_input_stream_async_read_is_via_threads (source) &&
+      g_output_stream_async_write_is_via_threads (stream))
+    {
+      g_task_run_in_thread (task, splice_async_thread);
+      g_object_unref (task);
+    }
+  else
+    {
+      op->buffer = g_malloc (8192);
+      g_input_stream_read_async (op->source, op->buffer, 8192,
+                                 g_task_get_priority (task),
+                                 g_task_get_cancellable (task),
+                                 real_splice_async_read_cb, task);
+    }
 }
 
 static gssize


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]