[gegl] buffer: in async file backend, overwrite GeglBufferBlock queue entries when flushing and don't use t



commit 3907a3a520c7555d07794a662b5807fdc1ed72be
Author: Ville Sokk <ville sokk gmail com>
Date:   Thu Aug 16 20:25:16 2012 +0300

    buffer: in async file backend, overwrite GeglBufferBlock queue entries when flushing and don't use two mutexes

 gegl/buffer/gegl-tile-backend-file-async.c |  239 ++++++++++++++++------------
 gegl/buffer/gegl-tile-backend-file.h       |    7 +-
 2 files changed, 145 insertions(+), 101 deletions(-)
---
diff --git a/gegl/buffer/gegl-tile-backend-file-async.c b/gegl/buffer/gegl-tile-backend-file-async.c
index b27f2be..020b63b 100644
--- a/gegl/buffer/gegl-tile-backend-file-async.c
+++ b/gegl/buffer/gegl-tile-backend-file-async.c
@@ -113,7 +113,7 @@ struct _GeglTileBackendFile
    * at all times to be able to keep track of the ->next offsets in
    * the blocks.
    */
-  GeglBufferBlock *in_holding;
+  GeglFileBackendEntry *in_holding;
 
   /* loading buffer */
   GList           *tiles;
@@ -137,11 +137,11 @@ struct _GeglTileBackendFile
 };
 
 
-static void     gegl_tile_backend_file_ensure_exist (GeglTileBackendFile *self);
-static gboolean gegl_tile_backend_file_write_block  (GeglTileBackendFile *self,
-                                                     GeglBufferBlock     *block);
-static void     gegl_tile_backend_file_dbg_alloc    (int                  size);
-static void     gegl_tile_backend_file_dbg_dealloc  (int                  size);
+static void     gegl_tile_backend_file_ensure_exist (GeglTileBackendFile  *self);
+static gboolean gegl_tile_backend_file_write_block  (GeglTileBackendFile  *self,
+                                                     GeglFileBackendEntry *block);
+static void     gegl_tile_backend_file_dbg_alloc    (int                   size);
+static void     gegl_tile_backend_file_dbg_dealloc  (int                   size);
 
 
 G_DEFINE_TYPE (GeglTileBackendFile, gegl_tile_backend_file, GEGL_TYPE_TILE_BACKEND)
@@ -154,20 +154,20 @@ static gint file_size      = 0;
 static gint peak_allocs    = 0;
 static gint peak_file_size = 0;
 
-static GQueue  queue       = G_QUEUE_INIT;
-static GMutex *queue_mutex = NULL;
-static GCond  *queue_cond  = NULL;
-static GCond  *max_cond    = NULL;
-static GMutex *write_mutex = NULL;
+static GQueue  queue      = G_QUEUE_INIT;
+static GMutex *mutex      = NULL;
+static GCond  *queue_cond = NULL;
+static GCond  *max_cond   = NULL;
+static GeglFileBackendThreadParams *in_progress;
 
 
 static void
 gegl_tile_backend_file_finish_writing (GeglTileBackendFile *self)
 {
-  g_mutex_lock (queue_mutex);
+  g_mutex_lock (mutex);
   while (self->pending_ops != 0)
-    g_cond_wait (self->cond, queue_mutex);
-  g_mutex_unlock (queue_mutex);
+    g_cond_wait (self->cond, mutex);
+  g_mutex_unlock (mutex);
 }
 
 static void
@@ -175,24 +175,29 @@ gegl_tile_backend_file_push_queue (GeglFileBackendThreadParams *params)
 {
   guint length;
 
-  g_mutex_lock (queue_mutex);
+  g_mutex_lock (mutex);
 
   length = g_queue_get_length (&queue);
 
   /* block if the queue has gotten too big */
   if (length > gegl_config ()->queue_limit)
-    g_cond_wait (max_cond, queue_mutex);
+    g_cond_wait (max_cond, mutex);
 
   params->file->pending_ops += 1;
   g_queue_push_tail (&queue, params);
 
   if (params->entry)
-    params->entry->link = g_queue_peek_tail_link (&queue);
+    {
+      if (params->operation == OP_WRITE)
+        params->entry->tile_link = g_queue_peek_tail_link (&queue);
+      else /* OP_WRITE_BLOCK */
+        params->entry->block_link = g_queue_peek_tail_link (&queue);
+    }
 
   if (length == 0) /* wake up the writer thread */
     g_cond_signal (queue_cond);
 
-  g_mutex_unlock (queue_mutex);
+  g_mutex_unlock (mutex);
 }
 
 static inline void
