[wing/wip/poll-stream] inputstream: rework to implement read_async



commit e934ec3951882e2621598e889252d7051471eb0f
Author: Ignacio Casal Quinteiro <qignacio amazon com>
Date:   Fri Nov 30 10:30:04 2018 +0100

    inputstream: rework to implement read_async

 wing/winginputstream.c | 240 ++++++++++++++++++++++++++-----------------------
 1 file changed, 127 insertions(+), 113 deletions(-)
---
diff --git a/wing/winginputstream.c b/wing/winginputstream.c
index 851b71b..5750432 100644
--- a/wing/winginputstream.c
+++ b/wing/winginputstream.c
@@ -52,12 +52,7 @@ enum {
 
 static GParamSpec *props[LAST_PROP];
 
-static void wing_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
-
-G_DEFINE_TYPE_WITH_CODE (WingInputStream, wing_input_stream, G_TYPE_INPUT_STREAM,
-                         G_ADD_PRIVATE (WingInputStream)
-                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, 
wing_input_stream_pollable_iface_init)
-                         )
+G_DEFINE_TYPE_WITH_PRIVATE (WingInputStream, wing_input_stream, G_TYPE_INPUT_STREAM)
 
 static void
 wing_input_stream_finalize (GObject *object)
@@ -126,17 +121,17 @@ wing_input_stream_get_property (GObject    *object,
 }
 
 static gssize
-read_internal (GInputStream  *stream,
-               void          *buffer,
-               gsize          count,
-               gboolean       blocking,
-               GCancellable  *cancellable,
-               GError       **error)
+wing_input_stream_read (GInputStream  *stream,
+                        void          *buffer,
+                        gsize          count,
+                        GCancellable  *cancellable,
+                        GError       **error)
 {
   WingInputStream *wing_stream;
   WingInputStreamPrivate *priv;
   BOOL res;
   DWORD nbytes, nread;
+  OVERLAPPED overlap = { 0, };
   gssize retval = -1;
 
   wing_stream = WING_INPUT_STREAM (stream);
@@ -145,64 +140,31 @@ read_internal (GInputStream  *stream,
   if (g_cancellable_set_error_if_cancelled (cancellable, error))
     return -1;
 
-  if (!blocking && g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (stream)))
-    {
-      gboolean result;
-
-      result = GetOverlappedResult (priv->overlap.hEvent, &priv->overlap, &nread, FALSE);
-      if (!result && GetLastError () == ERROR_IO_INCOMPLETE)
-        {
-          g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
-                               g_strerror (EAGAIN));
-          return -1;
-        }
-
-      ResetEvent (priv->overlap.hEvent);
-
-      retval = nread;
-      goto end;
-    }
-
   if (count > G_MAXINT)
     nbytes = G_MAXINT;
   else
     nbytes = count;
 
-  ResetEvent (priv->overlap.hEvent);
+  overlap.hEvent = CreateEvent (NULL, FALSE, FALSE, NULL);
+  g_return_val_if_fail (overlap.hEvent != NULL, -1);
 
-  res = ReadFile (priv->handle, buffer, nbytes, &nread, &priv->overlap);
+  res = ReadFile (priv->handle, buffer, nbytes, &nread, &overlap);
   if (res)
-    {
-      retval = nread;
-      ResetEvent (priv->overlap.hEvent);
-    }
+    retval = nread;
   else
     {
       int errsv = GetLastError ();
 
-      if (errsv == ERROR_IO_PENDING)
+      if (errsv == ERROR_IO_PENDING &&
+          wing_overlap_wait_result (priv->handle,
+                                    &overlap, &nread, cancellable))
         {
-          if (!blocking)
-            {
-              g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
-                                   g_strerror (EAGAIN));
-              goto end;
-            }
-          else if (blocking && wing_overlap_wait_result (priv->handle,
-                                                         &priv->overlap,
-                                                         &nread, cancellable))
-            {
-              retval = nread;
-              ResetEvent (priv->overlap.hEvent);
-              goto end;
-            }
+          retval = nread;
+          goto end;
         }
 
       if (g_cancellable_set_error_if_cancelled (cancellable, error))
-        {
-          ResetEvent (priv->overlap.hEvent);
-          goto end;
-        }
+        goto end;
 
       errsv = GetLastError ();
       if (errsv == ERROR_MORE_DATA)
@@ -212,7 +174,6 @@ read_internal (GInputStream  *stream,
            * parameter specifies, ReadFile returns FALSE and
            * GetLastError returns ERROR_MORE_DATA */
           retval = nread;
-          ResetEvent (priv->overlap.hEvent);
           goto end;
         }
       else if (errsv == ERROR_HANDLE_EOF ||
@@ -239,19 +200,10 @@ read_internal (GInputStream  *stream,
     }
 
 end:
+  CloseHandle (overlap.hEvent);
   return retval;
 }
 
