[glib: 2/15] Add writev() / writev_all() API to GOutputStream and GPollableOutputStream
- From: Philip Withnall <pwithnall src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib: 2/15] Add writev() / writev_all() API to GOutputStream and GPollableOutputStream
- Date: Thu, 24 Jan 2019 14:43:14 +0000 (UTC)
commit 0bcc1773781264f7fae146dbabb1305af1da44f0
Author: Sebastian Dröge <sebastian centricular com>
Date: Thu Sep 13 13:10:36 2018 +0300
Add writev() / writev_all() API to GOutputStream and GPollableOutputStream
This comes with default implementations around the normal write
functions and async variants.
Fixes https://gitlab.gnome.org/GNOME/glib/issues/1431
docs/reference/gio/gio-sections.txt | 8 +
gio/gioprivate.h | 1 +
gio/goutputstream.c | 793 +++++++++++++++++++++++++++++++++++-
gio/goutputstream.h | 68 +++-
gio/gpollableoutputstream.c | 172 +++++++-
gio/gpollableoutputstream.h | 13 +
6 files changed, 1043 insertions(+), 12 deletions(-)
---
diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt
index 3f2dd9a18..e61001f68 100644
--- a/docs/reference/gio/gio-sections.txt
+++ b/docs/reference/gio/gio-sections.txt
@@ -843,6 +843,12 @@ g_output_stream_write
g_output_stream_write_all
g_output_stream_write_all_async
g_output_stream_write_all_finish
+g_output_stream_writev
+g_output_stream_writev_all
+g_output_stream_writev_async
+g_output_stream_writev_finish
+g_output_stream_writev_all_async
+g_output_stream_writev_all_finish
g_output_stream_splice
g_output_stream_flush
g_output_stream_close
@@ -2124,6 +2130,7 @@ g_socket_receive_with_blocking
g_socket_send
g_socket_send_to
g_socket_send_message
+g_socket_send_message_with_timeout
g_socket_send_messages
g_socket_send_with_blocking
g_socket_close
@@ -3648,6 +3655,7 @@ g_pollable_output_stream_can_poll
g_pollable_output_stream_is_writable
g_pollable_output_stream_create_source
g_pollable_output_stream_write_nonblocking
+g_pollable_output_stream_writev_nonblocking
<SUBSECTION Standard>
G_POLLABLE_OUTPUT_STREAM
G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE
diff --git a/gio/gioprivate.h b/gio/gioprivate.h
index b79192566..a917510b9 100644
--- a/gio/gioprivate.h
+++ b/gio/gioprivate.h
@@ -29,6 +29,7 @@ G_BEGIN_DECLS
gboolean g_input_stream_async_read_is_via_threads (GInputStream *stream);
gboolean g_input_stream_async_close_is_via_threads (GInputStream *stream);
gboolean g_output_stream_async_write_is_via_threads (GOutputStream *stream);
+gboolean g_output_stream_async_writev_is_via_threads (GOutputStream *stream);
gboolean g_output_stream_async_close_is_via_threads (GOutputStream *stream);
void g_socket_connection_set_cached_remote_address (GSocketConnection *connection,
diff --git a/gio/goutputstream.c b/gio/goutputstream.c
index 3e658e88a..f80cc0a2a 100644
--- a/gio/goutputstream.c
+++ b/gio/goutputstream.c
@@ -72,6 +72,23 @@ static void g_output_stream_real_write_async (GOutputStream *s
static gssize g_output_stream_real_write_finish (GOutputStream *stream,
GAsyncResult *result,
GError **error);
+static gboolean g_output_stream_real_writev (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GCancellable *cancellable,
+ GError **error);
+static void g_output_stream_real_writev_async (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data);
+static gboolean g_output_stream_real_writev_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ gsize *bytes_written,
+ GError **error);
static void g_output_stream_real_splice_async (GOutputStream *stream,
GInputStream *source,
GOutputStreamSpliceFlags flags,
@@ -134,6 +151,9 @@ g_output_stream_class_init (GOutputStreamClass *klass)
klass->write_async = g_output_stream_real_write_async;
klass->write_finish = g_output_stream_real_write_finish;
+ klass->writev_fn = g_output_stream_real_writev;
+ klass->writev_async = g_output_stream_real_writev_async;
+ klass->writev_finish = g_output_stream_real_writev_finish;
klass->splice_async = g_output_stream_real_splice_async;
klass->splice_finish = g_output_stream_real_splice_finish;
klass->flush_async = g_output_stream_real_flush_async;
@@ -299,6 +319,204 @@ g_output_stream_write_all (GOutputStream *stream,
return TRUE;
}
+/**
+ * g_output_stream_writev:
+ * @stream: a #GOutputStream.
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @bytes_written: (out) (optional): location to store the number of bytes that were
+ * written to the stream
+ * @cancellable: (nullable): optional cancellable object
+ * @error: location to store the error occurring, or %NULL to ignore
+ *
+ * Tries to write the bytes contained in the @n_vectors @vectors into the
+ * stream. Will block during the operation.
+ *
+ * If @n_vectors is 0 or the sum of all bytes in @vectors is 0, returns 0 and
+ * does nothing.
+ *
+ * On success, the number of bytes written to the stream is returned.
+ * It is not an error if this is not the same as the requested size, as it
+ * can happen e.g. on a partial I/O error, or if there is not enough
+ * storage in the stream. All writes block until at least one byte
+ * is written or an error occurs; 0 is never returned (unless
+ * @n_vectors is 0 or the sum of all bytes in @vectors is 0).
+ *
+ * If @cancellable is not %NULL, then the operation can be cancelled by
+ * triggering the cancellable object from another thread. If the operation
+ * was cancelled, the error %G_IO_ERROR_CANCELLED will be returned. If an
+ * operation was partially finished when the operation was cancelled the
+ * partial result will be returned, without an error.
+ *
+ * Some implementations of g_output_stream_writev() may have limitations on the
+ * aggregate buffer size, and will return %G_IO_ERROR_INVALID_ARGUMENT if these
+ * are exceeded. For example, when writing to a local file on UNIX platforms,
+ * the aggregate buffer size must not exceed %G_MAXSSIZE bytes.
+ *
+ * Virtual: writev_fn
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ *
+ * Since: 2.60
+ */
+gboolean
+g_output_stream_writev (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GOutputStreamClass *class;
+ gboolean res;
+ gsize _bytes_written = 0;
+
+ if (bytes_written)
+ *bytes_written = 0;
+
+ g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+ g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
+ g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
+ g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+ if (n_vectors == 0)
+ return TRUE;
+
+ class = G_OUTPUT_STREAM_GET_CLASS (stream);
+
+ g_return_val_if_fail (class->writev_fn != NULL, FALSE);
+
+ if (!g_output_stream_set_pending (stream, error))
+ return FALSE;
+
+ if (cancellable)
+ g_cancellable_push_current (cancellable);
+
+ res = class->writev_fn (stream, vectors, n_vectors, &_bytes_written, cancellable, error);
+
+ g_warn_if_fail (res || _bytes_written == 0);
+ g_warn_if_fail (res || (error == NULL || *error != NULL));
+
+ if (cancellable)
+ g_cancellable_pop_current (cancellable);
+
+ g_output_stream_clear_pending (stream);
+
+ if (bytes_written)
+ *bytes_written = _bytes_written;
+
+ return res;
+}
+
+/**
+ * g_output_stream_writev_all:
+ * @stream: a #GOutputStream.
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @bytes_written: (out) (optional): location to store the number of bytes that were
+ * written to the stream
+ * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
+ * @error: location to store the error occurring, or %NULL to ignore
+ *
+ * Tries to write the bytes contained in the @n_vectors @vectors into the
+ * stream. Will block during the operation.
+ *
+ * This function is similar to g_output_stream_writev(), except it tries to
+ * write as many bytes as requested, only stopping on an error.
+ *
+ * On a successful write of all @n_vectors vectors, %TRUE is returned, and
+ * @bytes_written is set to the sum of all the sizes of @vectors.
+ *
+ * If there is an error during the operation %FALSE is returned and @error
+ * is set to indicate the error status.
+ *
+ * As a special exception to the normal conventions for functions that
+ * use #GError, if this function returns %FALSE (and sets @error) then
+ * @bytes_written will be set to the number of bytes that were
+ * successfully written before the error was encountered. This
+ * functionality is only available from C. If you need it from another
+ * language then you must write your own loop around
+ * g_output_stream_write().
+ *
+ * The content of the individual elements of @vectors might be changed by this
+ * function.
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ *
+ * Since: 2.60
+ */
+gboolean
+g_output_stream_writev_all (GOutputStream *stream,
+ GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gsize _bytes_written = 0;
+ gsize i, to_be_written = 0;
+
+ if (bytes_written)
+ *bytes_written = 0;
+
+ g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+ g_return_val_if_fail (vectors != NULL || n_vectors == 0, FALSE);
+ g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), FALSE);
+ g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+ /* We can't write more than G_MAXSIZE bytes overall, otherwise we
+ * would overflow the bytes_written counter */
+ for (i = 0; i < n_vectors; i++)
+ {
+ if (to_be_written > G_MAXSIZE - vectors[i].size)
+ {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+ _("Sum of vectors passed to %s too large"), G_STRFUNC);
+ return FALSE;
+ }
+ to_be_written += vectors[i].size;
+ }
+
+ _bytes_written = 0;
+ while (n_vectors > 0 && to_be_written > 0)
+ {
+ gsize n_written = 0;
+ gboolean res;
+
+ res = g_output_stream_writev (stream, vectors, n_vectors, &n_written, cancellable, error);
+
+ if (!res)
+ {
+ if (bytes_written)
+ *bytes_written = _bytes_written;
+ return FALSE;
+ }
+
+ if (n_written == 0)
+ g_warning ("Write returned zero without error");
+ _bytes_written += n_written;
+
+ /* skip vectors that have been written in full */
+ while (n_vectors > 0 && n_written >= vectors[0].size)
+ {
+ n_written -= vectors[0].size;
+ ++vectors;
+ --n_vectors;
+ }
+ /* skip partially written vector data */
+ if (n_written > 0 && n_vectors > 0)
+ {
+ vectors[0].size -= n_written;
+ vectors[0].buffer = ((guint8 *) vectors[0].buffer) + n_written;
+ }
+ }
+
+ if (bytes_written)
+ *bytes_written = _bytes_written;
+
+ return TRUE;
+}
+
/**
* g_output_stream_printf:
* @stream: a #GOutputStream.
@@ -923,7 +1141,6 @@ write_all_callback (GObject *stream,
g_task_return_boolean (task, TRUE);
g_object_unref (task);
}
-
else
g_output_stream_write_async (G_OUTPUT_STREAM (stream),
data->buffer + data->bytes_written,
@@ -1060,6 +1277,329 @@ g_output_stream_write_all_finish (GOutputStream *stream,
return g_task_propagate_boolean (task, error);
}
+/**
+ * g_output_stream_writev_async:
+ * @stream: A #GOutputStream.
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @io_priority: the I/O priority of the request.
+ * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore.
+ * @callback: (scope async): callback to call when the request is satisfied
+ * @user_data: (closure): the data to pass to callback function
+ *
+ * Request an asynchronous write of the bytes contained in @n_vectors @vectors into
+ * the stream. When the operation is finished @callback will be called.
+ * You can then call g_output_stream_writev_finish() to get the result of the
+ * operation.
+ *
+ * During an async request no other sync and async calls are allowed,
+ * and will result in %G_IO_ERROR_PENDING errors.
+ *
+ * On success, the number of bytes written will be passed to the
+ * @callback. It is not an error if this is not the same as the
+ * requested size, as it can happen e.g. on a partial I/O error,
+ * but generally we try to write as many bytes as requested.
+ *
+ * You are guaranteed that this method will never fail with
+ * %G_IO_ERROR_WOULD_BLOCK — if @stream can't accept more data, the
+ * method will just wait until this changes.
+ *
+ * Any outstanding I/O request with higher priority (lower numerical
+ * value) will be executed before an outstanding request with lower
+ * priority. Default priority is %G_PRIORITY_DEFAULT.
+ *
+ * The asynchronous methods have a default fallback that uses threads
+ * to implement asynchronicity, so they are optional for inheriting
+ * classes. However, if you override one you must override all.
+ *
+ * For the synchronous, blocking version of this function, see
+ * g_output_stream_writev().
+ *
+ * Note that no copy of @vectors will be made, so it must stay valid
+ * until @callback is called.
+ *
+ * Since: 2.60
+ */
+void
+g_output_stream_writev_async (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GOutputStreamClass *class;
+
+ g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
+ g_return_if_fail (vectors != NULL || n_vectors == 0);
+ g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
+
+ class = G_OUTPUT_STREAM_GET_CLASS (stream);
+ g_return_if_fail (class->writev_async != NULL);
+
+ class->writev_async (stream, vectors, n_vectors, io_priority, cancellable,
+ callback, user_data);
+}
+
+/**
+ * g_output_stream_writev_finish:
+ * @stream: a #GOutputStream.
+ * @result: a #GAsyncResult.
+ * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
+ * @error: a #GError location to store the error occurring, or %NULL to
+ * ignore.
+ *
+ * Finishes a stream writev operation.
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ *
+ * Since: 2.60
+ */
+gboolean
+g_output_stream_writev_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ gsize *bytes_written,
+ GError **error)
+{
+ GOutputStreamClass *class;
+ gboolean res;
+ gsize _bytes_written = 0;
+
+ g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+ g_return_val_if_fail (G_IS_ASYNC_RESULT (result), FALSE);
+ g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+ class = G_OUTPUT_STREAM_GET_CLASS (stream);
+ g_return_val_if_fail (class->writev_finish != NULL, FALSE);
+
+ res = class->writev_finish (stream, result, &_bytes_written, error);
+
+ g_warn_if_fail (res || _bytes_written == 0);
+ g_warn_if_fail (res || (error == NULL || *error != NULL));
+
+ if (bytes_written)
+ *bytes_written = _bytes_written;
+
+ return res;
+}
+
+typedef struct
+{
+ GOutputVector *vectors;
+ gsize n_vectors; /* (unowned) */
+ gsize bytes_written;
+} AsyncWritevAll;
+
+static void
+free_async_writev_all (gpointer data)
+{
+ g_slice_free (AsyncWritevAll, data);
+}
+
+static void
+writev_all_callback (GObject *stream,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ GTask *task = user_data;
+ AsyncWritevAll *data = g_task_get_task_data (task);
+ gint priority = g_task_get_priority (task);
+ GCancellable *cancellable = g_task_get_cancellable (task);
+
+ if (result)
+ {
+ GError *error = NULL;
+ gboolean res;
+ gsize n_written = 0;
+
+ res = g_output_stream_writev_finish (G_OUTPUT_STREAM (stream), result, &n_written, &error);
+
+ if (!res)
+ {
+ g_task_return_error (task, g_steal_pointer (&error));
+ g_object_unref (task);
+ return;
+ }
+
+ g_warn_if_fail (n_written > 0);
+ data->bytes_written += n_written;
+
+ /* skip vectors that have been written in full */
+ while (data->n_vectors > 0 && n_written >= data->vectors[0].size)
+ {
+ n_written -= data->vectors[0].size;
+ ++data->vectors;
+ --data->n_vectors;
+ }
+ /* skip partially written vector data */
+ if (n_written > 0 && data->n_vectors > 0)
+ {
+ data->vectors[0].size -= n_written;
+ data->vectors[0].buffer = ((guint8 *) data->vectors[0].buffer) + n_written;
+ }
+ }
+
+ if (data->n_vectors == 0)
+ {
+ g_task_return_boolean (task, TRUE);
+ g_object_unref (task);
+ }
+ else
+ g_output_stream_writev_async (G_OUTPUT_STREAM (stream),
+ data->vectors,
+ data->n_vectors,
+ priority,
+ cancellable,
+ writev_all_callback, g_steal_pointer (&task));
+}
+
+static void
+writev_all_async_thread (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ GOutputStream *stream = G_OUTPUT_STREAM (source_object);
+ AsyncWritevAll *data = task_data;
+ GError *error = NULL;
+
+ if (g_output_stream_writev_all (stream, data->vectors, data->n_vectors, &data->bytes_written,
+ g_task_get_cancellable (task), &error))
+ g_task_return_boolean (task, TRUE);
+ else
+ g_task_return_error (task, g_steal_pointer (&error));
+}
+
+/**
+ * g_output_stream_writev_all_async:
+ * @stream: A #GOutputStream
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @io_priority: the I/O priority of the request
+ * @cancellable: (nullable): optional #GCancellable object, %NULL to ignore
+ * @callback: (scope async): callback to call when the request is satisfied
+ * @user_data: (closure): the data to pass to callback function
+ *
+ * Request an asynchronous write of the bytes contained in the @n_vectors @vectors into
+ * the stream. When the operation is finished @callback will be called.
+ * You can then call g_output_stream_writev_all_finish() to get the result of the
+ * operation.
+ *
+ * This is the asynchronous version of g_output_stream_writev_all().
+ *
+ * Call g_output_stream_writev_all_finish() to collect the result.
+ *
+ * Any outstanding I/O request with higher priority (lower numerical
+ * value) will be executed before an outstanding request with lower
+ * priority. Default priority is %G_PRIORITY_DEFAULT.
+ *
+ * Note that no copy of @vectors will be made, so it must stay valid
+ * until @callback is called. The content of the individual elements
+ * of @vectors might be changed by this function.
+ *
+ * Since: 2.60
+ */
+void
+g_output_stream_writev_all_async (GOutputStream *stream,
+ GOutputVector *vectors,
+ gsize n_vectors,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ AsyncWritevAll *data;
+ GTask *task;
+ gsize i, to_be_written = 0;
+
+ g_return_if_fail (G_IS_OUTPUT_STREAM (stream));
+ g_return_if_fail (vectors != NULL || n_vectors == 0);
+ g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
+
+ task = g_task_new (stream, cancellable, callback, user_data);
+ data = g_slice_new0 (AsyncWritevAll);
+ data->vectors = vectors;
+ data->n_vectors = n_vectors;
+
+ g_task_set_source_tag (task, g_output_stream_writev_all_async);
+ g_task_set_task_data (task, data, free_async_writev_all);
+ g_task_set_priority (task, io_priority);
+
+ /* We can't write more than G_MAXSIZE bytes overall, otherwise we
+ * would overflow the bytes_written counter */
+ for (i = 0; i < n_vectors; i++)
+ {
+ if (to_be_written > G_MAXSIZE - vectors[i].size)
+ {
+ g_task_return_new_error (task, G_IO_ERROR, G_IO_ERROR_INVALID_ARGUMENT,
+ _("Sum of vectors passed to %s too large"),
+ G_STRFUNC);
+ g_object_unref (task);
+ return;
+ }
+ to_be_written += vectors[i].size;
+ }
+
+ /* If async writes are going to be handled via the threadpool anyway
+ * then we may as well do it with a single dispatch instead of
+ * bouncing in and out.
+ */
+ if (g_output_stream_async_writev_is_via_threads (stream))
+ {
+ g_task_run_in_thread (task, writev_all_async_thread);
+ g_object_unref (task);
+ }
+ else
+ writev_all_callback (G_OBJECT (stream), NULL, g_steal_pointer (&task));
+}
+
+/**
+ * g_output_stream_writev_all_finish:
+ * @stream: a #GOutputStream
+ * @result: a #GAsyncResult
+ * @bytes_written: (out) (optional): location to store the number of bytes that were written to the stream
+ * @error: a #GError location to store the error occurring, or %NULL to ignore.
+ *
+ * Finishes an asynchronous stream write operation started with
+ * g_output_stream_writev_all_async().
+ *
+ * As a special exception to the normal conventions for functions that
+ * use #GError, if this function returns %FALSE (and sets @error) then
+ * @bytes_written will be set to the number of bytes that were
+ * successfully written before the error was encountered. This
+ * functionality is only available from C. If you need it from another
+ * language then you must write your own loop around
+ * g_output_stream_writev_async().
+ *
+ * Returns: %TRUE on success, %FALSE if there was an error
+ *
+ * Since: 2.60
+ */
+gboolean
+g_output_stream_writev_all_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ gsize *bytes_written,
+ GError **error)
+{
+ GTask *task;
+
+ g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+ g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+ g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+ task = G_TASK (result);
+
+ if (bytes_written)
+ {
+ AsyncWritevAll *data = (AsyncWritevAll *)g_task_get_task_data (task);
+
+ *bytes_written = data->bytes_written;
+ }
+
+ return g_task_propagate_boolean (task, error);
+}
+
static void
write_bytes_callback (GObject *stream,
GAsyncResult *result,
@@ -1712,6 +2252,28 @@ g_output_stream_async_write_is_via_threads (GOutputStream *stream)
g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
}
+/*< internal >
+ * g_output_stream_async_writev_is_via_threads:
+ * @stream: a #GOutputStream.
+ *
+ * Checks if an output stream's writev_async function uses threads.
+ *
+ * Returns: %TRUE if @stream's writev_async function uses threads.
+ **/
+gboolean
+g_output_stream_async_writev_is_via_threads (GOutputStream *stream)
+{
+ GOutputStreamClass *class;
+
+ g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), FALSE);
+
+ class = G_OUTPUT_STREAM_GET_CLASS (stream);
+
+ return (class->writev_async == g_output_stream_real_writev_async &&
+ !(G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
+ g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream))));
+}
+
/*< internal >
* g_output_stream_async_close_is_via_threads:
* @stream: output stream
@@ -1732,6 +2294,69 @@ g_output_stream_async_close_is_via_threads (GOutputStream *stream)
return class->close_async == g_output_stream_real_close_async;
}
+/********************************************
+ * Default implementation of sync ops *
+ ********************************************/
+static gboolean
+g_output_stream_real_writev (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GOutputStreamClass *class;
+ gsize _bytes_written = 0;
+ gsize i;
+ GError *err = NULL;
+
+ class = G_OUTPUT_STREAM_GET_CLASS (stream);
+
+ if (bytes_written)
+ *bytes_written = 0;
+
+ for (i = 0; i < n_vectors; i++)
+ {
+ gssize res = 0;
+
+ /* Would we overflow here? In that case simply return and let the caller
+ * handle this like a short write */
+ if (_bytes_written > G_MAXSIZE - vectors[i].size)
+ break;
+
+ res = class->write_fn (stream, vectors[i].buffer, vectors[i].size, cancellable, &err);
+
+ if (res == -1)
+ {
+ /* If we already wrote something we handle this like a short write
+ * and assume that on the next call the same error happens again, or
+ * everything finishes successfully without data loss then
+ */
+ if (_bytes_written > 0)
+ {
+ if (bytes_written)
+ *bytes_written = _bytes_written;
+
+ g_clear_error (&err);
+ return TRUE;
+ }
+
+ g_propagate_error (error, err);
+ return FALSE;
+ }
+
+ _bytes_written += res;
+ /* if we had a short write break the loop here */
+ if (res < vectors[i].size)
+ break;
+ }
+
+ if (bytes_written)
+ *bytes_written = _bytes_written;
+
+ return TRUE;
+}
+
/********************************************
* Default implementation of async ops *
********************************************/
@@ -1852,6 +2477,172 @@ g_output_stream_real_write_finish (GOutputStream *stream,
return g_task_propagate_int (G_TASK (result), error);
}
+typedef struct {
+ const GOutputVector *vectors;
+ gsize n_vectors; /* (unowned) */
+ gsize bytes_written;
+} WritevData;
+
+static void
+free_writev_data (WritevData *op)
+{
+ g_slice_free (WritevData, op);
+}
+
+static void
+writev_async_thread (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ GOutputStream *stream = source_object;
+ WritevData *op = task_data;
+ GOutputStreamClass *class;
+ GError *error = NULL;
+ gboolean res;
+
+ class = G_OUTPUT_STREAM_GET_CLASS (stream);
+ res = class->writev_fn (stream, op->vectors, op->n_vectors,
+ &op->bytes_written, cancellable, &error);
+
+ g_warn_if_fail (res || op->bytes_written == 0);
+ g_warn_if_fail (res || error != NULL);
+
+ if (!res)
+ g_task_return_error (task, g_steal_pointer (&error));
+ else
+ g_task_return_boolean (task, TRUE);
+}
+
+static void writev_async_pollable (GPollableOutputStream *stream,
+ GTask *task);
+
+static gboolean
+writev_async_pollable_ready (GPollableOutputStream *stream,
+ gpointer user_data)
+{
+ GTask *task = user_data;
+
+ writev_async_pollable (stream, task);
+ return G_SOURCE_REMOVE;
+}
+
+static void
+writev_async_pollable (GPollableOutputStream *stream,
+ GTask *task)
+{
+ GError *error = NULL;
+ WritevData *op = g_task_get_task_data (task);
+ GPollableReturn res;
+ gsize bytes_written = 0;
+
+ if (g_task_return_error_if_cancelled (task))
+ return;
+
+ res = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+ writev_nonblocking (stream, op->vectors, op->n_vectors, &bytes_written, &error);
+
+ switch (res)
+ {
+ case G_POLLABLE_RETURN_WOULD_BLOCK:
+ {
+ GSource *source;
+
+ g_warn_if_fail (error == NULL);
+ g_warn_if_fail (bytes_written == 0);
+
+ source = g_pollable_output_stream_create_source (stream,
+ g_task_get_cancellable (task));
+ g_task_attach_source (task, source,
+ (GSourceFunc) writev_async_pollable_ready);
+ g_source_unref (source);
+ }
+ break;
+ case G_POLLABLE_RETURN_OK:
+ g_warn_if_fail (error == NULL);
+ op->bytes_written = bytes_written;
+ g_task_return_boolean (task, TRUE);
+ break;
+ case G_POLLABLE_RETURN_FAILED:
+ g_warn_if_fail (bytes_written == 0);
+ g_warn_if_fail (error != NULL);
+ g_task_return_error (task, g_steal_pointer (&error));
+ break;
+ default:
+ g_assert_not_reached ();
+ }
+}
+
+static void
+g_output_stream_real_writev_async (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GTask *task;
+ WritevData *op;
+ GError *error = NULL;
+
+ op = g_slice_new0 (WritevData);
+ task = g_task_new (stream, cancellable, callback, user_data);
+ op->vectors = vectors;
+ op->n_vectors = n_vectors;
+
+ g_task_set_check_cancellable (task, FALSE);
+ g_task_set_source_tag (task, g_output_stream_writev_async);
+ g_task_set_priority (task, io_priority);
+ g_task_set_task_data (task, op, (GDestroyNotify) free_writev_data);
+
+ if (n_vectors == 0)
+ {
+ g_task_return_boolean (task, TRUE);
+ g_object_unref (task);
+ return;
+ }
+
+ if (!g_output_stream_set_pending (stream, &error))
+ {
+ g_task_return_error (task, g_steal_pointer (&error));
+ g_object_unref (task);
+ return;
+ }
+
+ if (!g_output_stream_async_writev_is_via_threads (stream))
+ writev_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), task);
+ else
+ g_task_run_in_thread (task, writev_async_thread);
+
+ g_object_unref (task);
+}
+
+static gboolean
+g_output_stream_real_writev_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ gsize *bytes_written,
+ GError **error)
+{
+ GTask *task;
+
+ g_return_val_if_fail (g_task_is_valid (result, stream), FALSE);
+ g_return_val_if_fail (g_async_result_is_tagged (result, g_output_stream_writev_async), FALSE);
+
+ g_output_stream_clear_pending (stream);
+
+ task = G_TASK (result);
+
+ if (bytes_written)
+ {
+ WritevData *op = g_task_get_task_data (task);
+
+ *bytes_written = op->bytes_written;
+ }
+
+ return g_task_propagate_boolean (task, error);
+}
+
typedef struct {
GInputStream *source;
GOutputStreamSpliceFlags flags;
diff --git a/gio/goutputstream.h b/gio/goutputstream.h
index fef1b8fdf..dc0f4925a 100644
--- a/gio/goutputstream.h
+++ b/gio/goutputstream.h
@@ -119,11 +119,28 @@ struct _GOutputStreamClass
GAsyncResult *result,
GError **error);
+ gboolean (* writev_fn) (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GCancellable *cancellable,
+ GError **error);
+
+ void (* writev_async) (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+
+ gboolean (* writev_finish) (GOutputStream *stream,
+ GAsyncResult *result,
+ gsize *bytes_written,
+ GError **error);
+
/*< private >*/
/* Padding for future expansion */
- void (*_g_reserved1) (void);
- void (*_g_reserved2) (void);
- void (*_g_reserved3) (void);
void (*_g_reserved4) (void);
void (*_g_reserved5) (void);
void (*_g_reserved6) (void);
@@ -147,6 +164,22 @@ gboolean g_output_stream_write_all (GOutputStream *stream,
gsize *bytes_written,
GCancellable *cancellable,
GError **error);
+
+GLIB_AVAILABLE_IN_2_60
+gboolean g_output_stream_writev (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GCancellable *cancellable,
+ GError **error);
+GLIB_AVAILABLE_IN_2_60
+gboolean g_output_stream_writev_all (GOutputStream *stream,
+ GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GCancellable *cancellable,
+ GError **error);
+
GLIB_AVAILABLE_IN_2_40
gboolean g_output_stream_printf (GOutputStream *stream,
gsize *bytes_written,
@@ -208,6 +241,35 @@ gboolean g_output_stream_write_all_finish (GOutputStream *stream,
gsize *bytes_written,
GError **error);
+GLIB_AVAILABLE_IN_2_60
+void g_output_stream_writev_async (GOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+GLIB_AVAILABLE_IN_2_60
+gboolean g_output_stream_writev_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ gsize *bytes_written,
+ GError **error);
+
+GLIB_AVAILABLE_IN_2_60
+void g_output_stream_writev_all_async (GOutputStream *stream,
+ GOutputVector *vectors,
+ gsize n_vectors,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+
+GLIB_AVAILABLE_IN_2_60
+gboolean g_output_stream_writev_all_finish (GOutputStream *stream,
+ GAsyncResult *result,
+ gsize *bytes_written,
+ GError **error);
+
GLIB_AVAILABLE_IN_2_34
void g_output_stream_write_bytes_async (GOutputStream *stream,
GBytes *bytes,
diff --git a/gio/gpollableoutputstream.c b/gio/gpollableoutputstream.c
index 40c649f0d..c17cf9268 100644
--- a/gio/gpollableoutputstream.c
+++ b/gio/gpollableoutputstream.c
@@ -41,17 +41,23 @@
G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM)
-static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream);
-static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
- const void *buffer,
- gsize count,
- GError **error);
+static gboolean g_pollable_output_stream_default_can_poll (GPollableOutputStream *stream);
+static gssize g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ GError **error);
+static GPollableReturn g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize
*bytes_written,
+ GError **error);
static void
g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface)
{
- iface->can_poll = g_pollable_output_stream_default_can_poll;
- iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
+ iface->can_poll = g_pollable_output_stream_default_can_poll;
+ iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
+ iface->writev_nonblocking = g_pollable_output_stream_default_writev_nonblocking;
}
static gboolean
@@ -157,6 +163,67 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stre
write_fn (G_OUTPUT_STREAM (stream), buffer, count, NULL, error);
}
+static GPollableReturn
+g_pollable_output_stream_default_writev_nonblocking (GPollableOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GError **error)
+{
+ gsize _bytes_written = 0;
+ GPollableOutputStreamInterface *iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream);
+ gsize i;
+ GError *err = NULL;
+
+ for (i = 0; i < n_vectors; i++)
+ {
+ gssize res;
+
+ /* Would we overflow here? In that case simply return and let the caller
+ * handle this like a short write */
+ if (_bytes_written > G_MAXSIZE - vectors[i].size)
+ break;
+
+ res = iface->write_nonblocking (stream, vectors[i].buffer, vectors[i].size, &err);
+ if (res == -1)
+ {
+ if (bytes_written)
+ *bytes_written = _bytes_written;
+
+ /* If something was written already we handle this like a short
+ * write and assume that the next call would either give the same
+ * error again or successfully finish writing without errors or data
+ * loss
+ */
+ if (_bytes_written > 0)
+ {
+ g_clear_error (&err);
+ return G_POLLABLE_RETURN_OK;
+ }
+ else if (g_error_matches (err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+ {
+ g_clear_error (&err);
+ return G_POLLABLE_RETURN_WOULD_BLOCK;
+ }
+ else
+ {
+ g_propagate_error (error, err);
+ return G_POLLABLE_RETURN_FAILED;
+ }
+ }
+
+ _bytes_written += res;
+ /* if we had a short write break the loop here */
+ if (res < vectors[i].size)
+ break;
+ }
+
+ if (bytes_written)
+ *bytes_written = _bytes_written;
+
+ return G_POLLABLE_RETURN_OK;
+}
+
/**
* g_pollable_output_stream_write_nonblocking:
* @stream: a #GPollableOutputStream
@@ -179,7 +246,8 @@ g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream *stre
* to having been cancelled.
*
* Also note that if %G_IO_ERROR_WOULD_BLOCK is returned some underlying
- * transports like D/TLS require that you send the same @buffer and @count.
+ * transports like D/TLS require that you re-send the same @buffer and
+ * @count in the next write call.
*
* Virtual: write_nonblocking
* Returns: the number of bytes written, or -1 on error (including
@@ -221,3 +289,91 @@ g_pollable_output_stream_write_nonblocking (GPollableOutputStream *stream,
return res;
}
+
+/**
+ * g_pollable_output_stream_writev_nonblocking:
+ * @stream: a #GPollableOutputStream
+ * @vectors: (array length=n_vectors): the buffer containing the #GOutputVectors to write.
+ * @n_vectors: the number of vectors to write
+ * @bytes_written: (out) (optional): location to store the number of bytes that were
+ * written to the stream
+ * @cancellable: (nullable): a #GCancellable, or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Attempts to write the bytes contained in the @n_vectors @vectors to @stream,
+ * as with g_output_stream_writev(). If @stream is not currently writable,
+ * this will immediately return %@G_POLLABLE_RETURN_WOULD_BLOCK, and you can
+ * use g_pollable_output_stream_create_source() to create a #GSource
+ * that will be triggered when @stream is writable. @error will *not* be
+ * set in that case.
+ *
+ * Note that since this method never blocks, you cannot actually
+ * use @cancellable to cancel it. However, it will return an error
+ * if @cancellable has already been cancelled when you call, which
+ * may happen if you call this method after a source triggers due
+ * to having been cancelled.
+ *
+ * Also note that if %G_POLLABLE_RETURN_WOULD_BLOCK is returned some underlying
+ * transports like D/TLS require that you re-send the same @vectors and
+ * @n_vectors in the next write call.
+ *
+ * Virtual: writev_nonblocking
+ *
+ * Returns: %@G_POLLABLE_RETURN_OK on success, %G_POLLABLE_RETURN_WOULD_BLOCK
+ * if the stream is not currently writable (and @error is *not* set), or
+ * %G_POLLABLE_RETURN_FAILED if there was an error in which case @error will
+ * be set.
+ *
+ * Since: 2.60
+ */
+GPollableReturn
+g_pollable_output_stream_writev_nonblocking (GPollableOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GPollableOutputStreamInterface *iface;
+ GPollableReturn res;
+ gsize _bytes_written = 0;
+
+ if (bytes_written)
+ *bytes_written = 0;
+
+ g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), G_POLLABLE_RETURN_FAILED);
+ g_return_val_if_fail (vectors != NULL || n_vectors == 0, G_POLLABLE_RETURN_FAILED);
+ g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), G_POLLABLE_RETURN_FAILED);
+ g_return_val_if_fail (error == NULL || *error == NULL, G_POLLABLE_RETURN_FAILED);
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return G_POLLABLE_RETURN_FAILED;
+
+ if (n_vectors == 0)
+ return G_POLLABLE_RETURN_OK;
+
+ iface = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream);
+ g_return_val_if_fail (iface->writev_nonblocking != NULL, G_POLLABLE_RETURN_FAILED);
+
+ if (cancellable)
+ g_cancellable_push_current (cancellable);
+
+ res = iface->
+ writev_nonblocking (stream, vectors, n_vectors, &_bytes_written, error);
+
+ if (cancellable)
+ g_cancellable_pop_current (cancellable);
+
+ if (res == G_POLLABLE_RETURN_FAILED)
+ g_warn_if_fail (error == NULL || (*error != NULL && !g_error_matches (*error, G_IO_ERROR,
G_IO_ERROR_WOULD_BLOCK)));
+ else if (res == G_POLLABLE_RETURN_WOULD_BLOCK)
+ g_warn_if_fail (error == NULL || *error == NULL);
+
+ /* in case of not-OK nothing must've been written */
+ g_warn_if_fail (res == G_POLLABLE_RETURN_OK || _bytes_written == 0);
+
+ if (bytes_written)
+ *bytes_written = _bytes_written;
+
+ return res;
+}
diff --git a/gio/gpollableoutputstream.h b/gio/gpollableoutputstream.h
index bf13584d5..1ef830b57 100644
--- a/gio/gpollableoutputstream.h
+++ b/gio/gpollableoutputstream.h
@@ -77,6 +77,11 @@ struct _GPollableOutputStreamInterface
const void *buffer,
gsize count,
GError **error);
+ GPollableReturn (*writev_nonblocking) (GPollableOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GError **error);
};
GLIB_AVAILABLE_IN_ALL
@@ -98,6 +103,14 @@ gssize g_pollable_output_stream_write_nonblocking (GPollableOutputStream *str
GCancellable *cancellable,
GError **error);
+GLIB_AVAILABLE_IN_2_60
+GPollableReturn g_pollable_output_stream_writev_nonblocking (GPollableOutputStream *stream,
+ const GOutputVector *vectors,
+ gsize n_vectors,
+ gsize *bytes_written,
+ GCancellable *cancellable,
+ GError **error);
+
G_END_DECLS
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]