[wing/wip/poll-stream] outputstream: implement write_async instead of pollable
- From: Ignacio Casal Quinteiro <icq src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [wing/wip/poll-stream] outputstream: implement write_async instead of pollable
- Date: Fri, 30 Nov 2018 09:48:21 +0000 (UTC)
commit 162fb0ebf85055d03c970fefa290e5936d402b6f
Author: Ignacio Casal Quinteiro <qignacio amazon com>
Date: Fri Nov 30 10:47:15 2018 +0100
outputstream: implement write_async instead of pollable
wing/wingoutputstream.c | 224 ++++++++++++++++++++++++------------------------
1 file changed, 112 insertions(+), 112 deletions(-)
---
diff --git a/wing/wingoutputstream.c b/wing/wingoutputstream.c
index 8d397c1..9910abd 100644
--- a/wing/wingoutputstream.c
+++ b/wing/wingoutputstream.c
@@ -53,12 +53,7 @@ enum {
static GParamSpec *props[LAST_PROP];
-static void wing_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
-
-G_DEFINE_TYPE_WITH_CODE (WingOutputStream, wing_output_stream, G_TYPE_OUTPUT_STREAM,
- G_ADD_PRIVATE (WingOutputStream)
- G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
wing_output_stream_pollable_iface_init)
- )
+G_DEFINE_TYPE_WITH_PRIVATE (WingOutputStream, wing_output_stream, G_TYPE_OUTPUT_STREAM)
static void
wing_output_stream_finalize (GObject *object)
@@ -127,17 +122,17 @@ wing_output_stream_get_property (GObject *object,
}
static gssize
-write_internal (GOutputStream *stream,
- const void *buffer,
- gsize count,
- gboolean blocking,
- GCancellable *cancellable,
- GError **error)
+wing_output_stream_write (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
{
WingOutputStream *wing_stream;
WingOutputStreamPrivate *priv;
BOOL res;
DWORD nbytes, nwritten;
+ OVERLAPPED overlap = { 0, };
gssize retval = -1;
wing_stream = WING_OUTPUT_STREAM (stream);
@@ -146,64 +141,31 @@ write_internal (GOutputStream *stream,
if (g_cancellable_set_error_if_cancelled (cancellable, error))
return -1;
- if (!blocking && g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (stream)))
- {
- gboolean result;
-
- result = GetOverlappedResult (priv->overlap.hEvent, &priv->overlap, &nwritten, FALSE);
- if (!result && GetLastError () == ERROR_IO_INCOMPLETE)
- {
- g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
- g_strerror (EAGAIN));
- return -1;
- }
-
- ResetEvent (priv->overlap.hEvent);
-
- retval = nwritten;
- goto end;
- }
-
if (count > G_MAXINT)
nbytes = G_MAXINT;
else
nbytes = count;
- ResetEvent (priv->overlap.hEvent);
+ overlap.hEvent = CreateEvent (NULL, FALSE, FALSE, NULL);
+ g_return_val_if_fail (overlap.hEvent != NULL, -1);
- res = WriteFile (priv->handle, buffer, nbytes, &nwritten, &priv->overlap);
+ res = WriteFile (priv->handle, buffer, nbytes, &nwritten, &overlap);
if (res)
- {
- retval = nwritten;
- ResetEvent (priv->overlap.hEvent);
- }
+ retval = nwritten;
else
{
int errsv = GetLastError ();
- if (errsv == ERROR_IO_PENDING)
+ if (errsv == ERROR_IO_PENDING &&
+ wing_overlap_wait_result (priv->handle,
+ &overlap, &nwritten, cancellable))
{
- if (!blocking)
- {
- g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
- g_strerror (EAGAIN));
- goto end;
- }
- else if (blocking && wing_overlap_wait_result (priv->handle,
- &priv->overlap,
- &nwritten, cancellable))
- {
- retval = nwritten;
- ResetEvent (priv->overlap.hEvent);
- goto end;
- }
+ retval = nwritten;
+ goto end;
}
if (g_cancellable_set_error_if_cancelled (cancellable, error))
- {
- ResetEvent (priv->overlap.hEvent);
- goto end;
- }
+ goto end;
errsv = GetLastError ();
if (errsv == ERROR_HANDLE_EOF ||
@@ -218,26 +180,17 @@ write_internal (GOutputStream *stream,
emsg = g_win32_error_message (errsv);
g_set_error (error, G_IO_ERROR,
g_io_error_from_win32_error (errsv),
- "Error writing to handle: %s",
+ _("Error writing to handle: %s"),
emsg);
g_free (emsg);
}
}
end:
+ CloseHandle (overlap.hEvent);
return retval;
}
-static gssize
-wing_output_stream_write (GOutputStream *stream,
- const void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error)
-{
- return write_internal (stream, buffer, count, TRUE, cancellable, error);
-}
-
static gboolean
wing_output_stream_close (GOutputStream *stream,
GCancellable *cancellable,
@@ -270,6 +223,98 @@ wing_output_stream_close (GOutputStream *stream,
return TRUE;
}
+static gboolean
+write_async_ready (WingInputStream *stream,
+ gpointer user_data)
+{
+ WingInputStreamPrivate *priv;
+ GTask *task = user_data;
+ DWORD nwritten;
+ gboolean result;
+
+ priv = wing_input_stream_get_instance_private (wing_stream);
+
+ result = GetOverlappedResult (priv->overlap.hEvent, &priv->overlap, &nwritten, FALSE);
+ if (!result && GetLastError () == ERROR_IO_INCOMPLETE)
+ {
+ /* Try again to wait for the event to get ready */
+ ResetEvent (priv->overlap.hEvent);
+ return G_SOURCE_CONTINUE;
+ }
+
+ ResetEvent (priv->overlap.hEvent);
+
+ g_task_return_int (task, nwritten);
+
+ return G_SOURCE_REMOVE;
+}
+
+static void
+wing_output_stream_write_async (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ WingOutputStream *wing_stream;
+ WingOutputStreamPrivate *priv;
+ DWORD nbytes, nwritten;
+ int errsv;
+ GTask *task;
+ gchar *emsg;
+
+ wing_stream = WING_OUTPUT_STREAM (stream);
+ priv = wing_output_stream_get_instance_private (wing_stream);
+
+ task = g_task_new (stream, cancellable, callback, user_data);
+
+ if (count > G_MAXINT)
+ nbytes = G_MAXINT;
+ else
+ nbytes = count;
+
+ ResetEvent (priv->overlap.hEvent);
+
+ res = WriteFile (priv->handle, buffer, nbytes, &nwritten, &priv->overlap);
+ if (res)
+ {
+ ResetEvent (priv->overlap.hEvent);
+ g_task_return_int (task, nwritten);
+ g_object_unref (task);
+ return;
+ }
+
+ errsv = GetLastError ();
+
+ if (errsv == ERROR_IO_PENDING)
+ {
+ GSource *handle_source;
+
+ handle_source = wing_create_source (priv->overlap.hEvent, G_IO_IN, cancellable);
+ g_task_attach_source (task, handle_source,
+ (GSourceFunc)write_async_ready);
+ g_source_unref (handle_source);
+ return;
+ }
+
+ if (errsv == ERROR_HANDLE_EOF ||
+ errsv == ERROR_BROKEN_PIPE)
+ {
+ g_task_return_int (task, 0);
+ return;
+ }
+
+ emsg = g_win32_error_message (errsv);
+ g_task_report_new_error (stream, callback, user_data,
+ wing_input_stream_read_async,
+ G_IO_ERROR, g_io_error_from_win32_error (errsv),
+ "Error writing to handle: %s",
+ emsg);
+ g_free (emsg);
+}
+
static void
wing_output_stream_class_init (WingOutputStreamClass *klass)
{
@@ -282,6 +327,7 @@ wing_output_stream_class_init (WingOutputStreamClass *klass)
stream_class->write_fn = wing_output_stream_write;
stream_class->close_fn = wing_output_stream_close;
+ stream_class->write_async = wing_output_stream_write_async;
/**
* WingOutputStream:handle:
@@ -324,52 +370,6 @@ wing_output_stream_init (WingOutputStream *wing_stream)
g_return_if_fail (priv->overlap.hEvent != INVALID_HANDLE_VALUE);
}
-static gboolean
-wing_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
-{
- WingOutputStream *wing_stream = WING_OUTPUT_STREAM (pollable);
- WingOutputStreamPrivate *priv;
-
- priv = wing_output_stream_get_instance_private (wing_stream);
-
- return WaitForSingleObject (priv->overlap.hEvent, 0) == WAIT_OBJECT_0;
-}
-
-static GSource *
-wing_output_stream_pollable_create_source (GPollableOutputStream *pollable,
- GCancellable *cancellable)
-{
- WingOutputStream *wing_stream = WING_OUTPUT_STREAM (pollable);
- WingOutputStreamPrivate *priv;
- GSource *handle_source, *pollable_source;
-
- priv = wing_output_stream_get_instance_private (wing_stream);
-
- handle_source = wing_create_source (priv->overlap.hEvent,
- G_IO_IN, cancellable);
- pollable_source = g_pollable_source_new_full (pollable, handle_source, cancellable);
- g_source_unref (handle_source);
-
- return pollable_source;
-}
-
-static gssize
-wing_output_stream_pollable_write_nonblocking (GPollableOutputStream *pollable,
- const void *buffer,
- gsize size,
- GError **error)
-{
- return write_internal (G_OUTPUT_STREAM (pollable), buffer, size, FALSE, NULL, error);
-}
-
-static void
-wing_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
-{
- iface->is_writable = wing_output_stream_pollable_is_writable;
- iface->create_source = wing_output_stream_pollable_create_source;
- iface->write_nonblocking = wing_output_stream_pollable_write_nonblocking;
-}
-
/**
* wing_output_stream_new:
* @handle: a Win32 file handle
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]