[glib: 2/15] Add writev() / writev_all() API to GOutputStream and GPollableOutputStream



commit 0bcc1773781264f7fae146dbabb1305af1da44f0
Author: Sebastian Dröge <sebastian centricular com>
Date:   Thu Sep 13 13:10:36 2018 +0300

    Add writev() / writev_all() API to GOutputStream and GPollableOutputStream
    
    This comes with default implementations around the normal write
    functions and async variants.
    
    Fixes https://gitlab.gnome.org/GNOME/glib/issues/1431

 docs/reference/gio/gio-sections.txt |   8 +
 gio/gioprivate.h                    |   1 +
 gio/goutputstream.c                 | 793 +++++++++++++++++++++++++++++++++++-
 gio/goutputstream.h                 |  68 +++-
 gio/gpollableoutputstream.c         | 172 +++++++-
 gio/gpollableoutputstream.h         |  13 +
 6 files changed, 1043 insertions(+), 12 deletions(-)
---
diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt
index 3f2dd9a18..e61001f68 100644
--- a/docs/reference/gio/gio-sections.txt
+++ b/docs/reference/gio/gio-sections.txt
@@ -843,6 +843,12 @@ g_output_stream_write
 g_output_stream_write_all
 g_output_stream_write_all_async
 g_output_stream_write_all_finish
+g_output_stream_writev
+g_output_stream_writev_all
+g_output_stream_writev_async
+g_output_stream_writev_finish
+g_output_stream_writev_all_async
+g_output_stream_writev_all_finish
 g_output_stream_splice
 g_output_stream_flush
 g_output_stream_close
@@ -2124,6 +2130,7 @@ g_socket_receive_with_blocking
 g_socket_send
 g_socket_send_to
 g_socket_send_message
+g_socket_send_message_with_timeout
 g_socket_send_messages
 g_socket_send_with_blocking
 g_socket_close
@@ -3648,6 +3655,7 @@ g_pollable_output_stream_can_poll
 g_pollable_output_stream_is_writable
 g_pollable_output_stream_create_source
 g_pollable_output_stream_write_nonblocking
+g_pollable_output_stream_writev_nonblocking
 <SUBSECTION Standard>
 G_POLLABLE_OUTPUT_STREAM
 G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE
diff --git a/gio/gioprivate.h b/gio/gioprivate.h
index b79192566..a917510b9 100644
--- a/gio/gioprivate.h
+++ b/gio/gioprivate.h
@@ -29,6 +29,7 @@ G_BEGIN_DECLS
 gboolean g_input_stream_async_read_is_via_threads (GInputStream *stream);
 gboolean g_input_stream_async_close_is_via_threads (GInputStream *stream);
 gboolean g_output_stream_async_write_is_via_threads (GOutputStream *stream);
