[gimp] app: add gimp_parallel_run_async()



commit 1b646804ead5852dab3b1e12e20ace6aff438bb9
Author: Ell <ell_se yahoo com>
Date:   Fri May 11 11:43:06 2018 -0400

    app: add gimp_parallel_run_async()
    
    ... which runs a user-provided function asynchronously, returning a
    corresponding GimpAsync object.  This can be used to execute code
    off the main thread, using the GimpAsync object to synchronize as
    necessary.
    
    Note that while the code allows for running multiple asynchronous
    functions in parallel in a thread pool, we currently limit the pool
    to a single thread, queueing overlapping async functions, since we
    have no use for parallel asynchronous functions at the moment.

 app/core/gimp-parallel.cc |  298 ++++++++++++++++++++++++++++++++++++---------
 app/core/gimp-parallel.h  |   55 +++++++--
 2 files changed, 282 insertions(+), 71 deletions(-)
---
diff --git a/app/core/gimp-parallel.cc b/app/core/gimp-parallel.cc
index 82a9c86..29f5678 100644
--- a/app/core/gimp-parallel.cc
+++ b/app/core/gimp-parallel.cc
@@ -33,48 +33,79 @@ extern "C"
 
 #include "gimp.h"
 #include "gimp-parallel.h"
+#include "gimpasync.h"
 
 
-#define GIMP_PARALLEL_MAX_THREADS 64
+#define GIMP_PARALLEL_MAX_THREADS            64
+#define GIMP_PARALLEL_RUN_ASYNC_MAX_THREADS  1
+#define GIMP_PARALLEL_DISTRIBUTE_MAX_THREADS GIMP_PARALLEL_MAX_THREADS
 
 
 typedef struct
 {
+  GimpAsync                *async;
+  GimpParallelRunAsyncFunc  func;
+  gpointer                  user_data;
+} GimpParallelRunAsyncTask;
+
+typedef struct
+{
+  GThread  *thread;
+
+  gboolean  quit;
+} GimpParallelRunAsyncThread;
+
+typedef struct
+{
   GimpParallelDistributeFunc func;
   gint                       n;
   gpointer                   user_data;
-} GimpParallelTask;
+} GimpParallelDistributeTask;
 
 typedef struct
 {
-  GThread          *thread;
-  GMutex            mutex;
-  GCond             cond;
+  GThread                    *thread;
+  GMutex                      mutex;
+  GCond                       cond;
 
-  gboolean          quit;
+  gboolean                    quit;
 
-  GimpParallelTask *volatile task;
-  volatile gint     i;
-} GimpParallelThread;
+  GimpParallelDistributeTask *volatile task;
+  volatile gint               i;
+} GimpParallelDistributeThread;
 
 
 /*  local function prototypes  */
 
-static void       gimp_parallel_notify_num_processors (GimpGeglConfig     *config);
+static void       gimp_parallel_notify_num_processors    (GimpGeglConfig               *config);
+
+static void       gimp_parallel_set_n_threads            (gint                          n_threads);
+
+static void       gimp_parallel_run_async_set_n_threads  (gint                          n_threads);
+static gpointer   gimp_parallel_run_async_thread_func    (GimpParallelRunAsyncThread   *thread);
+static void       gimp_parallel_run_async_execute_task   (GimpParallelRunAsyncTask     *task);
 
-static void       gimp_parallel_set_n_threads         (gint                n_threads);
-static gpointer   gimp_parallel_thread_func           (GimpParallelThread *thread);
+static void       gimp_parallel_distribute_set_n_threads (gint                          n_threads);
+static gpointer   gimp_parallel_distribute_thread_func   (GimpParallelDistributeThread *thread);
 
 
 /*  local variables  */
 
-static gint               gimp_parallel_n_threads = 1;
-static GimpParallelThread gimp_parallel_threads[GIMP_PARALLEL_MAX_THREADS];
+static gint                         gimp_parallel_run_async_n_threads = 0;
+static GimpParallelRunAsyncThread   gimp_parallel_run_async_threads[GIMP_PARALLEL_RUN_ASYNC_MAX_THREADS];
 
