[wing] fix bug on WingNamedPipeConnection close when iocp is used



commit 7f579a478034381b021604138038e28a4a1af7c0
Author: Silvio Lazzeretti <silviola amazon com>
Date:   Wed Mar 3 10:31:41 2021 +0100

    fix bug on WingNamedPipeConnection close when iocp is used
    
    Sometimes the wing_iocp_input_stream_read_async crashed the application
    because it was starting a read operation when the ThreadPoolIo object
    was already deallocated. This was happening after the close of
    WingNamedPipeConnection. This patch keeps the ThreadPoolIo object
    alive until there is someone using it. It also invalidates the handles
    in the input and output streams when the connection closes it.

 wing/meson.build               |   2 +
 wing/wingiocpinputstream.c     | 127 +++++++++++++----------------
 wing/wingiocpinputstream.h     |   7 +-
 wing/wingiocpoutputstream.c    | 116 ++++++++++++--------------
 wing/wingiocpoutputstream.h    |   7 +-
 wing/wingnamedpipeconnection.c |  93 +++++++++------------
 wing/wingthreadpoolio.c        | 181 +++++++++++++++++++++++++++++++++++++++++
 wing/wingthreadpoolio.h        |  61 ++++++++++++++
 8 files changed, 394 insertions(+), 200 deletions(-)
---
diff --git a/wing/meson.build b/wing/meson.build
index 379d34d..b747b6c 100644
--- a/wing/meson.build
+++ b/wing/meson.build
@@ -12,6 +12,7 @@ headers = [
   'wingservice.h',
   'wingservicemanager.h',
   'wingsource.h',
+  'wingthreadpoolio.h',
   'wingutils.h',
 ]
 
@@ -30,6 +31,7 @@ sources = [
   'wingservice-private.h',
   'wingservicemanager.c',
   'wingsource.c',
+  'wingthreadpoolio.c',
   'wingutils.c',
 ]
 