+gboolean g_output_stream_async_writev_is_via_threads (GOutputStream *stream);
 gboolean g_output_stream_async_close_is_via_threads (GOutputStream *stream);
 
 void g_socket_connection_set_cached_remote_address (GSocketConnection *connection,
diff --git a/gio/goutputstream.c b/gio/goutputstream.c
index 3e658e88a..f80cc0a2a 100644
--- a/gio/goutputstream.c
+++ b/gio/goutputstream.c
@@ -72,6 +72,23 @@ static void     g_output_stream_real_write_async   (GOutputStream             *s
 static gssize   g_output_stream_real_write_finish  (GOutputStream             *stream,
                                                    GAsyncResult              *result,
                                                    GError                   **error);
+static gboolean g_output_stream_real_writev        (GOutputStream             *stream,
+                                                   const GOutputVector       *vectors,
+                                                   gsize                      n_vectors,
+                                                   gsize                     *bytes_written,
+                                                   GCancellable              *cancellable,
+                                                   GError                   **error);
+static void     g_output_stream_real_writev_async  (GOutputStream             *stream,
+                                                   const GOutputVector       *vectors,
+                                                   gsize                      n_vectors,
+                                                   int                        io_priority,
+                                                   GCancellable              *cancellable,
+                                                   GAsyncReadyCallback        callback,
+                                                   gpointer                   data);
+static gboolean g_output_stream_real_writev_finish (GOutputStream             *stream,
+                                                   GAsyncResult              *result,
+                                                   gsize                     *bytes_written,
+                                                   GError                   **error);
 static void     g_output_stream_real_splice_async  (GOutputStream             *stream,
                                                    GInputStream              *source,
                                                    GOutputStreamSpliceFlags   flags,
@@ -134,6 +151,9 @@ g_output_stream_class_init (GOutputStreamClass *klass)
   
   klass->write_async = g_output_stream_real_write_async;
   klass->write_finish = g_output_stream_real_write_finish;
+  klass->writev_fn = g_output_stream_real_writev;
+  klass->writev_async = g_output_stream_real_writev_async;
+  klass->writev_finish = g_output_stream_real_writev_finish;
   klass->splice_async = g_output_stream_real_splice_async;
   klass->splice_finish = g_output_stream_real_splice_finish;
   klass->flush_async = g_output_stream_real_flush_async;
@@ -299,6 +319,204 @@ g_output_stream_write_all (GOutputStream  *stream,
   return TRUE;
 }
 
+/**
+ * g_output_stream_writev:
+ * @stream: a #GOutputStream.
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @bytes_written: (out) (optional): location to store the number of bytes that were
+ *     written to the stream
+ * @cancellable: (nullable): optional cancellable object
+ * @error: location to store the error occurring, or %NULL to ignore
+ *
+ * Tries to write the bytes contained in the @n_vectors @vectors into the
+ * stream. Will block during the operation.
+ *
+ * If @n_vectors is 0 or the sum of all bytes in @vectors is 0, sets
+ * @bytes_written to 0 and does nothing.
+ *
+ * On success, the number of bytes written to the stream is stored in
+ * @bytes_written. It is not an error if this is not the same as the requested
+ * size, as it can happen e.g. on a partial I/O error, or if there is not enough
+ * storage in the stream. All writes block until at least one byte
+ * is written or an error occurs; 0 bytes are never reported as written (unless
+ * @n_vectors is 0 or the sum of all bytes in @vectors is 0).
+ *
+ * If @cancellable is not %NULL, then the operation can be cancelled by
+ * triggering the cancellable object from another thread. If the operation
+ * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
+ * operation was partially finished when the operation was cancelled the
+ * partial result will be returned, without an error.
+ *
+ * Some implementations of g_output_stream_writev() may have limitations on the
+ * aggregate buffer size, and will return %G_IO_ERROR_INVALID_ARGUMENT if these
+ * are exceeded. For example, when writing to a local file on UNIX platforms,
+ * the aggregate buffer size must not exceed %G_MAXSSIZE bytes.
+ *
+ * Virtual: writev_fn
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ *
+ * Since: 2.60
+ */
+gboolean
+g_output_stream_writev (GOutputStream        *stream,
+                       const GOutputVector  *vectors,
+                       gsize                 n_vectors,
+                       gsize                *bytes_written,
+                       GCancellable         *cancellable,
+                       GError              **error)
+{
+  GOutputStreamClass *class;
+  gboolean res;
+  gsize _bytes_written = 0;
+
+  if (bytes_written)
+    *bytes_written = 0;
+
+  g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+  g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
+  g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
+  g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+  if (n_vectors == 0)
+    return TRUE;
+
+  class = G_OUTPUT_STREAM_GET_CLASS (stream);
+
+  g_return_val_if_fail (class->writev_fn != NULL, FALSE);
+
+  if (!g_output_stream_set_pending (stream, error))
+    return FALSE;
+
+  if (cancellable)
+    g_cancellable_push_current (cancellable);
+
+  res = class->writev_fn (stream, vectors, n_vectors, &_bytes_written, cancellable, error);
+
+  g_warn_if_fail (res || _bytes_written == 0);
+  g_warn_if_fail (res || (error == NULL || *error != NULL));
+
+  if (cancellable)
+    g_cancellable_pop_current (cancellable);
+
+  g_output_stream_clear_pending (stream);
+
+  if (bytes_written)
+    *bytes_written = _bytes_written;
+
+  return res;
+}
+
+/**
+ * g_output_stream_writev_all:
+ * @stream: a #GOutputStream.
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @bytes_written: (out) (optional): location to store the number of bytes that were
+ *     written to the stream
+ * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
+ * @error: location to store the error occurring, or %NULL to ignore
+ *
+ * Tries to write the bytes contained in the @n_vectors @vectors into the
+ * stream. Will block during the operation.
+ *
+ * This function is similar to g_output_stream_writev(), except it tries to
+ * write as many bytes as requested, only stopping on an error.
+ *
+ * On a successful write of all @n_vectors vectors, %TRUE is returned, and
+ * @bytes_written is set to the sum of all the sizes of @vectors.
+ *
+ * If there is an error during the operation %FALSE is returned and @error
+ * is set to indicate the error status.
+ *
+ * As a special exception to the normal conventions for functions that
+ * use #GError, if this function returns %FALSE (and sets @error) then
+ * @bytes_written will be set to the number of bytes that were
+ * successfully written before the error was encountered.  This
+ * functionality is only available from C. If you need it from another
+ * language then you must write your own loop around
+ * g_output_stream_writev().
+ *
+ * The content of the individual elements of @vectors might be changed by this
+ * function.
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ *
+ * Since: 2.60
+ */
+gboolean
+g_output_stream_writev_all (GOutputStream  *stream,
+                           GOutputVector  *vectors,
+                           gsize           n_vectors,
+                           gsize          *bytes_written,
+                           GCancellable   *cancellable,
+                           GError        **error)
+{
+  gsize _bytes_written = 0;
+  gsize i, to_be_written = 0;
+
+  if (bytes_written)
+    *bytes_written = 0;
+
+  g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+  g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
+  g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
+  g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+  /* We can't write more than G_MAXSIZE bytes overall, otherwise we
+   * would overflow the bytes_written counter */
+  for (i = 0; i < n_vectors; i++)
+    {
+       if (to_be_written > G_MAXSIZE - vectors[i].size)
+         {
+           g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+                        _("Sum of vectors passed to %s too large"), G_STRFUNC);
+           return FALSE;
+         }
+       to_be_written += vectors[i].size;
+    }
+
+  _bytes_written = 0;
+  while (n_vectors > 0 && to_be_written > 0)
+    {
+      gsize n_written = 0;
+      gboolean res;
+
+      res = g_output_stream_writev (stream, vectors, n_vectors, &n_written, cancellable, error);
+
+      if (!res)
+        {
+          if (bytes_written)
+            *bytes_written = _bytes_written;
+          return FALSE;
+        }
+
+      if (n_written == 0)
+        g_warning ("Write returned zero without error");
+      _bytes_written += n_written;
+
+      /* skip vectors that have been written in full */
+      while (n_vectors > 0 && n_written >= vectors[0].size)
+        {
+          n_written -= vectors[0].size;
+          ++vectors;
+          --n_vectors;
+        }
+      /* skip partially written vector data */
+      if (n_written > 0 && n_vectors > 0)
+        {
+          vectors[0].size -= n_written;
+          vectors[0].buffer = ((guint8 *) vectors[0].buffer) + n_written;
+        }
+    }
+
+  if (bytes_written)
+    *bytes_written = _bytes_written;
+
+  return TRUE;
+}
+
 /**
  * g_output_stream_printf:
  * @stream: a #GOutputStream.
@@ -923,7 +1141,6 @@ write_all_callback (GObject      *stream,
       g_task_return_boolean (task, TRUE);
       g_object_unref (task);
     }
-
   else
     g_output_stream_write_async (G_OUTPUT_STREAM (stream),
                                  data->buffer + data->bytes_written,
@@ -1060,6 +1277,329 @@ g_output_stream_write_all_finish (GOutputStream  *stream,
   return g_task_propagate_boolean (task, error);
 }
 
+/**
+ * g_output_stream_writev_async:
+ * @stream: A #GOutputStream.
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @io_priority: the I/O priority of the request.
+ * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
+ * @callback: (scope async): callback to call when the request is satisfied
+ * @user_data: (closure): the data to pass to callback function
+ *
+ * Request an asynchronous write of the bytes contained in @n_vectors @vectors into
+ * the stream. When the operation is finished @callback will be called.
+ * You can then call g_output_stream_writev_finish() to get the result of the
+ * operation.
+ *
+ * During an async request no other sync and async calls are allowed,
+ * and will result in %G_IO_ERROR_PENDING errors.
+ *
+ * On success, the number of bytes written will be passed to the
+ * @callback. It is not an error if this is not the same as the
+ * requested size, as it can happen e.g. on a partial I/O error,
+ * but generally we try to write as many bytes as requested.
+ *
+ * You are guaranteed that this method will never fail with
+ * %G_IO_ERROR_WOULD_BLOCK — if @stream can't accept more data, the
+ * method will just wait until this changes.
+ *
+ * Any outstanding I/O request with higher priority (lower numerical
+ * value) will be executed before an outstanding request with lower
+ * priority. Default priority is %G_PRIORITY_DEFAULT.
+ *
+ * The asynchronous methods have a default fallback that uses threads
+ * to implement asynchronicity, so they are optional for inheriting
+ * classes. However, if you override one you must override all.
+ *
+ * For the synchronous, blocking version of this function, see
+ * g_output_stream_writev().
+ *
+ * Note that no copy of @vectors will be made, so it must stay valid
+ * until @callback is called.
+ *
+ * Since: 2.60
+ */
+void
+g_output_stream_writev_async (GOutputStream             *stream,
+                             const GOutputVector       *vectors,
+                             gsize                      n_vectors,
+                             int                        io_priority,
+                             GCancellable              *cancellable,
+                             GAsyncReadyCallback        callback,
+                             gpointer                   user_data)
+{
+  GOutputStreamClass *class;
+
+  g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
+  g_return_if_fail (vectors != NULL || n_vectors == 0);
+  g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
+
+  class = G_OUTPUT_STREAM_GET_CLASS (stream);
+  g_return_if_fail (class->writev_async != NULL);
+
+  class->writev_async (stream, vectors, n_vectors, io_priority, cancellable,
+                       callback, user_data);
+}
+
+/**
+ * g_output_stream_writev_finish:
+ * @stream: a #GOutputStream.
+ * @result: a #GAsyncResult.
+ * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
+ * @error: a #GError location to store the error occurring, or %NULL to
+ * ignore.
+ *
+ * Finishes a stream writev operation.
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ *
+ * Since: 2.60
+ */
+gboolean
+g_output_stream_writev_finish (GOutputStream  *stream,
+                               GAsyncResult   *result,
+                               gsize          *bytes_written,
+                               GError        **error)
+{
+  GOutputStreamClass *class;
+  gboolean res;
+  gsize _bytes_written = 0;
+
+  g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+  g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
+  g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+  class = G_OUTPUT_STREAM_GET_CLASS (stream);
+  g_return_val_if_fail (class->writev_finish != NULL, FALSE);
+
+  res = class->writev_finish (stream, result, &_bytes_written, error);
+
+  g_warn_if_fail (res || _bytes_written == 0);
+  g_warn_if_fail (res || (error == NULL || *error != NULL));
+
+  if (bytes_written)
+    *bytes_written = _bytes_written;
+
+  return res;
+}
+
+typedef struct
+{
+  GOutputVector *vectors; /* (unowned) caller's array; advanced and modified in place */
+  gsize n_vectors;
+  gsize bytes_written; /* running total, read back by writev_all_finish */
+} AsyncWritevAll;
+
+static void
+free_async_writev_all (gpointer data)
+{
+  g_slice_free (AsyncWritevAll, data);
+}
+
+static void
+writev_all_callback (GObject      *stream,
+                     GAsyncResult *result,
+                     gpointer      user_data)
+{
+  GTask *task = user_data;
+  AsyncWritevAll *data = g_task_get_task_data (task);
+  gint priority = g_task_get_priority (task);
+  GCancellable *cancellable = g_task_get_cancellable (task);
+
+  if (result)
+    {
+      GError *error = NULL;
+      gboolean res;
+      gsize n_written = 0;
+
+      res = g_output_stream_writev_finish (G_OUTPUT_STREAM (stream), result, &n_written, &error);
+
+      if (!res)
+        {
+          g_task_return_error (task, g_steal_pointer (&error));
+          g_object_unref (task);
+          return;
+        }
+
+      g_warn_if_fail (n_written > 0);
+      data->bytes_written += n_written;
+
+      /* skip vectors that have been written in full */
+      while (data->n_vectors > 0 && n_written >= data->vectors[0].size)
+        {
+          n_written -= data->vectors[0].size;
+          ++data->vectors;
+          --data->n_vectors;
+        }
+      /* skip partially written vector data */
+      if (n_written > 0 && data->n_vectors > 0)
+        {
+          data->vectors[0].size -= n_written;
+          data->vectors[0].buffer = ((guint8 *) data->vectors[0].buffer) + n_written;
+        }
+    }
+
+  if (data->n_vectors == 0)
+    {
+      g_task_return_boolean (task, TRUE);
+      g_object_unref (task);
+    }
+  else
+    g_output_stream_writev_async (G_OUTPUT_STREAM (stream),
+                                  data->vectors,
+                                  data->n_vectors,
+                                  priority,
+                                  cancellable,
+                                  writev_all_callback, g_steal_pointer (&task));
+}
+
+static void
+writev_all_async_thread (GTask        *task,
+                         gpointer      source_object,
+                         gpointer      task_data,
+                         GCancellable *cancellable)
+{
+  GOutputStream *stream = G_OUTPUT_STREAM (source_object);
+  AsyncWritevAll *data = task_data;
+  GError *error = NULL;
+
+  if (g_output_stream_writev_all (stream, data->vectors, data->n_vectors, &data->bytes_written,
+                                  g_task_get_cancellable (task), &error))
+    g_task_return_boolean (task, TRUE);
+  else
+    g_task_return_error (task, g_steal_pointer (&error));
+}
+
+/**
+ * g_output_stream_writev_all_async:
+ * @stream: A #GOutputStream
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @io_priority: the I/O priority of the request
+ * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
+ * @callback: (scope async): callback to call when the request is satisfied
+ * @user_data: (closure): the data to pass to callback function
+ *
+ * Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
+ * the stream. When the operation is finished @callback will be called.
+ * You can then call g_output_stream_writev_all_finish() to get the result of the
+ * operation.
+ *
+ * This is the asynchronous version of g_output_stream_writev_all().
+ *
+ * Call g_output_stream_writev_all_finish() to collect the result.
+ *
+ * Any outstanding I/O request with higher priority (lower numerical
+ * value) will be executed before an outstanding request with lower
+ * priority. Default priority is %G_PRIORITY_DEFAULT.
+ *
+ * Note that no copy of @vectors will be made, so it must stay valid
+ * until @callback is called. The content of the individual elements
+ * of @vectors might be changed by this function.
+ *
+ * Since: 2.60
+ */
+void
+g_output_stream_writev_all_async (GOutputStream       *stream,
+                                  GOutputVector       *vectors,
+                                  gsize                n_vectors,
+                                  int                  io_priority,
+                                  GCancellable        *cancellable,
+                                  GAsyncReadyCallback  callback,
+                                  gpointer             user_data)
+{
+  AsyncWritevAll *data;
+  GTask *task;
+  gsize i, to_be_written = 0;
+
+  g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
+  g_return_if_fail (vectors != NULL || n_vectors == 0);
+  g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
+
+  task = g_task_new (stream, cancellable, callback, user_data);
+  data = g_slice_new0 (AsyncWritevAll);
+  data->vectors = vectors;
+  data->n_vectors = n_vectors;
+
+  g_task_set_source_tag (task, g_output_stream_writev_all_async);
+  g_task_set_task_data (task, data, free_async_writev_all);
+  g_task_set_priority (task, io_priority);
+
+  /* We can't write more than G_MAXSIZE bytes overall, otherwise we
+   * would overflow the bytes_written counter */
+  for (i = 0; i < n_vectors; i++)
+    {
+       if (to_be_written > G_MAXSIZE - vectors[i].size)
+         {
+           g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+                                    _("Sum of vectors passed to %s too large"),
+                                    G_STRFUNC);
+           g_object_unref (task);
+           return;
+         }
+       to_be_written += vectors[i].size;
+    }
+
+  /* If async writes are going to be handled via the threadpool anyway
+   * then we may as well do it with a single dispatch instead of
+   * bouncing in and out.
+   */
+  if (g_output_stream_async_writev_is_via_threads (stream))
+    {
+      g_task_run_in_thread (task, writev_all_async_thread);
+      g_object_unref (task);
+    }
+  else
+    writev_all_callback (G_OBJECT (stream), NULL, g_steal_pointer (&task));
+}
+
+/**
+ * g_output_stream_writev_all_finish:
+ * @stream: a #GOutputStream
+ * @result: a #GAsyncResult
+ * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
+ * @error: a #GError location to store the error occurring, or %NULL to ignore.
+ *
+ * Finishes an asynchronous stream write operation started with
+ * g_output_stream_writev_all_async().
+ *
+ * As a special exception to the normal conventions for functions that
+ * use #GError, if this function returns %FALSE (and sets @error) then
+ * @bytes_written will be set to the number of bytes that were
+ * successfully written before the error was encountered.  This
+ * functionality is only available from C.  If you need it from another
+ * language then you must write your own loop around
+ * g_output_stream_writev_async().
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ *
+ * Since: 2.60
+ */
+gboolean
+g_output_stream_writev_all_finish (GOutputStream  *stream,
+                                   GAsyncResult   *result,
+                                   gsize          *bytes_written,
+                                   GError        **error)
+{
+  GTask *task;
+
+  g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+  g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+  task = G_TASK (result);
+
+  if (bytes_written)
+    {
+      AsyncWritevAll *data = (AsyncWritevAll *)g_task_get_task_data (task);
+
+      *bytes_written = data->bytes_written;
+    }
+
+  return g_task_propagate_boolean (task, error);
+}
+
 static void
 write_bytes_callback (GObject      *stream,
                       GAsyncResult *result,
@@ -1712,6 +2252,28 @@ g_output_stream_async_write_is_via_threads (GOutputStream *stream)
         g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
 }
 
+/*< internal >
+ * g_output_stream_async_writev_is_via_threads:
+ * @stream: a #GOutputStream.
+ *
+ * Checks if an output stream's writev_async function uses threads.
+ *
+ * Returns: %TRUE if @stream's writev_async function uses threads.
+ **/
+gboolean
+g_output_stream_async_writev_is_via_threads (GOutputStream *stream)
+{
+  GOutputStreamClass *class;
+
+  g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+
+  class = G_OUTPUT_STREAM_GET_CLASS (stream);
+
+  return (class->writev_async == g_output_stream_real_writev_async &&
+      !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
+        g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
+}
+
 /*< internal >
  * g_output_stream_async_close_is_via_threads:
  * @stream: output stream
@@ -1732,6 +2294,69 @@ g_output_stream_async_close_is_via_threads (GOutputStream *stream)
   return class->close_async == g_output_stream_real_close_async;
 }
 
+/********************************************
+ *   Default implementation of sync ops    *
+ ********************************************/
+/* Default writev_fn: writes each vector in turn with write_fn, stopping after
+ * the first short write or error. @bytes_written receives the running total. */
+static gboolean
+g_output_stream_real_writev (GOutputStream         *stream,
+                             const GOutputVector   *vectors,
+                             gsize                  n_vectors,
+                             gsize                 *bytes_written,
+                             GCancellable          *cancellable,
+                             GError               **error)
+{
+  GOutputStreamClass *class;
+  gsize i, _bytes_written = 0;
+  GError *err = NULL;
+
+  class = G_OUTPUT_STREAM_GET_CLASS (stream);
+
+  if (bytes_written)
+    *bytes_written = 0;
+
+  for (i = 0; i < n_vectors; i++)
+    {
+      gssize res = 0;
+
+      /* Would we overflow here? In that case simply return and let the caller
+       * handle this like a short write */
+      if (_bytes_written > G_MAXSIZE - vectors[i].size)
+        break;
+
+      res = class->write_fn (stream, vectors[i].buffer, vectors[i].size, cancellable, &err);
+
+      if (res == -1)
+        {
+          /* If we already wrote something we handle this like a short write
+           * and assume that on the next call the same error happens again, or
+           * everything finishes successfully without data loss then
+           */
+          if (_bytes_written > 0)
+            {
+              if (bytes_written)
+                *bytes_written = _bytes_written;
+              g_clear_error (&err);
+              return TRUE;
+            }
+
+          g_propagate_error (error, err);
+          return FALSE;
+        }
+
+      _bytes_written += res;
+      /* short write: stop here (res is >= 0, so the cast is safe) */
+      if ((gsize) res < vectors[i].size)
+        break;
+    }
+
+  if (bytes_written)
+    *bytes_written = _bytes_written;
+
+  return TRUE;
+}
+
 /********************************************
  *   Default implementation of async ops    *
  ********************************************/
@@ -1852,6 +2477,172 @@ g_output_stream_real_write_finish (GOutputStream  *stream,
   return g_task_propagate_int (G_TASK (result), error);
 }
 
+typedef struct {
+  const GOutputVector *vectors; /* (unowned) */
+  gsize                n_vectors;
+  gsize                bytes_written; /* set by the write, read by writev_finish */
+} WritevData;
+
+static void
+free_writev_data (WritevData *op)
+{
+  g_slice_free (WritevData, op);
+}
+
+static void
+writev_async_thread (GTask        *task,
+                     gpointer      source_object,
+                     gpointer      task_data,
+                     GCancellable *cancellable)
+{
+  GOutputStream *stream = source_object;
+  WritevData *op = task_data;
+  GOutputStreamClass *class;
+  GError *error = NULL;
+  gboolean res;
+
+  class = G_OUTPUT_STREAM_GET_CLASS (stream);
+  res = class->writev_fn (stream, op->vectors, op->n_vectors,
+                          &op->bytes_written, cancellable, &error);
+
+  g_warn_if_fail (res || op->bytes_written == 0);
+  g_warn_if_fail (res || error != NULL);
+
+  if (!res)
+    g_task_return_error (task, g_steal_pointer (&error));
+  else
+    g_task_return_boolean (task, TRUE);
+}
+
+static void writev_async_pollable (GPollableOutputStream *stream,
+                                   GTask                 *task);
+
+static gboolean
+writev_async_pollable_ready (GPollableOutputStream *stream,
+                            gpointer               user_data)
+{
+  GTask *task = user_data;
+
+  writev_async_pollable (stream, task);
+  return G_SOURCE_REMOVE;
+}
+
+static void
+writev_async_pollable (GPollableOutputStream *stream,
+                       GTask                 *task)
+{
+  GError *error = NULL;
+  WritevData *op = g_task_get_task_data (task);
+  GPollableReturn res;
+  gsize bytes_written = 0;
+
+  if (g_task_return_error_if_cancelled (task))
+    return;
+
+  res = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+    writev_nonblocking (stream, op->vectors, op->n_vectors, &bytes_written, &error);
+
+  switch (res)
+    {
+    case G_POLLABLE_RETURN_WOULD_BLOCK:
+        {
+          GSource *source;
+
+          g_warn_if_fail (error == NULL);
+          g_warn_if_fail (bytes_written == 0);
+
+          source = g_pollable_output_stream_create_source (stream,
+                                                           g_task_get_cancellable (task));
+          g_task_attach_source (task, source,
+                                (GSourceFunc) writev_async_pollable_ready);
+          g_source_unref (source);
+        }
+        break;
+      case G_POLLABLE_RETURN_OK:
+        g_warn_if_fail (error == NULL);
+        op->bytes_written = bytes_written;
+        g_task_return_boolean (task, TRUE);
+        break;
+      case G_POLLABLE_RETURN_FAILED:
+        g_warn_if_fail (bytes_written == 0);
+        g_warn_if_fail (error != NULL);
+        g_task_return_error (task, g_steal_pointer (&error));
+        break;
+      default:
+        g_assert_not_reached ();
+    }
+}
+
+static void
+g_output_stream_real_writev_async (GOutputStream        *stream,
+                                   const GOutputVector  *vectors,
+                                   gsize                 n_vectors,
+                                   int                   io_priority,
+                                   GCancellable         *cancellable,
+                                   GAsyncReadyCallback   callback,
+                                   gpointer              user_data)
+{
+  GTask *task;
+  WritevData *op;
+  GError *error = NULL;
+
+  op = g_slice_new0 (WritevData);
+  task = g_task_new (stream, cancellable, callback, user_data);
+  op->vectors = vectors;
+  op->n_vectors = n_vectors;
+
+  g_task_set_check_cancellable (task, FALSE);
+  g_task_set_source_tag (task, g_output_stream_writev_async);
+  g_task_set_priority (task, io_priority);
+  g_task_set_task_data (task, op, (GDestroyNotify) free_writev_data);
+
+  if (n_vectors == 0)
+    {
+      g_task_return_boolean (task, TRUE);
+      g_object_unref (task);
+      return;
+    }
+
+  if (!g_output_stream_set_pending (stream, &error))
+    {
+      g_task_return_error (task, g_steal_pointer (&error));
+      g_object_unref (task);
+      return;
+    }
+
+  if (!g_output_stream_async_writev_is_via_threads (stream))
+    writev_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
+  else
+    g_task_run_in_thread (task, writev_async_thread);
+
+  g_object_unref (task);
+}
+
+static gboolean
+g_output_stream_real_writev_finish (GOutputStream   *stream,
+                                    GAsyncResult    *result,
+                                    gsize           *bytes_written,
+                                    GError         **error)
+{
+  GTask *task;
+
+  g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+  g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_writev_async), FALSE);
+
+  g_output_stream_clear_pending (stream);
+
+  task = G_TASK (result);
+
+  if (bytes_written)
+    {
+      WritevData *op = g_task_get_task_data (task);
+
+      *bytes_written = op->bytes_written;
+    }
+
+  return g_task_propagate_boolean (task, error);
+}
+
 typedef struct {
   GInputStream *source;
   GOutputStreamSpliceFlags flags;
diff --git a/gio/goutputstream.h b/gio/goutputstream.h
index fef1b8fdf..dc0f4925a 100644
--- a/gio/goutputstream.h
+++ b/gio/goutputstream.h
@@ -119,11 +119,28 @@ struct _GOutputStreamClass
                                  GAsyncResult             *result,
                                  GError                  **error);
 
