[gegl/threaded-base-classes: 9/22] Make GeglOperation base classes threadable



commit 038f106544b4736c070d8ce4988602c0361270c7
Author: Øyvind Kolås <pippin gimp org>
Date:   Tue Jun 24 01:54:20 2014 +0200

    Make GeglOperation base classes threadable
    
    All ops are force opted in for now; some give broken results, like gaussian
    blur; others crash. Point filter/composer ops seem to work correctly,
    but might be limited by memory bandwidth.

 gegl/gegl-types.h                         |    1 +
 gegl/graph/gegl-node-private.h            |    2 +-
 gegl/operation/gegl-operation-composer.c  |   92 ++++++++++++++++++++++++++++-
 gegl/operation/gegl-operation-composer3.c |   93 ++++++++++++++++++++++++++++-
 gegl/operation/gegl-operation-filter.c    |   91 +++++++++++++++++++++++++++-
 gegl/operation/gegl-operation-source.c    |   91 +++++++++++++++++++++++++++-
 gegl/operation/gegl-operation.h           |    3 +-
 7 files changed, 365 insertions(+), 8 deletions(-)
---
diff --git a/gegl/gegl-types.h b/gegl/gegl-types.h
index 714bdd3..8442911 100644
--- a/gegl/gegl-types.h
+++ b/gegl/gegl-types.h
@@ -25,6 +25,7 @@
 G_BEGIN_DECLS
 
 #define GEGL_AUTO_ROWSTRIDE 0
