[gegl] buffer: fix a bug in the async file backend; optimize and fix the memory mapped backend



commit 62e87b860e2952af1304f4feadb706841f3e0f23
Author: Ville Sokk <ville sokk gmail com>
Date:   Fri Aug 17 20:41:54 2012 +0300

    buffer: fix a bug in the async file backend; optimize and fix the memory mapped backend

 gegl/buffer/gegl-tile-backend-file-async.c  |   15 ++-
 gegl/buffer/gegl-tile-backend-file-mapped.c |  200 ++++++++++++++++++--------
 2 files changed, 148 insertions(+), 67 deletions(-)
---
diff --git a/gegl/buffer/gegl-tile-backend-file-async.c b/gegl/buffer/gegl-tile-backend-file-async.c
index 020b63b..a10fa95 100644
--- a/gegl/buffer/gegl-tile-backend-file-async.c
+++ b/gegl/buffer/gegl-tile-backend-file-async.c
@@ -289,7 +289,7 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
       if (g_queue_get_length (&queue) < gegl_config ()->queue_limit)
         g_cond_signal (max_cond);
 
-      if (params->operation == OP_WRITE || params->operation == OP_WRITE_BLOCK)
+      if (params->source)
         g_free (params->source);
 
       g_free (params);
@@ -311,15 +311,20 @@ gegl_tile_backend_file_entry_read (GeglTileBackendFile  *self,
 
   gegl_tile_backend_file_ensure_exist (self);
 
-  if (entry->tile_link)
+  if (entry->tile_link || in_progress)
     {
+      GeglFileBackendThreadParams *queued_op = NULL;
       g_mutex_lock (mutex);
 
       if (entry->tile_link)
+        queued_op = entry->tile_link->data;
+      else if (in_progress && in_progress->entry == entry &&
+               in_progress->operation == OP_WRITE)
+        queued_op = in_progress;
+
+      if (queued_op)
         {
-          memcpy (dest,
-                  ((GeglFileBackendThreadParams *)entry->tile_link->data)->source,
-                  to_be_read);
+          memcpy (dest, queued_op->source, to_be_read);
           g_mutex_unlock (mutex);
 
           GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from queue", entry->tile->x, entry->tile->y, entry->tile->z);
diff --git a/gegl/buffer/gegl-tile-backend-file-mapped.c b/gegl/buffer/gegl-tile-backend-file-mapped.c
index b401bf8..3021146 100644
--- a/gegl/buffer/gegl-tile-backend-file-mapped.c
+++ b/gegl/buffer/gegl-tile-backend-file-mapped.c
@@ -105,7 +105,7 @@ struct _GeglTileBackendFile
    * at all times to be able to keep track of the ->next offsets in
    * the blocks.
    */
-  GeglBufferBlock *in_holding;
+  GeglFileBackendEntry *in_holding;
 
   /* loading buffer */
   GList           *tiles;
@@ -134,11 +134,11 @@ struct _GeglTileBackendFile
 };
 
 
-static void     gegl_tile_backend_file_ensure_exist (GeglTileBackendFile *self);
-static gboolean gegl_tile_backend_file_write_block  (GeglTileBackendFile *self,
-                                                     GeglBufferBlock     *block);
-static void     gegl_tile_backend_file_dbg_alloc    (int                  size);
-static void     gegl_tile_backend_file_dbg_dealloc  (int                  size);
+static void     gegl_tile_backend_file_ensure_exist (GeglTileBackendFile  *self);
+static gboolean gegl_tile_backend_file_write_block  (GeglTileBackendFile  *self,
+                                                     GeglFileBackendEntry *block);
+static void     gegl_tile_backend_file_dbg_alloc    (int                   size);
+static void     gegl_tile_backend_file_dbg_dealloc  (int                   size);
 
 
 G_DEFINE_TYPE (GeglTileBackendFile, gegl_tile_backend_file, GEGL_TYPE_TILE_BACKEND)
@@ -154,6 +154,7 @@ static GQueue  queue      = G_QUEUE_INIT;
 static GMutex *mutex      = NULL;
 static GCond  *queue_cond = NULL;
 static GCond  *max_cond   = NULL;
+static GeglFileBackendThreadParams *in_progress;
 
 
 static void
@@ -179,6 +180,15 @@ gegl_tile_backend_file_push_queue (GeglFileBackendThreadParams *params)
 
   params->file->pending_ops += 1;
   g_queue_push_tail (&queue, params);
+
+  if (params->entry)
+    {
+      if (params->operation == OP_WRITE)
+        params->entry->tile_link = g_queue_peek_tail_link (&queue);
+      else /* OP_WRITE_BLOCK */
+        params->entry->block_link = g_queue_peek_tail_link (&queue);
+    }
+
   if (length == 0) /* wake up the writer thread */
     g_cond_signal (queue_cond);
 
@@ -186,11 +196,12 @@ gegl_tile_backend_file_push_queue (GeglFileBackendThreadParams *params)
 }
 
 static void
-gegl_tile_backend_file_write (GeglTileBackendFile  *self,
-                              goffset               offset,
-                              gchar                *source,
-                              guint                 length,
-                              GeglFileBackendEntry *entry)
+gegl_tile_backend_file_write (GeglTileBackendFile     *self,
+                              goffset                  offset,
+                              gchar                   *source,
+                              guint                    length,
+                              GeglFileBackendEntry    *entry,
+                              GeglFileBackendThreadOp  operation)
 {
   if (g_queue_is_empty (&queue))
     {
@@ -198,19 +209,51 @@ gegl_tile_backend_file_write (GeglTileBackendFile  *self,
     }
   else
     {
-      GeglFileBackendThreadParams *params = g_new0 (GeglFileBackendThreadParams, 1);
-      guchar *new_source = g_malloc (length);
+      GeglFileBackendThreadParams *params = NULL;
+      guchar *new_source;
+
+      if (entry && (entry->tile_link || entry->block_link))
+        {
+          gchar *msg;
+
+          g_mutex_lock (mutex);
+
+          if (operation == OP_WRITE && entry->tile_link)
+            {
+              params = entry->tile_link->data;
+              msg = "entry";
+            }
+          else if (operation == OP_WRITE_BLOCK && entry->block_link)
+            {
+              params = entry->block_link->data;
+              msg = "block";
+            }
+
+          if (params)
+            {
+              memcpy (params->source, source, length);
+              params->offset = offset;
+              g_mutex_unlock (mutex);
+
+              GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "overwrote queue %s %i,%i,%i at %i", msg, entry->tile->x, entry->tile->y, entry->tile->z, (gint)entry->tile->offset);
+
+              return;
+            }
+
+          g_mutex_unlock (mutex);
+        }
+
+      params = g_new0 (GeglFileBackendThreadParams, 1);
+      new_source = g_malloc (length);
 
       memcpy (new_source, source, length);
 
-      params->operation = OP_WRITE;
+      params->operation = operation;
       params->offset    = offset;
       params->file      = self;
       params->length    = length;
       params->source    = new_source;
-
-      if (entry)
-        entry->in_queue = TRUE;
+      params->entry     = entry;
 
       gegl_tile_backend_file_push_queue (params);
     }
@@ -251,7 +294,15 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
       while (g_queue_is_empty (&queue))
         g_cond_wait (queue_cond, mutex);
 
-      params = (GeglFileBackendThreadParams *)g_queue_peek_head (&queue);
+      params = (GeglFileBackendThreadParams *)g_queue_pop_head (&queue);
+      if (params->entry)
+        {
+          in_progress = params;
+          if (params->operation == OP_WRITE)
+            params->entry->tile_link = NULL;
+          else /* OP_WRITE_BLOCK */
+            params->entry->block_link = NULL;
+        }
       g_mutex_unlock (mutex);
 
       switch (params->operation)
@@ -260,6 +311,10 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
           memcpy (params->file->map + params->offset,
                   params->source, params->length);
           break;
+        case OP_WRITE_BLOCK:
+          memcpy (params->file->map + params->offset,
+                  params->source, params->length);
+          break;
         case OP_TRUNCATE:
           {
             GeglTileBackendFile *file = params->file;
@@ -281,7 +336,7 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
         }
 
       g_mutex_lock (mutex);
-      g_queue_pop_head (&queue);
+      in_progress = NULL;
 
       params->file->pending_ops -= 1;
       if (params->file->pending_ops == 0)
@@ -290,10 +345,7 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
       if (g_queue_get_length (&queue) < gegl_config ()->queue_limit)
         g_cond_signal (max_cond);
 
-      if (params->entry)
-        params->entry->in_queue = FALSE;
-
-      if (params->operation == OP_WRITE)
+      if (params->source)
         g_free (params->source);
 
       g_free (params);
@@ -314,28 +366,27 @@ gegl_tile_backend_file_file_entry_read (GeglTileBackendFile  *self,
 
   gegl_tile_backend_file_ensure_exist (self);
 
-  if (entry->in_queue)
+  if (entry->tile_link || in_progress)
     {
-      GList *link;
-
+      GeglFileBackendThreadParams *queued_op = NULL;
       g_mutex_lock (mutex);
 
-      link = g_queue_peek_tail_link (&queue);
+      if (entry->tile_link)
+        queued_op = entry->tile_link->data;
+      else if (in_progress && in_progress->entry == entry &&
+               in_progress->operation == OP_WRITE)
+        queued_op = in_progress;
 
-      while (link)
+      if (queued_op)
         {
-          GeglFileBackendThreadParams *queued_op =
-            (GeglFileBackendThreadParams *)link->data;
+          memcpy (dest, queued_op->source, tile_size);
+          g_mutex_unlock (mutex);
 
-          if (queued_op->file == self && queued_op->offset == offset)
-            {
-              memcpy (dest, queued_op->source, tile_size);
-              g_mutex_unlock (mutex);
-              GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from write queue", entry->tile->x, entry->tile->y, entry->tile->z);
-              return;
-            }
-          link = link->prev;
+          GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from queue", entry->tile->x, entry->tile->y, entry->tile->z);
+
+          return;
         }
+
       g_mutex_unlock (mutex);
     }
 
@@ -351,8 +402,9 @@ gegl_tile_backend_file_file_entry_create (gint x,
 {
   GeglFileBackendEntry *entry = g_new0 (GeglFileBackendEntry, 1);
 
-  entry->tile     = gegl_tile_entry_new (x, y, z);
-  entry->in_queue = FALSE;
+  entry->tile       = gegl_tile_entry_new (x, y, z);
+  entry->tile_link  = NULL;
+  entry->block_link = NULL;
 
   return entry;
 }
@@ -413,11 +465,37 @@ gegl_tile_backend_file_file_entry_destroy (GeglFileBackendEntry *entry,
 {
   /* XXX: EEEk, throwing away bits */
   guint offset = entry->tile->offset;
+
+  if (entry->tile_link || entry->block_link)
+    {
+      gint   i;
+      GList *link;
+
+      g_mutex_lock (mutex);
+
+      for (i = 0, link = entry->tile_link;
+           i < 2;
+           i++, link = entry->block_link)
+        {
+          if (link)
+            {
+              GeglFileBackendThreadParams *queued_op = link->data;
+              queued_op->file->pending_ops -= 1;
+              g_queue_delete_link (&queue, link);
+              g_free (queued_op->source);
+              g_free (queued_op);
+            }
+        }
+
+      g_mutex_unlock (mutex);
+    }
+
   self->free_list = g_slist_prepend (self->free_list,
                                      GUINT_TO_POINTER (offset));
   g_hash_table_remove (self->index, entry);
 
   gegl_tile_backend_file_dbg_dealloc (gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)));
+
   g_free (entry->tile);
   g_free (entry);
 }
@@ -427,56 +505,54 @@ gegl_tile_backend_file_write_header (GeglTileBackendFile *self)
 {
   gegl_tile_backend_file_ensure_exist (self);
 
-  gegl_tile_backend_file_write (self, 0, (gchar*)&(self->header), 256, NULL);
+  gegl_tile_backend_file_write (self, 0, (gchar*)&(self->header), 256, NULL, OP_WRITE);
 
   GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Wrote header, next=%i", (gint)self->header.next);
   return TRUE;
 }
 
 static gboolean
-gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
-                                    GeglBufferBlock     *block)
+gegl_tile_backend_file_write_block (GeglTileBackendFile  *self,
+                                    GeglFileBackendEntry *item)
 {
   gegl_tile_backend_file_ensure_exist (self);
   if (self->in_holding)
     {
-      gint    offset          = self->offset;
-      guint64 next_allocation = offset + self->in_holding->length;
+      GeglBufferBlock *block           = &(self->in_holding->tile->block);
+      guint64          next_allocation = self->offset + block->length;
+      gint             length          = block->length;
 
       /* update the next offset pointer in the previous block */
-      if (block == NULL)
+      if (item == NULL)
           /* the previous block was the last block */
-          self->in_holding->next = 0;
+          block->next = 0;
       else
-          self->in_holding->next = next_allocation;
+          block->next = next_allocation;
 
       /* XXX: should promiscuosuly try to compress here as well,. if revisions
               are not matching..
        */
 
-      gegl_tile_backend_file_write (self, offset, (gchar*)self->in_holding, self->in_holding->length, NULL);
+      gegl_tile_backend_file_write (self, self->offset, (gchar*)block, length,
+                                    self->in_holding, OP_WRITE_BLOCK);
 
-      self->offset += self->in_holding->length;
       GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Wrote block: length:%i flags:%i next:%i at offset %i",
-                 self->in_holding->length,
-                 self->in_holding->flags,
-                 (gint)self->in_holding->next,
-                 offset);
-
-      g_assert (next_allocation == self->offset); /* true as long as
-                                                     the simple allocation
-                                                     scheme is used */
+                 length,
+                 block->flags,
+                 (gint)block->next,
+                 self->offset);
+
+      self->offset = next_allocation;
     }
   else
     {
       /* we're setting up for the first write */
-
       self->offset = self->next_pre_alloc; /* start writing header at end
                                             * of file, worry about writing
                                             * header inside free list later
                                             */
     }
-  self->in_holding = block;
+  self->in_holding = item;
 
   return TRUE;
 }
@@ -582,7 +658,7 @@ gegl_tile_backend_file_set_tile (GeglTileSource *self,
   gegl_tile_backend_file_write (tile_backend_file, entry->tile->offset,
                                 (gchar*)gegl_tile_get_data (tile),
                                 gegl_tile_backend_get_tile_size (backend),
-                                entry);
+                                entry, OP_WRITE);
 
   GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "wrote entry %i, %i, %i at %i", entry->tile->x, entry->tile->y, entry->tile->z, (gint)entry->tile->offset);
 
@@ -665,7 +741,7 @@ gegl_tile_backend_file_flush (GeglTileSource *source,
         {
           GeglFileBackendEntry *item = iter->data;
 
-          gegl_tile_backend_file_write_block (self, &(item->tile->block));
+          gegl_tile_backend_file_write_block (self, item);
         }
       gegl_tile_backend_file_write_block (self, NULL); /* terminate the index */
       g_list_free (tiles);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]