[wing] Fix cancellable on iocp input and output streams



commit fc7b6273ca820750172fa4037829ebe6481d21ee
Author: Davide Benotto <benotto amazon com>
Date:   Thu May 9 16:09:39 2019 +0200

    Fix cancellable on iocp input and output streams
    
    Task cancellation was not handled, so streams were kept open
    instead of being closed.

 wing/wingiocpinputstream.c  | 28 ++++++++++++++++++++++++++++
 wing/wingiocpoutputstream.c | 28 ++++++++++++++++++++++++++++
 wing/wingutils.h            |  1 +
 3 files changed, 57 insertions(+)
---
diff --git a/wing/wingiocpinputstream.c b/wing/wingiocpinputstream.c
index 39bad80..217e6bf 100644
--- a/wing/wingiocpinputstream.c
+++ b/wing/wingiocpinputstream.c
@@ -163,6 +163,7 @@ threadpool_io_completion (PTP_CALLBACK_INSTANCE instance,
                           PTP_IO                threadpool_io,
                           gpointer              user_data)
 {
+  WingOverlappedData *overlapped_data = overlapped;
   GTask *task;
 
   task = G_TASK (user_data);
@@ -181,10 +182,26 @@ threadpool_io_completion (PTP_CALLBACK_INSTANCE instance,
       g_free (emsg);
     }
 
+  if (g_task_get_cancellable (task) != NULL)
+    g_cancellable_disconnect (g_task_get_cancellable (task),
+                              overlapped_data->cancellable_id);
   g_object_unref (task);
   g_slice_free (WingOverlappedData, overlapped);
 }
 
+/* GCancellable "cancelled" handler; user_data is the WingIocpInputStream. */
+static void
+on_cancellable_cancelled (GCancellable *cancellable,
+                          gpointer      user_data)
+{
+  WingIocpInputStream *wing_stream = WING_IOCP_INPUT_STREAM (user_data);
+  WingIocpInputStreamPrivate *priv = wing_iocp_input_stream_get_instance_private (wing_stream);
+
+  /* CancelIo() only cancels I/O issued by the calling thread, and this handler
+   * may run on whichever thread cancelled; CancelIoEx() has no such limit. */
+  CancelIoEx (priv->handle, NULL);
+}
+
 static void
 wing_iocp_input_stream_read_async (GInputStream        *stream,
                                    void                *buffer,
@@ -206,6 +223,9 @@ wing_iocp_input_stream_read_async (GInputStream        *stream,
   task = g_task_new (stream, cancellable, callback, user_data);
   g_task_set_priority (task, io_priority);
 
+  if (g_task_return_error_if_cancelled (task))
+    return;
+
   if (priv->handle == INVALID_HANDLE_VALUE)
     {
       g_task_return_new_error (task, G_IO_ERROR,
@@ -220,6 +240,11 @@ wing_iocp_input_stream_read_async (GInputStream        *stream,
   overlapped->user_data = task;
   overlapped->callback = threadpool_io_completion;
 
+  if (g_task_get_cancellable (task) != NULL)
+    overlapped->cancellable_id = g_cancellable_connect (g_task_get_cancellable (task),
+                                                        G_CALLBACK (on_cancellable_cancelled),
+                                                        wing_stream, NULL);
+
   StartThreadpoolIo (priv->threadpool_io);
 
   ReadFile (priv->handle, buffer, (DWORD) count, NULL, (OVERLAPPED *) overlapped);
@@ -236,6 +261,9 @@ wing_iocp_input_stream_read_async (GInputStream        *stream,
 
       CancelThreadpoolIo (priv->threadpool_io);
 
+      if (g_task_get_cancellable (task) != NULL)
+        g_cancellable_disconnect (g_task_get_cancellable (task),
+                                  overlapped->cancellable_id);
       g_object_unref (task);
       g_slice_free (WingOverlappedData, overlapped);
     }
diff --git a/wing/wingiocpoutputstream.c b/wing/wingiocpoutputstream.c
index 8ea63b8..51f8891 100644
--- a/wing/wingiocpoutputstream.c
+++ b/wing/wingiocpoutputstream.c
@@ -163,6 +163,7 @@ threadpool_io_completion (PTP_CALLBACK_INSTANCE instance,
                           PTP_IO                threadpool_io,
                           gpointer              user_data)
 {
+  WingOverlappedData *overlapped_data = overlapped;
   GTask *task;
 
   task = G_TASK (user_data);
@@ -181,10 +182,26 @@ threadpool_io_completion (PTP_CALLBACK_INSTANCE instance,
       g_free (emsg);
     }
 
+  if (g_task_get_cancellable (task) != NULL)
+    g_cancellable_disconnect (g_task_get_cancellable (task),
+                              overlapped_data->cancellable_id);
   g_object_unref (task);
   g_slice_free (WingOverlappedData, overlapped);
 }
 
+/* GCancellable "cancelled" handler; user_data is the WingIocpOutputStream. */
+static void
+on_cancellable_cancelled (GCancellable *cancellable,
+                          gpointer      user_data)
+{
+  WingIocpOutputStream *wing_stream = WING_IOCP_OUTPUT_STREAM (user_data);
+  WingIocpOutputStreamPrivate *priv = wing_iocp_output_stream_get_instance_private (wing_stream);
+
+  /* CancelIo() only cancels I/O issued by the calling thread, and this handler
+   * may run on whichever thread cancelled; CancelIoEx() has no such limit. */
+  CancelIoEx (priv->handle, NULL);
+}
+
 static void
 wing_iocp_output_stream_write_async (GOutputStream       *stream,
                                      const void          *buffer,
@@ -206,6 +223,9 @@ wing_iocp_output_stream_write_async (GOutputStream       *stream,
   task = g_task_new (stream, cancellable, callback, user_data);
   g_task_set_priority (task, io_priority);
 
+  if (g_task_return_error_if_cancelled (task))
+    return;
+
   if (priv->handle == INVALID_HANDLE_VALUE)
     {
       g_task_return_new_error (task, G_IO_ERROR,
@@ -220,6 +240,11 @@ wing_iocp_output_stream_write_async (GOutputStream       *stream,
   overlapped->user_data = task;
   overlapped->callback = threadpool_io_completion;
 
+  if (g_task_get_cancellable (task) != NULL)
+    overlapped->cancellable_id = g_cancellable_connect (g_task_get_cancellable (task),
+                                                        G_CALLBACK (on_cancellable_cancelled),
+                                                        wing_stream, NULL);
+
   StartThreadpoolIo (priv->threadpool_io);
 
   WriteFile (priv->handle, buffer, (DWORD) count, NULL, (OVERLAPPED *) overlapped);
@@ -236,6 +261,9 @@ wing_iocp_output_stream_write_async (GOutputStream       *stream,
 
       CancelThreadpoolIo (priv->threadpool_io);
 
+      if (g_task_get_cancellable (task) != NULL)
+        g_cancellable_disconnect (g_task_get_cancellable (task),
+                                  overlapped->cancellable_id);
       g_object_unref (task);
       g_slice_free (WingOverlappedData, overlapped);
     }
diff --git a/wing/wingutils.h b/wing/wingutils.h
index 5f953e3..f928cc2 100644
--- a/wing/wingutils.h
+++ b/wing/wingutils.h
@@ -28,6 +28,7 @@ G_BEGIN_DECLS
 typedef struct {
   OVERLAPPED overlapped;
   gpointer user_data;
+  gulong cancellable_id;
   void (*callback) (PTP_CALLBACK_INSTANCE instance,
                     PVOID                 ctxt,
                     PVOID                 overlapped,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]