diff --git a/wing/wingiocpinputstream.c b/wing/wingiocpinputstream.c
index 006ca6f..3f01b0e 100644
--- a/wing/wingiocpinputstream.c
+++ b/wing/wingiocpinputstream.c
@@ -36,14 +36,12 @@
  */
 
 typedef struct {
-  HANDLE handle;
   gboolean close_handle;
-  PTP_IO threadpool_io;
+  WingThreadPoolIo *thread_pool_io;
 } WingIocpInputStreamPrivate;
 
 enum {
   PROP_0,
-  PROP_HANDLE,
   PROP_CLOSE_HANDLE,
   PROP_THREADPOOL_IO,
   LAST_PROP
@@ -53,6 +51,26 @@ static GParamSpec *props[LAST_PROP];
 
 G_DEFINE_TYPE_WITH_PRIVATE (WingIocpInputStream, wing_iocp_input_stream, G_TYPE_INPUT_STREAM)
 
+static void
+wing_iocp_input_stream_finalize (GObject *object)
+{
+  WingIocpInputStream *wing_stream;
+  WingIocpInputStreamPrivate *priv;
+
+  wing_stream = WING_IOCP_INPUT_STREAM (object);
+  priv = wing_iocp_input_stream_get_instance_private (wing_stream);
+
+  if (priv->thread_pool_io)
+    {
+      if (priv->close_handle)
+        wing_thread_pool_io_close_handle (priv->thread_pool_io, NULL);
+
+      wing_thread_pool_io_unref (priv->thread_pool_io);
+    }
+
+  G_OBJECT_CLASS (wing_iocp_input_stream_parent_class)->finalize (object);
+}
+
 static void
 wing_iocp_input_stream_set_property (GObject         *object,
                                      guint            prop_id,
@@ -67,16 +85,12 @@ wing_iocp_input_stream_set_property (GObject         *object,
 
   switch (prop_id)
     {
-    case PROP_HANDLE:
-      priv->handle = g_value_get_pointer (value);
-      break;
-
     case PROP_CLOSE_HANDLE:
       priv->close_handle = g_value_get_boolean (value);
       break;
 
     case PROP_THREADPOOL_IO:
-      priv->threadpool_io = g_value_get_pointer (value);
+      priv->thread_pool_io = g_value_dup_boxed (value);
       break;
 
     default:
@@ -99,14 +113,11 @@ wing_iocp_input_stream_get_property (GObject    *object,
 
   switch (prop_id)
     {
-    case PROP_HANDLE:
-      g_value_set_pointer (value, priv->handle);
-      break;
     case PROP_CLOSE_HANDLE:
       g_value_set_boolean (value, priv->close_handle);
       break;
     case PROP_THREADPOOL_IO:
-      g_value_set_pointer (value, priv->threadpool_io);
+      g_value_set_boxed (value, priv->thread_pool_io);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -128,30 +139,11 @@ wing_iocp_input_stream_close (GInputStream  *stream,
   if (!priv->close_handle)
     return TRUE;
 
-  res = CloseHandle (priv->handle);
-  if (!res)
-    {
-      int errsv = GetLastError ();
-      gchar *emsg = g_win32_error_message (errsv);
-
-      g_set_error (error, G_IO_ERROR,
-                   g_io_error_from_win32_error (errsv),
-                   "Error closing handle: %s",
-                   emsg);
-      g_free (emsg);
-      return FALSE;
-    }
-
-  priv->handle = INVALID_HANDLE_VALUE;
-
-  if (priv->threadpool_io != NULL)
-    {
-      WaitForThreadpoolIoCallbacks (priv->threadpool_io, FALSE);
-      CloseThreadpoolIo (priv->threadpool_io);
-      priv->threadpool_io = NULL;
-    }
+  res = wing_thread_pool_io_close_handle (priv->thread_pool_io, error);
+  if (res)
+    g_clear_pointer (&priv->thread_pool_io, wing_thread_pool_io_unref);
 
-  return TRUE;
+  return res;
 }
 
 static void
@@ -167,6 +159,7 @@ threadpool_io_completion (PTP_CALLBACK_INSTANCE instance,
   GTask *task;
 
   task = G_TASK (user_data);
+
   if (result == NO_ERROR)
     {
       g_task_return_int (task, number_of_bytes_transferred);
@@ -199,7 +192,7 @@ on_cancellable_cancelled (GCancellable *cancellable,
   wing_stream = WING_IOCP_INPUT_STREAM (user_data);
   priv = wing_iocp_input_stream_get_instance_private (wing_stream);
 
-  CancelIo (priv->handle);
+  CancelIo (wing_thread_pool_get_handle (priv->thread_pool_io));
 }
 
 static void
@@ -216,6 +209,7 @@ wing_iocp_input_stream_read_async (GInputStream        *stream,
   WingIocpInputStream *wing_stream;
   WingIocpInputStreamPrivate *priv;
   int errsv;
+  HANDLE handle;
 
   wing_stream = WING_IOCP_INPUT_STREAM (stream);
   priv = wing_iocp_input_stream_get_instance_private (wing_stream);
@@ -229,7 +223,8 @@ wing_iocp_input_stream_read_async (GInputStream        *stream,
       return;
     }
 
-  if (priv->handle == INVALID_HANDLE_VALUE)
+  handle = wing_thread_pool_get_handle (priv->thread_pool_io);
+  if (handle == INVALID_HANDLE_VALUE)
     {
       g_task_return_new_error (task, G_IO_ERROR,
                                G_IO_ERROR_CLOSED,
@@ -248,9 +243,9 @@ wing_iocp_input_stream_read_async (GInputStream        *stream,
                                                         G_CALLBACK (on_cancellable_cancelled),
                                                         wing_stream, NULL);
 
-  StartThreadpoolIo (priv->threadpool_io);
+  wing_thread_pool_io_start (priv->thread_pool_io);
 
-  ReadFile (priv->handle, buffer, (DWORD) count, NULL, (OVERLAPPED *) overlapped);
+  ReadFile (handle, buffer, (DWORD) count, NULL, (OVERLAPPED *) overlapped);
   errsv = GetLastError ();
   if (errsv != NO_ERROR && errsv != ERROR_IO_PENDING)
     {
@@ -262,7 +257,7 @@ wing_iocp_input_stream_read_async (GInputStream        *stream,
                                emsg);
       g_free (emsg);
 
-      CancelThreadpoolIo (priv->threadpool_io);
+      wing_thread_pool_io_cancel (priv->thread_pool_io);
 
       if (g_task_get_cancellable (task) != NULL)
         g_cancellable_disconnect (g_task_get_cancellable (task),
@@ -285,6 +280,7 @@ wing_iocp_input_stream_read (GInputStream  *stream,
   DWORD nbytes, nread;
   OVERLAPPED overlap = { 0, };
   gssize retval = -1;
+  HANDLE handle;
 
   wing_stream = WING_IOCP_INPUT_STREAM (stream);
   priv = wing_iocp_input_stream_get_instance_private (wing_stream);
@@ -300,6 +296,8 @@ wing_iocp_input_stream_read (GInputStream  *stream,
   overlap.hEvent = CreateEvent (NULL, FALSE, FALSE, NULL);
   g_return_val_if_fail (overlap.hEvent != NULL, -1);
 
+  handle = wing_thread_pool_get_handle (priv->thread_pool_io);
+
   /* This prevents the I/O completion port to be notified.
    * It is described in the documentation of the GetQueuedCompletionStatus
    * function, in the section related to the lpOverlapped parameter
@@ -310,7 +308,7 @@ wing_iocp_input_stream_read (GInputStream  *stream,
   overlap.hEvent = (HANDLE) ((gint) overlap.hEvent | 0x1);
 #endif
 
-  res = ReadFile (priv->handle, buffer, nbytes, &nread, &overlap);
+  res = ReadFile (handle, buffer, nbytes, &nread, &overlap);
   if (res)
     retval = nread;
   else
@@ -318,7 +316,7 @@ wing_iocp_input_stream_read (GInputStream  *stream,
       int errsv = GetLastError ();
 
       if (errsv == ERROR_IO_PENDING &&
-          wing_overlap_wait_result (priv->handle,
+          wing_overlap_wait_result (handle,
                                     &overlap, &nread,
                                     cancellable))
         {
@@ -374,6 +372,7 @@ wing_iocp_input_stream_class_init (WingIocpInputStreamClass *klass)
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
   GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
 
+  gobject_class->finalize = wing_iocp_input_stream_finalize;
   gobject_class->get_property = wing_iocp_input_stream_get_property;
   gobject_class->set_property = wing_iocp_input_stream_set_property;
 
@@ -381,19 +380,6 @@ wing_iocp_input_stream_class_init (WingIocpInputStreamClass *klass)
   stream_class->read_async = wing_iocp_input_stream_read_async;
   stream_class->read_fn = wing_iocp_input_stream_read;
 
-  /**
-   * WingIocpInputStream:handle:
-   *
-   * The handle that the stream reads from.
-   */
-  props[PROP_HANDLE] =
-    g_param_spec_pointer ("handle",
-                          "File handle",
-                          "The file handle to read from",
-                          G_PARAM_READWRITE |
-                          G_PARAM_CONSTRUCT_ONLY |
-                          G_PARAM_STATIC_STRINGS);
-
   /**
    * WingIocpInputStream:close-handle:
    *
@@ -410,16 +396,16 @@ wing_iocp_input_stream_class_init (WingIocpInputStreamClass *klass)
   /**
    * WingNamedPipeConnection:threadpool-io:
    *
-   * The threadpool I/O object, returned by CreateThreadpoolIo, used to
-   * perform async I/O with completion ports.
+   * The threadpool I/O object, used to perform async I/O with completion ports.
    */
   props[PROP_THREADPOOL_IO] =
-    g_param_spec_pointer ("threadpool-io",
-                          "Threadpool I/O object",
-                          "The threadpool I/O object, returned by CreateThreadpoolIo, used to perform async 
I / O with completion ports",
-                          G_PARAM_READABLE |
-                          G_PARAM_WRITABLE |
-                          G_PARAM_STATIC_STRINGS);
+    g_param_spec_boxed ("threadpool-io",
+                        "Threadpool I/O object",
+                        "The threadpool I/O object used to perform async I / O with completion ports",
+                        WING_TYPE_THREAD_POOL_IO,
+                        G_PARAM_READWRITE |
+                        G_PARAM_CONSTRUCT_ONLY |
+                        G_PARAM_STATIC_STRINGS);
 
   g_object_class_install_properties (gobject_class, LAST_PROP, props);
 }
@@ -438,7 +424,7 @@ wing_iocp_input_stream_init (WingIocpInputStream *wing_stream)
  * wing_iocp_input_stream_new:
  * @handle: a Win32 file handle
  * @close_handle: %TRUE to close the handle when done
- * @threadpool_io: a pointer to the TP_IO structure returned by CreateThreadpoolIo
+ * @threadpool_io: a #WingThreadPoolIo object
  *
  * Creates a new #WingIocpInputStream for the given @handle.
  *
@@ -451,17 +437,14 @@ wing_iocp_input_stream_init (WingIocpInputStream *wing_stream)
  * Returns: a new #WingIocpInputStream
  **/
 GInputStream *
-wing_iocp_input_stream_new (void     *handle,
-                            gboolean  close_handle,
-                            PTP_IO    threadpool_io)
+wing_iocp_input_stream_new (gboolean          close_handle,
+                            WingThreadPoolIo *thread_pool_io)
 {
-  g_return_val_if_fail (handle != NULL, NULL);
-  g_return_val_if_fail (threadpool_io != NULL, NULL);
+  g_return_val_if_fail (thread_pool_io != NULL, NULL);
 
   return g_object_new (WING_TYPE_IOCP_INPUT_STREAM,
-                       "handle", handle,
                        "close-handle", close_handle,
-                       "threadpool-io", threadpool_io,
+                       "threadpool-io", thread_pool_io,
                        NULL);
 }
 
@@ -529,5 +512,5 @@ wing_iocp_input_stream_get_handle (WingIocpInputStream *stream)
 
   priv = wing_iocp_input_stream_get_instance_private (stream);
 
-  return priv->handle;
+  return wing_thread_pool_get_handle (priv->thread_pool_io);
 }
diff --git a/wing/wingiocpinputstream.h b/wing/wingiocpinputstream.h
index aa7aca4..8cb3a78 100644
--- a/wing/wingiocpinputstream.h
+++ b/wing/wingiocpinputstream.h
@@ -26,7 +26,7 @@
 
 #include <gio/gio.h>
 #include <wing/wingversionmacros.h>
-#include <windows.h>
+#include <wing/wingthreadpoolio.h>
 
 G_BEGIN_DECLS
 
@@ -52,9 +52,8 @@ WING_AVAILABLE_IN_ALL
 GType          wing_iocp_input_stream_get_type         (void) G_GNUC_CONST;
 
 WING_AVAILABLE_IN_ALL
-GInputStream * wing_iocp_input_stream_new              (void                *handle,
-                                                        gboolean             close_handle,
-                                                        PTP_IO               threadpool_io);
+GInputStream * wing_iocp_input_stream_new              (gboolean             close_handle,
+                                                        WingThreadPoolIo    *thread_pool_io);
 WING_AVAILABLE_IN_ALL
 void           wing_iocp_input_stream_set_close_handle (WingIocpInputStream *stream,
                                                         gboolean             close_handle);
diff --git a/wing/wingiocpoutputstream.c b/wing/wingiocpoutputstream.c
index b14d72f..55c035a 100644
--- a/wing/wingiocpoutputstream.c
+++ b/wing/wingiocpoutputstream.c
@@ -36,14 +36,12 @@
   */
 
 typedef struct {
-  HANDLE handle;
   gboolean close_handle;
-  PTP_IO threadpool_io;
+  WingThreadPoolIo *thread_pool_io;
 } WingIocpOutputStreamPrivate;
 
 enum {
   PROP_0,
-  PROP_HANDLE,
   PROP_CLOSE_HANDLE,
   PROP_THREADPOOL_IO,
   LAST_PROP
@@ -53,6 +51,26 @@ static GParamSpec *props[LAST_PROP];
 
 G_DEFINE_TYPE_WITH_PRIVATE (WingIocpOutputStream, wing_iocp_output_stream, G_TYPE_OUTPUT_STREAM)
 
+static void
+wing_iocp_output_stream_finalize (GObject *object)
+{
+  WingIocpOutputStream *wing_stream;
+  WingIocpOutputStreamPrivate *priv;
+
+  wing_stream = WING_IOCP_OUTPUT_STREAM (object);
+  priv = wing_iocp_output_stream_get_instance_private (wing_stream);
+
+  if (priv->thread_pool_io)
+    {
+      if (priv->close_handle)
+        wing_thread_pool_io_close_handle (priv->thread_pool_io, NULL);
+
+      wing_thread_pool_io_unref (priv->thread_pool_io);
+    }
+
+  G_OBJECT_CLASS (wing_iocp_output_stream_parent_class)->finalize (object);
+}
+
 static void
 wing_iocp_output_stream_set_property (GObject         *object,
                                       guint            prop_id,
@@ -67,16 +85,12 @@ wing_iocp_output_stream_set_property (GObject         *object,
 
   switch (prop_id)
     {
-    case PROP_HANDLE:
-      priv->handle = g_value_get_pointer (value);
-      break;
-
     case PROP_CLOSE_HANDLE:
       priv->close_handle = g_value_get_boolean (value);
       break;
 
     case PROP_THREADPOOL_IO:
-      priv->threadpool_io = g_value_get_pointer (value);
+      priv->thread_pool_io = g_value_dup_boxed (value);
       break;
 
     default:
@@ -99,14 +113,11 @@ wing_iocp_output_stream_get_property (GObject    *object,
 
   switch (prop_id)
     {
-    case PROP_HANDLE:
-      g_value_set_pointer (value, priv->handle);
-      break;
     case PROP_CLOSE_HANDLE:
       g_value_set_boolean (value, priv->close_handle);
       break;
     case PROP_THREADPOOL_IO:
-      g_value_set_pointer (value, priv->threadpool_io);
+      g_value_set_boxed (value, priv->thread_pool_io);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -128,30 +139,12 @@ wing_iocp_output_stream_close (GOutputStream  *stream,
   if (!priv->close_handle)
     return TRUE;
 
-  res = CloseHandle (priv->handle);
-  if (!res)
-    {
-      int errsv = GetLastError ();
-      gchar *emsg = g_win32_error_message (errsv);
-
-      g_set_error (error, G_IO_ERROR,
-                   g_io_error_from_win32_error (errsv),
-                   "Error closing handle: %s",
-                   emsg);
-      g_free (emsg);
-      return FALSE;
-    }
+  res = wing_thread_pool_io_close_handle (priv->thread_pool_io, error);
 
-  priv->handle = INVALID_HANDLE_VALUE;
-
-  if (priv->threadpool_io != NULL)
-    {
-      WaitForThreadpoolIoCallbacks (priv->threadpool_io, FALSE);
-      CloseThreadpoolIo (priv->threadpool_io);
-      priv->threadpool_io = NULL;
-    }
+  if (res)
+    g_clear_pointer (&priv->thread_pool_io, wing_thread_pool_io_unref);
 
-  return TRUE;
+  return res;
 }
 
 static void
@@ -199,7 +192,7 @@ on_cancellable_cancelled (GCancellable *cancellable,
   wing_stream = WING_IOCP_OUTPUT_STREAM (user_data);
   priv = wing_iocp_output_stream_get_instance_private (wing_stream);
 
-  CancelIo (priv->handle);
+  CancelIo (wing_thread_pool_get_handle (priv->thread_pool_io));
 }
 
 static void
@@ -216,6 +209,7 @@ wing_iocp_output_stream_write_async (GOutputStream       *stream,
   WingIocpOutputStream *wing_stream;
   WingIocpOutputStreamPrivate *priv;
   int errsv;
+  HANDLE handle;
 
   wing_stream = WING_IOCP_OUTPUT_STREAM (stream);
   priv = wing_iocp_output_stream_get_instance_private (wing_stream);
@@ -229,7 +223,8 @@ wing_iocp_output_stream_write_async (GOutputStream       *stream,
       return;
     }
 
-  if (priv->handle == INVALID_HANDLE_VALUE)
+  handle = wing_thread_pool_get_handle (priv->thread_pool_io);
+  if (handle == INVALID_HANDLE_VALUE)
     {
       g_task_return_new_error (task, G_IO_ERROR,
                                G_IO_ERROR_CLOSED,
@@ -248,9 +243,9 @@ wing_iocp_output_stream_write_async (GOutputStream       *stream,
                                                         G_CALLBACK (on_cancellable_cancelled),
                                                         wing_stream, NULL);
 
-  StartThreadpoolIo (priv->threadpool_io);
+  wing_thread_pool_io_start (priv->thread_pool_io);
 
-  WriteFile (priv->handle, buffer, (DWORD) count, NULL, (OVERLAPPED *) overlapped);
+  WriteFile (handle, buffer, (DWORD) count, NULL, (OVERLAPPED *) overlapped);
   errsv = GetLastError ();
   if (errsv != NO_ERROR && errsv != ERROR_IO_PENDING)
     {
@@ -262,7 +257,7 @@ wing_iocp_output_stream_write_async (GOutputStream       *stream,
                                emsg);
       g_free (emsg);
 
-      CancelThreadpoolIo (priv->threadpool_io);
+      wing_thread_pool_io_cancel (priv->thread_pool_io);
 
       if (g_task_get_cancellable (task) != NULL)
         g_cancellable_disconnect (g_task_get_cancellable (task),
@@ -285,6 +280,7 @@ wing_iocp_output_stream_write (GOutputStream  *stream,
   DWORD nbytes, nwritten;
   OVERLAPPED overlap = { 0, };
   gssize retval = -1;
+  HANDLE handle;
 
   wing_stream = WING_IOCP_OUTPUT_STREAM (stream);
   priv = wing_iocp_output_stream_get_instance_private (wing_stream);
@@ -300,6 +296,8 @@ wing_iocp_output_stream_write (GOutputStream  *stream,
   overlap.hEvent = CreateEvent (NULL, FALSE, FALSE, NULL);
   g_return_val_if_fail (overlap.hEvent != NULL, -1);
 
+  handle = wing_thread_pool_get_handle (priv->thread_pool_io);
+
   /* This prevents the I/O completion port to be notified.
    * It is described in the documentation of the GetQueuedCompletionStatus
    * function, in the section related to the lpOverlapped parameter
@@ -310,7 +308,7 @@ wing_iocp_output_stream_write (GOutputStream  *stream,
   overlap.hEvent = (HANDLE) ((gint) overlap.hEvent | 0x1);
 #endif
 
-  res = WriteFile (priv->handle, buffer, nbytes, &nwritten, &overlap);
+  res = WriteFile (handle, buffer, nbytes, &nwritten, &overlap);
   if (res)
     retval = nwritten;
   else
@@ -318,7 +316,7 @@ wing_iocp_output_stream_write (GOutputStream  *stream,
       int errsv = GetLastError ();
 
       if (errsv == ERROR_IO_PENDING &&
-          wing_overlap_wait_result (priv->handle,
+          wing_overlap_wait_result (handle,
                                     &overlap, &nwritten,
                                     cancellable))
         {
@@ -360,6 +358,7 @@ wing_iocp_output_stream_class_init (WingIocpOutputStreamClass *klass)
   GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
   GOutputStreamClass *stream_class = G_OUTPUT_STREAM_CLASS (klass);
 
+  gobject_class->finalize = wing_iocp_output_stream_finalize;
   gobject_class->get_property = wing_iocp_output_stream_get_property;
   gobject_class->set_property = wing_iocp_output_stream_set_property;
 
@@ -367,19 +366,6 @@ wing_iocp_output_stream_class_init (WingIocpOutputStreamClass *klass)
   stream_class->write_async = wing_iocp_output_stream_write_async;
   stream_class->write_fn = wing_iocp_output_stream_write;
 
-   /**
-   * WingIocpOutputStream:handle:
-   *
-   * The file handle that the stream writes to.
-   */
-  props[PROP_HANDLE] =
-    g_param_spec_pointer ("handle",
-                          "File handle",
-                          "The file handle to write to",
-                          G_PARAM_READWRITE |
-                          G_PARAM_CONSTRUCT_ONLY |
-                          G_PARAM_STATIC_STRINGS);
-
   /**
    * WingIocpOutputStream:close-handle:
    *
@@ -400,12 +386,13 @@ wing_iocp_output_stream_class_init (WingIocpOutputStreamClass *klass)
    * perform async I/O with completion ports.
    */
   props[PROP_THREADPOOL_IO] =
-    g_param_spec_pointer ("threadpool-io",
-                          "Threadpool I/O object",
-                          "The threadpool I/O object, returned by CreateThreadpoolIo, used to perform async 
I / O with completion ports",
-                          G_PARAM_READABLE |
-                          G_PARAM_WRITABLE |
-                          G_PARAM_STATIC_STRINGS);
+    g_param_spec_boxed ("threadpool-io",
+                        "Threadpool I/O object",
+                        "The threadpool I/O object used to perform async I / O with completion ports",
+                        WING_TYPE_THREAD_POOL_IO,
+                        G_PARAM_READWRITE |
+                        G_PARAM_CONSTRUCT_ONLY |
+                        G_PARAM_STATIC_STRINGS);
 
   g_object_class_install_properties (gobject_class, LAST_PROP, props);
 }
@@ -437,15 +424,12 @@ wing_iocp_output_stream_init (WingIocpOutputStream *wing_stream)
  * Returns: a new #GOutputStream
 **/
 GOutputStream *
-wing_iocp_output_stream_new (void     *handle,
-                             gboolean  close_handle,
-                             PTP_IO    threadpool_io)
+wing_iocp_output_stream_new (gboolean          close_handle,
+                             WingThreadPoolIo *threadpool_io)
 {
-    g_return_val_if_fail (handle != NULL, NULL);
     g_return_val_if_fail (threadpool_io != NULL, NULL);
 
   return g_object_new (WING_TYPE_IOCP_OUTPUT_STREAM,
-                       "handle", handle,
                        "close-handle", close_handle,
                        "threadpool-io", threadpool_io,
                        NULL);
@@ -515,5 +499,5 @@ wing_iocp_output_stream_get_handle (WingIocpOutputStream *stream)
 
   priv = wing_iocp_output_stream_get_instance_private (stream);
 
-  return priv->handle;
+  return wing_thread_pool_get_handle (priv->thread_pool_io);
 }
diff --git a/wing/wingiocpoutputstream.h b/wing/wingiocpoutputstream.h
index d4af7f3..d47113e 100644
--- a/wing/wingiocpoutputstream.h
+++ b/wing/wingiocpoutputstream.h
@@ -26,7 +26,7 @@
 
 #include <gio/gio.h>
 #include <wing/wingversionmacros.h>
-#include <windows.h>
+#include <wing/wingthreadpoolio.h>
 
 G_BEGIN_DECLS
 
@@ -48,9 +48,8 @@ struct _WingIocpOutputStreamClass
 };
 
 WING_AVAILABLE_IN_ALL
-GOutputStream  *wing_iocp_output_stream_new              (void                 *handle,
-                                                          gboolean              close_handle,
-                                                          PTP_IO                threadpool_io);
+GOutputStream  *wing_iocp_output_stream_new              (gboolean              close_handle,
+                                                          WingThreadPoolIo     *thread_pool_io);
 
 WING_AVAILABLE_IN_ALL
 void            wing_iocp_output_stream_set_close_handle (WingIocpOutputStream *stream,
diff --git a/wing/wingnamedpipeconnection.c b/wing/wingnamedpipeconnection.c
index f8dc2d4..1824392 100644
--- a/wing/wingnamedpipeconnection.c
+++ b/wing/wingnamedpipeconnection.c
@@ -21,6 +21,7 @@
 #include "wingnamedpipeconnection.h"
 #include "winginputstream.h"
 #include "wingoutputstream.h"
+#include "wingthreadpoolio.h"
 #include "wingiocpinputstream.h"
 #include "wingiocpoutputstream.h"
 #include "wingutils.h"
@@ -56,7 +57,7 @@ struct _WingNamedPipeConnection
   GOutputStream *output_stream;
 
   gboolean use_iocp;
-  PTP_IO threadpool_io;
+  WingThreadPoolIo *thread_pool_io;
 };
 
 struct _WingNamedPipeConnectionClass
@@ -80,25 +81,6 @@ static GParamSpec *props[LAST_PROP];
 
 G_DEFINE_TYPE (WingNamedPipeConnection, wing_named_pipe_connection, G_TYPE_IO_STREAM)
 
-static void CALLBACK
-threadpool_io_completion (PTP_CALLBACK_INSTANCE instance,
-                          PVOID                 ctxt,
-                          PVOID                 overlapped,
-                          ULONG                 result,
-                          ULONG_PTR             number_of_bytes_transferred,
-                          PTP_IO                threadpool_io)
-{
-  WingOverlappedData *overlapped_data = (WingOverlappedData *) overlapped;
-
-  overlapped_data->callback (instance,
-                             ctxt,
-                             overlapped,
-                             result,
-                             number_of_bytes_transferred,
-                             threadpool_io,
-                             overlapped_data->user_data);
-}
-
 static void
 wing_named_pipe_connection_finalize (GObject *object)
 {
@@ -112,17 +94,20 @@ wing_named_pipe_connection_finalize (GObject *object)
   if (connection->output_stream)
     g_object_unref (connection->output_stream);
 
-  if (connection->close_handle && connection->handle != INVALID_HANDLE_VALUE)
+  if (connection->close_handle)
     {
-      DisconnectNamedPipe (connection->handle);
-      CloseHandle (connection->handle);
+      if (connection->thread_pool_io != NULL)
+        {
+          wing_thread_pool_io_close_handle(connection->thread_pool_io, NULL);
+        }
+      else if (connection->handle != INVALID_HANDLE_VALUE)
+        {
+          DisconnectNamedPipe (connection->handle);
+          CloseHandle (connection->handle);
+        }
     }
 
-  if (connection->threadpool_io != NULL)
-    {
-      WaitForThreadpoolIoCallbacks (connection->threadpool_io, FALSE);
-      CloseThreadpoolIo (connection->threadpool_io);
-    }
+  g_clear_pointer (&connection->thread_pool_io, wing_thread_pool_io_unref);
 
   G_OBJECT_CLASS (wing_named_pipe_connection_parent_class)->finalize (object);
 }
@@ -199,27 +184,24 @@ wing_named_pipe_connection_constructed (GObject *object)
   if (connection->handle != NULL &&
       connection->handle != INVALID_HANDLE_VALUE)
     {
-      if (!connection->use_iocp)
-        {
-          connection->input_stream = wing_input_stream_new (connection->handle, FALSE);
-          connection->output_stream = wing_output_stream_new (connection->handle, FALSE);
-        }
-      else
+      if (connection->use_iocp)
         {
-          connection->threadpool_io = CreateThreadpoolIo (connection->handle, threadpool_io_completion, 
NULL, NULL);
-          if (connection->threadpool_io == NULL)
+          connection->thread_pool_io = wing_thread_pool_io_new (connection->handle);
+          if (connection->thread_pool_io != NULL)
             {
-              gchar *emsg;
-
-              emsg = g_win32_error_message (GetLastError ());
-              g_warning ("Failed to create thread pool IO: %s", emsg);
-              g_free (emsg);
-
-              g_assert_not_reached ();
+              connection->input_stream = wing_iocp_input_stream_new (FALSE, connection->thread_pool_io);
+              connection->output_stream = wing_iocp_output_stream_new (FALSE, connection->thread_pool_io);
             }
+          else
+           {
+             g_info ("Failed to create thread pool io, falling back to not iocp version");
+           }
+        }
 
-          connection->input_stream = wing_iocp_input_stream_new (connection->handle, FALSE, 
connection->threadpool_io);
-          connection->output_stream = wing_iocp_output_stream_new (connection->handle, FALSE, 
connection->threadpool_io);
+      if (!connection->thread_pool_io)
+        {
+          connection->input_stream = wing_input_stream_new (connection->handle, FALSE);
+          connection->output_stream = wing_output_stream_new (connection->handle, FALSE);
         }
     }
 
@@ -255,19 +237,22 @@ wing_named_pipe_connection_close (GIOStream     *stream,
   if (connection->input_stream)
     g_input_stream_close (connection->input_stream, cancellable, NULL);
 
-  if (connection->close_handle && connection->handle != INVALID_HANDLE_VALUE)
+  if (connection->close_handle)
     {
-      DisconnectNamedPipe (connection->handle);
-      CloseHandle (connection->handle);
+      if (connection->thread_pool_io != NULL)
+        {
+          wing_thread_pool_io_close_handle(connection->thread_pool_io, NULL);
+        }
+      else if (connection->handle != INVALID_HANDLE_VALUE)
+        {
+          DisconnectNamedPipe (connection->handle);
+          CloseHandle (connection->handle);
+        }
+
       connection->handle = INVALID_HANDLE_VALUE;
     }
 
-  if (connection->threadpool_io != NULL)
-    {
-      WaitForThreadpoolIoCallbacks (connection->threadpool_io, FALSE);
-      CloseThreadpoolIo (connection->threadpool_io);
-      connection->threadpool_io = NULL;
-    }
+  g_clear_pointer (&connection->thread_pool_io, wing_thread_pool_io_unref);
 
   return TRUE;
 }
diff --git a/wing/wingthreadpoolio.c b/wing/wingthreadpoolio.c
new file mode 100644
index 0000000..a011dae
--- /dev/null
+++ b/wing/wingthreadpoolio.c
@@ -0,0 +1,181 @@
+/*
+ * Copyright © 2021 NICE s.r.l.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 2 of the licence or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Authors: Ignacio Casal Quinteiro <ignacio casal nice-software com>
+ * Authors: Silvio Lazzeretti <silviola amazon com>
+ */
+
+
+#include "wingthreadpoolio.h"
+#include "wingutils.h"
+#include <windows.h>
+
+/**
+ * SECTION:wingthreadpoolio
+ * @short_description: A wrapper around a Windows thread pool IO.
+ *
+ * WingThreadPoolIo creates a ThreadpoolIO object attached to an handle.
+ */
+
+/**
+ * WingThreadPoolIo:
+ *
+ * A wrapper around a Windows thread pool IO.
+ */
+struct _WingThreadPoolIo
+{
+  volatile gint ref_count;
+
+  PTP_IO thread_pool_io;
+  HANDLE handle;
+};
+
+G_DEFINE_BOXED_TYPE(WingThreadPoolIo, wing_thread_pool_io, wing_thread_pool_io_ref, 
wing_thread_pool_io_unref)
+
+static void CALLBACK
+threadpool_io_completion (PTP_CALLBACK_INSTANCE instance,
+                          PVOID                 ctxt,
+                          PVOID                 overlapped,
+                          ULONG                 result,
+                          ULONG_PTR             number_of_bytes_transferred,
+                          PTP_IO                threadpool_io)
+{
+  WingOverlappedData *overlapped_data = (WingOverlappedData *) overlapped;
+
+  overlapped_data->callback (instance,
+                             ctxt,
+                             overlapped,
+                             result,
+                             number_of_bytes_transferred,
+                             threadpool_io,
+                             overlapped_data->user_data);
+}
+
+WingThreadPoolIo *
+wing_thread_pool_io_new (void *handle)
+{
+  WingThreadPoolIo *self;
+  PTP_IO thread_pool_io;
+
+  g_return_val_if_fail (handle != NULL && (HANDLE) handle != INVALID_HANDLE_VALUE, NULL);
+
+  thread_pool_io = CreateThreadpoolIo ((HANDLE) handle, threadpool_io_completion, NULL, NULL);
+  if (thread_pool_io == NULL)
+    {
+      gchar *emsg;
+
+      emsg = g_win32_error_message (GetLastError ());
+      g_warning ("Failed to create thread pool IO: %s", emsg);
+      g_free (emsg);
+
+      return NULL;
+    }
+
+  self = g_slice_new (WingThreadPoolIo);
+  self->ref_count = 1;
+
+  self->handle = (HANDLE) handle;
+  self->thread_pool_io = thread_pool_io;
+
+  return self;
+}
+
+WingThreadPoolIo *
+wing_thread_pool_io_ref (WingThreadPoolIo *self)
+{
+  g_return_val_if_fail (self != NULL, NULL);
+
+  g_atomic_int_inc (&self->ref_count);
+
+  return self;
+}
+
+static gboolean
+destroy_thread_pool_io_idle (gpointer user_data)
+{
+  WingThreadPoolIo *self = (WingThreadPoolIo *) user_data;
+
+  WaitForThreadpoolIoCallbacks (self->thread_pool_io, FALSE);
+  CloseThreadpoolIo (self->thread_pool_io);
+  g_slice_free (WingThreadPoolIo, self);
+
+  return G_SOURCE_REMOVE;
+}
+
+void
+wing_thread_pool_io_unref (WingThreadPoolIo *self)
+{
+  g_return_if_fail (self != NULL);
+
+  if (g_atomic_int_dec_and_test (&self->ref_count))
+    {
+      if (self->handle != INVALID_HANDLE_VALUE)
+        CloseHandle (self->handle);
+
+      g_idle_add (destroy_thread_pool_io_idle, (gpointer) self);
+    }
+}
+
+void
+wing_thread_pool_io_start (WingThreadPoolIo *self)
+{
+  g_return_if_fail (self != NULL);
+
+  StartThreadpoolIo (self->thread_pool_io);
+}
+
+void
+wing_thread_pool_io_cancel (WingThreadPoolIo *self)
+{
+  g_return_if_fail (self != NULL);
+
+  CancelThreadpoolIo (self->thread_pool_io);
+}
+
+void *
+wing_thread_pool_get_handle (WingThreadPoolIo *self)
+{
+  g_return_val_if_fail (self != NULL, (void *) INVALID_HANDLE_VALUE);
+
+  return (void *) self->handle;
+}
+
+gboolean
+wing_thread_pool_io_close_handle (WingThreadPoolIo  *self,
+                                  GError           **error)
+{
+  gboolean res;
+
+  g_return_val_if_fail (self != NULL, FALSE);
+
+  res = CloseHandle (self->handle);
+  if (!res && error != NULL)
+    {
+      int errsv = GetLastError ();
+      gchar *emsg = g_win32_error_message (errsv);
+
+      g_set_error (error, G_IO_ERROR,
+                   g_io_error_from_win32_error (errsv),
+                   "Error closing handle: %s",
+                   emsg);
+      g_free (emsg);
+      return FALSE;
+    }
+
+  self->handle = INVALID_HANDLE_VALUE;
+
+  return TRUE;
+}
\ No newline at end of file
diff --git a/wing/wingthreadpoolio.h b/wing/wingthreadpoolio.h
new file mode 100644
index 0000000..2e3cd52
--- /dev/null
+++ b/wing/wingthreadpoolio.h
@@ -0,0 +1,61 @@
+/*
+ * Copyright © 2021 NICE s.r.l.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation; either version 2 of the licence or (at
+ * your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Authors: Ignacio Casal Quinteiro <ignacio casal nice-software com>
+ * Authors: Silvio Lazzeretti <silviola amazon com>
+ */
+
+#ifndef WING_THREAD_POOL_IO_H
+#define WING_THREAD_POOL_IO_H
+
+#include <gio/gio.h>
+#include <wing/wingversionmacros.h>
+
+G_BEGIN_DECLS
+
+#define WING_TYPE_THREAD_POOL_IO                         (wing_thread_pool_io_get_type ())
+#define WING_THREAD_POOL_IO(obj)                         ((WingThreadPoolIo *)obj)
+
+typedef struct _WingThreadPoolIo WingThreadPoolIo;
+
+WING_AVAILABLE_IN_ALL
+GType                         wing_thread_pool_io_get_type                       (void) G_GNUC_CONST;
+
+WING_AVAILABLE_IN_ALL
+WingThreadPoolIo             *wing_thread_pool_io_new                            (void                     
*handle);
+
+WING_AVAILABLE_IN_ALL
+WingThreadPoolIo             *wing_thread_pool_io_ref                            (WingThreadPoolIo         
*self);
+
+WING_AVAILABLE_IN_ALL
+void                          wing_thread_pool_io_unref                          (WingThreadPoolIo         
*self);
+
+WING_AVAILABLE_IN_ALL
+void                          wing_thread_pool_io_start                          (WingThreadPoolIo         
*self);
+
+WING_AVAILABLE_IN_ALL
+void                          wing_thread_pool_io_cancel                         (WingThreadPoolIo         
*self);
+
+WING_AVAILABLE_IN_ALL
+void                         *wing_thread_pool_get_handle                        (WingThreadPoolIo         
*self);
+
+WING_AVAILABLE_IN_ALL
+gboolean                      wing_thread_pool_io_close_handle                   (WingThreadPoolIo         
*self,
+                                                                                  GError                  
**error);
+
+G_END_DECLS
+
+#endif /* WING_THREAD_POOL_IO_H */


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]