[gegl] buffer, gegl-config: compress tile data stored in the swap



commit 9a7738da8ab7e5fb9138b07c7ca2859a18a68fc5
Author: Ell <ell_se yahoo com>
Date:   Mon Dec 17 05:57:48 2018 -0500

    buffer, gegl-config: compress tile data stored in the swap
    
    Use gegl-compression to compress tile data stored in the swap.
    This can both reduce the swap size, and improve its speed, by
    minimizing I/O.  Compression is normally performed by the writer
    thread; however, if the swap queue is full, and a thread attempting
    to store a tile has to block, the thread attempting the store
    compresses the tile before blocking, to minimize the tile cost in
    the queue, and to distribute compression over multiple threads,
    improving tile-store speed.  Decompression is always performed by
    the thread fetching the tile.
    
    Add a "swap-compression" property to GeglConfig (and a
    corresponding --gegl-swap-compression command-line option, and
    GEGL_SWAP_COMPRESSION environment variable), which specifies the
    compression algorithm to be used by the swap.  This property is set
    to "fast" by default.  Setting it to "none" disables compression
    (in contrast to "nop", which doesn't technically *disable*
    compression, but rather uses a NOP compression).
    
    Note that the "swap-compression" property can be changed during
    runtime, which affects the compression used for future tile stores
    (but doesn't affect existing stored tiles).  This can be used, for
    example, to select a better algorithm when running out of swap
    space.

 gegl/buffer/gegl-buffer-config.c     |  18 ++
 gegl/buffer/gegl-buffer-config.h     |   1 +
 gegl/buffer/gegl-tile-backend-swap.c | 561 ++++++++++++++++++++++++-----------
 gegl/gegl-config.c                   |  18 ++
 gegl/gegl-config.h                   |   1 +
 gegl/gegl-init.c                     |  32 +-
 6 files changed, 459 insertions(+), 172 deletions(-)
---
diff --git a/gegl/buffer/gegl-buffer-config.c b/gegl/buffer/gegl-buffer-config.c
index b80eb2995..df6259d4a 100644
--- a/gegl/buffer/gegl-buffer-config.c
+++ b/gegl/buffer/gegl-buffer-config.c
@@ -36,6 +36,7 @@ enum
   PROP_0,
   PROP_TILE_CACHE_SIZE,
   PROP_SWAP,
+  PROP_SWAP_COMPRESSION,
   PROP_TILE_WIDTH,
   PROP_TILE_HEIGHT,
   PROP_QUEUE_SIZE,
@@ -67,6 +68,10 @@ gegl_buffer_config_get_property (GObject    *gobject,
         g_value_set_string (value, config->swap);
         break;
 
+      case PROP_SWAP_COMPRESSION:
+        g_value_set_string (value, config->swap_compression);
+        break;
+
       case PROP_QUEUE_SIZE:
         g_value_set_int (value, config->queue_size);
         break;
@@ -103,6 +108,10 @@ gegl_buffer_config_set_property (GObject      *gobject,
         g_free (config->swap);
         config->swap = g_value_dup_string (value);
         break;
+      case PROP_SWAP_COMPRESSION:
+        g_free (config->swap_compression);
+        config->swap_compression = g_value_dup_string (value);
+        break;
       default:
         G_OBJECT_WARN_INVALID_PROPERTY_ID (gobject, property_id, pspec);
         break;
@@ -115,6 +124,7 @@ gegl_buffer_config_finalize (GObject *gobject)
   GeglBufferConfig *config = GEGL_BUFFER_CONFIG (gobject);
 
   g_free (config->swap);
+  g_free (config->swap_compression);
 
   G_OBJECT_CLASS (gegl_buffer_config_parent_class)->finalize (gobject);
 }
@@ -163,6 +173,14 @@ gegl_buffer_config_class_init (GeglBufferConfigClass *klass)
                                                         G_PARAM_READWRITE |
                                                         G_PARAM_CONSTRUCT));
 