-static GMutex             gimp_parallel_completion_mutex;
-static GCond              gimp_parallel_completion_cond;
-static volatile gint      gimp_parallel_completion_counter;
-static volatile gint      gimp_parallel_busy;
+static GMutex                       gimp_parallel_run_async_mutex;
+static GCond                        gimp_parallel_run_async_cond;
+static GQueue                       gimp_parallel_run_async_queue = G_QUEUE_INIT;
+static gboolean                     gimp_parallel_run_async_quit;
+
+static gint                         gimp_parallel_distribute_n_threads = 1;
+static GimpParallelDistributeThread gimp_parallel_distribute_threads[GIMP_PARALLEL_DISTRIBUTE_MAX_THREADS - 1];
+
+static GMutex                       gimp_parallel_distribute_completion_mutex;
+static GCond                        gimp_parallel_distribute_completion_cond;
+static volatile gint                gimp_parallel_distribute_completion_counter;
+static volatile gint                gimp_parallel_distribute_busy;
 
 
 /*  public functions  */
@@ -99,6 +130,8 @@ gimp_parallel_init (Gimp *gimp)
 void
 gimp_parallel_exit (Gimp *gimp)
 {
+  GimpParallelRunAsyncTask *task;
+
   g_return_if_fail (GIMP_IS_GIMP (gimp));
 
   g_signal_handlers_disconnect_by_func (gimp->config,
@@ -106,7 +139,45 @@ gimp_parallel_exit (Gimp *gimp)
                                         NULL);
 
   /* stop all threads */
-  gimp_parallel_set_n_threads (1);
+  gimp_parallel_set_n_threads (0);
+
+  /* finish remaining tasks */
+  while ((task = (GimpParallelRunAsyncTask *) g_queue_pop_head (&gimp_parallel_run_async_queue)))
+    gimp_parallel_run_async_execute_task (task);
+}
+
+GimpAsync *
+gimp_parallel_run_async (GimpParallelRunAsyncFunc func,
+                         gpointer                 user_data)
+{
+  GimpAsync                *async;
+  GimpParallelRunAsyncTask *task;
+
+  g_return_val_if_fail (func != NULL, NULL);
+
+  async = gimp_async_new ();
+
+  task = g_slice_new (GimpParallelRunAsyncTask);
+
+  task->async     = GIMP_ASYNC (g_object_ref (async));
+  task->func      = func;
+  task->user_data = user_data;
+
+  if (gimp_parallel_run_async_n_threads > 0)
+    {
+      g_mutex_lock (&gimp_parallel_run_async_mutex);
+
+      g_queue_push_tail (&gimp_parallel_run_async_queue, task);
+      g_cond_signal (&gimp_parallel_run_async_cond);
+
+      g_mutex_unlock (&gimp_parallel_run_async_mutex);
+    }
+  else
+    {
+      gimp_parallel_run_async_execute_task (task);
+    }
+
+  return async;
 }
 
 void
