[gegl] operation: add threading capabilities to base classes



commit 65da6c34be79084c5b9a17d37425e3e975cad5e4
Author: Øyvind Kolås <pippin gimp org>
Date:   Mon Jun 30 21:20:38 2014 +0200

    operation: add threading capabilities to base classes
    
    Parallelization takes place inside each op's process(), at the stage when
    the input and output buffers have been rigged up. This also means that all
    pixels required to be valid in the various input buffers are valid at this
    point.

 gegl/gegl-config.h                              |    2 +
 gegl/graph/gegl-node-private.h                  |    1 -
 gegl/operation/gegl-operation-composer.c        |  102 ++++++++++++-
 gegl/operation/gegl-operation-composer3.c       |  195 +++++++++++++++++------
 gegl/operation/gegl-operation-filter.c          |  105 ++++++++++++-
 gegl/operation/gegl-operation-point-composer3.c |    2 +-
 gegl/operation/gegl-operation-point-render.c    |    1 +
 gegl/operation/gegl-operation-source.c          |  103 ++++++++++++-
 gegl/operation/gegl-operation.c                 |   25 +++
 9 files changed, 481 insertions(+), 55 deletions(-)
---
diff --git a/gegl/gegl-config.h b/gegl/gegl-config.h
index f5330d0..6c17e84 100644
--- a/gegl/gegl-config.h
+++ b/gegl/gegl-config.h
@@ -54,6 +54,8 @@ struct _GeglConfigClass
 
 gint gegl_config_threads (void);
 
+#define GEGL_MAX_THREADS 16
+
 G_END_DECLS
 
 #endif
diff --git a/gegl/graph/gegl-node-private.h b/gegl/graph/gegl-node-private.h
index 446a0bb..222d141 100644
--- a/gegl/graph/gegl-node-private.h
+++ b/gegl/graph/gegl-node-private.h
@@ -131,7 +131,6 @@ void
 gegl_node_emit_computed (GeglNode *node,
                          const GeglRectangle *rect);
 
-#define GEGL_MAX_THREADS 16
 
 G_END_DECLS
 
diff --git a/gegl/operation/gegl-operation-composer.c b/gegl/operation/gegl-operation-composer.c
index 66c0353..e9e46b9 100644
--- a/gegl/operation/gegl-operation-composer.c
+++ b/gegl/operation/gegl-operation-composer.c
@@ -24,6 +24,7 @@
 #include "gegl.h"
 #include "gegl-operation-composer.h"
 #include "gegl-operation-context.h"
