[wing/wip/nacho/poll-stream] Add WingInputStream class



commit 8d5bc53ff24e054d42a4848af0d94ef1de7e5d30
Author: Ignacio Casal Quinteiro <qignacio amazon com>
Date:   Wed Nov 28 15:54:26 2018 +0100

    Add WingInputStream class
    
    This is almost the same as the GWin32InputStream but with the
    difference that it also implements the Pollable interface

 wing/meson.build               |   2 +
 wing/wing.h                    |   1 +
 wing/winginputstream.c         | 402 +++++++++++++++++++++++++++++++++++++++++
 wing/winginputstream.h         |  64 +++++++
 wing/wingnamedpipeconnection.c |   3 +-
 wing/wingutils.c               |  51 ++++++
 wing/wingutils.h               |   6 +
 7 files changed, 527 insertions(+), 2 deletions(-)
---
diff --git a/wing/meson.build b/wing/meson.build
index a7ca34e..a573077 100644
--- a/wing/meson.build
+++ b/wing/meson.build
@@ -2,6 +2,7 @@ headers = [
   'wing.h',
   'wingcredentials.h',
   'wingeventwindow.h',
+  'winginputstream.h',
   'wingversionmacros.h',
   'wingnamedpipeclient.h',
   'wingnamedpipeconnection.h',
@@ -16,6 +17,7 @@ sources = [
   'wingcredentials.c',
   'wingeventwindow.c',
   'wing-init.c',
+  'winginputstream.c',
   'wingnamedpipeclient.c',
   'wingnamedpipeconnection.c',
   'wingnamedpipelistener.c',
diff --git a/wing/wing.h b/wing/wing.h
index 5cc2ed3..4c2e8e8 100644
--- a/wing/wing.h
+++ b/wing/wing.h
@@ -22,6 +22,7 @@
 
 #include <wing/wingversionmacros.h>
 #include <wing/wingeventwindow.h>
+#include <wing/winginputstream.h>
 #include <wing/wingnamedpipeclient.h>
 #include <wing/wingnamedpipelistener.h>
 #include <wing/wingnamedpipeconnection.h>
diff --git a/wing/winginputstream.c b/wing/winginputstream.c
new file mode 100644
index 0000000..edaf10f
--- /dev/null
+++ b/wing/winginputstream.c
@@ -0,0 +1,402 @@
+/*
+ * Copyright (C) 2006-2010 Red Hat, Inc.
+ * Copyright (C) 2018 NICE s.r.l.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Alexander Larsson <alexl redhat com>
+ * Author: Tor Lillqvist <tml iki fi>
+ * Author: Ignacio Casal Quinteiro <qignacio amazon com>
+ */
+
+#include "winginputstream.h"
+
+#include <windows.h>
+
+#include "wingutils.h"
+
+/**
+ * SECTION:winginputstream
+ * @short_description: Streaming input operations for Windows file handles
+ * @see_also: #GInputStream
+ *
+ * #WingInputStream implements #GInputStream for reading from a
+ * Windows file handle.
+ */
+
+struct _WingInputStreamPrivate {
+  HANDLE handle;          /* Win32 handle passed to ReadFile(); set through the "handle" property */
+  gboolean close_handle;  /* when TRUE, wing_input_stream_close() calls CloseHandle() on handle */
+
+  OVERLAPPED overlap;     /* handed to every ReadFile(); hEvent is created in wing_input_stream_init() */
+};
+
+enum {
+  PROP_0,             /* placeholder: no property is installed at index 0 */
+  PROP_HANDLE,        /* "handle" property */
+  PROP_CLOSE_HANDLE,  /* "close-handle" property */
+  LAST_PROP           /* number of entries in props[] */
+};
+
+/* Property specs indexed by the PROP_* ids; filled in by class_init. */
+static GParamSpec *props[LAST_PROP];
+
+static void wing_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+
+/* Registers WingInputStream as a GInputStream subclass with private
+ * instance data that also implements GPollableInputStream. */
+G_DEFINE_TYPE_WITH_CODE (WingInputStream, wing_input_stream, G_TYPE_INPUT_STREAM,
+                         G_ADD_PRIVATE (WingInputStream)
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+                                                wing_input_stream_pollable_iface_init)
+                         )
+
+/* GObjectClass::set_property implementation: copies the "handle" and
+ * "close-handle" property values into the instance-private data and
+ * warns about any other property id. */
+static void
+wing_input_stream_set_property (GObject         *object,
+                                guint            prop_id,
+                                const GValue    *value,
+                                GParamSpec      *pspec)
+{
+  WingInputStreamPrivate *priv = WING_INPUT_STREAM (object)->priv;
+
+  if (prop_id == PROP_HANDLE)
+    priv->handle = g_value_get_pointer (value);
+  else if (prop_id == PROP_CLOSE_HANDLE)
+    priv->close_handle = g_value_get_boolean (value);
+  else
+    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+}
+
+/* GObjectClass::get_property implementation: reports the stored
+ * "handle" and "close-handle" values and warns about any other
+ * property id. */
+static void
+wing_input_stream_get_property (GObject    *object,
+                                guint       prop_id,
+                                GValue     *value,
+                                GParamSpec *pspec)
+{
+  WingInputStreamPrivate *priv = WING_INPUT_STREAM (object)->priv;
+
+  if (prop_id == PROP_HANDLE)
+    g_value_set_pointer (value, priv->handle);
+  else if (prop_id == PROP_CLOSE_HANDLE)
+    g_value_set_boolean (value, priv->close_handle);
+  else
+    G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+}
+
+/*
+ * read_internal:
+ * @stream: the #WingInputStream to read from
+ * @buffer: destination buffer of at least @count bytes
+ * @count: maximum number of bytes to read (clamped to %G_MAXINT)
+ * @blocking: %TRUE to wait for a pending overlapped read to complete,
+ *   %FALSE to fail with %G_IO_ERROR_WOULD_BLOCK instead of waiting
+ * @cancellable: (nullable): checked before the read and again after a
+ *   failed read or wait
+ * @error: return location for a #GError
+ *
+ * Issues an overlapped ReadFile() on the stream's handle and turns the
+ * Win32 outcome into the #GInputStream read contract.
+ *
+ * Returns: the number of bytes read, 0 on ERROR_HANDLE_EOF or
+ *   ERROR_BROKEN_PIPE, or -1 with @error set
+ */
+static gssize
+read_internal (GInputStream  *stream,
+               void          *buffer,
+               gsize          count,
+               gboolean       blocking,
+               GCancellable  *cancellable,
+               GError       **error)
+{
+  WingInputStream *wing_stream;
+  BOOL res;
+  DWORD nbytes, nread = 0;
+  gssize retval = -1;
+  int errsv;
+
+  wing_stream = WING_INPUT_STREAM (stream);
+
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+
+  if (count > G_MAXINT)
+    nbytes = G_MAXINT;
+  else
+    nbytes = count;
+
+  ResetEvent (wing_stream->priv->overlap.hEvent);
+
+  res = ReadFile (wing_stream->priv->handle, buffer, nbytes, &nread, &wing_stream->priv->overlap);
+  if (res)
+    return nread;
+
+  /* Capture the error code right away, before any other call can
+   * overwrite the thread's last-error value. */
+  errsv = GetLastError ();
+
+  if (errsv == ERROR_IO_PENDING)
+    {
+      if (!blocking)
+        {
+          /* Nothing has been transferred yet, so there is no byte
+           * count to report.
+           * NOTE(review): the overlapped read is still outstanding
+           * and will write into @buffer when it completes; confirm
+           * how callers are expected to keep @buffer alive or
+           * whether the request should be cancelled here. */
+          g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+                               "Operation would block");
+          return -1;
+        }
+
+      if (wing_overlap_wait_result (wing_stream->priv->handle,
+                                    &wing_stream->priv->overlap,
+                                    &nread, cancellable))
+        return nread;
+
+      /* The wait failed: fall through so that cancellation and the
+       * Win32 error are reported instead of returning -1 silently. */
+      errsv = GetLastError ();
+    }
+
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+
+  if (errsv == ERROR_MORE_DATA)
+    {
+      /* If a named pipe is being read in message mode and the
+       * next message is longer than the nNumberOfBytesToRead
+       * parameter specifies, ReadFile returns FALSE and
+       * GetLastError returns ERROR_MORE_DATA */
+      retval = nread;
+    }
+  else if (errsv == ERROR_HANDLE_EOF ||
+           errsv == ERROR_BROKEN_PIPE)
+    {
+      /* TODO: the other end of a pipe may call the WriteFile
+       * function with nNumberOfBytesToWrite set to zero. In this
+       * case, it's not possible for the caller to know if it's
+       * broken pipe or a read of 0. Perhaps we should add a
+       * is_broken flag for this win32 case.. */
+      retval = 0;
+    }
+  else
+    {
+      gchar *emsg;
+
+      emsg = g_win32_error_message (errsv);
+      g_set_error (error, G_IO_ERROR,
+                   g_io_error_from_win32_error (errsv),
+                   "Error reading from handle: %s",
+                   emsg);
+      g_free (emsg);
+    }
+
+  return retval;
+}
+
+/* GInputStreamClass::read_fn implementation: forwards to
+ * read_internal() with the blocking flag set. */
+static gssize
+wing_input_stream_read (GInputStream  *stream,
+                        void          *buffer,
+                        gsize          count,
+                        GCancellable  *cancellable,
+                        GError       **error)
+{
+  const gboolean blocking = TRUE;
+
+  return read_internal (stream, buffer, count, blocking, cancellable, error);
+}
+
+/* GInputStreamClass::close_fn implementation.
+ *
+ * Does nothing when the "close-handle" property is FALSE; otherwise
+ * calls CloseHandle() on the stream's handle.
+ *
+ * Returns: TRUE on success (or when there is nothing to close), FALSE
+ *   with @error set when CloseHandle() fails.
+ */
+static gboolean
+wing_input_stream_close (GInputStream  *stream,
+                         GCancellable  *cancellable,
+                         GError       **error)
+{
+  WingInputStream *wing_stream;
+  BOOL res;
+
+  wing_stream = WING_INPUT_STREAM (stream);
+
+  if (!wing_stream->priv->close_handle)
+    return TRUE;
+
+  res = CloseHandle (wing_stream->priv->handle);
+  if (!res)
+    {
+      int errsv = GetLastError ();
+      /* GLib's own helper for turning a Win32 error code into text */
+      gchar *emsg = g_win32_error_message (errsv);
+
+      g_set_error (error, G_IO_ERROR,
+                   g_io_error_from_win32_error (errsv),
+                   "Error closing handle: %s",
+                   emsg);
+      g_free (emsg);
+      return FALSE;
+    }
+
+  return TRUE;
+}
+
+/* GObjectClass::finalize implementation: releases the event object
+ * that wing_input_stream_init() created for the OVERLAPPED structure,
+ * then chains up to the parent class. */
+static void
+wing_input_stream_finalize (GObject *object)
+{
+  WingInputStream *wing_stream = WING_INPUT_STREAM (object);
+
+  if (wing_stream->priv->overlap.hEvent != NULL)
+    {
+      CloseHandle (wing_stream->priv->overlap.hEvent);
+      wing_stream->priv->overlap.hEvent = NULL;
+    }
+
+  G_OBJECT_CLASS (wing_input_stream_parent_class)->finalize (object);
+}
+
+/* Class initializer: wires up the GObject and GInputStream vfuncs and
+ * installs the "handle" and "close-handle" properties. */
+static void
+wing_input_stream_class_init (WingInputStreamClass *klass)
+{
+  GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+  GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
+
+  gobject_class->finalize = wing_input_stream_finalize;
+  gobject_class->get_property = wing_input_stream_get_property;
+  gobject_class->set_property = wing_input_stream_set_property;
+
+  stream_class->read_fn = wing_input_stream_read;
+  stream_class->close_fn = wing_input_stream_close;
+
+  /**
+   * WingInputStream:handle:
+   *
+   * The handle that the stream reads from.
+   */
+  props[PROP_HANDLE] =
+    g_param_spec_pointer ("handle",
+                          "File handle",
+                          "The file handle to read from",
+                          G_PARAM_READWRITE |
+                          G_PARAM_CONSTRUCT_ONLY |
+                          G_PARAM_STATIC_STRINGS);
+
+  /**
+   * WingInputStream:close-handle:
+   *
+   * Whether to close the file handle when the stream is closed.
+   */
+  props[PROP_CLOSE_HANDLE] =
+    g_param_spec_boolean ("close-handle",
+                          "Close file handle",
+                          "Whether to close the file handle when the stream is closed",
+                          TRUE,
+                          G_PARAM_READWRITE |
+                          G_PARAM_STATIC_STRINGS);
+
+  g_object_class_install_properties (gobject_class, LAST_PROP, props);
+}
+
+/* Instance initializer: binds the private data, applies the default
+ * property values and creates the event object used for overlapped
+ * reads. */
+static void
+wing_input_stream_init (WingInputStream *wing_stream)
+{
+  wing_stream->priv = wing_input_stream_get_instance_private (wing_stream);
+  wing_stream->priv->handle = NULL;
+  wing_stream->priv->close_handle = TRUE;
+  /* NOTE(review): this creates an auto-reset event (second argument
+   * FALSE); the GetOverlappedResult() documentation recommends a
+   * manual-reset event for overlapped I/O -- confirm which is intended. */
+  wing_stream->priv->overlap.hEvent = CreateEvent (NULL, FALSE, FALSE, NULL);
+  /* This function returns void, so only warn when event creation failed. */
+  g_warn_if_fail (wing_stream->priv->overlap.hEvent != NULL);
+}
+
+
+/* GPollableInputStreamInterface::is_readable implementation.
+ *
+ * Reports TRUE when no overlapped read is outstanding on the stream's
+ * OVERLAPPED structure (HasOverlappedIoCompleted() only inspects the
+ * structure's status field and does not touch the event object).
+ *
+ * NOTE(review): this treats a stream on which no read has been issued
+ * yet as readable -- confirm this is the intended semantics.
+ */
+static gboolean
+wing_input_stream_pollable_is_readable (GPollableInputStream *pollable)
+{
+  WingInputStream *input_stream = WING_INPUT_STREAM (pollable);
+
+  return HasOverlappedIoCompleted (&input_stream->priv->overlap);
+}
+
+/* GPollableInputStreamInterface::create_source implementation.
+ *
+ * Builds a pollable source for the stream and attaches, as a child
+ * source, a source created by wing_create_source() for the OVERLAPPED
+ * event handle with the G_IO_IN condition and @cancellable.
+ *
+ * Returns: a new #GSource owned by the caller
+ */
+static GSource *
+wing_input_stream_pollable_create_source (GPollableInputStream *pollable,
+                                          GCancellable         *cancellable)
+{
+  WingInputStream *input_stream = WING_INPUT_STREAM (pollable);
+  GSource *handle_source, *pollable_source;
+
+  pollable_source = g_pollable_source_new (G_OBJECT (input_stream));
+  handle_source = wing_create_source (input_stream->priv->overlap.hEvent,
+                                      G_IO_IN, cancellable);
+  g_source_set_dummy_callback (handle_source);
+  g_source_add_child_source (pollable_source, handle_source);
+  /* the parent source now holds its own reference to the child */
+  g_source_unref (handle_source);
+
+  return pollable_source;
+}
+
+/* GPollableInputStreamInterface::read_nonblocking implementation:
+ * forwards to read_internal() with the blocking flag cleared and no
+ * cancellable. */
+static gssize
+wing_input_stream_pollable_read_nonblocking (GPollableInputStream  *pollable,
+                                             void                  *buffer,
+                                             gsize                  count,
+                                             GError               **error)
+{
+  return read_internal (G_INPUT_STREAM (pollable), buffer, count, FALSE, NULL, error);
+}
+
+/* Fills in the GPollableInputStream vtable with this file's
+ * implementations. */
+static void
+wing_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+  iface->read_nonblocking = wing_input_stream_pollable_read_nonblocking;
+  iface->create_source = wing_input_stream_pollable_create_source;
+  iface->is_readable = wing_input_stream_pollable_is_readable;
+}
+
+/**
+ * wing_input_stream_new:
+ * @handle: a Win32 file handle
+ * @close_handle: %TRUE to close the handle when done
+ *
+ * Creates a new #WingInputStream for the given @handle.
+ *
+ * If @close_handle is %TRUE, the handle will be closed
+ * when the stream is closed.
+ *
+ * Note that "handle" here means a Win32 HANDLE, not a "file descriptor"
+ * as used in the Windows C libraries.
+ *
+ * Returns: a new #WingInputStream, or %NULL if @handle is %NULL
+ **/
+GInputStream *
+wing_input_stream_new (void     *handle,
+                       gboolean  close_handle)
+{
+  g_return_val_if_fail (handle != NULL, NULL);
+
+  /* Instantiate this class, not GLib's GWin32InputStream. */
+  return g_object_new (WING_TYPE_INPUT_STREAM,
+                       "handle", handle,
+                       "close-handle", close_handle,
+                       NULL);
+}
+
+/**
+ * wing_input_stream_set_close_handle:
+ * @stream: a #WingInputStream
+ * @close_handle: %TRUE to close the handle when done
+ *
+ * Sets whether the handle of @stream shall be closed
+ * when the stream is closed. Emits a notification for the
+ * "close-handle" property only when the value actually changes.
+ */
+void
+wing_input_stream_set_close_handle (WingInputStream *stream,
+                                    gboolean         close_handle)
+{
+  g_return_if_fail (WING_IS_INPUT_STREAM (stream));
+
+  /* normalize to TRUE/FALSE so the comparison below is reliable */
+  close_handle = close_handle != FALSE;
+  if (stream->priv->close_handle != close_handle)
+    {
+      stream->priv->close_handle = close_handle;
+      g_object_notify (G_OBJECT (stream), "close-handle");
+    }
+}
+
+/**
+ * wing_input_stream_get_close_handle:
+ * @stream: a #WingInputStream
+ *
+ * Returns whether the handle of @stream will be
+ * closed when the stream is closed.
+ *
+ * Returns: %TRUE if the handle is closed when done; %FALSE otherwise
+ *   or if @stream is not a #WingInputStream
+ */
+gboolean
+wing_input_stream_get_close_handle (WingInputStream *stream)
+{
+  g_return_val_if_fail (WING_IS_INPUT_STREAM (stream), FALSE);
+
+  return stream->priv->close_handle;
+}
+
+/**
+ * wing_input_stream_get_handle:
+ * @stream: a #WingInputStream
+ *
+ * Return the Windows file handle that the stream reads from.
+ *
+ * Returns: The file handle of @stream, or %NULL if @stream is not a
+ *   #WingInputStream
+ */
+void *
+wing_input_stream_get_handle (WingInputStream *stream)
+{
+  g_return_val_if_fail (WING_IS_INPUT_STREAM (stream), NULL);
+
+  return stream->priv->handle;
+}
diff --git a/wing/winginputstream.h b/wing/winginputstream.h
new file mode 100644
index 0000000..c0c682a
--- /dev/null
+++ b/wing/winginputstream.h
@@ -0,0 +1,64 @@
+/*
+ * Copyright (C) 2006-2010 Red Hat, Inc.
+ * Copyright (C) 2018 NICE s.r.l.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Alexander Larsson <alexl redhat com>
+ * Author: Tor Lillqvist <tml iki fi>
+ * Author: Ignacio Casal Quinteiro <qignacio amazon com>
+ */
+
+#ifndef WING_INPUT_STREAM_H
+#define WING_INPUT_STREAM_H
+
+#include <gio/gio.h>
+#include <wing/wingversionmacros.h>
+
+G_BEGIN_DECLS
+
+#define WING_TYPE_INPUT_STREAM            (wing_input_stream_get_type ())
+#define WING_INPUT_STREAM(o)              (G_TYPE_CHECK_INSTANCE_CAST ((o), WING_TYPE_INPUT_STREAM, WingInputStream))
+#define WING_INPUT_STREAM_CLASS(k)        (G_TYPE_CHECK_CLASS_CAST ((k), WING_TYPE_INPUT_STREAM, WingInputStreamClass))
+#define WING_IS_INPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_TYPE ((o), WING_TYPE_INPUT_STREAM))
+#define WING_IS_INPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_TYPE ((k), WING_TYPE_INPUT_STREAM))
+#define WING_INPUT_STREAM_GET_CLASS(o)    (G_TYPE_INSTANCE_GET_CLASS ((o), WING_TYPE_INPUT_STREAM, WingInputStreamClass))
+
+typedef struct _WingInputStream         WingInputStream;
+typedef struct _WingInputStreamClass    WingInputStreamClass;
+typedef struct _WingInputStreamPrivate  WingInputStreamPrivate;
+
+/**
+ * WingInputStream:
+ *
+ * Implements #GInputStream and #GPollableInputStream for reading from
+ * Windows file handles.
+ **/
+struct _WingInputStream
+{
+  GInputStream parent_instance;
+
+  /*< private >*/
+  WingInputStreamPrivate *priv;
+};
+
+struct _WingInputStreamClass
+{
+  GInputStreamClass parent_class;
+
+  /*< private >*/
+  gpointer padding[10];
+};
+
+WING_AVAILABLE_IN_ALL
+GType          wing_input_stream_get_type         (void) G_GNUC_CONST;
+
+WING_AVAILABLE_IN_ALL
+GInputStream * wing_input_stream_new              (void            *handle,
+                                                   gboolean         close_handle);
+WING_AVAILABLE_IN_ALL
+void           wing_input_stream_set_close_handle (WingInputStream *stream,
+                                                   gboolean         close_handle);
+WING_AVAILABLE_IN_ALL
+gboolean       wing_input_stream_get_close_handle (WingInputStream *stream);
+WING_AVAILABLE_IN_ALL
+void          *wing_input_stream_get_handle       (WingInputStream *stream);
+
+G_END_DECLS
+
+#endif /* WING_INPUT_STREAM_H */
diff --git a/wing/wingnamedpipeconnection.c b/wing/wingnamedpipeconnection.c
index e3ac37b..f0bce98 100644
--- a/wing/wingnamedpipeconnection.c
+++ b/wing/wingnamedpipeconnection.c
@@ -21,7 +21,6 @@
 #include "wingnamedpipeconnection.h"
 
 #include <gio/gio.h>