@@ -240,27 +245,30 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
     {
       GeglFileBackendThreadParams *params;
 
-      g_mutex_lock (queue_mutex);
+      g_mutex_lock (mutex);
 
       while (g_queue_is_empty (&queue))
-        g_cond_wait (queue_cond, queue_mutex);
+        g_cond_wait (queue_cond, mutex);
 
-      g_mutex_lock (write_mutex);
-      if (g_queue_is_empty (&queue))
+      params = (GeglFileBackendThreadParams *)g_queue_pop_head (&queue);
+      if (params->entry)
         {
-          g_mutex_unlock (queue_mutex);
-          g_mutex_unlock (write_mutex);
-          continue;
+          in_progress = params;
+          if (params->operation == OP_WRITE)
+            params->entry->tile_link = NULL;
+          else /* OP_WRITE_BLOCK */
+            params->entry->block_link = NULL;
         }
-
-      params = (GeglFileBackendThreadParams *)g_queue_peek_head (&queue);
-      g_mutex_unlock (queue_mutex);
+      g_mutex_unlock (mutex);
 
       switch (params->operation)
         {
         case OP_WRITE:
           gegl_tile_backend_file_write (params);
           break;
+        case OP_WRITE_BLOCK:
+          gegl_tile_backend_file_write (params);
+          break;
         case OP_TRUNCATE:
           ftruncate (params->file->o, params->length);
           break;
@@ -269,8 +277,8 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
           break;
         }
 
-      g_mutex_lock (queue_mutex);
-      g_queue_pop_head (&queue);
+      g_mutex_lock (mutex);
+      in_progress = NULL;
 
       /* the file maybe waiting for its file operations to finish */
       params->file->pending_ops -= 1;
@@ -281,16 +289,12 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
       if (g_queue_get_length (&queue) < gegl_config ()->queue_limit)
         g_cond_signal (max_cond);
 
-      if (params->entry)
-        params->entry->link = NULL;
-
-      if (params->operation == OP_WRITE)
+      if (params->operation == OP_WRITE || params->operation == OP_WRITE_BLOCK)
         g_free (params->source);
 
       g_free (params);
 
-      g_mutex_unlock (queue_mutex);
-      g_mutex_unlock (write_mutex);
+      g_mutex_unlock (mutex);
     }
 
   return NULL;