+#include "gegl-config.h"
 
 static gboolean gegl_operation_composer_process (GeglOperation       *operation,
                               GeglOperationContext     *context,
@@ -49,6 +50,7 @@ gegl_operation_composer_class_init (GeglOperationComposerClass * klass)
 {
   GeglOperationClass *operation_class = GEGL_OPERATION_CLASS (klass);
 
+  operation_class->threaded = TRUE;
   operation_class->process = gegl_operation_composer_process;
   operation_class->attach = attach;
   operation_class->detect = detect;
@@ -95,6 +97,48 @@ attach (GeglOperation *self)
   g_param_spec_sink (pspec);
 }
 
+typedef struct ThreadData
+{
+  GeglOperationComposerClass *klass;
+  GeglOperation              *operation;
+  GeglBuffer                 *input;
+  GeglBuffer                 *aux;
+  GeglBuffer                 *output;
+  gint                       *pending;
+  gint                        level;
+  gboolean                    success;
+  GeglRectangle               roi;
+} ThreadData;
+
+static GMutex pool_mutex = {0,};
+static GCond  pool_cond  = {0,};
+
+static void thread_process (gpointer thread_data, gpointer unused)
+{
+  ThreadData *data = thread_data;
+  if (!data->klass->process (data->operation,
+                       data->input, data->aux, data->output, &data->roi, data->level))
+    data->success = FALSE;
+  g_atomic_int_add (data->pending, -1);
+  if (*data->pending == 0)
+  {
+    g_mutex_lock (&pool_mutex);
+    g_cond_signal (&pool_cond);
+    g_mutex_unlock (&pool_mutex);
+  }
+}
+
+static GThreadPool *thread_pool (void)
+{
+  static GThreadPool *pool = NULL;
+  if (!pool)
+    {
+      pool =  g_thread_pool_new (thread_process, NULL, gegl_config()->threads,
+                                 FALSE, NULL);
+    }
+  return pool;
+}
+
 static gboolean
 gegl_operation_composer_process (GeglOperation        *operation,
                                  GeglOperationContext *context,
@@ -127,10 +171,64 @@ gegl_operation_composer_process (GeglOperation        *operation,
   if (input != NULL ||
       aux != NULL)
     {
-      if (result->width == 0 || result->height == 0)
-        success = TRUE;
+      if (gegl_operation_use_threading (operation, result))
+      {
+        gint threads = gegl_config ()->threads;
+        GThreadPool *pool = thread_pool ();
+        ThreadData thread_data[GEGL_MAX_THREADS];
+        gint pending = threads;
+
+        if (result->width > result->height)
+        {
+          gint bit = result->width / threads;
+          for (gint j = 0; j < threads; j++)
+          {
+            thread_data[j].roi.y = result->y;
+            thread_data[j].roi.height = result->height;
+            thread_data[j].roi.x = result->x + bit * j;
+            thread_data[j].roi.width = bit;
+          }
+          thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+        }
+        else
+        {
+          gint bit = result->height / threads;
+          for (gint j = 0; j < threads; j++)
+          {
+            thread_data[j].roi.x = result->x;
+            thread_data[j].roi.width = result->width;
+            thread_data[j].roi.y = result->y + bit * j;
+            thread_data[j].roi.height = bit;
+          }
+          thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+        }
+        for (gint i = 0; i < threads; i++)
+        {
+          thread_data[i].klass = klass;
+          thread_data[i].operation = operation;
+          thread_data[i].input = input;
+          thread_data[i].aux = aux;
+          thread_data[i].output = output;
+          thread_data[i].pending = &pending;
+          thread_data[i].level = level;
+          thread_data[i].success = TRUE;
+        }
+
+        g_mutex_lock (&pool_mutex);
+
+        for (gint i = 0; i < threads; i++)
+          g_thread_pool_push (pool, &thread_data[i], NULL);
+
+        while (pending != 0)
+          g_cond_wait (&pool_cond, &pool_mutex);
+
+        g_mutex_unlock (&pool_mutex);
+        success = thread_data[0].success;
+      }
       else
+      {
         success = klass->process (operation, input, aux, output, result, level);
+      }
 
       if (input)
         g_object_unref (input);
diff --git a/gegl/operation/gegl-operation-composer3.c b/gegl/operation/gegl-operation-composer3.c
index a7417e1..4b1429f 100644
--- a/gegl/operation/gegl-operation-composer3.c
+++ b/gegl/operation/gegl-operation-composer3.c
@@ -24,28 +24,29 @@
 #include "gegl.h"
 #include "gegl-operation-composer3.h"
 #include "gegl-operation-context.h"
+#include "gegl-config.h"
 
 static gboolean gegl_operation_composer3_process
-                             (GeglOperation        *operation,
-                              GeglOperationContext *context,
-                              const gchar          *output_prop,
-                              const GeglRectangle  *result,
-                              gint                  level);
+(GeglOperation        *operation,
+ GeglOperationContext *context,
+ const gchar          *output_prop,
+ const GeglRectangle  *result,
+ gint                  level);
 static void     attach       (GeglOperation        *operation);
 static GeglNode*detect       (GeglOperation        *operation,
-                              gint                  x,
-                              gint                  y);
+    gint                  x,
+    gint                  y);
 
 static GeglRectangle get_bounding_box        (GeglOperation        *self);
 static GeglRectangle get_required_for_output (GeglOperation        *self,
-                                               const gchar         *input_pad,
-                                               const GeglRectangle *roi);
+    const gchar         *input_pad,
+    const GeglRectangle *roi);
 
 G_DEFINE_TYPE (GeglOperationComposer3, gegl_operation_composer3,
-               GEGL_TYPE_OPERATION)
+    GEGL_TYPE_OPERATION)
 
 
-static void
+  static void
 gegl_operation_composer3_class_init (GeglOperationComposer3Class * klass)
 {
   GeglOperationClass *operation_class = GEGL_OPERATION_CLASS (klass);
@@ -57,60 +58,105 @@ gegl_operation_composer3_class_init (GeglOperationComposer3Class * klass)
   operation_class->get_required_for_output = get_required_for_output;
 }
 
-static void
+  static void
 gegl_operation_composer3_init (GeglOperationComposer3 *self)
 {
 }
 
-static void
+  static void
 attach (GeglOperation *self)
 {
   GeglOperation *operation = GEGL_OPERATION (self);
   GParamSpec    *pspec;
 
   pspec = g_param_spec_object ("output",
-                               "Output",
-                               "Output pad for generated image buffer.",
-                               GEGL_TYPE_BUFFER,
-                               G_PARAM_READABLE |
-                               GEGL_PARAM_PAD_OUTPUT);
+      "Output",
+      "Output pad for generated image buffer.",
+      GEGL_TYPE_BUFFER,
+      G_PARAM_READABLE |
+      GEGL_PARAM_PAD_OUTPUT);
   gegl_operation_create_pad (operation, pspec);
   g_param_spec_sink (pspec);
 
   pspec = g_param_spec_object ("input",
-                               "Input",
-                               "Input pad, for image buffer input.",
-                               GEGL_TYPE_BUFFER,
-                               G_PARAM_READWRITE |
-                               GEGL_PARAM_PAD_INPUT);
+      "Input",
+      "Input pad, for image buffer input.",
+      GEGL_TYPE_BUFFER,
+      G_PARAM_READWRITE |
+      GEGL_PARAM_PAD_INPUT);
   gegl_operation_create_pad (operation, pspec);
   g_param_spec_sink (pspec);
 
   pspec = g_param_spec_object ("aux",
-                               "Aux",
-                               "Auxiliary image buffer input pad.",
-                               GEGL_TYPE_BUFFER,
-                               G_PARAM_READWRITE |
-                               GEGL_PARAM_PAD_INPUT);
+      "Aux",
+      "Auxiliary image buffer input pad.",
+      GEGL_TYPE_BUFFER,
+      G_PARAM_READWRITE |
+      GEGL_PARAM_PAD_INPUT);
   gegl_operation_create_pad (operation, pspec);
   g_param_spec_sink (pspec);
 
   pspec = g_param_spec_object ("aux2",
-                               "Aux2",
-                               "Second auxiliary image buffer input pad.",
-                               GEGL_TYPE_BUFFER,
-                               G_PARAM_READWRITE |
-                               GEGL_PARAM_PAD_INPUT);
+      "Aux2",
+      "Second auxiliary image buffer input pad.",
+      GEGL_TYPE_BUFFER,
+      G_PARAM_READWRITE |
+      GEGL_PARAM_PAD_INPUT);
   gegl_operation_create_pad (operation, pspec);
   g_param_spec_sink (pspec);
 }
 
