[gegl] gegl: add gegl-parallel



commit 4351962035ebd47a176285d1c4064afe60c63543
Author: Ell <ell_se yahoo com>
Date:   Sat Nov 10 15:30:07 2018 -0500

    gegl: add gegl-parallel
    
    gegl-parallel provides various parallel algorithms.  Currently,
    this is limited to the gegl_parallel_distribute() family of
    functions, which distribute work across multiple threads, migrated
    from GIMP.
    
    The following commits use these functions to replace the various
    thread-pools we use to auto-parallelize operations with simpler
    code, fixing potential deadlocks as a result of nested operation
    processing, as in bug #790810, along the way.
    
    Since gegl-parallel is public API, it also eases manual
    parallelization of operations, inside and outside of GEGL.

 gegl/Makefile.am             |   3 +
 gegl/gegl-init.c             |   3 +
 gegl/gegl-parallel-private.h |  33 ++++
 gegl/gegl-parallel.c         | 418 +++++++++++++++++++++++++++++++++++++++++++
 gegl/gegl-parallel.h         | 191 ++++++++++++++++++++
 gegl/gegl.h                  |   1 +
 6 files changed, 649 insertions(+)
---
diff --git a/gegl/Makefile.am b/gegl/Makefile.am
index cf14f2416..c3dca9f2b 100644
--- a/gegl/Makefile.am
+++ b/gegl/Makefile.am
@@ -55,6 +55,7 @@ GEGL_introspectable_headers = \
        gegl-matrix.h                   \
        gegl-lookup.h                   \
        gegl-random.h                   \
+       gegl-parallel.h                 \
        gegl-init.h                     \
        gegl-version.h                  \
        buffer/gegl-buffer.h            \
@@ -100,6 +101,7 @@ GEGL_sources = \
        gegl-xml.c                      \
        gegl-gio.c                      \
        gegl-random.c                   \
+       gegl-parallel.c                 \
        gegl-serialize.c                \
        gegl-stats.c                    \
        gegl-matrix.c                   \
@@ -118,6 +120,7 @@ GEGL_sources = \
        gegl-op.h                           \
        gegl-plugin.h                   \
        gegl-random-private.h           \
+       gegl-parallel-private.h         \
        gegl-stats.h                    \
        gegl-gio-private.h              \
        gegl-types-internal.h           \
diff --git a/gegl/gegl-init.c b/gegl/gegl-init.c
index 39bb6c323..414ebeb1f 100644
--- a/gegl/gegl-init.c
+++ b/gegl/gegl-init.c
@@ -105,6 +105,7 @@ guint gegl_debug_flags = 0;
 #include "gegl-stats.h"
 #include "graph/gegl-node-private.h"
 #include "gegl-random-private.h"