-#include <gio/gwin32inputstream.h>
 #include <gio/gwin32outputstream.h>
 
 #include <windows.h>
@@ -110,7 +109,7 @@ wing_named_pipe_connection_set_property (GObject      *object,
       connection->handle = g_value_get_pointer (value);
       if (connection->handle != NULL && connection->handle != INVALID_HANDLE_VALUE)
         {
-          connection->input_stream = g_win32_input_stream_new (connection->handle, FALSE);
+          connection->input_stream = wing_input_stream_new (connection->handle, FALSE);
           connection->output_stream = g_win32_output_stream_new (connection->handle, FALSE);
         }
       break;
diff --git a/wing/wingutils.c b/wing/wingutils.c
index 1cc1e49..4bdd4a7 100644
--- a/wing/wingutils.c
+++ b/wing/wingutils.c
@@ -127,3 +127,54 @@ wing_get_n_processors (void)
 
   return n > 1 ? (guint)n : 1;
 }
+
+/**
+ * wing_overlap_wait_result:
+ * @hfile: the handle the overlapped operation was started on
+ * @overlap: the OVERLAPPED structure given when the operation was
+ *   started; its hEvent is polled for completion
+ * @transferred: (out): receives the byte count reported by
+ *   GetOverlappedResult()
+ * @cancellable: (nullable): when cancelled during the wait, CancelIo()
+ *   is called on @hfile
+ *
+ * Blocks in g_poll() on the overlapped event (and on @cancellable, if
+ * it provides a pollable fd) and then queries the operation's result
+ * without waiting. Polling is repeated while the operation is still
+ * reported as ERROR_IO_INCOMPLETE and @cancellable is not cancelled.
+ *
+ * Returns: the value of the last GetOverlappedResult() call, or %FALSE
+ *   if g_poll() failed
+ */
+gboolean
+wing_overlap_wait_result (HANDLE           hfile,
+                          OVERLAPPED      *overlap,
+                          DWORD           *transferred,
+                          GCancellable    *cancellable)
+{
+  GPollFD pollfd[2];
+  gboolean result = FALSE;
+  gint num, npoll;
+
+  /* the cast is chosen to match the pointer size on this platform */
+#if GLIB_SIZEOF_VOID_P == 8
+  pollfd[0].fd = (gint64)overlap->hEvent;
+#else
+  pollfd[0].fd = (gint)overlap->hEvent;
+#endif
+  pollfd[0].events = G_IO_IN;
+  num = 1;
+
+  if (g_cancellable_make_pollfd (cancellable, &pollfd[1]))
+    num++;
+
+loop:
+  npoll = g_poll (pollfd, num, -1);
+  if (npoll <= 0)
+    /* error out, should never happen */
+    goto end;
+
+  if (g_cancellable_is_cancelled (cancellable))
+    {
+      /* CancelIO only cancels pending operations issued by the
+       * current thread and since we're doing only sync operations,
+       * this is safe.... */
+      /* CancelIoEx is only Vista+. Since we have only one overlap
+       * operation on this thread, we can just use: */
+      result = CancelIo (hfile);
+      g_warn_if_fail (result);
+    }
+
+  /* GetOverlappedResult() is documented to take the handle the
+   * operation was started on, not the event handle. */
+  result = GetOverlappedResult (hfile, overlap, transferred, FALSE);
+  if (result == FALSE &&
+      GetLastError () == ERROR_IO_INCOMPLETE &&
+      !g_cancellable_is_cancelled (cancellable))
+    goto loop;
+
+end:
+  /* pairs with the successful g_cancellable_make_pollfd() above */
+  if (num > 1)
+    g_cancellable_release_fd (cancellable);
+
+  return result;
+}
diff --git a/wing/wingutils.h b/wing/wingutils.h
index da6653b..e385f14 100644
--- a/wing/wingutils.h
+++ b/wing/wingutils.h
@@ -44,6 +44,12 @@ gboolean     wing_get_process_times    (gint64 *current_user_time,
 WING_AVAILABLE_IN_ALL
 guint        wing_get_n_processors     (void);
 
+WING_AVAILABLE_IN_ALL
+gboolean     wing_overlap_wait_result  (HANDLE           hfile,
+                                        OVERLAPPED      *overlap,
+                                        DWORD           *transferred,
+                                        GCancellable    *cancellable);
+
 G_END_DECLS
 
 #endif /* WING_UTILS_H */


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]