-static gboolean
+typedef struct ThreadData
+{
+  GeglOperationComposer3Class *klass;
+  GeglOperation               *operation;
+  GeglBuffer                  *input;
+  GeglBuffer                  *aux;
+  GeglBuffer                  *aux2;
+  GeglBuffer                  *output;
+  gint                        *pending;
+  gint                         level;
+  gboolean                     success;
+  GeglRectangle                roi;
+} ThreadData;
+
+static GMutex pool_mutex = {0,};
+static GCond  pool_cond  = {0,};
+
+static void thread_process (gpointer thread_data, gpointer unused)
+{
+  ThreadData *data = thread_data;
+  if (!data->klass->process (data->operation,
+        data->input, data->aux, data->aux2, 
+        data->output, &data->roi, data->level))
+    data->success = FALSE;
+  g_atomic_int_add (data->pending, -1);
+  if (*data->pending == 0)
+    {
+      g_mutex_lock (&pool_mutex);
+      g_cond_signal (&pool_cond);
+      g_mutex_unlock (&pool_mutex);
+    }
+}
+
+static GThreadPool *thread_pool (void)
+{
+  static GThreadPool *pool = NULL;
+  if (!pool)
+  {
+    pool =  g_thread_pool_new (thread_process, NULL, gegl_config()->threads,
+        FALSE, NULL);
+  }
+  return pool;
+}
+
+
+  static gboolean
 gegl_operation_composer3_process (GeglOperation        *operation,
-                                  GeglOperationContext *context,
-                                  const gchar          *output_prop,
-                                  const GeglRectangle  *result,
-                                  gint                  level)
+    GeglOperationContext *context,
+    const gchar          *output_prop,
+    const GeglRectangle  *result,
+    gint                  level)
 {
   GeglOperationComposer3Class *klass   = GEGL_OPERATION_COMPOSER3_GET_CLASS (operation);
   GeglBuffer                  *input;
@@ -120,14 +166,10 @@ gegl_operation_composer3_process (GeglOperation        *operation,
   gboolean                     success = FALSE;
 
   if (strcmp (output_prop, "output"))
-    {
-      g_warning ("requested processing of %s pad on a composer", output_prop);
-      return FALSE;
-    }
-  output = gegl_operation_context_get_target (context, "output");
-
-  if (result->width == 0 || result->height == 0)
-    return TRUE;
+  {
+    g_warning ("requested processing of %s pad on a composer", output_prop);
+    return FALSE;
+  }
 
   if (result->width == 0 || result->height == 0)
   {
@@ -151,7 +193,66 @@ gegl_operation_composer3_process (GeglOperation        *operation,
       aux != NULL ||
       aux2 != NULL)
     {
-      success = klass->process (operation, input, aux, aux2, output, result, level);
+      if (gegl_operation_use_threading (operation, result))
+      {
+        gint threads = gegl_config ()->threads;
+        GThreadPool *pool = thread_pool ();
+        ThreadData thread_data[GEGL_MAX_THREADS];
+        gint pending = threads;
+
+        if (result->width > result->height)
+        {
+          gint bit = result->width / threads;
+          for (gint j = 0; j < threads; j++)
+          {
+            thread_data[j].roi.y = result->y;
+            thread_data[j].roi.height = result->height;
+            thread_data[j].roi.x = result->x + bit * j;
+            thread_data[j].roi.width = bit;
+          }
+          thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+        }
+        else
+        {
+          gint bit = result->height / threads;
+          for (gint j = 0; j < threads; j++)
+          {
+            thread_data[j].roi.x = result->x;
+            thread_data[j].roi.width = result->width;
+            thread_data[j].roi.y = result->y + bit * j;
+            thread_data[j].roi.height = bit;
+          }
+          thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+        }
+        for (gint i = 0; i < threads; i++)
+        {
+          thread_data[i].klass = klass;
+          thread_data[i].operation = operation;
+          thread_data[i].input = input;
+          thread_data[i].aux = aux;
+          thread_data[i].aux2 = aux2;
+          thread_data[i].output = output;
+          thread_data[i].pending = &pending;
+          thread_data[i].level = level;
+          thread_data[i].success = TRUE;
+        }
+
+        g_mutex_lock (&pool_mutex);
+
+        for (gint i = 0; i < threads; i++)
+          g_thread_pool_push (pool, &thread_data[i], NULL);
+
+        while (pending != 0)
+          g_cond_wait (&pool_cond, &pool_mutex);
+        
+        g_mutex_unlock (&pool_mutex);
+
+        success = thread_data[0].success;
+      }
+      else
+      {
+        success = klass->process (operation, input, aux, aux2, output, result, level);
+      }
 
       if (input)
         g_object_unref (input);
diff --git a/gegl/operation/gegl-operation-filter.c b/gegl/operation/gegl-operation-filter.c
index c23236b..c9a7afa 100644
--- a/gegl/operation/gegl-operation-filter.c
+++ b/gegl/operation/gegl-operation-filter.c
@@ -13,7 +13,7 @@
  * You should have received a copy of the GNU Lesser General Public
  * License along with GEGL; if not, see <http://www.gnu.org/licenses/>.
  *
- * Copyright 2006 Øyvind Kolås
+ * Copyright 2006, 2014 Øyvind Kolås
  */
 
 #include "config.h"
@@ -24,6 +24,7 @@
 #include "gegl.h"
 #include "gegl-operation-filter.h"
 #include "gegl-operation-context.h"
+#include "gegl-config.h"
 
 static gboolean gegl_operation_filter_process
                                       (GeglOperation        *operation,
@@ -56,6 +57,7 @@ gegl_operation_filter_class_init (GeglOperationFilterClass * klass)
   operation_class->detect                  = detect;
   operation_class->get_bounding_box        = get_bounding_box;
   operation_class->get_required_for_output = get_required_for_output;
+  operation_class->threaded                = TRUE;
 }
 
 static void
@@ -103,6 +105,47 @@ detect (GeglOperation *operation,
   return operation->node;
 }
 
+typedef struct ThreadData
+{
+  GeglOperationFilterClass *klass;
+  GeglOperation            *operation;
+  GeglBuffer               *input;
+  GeglBuffer               *output;
+  gint                     *pending;
+  gint                      level;
+  gboolean                  success;
+  GeglRectangle             roi;
+} ThreadData;
+
+static GMutex pool_mutex = {0,};
+static GCond  pool_cond  = {0,};
+
+static void thread_process (gpointer thread_data, gpointer unused)
+{
+  ThreadData *data = thread_data;
+  if (!data->klass->process (data->operation,
+                       data->input, data->output, &data->roi, data->level))
+    data->success = FALSE;
+  g_atomic_int_add (data->pending, -1);
+  if (*data->pending == 0)
+  {
+    g_mutex_lock (&pool_mutex);
+    g_cond_signal (&pool_cond);
+    g_mutex_unlock (&pool_mutex);
+  }
+}
+
+static GThreadPool *thread_pool (void)
+{
+  static GThreadPool *pool = NULL;
+  if (!pool)
+    {
+      pool =  g_thread_pool_new (thread_process, NULL, gegl_config()->threads,
+                                 FALSE, NULL);
+    }
+  return pool;
+}
+
 static gboolean
 gegl_operation_filter_process (GeglOperation        *operation,
                                GeglOperationContext *context,
@@ -131,7 +174,65 @@ gegl_operation_filter_process (GeglOperation        *operation,
                                                              input,
                                                              result);
 
-  success = klass->process (operation, input, output, result, level);
+  if (gegl_operation_use_threading (operation, result))
+  {
+    gint threads = gegl_config ()->threads;
+    GThreadPool *pool = thread_pool ();
+    ThreadData thread_data[GEGL_MAX_THREADS];
+    gint pending = threads;
+
+    if (result->width > result->height)
+    {
+      gint bit = result->width / threads;
+      for (gint j = 0; j < threads; j++)
+      {
+        thread_data[j].roi.y = result->y;
+        thread_data[j].roi.height = result->height;
+        thread_data[j].roi.x = result->x + bit * j;
+        thread_data[j].roi.width = bit;
+      }
+      thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+    }
+    else
+    {
+      gint bit = result->height / threads;
+      for (gint j = 0; j < threads; j++)
+      {
+        thread_data[j].roi.x = result->x;
+        thread_data[j].roi.width = result->width;
+        thread_data[j].roi.y = result->y + bit * j;
+        thread_data[j].roi.height = bit;
+      }
+      thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+    }
+    for (gint i = 0; i < threads; i++)
+    {
+      thread_data[i].klass = klass;
+      thread_data[i].operation = operation;
+      thread_data[i].input = input;
+      thread_data[i].output = output;
+      thread_data[i].pending = &pending;
+      thread_data[i].level = level;
+      thread_data[i].success = TRUE;
+    }
+
+    g_mutex_lock (&pool_mutex);
+
+    for (gint i = 0; i < threads; i++)
+      g_thread_pool_push (pool, &thread_data[i], NULL);
+
+    while (pending != 0)
+      g_cond_wait (&pool_cond, &pool_mutex);
+
+    g_mutex_unlock (&pool_mutex);
+
+
+    success = thread_data[0].success;
+  }
+  else
+  {
+    success = klass->process (operation, input, output, result, level);
+  }
 
   if (input != NULL)
     g_object_unref (input);
diff --git a/gegl/operation/gegl-operation-point-composer3.c b/gegl/operation/gegl-operation-point-composer3.c
index fd0faa1..aceda5b 100644
--- a/gegl/operation/gegl-operation-point-composer3.c
+++ b/gegl/operation/gegl-operation-point-composer3.c
@@ -55,7 +55,7 @@ gegl_operation_point_composer3_class_init (GeglOperationPointComposer3Class *kla
 
   composer_class->process = gegl_operation_point_composer3_process;
   operation_class->prepare = prepare;
-  operation_class->no_cache =TRUE;
+  operation_class->no_cache = TRUE;
   operation_class->want_in_place = TRUE;
 }
 
diff --git a/gegl/operation/gegl-operation-point-render.c b/gegl/operation/gegl-operation-point-render.c
index e46c2db..b26a43f 100644
--- a/gegl/operation/gegl-operation-point-render.c
+++ b/gegl/operation/gegl-operation-point-render.c
@@ -57,6 +57,7 @@ gegl_operation_point_render_class_init (GeglOperationPointRenderClass *klass)
 
   operation_class->detect = detect;
   operation_class->no_cache = FALSE;
+  operation_class->threaded = TRUE;
   operation_class->get_cached_region = NULL; /* we are able to compute anything
                                                  anywhere when we're our kind
                                                  of class */
diff --git a/gegl/operation/gegl-operation-source.c b/gegl/operation/gegl-operation-source.c
index 38a3b9c..7eac026 100644
--- a/gegl/operation/gegl-operation-source.c
+++ b/gegl/operation/gegl-operation-source.c
@@ -24,6 +24,7 @@
 #include "gegl.h"
 #include "gegl-operation-source.h"
 #include "gegl-operation-context.h"
+#include "gegl-config.h"
 
 static gboolean gegl_operation_source_process
                              (GeglOperation        *operation,
@@ -79,6 +80,47 @@ attach (GeglOperation *self)
   g_param_spec_sink (pspec);
 }
 
+typedef struct ThreadData
+{
+  GeglOperationSourceClass *klass;
+  GeglOperation            *operation;
+  GeglBuffer               *output;
+  gint                     *pending;
+  gint                      level;
+  gboolean                  success;
+  GeglRectangle             roi;
+} ThreadData;
+
+static GMutex pool_mutex = {0,};
+static GCond  pool_cond  = {0,};
+
+static void thread_process (gpointer thread_data, gpointer unused)
+{
+  ThreadData *data = thread_data;
+  if (!data->klass->process (data->operation,
+                       data->output, &data->roi, data->level))
+    data->success = FALSE;
+  g_atomic_int_add (data->pending, -1);
+
+  if (*data->pending == 0)
+  {
+    g_mutex_lock (&pool_mutex);
+    g_cond_signal (&pool_cond);
+    g_mutex_unlock (&pool_mutex);
+  }
+}
+
+static GThreadPool *thread_pool (void)
+{
+  static GThreadPool *pool = NULL;
+  if (!pool)
+    {
+      pool =  g_thread_pool_new (thread_process, NULL, gegl_config()->threads,
+                                 FALSE, NULL);
+    }
+  return pool;
+}
+
 static gboolean
 gegl_operation_source_process (GeglOperation        *operation,
                                GeglOperationContext *context,
@@ -88,7 +130,7 @@ gegl_operation_source_process (GeglOperation        *operation,
 {
   GeglOperationSourceClass *klass = GEGL_OPERATION_SOURCE_GET_CLASS (operation);
   GeglBuffer               *output;
-  gboolean                  success;
+  gboolean                  success = FALSE;
 
   if (strcmp (output_prop, "output"))
     {
@@ -98,7 +140,64 @@ gegl_operation_source_process (GeglOperation        *operation,
 
   g_assert (klass->process);
   output = gegl_operation_context_get_target (context, "output");
-  success = klass->process (operation, output, result, level);
+
+  if (gegl_operation_use_threading (operation, result))
+  {
+    gint threads = gegl_config ()->threads;
+    GThreadPool *pool = thread_pool ();
+    ThreadData thread_data[GEGL_MAX_THREADS];
+    gint pending = threads;
+
+    if (result->width > result->height)
+    {
+      gint bit = result->width / threads;
+      for (gint j = 0; j < threads; j++)
+      {
+        thread_data[j].roi.y = result->y;
+        thread_data[j].roi.height = result->height;
+        thread_data[j].roi.x = result->x + bit * j;
+        thread_data[j].roi.width = bit;
+      }
+      thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+    }
+    else
+    {
+      gint bit = result->height / threads;
+      for (gint j = 0; j < threads; j++)
+      {
+        thread_data[j].roi.x = result->x;
+        thread_data[j].roi.width = result->width;
+        thread_data[j].roi.y = result->y + bit * j;
+        thread_data[j].roi.height = bit;
+      }
+      thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+    }
+    for (gint i = 0; i < threads; i++)
+    {
+      thread_data[i].klass = klass;
+      thread_data[i].operation = operation;
+      thread_data[i].output = output;
+      thread_data[i].pending = &pending;
+      thread_data[i].level = level;
+      thread_data[i].success = TRUE;
+    }
+
+    g_mutex_lock (&pool_mutex);
+
+    for (gint i = 0; i < threads; i++)
+      g_thread_pool_push (pool, &thread_data[i], NULL);
+
+    while (pending != 0)
+      g_cond_wait (&pool_cond, &pool_mutex);
+
+    g_mutex_unlock (&pool_mutex);
+
+    success = thread_data[0].success;
+  }
+  else
+  {
+    success = klass->process (operation, output, result, level);
+  }
 
   return success;
 }
diff --git a/gegl/operation/gegl-operation.c b/gegl/operation/gegl-operation.c
index 560a80d..8bbfb59 100644
--- a/gegl/operation/gegl-operation.c
+++ b/gegl/operation/gegl-operation.c
@@ -23,6 +23,7 @@
 #include <string.h>
 
 #include "gegl.h"
+#include "gegl-config.h"
 #include "gegl-types-internal.h"
 #include "gegl-operation.h"
 #include "gegl-operation-context.h"
@@ -86,6 +87,7 @@ gegl_operation_class_init (GeglOperationClass *klass)
   klass->attach                    = attach;
   klass->prepare                   = NULL;
   klass->no_cache                  = FALSE;
+  klass->threaded                  = FALSE;
   klass->get_bounding_box          = get_bounding_box;
   klass->get_invalidated_by_change = get_invalidated_by_change;
   klass->get_required_for_output   = get_required_for_output;
@@ -744,3 +746,26 @@ gegl_operation_get_source_format (GeglOperation *operation,
     }
   return NULL;
 }
+
+gboolean
+gegl_operation_use_threading (GeglOperation *operation,
+                              const GeglRectangle *roi)
+{
+  gint threads = gegl_config ()->threads;
+  if (threads == 1)
+    return FALSE;
+
+  {
+    GeglOperationClass       *op_class;
+    op_class = GEGL_OPERATION_GET_CLASS (operation);
+
+    if (op_class->opencl_support && gegl_cl_is_accelerated ())
+      return FALSE;
+
+    if (op_class->threaded &&
+        roi->width * roi->height > 64*64)
+      return TRUE;
+  }
+  return FALSE;
+}
+


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]