+  gboolean    (* writev_fn)     (GOutputStream            *stream,
+                                 const GOutputVector      *vectors,
+                                 gsize                     n_vectors,
+                                 gsize                    *bytes_written,
+                                 GCancellable             *cancellable,
+                                 GError                  **error);
+
+  void        (* writev_async)  (GOutputStream            *stream,
+                                 const GOutputVector      *vectors,
+                                 gsize                     n_vectors,
+                                 int                       io_priority,
+                                 GCancellable             *cancellable,
+                                 GAsyncReadyCallback       callback,
+                                 gpointer                  user_data);
+
+  gboolean    (* writev_finish) (GOutputStream            *stream,
+                                 GAsyncResult             *result,
+                                 gsize                    *bytes_written,
+                                 GError                  **error);
+
   /*< private >*/
   /* Padding for future expansion */
-  void (*_g_reserved1) (void);
-  void (*_g_reserved2) (void);
-  void (*_g_reserved3) (void);
   void (*_g_reserved4) (void);
   void (*_g_reserved5) (void);
   void (*_g_reserved6) (void);
@@ -147,6 +164,22 @@ gboolean g_output_stream_write_all     (GOutputStream             *stream,
                                        gsize                     *bytes_written,
                                        GCancellable              *cancellable,
                                        GError                   **error);
