[wing/wip/poll-stream] outputstream: implement write_async instead of pollable



commit 162fb0ebf85055d03c970fefa290e5936d402b6f
Author: Ignacio Casal Quinteiro <qignacio amazon com>
Date:   Fri Nov 30 10:47:15 2018 +0100

    outputstream: implement write_async instead of pollable

 wing/wingoutputstream.c | 224 ++++++++++++++++++++++++------------------------
 1 file changed, 112 insertions(+), 112 deletions(-)
---
diff --git a/wing/wingoutputstream.c b/wing/wingoutputstream.c
index 8d397c1..9910abd 100644
--- a/wing/wingoutputstream.c
+++ b/wing/wingoutputstream.c
@@ -53,12 +53,7 @@ enum {
 
 static GParamSpec *props[LAST_PROP];
 
-static void wing_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
-
-G_DEFINE_TYPE_WITH_CODE (WingOutputStream, wing_output_stream, G_TYPE_OUTPUT_STREAM,
-                         G_ADD_PRIVATE (WingOutputStream)
-                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, 
wing_output_stream_pollable_iface_init)
-                         )
+G_DEFINE_TYPE_WITH_PRIVATE (WingOutputStream, wing_output_stream, G_TYPE_OUTPUT_STREAM)
 
 static void
 wing_output_stream_finalize (GObject *object)
@@ -127,17 +122,17 @@ wing_output_stream_get_property (GObject    *object,
 }
 
 static gssize
-write_internal (GOutputStream  *stream,
-                const void     *buffer,
-                gsize           count,
-                gboolean        blocking,
-                GCancellable   *cancellable,
-                GError        **error)
+wing_output_stream_write (GOutputStream  *stream,
+                          const void     *buffer,
+                          gsize           count,
+                          GCancellable   *cancellable,
+                          GError        **error)
 {
   WingOutputStream *wing_stream;
   WingOutputStreamPrivate *priv;
   BOOL res;
   DWORD nbytes, nwritten;
+  OVERLAPPED overlap = { 0, };
   gssize retval = -1;
 
   wing_stream = WING_OUTPUT_STREAM (stream);
@@ -146,64 +141,31 @@ write_internal (GOutputStream  *stream,
   if (g_cancellable_set_error_if_cancelled (cancellable, error))
     return -1;
 
-  if (!blocking && g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (stream)))
-    {
-      gboolean result;
-
-      result = GetOverlappedResult (priv->overlap.hEvent, &priv->overlap, &nwritten, FALSE);
-      if (!result && GetLastError () == ERROR_IO_INCOMPLETE)
-        {
-          g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
-                               g_strerror (EAGAIN));
-          return -1;
-        }
-
-      ResetEvent (priv->overlap.hEvent);
-
-      retval = nwritten;
-      goto end;
-    }
-
   if (count > G_MAXINT)
     nbytes = G_MAXINT;
   else
     nbytes = count;
 
-  ResetEvent (priv->overlap.hEvent);
+  overlap.hEvent = CreateEvent (NULL, FALSE, FALSE, NULL);
+  g_return_val_if_fail (overlap.hEvent != NULL, -1);
 
-  res = WriteFile (priv->handle, buffer, nbytes, &nwritten, &priv->overlap);
+  res = WriteFile (priv->handle, buffer, nbytes, &nwritten, &overlap);
   if (res)
-    {
-      retval = nwritten;
-      ResetEvent (priv->overlap.hEvent);
-    }
+    retval = nwritten;
   else
     {
       int errsv = GetLastError ();
 
-      if (errsv == ERROR_IO_PENDING)
+      if (errsv == ERROR_IO_PENDING &&
+          wing_overlap_wait_result (priv->handle,
+                                    &overlap, &nwritten, cancellable))
         {
-          if (!blocking)
-            {
-              g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
-                                   g_strerror (EAGAIN));
-              goto end;
-            }
-          else if (blocking && wing_overlap_wait_result (priv->handle,
-                                                         &priv->overlap,
-                                                         &nwritten, cancellable))
-            {
-              retval = nwritten;
-              ResetEvent (priv->overlap.hEvent);
-              goto end;
-            }
+          retval = nwritten;
+          goto end;
         }
 
       if (g_cancellable_set_error_if_cancelled (cancellable, error))
-        {
-          ResetEvent (priv->overlap.hEvent);
-          goto end;
-        }
+        goto end;
 
       errsv = GetLastError ();
       if (errsv == ERROR_HANDLE_EOF ||
@@ -218,26 +180,17 @@ write_internal (GOutputStream  *stream,
           emsg = g_win32_error_message (errsv);
           g_set_error (error, G_IO_ERROR,
                        g_io_error_from_win32_error (errsv),
-                       "Error writing to handle: %s",
+                       _("Error writing to handle: %s"),
                        emsg);
           g_free (emsg);
         }
     }
 
 end:
+  CloseHandle (overlap.hEvent);
   return retval;
 }
 
