[gegl] buffer: improved and fixed a bug in the async file backend



commit f0340d69f227bb20383f202937ed2dd137c6bbbd
Author: Ville Sokk <ville sokk gmail com>
Date:   Mon Aug 13 12:09:50 2012 +0300

    buffer: improved and fixed a bug in the async file backend

 gegl/buffer/gegl-tile-backend-file-async.c |  135 ++++++++++++++++++----------
 gegl/buffer/gegl-tile-backend-file.h       |    2 +-
 2 files changed, 90 insertions(+), 47 deletions(-)
---
diff --git a/gegl/buffer/gegl-tile-backend-file-async.c b/gegl/buffer/gegl-tile-backend-file-async.c
index 061c051..b0f9b1b 100644
--- a/gegl/buffer/gegl-tile-backend-file-async.c
+++ b/gegl/buffer/gegl-tile-backend-file-async.c
@@ -151,19 +151,20 @@ static gint file_size      = 0;
 static gint peak_allocs    = 0;
 static gint peak_file_size = 0;
 
-static GQueue  queue      = G_QUEUE_INIT;
-static GMutex *mutex      = NULL;
-static GCond  *queue_cond = NULL;
-static GCond  *max_cond   = NULL;
+static GQueue  queue       = G_QUEUE_INIT;
+static GMutex *queue_mutex = NULL;
+static GCond  *queue_cond  = NULL;
+static GCond  *max_cond    = NULL;
+static GMutex *write_mutex = NULL;
 
 
 static void
 gegl_tile_backend_file_finish_writing (GeglTileBackendFile *self)
 {
-  g_mutex_lock (mutex);
+  g_mutex_lock (queue_mutex);
   while (self->pending_ops != 0)
-    g_cond_wait (self->cond, mutex);
-  g_mutex_unlock (mutex);
+    g_cond_wait (self->cond, queue_mutex);
+  g_mutex_unlock (queue_mutex);
 }
 
 static void
@@ -171,19 +172,24 @@ gegl_tile_backend_file_push_queue (GeglFileBackendThreadParams *params)
 {
   guint length;
 
-  g_mutex_lock (mutex);
+  g_mutex_lock (queue_mutex);
 
   length = g_queue_get_length (&queue);
 
+  /* block if the queue has gotten too big */
   if (length > gegl_config ()->queue_limit)
-    g_cond_wait (max_cond, mutex);
+    g_cond_wait (max_cond, queue_mutex);
 
   params->file->pending_ops += 1;
   g_queue_push_tail (&queue, params);
+
+  if (params->entry)
+    params->entry->link = g_queue_peek_tail_link (&queue);
+
   if (length == 0) /* wake up the writer thread */
     g_cond_signal (queue_cond);
 
-  g_mutex_unlock (mutex);
+  g_mutex_unlock (queue_mutex);
 }
 
 static inline void
@@ -231,13 +237,21 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
     {
       GeglFileBackendThreadParams *params;
 
-      g_mutex_lock (mutex);
+      g_mutex_lock (queue_mutex);
 
       while (g_queue_is_empty (&queue))
-        g_cond_wait (queue_cond, mutex);
+        g_cond_wait (queue_cond, queue_mutex);
+
+      g_mutex_lock (write_mutex);
+      if (g_queue_is_empty (&queue))
+        {
+          g_mutex_unlock (queue_mutex);
+          g_mutex_unlock (write_mutex);
+          continue;
+        }
 
       params = (GeglFileBackendThreadParams *)g_queue_peek_head (&queue);
-      g_mutex_unlock (mutex);
+      g_mutex_unlock (queue_mutex);
 
       switch (params->operation)
         {
@@ -252,25 +266,28 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
           break;
         }
 
-      g_mutex_lock (mutex);
+      g_mutex_lock (queue_mutex);
       g_queue_pop_head (&queue);
 
+      /* the file maybe waiting for its file operations to finish */
       params->file->pending_ops -= 1;
       if (params->file->pending_ops == 0)
         g_cond_signal (params->file->cond);
 
+      /* unblock the main thread if the queue had gotten too big */
       if (g_queue_get_length (&queue) < gegl_config ()->queue_limit)
         g_cond_signal (max_cond);
 
       if (params->entry)
-        params->entry->in_queue = FALSE;
+        params->entry->link = NULL;
 
       if (params->operation == OP_WRITE)
         g_free (params->source);
 
       g_free (params);
 
-      g_mutex_unlock (mutex);
+      g_mutex_unlock (queue_mutex);
+      g_mutex_unlock (write_mutex);
     }
 
   return NULL;