+#include "gegl-parallel-private.h"
 
 static gboolean  gegl_post_parse_hook (GOptionContext *context,
                                        GOptionGroup   *group,
@@ -489,6 +490,7 @@ gegl_exit (void)
   gegl_operation_gtype_cleanup ();
   gegl_operation_handlers_cleanup ();
   gegl_random_cleanup ();
+  gegl_parallel_cleanup ();
   gegl_cl_cleanup ();
 
   gegl_temp_buffer_free ();
@@ -702,6 +704,7 @@ gegl_post_parse_hook (GOptionContext *context,
 
   GEGL_INSTRUMENT_START();
 
+  gegl_parallel_init ();
   gegl_operation_gtype_init ();
   gegl_tile_cache_init ();
 
diff --git a/gegl/gegl-parallel-private.h b/gegl/gegl-parallel-private.h
new file mode 100644
index 000000000..3c13ec85d
--- /dev/null
+++ b/gegl/gegl-parallel-private.h
@@ -0,0 +1,33 @@
+/* This file is part of GEGL.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GEGL; if not, see <https://www.gnu.org/licenses/>.
+ *
+ * Copyright 2018 Ell
+ */
+
+#ifndef __GEGL_PARALLEL_PRIVATE_H__
+#define __GEGL_PARALLEL_PRIVATE_H__
+
+
+G_BEGIN_DECLS
+
+
+void   gegl_parallel_init    (void);
+void   gegl_parallel_cleanup (void);
+
+
+G_END_DECLS
+
+
+#endif /* __GEGL_PARALLEL_PRIVATE_H__ */
diff --git a/gegl/gegl-parallel.c b/gegl/gegl-parallel.c
new file mode 100644
index 000000000..38f564da3
--- /dev/null
+++ b/gegl/gegl-parallel.c
@@ -0,0 +1,418 @@
+/* This file is part of GEGL.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GEGL; if not, see <https://www.gnu.org/licenses/>.
+ *
+ * Copyright 2018 Ell
+ */
+
+#include "config.h"
+
+#include <glib.h>
+
+#include "gegl.h"
+#include "gegl-config.h"
+#include "gegl-parallel.h"
+#include "gegl-parallel-private.h"
+
+
+#define GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS GEGL_MAX_THREADS
+
+
+typedef struct
+{
+  GeglParallelDistributeFunc func;
+  gint                       n;
+  gpointer                   user_data;
+} GeglParallelDistributeTask;
+
+typedef struct
+{
+  GThread                    *thread;
+  GMutex                      mutex;
+  GCond                       cond;
+
+  gboolean                    quit;
+
+  GeglParallelDistributeTask *volatile task;
+  volatile gint               i;
+} GeglParallelDistributeThread;
+
+
+/*  local function prototypes  */
+
+static void                       gegl_parallel_notify_threads           (GeglConfig                   *config);
+
+static void                       gegl_parallel_set_n_threads            (gint                          n_threads,
+                                                                          gboolean                      finish_tasks);
+
+static void                       gegl_parallel_distribute_set_n_threads (gint                          n_threads);
+static gpointer                   gegl_parallel_distribute_thread_func   (GeglParallelDistributeThread *thread);
+
+
+/*  local variables  */
+
+static gint                         gegl_parallel_distribute_n_threads = 1;
+static GeglParallelDistributeThread gegl_parallel_distribute_threads[GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS - 1];
+
+static GMutex                       gegl_parallel_distribute_completion_mutex;
+static GCond                        gegl_parallel_distribute_completion_cond;
+static volatile gint                gegl_parallel_distribute_completion_counter;
+static volatile gint                gegl_parallel_distribute_busy;
+
+
+/*  public functions  */
+
+
+void
+gegl_parallel_init (void)
+{
+  g_signal_connect (gegl_config (), "notify::threads",
+                    G_CALLBACK (gegl_parallel_notify_threads),
+                    NULL);
+
+  gegl_parallel_notify_threads (gegl_config ());
+}
+
+void
+gegl_parallel_cleanup (void)
+{
+  g_signal_handlers_disconnect_by_func (gegl_config (),
+                                        gegl_parallel_notify_threads,
+                                        NULL);
+
+  /* stop all threads */
+  gegl_parallel_set_n_threads (0, /* finish_tasks = */ FALSE);
+}
+
+void
+gegl_parallel_distribute (gint                       max_n,
+                          GeglParallelDistributeFunc func,
+                          gpointer                   user_data)
+{
+  GeglParallelDistributeTask task;
+  gint                       i;
+
+  g_return_if_fail (func != NULL);
+
+  if (max_n == 0)
+    return;
+
+  if (max_n < 0)
+    max_n = gegl_parallel_distribute_n_threads;
+  else
+    max_n = MIN (max_n, gegl_parallel_distribute_n_threads);
+
+  if (max_n == 1 ||
+      ! g_atomic_int_compare_and_exchange (&gegl_parallel_distribute_busy,
+                                           0, 1))
+    {
+      func (0, 1, user_data);
+
+      return;
+    }
+
+  task.n         = max_n;
+  task.func      = func;
+  task.user_data = user_data;
+
+  g_atomic_int_set (&gegl_parallel_distribute_completion_counter, task.n - 1);
+
+  for (i = 0; i < task.n - 1; i++)
+    {
+      GeglParallelDistributeThread *thread =
+        &gegl_parallel_distribute_threads[i];
+
+      g_mutex_lock (&thread->mutex);
+
+      thread->task = &task;
+      thread->i    = i;
+
+      g_cond_signal (&thread->cond);
+
+      g_mutex_unlock (&thread->mutex);
+    }
+
+  func (i, task.n, user_data);
+
+  if (g_atomic_int_get (&gegl_parallel_distribute_completion_counter))
+    {
+      g_mutex_lock (&gegl_parallel_distribute_completion_mutex);
+
+      while (g_atomic_int_get (&gegl_parallel_distribute_completion_counter))
+        {
+          g_cond_wait (&gegl_parallel_distribute_completion_cond,
+                       &gegl_parallel_distribute_completion_mutex);
+        }
+
+      g_mutex_unlock (&gegl_parallel_distribute_completion_mutex);
+    }
+
+  g_atomic_int_set (&gegl_parallel_distribute_busy, 0);
+}
+
+typedef struct
+{
+  gsize                           size;
+  GeglParallelDistributeRangeFunc func;
+  gpointer                        user_data;
+} GeglParallelDistributeRangeData;
+
+static void
+gegl_parallel_distribute_range_func (gint                             i,
+                                     gint                             n,
+                                     GeglParallelDistributeRangeData *data)
+{
+  gsize offset;
+  gsize sub_size;
+
+  offset   = (2 * i       * data->size + n) / (2 * n);
+  sub_size = (2 * (i + 1) * data->size + n) / (2 * n) - offset;
+
+  data->func (offset, sub_size, data->user_data);
+}
+
+void
+gegl_parallel_distribute_range (gsize                           size,
+                                gsize                           min_sub_size,
+                                GeglParallelDistributeRangeFunc func,
+                                gpointer                        user_data)
+{
+  GeglParallelDistributeRangeData data;
+  gsize                           n = size;
+
+  g_return_if_fail (func != NULL);
+
+  if (size == 0)
+    return;
+
+  if (min_sub_size > 1)
+    n /= min_sub_size;
+
+  n = CLAMP (n, 1, gegl_parallel_distribute_n_threads);
+
+  data.size      = size;
+  data.func      = func;
+  data.user_data = user_data;
+
+  gegl_parallel_distribute (
+    n,
+    (GeglParallelDistributeFunc) gegl_parallel_distribute_range_func,
+    &data);
+}
+
+typedef struct
+{
+  const GeglRectangle            *area;
+  GeglSplitStrategy               split_strategy;
+  GeglParallelDistributeAreaFunc  func;
+  gpointer                        user_data;
+} GeglParallelDistributeAreaData;
+
+static void
+gegl_parallel_distribute_area_func (gint                            i,
+                                    gint                            n,
+                                    GeglParallelDistributeAreaData *data)
+{
+  GeglRectangle sub_area;
+
+  switch (data->split_strategy)
+    {
+    case GEGL_SPLIT_STRATEGY_HORIZONTAL:
+      sub_area.x       = data->area->x;
+      sub_area.width   = data->area->width;
+
+      sub_area.y       = (2 * i       * data->area->height + n) / (2 * n);
+      sub_area.height  = (2 * (i + 1) * data->area->height + n) / (2 * n);
+
+      sub_area.height -= sub_area.y;
+      sub_area.y      += data->area->y;
+
+      break;
+
+    case GEGL_SPLIT_STRATEGY_VERTICAL:
+      sub_area.y       = data->area->y;
+      sub_area.height  = data->area->height;
+
+      sub_area.x       = (2 * i       * data->area->width + n) / (2 * n);
+      sub_area.width   = (2 * (i + 1) * data->area->width + n) / (2 * n);
+
+      sub_area.width  -= sub_area.x;
+      sub_area.x      += data->area->x;
+
+      break;
+
+    default:
+      g_return_if_reached ();
+    }
+
+  data->func (&sub_area, data->user_data);
+}
+
+void
+gegl_parallel_distribute_area (const GeglRectangle            *area,
+                               gsize                           min_sub_area,
+                               GeglSplitStrategy               split_strategy,
+                               GeglParallelDistributeAreaFunc  func,
+                               gpointer                        user_data)
+{
+  GeglParallelDistributeAreaData data;
+  gsize                          n;
+
+  g_return_if_fail (area != NULL);
+  g_return_if_fail (func != NULL);
+
+  if (area->width <= 0 || area->height <= 0)
+    return;
+
+  if (split_strategy == GEGL_SPLIT_STRATEGY_AUTO)
+    {
+      if (area->width > area->height)
+        split_strategy = GEGL_SPLIT_STRATEGY_VERTICAL;
+      else
+        split_strategy = GEGL_SPLIT_STRATEGY_HORIZONTAL;
+    }
+
+  n = (gsize) area->width * (gsize) area->height;
+
+  if (min_sub_area > 1)
+    n /= min_sub_area;
+
+  n = CLAMP (n, 1, gegl_parallel_distribute_n_threads);
+
+  data.area           = area;
+  data.split_strategy = split_strategy;
+  data.func           = func;
+  data.user_data      = user_data;
+
+  gegl_parallel_distribute (
+    n,
+    (GeglParallelDistributeFunc) gegl_parallel_distribute_area_func,
+    &data);
+}
+
+
+/*  private functions  */
+
+
+static void
+gegl_parallel_notify_threads (GeglConfig *config)
+{
+  gint n_threads;
+
+  g_object_get (config,
+                "threads", &n_threads,
+                NULL);
+
+  gegl_parallel_set_n_threads (n_threads,
+                               /* finish_tasks = */ TRUE);
+}
+
+static void
+gegl_parallel_set_n_threads (gint     n_threads,
+                             gboolean finish_tasks)
+{
+  gegl_parallel_distribute_set_n_threads (n_threads);
+}
+
+static void
+gegl_parallel_distribute_set_n_threads (gint n_threads)
+{
+  gint i;
+
+  while (! g_atomic_int_compare_and_exchange (&gegl_parallel_distribute_busy,
+                                              0, 1));
+
+  n_threads = CLAMP (n_threads, 1, GEGL_PARALLEL_DISTRIBUTE_MAX_THREADS);
+
+  if (n_threads > gegl_parallel_distribute_n_threads) /* need more threads */
+    {
+      for (i = gegl_parallel_distribute_n_threads - 1; i < n_threads - 1; i++)
+        {
+          GeglParallelDistributeThread *thread =
+            &gegl_parallel_distribute_threads[i];
+
+          thread->quit = FALSE;
+          thread->task = NULL;
+
+          thread->thread = g_thread_new (
+            "worker",
+            (GThreadFunc) gegl_parallel_distribute_thread_func,
+            thread);
+        }
+    }
+  else if (n_threads < gegl_parallel_distribute_n_threads) /* need less threads */
+    {
+      for (i = n_threads - 1; i < gegl_parallel_distribute_n_threads - 1; i++)
+        {
+          GeglParallelDistributeThread *thread =
+            &gegl_parallel_distribute_threads[i];
+
+          g_mutex_lock (&thread->mutex);
+
+          thread->quit = TRUE;
+          g_cond_signal (&thread->cond);
+
+          g_mutex_unlock (&thread->mutex);
+        }
+
+      for (i = n_threads - 1; i < gegl_parallel_distribute_n_threads - 1; i++)
+        {
+          GeglParallelDistributeThread *thread =
+            &gegl_parallel_distribute_threads[i];
+
+          g_thread_join (thread->thread);
+        }
+    }
+
+  gegl_parallel_distribute_n_threads = n_threads;
+
+  g_atomic_int_set (&gegl_parallel_distribute_busy, 0);
+}
+
+static gpointer
+gegl_parallel_distribute_thread_func (GeglParallelDistributeThread *thread)
+{
+  g_mutex_lock (&thread->mutex);
+
+  while (TRUE)
+    {
+      if (thread->quit)
+        {
+          break;
+        }
+      else if (thread->task)
+        {
+          thread->task->func (thread->i, thread->task->n,
+                              thread->task->user_data);
+
+          if (g_atomic_int_dec_and_test (
+                &gegl_parallel_distribute_completion_counter))
+            {
+              g_mutex_lock (&gegl_parallel_distribute_completion_mutex);
+
+              g_cond_signal (&gegl_parallel_distribute_completion_cond);
+
+              g_mutex_unlock (&gegl_parallel_distribute_completion_mutex);
+            }
+
+          thread->task = NULL;
+        }
+
+      g_cond_wait (&thread->cond, &thread->mutex);
+    }
+
+  g_mutex_unlock (&thread->mutex);
+
+  return NULL;
+}
diff --git a/gegl/gegl-parallel.h b/gegl/gegl-parallel.h
new file mode 100644
index 000000000..4e8798331
--- /dev/null
+++ b/gegl/gegl-parallel.h
@@ -0,0 +1,191 @@
+/* This file is part of GEGL.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with GEGL; if not, see <https://www.gnu.org/licenses/>.
+ *
+ * Copyright 2018 Ell
+ */
+
+#ifndef __GEGL_PARALLEL_H__
+#define __GEGL_PARALLEL_H__
+
+
+G_BEGIN_DECLS
+
+
+/**
+ * GeglParallelDistributeFunc:
+ * @i: the current thread index, in the range [0,@n)
+ * @n: the number of threads execution is distributed across
+ * @user_data: user data pointer
+ *
+ * Specifies the type of function passed to gegl_parallel_distribute().
+ *
+ * The function should process the @i-th part of the data, out of @n
+ * equal parts.  @n may be less-than or equal-to the @max_n argument
+ * passed to gegl_parallel_distribute().
+ */
+typedef void (* GeglParallelDistributeFunc)      (gint                 i,
+                                                  gint                 n,
+                                                  gpointer             user_data);
+
+/**
+ * GeglParallelDistributeRangeFunc:
+ * @offset: the current data offset
+ * @size: the current data size
+ * @user_data: user data pointer
+ *
+ * Specifies the type of function passed to gegl_parallel_distribute_range().
+ *
+ * The function should process @size elements of the data, starting
+ * at @offset.  @size may be greater-than or equal-to the @min_sub_size
+ * argument passed to gegl_parallel_distribute_range().
+ */
+typedef void (* GeglParallelDistributeRangeFunc) (gsize                offset,
+                                                  gsize                size,
+                                                  gpointer             user_data);
+
+/**
+ * GeglParallelDistributeAreaFunc:
+ * @area: the current sub-region
+ * @user_data: user data pointer
+ *
+ * Specifies the type of function passed to gegl_parallel_distribute_area().
+ *
+ * The function should process the sub-region specified by @area, whose
+ * area may be greater-than or equal-to the @min_sub_area argument passed
+ * to gegl_parallel_distribute_area().
+ *
+ */
+typedef void (* GeglParallelDistributeAreaFunc)  (const GeglRectangle *area,
+                                                  gpointer             user_data);
+
+
+/**
+ * gegl_parallel_distribute:
+ * @max_n: the maximal number of threads to use
+ * @func: (closure user_data) (scope call): the function to call
+ * @user_data: user data to pass to the function
+ *
+ * Distributes the execution of a function across multiple threads,
+ * by calling it with a different index on each thread.
+ */
+void   gegl_parallel_distribute       (gint                             max_n,
+                                       GeglParallelDistributeFunc       func,
+                                       gpointer                         user_data);
+
+/**
+ * gegl_parallel_distribute_range:
+ * @size: the total size of the data
+ * @min_sub_size: the minimal data size to be processed by each thread
+ * @func: (closure user_data) (scope call): the function to call
+ * @user_data: user data to pass to the function
+ *
+ * Distributes the processing of a linear data-structure across
+ * multiple threads, by calling the given function with different
+ * sub-ranges on different threads.
+ */
+void   gegl_parallel_distribute_range (gsize                            size,
+                                       gsize                            min_sub_size,
+                                       GeglParallelDistributeRangeFunc  func,
+                                       gpointer                         user_data);
+
+/**
+ * gegl_parallel_distribute_area:
+ * @area: the region to process
+ * @min_sub_area: the minimal area to be processed by each thread
+ * @split_strategy: the strategy to use for dividing the region
+ * @func: (closure user_data) (scope call): the function to call
+ * @user_data: user data to pass to the function
+ *
+ * Distributes the processing of a planar data-structure across
+ * multiple threads, by calling the given function with different
+ * sub-regions on different threads.
+ */
+void   gegl_parallel_distribute_area  (const GeglRectangle             *area,
+                                       gsize                            min_sub_area,
+                                       GeglSplitStrategy                split_strategy,
+                                       GeglParallelDistributeAreaFunc   func,
+                                       gpointer                         user_data);
+
+
+#ifdef __cplusplus
+
+extern "C++"
+{
+
+template <class ParallelDistributeFunc>
+inline void
+gegl_parallel_distribute (gint                   max_n,
+                          ParallelDistributeFunc func)
+{
+  gegl_parallel_distribute (max_n,
+                            [] (gint     i,
+                                gint     n,
+                                gpointer user_data)
+                            {
+                              ParallelDistributeFunc func_copy (
+                                *(const ParallelDistributeFunc *) user_data);
+
+                              func_copy (i, n);
+                            },
+                            &func);
+}
+
+template <class ParallelDistributeRangeFunc>
+inline void
+gegl_parallel_distribute_range (gsize                       size,
+                                gsize                       min_sub_size,
+                                ParallelDistributeRangeFunc func)
+{
+  gegl_parallel_distribute_range (size, min_sub_size,
+                                  [] (gsize    offset,
+                                      gsize    size,
+                                      gpointer user_data)
+                                  {
+                                    ParallelDistributeRangeFunc func_copy (
+                                      *(const ParallelDistributeRangeFunc *) user_data);
+
+                                    func_copy (offset, size);
+                                  },
+                                  &func);
+}
+
+template <class ParallelDistributeAreaFunc>
+inline void
+gegl_parallel_distribute_area (const GeglRectangle        *area,
+                               gsize                       min_sub_area,
+                               GeglSplitStrategy           split_strategy,
+                               ParallelDistributeAreaFunc  func)
+{
+  gegl_parallel_distribute_area (area, min_sub_area, split_strategy,
+                                 [] (const GeglRectangle *area,
+                                     gpointer             user_data)
+                                 {
+                                   ParallelDistributeAreaFunc func_copy (
+                                     *(const ParallelDistributeAreaFunc *) user_data);
+
+                                   func_copy (area);
+                                 },
+                                 &func);
+}
+
+}
+
+#endif /* __cplusplus */
+
+
+G_END_DECLS
+
+
+#endif /* __GEGL_PARALLEL_H__ */
diff --git a/gegl/gegl.h b/gegl/gegl.h
index 1d2e34bbb..6bd3324d6 100644
--- a/gegl/gegl.h
+++ b/gegl/gegl.h
@@ -37,6 +37,7 @@
 #include <gegl-init.h>
 #include <gegl-version.h>
 #include <gegl-random.h>
+#include <gegl-parallel.h>
 #include <gegl-node.h>
 #include <gegl-processor.h>
 #include <gegl-apply.h>


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]