-static gssize
-wing_output_stream_write (GOutputStream  *stream,
-                          const void     *buffer,
-                          gsize           count,
-                          GCancellable   *cancellable,
-                          GError        **error)
-{
-  return write_internal (stream, buffer, count, TRUE, cancellable, error);
-}
-
 static gboolean
 wing_output_stream_close (GOutputStream  *stream,
                           GCancellable   *cancellable,
@@ -270,6 +223,98 @@ wing_output_stream_close (GOutputStream  *stream,
   return TRUE;
 }
 
+static gboolean
+write_async_ready (WingInputStream *stream,
+                   gpointer         user_data)
+{
+  WingInputStreamPrivate *priv;
+  GTask *task = user_data;
+  DWORD nwritten;
+  gboolean result;
+
+  priv = wing_input_stream_get_instance_private (wing_stream);
+
+  result = GetOverlappedResult (priv->overlap.hEvent, &priv->overlap, &nwritten, FALSE);
+  if (!result && GetLastError () == ERROR_IO_INCOMPLETE)
+    {
+      /* Try again to wait for the event to get ready */
+      ResetEvent (priv->overlap.hEvent);
+      return G_SOURCE_CONTINUE;
+    }
+
+  ResetEvent (priv->overlap.hEvent);
+
+  g_task_return_int (task, nwritten);
+
+  return G_SOURCE_REMOVE;
+}
+
+static void
+wing_output_stream_write_async (GOutputStream            *stream,
+                                const void               *buffer,
+                                gsize                     count,
+                                int                       io_priority,
+                                GCancellable             *cancellable,
+                                GAsyncReadyCallback       callback,
+                                gpointer                  user_data)
+{
+  WingOutputStream *wing_stream;
+  WingOutputStreamPrivate *priv;
+  DWORD nbytes, nwritten;
+  int errsv;
+  GTask *task;
+  gchar *emsg;
+
+  wing_stream = WING_OUTPUT_STREAM (stream);
+  priv = wing_output_stream_get_instance_private (wing_stream);
+
+  task = g_task_new (stream, cancellable, callback, user_data);
+
+  if (count > G_MAXINT)
+    nbytes = G_MAXINT;
+  else
+    nbytes = count;
+
+  ResetEvent (priv->overlap.hEvent);
+
+  res = WriteFile (priv->handle, buffer, nbytes, &nwritten, &priv->overlap);
+  if (res)
+    {
+      ResetEvent (priv->overlap.hEvent);
+      g_task_return_int (task, nwritten);
+      g_object_unref (task);
+      return;
+    }
+
+  errsv = GetLastError ();
+
+  if (errsv == ERROR_IO_PENDING)
+    {
+      GSource *handle_source;
+
+      handle_source = wing_create_source (priv->overlap.hEvent, G_IO_IN, cancellable);
+      g_task_attach_source (task, handle_source,
+                            (GSourceFunc)write_async_ready);
+      g_source_unref (handle_source);
+      return;
+    }
+
+  if (errsv == ERROR_HANDLE_EOF ||
+      errsv == ERROR_BROKEN_PIPE)
+    {
+      g_task_return_int (task, 0);
+      return;
+    }
+
+  emsg = g_win32_error_message (errsv);
+  g_task_report_new_error (stream, callback, user_data,
+                           wing_input_stream_read_async,
+                           G_IO_ERROR, g_io_error_from_win32_error (errsv),
+                           "Error writing to handle: %s",
+                           emsg);
+  g_free (emsg);
+}
+
 static void
 wing_output_stream_class_init (WingOutputStreamClass *klass)
 {
@@ -282,6 +327,7 @@ wing_output_stream_class_init (WingOutputStreamClass *klass)
 
   stream_class->write_fn = wing_output_stream_write;
   stream_class->close_fn = wing_output_stream_close;
+  stream_class->write_async = wing_output_stream_write_async;
 
    /**
    * WingOutputStream:handle:
@@ -324,52 +370,6 @@ wing_output_stream_init (WingOutputStream *wing_stream)
   g_return_if_fail (priv->overlap.hEvent != INVALID_HANDLE_VALUE);
 }
 
-static gboolean
-wing_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
-{
-  WingOutputStream *wing_stream = WING_OUTPUT_STREAM (pollable);
-  WingOutputStreamPrivate *priv;
-
-  priv = wing_output_stream_get_instance_private (wing_stream);
-
-  return WaitForSingleObject (priv->overlap.hEvent, 0) == WAIT_OBJECT_0;
-}
-
-static GSource *
-wing_output_stream_pollable_create_source (GPollableOutputStream *pollable,
-                                           GCancellable          *cancellable)
-{
-  WingOutputStream *wing_stream = WING_OUTPUT_STREAM (pollable);
-  WingOutputStreamPrivate *priv;
-  GSource *handle_source, *pollable_source;
-
-  priv = wing_output_stream_get_instance_private (wing_stream);
-
-  handle_source = wing_create_source (priv->overlap.hEvent,
-                                      G_IO_IN, cancellable);
-  pollable_source = g_pollable_source_new_full (pollable, handle_source, cancellable);
-  g_source_unref (handle_source);
-
-  return pollable_source;
-}
-
-static gssize
-wing_output_stream_pollable_write_nonblocking (GPollableOutputStream  *pollable,
-                                               const void             *buffer,
-                                               gsize                   size,
-                                               GError                **error)
-{
-  return write_internal (G_OUTPUT_STREAM (pollable), buffer, size, FALSE, NULL, error);
-}
-
-static void
-wing_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
-{
-  iface->is_writable = wing_output_stream_pollable_is_writable;
-  iface->create_source = wing_output_stream_pollable_create_source;
-  iface->write_nonblocking = wing_output_stream_pollable_write_nonblocking;
-}
-
 /**
  * wing_output_stream_new:
  * @handle: a Win32 file handle


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]