[glib/wip/task] GTask: Add g_task_set_scheduling()



commit a676dec260563d844fbc8529e859d693900e3579
Author: Colin Walters <walters verbum org>
Date:   Thu Dec 13 17:32:22 2012 -0500

    GTask: Add g_task_set_scheduling()
    
    My application (OSTree) has suffered a serious performance
    degradation recently.  After some analysis of the problem, I think
    much of it stems from the recent order-of-magnitude increase in the
    default maximum number of GLib worker threads.
    
    OSTree does a *lot* of async I/O (directory enumeration, calling
    link(), reading/writing files, etc.).  It very quickly reaches 100
    threads, and then there is up/down thread churn: threads complete,
    the process becomes I/O bound and the threads are starved of work,
    and then the main thread manages to fill the queue again, suddenly
    spinning up lots of worker threads once more.
    
    Now, as this patch lets callers specify, there are several distinct
    classes of tasks.  It basically never makes sense to run more
    CPU-bound threads than there are CPUs.  Spawning 90+ threads to do
    SHA256 calculation, for example, just creates pointless extra
    contention on a dual-processor system.
    
    Therefore, the limit of the thread pool for G_TASK_THREAD_KIND_CPU is
    the number of processors.
    
    Similarly, there's a real limit to how much I/O traffic it makes sense
    to schedule simultaneously.  I need to do more research here - what
    makes sense for my laptop with 1 magnetic hard drive is likely
    different from a fast server with RAID on top of SSDs.  For the
    moment, I've chosen to limit I/O bound threads to 4.
    
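    G_TASK_THREAD_KIND_MIXED tasks are limited to the number of
    processors plus the I/O limit.  The limit for
    G_TASK_THREAD_KIND_DEFAULT remains 100 - but we can solve this better
    if we have e.g. a way for tasks to express dependencies, among other
    things.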
    
    But for now, with this patch, and OSTree modified to use
    G_TASK_THREAD_KIND_IO for its rename()/link() threads, I am seeing
    better performance.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=687223

 gio/gfile.c             |   30 ++++++------
 gio/gfileenumerator.c   |    4 +-
 gio/gfileinputstream.c  |    2 +-
 gio/gfileoutputstream.c |    2 +-
 gio/gio.symbols         |    1 +
 gio/gioenums.h          |   18 +++++++
 gio/gtask.c             |  114 +++++++++++++++++++++++++++++++++++++++++++----
 gio/gtask.h             |    4 ++
 gio/tests/task.c        |   17 +++++++
 9 files changed, 164 insertions(+), 28 deletions(-)
---
diff --git a/gio/gfile.c b/gio/gfile.c
index 74c0f9b..fcc7908 100644
--- a/gio/gfile.c
+++ b/gio/gfile.c
@@ -5007,7 +5007,7 @@ g_file_real_query_info_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, data, (GDestroyNotify)query_info_data_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
   g_task_run_in_thread (task, query_info_async_thread);
   g_object_unref (task);
 }
@@ -5051,7 +5051,7 @@ g_file_real_query_filesystem_info_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, g_strdup (attributes), g_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
   g_task_run_in_thread (task, query_filesystem_info_async_thread);
   g_object_unref (task);
 }
@@ -5101,7 +5101,7 @@ g_file_real_enumerate_children_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, data, (GDestroyNotify)query_info_data_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
   g_task_run_in_thread (task, enumerate_children_async_thread);
   g_object_unref (task);
 }
@@ -5153,7 +5153,7 @@ g_file_real_read_async (GFile               *file,
   GTask *task;
 
   task = g_task_new (file, cancellable, callback, user_data);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
   g_task_run_in_thread (task, open_read_async_thread);
   g_object_unref (task);
 }
@@ -5204,7 +5204,7 @@ g_file_real_append_to_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, data, g_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, append_to_async_thread);
   g_object_unref (task);
@@ -5256,7 +5256,7 @@ g_file_real_create_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, data, g_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, create_async_thread);
   g_object_unref (task);
@@ -5334,7 +5334,7 @@ g_file_real_replace_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, data, (GDestroyNotify)replace_async_data_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, replace_async_thread);
   g_object_unref (task);
@@ -5380,7 +5380,7 @@ g_file_real_delete_async (GFile               *file,
   GTask *task;
 
   task = g_task_new (file, cancellable, callback, user_data);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
   g_task_run_in_thread (task, delete_async_thread);
   g_object_unref (task);
 }
@@ -5432,7 +5432,7 @@ g_file_real_open_readwrite_async (GFile               *file,
   GTask *task;
 
   task = g_task_new (file, cancellable, callback, user_data);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, open_readwrite_async_thread);
   g_object_unref (task);
@@ -5492,7 +5492,7 @@ g_file_real_create_readwrite_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, data, g_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, create_readwrite_async_thread);
   g_object_unref (task);
@@ -5567,7 +5567,7 @@ g_file_real_replace_readwrite_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, data, (GDestroyNotify)replace_rw_async_data_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, replace_readwrite_async_thread);
   g_object_unref (task);
@@ -5613,7 +5613,7 @@ g_file_real_set_display_name_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, g_strdup (display_name), g_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, set_display_name_async_thread);
   g_object_unref (task);
@@ -5680,7 +5680,7 @@ g_file_real_set_attributes_async (GFile               *file,
 
   task = g_task_new (file, cancellable, callback, user_data);
   g_task_set_task_data (task, data, (GDestroyNotify)set_info_data_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, set_info_async_thread);
   g_object_unref (task);
@@ -5734,7 +5734,7 @@ g_file_real_find_enclosing_mount_async (GFile               *file,
   GTask *task;
 
   task = g_task_new (file, cancellable, callback, user_data);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, find_enclosing_mount_async_thread);
   g_object_unref (task);
@@ -5853,7 +5853,7 @@ g_file_real_copy_async (GFile                  *source,
 
   task = g_task_new (source, cancellable, callback, user_data);
   g_task_set_task_data (task, data, (GDestroyNotify)copy_async_data_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
   g_task_run_in_thread (task, copy_async_thread);
   g_object_unref (task);
 }
diff --git a/gio/gfileenumerator.c b/gio/gfileenumerator.c
index d2779ce..ae661c5 100644
--- a/gio/gfileenumerator.c
+++ b/gio/gfileenumerator.c
@@ -678,7 +678,7 @@ g_file_enumerator_real_next_files_async (GFileEnumerator     *enumerator,
 
   task = g_task_new (enumerator, cancellable, callback, user_data);
   g_task_set_task_data (task, GINT_TO_POINTER (num_files), NULL);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
 
   g_task_run_in_thread (task, next_files_thread);
   g_object_unref (task);
@@ -723,7 +723,7 @@ g_file_enumerator_real_close_async (GFileEnumerator     *enumerator,
   GTask *task;
 
   task = g_task_new (enumerator, cancellable, callback, user_data);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
   
   g_task_run_in_thread (task, close_async_thread);
   g_object_unref (task);
diff --git a/gio/gfileinputstream.c b/gio/gfileinputstream.c
index 73e7b78..ff89ef3 100644
--- a/gio/gfileinputstream.c
+++ b/gio/gfileinputstream.c
@@ -417,7 +417,7 @@ g_file_input_stream_real_query_info_async (GFileInputStream    *stream,
 
   task = g_task_new (stream, cancellable, callback, user_data);
   g_task_set_task_data (task, g_strdup (attributes), g_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
   
   g_task_run_in_thread (task, query_info_async_thread);
   g_object_unref (task);
diff --git a/gio/gfileoutputstream.c b/gio/gfileoutputstream.c
index 7b77ba5..3d9bbdd 100644
--- a/gio/gfileoutputstream.c
+++ b/gio/gfileoutputstream.c
@@ -520,7 +520,7 @@ g_file_output_stream_real_query_info_async (GFileOutputStream     *stream,
 
   task = g_task_new (stream, cancellable, callback, user_data);
   g_task_set_task_data (task, g_strdup (attributes), g_free);
-  g_task_set_priority (task, io_priority);
+  g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
   
   g_task_run_in_thread (task, query_info_async_thread);
   g_object_unref (task);
diff --git a/gio/gio.symbols b/gio/gio.symbols
index 3096755..19fc9ac 100644
--- a/gio/gio.symbols
+++ b/gio/gio.symbols
@@ -1793,6 +1793,7 @@ g_task_run_in_thread
 g_task_run_in_thread_sync
 g_task_set_check_cancellable
 g_task_set_priority
+g_task_set_scheduling
 g_task_set_return_on_cancel
 g_task_set_source_tag
 g_task_set_task_data
diff --git a/gio/gioenums.h b/gio/gioenums.h
index 39c43c0..4030501 100644
--- a/gio/gioenums.h
+++ b/gio/gioenums.h
@@ -1655,6 +1655,26 @@ typedef enum /*< flags >*/ {
   G_TEST_DBUS_NONE = 0
 } GTestDBusFlags;
 
+/**
+ * GTaskThreadKind:
+ * @G_TASK_THREAD_KIND_DEFAULT: Work with unknown performance characteristics
+ * @G_TASK_THREAD_KIND_IO: Mostly performs synchronous I/O, e.g. on local files
+ * @G_TASK_THREAD_KIND_CPU: Mostly uses the processor
+ * @G_TASK_THREAD_KIND_MIXED: Alternates between I/O and CPU work
+ *
+ * Describes the performance characteristics of the thread function
+ * of a #GTask, so that GLib can decide how to schedule it.  See
+ * g_task_set_scheduling().
+ *
+ * Since: 2.36
+ */
+typedef enum {
+  G_TASK_THREAD_KIND_DEFAULT = 0,
+  G_TASK_THREAD_KIND_IO,
+  G_TASK_THREAD_KIND_CPU,
+  G_TASK_THREAD_KIND_MIXED
+} GTaskThreadKind;
+
 G_END_DECLS
 
 #endif /* __GIO_ENUMS_H__ */
diff --git a/gio/gtask.c b/gio/gtask.c
index 0a80c24..b72e5ac 100644
--- a/gio/gtask.c
+++ b/gio/gtask.c
@@ -24,6 +24,9 @@
 
 #include "gasyncresult.h"
 #include "gcancellable.h"
+#ifdef G_OS_UNIX
+#include <unistd.h>
+#endif
 
 /**
  * SECTION:gtask
@@ -578,6 +581,7 @@ struct _GTask {
   gpointer callback_data;
 
   GTaskThreadFunc task_func;
+  GTaskThreadKind kind;
   GMutex lock;
   GCond cond;
   gboolean return_on_cancel;
@@ -612,12 +616,16 @@ G_DEFINE_TYPE_WITH_CODE (GTask, g_task, G_TYPE_OBJECT,
                                                 g_task_async_result_iface_init);
                          g_task_thread_pool_init ();)
 
-static GThreadPool *task_pool;
+static GThreadPool *task_pool_default;
+static GThreadPool *task_pool_cpu;
+static GThreadPool *task_pool_io;
+static GThreadPool *task_pool_mixed;
 
 static void
 g_task_init (GTask *task)
 {
   task->check_cancellable = TRUE;
+  task->kind = G_TASK_THREAD_KIND_DEFAULT;
 }
 
 static void
@@ -825,6 +833,47 @@ g_task_set_priority (GTask *task,
 }
 
 /**
+ * g_task_set_scheduling:
+ * @task: the #GTask
+ * @priority: the <link linkend="io-priority">priority</link>
+ *   of the request.
+ * @kind: the performance characteristics of @task's thread function
+ *
+ * First, uses @priority like g_task_set_priority().  Additionally,
+ * the argument @kind allows GLib to more efficiently schedule threads
+ * created internally by #GTask.  For example, if you set
+ * %G_TASK_THREAD_KIND_CPU, the system will ensure that the number of
+ * threads does not significantly exceed the number of processors
+ * available.
+ *
+ * The thread pools used for kinds other than
+ * %G_TASK_THREAD_KIND_DEFAULT have small thread limits, so a task of
+ * such a kind must not block waiting for another task queued to the
+ * same pool (for example via g_task_run_in_thread_sync()): if every
+ * thread of the pool ends up blocked that way, the pool deadlocks.
+ *
+ * This function must be used before invoking g_task_run_in_thread()
+ * or similar.
+ *
+ * Since: 2.36
+ */
+void
+g_task_set_scheduling (GTask           *task,
+                       gint             priority,
+                       GTaskThreadKind  kind)
+{
+  g_return_if_fail (G_IS_TASK (task));
+  /* Catch invalid kinds here rather than in g_task_start_task_thread(),
+   * which asserts that it found a pool; the cast to guint makes
+   * negative values fail the check too.
+   */
+  g_return_if_fail ((guint) kind <= G_TASK_THREAD_KIND_MIXED);
+
+  g_task_set_priority (task, priority);
+  task->kind = kind;
+}
+
+/**
  * g_task_set_check_cancellable:
  * @task: the #GTask
  * @check_cancellable: whether #GTask will check the state of
@@ -1267,6 +1316,8 @@ static void
 g_task_start_task_thread (GTask           *task,
                           GTaskThreadFunc  task_func)
 {
+  GThreadPool *pool = NULL;
+
   g_mutex_init (&task->lock);
   g_cond_init (&task->cond);
 
@@ -1274,6 +1325,23 @@ g_task_start_task_thread (GTask           *task,
 
   task->task_func = task_func;
 
+  switch (task->kind)
+    {
+    case G_TASK_THREAD_KIND_DEFAULT:
+      pool = task_pool_default;
+      break;
+    case G_TASK_THREAD_KIND_CPU:
+      pool = task_pool_cpu;
+      break;
+    case G_TASK_THREAD_KIND_IO:
+      pool = task_pool_io;
+      break;
+    case G_TASK_THREAD_KIND_MIXED:
+      pool = task_pool_mixed;
+      break;
+    }
+  g_assert (pool);
+
   if (task->cancellable)
     {
       if (task->return_on_cancel &&
@@ -1281,7 +1349,7 @@ g_task_start_task_thread (GTask           *task,
                                                 &task->error))
         {
           task->thread_cancelled = task->thread_complete = TRUE;
-          g_thread_pool_push (task_pool, g_object_ref (task), NULL);
+          g_thread_pool_push (pool, g_object_ref (task), NULL);
           return;
         }
 
@@ -1291,7 +1359,7 @@ g_task_start_task_thread (GTask           *task,
                              task_thread_cancelled_disconnect_notify, 0);
     }
 
-  g_thread_pool_push (task_pool, g_object_ref (task), &task->error);
+  g_thread_pool_push (pool, g_object_ref (task), &task->error);
   if (task->error)
     task->thread_complete = TRUE;
 }
@@ -1299,11 +1367,14 @@ g_task_start_task_thread (GTask           *task,
 /**
  * g_task_run_in_thread:
  * @task: a #GTask
  * @task_func: a #GTaskThreadFunc
  *
  * Runs @task_func in another thread. When @task_func returns, @task's
  * #GAsyncReadyCallback will be invoked in @task's #GMainContext.
  *
+ * @task_func runs in the thread pool for the #GTaskThreadKind set with
+ * g_task_set_scheduling(), or %G_TASK_THREAD_KIND_DEFAULT if none was set.
+ *
  * This takes a ref on @task until the task completes.
  *
  * See #GTaskThreadFunc for more details about how @task_func is handled.
@@ -1762,20 +1833,71 @@ g_task_compare_priority (gconstpointer a,
   return ta->priority - tb->priority;
 }
 
+/* Returns the number of online processors, or 2 if that cannot be
+ * determined (on non-UNIX platforms, or if sysconf() fails).
+ */
+static gint
+get_nproc_onln (void)
+{
+#ifdef G_OS_UNIX
+  long result = sysconf (_SC_NPROCESSORS_ONLN);
+
+  /* sysconf() returns -1 on failure, and errno is not necessarily set
+   * to EINVAL in that case.  Never return less than 1: the result is
+   * used as a g_thread_pool_new() limit, where -1 means "unlimited".
+   */
+  if (G_UNLIKELY (result < 1))
+    return 2;
+  return (gint) MIN (result, G_MAXINT);
+#else
+  return 2;
+#endif
+}
+
+/* Creates a thread pool that runs g_task_thread_pool_thread() on at
+ * most @max_threads threads at once, with queued tasks ordered by
+ * g_task_compare_priority().
+ */
+static GThreadPool *
+generic_pool_new (gint max_threads)
+{
+  GThreadPool *result = g_thread_pool_new (g_task_thread_pool_thread, NULL,
+                                           max_threads, FALSE, NULL);
+
+  g_assert (result != NULL);
+  g_thread_pool_set_sort_function (result, g_task_compare_priority, NULL);
+  return result;
+}
+
 static void
 g_task_thread_pool_init (void)
 {
-  task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL,
-                                 100, FALSE, NULL);
-  g_assert (task_pool != NULL);
-
-  g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
+  /* One pool per GTaskThreadKind, each with its own thread limit.  The
+   * I/O limit was chosen arbitrarily; the best value depends heavily on
+   * the storage hardware.  See
+   * https://bugzilla.gnome.org/show_bug.cgi?id=687223
+   */
+  const gint max_io_threads = 4;
+  /* The limit of the single pool GTask used before kinds existed */
+  const gint max_default_threads = 100;
+  gint nproc = get_nproc_onln ();
+
+  task_pool_default = generic_pool_new (max_default_threads);
+  task_pool_cpu = generic_pool_new (nproc);
+  task_pool_io = generic_pool_new (max_io_threads);
+  task_pool_mixed = generic_pool_new (nproc + max_io_threads);
 }
 
 static void
 g_task_thread_pool_resort (void)
 {
-  g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
+  /* Re-sort every pool, not only the default one, since queued tasks
+   * may be waiting in any of them.
+   */
+  g_thread_pool_set_sort_function (task_pool_default, g_task_compare_priority, NULL);
+  g_thread_pool_set_sort_function (task_pool_cpu, g_task_compare_priority, NULL);
+  g_thread_pool_set_sort_function (task_pool_io, g_task_compare_priority, NULL);
+  g_thread_pool_set_sort_function (task_pool_mixed, g_task_compare_priority, NULL);
 }
 
 static void
diff --git a/gio/gtask.h b/gio/gtask.h
index 2cc5f65..d4e9cd9 100644
--- a/gio/gtask.h
+++ b/gio/gtask.h
@@ -71,6 +71,10 @@ GLIB_AVAILABLE_IN_2_36
 void          g_task_set_priority          (GTask               *task,
                                             gint                 priority);
 GLIB_AVAILABLE_IN_2_36
+void          g_task_set_scheduling        (GTask               *task,
+                                            gint                 priority,
+                                            GTaskThreadKind      kind);
+GLIB_AVAILABLE_IN_2_36
 void          g_task_set_check_cancellable (GTask               *task,
                                             gboolean             check_cancellable);
 GLIB_AVAILABLE_IN_2_36
diff --git a/gio/tests/task.c b/gio/tests/task.c
index 8065807..db2eeca 100644
--- a/gio/tests/task.c
+++ b/gio/tests/task.c
@@ -897,6 +897,28 @@ test_run_in_thread_priority (void)
   g_mutex_unlock (&fake_task_mutex);
 }
 
+static void
+test_run_in_thread_kind (void)
+{
+  GTask *task;
+  gint seq = 42;
+
+  /* No real semantics here, we're basically just calling
+   * g_task_set_scheduling() to get basic code coverage.
+   */
+  task = g_task_new (NULL, NULL, quit_main_loop_callback, NULL);
+  g_task_set_task_data (task, &seq, NULL);
+  g_task_set_scheduling (task, G_PRIORITY_DEFAULT, G_TASK_THREAD_KIND_IO);
+  g_task_run_in_thread (task, set_sequence_number_thread);
+  g_object_unref (task);
+
+  /* Wait for the task to finish before returning: the thread function
+   * writes through a pointer to @seq, which lives in this stack frame.
+   * quit_main_loop_callback() is expected to quit the loop.
+   */
+  g_main_loop_run (loop);
+}
+
 /* test_return_on_cancel */
 
 GMutex roc_init_mutex, roc_finish_mutex;
@@ -1652,6 +1674,7 @@ main (int argc, char **argv)
   g_test_add_func ("/gtask/run-in-thread", test_run_in_thread);
   g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync);
   g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
+  g_test_add_func ("/gtask/run-in-thread-kind", test_run_in_thread_kind);
   g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
   g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
   g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]