@@ -287,30 +304,17 @@ gegl_tile_backend_file_entry_read (GeglTileBackendFile  *self,
 
   gegl_tile_backend_file_ensure_exist (self);
 
-  if (entry->in_queue)
+  g_mutex_lock (queue_mutex);
+  if (entry->link)
     {
-      GList *link;
-
-      g_mutex_lock (mutex);
-
-      link = g_queue_peek_tail_link (&queue);
-
-      while (link)
-        {
-          GeglFileBackendThreadParams *queued_op =
-            (GeglFileBackendThreadParams *)link->data;
-
-          if (queued_op->file == self && queued_op->offset == offset)
-            {
-              memcpy (dest, queued_op->source, to_be_read);
-              g_mutex_unlock (mutex);
-              GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from write queue", entry->tile->x, entry->tile->y, entry->tile->z);
-              return;
-            }
-          link = link->prev;
-        }
-      g_mutex_unlock (mutex);
+      memcpy (dest,
+              ((GeglFileBackendThreadParams *)entry->link->data)->source,
+              to_be_read);
+      g_mutex_unlock (queue_mutex);
+      GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from write queue", entry->tile->x, entry->tile->y, entry->tile->z);
+      return;
     }
+  g_mutex_unlock (queue_mutex);
 
   if (self->in_offset != offset)
     {
@@ -347,7 +351,7 @@ gegl_tile_backend_file_entry_write (GeglTileBackendFile  *self,
                                     GeglFileBackendEntry *entry,
                                     guchar               *source)
 {
-  GeglFileBackendThreadParams *params = g_new0 (GeglFileBackendThreadParams, 1);
+  GeglFileBackendThreadParams *params;
   gint    length     = gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self));
   guchar *new_source = g_malloc (length);
 
@@ -355,6 +359,24 @@ gegl_tile_backend_file_entry_write (GeglTileBackendFile  *self,
 
   memcpy (new_source, source, length);
 
+  if (entry->link)
+    {
+      g_mutex_lock (write_mutex);
+
+      if (entry->link)
+        {
+          params = (GeglFileBackendThreadParams *)entry->link->data;
+
+          g_free (params->source);
+          params->source = new_source;
+          g_mutex_unlock (write_mutex);
+          return;
+        }
+
+      g_mutex_unlock (write_mutex);
+    }
+
+  params            = g_new0 (GeglFileBackendThreadParams, 1);
   params->operation = OP_WRITE;
   params->length    = length;
   params->offset    = entry->tile->offset;
@@ -362,8 +384,6 @@ gegl_tile_backend_file_entry_write (GeglTileBackendFile  *self,
   params->source    = new_source;
   params->entry     = entry;
 
-  entry->in_queue = TRUE;
-
   gegl_tile_backend_file_push_queue (params);
 
   GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "pushed entry write %i,%i,%i at %i", entry->tile->x, entry->tile->y, entry->tile->z, (gint)entry->tile->offset);
@@ -376,8 +396,8 @@ gegl_tile_backend_file_file_entry_create (gint x,
 {
   GeglFileBackendEntry *entry = g_new0 (GeglFileBackendEntry, 1);
 
-  entry->tile     = gegl_tile_entry_new (x, y, z);
-  entry->in_queue = FALSE;
+  entry->tile = gegl_tile_entry_new (x, y, z);
+  entry->link = NULL;
 
   return entry;
 }
@@ -439,6 +459,28 @@ gegl_tile_backend_file_file_entry_destroy (GeglFileBackendEntry *entry,
 {
   /* XXX: EEEk, throwing away bits */
   guint offset = entry->tile->offset;
+
+  /* Write mutex is used to completely stop the writer thread from working,
+   * which is required to remove the entry from the queue. entry->link is
+   * checked twice because while waiting for the mutex the writer thread
+   * may have already removed it. If we would lock the mutex before the
+   * first check we would spend too much time since it's not that common to
+   * have the entry in queue.
+   */
+  if (entry->link)
+    {
+      g_mutex_lock (write_mutex);
+
+      if (entry->link)
+        {
+          g_free (((GeglFileBackendThreadParams *)entry->link->data)->source);
+          g_free (entry->link->data);
+          g_queue_delete_link (&queue, entry->link);
+        }
+
+      g_mutex_unlock (write_mutex);
+    }
+
   self->free_list = g_slist_prepend (self->free_list,
                                      GUINT_TO_POINTER (offset));
   g_hash_table_remove (self->index, entry);
@@ -1130,9 +1172,10 @@ gegl_tile_backend_file_class_init (GeglTileBackendFileClass *klass)
   gobject_class->constructor  = gegl_tile_backend_file_constructor;
   gobject_class->finalize     = gegl_tile_backend_file_finalize;
 
-  queue_cond = g_cond_new ();
-  max_cond   = g_cond_new ();
-  mutex      = g_mutex_new ();
+  queue_cond  = g_cond_new ();
+  max_cond    = g_cond_new ();
+  queue_mutex = g_mutex_new ();
+  write_mutex = g_mutex_new ();
   g_thread_create_full (gegl_tile_backend_file_writer_thread,
                         NULL, 0, TRUE, TRUE, G_THREAD_PRIORITY_NORMAL, NULL);
 
diff --git a/gegl/buffer/gegl-tile-backend-file.h b/gegl/buffer/gegl-tile-backend-file.h
index 1151889..d9348c6 100644
--- a/gegl/buffer/gegl-tile-backend-file.h
+++ b/gegl/buffer/gegl-tile-backend-file.h
@@ -49,7 +49,7 @@ typedef enum
 typedef struct
 {
   GeglBufferTile *tile;
-  gchar           in_queue;
+  GList          *link;
 } GeglFileBackendEntry;
 
 typedef struct



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]