[gegl] buffer: in the swap backend, compress queued tiles while stalling



commit 16d65b5d676229a87cc2662a770c73080d018d58
Author: Ell <ell_se yahoo com>
Date:   Sun Dec 30 14:18:04 2018 -0500

    buffer: in the swap backend, compress queued tiles while stalling
    
    When the write queue is full while storing a tile to the swap
    backend, walk the queue and compress any uncompressed tiles before
    blocking the current thread.  Previously, we would only compress
    the currently-stored tile in this situation.  This allows the
    storing thread to participate in tile compression, which reduces
    the cost of the write queue and thereby shortens stall time.

 gegl/buffer/gegl-tile-backend-swap.c | 227 +++++++++++++++++++++++------------
 1 file changed, 148 insertions(+), 79 deletions(-)
---
diff --git a/gegl/buffer/gegl-tile-backend-swap.c b/gegl/buffer/gegl-tile-backend-swap.c
index 3c8933d09..62cf3c97e 100644
--- a/gegl/buffer/gegl-tile-backend-swap.c
+++ b/gegl/buffer/gegl-tile-backend-swap.c
@@ -100,7 +100,8 @@ typedef struct
   gpointer    compressed;
   gint        size;
   gint        compressed_size;
-  ThreadOp    operation;
+  guint       operation   : 1;
+  guint       compressing : 1;
 } ThreadParams;
 
 typedef struct
