[gtk+/wip/otte/clipboard: 8/11] x11: Use async queue and implement sync reads on selections stream



commit e186c4088bd6bc7c72454a896605fc52179b7a5f
Author: Benjamin Otte <otte redhat com>
Date:   Mon Nov 20 21:29:32 2017 +0100

    x11: Use async queue and implement sync reads on selections stream
    
    Turns out, way too many async operations are implemented by running the
    sync operation in a thread. The easiest way to support that is to use a
    GAsyncQueue for the buffers and to accept that a sync read deadlocks if
    it is called from the main thread.

 gdk/x11/gdkselectioninputstream-x11.c |   51 ++++++++++++++++++++++----------
 1 files changed, 35 insertions(+), 16 deletions(-)
---
diff --git a/gdk/x11/gdkselectioninputstream-x11.c b/gdk/x11/gdkselectioninputstream-x11.c
index 97ceedd..187b00e 100644
--- a/gdk/x11/gdkselectioninputstream-x11.c
+++ b/gdk/x11/gdkselectioninputstream-x11.c
@@ -33,7 +33,7 @@ typedef struct GdkX11SelectionInputStreamPrivate  GdkX11SelectionInputStreamPriv
 
 struct GdkX11SelectionInputStreamPrivate {
   GdkDisplay *display;
-  GQueue chunks;
+  GAsyncQueue *chunks;
   char *selection;
   Atom xselection;
   char *target;
@@ -61,29 +61,41 @@ gdk_x11_selection_input_stream_has_data (GdkX11SelectionInputStream *stream)
 {
   GdkX11SelectionInputStreamPrivate *priv = gdk_x11_selection_input_stream_get_instance_private (stream);
 
-  return !g_queue_is_empty (&priv->chunks) || priv->complete;
+  return g_async_queue_length (priv->chunks) > 0 || priv->complete;
 }
 
-static gssize
+/* NB: blocks when no data is in buffer */
+static gsize
 gdk_x11_selection_input_stream_fill_buffer (GdkX11SelectionInputStream *stream,
                                             guchar                     *buffer,
                                             gsize                       count)
 {
   GdkX11SelectionInputStreamPrivate *priv = gdk_x11_selection_input_stream_get_instance_private (stream);
-  gssize result = 0;
+  GBytes *bytes;
+  
+  gsize result = 0;
 
-  while (!g_queue_is_empty (&priv->chunks) && count > 0)
+  g_async_queue_lock (priv->chunks);
+
+  for (bytes = g_async_queue_pop_unlocked (priv->chunks);
+       bytes != NULL && count > 0;
+       bytes = g_async_queue_try_pop_unlocked (priv->chunks))
   {
-    GBytes *bytes = g_queue_pop_head (&priv->chunks);
     gsize size = g_bytes_get_size (bytes);
 
-    if (g_bytes_get_size (bytes) > count)
+    if (size == 0)
+      {
+        /* EOF marker, put it back */
+        g_async_queue_push_front_unlocked (priv->chunks, bytes);
+        break;
+      }
+    else if (size > count)
       {
         GBytes *subbytes;
         if (buffer)
           memcpy (buffer, g_bytes_get_data (bytes, NULL), count);
         subbytes = g_bytes_new_from_bytes (bytes, count, size - count);
-        g_queue_push_head (&priv->chunks, subbytes);
+        g_async_queue_push_front_unlocked (priv->chunks, subbytes);
         size = count;
       }
     else
@@ -99,6 +111,11 @@ gdk_x11_selection_input_stream_fill_buffer (GdkX11SelectionInputStream *stream,
     count -= size;
   }
 
+  if (bytes)
+    g_async_queue_push_front_unlocked (priv->chunks, bytes);
+
+  g_async_queue_unlock (priv->chunks);
+
   return result;
 }
 
@@ -132,6 +149,7 @@ gdk_x11_selection_input_stream_complete (GdkX11SelectionInputStream *stream)
 
   priv->complete = TRUE;
 
+  g_async_queue_push (priv->chunks, g_bytes_new (NULL, 0));
   gdk_x11_selection_input_stream_flush (stream);
 
   GDK_X11_DISPLAY (priv->display)->input_streams = g_slist_remove (GDK_X11_DISPLAY (priv->display)->input_streams, stream);
@@ -141,16 +159,15 @@ gdk_x11_selection_input_stream_complete (GdkX11SelectionInputStream *stream)
 }
 
 static gssize
-gdk_x11_selection_input_stream_read (GInputStream  *stream,
+gdk_x11_selection_input_stream_read (GInputStream  *input_stream,
                                      void          *buffer,
                                      gsize          count,
                                      GCancellable  *cancellable,
                                      GError       **error)
 {
-  g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
-                       _("X11 Selections cannot be read synchronously"));
+  GdkX11SelectionInputStream *stream = GDK_X11_SELECTION_INPUT_STREAM (input_stream);
 
-  return -1;
+  return gdk_x11_selection_input_stream_fill_buffer (stream, buffer, count);
 }
 
 static gboolean
@@ -234,8 +251,7 @@ gdk_x11_selection_input_stream_finalize (GObject *object)
 
   gdk_x11_selection_input_stream_complete (stream);
 
-  g_queue_foreach (&priv->chunks, (GFunc) g_bytes_unref, NULL);
-  g_queue_clear (&priv->chunks);
+  g_async_queue_unref (priv->chunks);
 
   g_free (priv->selection);
   g_free (priv->target);
@@ -264,6 +280,9 @@ gdk_x11_selection_input_stream_class_init (GdkX11SelectionInputStreamClass *klas
 static void
 gdk_x11_selection_input_stream_init (GdkX11SelectionInputStream *stream)
 {
+  GdkX11SelectionInputStreamPrivate *priv = gdk_x11_selection_input_stream_get_instance_private (stream);
+
+  priv->chunks = g_async_queue_new_full ((GDestroyNotify) g_bytes_unref);
 }
 
 static void
@@ -375,7 +394,7 @@ gdk_x11_selection_input_stream_filter_event (GdkXEvent *xev,
         }
       else
         {
-          g_queue_push_tail (&priv->chunks, bytes);
+          g_async_queue_push (priv->chunks, bytes);
           gdk_x11_selection_input_stream_flush (stream);
         }
 
@@ -413,7 +432,7 @@ gdk_x11_selection_input_stream_filter_event (GdkXEvent *xev,
               }
             else
               {
-                g_queue_push_tail (&priv->chunks, bytes);
+                g_async_queue_push (priv->chunks, bytes);
 
                 gdk_x11_selection_input_stream_complete (stream);
               }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]