+
+GLIB_AVAILABLE_IN_2_60
+gboolean g_output_stream_writev        (GOutputStream             *stream,
+                                       const GOutputVector       *vectors,
+                                       gsize                      n_vectors,
+                                       gsize                     *bytes_written,
+                                       GCancellable              *cancellable,
+                                       GError                   **error);
+GLIB_AVAILABLE_IN_2_60
+gboolean g_output_stream_writev_all    (GOutputStream             *stream,
+                                       GOutputVector             *vectors,
+                                       gsize                      n_vectors,
+                                       gsize                     *bytes_written,
+                                       GCancellable              *cancellable,
+                                       GError                   **error);
+
 GLIB_AVAILABLE_IN_2_40
 gboolean g_output_stream_printf        (GOutputStream             *stream,
                                         gsize                     *bytes_written,
@@ -208,6 +241,35 @@ gboolean g_output_stream_write_all_finish (GOutputStream          *stream,
                                            gsize                  *bytes_written,
                                            GError                **error);
 
+GLIB_AVAILABLE_IN_2_60
+void     g_output_stream_writev_async  (GOutputStream             *stream,
+                                       const GOutputVector       *vectors,
+                                       gsize                      n_vectors,
+                                       int                        io_priority,
+                                       GCancellable              *cancellable,
+                                       GAsyncReadyCallback        callback,
+                                       gpointer                   user_data);
+GLIB_AVAILABLE_IN_2_60
+gboolean g_output_stream_writev_finish (GOutputStream             *stream,
+                                       GAsyncResult              *result,
+                                       gsize                     *bytes_written,
+                                       GError                   **error);
+
+GLIB_AVAILABLE_IN_2_60
+void     g_output_stream_writev_all_async (GOutputStream           *stream,
+                                           GOutputVector           *vectors,
+                                           gsize                    n_vectors,
+                                           int                      io_priority,
+                                           GCancellable            *cancellable,
+                                           GAsyncReadyCallback      callback,
+                                           gpointer                 user_data);
+
+GLIB_AVAILABLE_IN_2_60
+gboolean g_output_stream_writev_all_finish (GOutputStream          *stream,
+                                            GAsyncResult           *result,
+                                            gsize                  *bytes_written,
+                                            GError                **error);
+
 GLIB_AVAILABLE_IN_2_34
 void     g_output_stream_write_bytes_async  (GOutputStream             *stream,
                                             GBytes                    *bytes,
diff --git a/gio/gpollableoutputstream.c b/gio/gpollableoutputstream.c
index 40c649f0d..c17cf9268 100644
--- a/gio/gpollableoutputstream.c
+++ b/gio/gpollableoutputstream.c
@@ -41,17 +41,23 @@
 
 G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM)
 
-static gboolean g_pollable_output_stream_default_can_poll          (GPollableOutputStream *stream);
-static gssize   g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream  *stream,
-                                                                   const void             *buffer,
-                                                                   gsize                   count,
-                                                                   GError                **error);
+static gboolean g_pollable_output_stream_default_can_poll           (GPollableOutputStream *stream);
+static gssize   g_pollable_output_stream_default_write_nonblocking  (GPollableOutputStream  *stream,
+                                                                    const void             *buffer,
+                                                                    gsize                   count,
+                                                                    GError                **error);
+static GPollableReturn g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream  *stream,
+                                                                           const GOutputVector    *vectors,
+                                                                           gsize                   n_vectors,
+                                                                           gsize                  *bytes_written,
+                                                                           GError                **error);
 
 static void
 g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface)
 {
-  iface->can_poll          = g_pollable_output_stream_default_can_poll;
-  iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
+  iface->can_poll           = g_pollable_output_stream_default_can_poll;
+  iface->write_nonblocking  = g_pollable_output_stream_default_write_nonblocking;
+  iface->writev_nonblocking = g_pollable_output_stream_default_writev_nonblocking;
 }
 
 static gboolean