@@ -307,17 +311,24 @@ gegl_tile_backend_file_entry_read (GeglTileBackendFile  *self,
 
   gegl_tile_backend_file_ensure_exist (self);
 
-  g_mutex_lock (queue_mutex);
-  if (entry->link)
+  if (entry->tile_link)
     {
-      memcpy (dest,
-              ((GeglFileBackendThreadParams *)entry->link->data)->source,
-              to_be_read);
-      g_mutex_unlock (queue_mutex);
-      GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from write queue", entry->tile->x, entry->tile->y, entry->tile->z);
-      return;
+      g_mutex_lock (mutex);
+
+      if (entry->tile_link)
+        {
+          memcpy (dest,
+                  ((GeglFileBackendThreadParams *)entry->tile_link->data)->source,
+                  to_be_read);
+          g_mutex_unlock (mutex);
+
+          GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from queue", entry->tile->x, entry->tile->y, entry->tile->z);
+
+          return;
+        }
+
+      g_mutex_unlock (mutex);
     }
-  g_mutex_unlock (queue_mutex);
 
   if (self->in_offset != offset)
     {
@@ -360,19 +371,22 @@ gegl_tile_backend_file_entry_write (GeglTileBackendFile  *self,
 
   gegl_tile_backend_file_ensure_exist (self);
 
-  if (entry->link)
+  if (entry->tile_link)
     {
-      g_mutex_lock (write_mutex);
+      g_mutex_lock (mutex);
 
-      if (entry->link)
+      if (entry->tile_link)
         {
-          params = (GeglFileBackendThreadParams *)entry->link->data;
+          params = entry->tile_link->data;
           memcpy (params->source, source, length);
-          g_mutex_unlock (write_mutex);
+          g_mutex_unlock (mutex);
+
+          GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "overwrote queue entry %i,%i,%i at %i", entry->tile->x, entry->tile->y, entry->tile->z, (gint)entry->tile->offset);
+
           return;
         }
 
-      g_mutex_unlock (write_mutex);
+      g_mutex_unlock (mutex);
     }
 
   new_source = g_malloc (length);
@@ -398,8 +412,9 @@ gegl_tile_backend_file_file_entry_create (gint x,
 {
   GeglFileBackendEntry *entry = g_new0 (GeglFileBackendEntry, 1);
 
-  entry->tile = gegl_tile_entry_new (x, y, z);
-  entry->link = NULL;
+  entry->tile       = gegl_tile_entry_new (x, y, z);
+  entry->tile_link  = NULL;
+  entry->block_link = NULL;
 
   return entry;
 }
@@ -462,27 +477,28 @@ gegl_tile_backend_file_file_entry_destroy (GeglFileBackendEntry *entry,
   /* XXX: EEEk, throwing away bits */
   guint offset = entry->tile->offset;
 
-  /* Write mutex is used to completely stop the writer thread from working,
-   * which is required to remove the entry from the queue. entry->link is
-   * checked twice because while waiting for the mutex the writer thread
-   * may have already removed it. If we would lock the mutex before the
-   * first check we would spend too much time since it's not that common to
-   * have the entry in queue.
-   */
-  if (entry->link)
+  if (entry->tile_link || entry->block_link)
     {
-      g_mutex_lock (write_mutex);
+      gint   i;
+      GList *link;
 
-      if (entry->link)
+      g_mutex_lock (mutex);
+
+      for (i = 0, link = entry->tile_link;
+           i < 2;
+           i++, link = entry->block_link)
         {
-          GeglFileBackendThreadParams *queued_op = entry->link->data;
-          queued_op->file->pending_ops -= 1;
-          g_queue_delete_link (&queue, entry->link);
-          g_free (queued_op->source);
-          g_free (queued_op);
+          if (link)
+            {
+              GeglFileBackendThreadParams *queued_op = link->data;
+              queued_op->file->pending_ops -= 1;
+              g_queue_delete_link (&queue, link);
+              g_free (queued_op->source);
+              g_free (queued_op);
+            }
         }
 
-      g_mutex_unlock (write_mutex);
+      g_mutex_unlock (mutex);
     }
 
   self->free_list = g_slist_prepend (self->free_list,
@@ -490,6 +506,7 @@ gegl_tile_backend_file_file_entry_destroy (GeglFileBackendEntry *entry,
   g_hash_table_remove (self->index, entry);
 
   gegl_tile_backend_file_dbg_dealloc (gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)));
+
   g_free (entry->tile);
   g_free (entry);
 }
@@ -524,47 +541,72 @@ gegl_tile_backend_file_write_header (GeglTileBackendFile *self)
 }
 
 static gboolean
-gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
-                                    GeglBufferBlock     *block)
+gegl_tile_backend_file_write_block (GeglTileBackendFile  *self,
+                                    GeglFileBackendEntry *item)
 {
   gegl_tile_backend_file_ensure_exist (self);
   if (self->in_holding)
     {
-      guint64 next_allocation = self->offset + self->in_holding->length;
+      GeglFileBackendThreadParams *params;
+      GeglBufferBlock *block           = &(self->in_holding->tile->block);
+      guint64          next_allocation = self->offset + block->length;
+      gint             length          = block->length;
+      guchar          *new_source;
 
       /* update the next offset pointer in the previous block */
-      if (block == NULL)
+      if (item == NULL)
           /* the previous block was the last block */
-          self->in_holding->next = 0;
+          block->next = 0;
       else
-          self->in_holding->next = next_allocation;
+          block->next = next_allocation;
 
       /* XXX: should promiscuosuly try to compress here as well,. if revisions
               are not matching..
        */
 
-      {
-        GeglFileBackendThreadParams *params =
-          g_new0 (GeglFileBackendThreadParams, 1);
-        gint    length     = self->in_holding->length;
-        guchar *new_source = g_malloc (length);
+      if (self->in_holding->block_link)
+        {
+          g_mutex_lock (mutex);
+
+          if (self->in_holding->block_link)
+            {
+              params = self->in_holding->block_link->data;
+              params->offset = self->offset;
+              memcpy (params->source, block, length);
+              g_mutex_unlock (mutex);
+
+              self->offset = next_allocation;
+              self->in_holding = item;
+              GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Overwrote queue block: length:%i flags:%i next:%i at offset %i",
+                         block->length,
+                         block->flags,
+                         (gint)block->next,
+                         (gint)self->offset);
+              return TRUE;
+            }
+
+          g_mutex_unlock (mutex);
+        }
+
+      params     = g_new0 (GeglFileBackendThreadParams, 1);
+      new_source = g_malloc (length);
 
-        memcpy (new_source, self->in_holding, length);
+      memcpy (new_source, self->in_holding, length);
 
-        params->operation = OP_WRITE;
-        params->length    = length;
-        params->file      = self;
-        params->offset    = self->offset;
-        params->source    = new_source;
+      params->operation = OP_WRITE_BLOCK;
+      params->length    = length;
+      params->file      = self;
+      params->offset    = self->offset;
+      params->source    = new_source;
+      params->entry     = self->in_holding;
 
-        gegl_tile_backend_file_push_queue (params);
+      gegl_tile_backend_file_push_queue (params);
 
-        GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Pushed write of block: length:%i flags:%i next:%i at offset %i",
-                   self->in_holding->length,
-                   self->in_holding->flags,
-                   (gint)self->in_holding->next,
-                   (gint)self->offset);
-      }
+      GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Pushed write of block: length:%i flags:%i next:%i at offset %i",
+                 block->length,
+                 block->flags,
+                 (gint)block->next,
+                 (gint)self->offset);
 
       self->offset = next_allocation;
     }