+#define GEGL_MAX_THREADS 16
 
 typedef enum
 {
diff --git a/gegl/graph/gegl-node-private.h b/gegl/graph/gegl-node-private.h
index 446a0bb..d80fce5 100644
--- a/gegl/graph/gegl-node-private.h
+++ b/gegl/graph/gegl-node-private.h
@@ -26,6 +26,7 @@
 
 G_BEGIN_DECLS
 
+
 #define GEGL_NODE_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass),  GEGL_TYPE_NODE, GeglNodeClass))
 #define GEGL_IS_NODE_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass),  GEGL_TYPE_NODE))
 #define GEGL_NODE_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj),  GEGL_TYPE_NODE, GeglNodeClass))
@@ -131,7 +132,6 @@ void
 gegl_node_emit_computed (GeglNode *node,
                          const GeglRectangle *rect);
 
-#define GEGL_MAX_THREADS 16
 
 G_END_DECLS
 
diff --git a/gegl/operation/gegl-operation-composer.c b/gegl/operation/gegl-operation-composer.c
index 739e914..de6beca 100644
--- a/gegl/operation/gegl-operation-composer.c
+++ b/gegl/operation/gegl-operation-composer.c
@@ -24,6 +24,7 @@
 #include "gegl.h"
 #include "gegl-operation-composer.h"
 #include "gegl-operation-context.h"
+#include "gegl-config.h"
 
 static gboolean gegl_operation_composer_process (GeglOperation       *operation,
                               GeglOperationContext     *context,
@@ -95,6 +96,39 @@ attach (GeglOperation *self)
   g_param_spec_sink (pspec);
 }
 
+typedef struct ThreadData /* arguments for one slice of a split process() call */
+{
+  GeglOperationComposerClass *klass;     /* class whose process() is called for the slice */
+  GeglOperation              *operation; /* forwarded unchanged to klass->process() */
+  GeglBuffer                 *input;     /* forwarded unchanged to klass->process() */
+  GeglBuffer                 *aux;       /* forwarded unchanged to klass->process() */
+  GeglBuffer                 *output;    /* forwarded unchanged to klass->process() */
+  gint                       *pending;   /* counter decremented atomically when this slice is done */
+  gint                        level;     /* forwarded unchanged to klass->process() */
+  gboolean                    success;   /* set FALSE by thread_process() when process() returns false */
+  GeglRectangle               roi;       /* rectangle handed to process() for this slice */
+} ThreadData;
+
+static void thread_process (gpointer thread_data, gpointer unused) /* runs one ThreadData slice; `unused` is never read */
+{
+  ThreadData *data = thread_data;
+  if (!data->klass->process (data->operation, /* run the op restricted to this slice's roi */
+                       data->input, data->aux, data->output, &data->roi, data->level))
+    data->success = FALSE; /* only cleared here; the dispatching code pre-sets it to TRUE */
+  g_atomic_int_add (data->pending, -1); /* signal completion to the code polling *pending */
+}
+
+static GThreadPool *thread_pool (void) /* returns this file's pool, creating it on the first call */
+{
+  static GThreadPool *pool = NULL; /* persists across calls; not freed in this function */
+  if (!pool) /* NOTE(review): unsynchronized check - confirm first use cannot race */
+    {
+      pool =  g_thread_pool_new (thread_process, NULL, gegl_config()->threads, /* worker, no user data, max threads */
+                                 FALSE, NULL); /* non-exclusive pool; creation error is not collected */
+    }
+  return pool;
+}
+
 static gboolean
 gegl_operation_composer_process (GeglOperation        *operation,
                                  GeglOperationContext *context,
@@ -103,6 +137,7 @@ gegl_operation_composer_process (GeglOperation        *operation,
                                  gint                  level)
 {
   GeglOperationComposerClass *klass   = GEGL_OPERATION_COMPOSER_GET_CLASS (operation);
+  GeglOperationClass         *op_class = GEGL_OPERATION_CLASS (klass);
   GeglBuffer                 *input;
   GeglBuffer                 *aux;
   GeglBuffer                 *output;
@@ -124,7 +159,62 @@ gegl_operation_composer_process (GeglOperation        *operation,
   if (input != NULL ||
       aux != NULL)
     {
-      success = klass->process (operation, input, aux, output, result, level);
+//      success = klass->process (operation, input, aux, output, result, level);
+
+      op_class->parallelize = 1;
+      if (op_class->parallelize && result->width * result->height > 64*64)
+      {
+        GThreadPool *pool = thread_pool ();
+        gint threads = gegl_config ()->threads;
+        ThreadData thread_data[GEGL_MAX_THREADS];
+        gint pending = threads;
+
+        if (result->width > result->height)
+        {
+          gint bit = result->width / threads;
+          for (gint j = 0; j < threads; j++)
+          {
+            thread_data[j].roi.y = result->y;
+            thread_data[j].roi.height = result->height;
+            thread_data[j].roi.x = result->x + bit * j;
+            thread_data[j].roi.width = bit;
+          }
+          thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+        }
+        else
+        {
+          gint bit = result->height / threads;
+          for (gint j = 0; j < threads; j++)
+          {
+            thread_data[j].roi.x = result->x;
+            thread_data[j].roi.width = result->width;
+            thread_data[j].roi.y = result->y + bit * j;
+            thread_data[j].roi.height = bit;
+          }
+          thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+        }
+        for (gint i = 0; i < threads; i++)
+        {
+          thread_data[i].klass = klass;
+          thread_data[i].operation = operation;
+          thread_data[i].input = input;
+          thread_data[i].aux = aux;
+          thread_data[i].output = output;
+          thread_data[i].pending = &pending;
+          thread_data[i].level = level;
+          thread_data[i].success = TRUE;
+        }
+
+        for (gint i = 1; i < threads; i++)
+          g_thread_pool_push (pool, &thread_data[i], NULL);
+        thread_process (&thread_data[0], NULL);
+
+        while (pending > 0) {g_usleep(3);};
+      }
+      else
+      {
+        success = klass->process (operation, input, aux, output, result, level);
+      }
 
       if (input)
         g_object_unref (input);
diff --git a/gegl/operation/gegl-operation-composer3.c b/gegl/operation/gegl-operation-composer3.c
index 67a7cb0..d9f5baa 100644
--- a/gegl/operation/gegl-operation-composer3.c
+++ b/gegl/operation/gegl-operation-composer3.c
@@ -24,6 +24,7 @@
 #include "gegl.h"
 #include "gegl-operation-composer3.h"
 #include "gegl-operation-context.h"
+#include "gegl-config.h"
 
 static gboolean gegl_operation_composer3_process
                              (GeglOperation        *operation,
@@ -105,6 +106,42 @@ attach (GeglOperation *self)
   g_param_spec_sink (pspec);
 }
 
+typedef struct ThreadData /* arguments for one slice of a split process() call */
+{
+  GeglOperationComposer3Class *klass;     /* class whose process() is called for the slice */
+  GeglOperation               *operation; /* forwarded unchanged to klass->process() */
+  GeglBuffer                  *input;     /* forwarded unchanged to klass->process() */
+  GeglBuffer                  *aux;       /* forwarded unchanged to klass->process() */
+  GeglBuffer                  *aux2;      /* forwarded unchanged to klass->process() */
+  GeglBuffer                  *output;    /* forwarded unchanged to klass->process() */
+  gint                        *pending;   /* counter decremented atomically when this slice is done */
+  gint                         level;     /* forwarded unchanged to klass->process() */
+  gboolean                     success;   /* set FALSE by thread_process() when process() returns false */
+  GeglRectangle                roi;       /* rectangle handed to process() for this slice */
+} ThreadData;
+
+static void thread_process (gpointer thread_data, gpointer unused) /* runs one ThreadData slice; `unused` is never read */
+{
+  ThreadData *data = thread_data;
+  if (!data->klass->process (data->operation, /* run the op restricted to this slice's roi */
+                       data->input, data->aux, data->aux2, 
+                       data->output, &data->roi, data->level))
+    data->success = FALSE; /* only cleared here; the dispatching code pre-sets it to TRUE */
+  g_atomic_int_add (data->pending, -1); /* signal completion to the code polling *pending */
+}
+
+static GThreadPool *thread_pool (void) /* returns this file's pool, creating it on the first call */
+{
+  static GThreadPool *pool = NULL; /* persists across calls; not freed in this function */
+  if (!pool) /* NOTE(review): unsynchronized check - confirm first use cannot race */
+    {
+      pool =  g_thread_pool_new (thread_process, NULL, gegl_config()->threads, /* worker, no user data, max threads */
+                                 FALSE, NULL); /* non-exclusive pool; creation error is not collected */
+    }
+  return pool;
+}
+
+
 static gboolean
 gegl_operation_composer3_process (GeglOperation        *operation,
                                   GeglOperationContext *context,
@@ -155,7 +192,61 @@ gegl_operation_composer3_process (GeglOperation        *operation,
       aux != NULL ||
       aux2 != NULL)
     {
-      success = klass->process (operation, input, aux, aux2, output, result, level);
+      op_class->parallelize = 1;
+      if (op_class->parallelize && result->width * result->height > 64*64)
+      {
+        GThreadPool *pool = thread_pool ();
+        gint threads = gegl_config ()->threads;
+        ThreadData thread_data[GEGL_MAX_THREADS];
+        gint pending = threads;
+
+        if (result->width > result->height)
+        {
+          gint bit = result->width / threads;
+          for (gint j = 0; j < threads; j++)
+          {
+            thread_data[j].roi.y = result->y;
+            thread_data[j].roi.height = result->height;
+            thread_data[j].roi.x = result->x + bit * j;
+            thread_data[j].roi.width = bit;
+          }
+          thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+        }
+        else
+        {
+          gint bit = result->height / threads;
+          for (gint j = 0; j < threads; j++)
+          {
+            thread_data[j].roi.x = result->x;
+            thread_data[j].roi.width = result->width;
+            thread_data[j].roi.y = result->y + bit * j;
+            thread_data[j].roi.height = bit;
+          }
+          thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+        }
+        for (gint i = 0; i < threads; i++)
+        {
+          thread_data[i].klass = klass;
+          thread_data[i].operation = operation;
+          thread_data[i].input = input;
+          thread_data[i].aux = aux;
+          thread_data[i].aux2 = aux2;
+          thread_data[i].output = output;
+          thread_data[i].pending = &pending;
+          thread_data[i].level = level;
+          thread_data[i].success = TRUE;
+        }
+
+        for (gint i = 1; i < threads; i++)
+          g_thread_pool_push (pool, &thread_data[i], NULL);
+        thread_process (&thread_data[0], NULL);
+
+        while (pending > 0) {g_usleep(3);};
+      }
+      else
+      {
+        success = klass->process (operation, input, aux, aux2, output, result, level);
+      }
 
       if (input)
         g_object_unref (input);
diff --git a/gegl/operation/gegl-operation-filter.c b/gegl/operation/gegl-operation-filter.c
index f3c924c..ea3e9bf 100644
--- a/gegl/operation/gegl-operation-filter.c
+++ b/gegl/operation/gegl-operation-filter.c
@@ -13,7 +13,7 @@
  * You should have received a copy of the GNU Lesser General Public
  * License along with GEGL; if not, see <http://www.gnu.org/licenses/>.
  *
- * Copyright 2006 Øyvind Kolås
+ * Copyright 2006, 2014 Øyvind Kolås
  */
 
 #include "config.h"
@@ -24,6 +24,7 @@
 #include "gegl.h"
 #include "gegl-operation-filter.h"
 #include "gegl-operation-context.h"
+#include "gegl-config.h"
 
 static gboolean gegl_operation_filter_process
                                       (GeglOperation        *operation,
@@ -103,6 +104,38 @@ detect (GeglOperation *operation,
   return operation->node;
 }
 
+typedef struct ThreadData /* arguments for one slice of a split process() call */
+{
+  GeglOperationFilterClass *klass;     /* class whose process() is called for the slice */
+  GeglOperation            *operation; /* forwarded unchanged to klass->process() */
+  GeglBuffer               *input;     /* forwarded unchanged to klass->process() */
+  GeglBuffer               *output;    /* forwarded unchanged to klass->process() */
+  gint                     *pending;   /* counter decremented atomically when this slice is done */
+  gint                      level;     /* forwarded unchanged to klass->process() */
+  gboolean                  success;   /* set FALSE by thread_process() when process() returns false */
+  GeglRectangle             roi;       /* rectangle handed to process() for this slice */
+} ThreadData;
+
+static void thread_process (gpointer thread_data, gpointer unused) /* runs one ThreadData slice; `unused` is never read */
+{
+  ThreadData *data = thread_data;
+  if (!data->klass->process (data->operation, /* run the op restricted to this slice's roi */
+                       data->input, data->output, &data->roi, data->level))
+    data->success = FALSE; /* only cleared here; the dispatching code pre-sets it to TRUE */
+  g_atomic_int_add (data->pending, -1); /* signal completion to the code polling *pending */
+}
+
+static GThreadPool *thread_pool (void) /* returns this file's pool, creating it on the first call */
+{
+  static GThreadPool *pool = NULL; /* persists across calls; not freed in this function */
+  if (!pool) /* NOTE(review): unsynchronized check - confirm first use cannot race */
+    {
+      pool =  g_thread_pool_new (thread_process, NULL, gegl_config()->threads, /* worker, no user data, max threads */
+                                 FALSE, NULL); /* non-exclusive pool; creation error is not collected */
+    }
+  return pool;
+}
+
 static gboolean
 gegl_operation_filter_process (GeglOperation        *operation,
                                GeglOperationContext *context,
@@ -111,11 +144,13 @@ gegl_operation_filter_process (GeglOperation        *operation,
                                gint                  level)
 {
   GeglOperationFilterClass *klass;
+  GeglOperationClass       *op_class;
   GeglBuffer               *input;
   GeglBuffer               *output;
   gboolean                  success = FALSE;
 
   klass = GEGL_OPERATION_FILTER_GET_CLASS (operation);
+  op_class = GEGL_OPERATION_CLASS (klass);
 
   g_assert (klass->process);
 
@@ -128,7 +163,59 @@ gegl_operation_filter_process (GeglOperation        *operation,
   input  = gegl_operation_context_get_source (context, "input");
   output = gegl_operation_context_get_target (context, "output");
 
-  success = klass->process (operation, input, output, result, level);
+  op_class->parallelize = 1;
+  if (op_class->parallelize && result->width * result->height > 64*64)
+  {
+    GThreadPool *pool = thread_pool ();
+    gint threads = gegl_config ()->threads;
+    ThreadData thread_data[GEGL_MAX_THREADS];
+    gint pending = threads;
+
+    if (result->width > result->height)
+    {
+      gint bit = result->width / threads;
+      for (gint j = 0; j < threads; j++)
+      {
+        thread_data[j].roi.y = result->y;
+        thread_data[j].roi.height = result->height;
+        thread_data[j].roi.x = result->x + bit * j;
+        thread_data[j].roi.width = bit;
+      }
+      thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+    }
+    else
+    {
+      gint bit = result->height / threads;
+      for (gint j = 0; j < threads; j++)
+      {
+        thread_data[j].roi.x = result->x;
+        thread_data[j].roi.width = result->width;
+        thread_data[j].roi.y = result->y + bit * j;
+        thread_data[j].roi.height = bit;
+      }
+      thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+    }
+    for (gint i = 0; i < threads; i++)
+    {
+      thread_data[i].klass = klass;
+      thread_data[i].operation = operation;
+      thread_data[i].input = input;
+      thread_data[i].output = output;
+      thread_data[i].pending = &pending;
+      thread_data[i].level = level;
+      thread_data[i].success = TRUE;
+    }
+
+    for (gint i = 1; i < threads; i++)
+      g_thread_pool_push (pool, &thread_data[i], NULL);
+    thread_process (&thread_data[0], NULL);
+
+    while (pending > 0) {g_usleep(3);};
+  }
+  else
+  {
+    success = klass->process (operation, input, output, result, level);
+  }
 
   if (input != NULL)
     g_object_unref (input);
diff --git a/gegl/operation/gegl-operation-source.c b/gegl/operation/gegl-operation-source.c
index 38a3b9c..ad102ec 100644
--- a/gegl/operation/gegl-operation-source.c
+++ b/gegl/operation/gegl-operation-source.c
@@ -24,6 +24,7 @@
 #include "gegl.h"
 #include "gegl-operation-source.h"
 #include "gegl-operation-context.h"
+#include "gegl-config.h"
 
 static gboolean gegl_operation_source_process
                              (GeglOperation        *operation,
@@ -79,6 +80,37 @@ attach (GeglOperation *self)
   g_param_spec_sink (pspec);
 }
 
+typedef struct ThreadData /* arguments for one slice of a split process() call */
+{
+  GeglOperationSourceClass *klass;     /* class whose process() is called for the slice */
+  GeglOperation            *operation; /* forwarded unchanged to klass->process() */
+  GeglBuffer               *output;    /* forwarded unchanged to klass->process() */
+  gint                     *pending;   /* counter decremented atomically when this slice is done */
+  gint                      level;     /* forwarded unchanged to klass->process() */
+  gboolean                  success;   /* set FALSE by thread_process() when process() returns false */
+  GeglRectangle             roi;       /* rectangle handed to process() for this slice */
+} ThreadData;
+
+static void thread_process (gpointer thread_data, gpointer unused) /* runs one ThreadData slice; `unused` is never read */
+{
+  ThreadData *data = thread_data;
+  if (!data->klass->process (data->operation, /* run the op restricted to this slice's roi */
+                       data->output, &data->roi, data->level))
+    data->success = FALSE; /* only cleared here; the dispatching code pre-sets it to TRUE */
+  g_atomic_int_add (data->pending, -1); /* signal completion to the code polling *pending */
+}
+
+static GThreadPool *thread_pool (void) /* returns this file's pool, creating it on the first call */
+{
+  static GThreadPool *pool = NULL; /* persists across calls; not freed in this function */
+  if (!pool) /* NOTE(review): unsynchronized check - confirm first use cannot race */
+    {
+      pool =  g_thread_pool_new (thread_process, NULL, gegl_config()->threads, /* worker, no user data, max threads */
+                                 FALSE, NULL); /* non-exclusive pool; creation error is not collected */
+    }
+  return pool;
+}
+
 static gboolean
 gegl_operation_source_process (GeglOperation        *operation,
                                GeglOperationContext *context,
@@ -87,8 +119,11 @@ gegl_operation_source_process (GeglOperation        *operation,
                                gint                  level)
 {
   GeglOperationSourceClass *klass = GEGL_OPERATION_SOURCE_GET_CLASS (operation);
+  GeglOperationClass       *op_class;
   GeglBuffer               *output;
-  gboolean                  success;
+  gboolean                  success = FALSE;
+
+  op_class = GEGL_OPERATION_CLASS (klass);
 
   if (strcmp (output_prop, "output"))
     {
@@ -98,7 +133,59 @@ gegl_operation_source_process (GeglOperation        *operation,
 
   g_assert (klass->process);
   output = gegl_operation_context_get_target (context, "output");
-  success = klass->process (operation, output, result, level);
+
+  op_class->parallelize= 1;
+  if (op_class->parallelize && result->width * result->height > 64*64)
+  {
+    GThreadPool *pool = thread_pool ();
+    gint threads = gegl_config ()->threads;
+    ThreadData thread_data[GEGL_MAX_THREADS];
+    gint pending = threads;
+
+    if (result->width > result->height)
+    {
+      gint bit = result->width / threads;
+      for (gint j = 0; j < threads; j++)
+      {
+        thread_data[j].roi.y = result->y;
+        thread_data[j].roi.height = result->height;
+        thread_data[j].roi.x = result->x + bit * j;
+        thread_data[j].roi.width = bit;
+      }
+      thread_data[threads-1].roi.width = result->width - (bit * (threads-1));
+    }
+    else
+    {
+      gint bit = result->height / threads;
+      for (gint j = 0; j < threads; j++)
+      {
+        thread_data[j].roi.x = result->x;
+        thread_data[j].roi.width = result->width;
+        thread_data[j].roi.y = result->y + bit * j;
+        thread_data[j].roi.height = bit;
+      }
+      thread_data[threads-1].roi.height = result->height - (bit * (threads-1));
+    }
+    for (gint i = 0; i < threads; i++)
+    {
+      thread_data[i].klass = klass;
+      thread_data[i].operation = operation;
+      thread_data[i].output = output;
+      thread_data[i].pending = &pending;
+      thread_data[i].level = level;
+      thread_data[i].success = TRUE;
+    }
+
+    for (gint i = 1; i < threads; i++)
+      g_thread_pool_push (pool, &thread_data[i], NULL);
+    thread_process (&thread_data[0], NULL);
+
+    while (pending > 0) {g_usleep(3);};
+  }
+  else
+  {
+    success = klass->process (operation, output, result, level);
+  }
 
   return success;
 }
diff --git a/gegl/operation/gegl-operation.h b/gegl/operation/gegl-operation.h
index cd1cb59..e54f28e 100644
--- a/gegl/operation/gegl-operation.h
+++ b/gegl/operation/gegl-operation.h
@@ -82,7 +82,8 @@ struct _GeglOperationClass
                                       processing, making output buffer =
                                       input buffer.
                                       */
-  guint64         bit_pad:61;
+  guint           parallelize:1;
+  guint64         bit_pad:60;
 
   /* attach this operation with a GeglNode, override this if you are creating a
    * GeglGraph, it is already defined for Filters/Sources/Composers.


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]