-static gssize
-wing_input_stream_read (GInputStream  *stream,
-                        void          *buffer,
-                        gsize          count,
-                        GCancellable  *cancellable,
-                        GError       **error)
-{
-  return read_internal (stream, buffer, count, TRUE, cancellable, error);
-}
-
 static gboolean
 wing_input_stream_close (GInputStream  *stream,
                            GCancellable  *cancellable,
@@ -284,6 +236,114 @@ wing_input_stream_close (GInputStream  *stream,
   return TRUE;
 }
 
+static gboolean
+read_async_ready (WingInputStream *stream,
+                  gpointer         user_data)
+{
+  WingInputStreamPrivate *priv;
+  GTask *task = user_data;
+  DWORD nread;
+  gboolean result;
+
+  priv = wing_input_stream_get_instance_private (wing_stream);
+
+  result = GetOverlappedResult (priv->overlap.hEvent, &priv->overlap, &nread, FALSE);
+  if (!result && GetLastError () == ERROR_IO_INCOMPLETE)
+    {
+      /* Try again to wait for the event to get ready */
+      ResetEvent (priv->overlap.hEvent);
+      return G_SOURCE_CONTINUE;
+    }
+
+  ResetEvent (priv->overlap.hEvent);
+
+  g_task_return_int (task, nread);
+
+  return G_SOURCE_REMOVE;
+}
+
+static void
+wing_input_stream_read_async (GInputStream        *stream,
+                              void                *buffer,
+                              gsize                count,
+                              int                  io_priority,
+                              GCancellable        *cancellable,
+                              GAsyncReadyCallback  callback,
+                              gpointer             user_data)
+{
+  WingInputStream *wing_stream;
+  WingInputStreamPrivate *priv;
+  DWORD nbytes, nread;
+  int errsv;
+  GTask *task;
+  gchar *emsg;
+
+  wing_stream = WING_INPUT_STREAM (stream);
+  priv = wing_input_stream_get_instance_private (wing_stream);
+
+  task = g_task_new (stream, cancellable, callback, user_data);
+
+  if (count > G_MAXINT)
+    nbytes = G_MAXINT;
+  else
+    nbytes = count;
+
+  ResetEvent (priv->overlap.hEvent);
+
+  res = ReadFile (priv->handle, buffer, nbytes, &nread, &priv->overlap);
+  if (res)
+    {
+      ResetEvent (priv->overlap.hEvent);
+      g_task_return_int (task, nread);
+      g_object_unref (task);
+      return;
+    }
+
+  errsv = GetLastError ();
+
+  if (errsv == ERROR_IO_PENDING)
+    {
+      GSource *handle_source;
+
+      handle_source = wing_create_source (priv->overlap.hEvent, G_IO_IN, cancellable);
+      g_task_attach_source (task, handle_source,
+                            (GSourceFunc)read_async_ready);
+      g_source_unref (handle_source);
+      return;
+    }
+
+  if (errsv == ERROR_MORE_DATA)
+    {
+      /* If a named pipe is being read in message mode and the
+       * next message is longer than the nNumberOfBytesToRead
+       * parameter specifies, ReadFile returns FALSE and
+       * GetLastError returns ERROR_MORE_DATA */
+      ResetEvent (priv->overlap.hEvent);
+      g_task_return_int (task, nread);
+      return
+    }
+
+  if (errsv == ERROR_HANDLE_EOF ||
+      errsv == ERROR_BROKEN_PIPE)
+    {
+      /* TODO: the other end of a pipe may call the WriteFile
+       * function with nNumberOfBytesToWrite set to zero. In this
+       * case, it's not possible for the caller to know if it's
+       * broken pipe or a read of 0. Perhaps we should add a
+       * is_broken flag for this win32 case.. */
+      g_task_return_int (task, 0);
+      return;
+    }
+
+  emsg = g_win32_error_message (errsv);
+  g_task_report_new_error (stream, callback, user_data,
+                           wing_input_stream_read_async,
+                           G_IO_ERROR, g_io_error_from_win32_error (errsv),
+                           "Error reading from handle: %s",
+                           emsg);
+  g_free (emsg);
+}
+
 static void
 wing_input_stream_class_init (WingInputStreamClass *klass)
 {
@@ -296,6 +356,7 @@ wing_input_stream_class_init (WingInputStreamClass *klass)
 
   stream_class->read_fn = wing_input_stream_read;
   stream_class->close_fn = wing_input_stream_close;
+  stream_class->read_async = wing_input_stream_read_async;
 
   /**
    * WingInputStream:handle:
@@ -338,53 +399,6 @@ wing_input_stream_init (WingInputStream *wing_stream)
   g_return_if_fail (priv->overlap.hEvent != INVALID_HANDLE_VALUE);
 }
 
-
-static gboolean
-wing_input_stream_pollable_is_readable (GPollableInputStream *pollable)
-{
-  WingInputStream *wing_stream = WING_INPUT_STREAM (pollable);
-  WingInputStreamPrivate *priv;
-
-  priv = wing_input_stream_get_instance_private (wing_stream);
-
-  return WaitForSingleObject (priv->overlap.hEvent, 0) == WAIT_OBJECT_0;
-}
-
-static GSource *
-wing_input_stream_pollable_create_source (GPollableInputStream *pollable,
-                                          GCancellable         *cancellable)
-{
-  WingInputStream *wing_stream = WING_INPUT_STREAM (pollable);
-  WingInputStreamPrivate *priv;
-  GSource *handle_source, *pollable_source;
-
-  priv = wing_input_stream_get_instance_private (wing_stream);
-
-  handle_source = wing_create_source (priv->overlap.hEvent,
-                                      G_IO_IN, cancellable);
-  pollable_source = g_pollable_source_new_full (pollable, handle_source, cancellable);
-  g_source_unref (handle_source);
-
-  return pollable_source;
-}
-
-static gssize
-wing_input_stream_pollable_read_nonblocking (GPollableInputStream  *pollable,
-                                             void                  *buffer,
-                                             gsize                  count,
-                                             GError               **error)
-{
-  return read_internal (G_INPUT_STREAM (pollable), buffer, count, FALSE, NULL, error);
-}
-
-static void
-wing_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
-{
-  iface->is_readable = wing_input_stream_pollable_is_readable;
-  iface->create_source = wing_input_stream_pollable_create_source;
-  iface->read_nonblocking = wing_input_stream_pollable_read_nonblocking;
-}
-
 /**
  * wing_input_stream_new:
  * @handle: a Win32 file handle


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]