@@ -576,7 +618,7 @@ gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
                                             * header inside free list later
                                             */
     }
-  self->in_holding = block;
+  self->in_holding = item;
 
   return TRUE;
 }
@@ -758,7 +800,7 @@ gegl_tile_backend_file_flush (GeglTileSource *source,
         {
           GeglFileBackendEntry *item = iter->data;
 
-          gegl_tile_backend_file_write_block (self, &(item->tile->block));
+          gegl_tile_backend_file_write_block (self, item);
         }
       gegl_tile_backend_file_write_block (self, NULL); /* terminate the index */
       g_list_free (tiles);
@@ -1176,10 +1218,9 @@ gegl_tile_backend_file_class_init (GeglTileBackendFileClass *klass)
   gobject_class->constructor  = gegl_tile_backend_file_constructor;
   gobject_class->finalize     = gegl_tile_backend_file_finalize;
 
-  queue_cond  = g_cond_new ();
-  max_cond    = g_cond_new ();
-  queue_mutex = g_mutex_new ();
-  write_mutex = g_mutex_new ();
+  queue_cond = g_cond_new ();
+  max_cond   = g_cond_new ();
+  mutex      = g_mutex_new ();
   g_thread_create_full (gegl_tile_backend_file_writer_thread,
                         NULL, 0, TRUE, TRUE, G_THREAD_PRIORITY_NORMAL, NULL);
 
diff --git a/gegl/buffer/gegl-tile-backend-file.h b/gegl/buffer/gegl-tile-backend-file.h
index b9366e9..00ba5fc 100644
--- a/gegl/buffer/gegl-tile-backend-file.h
+++ b/gegl/buffer/gegl-tile-backend-file.h
@@ -42,6 +42,7 @@ typedef struct _GeglTileBackendFileClass GeglTileBackendFileClass;
 typedef enum
 {
   OP_WRITE,
+  OP_WRITE_BLOCK,
   OP_TRUNCATE,
   OP_SYNC
 } GeglFileBackendThreadOp;
@@ -49,8 +50,10 @@ typedef enum
 typedef struct
 {
   GeglBufferTile *tile;
-  /* reference to the writer queue link of this entry */
-  GList          *link;
+  /* reference to the writer queue links of this entry when writing
+     tile data or a GeglBufferBlock*/
+  GList          *tile_link;
+  GList          *block_link;
 } GeglFileBackendEntry;
 
 typedef struct



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]