@@ -126,15 +127,14 @@ static GeglTile   *gegl_tile_backend_swap_entry_read             (GeglTileBacken
 static void        gegl_tile_backend_swap_entry_write            (GeglTileBackendSwap      *self,
                                                                   SwapEntry                *entry,
                                                                   GeglTile                 *tile);
-static SwapBlock * gegl_tile_backend_swap_block_create           (GeglTileBackendSwap      *self);
+static SwapBlock * gegl_tile_backend_swap_block_create           (void);
 static void        gegl_tile_backend_swap_block_free             (SwapBlock                *block);
-static SwapBlock * gegl_tile_backend_swap_block_ref              (GeglTileBackendSwap      *self,
-                                                                  SwapBlock                *block);
-static void        gegl_tile_backend_swap_block_unref            (GeglTileBackendSwap      *self,
-                                                                  SwapBlock                *block,
+static SwapBlock * gegl_tile_backend_swap_block_ref              (SwapBlock                *block,
+                                                                  gint                      tile_size);
+static void        gegl_tile_backend_swap_block_unref            (SwapBlock                *block,
+                                                                  gint                      tile_size,
                                                                   gboolean                  lock);
-static gboolean    gegl_tile_backend_swap_block_is_unique        (GeglTileBackendSwap      *self,
-                                                                  SwapBlock                *block);
+static gboolean    gegl_tile_backend_swap_block_is_unique        (SwapBlock                *block);
 static SwapEntry * gegl_tile_backend_swap_entry_create           (GeglTileBackendSwap      *self,
                                                                   gint                      x,
                                                                   gint                      y,
@@ -226,8 +226,33 @@ static void
 gegl_tile_backend_swap_push_queue (ThreadParams *params,
                                    gboolean      head)
 {
+  busy = TRUE;
+
+  if (head)
+    g_queue_push_head (queue, params);
+  else
+    g_queue_push_tail (queue, params);
+
+  if (params->block)
+    {
+      if (head)
+        params->block->link = g_queue_peek_head_link (queue);
+      else
+        params->block->link = g_queue_peek_tail_link (queue);
+    }
+
+  /* wake up the writer thread */
+  g_cond_signal (&queue_cond);
+
   if (params->tile || params->compressed)
     {
+      if (params->tile)
+        queued_total += params->size;
+      else
+        queued_total += params->compressed_size;
+
+      queued_cost += params->compressed_size;
+
       if (params->tile)
         params->block->compression = compression;
 
@@ -235,65 +260,97 @@ gegl_tile_backend_swap_push_queue (ThreadParams *params,
         {
           queue_stalls++;
 
-          if (params->tile && params->block->compression)
+          while (params                        &&
+                 params->operation == OP_WRITE &&
+                 queued_cost > queued_max)
             {
-              gint     bpp = babl_format_get_bytes_per_pixel (params->format);
-              gpointer compressed;
-              gint     max_compressed_size;
-              gint     compressed_size;
-
-              g_mutex_unlock (&queue_mutex);
-
-              max_compressed_size = params->size * COMPRESSION_MAX_RATIO;
-              compressed          = g_malloc (max_compressed_size);
-
-              if (gegl_compression_compress (params->block->compression,
-                                             params->format,
-                                             gegl_tile_get_data (params->tile),
-                                             params->size / bpp,
-                                             compressed, &compressed_size,
-                                             max_compressed_size))
+              if (params->tile               &&
+                  params->block->compression &&
+                  ! params->compressing)
                 {
-                  gegl_tile_unref (params->tile);
-                  params->tile = NULL;
+                  SwapBlock             *block;
+                  GeglTile              *tile;
+                  const GeglCompression *compression;
+                  const Babl            *format;
+                  gint                   tile_size;
+                  gint                   bpp;
+                  gpointer               compressed;
+                  gint                   max_compressed_size;
+                  gint                   compressed_size;
+                  gboolean               success;
+
+                  block       = params->block;
+                  tile        = gegl_tile_ref (params->tile);
+                  compression = block->compression;
+                  format      = params->format;
+                  tile_size   = params->size;
+
+                  gegl_tile_backend_swap_block_ref (block, tile_size);
+
+                  params->compressing = TRUE;
+
+                  g_mutex_unlock (&queue_mutex);
+
+                  bpp = babl_format_get_bytes_per_pixel (format);
+
+                  max_compressed_size = tile_size * COMPRESSION_MAX_RATIO;
+                  compressed          = g_malloc (max_compressed_size);
+
+                  success = gegl_compression_compress (
+                    compression, format,
+                    gegl_tile_get_data (tile), tile_size / bpp,
+                    compressed, &compressed_size, max_compressed_size);
+
+                  g_mutex_lock (&queue_mutex);
+
+                  params = NULL;
 
-                  params->compressed      = compressed;
-                  params->compressed_size = compressed_size;
+                  if (block->link)
+                    {
+                      params = block->link->data;
+
+                      if (params->tile != tile)
+                        params = NULL;
+                    }
+
+                  if (success && params)
+                    {
+                      gegl_tile_backend_swap_free_data (params);
+
+                      params->compressed      = compressed;
+                      params->compressed_size = compressed_size;
+
+                      queued_total += params->compressed_size;
+                      queued_cost  += params->compressed_size;
+                    }
+                  else
+                    {
+                      if (params)
+                        params->block->compression = NULL;
+
+                      g_free (compressed);
+                    }
+
+                  gegl_tile_backend_swap_block_unref (block, tile_size, FALSE);
+
+                  gegl_tile_unref (tile);
                 }
-              else
+
+              if (params)
                 {
-                  params->block->compression = NULL;
+                  GList *link = g_list_previous (params->block->link);
 
-                  g_free (compressed);
+                  if (link)
+                    params = link->data;
+                  else
+                    params = NULL;
                 }
-
-              g_mutex_lock (&queue_mutex);
             }
 
           while (queued_cost > queued_max)
             g_cond_wait (&push_cond, &queue_mutex);
         }
-
-      if (params->tile)
-        queued_total += params->size;
-      else
-        queued_total += params->compressed_size;
-
-      queued_cost  += params->compressed_size;
     }
-
-  busy = TRUE;
-
-  if (head)
-    g_queue_push_head (queue, params);
-  else
-    g_queue_push_tail (queue, params);
-
-  if (params->block)
-    params->block->link = g_queue_peek_tail_link (queue);
-
-  /* wake up the writer thread */
-  g_cond_signal (&queue_cond);
 }
 
 static void
@@ -840,6 +897,7 @@ gegl_tile_backend_swap_entry_write (GeglTileBackendSwap *self,
 
           params->tile            = gegl_tile_dup (tile);
           params->compressed_size = cost;
+          params->compressing     = FALSE;
 
           queued_total += size;
           queued_cost  += cost;
@@ -866,6 +924,7 @@ gegl_tile_backend_swap_entry_write (GeglTileBackendSwap *self,
   params->compressed      = NULL;
   params->size            = size;
   params->compressed_size = cost;
+  params->compressing     = FALSE;
 
   gegl_tile_backend_swap_push_queue (params, /* head = */ FALSE);
 
@@ -875,7 +934,7 @@ gegl_tile_backend_swap_entry_write (GeglTileBackendSwap *self,
 }
 
 static SwapBlock *
-gegl_tile_backend_swap_block_create (GeglTileBackendSwap *self)
+gegl_tile_backend_swap_block_create (void)
 {
   SwapBlock *block = g_slice_new (SwapBlock);
 
@@ -895,22 +954,20 @@ gegl_tile_backend_swap_block_free (SwapBlock *block)
 }
 
 static SwapBlock *
-gegl_tile_backend_swap_block_ref (GeglTileBackendSwap *self,
-                                  SwapBlock           *block)
+gegl_tile_backend_swap_block_ref (SwapBlock *block,
+                                  gint       tile_size)
 {
   g_atomic_int_inc (&block->ref_count);
 
-  g_atomic_pointer_add (
-    &total_uncompressed,
-    +gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)));
+  g_atomic_pointer_add (&total_uncompressed, +tile_size);
 
   return block;
 }
 
 static void
-gegl_tile_backend_swap_block_unref (GeglTileBackendSwap *self,
-                                    SwapBlock           *block,
-                                    gboolean             lock)
+gegl_tile_backend_swap_block_unref (SwapBlock *block,
+                                    gint       tile_size,
+                                    gboolean   lock)
 {
   if (g_atomic_int_dec_and_test (&block->ref_count))
     {
@@ -940,8 +997,7 @@ gegl_tile_backend_swap_block_unref (GeglTileBackendSwap *self,
           params            = g_slice_new0 (ThreadParams);
           params->operation = OP_DESTROY;
           params->block     = block;
-          params->size      = gegl_tile_backend_get_tile_size (
-                                GEGL_TILE_BACKEND (self));
+          params->size      = tile_size;
 
           /* push the destroy op at the top of the queue, so that it gets
            * served before any write ops, which are then free to reuse the
@@ -955,15 +1011,12 @@ gegl_tile_backend_swap_block_unref (GeglTileBackendSwap *self,
     }
   else
     {
-      g_atomic_pointer_add (
-        &total_uncompressed,
-        -gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)));
+      g_atomic_pointer_add (&total_uncompressed, -tile_size);
     }
 }
 
 static gboolean
-gegl_tile_backend_swap_block_is_unique (GeglTileBackendSwap *self,
-                                        SwapBlock           *block)
+gegl_tile_backend_swap_block_is_unique (SwapBlock *block)
 {
   return g_atomic_int_get (&block->ref_count) == 1;
 }
@@ -978,9 +1031,15 @@ gegl_tile_backend_swap_entry_create (GeglTileBackendSwap *self,
   SwapEntry *entry = g_slice_new (SwapEntry);
 
   if (block)
-    gegl_tile_backend_swap_block_ref (self, block);
+    {
+      gegl_tile_backend_swap_block_ref (
+        block,
+        gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)));
+    }
   else
-    block = gegl_tile_backend_swap_block_create (self);
+    {
+      block = gegl_tile_backend_swap_block_create ();
+    }
 
   entry->x     = x;
   entry->y     = y;
@@ -995,7 +1054,10 @@ gegl_tile_backend_swap_entry_destroy (GeglTileBackendSwap *self,
                                       SwapEntry           *entry,
                                       gboolean             lock)
 {
-  gegl_tile_backend_swap_block_unref (self, entry->block, lock);
+  gegl_tile_backend_swap_block_unref (
+    entry->block,
+    gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)),
+    lock);
 
   g_slice_free (SwapEntry, entry);
 }
@@ -1057,10 +1119,13 @@ gegl_tile_backend_swap_set_tile (GeglTileSource *self,
 
   if (entry)
     {
-      if (! gegl_tile_backend_swap_block_is_unique (swap, entry->block))
+      if (! gegl_tile_backend_swap_block_is_unique (entry->block))
         {
-          gegl_tile_backend_swap_block_unref (swap, entry->block, TRUE);
-          entry->block = gegl_tile_backend_swap_block_create (swap);
+          gegl_tile_backend_swap_block_unref (
+            entry->block,
+            gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (swap)),
+            TRUE);
+          entry->block = gegl_tile_backend_swap_block_create ();
         }
     }
   else
@@ -1156,9 +1221,13 @@ gegl_tile_backend_swap_copy_tile (GeglTileSource           *self,
         }
       else
         {
-          gegl_tile_backend_swap_block_unref (dst_swap, dst_entry->block, TRUE);
-          dst_entry->block = gegl_tile_backend_swap_block_ref (dst_swap,
-                                                               entry->block);
+          gegl_tile_backend_swap_block_unref (
+            dst_entry->block,
+            gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (dst_swap)),
+            TRUE);
+          dst_entry->block = gegl_tile_backend_swap_block_ref (
+            entry->block,
+            gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (dst_swap)));
         }
     }
   else


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]