+  g_object_class_install_property (gobject_class, PROP_SWAP_COMPRESSION,
+                                   g_param_spec_string ("swap-compression",
+                                                        "Swap compression",
+                                                        "compression algorithm used for data stored in the swap",
+                                                        "fast",
+                                                        G_PARAM_READWRITE |
+                                                        G_PARAM_CONSTRUCT));
+
   g_object_class_install_property (gobject_class, PROP_QUEUE_SIZE,
                                    g_param_spec_int ("queue-size",
                                                      "Queue size",
diff --git a/gegl/buffer/gegl-buffer-config.h b/gegl/buffer/gegl-buffer-config.h
index 5e3855c00..89a3bd0cc 100644
--- a/gegl/buffer/gegl-buffer-config.h
+++ b/gegl/buffer/gegl-buffer-config.h
@@ -44,6 +44,7 @@ struct _GeglBufferConfig
   GObject  parent_instance;
 
   gchar   *swap;
+  gchar   *swap_compression;
   guint64  tile_cache_size;
   gint     tile_width;
   gint     tile_height;
diff --git a/gegl/buffer/gegl-tile-backend-swap.c b/gegl/buffer/gegl-tile-backend-swap.c
index f4485d3ba..fabee182b 100644
--- a/gegl/buffer/gegl-tile-backend-swap.c
+++ b/gegl/buffer/gegl-tile-backend-swap.c
@@ -35,6 +35,7 @@
 #include "gegl-buffer-backend.h"
 #include "gegl-buffer-private.h"
 #include "gegl-buffer-swap.h"
+#include "gegl-compression.h"
 #include "gegl-tile-backend.h"
 #include "gegl-tile-backend-swap.h"
 #include "gegl-debug.h"
@@ -57,6 +58,11 @@
  */
 #define QUEUED_MAX_RATIO 0.1
 
+/* maximal tile-data compression ratio, above which we use the uncompressed
+ * tile, to avoid decompression overhead.
+ */
+#define COMPRESSION_MAX_RATIO 0.95
+
 
 G_DEFINE_TYPE (GeglTileBackendSwap, gegl_tile_backend_swap, GEGL_TYPE_TILE_BACKEND)
 
@@ -71,9 +77,11 @@ typedef enum
 
 typedef struct
 {
-  gint    ref_count;
-  gint64  offset;
-  GList  *link;
+  gint                   ref_count;
+  gint                   size;
+  const GeglCompression *compression;
+  GList                 *link;
+  gint64                 offset;
 } SwapBlock;
 
 typedef struct
@@ -86,11 +94,13 @@ typedef struct
 
 typedef struct
 {
-  SwapBlock *block;
-  gint       length;
-  gint       cost;
-  GeglTile  *tile;
-  ThreadOp   operation;
+  SwapBlock  *block;
+  const Babl *format;
+  GeglTile   *tile;
+  gpointer    compressed;
+  gint        size;
+  gint        compressed_size;
+  ThreadOp    operation;
 } ThreadParams;
 
 typedef struct
@@ -105,7 +115,9 @@ static void        gegl_tile_backend_swap_push_queue             (ThreadParams
 static void        gegl_tile_backend_swap_resize                 (gint64                    size);
 static SwapGap *   gegl_tile_backend_swap_gap_new                (gint64                    start,
                                                                   gint64                    end);
-static gint64      gegl_tile_backend_swap_find_offset            (gint                      tile_size);
+static gint64      gegl_tile_backend_swap_find_offset            (gint                      block_size);
+static void        gegl_tile_backend_swap_free_block             (SwapBlock                *block);
+static void        gegl_tile_backend_swap_free_data              (ThreadParams             *params);
 static void        gegl_tile_backend_swap_write                  (ThreadParams             *params);
 static void        gegl_tile_backend_swap_destroy                (ThreadParams             *params);
 static gpointer    gegl_tile_backend_swap_writer_thread          (gpointer ignored);
@@ -178,29 +190,32 @@ static void        gegl_tile_backend_swap_init                   (GeglTileBacken
 void               gegl_tile_backend_swap_cleanup                (void);
 
 
-static gchar    *path         = NULL;
-static gint      in_fd        = -1;
-static gint      out_fd       = -1;
-static gint64    in_offset    = 0;
-static gint64    out_offset   = 0;
-static GList    *gap_list     = NULL;
-static gint64    file_size    = 0;
-static gint64    total        = 0;
-static guintptr  cloned_total = 0;
-static gboolean  busy         = FALSE;
-static gboolean  reading      = FALSE;
-static gint64    read_total   = 0;
-static gboolean  writing      = FALSE;
-static gint64    write_total  = 0;
-static gint64    queued_total = 0;
-static gint64    queued_cost  = 0;
-static gint64    queued_max   = 0;
-static gint      queue_stalls = 0;
-
-static GThread      *writer_thread = NULL;
-static GQueue       *queue         = NULL;
-static ThreadParams *in_progress   = NULL;
-static gboolean      exit_thread   = FALSE;
+static gchar                 *path               = NULL;
+static const GeglCompression *compression        = NULL;
+static gint                   in_fd              = -1;
+static gint                   out_fd             = -1;
+static gint64                 in_offset          = 0;
+static gint64                 out_offset         = 0;
+static GList                 *gap_list           = NULL;
+static gint64                 file_size          = 0;
+static gint64                 total              = 0;
+static guintptr               total_uncompressed = 0;
+static gboolean               busy               = FALSE;
+static gboolean               reading            = FALSE;
+static gint64                 read_total         = 0;
+static gboolean               writing            = FALSE;
+static gint64                 write_total        = 0;
+static gint64                 queued_total       = 0;
+static gint64                 queued_cost        = 0;
+static gint64                 queued_max         = 0;
+static gint                   queue_stalls       = 0;
+
+static GThread      *writer_thread           = NULL;
+static GQueue       *queue                   = NULL;
+static ThreadParams *in_progress             = NULL;
+static gboolean      exit_thread             = FALSE;
+static gpointer      compression_buffer      = NULL;
+static gint          compression_buffer_size = 0;
 static GMutex        read_mutex;
 static GMutex        queue_mutex;
 static GCond         queue_cond;
@@ -211,18 +226,59 @@ static void
 gegl_tile_backend_swap_push_queue (ThreadParams *params,
                                    gboolean      head)
 {
-  if (params->tile)
+  if (params->tile || params->compressed)
     {
+      if (params->tile)
+        params->block->compression = compression;
+
       if (queued_cost > queued_max)
         {
           queue_stalls++;
 
+          if (params->tile && compression)
+            {
+              gint     bpp = babl_format_get_bytes_per_pixel (params->format);
+              gpointer compressed;
+              gint     max_compressed_size;
+              gint     compressed_size;
+
+              g_mutex_unlock (&queue_mutex);
+
+              max_compressed_size = params->size * COMPRESSION_MAX_RATIO;
+              compressed          = g_malloc (max_compressed_size);
+
+              if (gegl_compression_compress (compression, params->format,
+                                             gegl_tile_get_data (params->tile),
+                                             params->size / bpp,
+                                             compressed, &compressed_size,
+                                             max_compressed_size))
+                {
+                  gegl_tile_unref (params->tile);
+                  params->tile = NULL;
+
+                  params->compressed      = compressed;
+                  params->compressed_size = compressed_size;
+                }
+              else
+                {
+                  params->block->compression = NULL;
+
+                  g_free (compressed);
+                }
+
+              g_mutex_lock (&queue_mutex);
+            }
+
           while (queued_cost > queued_max)
             g_cond_wait (&push_cond, &queue_mutex);
         }
 
-      queued_total += params->length;
-      queued_cost  += params->cost;
+      if (params->tile)
+        queued_total += params->size;
+      else
+        queued_total += params->compressed_size;
+
+      queued_cost  += params->compressed_size;
     }
 
   busy = TRUE;
@@ -266,12 +322,12 @@ gegl_tile_backend_swap_gap_new (gint64 start,
 }
 
 static gint64
-gegl_tile_backend_swap_find_offset (gint tile_size)
+gegl_tile_backend_swap_find_offset (gint block_size)
 {
   SwapGap *gap;
   gint64   offset;
 
-  total += tile_size;
+  total += block_size;
 
   if (gap_list)
     {
@@ -284,14 +340,14 @@ gegl_tile_backend_swap_find_offset (gint tile_size)
           gap    = link->data;
           length = gap->end - gap->start;
 
-          if (length > tile_size)
+          if (length > block_size)
             {
               offset = gap->start;
-              gap->start += tile_size;
+              gap->start += block_size;
 
               return offset;
             }
-          else if (length == tile_size)
+          else if (length == block_size)
             {
               offset = gap->start;
               g_slice_free (SwapGap, gap);
@@ -306,83 +362,29 @@ gegl_tile_backend_swap_find_offset (gint tile_size)
 
   offset = file_size;
 
-  gegl_tile_backend_swap_resize (file_size + 32 * tile_size);
+  gegl_tile_backend_swap_resize (file_size + 32 * block_size);
 
-  gap = gegl_tile_backend_swap_gap_new (offset + tile_size, file_size);
+  gap = gegl_tile_backend_swap_gap_new (offset + block_size, file_size);
   gap_list = g_list_append (gap_list, gap);
 
   return offset;
 }
 
 static void
-gegl_tile_backend_swap_write (ThreadParams *params)
-{
-  gint   to_be_written = params->length;
-  gint64 offset        = params->block->offset;
-
-  gegl_tile_backend_swap_ensure_exist ();
-
-  if (offset < 0)
-    {
-      /* storage for entry not allocated yet.  allocate now. */
-      offset = gegl_tile_backend_swap_find_offset (to_be_written);
-
-      params->block->offset = offset;
-    }
-
-  if (out_offset != offset)
-    {
-      if (lseek (out_fd, offset, SEEK_SET) < 0)
-        {
-          g_warning ("unable to seek to tile in buffer: %s", g_strerror (errno));
-          return;
-        }
-      out_offset = offset;
-    }
-
-  writing = TRUE;
-
-  while (to_be_written > 0)
-    {
-      gint wrote;
-      wrote = write (out_fd,
-                     gegl_tile_get_data (params->tile) + params->length
-                     - to_be_written,
-                     to_be_written);
-      if (wrote <= 0)
-        {
-          g_message ("unable to write tile data to self: "
-                     "%s (%d/%d bytes written)",
-                     g_strerror (errno), wrote, to_be_written);
-          break;
-        }
-
-      to_be_written -= wrote;
-      out_offset    += wrote;
-
-      write_total   += wrote;
-    }
-
-  writing = FALSE;
-
-  GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "writer thread wrote at %i", (gint)offset);
-}
-
-static void
-gegl_tile_backend_swap_destroy (ThreadParams *params)
+gegl_tile_backend_swap_free_block (SwapBlock *block)
 {
   gint64  start, end;
   GList  *hlink;
 
-  start = params->block->offset;
-  end   = start + params->length;
-
-  gegl_tile_backend_swap_block_free (params->block);
-
   /* storage for entry not allocated yet.  nothing more to do. */
-  if (start < 0)
+  if (block->offset < 0)
     return;
 
+  start = block->offset;
+  end   = start + block->size;
+
+  block->offset = -1;
+
   if ((hlink = gap_list))
     while (hlink)
       {
@@ -460,6 +462,153 @@ gegl_tile_backend_swap_destroy (ThreadParams *params)
   total -= end - start;
 }
 
+static void
+gegl_tile_backend_swap_free_data (ThreadParams *params)
+{
+  if (params->tile || params->compressed)
+    {
+      if (params->tile)
+        {
+          queued_total -= params->size;
+
+          gegl_tile_unref (params->tile);
+          params->tile = NULL;
+        }
+      else
+        {
+          queued_total -= params->compressed_size;
+
+          g_free (params->compressed);
+          params->compressed = NULL;
+        }
+
+      queued_cost -= params->compressed_size;
+
+      if (queued_cost <= queued_max &&
+          queued_cost + params->compressed_size > queued_max)
+        {
+          g_cond_broadcast (&push_cond);
+        }
+    }
+}
+
+static void
+gegl_tile_backend_swap_write (ThreadParams *params)
+{
+  const guint8 *data;
+  gint64        offset = params->block->offset;
+  gint          to_be_written;
+
+  gegl_tile_backend_swap_ensure_exist ();
+
+  if (params->tile)
+    {
+      data          = gegl_tile_get_data (params->tile);
+      to_be_written = params->size;
+
+      if (params->block->compression)
+        {
+          gint bpp = babl_format_get_bytes_per_pixel (params->format);
+          gint compressed_size;
+          gint max_compressed_size;
+
+          max_compressed_size = params->size * COMPRESSION_MAX_RATIO;
+
+          if (max_compressed_size > compression_buffer_size)
+            {
+              compression_buffer      = g_realloc (compression_buffer,
+                                                   max_compressed_size);
+              compression_buffer_size = max_compressed_size;
+            }
+
+          if (gegl_compression_compress (params->block->compression,
+                                         params->format,
+                                         data, params->size / bpp,
+                                         compression_buffer, &compressed_size,
+                                         max_compressed_size))
+            {
+              data          = compression_buffer;
+              to_be_written = compressed_size;
+            }
+          else
+            {
+              params->block->compression = NULL;
+            }
+        }
+    }
+  else
+    {
+      data          = params->compressed;
+      to_be_written = params->compressed_size;
+    }
+
+  if (offset >= 0 && params->block->size != to_be_written)
+    {
+      g_atomic_pointer_add (&total_uncompressed, -params->size);
+
+      gegl_tile_backend_swap_free_block (params->block);
+
+      offset = -1;
+    }
+
+  if (offset < 0)
+    {
+      /* storage for entry not allocated yet.  allocate now. */
+      offset = gegl_tile_backend_swap_find_offset (to_be_written);
+
+      params->block->offset = offset;
+      params->block->size   = to_be_written;
+
+      g_atomic_pointer_add (&total_uncompressed, +params->size);
+    }
+
+  if (out_offset != offset)
+    {
+      if (lseek (out_fd, offset, SEEK_SET) < 0)
+        {
+          g_warning ("unable to seek to tile in buffer: %s", g_strerror (errno));
+          return;
+        }
+      out_offset = offset;
+    }
+
+  writing = TRUE;
+
+  while (to_be_written > 0)
+    {
+      gint wrote;
+      wrote = write (out_fd, data, to_be_written);
+      if (wrote <= 0)
+        {
+          g_message ("unable to write tile data to self: "
+                     "%s (%d/%d bytes written)",
+                     g_strerror (errno), wrote, to_be_written);
+          break;
+        }
+
+      data          += wrote;
+      to_be_written -= wrote;
+      out_offset    += wrote;
+
+      write_total   += wrote;
+    }
+
+  writing = FALSE;
+
+  GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "writer thread wrote at %i", (gint)offset);
+}
+
+static void
+gegl_tile_backend_swap_destroy (ThreadParams *params)
+{
+  if (params->block->offset >= 0)
+    g_atomic_pointer_add (&total_uncompressed, -params->size);
+
+  gegl_tile_backend_swap_free_block (params->block);
+
+  gegl_tile_backend_swap_block_free (params->block);
+}
+
 static gpointer
 gegl_tile_backend_swap_writer_thread (gpointer ignored)
 {
@@ -502,19 +651,7 @@ gegl_tile_backend_swap_writer_thread (gpointer ignored)
 
       in_progress = NULL;
 
-      if (params->tile)
-        {
-          gegl_tile_unref (params->tile);
-
-          queued_total -= params->length;
-          queued_cost  -= params->cost;
-
-          if (queued_cost <= queued_max &&
-              queued_cost + params->cost > queued_max)
-            {
-              g_cond_broadcast (&push_cond);
-            }
-        }
+      gegl_tile_backend_swap_free_data (params);
 
       g_slice_free (ThreadParams, params);
     }
@@ -528,11 +665,19 @@ static GeglTile *
 gegl_tile_backend_swap_entry_read (GeglTileBackendSwap *self,
                                    SwapEntry           *entry)
 {
-  gint      tile_size  = gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self));
-  gint      to_be_read = tile_size;
-  gint64    offset;
-  GeglTile *tile;
-  guchar   *dest;
+  GeglTileBackend *backend = GEGL_TILE_BACKEND (self);
+  const Babl      *format;
+  GeglTile        *tile;
+  guint8          *data;
+  guint8          *dest;
+  gint64           offset;
+  gint             tile_size;
+  gint             bpp;
+  gint             to_be_read;
+
+  format    = gegl_tile_backend_get_format (backend);
+  tile_size = gegl_tile_backend_get_tile_size (backend);
+  bpp       = babl_format_get_bytes_per_pixel (format);
 
   g_mutex_lock (&queue_mutex);
 
@@ -547,7 +692,23 @@ gegl_tile_backend_swap_entry_read (GeglTileBackendSwap *self,
 
       if (queued_op)
         {
-          tile = gegl_tile_dup (queued_op->tile);
+          if (queued_op->tile)
+            {
+              tile = gegl_tile_dup (queued_op->tile);
+            }
+          else
+            {
+              tile = gegl_tile_new (tile_size);
+              dest = gegl_tile_get_data (tile);
+
+              if (! gegl_compression_decompress (
+                      entry->block->compression, format,
+                      dest, tile_size / bpp,
+                      queued_op->compressed, queued_op->compressed_size))
+                {
+                  g_warning ("failed to decompress tile");
+                }
+            }
 
           g_mutex_unlock (&queue_mutex);
 
@@ -573,6 +734,11 @@ gegl_tile_backend_swap_entry_read (GeglTileBackendSwap *self,
   dest = gegl_tile_get_data (tile);
   gegl_tile_mark_as_stored (tile);
 
+  if (entry->block->compression)
+    data = g_malloc (entry->block->size);
+  else
+    data = dest;
+
   g_mutex_lock (&read_mutex);
 
   if (in_offset != offset)
@@ -589,33 +755,54 @@ gegl_tile_backend_swap_entry_read (GeglTileBackendSwap *self,
 
   reading = TRUE;
 
+  to_be_read = entry->block->size;
+
   while (to_be_read > 0)
     {
       GError *error = NULL;
-      gint    byte_read;
+      gint    bytes_read;
+
+      bytes_read = read (in_fd,
+                         data + entry->block->size - to_be_read, to_be_read);
 
-      byte_read = read (in_fd, dest + tile_size - to_be_read, to_be_read);
-      if (byte_read <= 0)
+      if (bytes_read <= 0)
         {
           reading = FALSE;
 
           g_mutex_unlock (&read_mutex);
 
+          if (entry->block->compression)
+            g_free (data);
+
           g_message ("unable to read tile data from swap: "
                      "%s (%d/%d bytes read) %s",
-                     g_strerror (errno), byte_read, to_be_read, error?error->message:"--");
+                     g_strerror (errno), bytes_read, to_be_read, error?error->message:"--");
           return tile;
         }
-      to_be_read -= byte_read;
-      in_offset  += byte_read;
 
-      read_total += byte_read;
+      to_be_read -= bytes_read;
+      in_offset  += bytes_read;
+
+      read_total += bytes_read;
     }
 
   reading = FALSE;
 
   g_mutex_unlock (&read_mutex);
 
+  if (entry->block->compression)
+    {
+      if (! gegl_compression_decompress (
+              entry->block->compression, format,
+              dest, tile_size / bpp,
+              data, entry->block->size))
+        {
+          g_warning ("failed to decompress tile");
+        }
+
+      g_free (data);
+    }
+
   GEGL_NOTE(GEGL_DEBUG_TILE_BACKEND, "read entry %i, %i, %i from %i", entry->x, entry->y, entry->z, (gint)offset);
 
   return tile;
@@ -626,36 +813,58 @@ gegl_tile_backend_swap_entry_write (GeglTileBackendSwap *self,
                                     SwapEntry           *entry,
                                     GeglTile            *tile)
 {
-  ThreadParams *params;
-  gint          n_clones;
-  gint          length;
-  gint          cost;
+  GeglTileBackend *backend = GEGL_TILE_BACKEND (self);
+  ThreadParams    *params;
+  gint             n_clones;
+  gint             size;
+  gint             cost;
 
   n_clones = *gegl_tile_n_clones (tile);
-  length   = gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self));
-  cost     = (length + n_clones / 2) / n_clones;
+  size     = gegl_tile_backend_get_tile_size (backend);
+  cost     = (size + n_clones / 2) / n_clones;
 
   g_mutex_lock (&queue_mutex);
 
   if (entry->block->link)
     {
       params = entry->block->link->data;
+
       g_assert (params->operation == OP_WRITE);
-      gegl_tile_unref (params->tile);
-      params->tile = gegl_tile_dup (tile);
-      g_mutex_unlock (&queue_mutex);
 
-      GEGL_NOTE(GEGL_DEBUG_TILE_BACKEND, "tile %i, %i, %i at %i is already enqueued, changed data", entry->x, entry->y, entry->z, (gint)entry->block->offset);
+      gegl_tile_backend_swap_free_data (params);
 
-      return;
+      if (queued_cost <= queued_max)
+        {
+          params->block->compression = compression;
+
+          params->tile            = gegl_tile_dup (tile);
+          params->compressed_size = cost;
+
+          queued_total += size;
+          queued_cost  += cost;
+
+          g_mutex_unlock (&queue_mutex);
+
+          GEGL_NOTE(GEGL_DEBUG_TILE_BACKEND, "tile %i, %i, %i at %i is already enqueued, changed data", entry->x, entry->y, entry->z, (gint)entry->block->offset);
+
+          return;
+        }
+
+      g_queue_delete_link (queue, entry->block->link);
+      entry->block->link = NULL;
+    }
+  else
+    {
+      params = g_slice_new0 (ThreadParams);
     }
 
-  params            = g_slice_new0 (ThreadParams);
-  params->operation = OP_WRITE;
-  params->length    = length;
-  params->cost      = cost;
-  params->tile      = gegl_tile_dup (tile);
-  params->block     = entry->block;
+  params->operation       = OP_WRITE;
+  params->block           = entry->block;
+  params->format          = gegl_tile_backend_get_format (backend);
+  params->tile            = gegl_tile_dup (tile);
+  params->compressed      = NULL;
+  params->size            = size;
+  params->compressed_size = cost;
 
   gegl_tile_backend_swap_push_queue (params, /* head = */ FALSE);
 
@@ -691,7 +900,7 @@ gegl_tile_backend_swap_block_ref (GeglTileBackendSwap *self,
   g_atomic_int_inc (&block->ref_count);
 
   g_atomic_pointer_add (
-    &cloned_total,
+    &total_uncompressed,
     +gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)));
 
   return block;
@@ -712,20 +921,7 @@ gegl_tile_backend_swap_block_unref (GeglTileBackendSwap *self,
           GList        *link      = block->link;
           ThreadParams *queued_op = link->data;
 
-          if (queued_op->tile)
-            {
-              gegl_tile_unref (queued_op->tile);
-              queued_op->tile = NULL;
-
-              queued_total -= queued_op->length;
-              queued_cost  -= queued_op->cost;
-
-              if (queued_cost <= queued_max &&
-                  queued_cost + queued_op->cost > queued_max)
-                {
-                  g_cond_broadcast (&push_cond);
-                }
-            }
+          gegl_tile_backend_swap_free_data (queued_op);
 
           /* reuse the queued op, changing it to an OP_DESTROY. */
           queued_op->operation = OP_DESTROY;
@@ -742,9 +938,9 @@ gegl_tile_backend_swap_block_unref (GeglTileBackendSwap *self,
 
           params            = g_slice_new0 (ThreadParams);
           params->operation = OP_DESTROY;
-          params->length    = gegl_tile_backend_get_tile_size (
-                                GEGL_TILE_BACKEND (self));
           params->block     = block;
+          params->size      = gegl_tile_backend_get_tile_size (
+                                GEGL_TILE_BACKEND (self));
 
           /* push the destroy op at the top of the queue, so that it gets
            * served before any write ops, which are then free to reuse the
@@ -759,7 +955,7 @@ gegl_tile_backend_swap_block_unref (GeglTileBackendSwap *self,
   else
     {
       g_atomic_pointer_add (
-        &cloned_total,
+        &total_uncompressed,
         -gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)));
     }
 }
@@ -1124,6 +1320,26 @@ gegl_tile_backend_swap_ensure_exist (void)
     }
 }
 
+static void
+gegl_tile_backend_swap_compression_notify (GObject    *config,
+                                           GParamSpec *pspec,
+                                           gpointer    data)
+{
+  gchar *swap_compression;
+
+  g_mutex_lock (&queue_mutex);
+
+  g_object_get (config,
+                "swap-compression", &swap_compression,
+                NULL);
+
+  compression = gegl_compression (swap_compression);
+
+  g_free (swap_compression);
+
+  g_mutex_unlock (&queue_mutex);
+}
+
 static void
 gegl_tile_backend_swap_tile_cache_size_notify (GObject    *config,
                                                GParamSpec *pspec,
@@ -1157,6 +1373,13 @@ gegl_tile_backend_swap_class_init (GeglTileBackendSwapClass *klass)
                                 gegl_tile_backend_swap_writer_thread,
                                 NULL);
 
+  g_signal_connect (gegl_buffer_config (), "notify::swap-compression",
+                    G_CALLBACK (gegl_tile_backend_swap_compression_notify),
+                    NULL);
+
+  gegl_tile_backend_swap_compression_notify (G_OBJECT (gegl_buffer_config ()),
+                                             NULL, NULL);
+
   g_signal_connect (gegl_buffer_config (), "notify::tile-cache-size",
                     G_CALLBACK (gegl_tile_backend_swap_tile_cache_size_notify),
                     NULL);
@@ -1176,6 +1399,11 @@ gegl_tile_backend_swap_cleanup (void)
     gegl_tile_backend_swap_tile_cache_size_notify,
     NULL);
 
+  g_signal_handlers_disconnect_by_func (
+    gegl_buffer_config (),
+    gegl_tile_backend_swap_compression_notify,
+    NULL);
+
   g_mutex_lock (&queue_mutex);
   exit_thread = TRUE;
   g_cond_signal (&queue_cond);
@@ -1189,6 +1417,9 @@ gegl_tile_backend_swap_cleanup (void)
   g_queue_free (queue);
   queue = NULL;
 
+  g_clear_pointer (&compression_buffer, g_free);
+  compression_buffer_size = 0;
+
   if (gap_list)
     {
       SwapGap *gap = gap_list->data;
@@ -1251,7 +1482,7 @@ gegl_tile_backend_swap_get_total (void)
 guint64
 gegl_tile_backend_swap_get_total_uncloned (void)
 {
-  return total + cloned_total;
+  return total_uncompressed;
 }
 
 guint64
diff --git a/gegl/gegl-config.c b/gegl/gegl-config.c
index b24388970..30e7af1e7 100644
--- a/gegl/gegl-config.c
+++ b/gegl/gegl-config.c
@@ -41,6 +41,7 @@ enum
   PROP_TILE_CACHE_SIZE,
   PROP_CHUNK_SIZE,
   PROP_SWAP,
+  PROP_SWAP_COMPRESSION,
   PROP_TILE_WIDTH,
   PROP_TILE_HEIGHT,
   PROP_THREADS,
@@ -85,6 +86,10 @@ gegl_config_get_property (GObject    *gobject,
         g_value_set_string (value, config->swap);
         break;
 
+      case PROP_SWAP_COMPRESSION:
+        g_value_set_string (value, config->swap_compression);
+        break;
+
       case PROP_THREADS:
         g_value_set_int (value, _gegl_threads);
         break;
@@ -136,6 +141,10 @@ gegl_config_set_property (GObject      *gobject,
         g_free (config->swap);
         config->swap = g_value_dup_string (value);
         break;
+      case PROP_SWAP_COMPRESSION:
+        g_free (config->swap_compression);
+        config->swap_compression = g_value_dup_string (value);
+        break;
       case PROP_THREADS:
         _gegl_threads = g_value_get_int (value);
         return;
@@ -161,6 +170,7 @@ gegl_config_finalize (GObject *gobject)
   GeglConfig *config = GEGL_CONFIG (gobject);
 
   g_free (config->swap);
+  g_free (config->swap_compression);
   g_free (config->application_license);
 
   G_OBJECT_CLASS (gegl_config_parent_class)->finalize (gobject);
@@ -222,6 +232,13 @@ gegl_config_class_init (GeglConfigClass *klass)
                                                         NULL,
                                                         G_PARAM_READWRITE));
 
+  g_object_class_install_property (gobject_class, PROP_SWAP_COMPRESSION,
+                                   g_param_spec_string ("swap-compression",
+                                                        "Swap compression",
+                                                        "compression algorithm used for data stored in the swap",
+                                                        NULL,
+                                                        G_PARAM_READWRITE));
+
   _gegl_threads = g_get_num_processors ();
   _gegl_threads = MIN (_gegl_threads, GEGL_MAX_THREADS);
   g_object_class_install_property (gobject_class, PROP_THREADS,
@@ -261,6 +278,7 @@ static void
 gegl_config_init (GeglConfig *self)
 {
   char *forward_props[]={"swap",
+                         "swap-compression",
                          "queue-size",
                          "tile-width",
                          "tile-height",
diff --git a/gegl/gegl-config.h b/gegl/gegl-config.h
index f830390a5..7ebb7f85b 100644
--- a/gegl/gegl-config.h
+++ b/gegl/gegl-config.h
@@ -36,6 +36,7 @@ struct _GeglConfig
   GObject  parent_instance;
 
   gchar   *swap;
+  gchar   *swap_compression;
   guint64  tile_cache_size;
   gint     chunk_size; /* The size of elements being processed at once */
   gdouble  quality;
diff --git a/gegl/gegl-init.c b/gegl/gegl-init.c
index 99cd3618b..1b42cc2c2 100644
--- a/gegl/gegl-init.c
+++ b/gegl/gegl-init.c
@@ -77,6 +77,7 @@ guint gegl_debug_flags = 0;
 #include "buffer/gegl-buffer-private.h"
 #include "buffer/gegl-buffer-iterator-private.h"
 #include "buffer/gegl-buffer-swap-private.h"
+#include "buffer/gegl-compression.h"
 #include "buffer/gegl-tile-backend-ram.h"
 #include "buffer/gegl-tile-backend-file.h"
 #include "gegl-config.h"
@@ -196,13 +197,14 @@ gegl_init (gint    *argc,
   g_option_context_free (context);
 }
 
-static gchar    *cmd_gegl_swap           = NULL;
-static gchar    *cmd_gegl_cache_size     = NULL;
-static gchar    *cmd_gegl_chunk_size     = NULL;
-static gchar    *cmd_gegl_quality        = NULL;
-static gchar    *cmd_gegl_tile_size      = NULL;
-static gchar    *cmd_gegl_threads        = NULL;
-static gboolean *cmd_gegl_disable_opencl = NULL;
+static gchar    *cmd_gegl_swap             = NULL;
+static gchar    *cmd_gegl_swap_compression = NULL;
+static gchar    *cmd_gegl_cache_size       = NULL;
+static gchar    *cmd_gegl_chunk_size       = NULL;
+static gchar    *cmd_gegl_quality          = NULL;
+static gchar    *cmd_gegl_tile_size        = NULL;
+static gchar    *cmd_gegl_threads          = NULL;
+static gboolean *cmd_gegl_disable_opencl   = NULL;
 
 static const GOptionEntry cmd_entries[]=
 {
@@ -211,6 +213,11 @@ static const GOptionEntry cmd_entries[]=
      G_OPTION_ARG_STRING, &cmd_gegl_swap,
      N_("Where GEGL stores its swap"), "<uri>"
     },
+    {
+     "gegl-swap-compression", 0, 0,
+     G_OPTION_ARG_STRING, &cmd_gegl_swap_compression,
+     N_("Compression algorithm used for data stored in the swap"), "<algorithm>"
+    },
     {
      "gegl-cache-size", 0, 0,
      G_OPTION_ARG_STRING, &cmd_gegl_cache_size,
@@ -334,6 +341,13 @@ static void gegl_config_parse_env (GeglConfig *config)
 
   if (g_getenv ("GEGL_SWAP"))
     g_object_set (config, "swap", g_getenv ("GEGL_SWAP"), NULL);
+
+  if (g_getenv ("GEGL_SWAP_COMPRESSION"))
+    {
+      g_object_set (config,
+                    "swap-compression", g_getenv ("GEGL_SWAP_COMPRESSION"),
+                    NULL);
+    }
 }
 
 GeglConfig *gegl_config (void)
@@ -374,6 +388,7 @@ gegl_exit (void)
   gegl_tile_cache_destroy ();
   gegl_operation_gtype_cleanup ();
   gegl_operation_handlers_cleanup ();
+  gegl_compression_cleanup ();
   gegl_random_cleanup ();
   gegl_parallel_cleanup ();
   gegl_buffer_swap_cleanup ();
@@ -525,6 +540,8 @@ gegl_post_parse_hook (GOptionContext *context,
 
   if (cmd_gegl_swap)
     g_object_set (config, "swap", cmd_gegl_swap, NULL);
+  if (cmd_gegl_swap_compression)
+    g_object_set (config, "swap-compression", cmd_gegl_swap_compression, NULL);
   if (cmd_gegl_quality)
     config->quality = atof (cmd_gegl_quality);
   if (cmd_gegl_cache_size)
@@ -567,6 +584,7 @@ gegl_post_parse_hook (GOptionContext *context,
 
   gegl_buffer_swap_init ();
   gegl_parallel_init ();
+  gegl_compression_init ();
   gegl_operation_gtype_init ();
   gegl_tile_cache_init ();
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]