@@ -114,8 +185,8 @@ gimp_parallel_distribute (gint                       max_n,
                           GimpParallelDistributeFunc func,
                           gpointer                   user_data)
 {
-  GimpParallelTask task;
-  gint             i;
+  GimpParallelDistributeTask task;
+  gint                       i;
 
   g_return_if_fail (func != NULL);
 
@@ -123,12 +194,13 @@ gimp_parallel_distribute (gint                       max_n,
     return;
 
   if (max_n < 0)
-    max_n = gimp_parallel_n_threads;
+    max_n = gimp_parallel_distribute_n_threads;
   else
-    max_n = MIN (max_n, gimp_parallel_n_threads);
+    max_n = MIN (max_n, gimp_parallel_distribute_n_threads);
 
   if (max_n == 1 ||
-      ! g_atomic_int_compare_and_exchange (&gimp_parallel_busy, 0, 1))
+      ! g_atomic_int_compare_and_exchange (&gimp_parallel_distribute_busy,
+                                           0, 1))
     {
       func (0, 1, user_data);
 
@@ -139,11 +211,12 @@ gimp_parallel_distribute (gint                       max_n,
   task.func      = func;
   task.user_data = user_data;
 
-  g_atomic_int_set (&gimp_parallel_completion_counter, task.n - 1);
+  g_atomic_int_set (&gimp_parallel_distribute_completion_counter, task.n - 1);
 
   for (i = 0; i < task.n - 1; i++)
     {
-      GimpParallelThread *thread = &gimp_parallel_threads[i];
+      GimpParallelDistributeThread *thread =
+        &gimp_parallel_distribute_threads[i];
 
       g_mutex_lock (&thread->mutex);
 
@@ -157,20 +230,20 @@ gimp_parallel_distribute (gint                       max_n,
 
   func (i, task.n, user_data);
 
-  if (g_atomic_int_get (&gimp_parallel_completion_counter))
+  if (g_atomic_int_get (&gimp_parallel_distribute_completion_counter))
     {
-      g_mutex_lock (&gimp_parallel_completion_mutex);
+      g_mutex_lock (&gimp_parallel_distribute_completion_mutex);
 
-      while (g_atomic_int_get (&gimp_parallel_completion_counter))
+      while (g_atomic_int_get (&gimp_parallel_distribute_completion_counter))
         {
-          g_cond_wait (&gimp_parallel_completion_cond,
-                       &gimp_parallel_completion_mutex);
+          g_cond_wait (&gimp_parallel_distribute_completion_cond,
+                       &gimp_parallel_distribute_completion_mutex);
         }
 
-      g_mutex_unlock (&gimp_parallel_completion_mutex);
+      g_mutex_unlock (&gimp_parallel_distribute_completion_mutex);
     }
 
-  g_atomic_int_set (&gimp_parallel_busy, 0);
+  g_atomic_int_set (&gimp_parallel_distribute_busy, 0);
 }
 
 void
@@ -189,7 +262,7 @@ gimp_parallel_distribute_range (gsize                           size,
   if (min_sub_size > 1)
     n /= min_sub_size;
 
-  n = CLAMP (n, 1, gimp_parallel_n_threads);
+  n = CLAMP (n, 1, gimp_parallel_distribute_n_threads);
 
   gimp_parallel_distribute (n, [=] (gint i, gint n)
     {
@@ -222,7 +295,7 @@ gimp_parallel_distribute_area (const GeglRectangle            *area,
   if (min_sub_area > 1)
     n /= min_sub_area;
 
-  n = CLAMP (n, 1, gimp_parallel_n_threads);
+  n = CLAMP (n, 1, gimp_parallel_distribute_n_threads);
 
   gimp_parallel_distribute (n, [=] (gint i, gint n)
     {
@@ -268,32 +341,139 @@ gimp_parallel_notify_num_processors (GimpGeglConfig *config)
 static void
 gimp_parallel_set_n_threads (gint n_threads)
 {
+  gimp_parallel_run_async_set_n_threads (n_threads);
+  gimp_parallel_distribute_set_n_threads (n_threads);
+}
+
+static void
+gimp_parallel_run_async_set_n_threads (gint n_threads)
+{
   gint i;
 
-  if (! g_atomic_int_compare_and_exchange (&gimp_parallel_busy, 0, 1))
-    g_return_if_reached ();
+  n_threads = CLAMP (n_threads, 0, GIMP_PARALLEL_RUN_ASYNC_MAX_THREADS);
+
+  if (n_threads > gimp_parallel_run_async_n_threads) /* need more threads */
+    {
+      for (i = gimp_parallel_run_async_n_threads; i < n_threads; i++)
+        {
+          GimpParallelRunAsyncThread *thread =
+            &gimp_parallel_run_async_threads[i];
 
-  n_threads = CLAMP (n_threads, 1, GIMP_PARALLEL_MAX_THREADS + 1);
+          thread->quit = FALSE;
 
-  if (n_threads > gimp_parallel_n_threads) /* need more threads */
+          thread->thread = g_thread_new (
+            "async",
+            (GThreadFunc) gimp_parallel_run_async_thread_func,
+            thread);
+        }
+    }
+  else if (n_threads < gimp_parallel_run_async_n_threads) /* need less threads */
     {
-      for (i = gimp_parallel_n_threads - 1; i < n_threads - 1; i++)
+      g_mutex_lock (&gimp_parallel_run_async_mutex);
+
+      for (i = n_threads; i < gimp_parallel_run_async_n_threads; i++)
+        {
+          GimpParallelRunAsyncThread *thread =
+            &gimp_parallel_run_async_threads[i];
+
+          thread->quit = TRUE;
+        }
+
+      g_cond_broadcast (&gimp_parallel_run_async_cond);
+
+      g_mutex_unlock (&gimp_parallel_run_async_mutex);
+
+      for (i = n_threads; i < gimp_parallel_run_async_n_threads; i++)
         {
-          GimpParallelThread *thread = &gimp_parallel_threads[i];
+          GimpParallelRunAsyncThread *thread =
+            &gimp_parallel_run_async_threads[i];
+
+          g_thread_join (thread->thread);
+        }
+    }
+
+  gimp_parallel_run_async_n_threads = n_threads;
+}
+static gpointer
+gimp_parallel_run_async_thread_func (GimpParallelRunAsyncThread *thread)
+{
+  g_mutex_lock (&gimp_parallel_run_async_mutex);
+
+  while (TRUE)
+    {
+      GimpParallelRunAsyncTask *task;
+
+      /* run all queued tasks, releasing the mutex while a task executes */
+      while ((task = (GimpParallelRunAsyncTask *) g_queue_pop_head (&gimp_parallel_run_async_queue)))
+        {
+          g_mutex_unlock (&gimp_parallel_run_async_mutex);
+
+          gimp_parallel_run_async_execute_task (task);
+
+          g_mutex_lock (&gimp_parallel_run_async_mutex);
+        }
+
+      /* test our own quit flag, with the mutex held, before every wait */
+      if (thread->quit)
+        break;
+
+      g_cond_wait (&gimp_parallel_run_async_cond,
+                   &gimp_parallel_run_async_mutex);
+    }
+
+  g_mutex_unlock (&gimp_parallel_run_async_mutex);
+
+  return NULL;
+}
+
+static void
+gimp_parallel_run_async_execute_task (GimpParallelRunAsyncTask *task)
+{
+  task->func (task->async, task->user_data);
+
+  if (! gimp_async_is_stopped (task->async))
+    gimp_async_abort (task->async);
+
+  g_object_unref (task->async);
+
+  g_slice_free (GimpParallelRunAsyncTask, task);
+}
+
+static void
+gimp_parallel_distribute_set_n_threads (gint n_threads)
+{
+  gint i;
+
+  if (! g_atomic_int_compare_and_exchange (&gimp_parallel_distribute_busy,
+                                           0, 1))
+    {
+      g_return_if_reached ();
+    }
+
+  n_threads = CLAMP (n_threads, 1, GIMP_PARALLEL_DISTRIBUTE_MAX_THREADS);
+
+  if (n_threads > gimp_parallel_distribute_n_threads) /* need more threads */
+    {
+      for (i = gimp_parallel_distribute_n_threads - 1; i < n_threads - 1; i++)
+        {
+          GimpParallelDistributeThread *thread =
+            &gimp_parallel_distribute_threads[i];
 
           thread->quit = FALSE;
           thread->task = NULL;
 
-          thread->thread = g_thread_new ("worker",
-                                         (GThreadFunc) gimp_parallel_thread_func,
-                                         thread);
+          thread->thread = g_thread_new (
+            "worker",
+            (GThreadFunc) gimp_parallel_distribute_thread_func,
+            thread);
         }
     }
-  else if (n_threads < gimp_parallel_n_threads) /* need less threads */
+  else if (n_threads < gimp_parallel_distribute_n_threads) /* need less threads */
     {
-      for (i = n_threads - 1; i < gimp_parallel_n_threads - 1; i++)
+      for (i = n_threads - 1; i < gimp_parallel_distribute_n_threads - 1; i++)
         {
-          GimpParallelThread *thread = &gimp_parallel_threads[i];
+          GimpParallelDistributeThread *thread =
+            &gimp_parallel_distribute_threads[i];
 
           g_mutex_lock (&thread->mutex);
 
@@ -303,28 +483,27 @@ gimp_parallel_set_n_threads (gint n_threads)
           g_mutex_unlock (&thread->mutex);
         }
 
-      for (i = n_threads - 1; i < gimp_parallel_n_threads - 1; i++)
+      for (i = n_threads - 1; i < gimp_parallel_distribute_n_threads - 1; i++)
         {
-          GimpParallelThread *thread = &gimp_parallel_threads[i];
+          GimpParallelDistributeThread *thread =
+            &gimp_parallel_distribute_threads[i];
 
           g_thread_join (thread->thread);
         }
     }
 
-  gimp_parallel_n_threads = n_threads;
+  gimp_parallel_distribute_n_threads = n_threads;
 
-  g_atomic_int_set (&gimp_parallel_busy, 0);
+  g_atomic_int_set (&gimp_parallel_distribute_busy, 0);
 }
 
 static gpointer
-gimp_parallel_thread_func (GimpParallelThread *thread)
+gimp_parallel_distribute_thread_func (GimpParallelDistributeThread *thread)
 {
   g_mutex_lock (&thread->mutex);
 
   while (TRUE)
     {
-      g_cond_wait (&thread->cond, &thread->mutex);
-
       if (thread->quit)
         {
           break;
@@ -334,17 +513,20 @@ gimp_parallel_thread_func (GimpParallelThread *thread)
           thread->task->func (thread->i, thread->task->n,
                               thread->task->user_data);
 
-          if (g_atomic_int_dec_and_test (&gimp_parallel_completion_counter))
+          if (g_atomic_int_dec_and_test (
+                &gimp_parallel_distribute_completion_counter))
             {
-              g_mutex_lock (&gimp_parallel_completion_mutex);
+              g_mutex_lock (&gimp_parallel_distribute_completion_mutex);
 
-              g_cond_signal (&gimp_parallel_completion_cond);
+              g_cond_signal (&gimp_parallel_distribute_completion_cond);
 
-              g_mutex_unlock (&gimp_parallel_completion_mutex);
+              g_mutex_unlock (&gimp_parallel_distribute_completion_mutex);
             }
 
           thread->task = NULL;
         }
+
+      g_cond_wait (&thread->cond, &thread->mutex);
     }
 
   g_mutex_unlock (&thread->mutex);
diff --git a/app/core/gimp-parallel.h b/app/core/gimp-parallel.h
index 3c93526..55a8584 100644
--- a/app/core/gimp-parallel.h
+++ b/app/core/gimp-parallel.h
@@ -22,6 +22,9 @@
 #define __GIMP_PARALLEL_H__
 
 
+typedef void (* GimpParallelRunAsyncFunc)        (GimpAsync            *async,
+                                                  gpointer              user_data);
+
 typedef void (* GimpParallelDistributeFunc)      (gint                 i,
                                                   gint                 n,
                                                   gpointer             user_data);
@@ -32,26 +35,52 @@ typedef void (* GimpParallelDistributeAreaFunc)  (const GeglRectangle *area,
                                                   gpointer             user_data);
 
 
-void   gimp_parallel_init             (Gimp                            *gimp);
-void   gimp_parallel_exit             (Gimp                            *gimp);
+void        gimp_parallel_init             (Gimp                            *gimp);
+void        gimp_parallel_exit             (Gimp                            *gimp);
+
+GimpAsync * gimp_parallel_run_async        (GimpParallelRunAsyncFunc         func,
+                                            gpointer                         user_data);
 
-void   gimp_parallel_distribute       (gint                             max_n,
-                                       GimpParallelDistributeFunc       func,
-                                       gpointer                         user_data);
-void   gimp_parallel_distribute_range (gsize                            size,
-                                       gsize                            min_sub_size,
-                                       GimpParallelDistributeRangeFunc  func,
-                                       gpointer                         user_data);
-void   gimp_parallel_distribute_area  (const GeglRectangle             *area,
-                                       gsize                            min_sub_area,
-                                       GimpParallelDistributeAreaFunc   func,
-                                       gpointer                         user_data);
+void        gimp_parallel_distribute       (gint                             max_n,
+                                            GimpParallelDistributeFunc       func,
+                                            gpointer                         user_data);
+void        gimp_parallel_distribute_range (gsize                            size,
+                                            gsize                            min_sub_size,
+                                            GimpParallelDistributeRangeFunc  func,
+                                            gpointer                         user_data);
+void        gimp_parallel_distribute_area  (const GeglRectangle             *area,
+                                            gsize                            min_sub_area,
+                                            GimpParallelDistributeAreaFunc   func,
+                                            gpointer                         user_data);
 
 #ifdef __cplusplus
 
 extern "C++"
 {
 
+#include <new>
+
+template <class ParallelRunAsyncFunc>
+inline GimpAsync *
+gimp_parallel_run_async (ParallelRunAsyncFunc func)
+{
+  ParallelRunAsyncFunc *func_copy = g_new (ParallelRunAsyncFunc, 1);
+
+  new (func_copy) ParallelRunAsyncFunc (func);
+
+  return gimp_parallel_run_async ([] (GimpAsync *async,
+                                      gpointer   user_data)
+                                  {
+                                    ParallelRunAsyncFunc *func_copy =
+                                      (ParallelRunAsyncFunc *) user_data;
+
+                                    (*func_copy) (async);
+
+                                    func_copy->~ParallelRunAsyncFunc ();
+                                    g_free (func_copy);
+                                  }, func_copy);
+}
+
 template <class ParallelDistributeFunc>
 inline void
 gimp_parallel_distribute (gint                   max_n,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]