@@ -157,6 +163,67 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream  *stre
     write_fn (G_OUTPUT_STREAM (stream), buffer, count, NULL, error);
 }
 
+/* Default writev_nonblocking set up in default_init: one write_nonblocking()
+ * call per vector, stopping at the first short write or error. An error is
+ * only reported (WOULD_BLOCK / FAILED) when no bytes were written before it. */
+static GPollableReturn
+g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream  *stream,
+                                                    const GOutputVector    *vectors,
+                                                    gsize                   n_vectors,
+                                                    gsize                  *bytes_written,
+                                                    GError                **error)
+{
+  gsize _bytes_written = 0;
+  GPollableOutputStreamInterface *iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream);
+  gsize i;
+  GError *err = NULL;
+
+  for (i = 0; i < n_vectors; i++)
+    {
+      gssize res;
+
+      /* Would we overflow here? In that case simply return and let the caller
+       * handle this like a short write */
+      if (_bytes_written > G_MAXSIZE - vectors[i].size)
+        break;
+
+      res = iface->write_nonblocking (stream, vectors[i].buffer, vectors[i].size, &err);
+      if (res == -1)
+        {
+          if (bytes_written)
+            *bytes_written = _bytes_written;
+          /* If something was written already, treat this as a short write and
+           * assume the next call either repeats the error or succeeds. */
+          if (_bytes_written > 0)
+            {
+              g_clear_error (&err);
+              return G_POLLABLE_RETURN_OK;
+            }
+          else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+            {
+              g_clear_error (&err);
+              return G_POLLABLE_RETURN_WOULD_BLOCK;
+            }
+          else
+            {
+              g_propagate_error (error, err);
+              return G_POLLABLE_RETURN_FAILED;
+            }
+        }
+
+      _bytes_written += res;
+      /* Short write: stop. The -1 error case returned above, so res is a byte
+       * count here; cast it so both sides of the comparison are unsigned. */
+      if ((gsize) res < vectors[i].size)
+        break;
+    }
+
+  if (bytes_written)
+    *bytes_written = _bytes_written;
+
+  return G_POLLABLE_RETURN_OK;
+}
+
 /**
  * g_pollable_output_stream_write_nonblocking:
  * @stream: a #GPollableOutputStream
@@ -179,7 +246,8 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream  *stre
  * to having been cancelled.
  *
  * Also note that if %G_IO_ERROR_WOULD_BLOCK is returned some underlying
- * transports like D/TLS require that you send the same @buffer and @count.
+ * transports like D/TLS require that you re-send the same @buffer and
+ * @count in the next write call.
  *
  * Virtual: write_nonblocking
  * Returns: the number of bytes written, or -1 on error (including
@@ -221,3 +289,91 @@ g_pollable_output_stream_write_nonblocking (GPollableOutputStream  *stream,
 
   return res;
 }
+
+/**
+ * g_pollable_output_stream_writev_nonblocking:
+ * @stream: a #GPollableOutputStream
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @bytes_written: (out) (optional): location to store the number of bytes that were
+ *     written to the stream
+ * @cancellable: (nullable): a #GCancellable, or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Attempts to write the bytes contained in the @n_vectors @vectors to @stream,
+ * as with g_output_stream_writev(). If @stream is not currently writable,
+ * this will immediately return %G_POLLABLE_RETURN_WOULD_BLOCK, and you can
+ * use g_pollable_output_stream_create_source() to create a #GSource
+ * that will be triggered when @stream is writable. @error will *not* be
+ * set in that case.
+ *
+ * Note that since this method never blocks, you cannot actually
+ * use @cancellable to cancel it. However, it will return an error
+ * if @cancellable has already been cancelled when you call, which
+ * may happen if you call this method after a source triggers due
+ * to having been cancelled.
+ *
+ * Also note that if %G_POLLABLE_RETURN_WOULD_BLOCK is returned some underlying
+ * transports like D/TLS require that you re-send the same @vectors and
+ * @n_vectors in the next write call.
+ *
+ * Virtual: writev_nonblocking
+ *
+ * Returns: %G_POLLABLE_RETURN_OK on success, %G_POLLABLE_RETURN_WOULD_BLOCK
+ * if the stream is not currently writable (and @error is *not* set), or
+ * %G_POLLABLE_RETURN_FAILED if there was an error in which case @error will
+ * be set.
+ *
+ * Since: 2.60
+ */
+GPollableReturn
+g_pollable_output_stream_writev_nonblocking (GPollableOutputStream  *stream,
+                                            const GOutputVector    *vectors,
+                                            gsize                   n_vectors,
+                                            gsize                  *bytes_written,
+                                            GCancellable           *cancellable,
+                                            GError                **error)
+{
+  GPollableOutputStreamInterface *iface;
+  GPollableReturn res;
+  gsize _bytes_written = 0;
+
+  if (bytes_written)
+    *bytes_written = 0;
+
+  g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), G_POLLABLE_RETURN_FAILED);
+  g_return_val_if_fail (vectors != NULL || n_vectors == 0, G_POLLABLE_RETURN_FAILED);
+  g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), G_POLLABLE_RETURN_FAILED);
+  g_return_val_if_fail (error == NULL || *error == NULL, G_POLLABLE_RETURN_FAILED);
+
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return G_POLLABLE_RETURN_FAILED;
+
+  if (n_vectors == 0)
+    return G_POLLABLE_RETURN_OK;
+
+  iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream);
+  g_return_val_if_fail (iface->writev_nonblocking != NULL, G_POLLABLE_RETURN_FAILED);
+
+  /* The vfunc has no cancellable parameter; hand it over as the current one */
+  if (cancellable)
+    g_cancellable_push_current (cancellable);
+
+  res = iface->writev_nonblocking (stream, vectors, n_vectors, &_bytes_written, error);
+
+  if (cancellable)
+    g_cancellable_pop_current (cancellable);
+
+  if (res == G_POLLABLE_RETURN_FAILED)
+    g_warn_if_fail (error == NULL || (*error != NULL && !g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)));
+  else if (res == G_POLLABLE_RETURN_WOULD_BLOCK)
+    g_warn_if_fail (error == NULL || *error == NULL);
+
+  /* in case of not-OK nothing must've been written */
+  g_warn_if_fail (res == G_POLLABLE_RETURN_OK || _bytes_written == 0);
+
+  if (bytes_written)
+    *bytes_written = _bytes_written;
+
+  return res;
+}
diff --git a/gio/gpollableoutputstream.h b/gio/gpollableoutputstream.h
index bf13584d5..1ef830b57 100644
--- a/gio/gpollableoutputstream.h
+++ b/gio/gpollableoutputstream.h
@@ -77,6 +77,11 @@ struct _GPollableOutputStreamInterface
                                     const void             *buffer,
                                     gsize                   count,
                                     GError                **error);
+  GPollableReturn (*writev_nonblocking) (GPollableOutputStream  *stream,
+                                        const GOutputVector    *vectors,
+                                        gsize                   n_vectors,
+                                        gsize                  *bytes_written,
+                                        GError                **error);
 };
 
 GLIB_AVAILABLE_IN_ALL
@@ -98,6 +103,14 @@ gssize   g_pollable_output_stream_write_nonblocking (GPollableOutputStream  *str
                                                     GCancellable           *cancellable,
                                                     GError                **error);
 
+GLIB_AVAILABLE_IN_2_60
+GPollableReturn g_pollable_output_stream_writev_nonblocking (GPollableOutputStream  *stream,
+                                                            const GOutputVector    *vectors,
+                                                            gsize                   n_vectors,
+                                                            gsize                  *bytes_written,
+                                                            GCancellable           *cancellable,
+                                                            GError                **error);
+
 G_END_DECLS
 
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]