[glib] gtask: remove hardcoded GTask thread-pool size



commit 86866a2a6d4b7aa289f4f782dddac156869400bf
Author: Dan Winship <danw gnome org>
Date:   Mon Mar 9 16:33:16 2015 -0400

    gtask: remove hardcoded GTask thread-pool size
    
    GTask used a 10-thread thread pool for g_task_run_in_thread() /
    g_task_run_in_thread_sync(), but this ran into problems when task
    threads blocked waiting for another g_task_run_in_thread_sync()
    operation to complete. Previously there was a workaround for this, by
    bumping up the thread limit when that case was detected, but deadlocks
    could still happen if there were non-GTask threads involved. (Eg, task
    A sends a message to thread X and waits for a response, but thread X
    needs to complete task B in a thread before returning the response to
    task A.)
    
    So, allow GTask's thread pool to be expanded dynamically, by watching
    it from the glib worker thread, and growing it (at an
    exponentially-decreasing rate) if too much time passes without any
    tasks completing. This should solve the deadlocking problems without
    causing sudden breakage in apps that assume they can queue huge
    numbers of tasks at once without consequences.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=687223

 README.in        |   18 +++++++
 gio/gtask.c      |  138 ++++++++++++++++++++++++++++++++++++++++--------------
 gio/tests/task.c |   91 +++++++++++++++++++++++++++++++++++
 3 files changed, 212 insertions(+), 35 deletions(-)
---
diff --git a/README.in b/README.in
index 6a86f85..c2841a5 100644
--- a/README.in
+++ b/README.in
@@ -67,6 +67,24 @@ and attach the patch to that bug report.
 
 Patches should be in unified diff form. (The -up option to GNU diff.)
 
+Notes about GLib 2.46
+=====================
+
+* GTask no longer imposes a fixed limit on the number of tasks that
+  can be run_in_thread() simultaneously, since doing this inevitably
+  results in deadlocks in some use cases. Instead, it now has a base
+  number of threads that can be used "for free", but will gradually
+  add more threads to the pool if too much time passes without any
+  tasks completing.
+
+  The exact behavior may continue to change in the future, and it's
+  possible that some future version of GLib may not do any
+  rate-limiting at all. As a result, you should no longer assume that
+  GTask will rate-limit tasks itself (or, by extension, that calls to
+  certain async gio methods will automatically be rate-limited for
+  you). If you have a very large number of tasks to run, and don't
+  want them to all run at once, you should rate-limit them yourself.
+
 Notes about GLib 2.40
 =====================
 
diff --git a/gio/gtask.c b/gio/gtask.c
index 566f7cb..2bf82a2 100644
--- a/gio/gtask.c
+++ b/gio/gtask.c
@@ -22,6 +22,7 @@
 
 #include "gasyncresult.h"
 #include "gcancellable.h"
+#include "glib-private.h"
 
 #include "glibintl.h"
 
@@ -558,7 +559,6 @@ struct _GTask {
   gboolean thread_cancelled;
   gboolean synchronous;
   gboolean thread_complete;
-  gboolean blocking_other_task;
 
   GError *error;
   union {
@@ -594,7 +594,25 @@ G_DEFINE_TYPE_WITH_CODE (GTask, g_task, G_TYPE_OBJECT,
 
 static GThreadPool *task_pool;
 static GMutex task_pool_mutex;
-static GPrivate task_private = G_PRIVATE_INIT (NULL);
+static GSource *task_pool_manager;
+static guint64 task_wait_time;
+static gint tasks_running;
+
+/* When the task pool fills up and blocks, and the program keeps
+ * queueing more tasks, we will slowly add more threads to the pool
+ * (in case the existing tasks are trying to queue subtasks of their
+ * own) until tasks start completing again. These "overflow" threads
+ * will only run one task apiece, and then exit, so the pool will
+ * eventually get back down to its base size.
+ *
+ * The base and multiplier below give us 10 extra threads after about
+ * a second of blocking, 30 after 5 seconds, 100 after a minute, and
+ * 200 after 20 minutes.
+ */
+#define G_TASK_POOL_SIZE 10
+#define G_TASK_WAIT_TIME_BASE 100000
+#define G_TASK_WAIT_TIME_MULTIPLIER 1.03
+#define G_TASK_WAIT_TIME_MAX (30 * 60 * 1000000)
 
 static void
 g_task_init (GTask *task)
@@ -1200,15 +1218,6 @@ g_task_thread_complete (GTask *task)
     }
 
   task->thread_complete = TRUE;
-
-  if (task->blocking_other_task)
-    {
-      g_mutex_lock (&task_pool_mutex);
-      g_thread_pool_set_max_threads (task_pool,
-                                     g_thread_pool_get_max_threads (task_pool) - 1,
-                                     NULL);
-      g_mutex_unlock (&task_pool_mutex);
-    }
   g_mutex_unlock (&task->lock);
 
   if (task->cancellable)
@@ -1220,20 +1229,65 @@ g_task_thread_complete (GTask *task)
     g_task_return (task, G_TASK_RETURN_FROM_THREAD);
 }
 
+static gboolean
+task_pool_manager_timeout (gpointer user_data)
+{
+  g_mutex_lock (&task_pool_mutex);
+  g_thread_pool_set_max_threads (task_pool, tasks_running + 1, NULL);
+  g_source_set_ready_time (task_pool_manager, -1);
+  g_mutex_unlock (&task_pool_mutex);
+
+  return TRUE;
+}
+
+static void
+g_task_thread_setup (void)
+{
+  g_mutex_lock (&task_pool_mutex);
+  tasks_running++;
+
+  if (tasks_running == G_TASK_POOL_SIZE)
+    task_wait_time = G_TASK_WAIT_TIME_BASE;
+  else if (tasks_running > G_TASK_POOL_SIZE && task_wait_time < G_TASK_WAIT_TIME_MAX)
+    task_wait_time *= G_TASK_WAIT_TIME_MULTIPLIER;
+
+  if (tasks_running >= G_TASK_POOL_SIZE)
+    g_source_set_ready_time (task_pool_manager, g_get_monotonic_time () + task_wait_time);
+
+  g_mutex_unlock (&task_pool_mutex);
+}
+
+static void
+g_task_thread_cleanup (void)
+{
+  gint tasks_pending;
+
+  g_mutex_lock (&task_pool_mutex);
+  tasks_pending = g_thread_pool_unprocessed (task_pool);
+
+  if (tasks_running > G_TASK_POOL_SIZE)
+    g_thread_pool_set_max_threads (task_pool, tasks_running - 1, NULL);
+  else if (tasks_running + tasks_pending < G_TASK_POOL_SIZE)
+    g_source_set_ready_time (task_pool_manager, -1);
+
+  tasks_running--;
+  g_mutex_unlock (&task_pool_mutex);
+}
+
 static void
 g_task_thread_pool_thread (gpointer thread_data,
                            gpointer pool_data)
 {
   GTask *task = thread_data;
 
-  g_private_set (&task_private, task);
+  g_task_thread_setup ();
 
   task->task_func (task, task->source_object, task->task_data,
                    task->cancellable);
   g_task_thread_complete (task);
-
-  g_private_set (&task_private, NULL);
   g_object_unref (task);
+
+  g_task_thread_cleanup ();
 }
 
 static void
@@ -1305,18 +1359,6 @@ g_task_start_task_thread (GTask           *task,
     }
 
   g_thread_pool_push (task_pool, g_object_ref (task), NULL);
-  if (g_private_get (&task_private))
-    {
-      /* This thread is being spawned from another GTask thread, so
-       * bump up max-threads so we don't starve.
-       */
-      g_mutex_lock (&task_pool_mutex);
-      if (g_thread_pool_set_max_threads (task_pool,
-                                         g_thread_pool_get_max_threads (task_pool) + 1,
-                                         NULL))
-        task->blocking_other_task = TRUE;
-      g_mutex_unlock (&task_pool_mutex);
-    }
 }
 
 /**
@@ -1331,6 +1373,12 @@ g_task_start_task_thread (GTask           *task,
  *
  * See #GTaskThreadFunc for more details about how @task_func is handled.
  *
+ * Although GLib currently rate-limits the tasks queued via
+ * g_task_run_in_thread(), you should not assume that it will always
+ * do this. If you have a very large number of tasks to run, but don't
+ * want them to all run at once, you should only queue a limited
+ * number of them at a time.
+ *
  * Since: 2.36
  */
 void
@@ -1372,6 +1420,12 @@ g_task_run_in_thread (GTask           *task,
  * have a callback, it will not be invoked when @task_func returns.
  * #GTask:completed will be set to %TRUE just before this function returns.
  *
+ * Although GLib currently rate-limits the tasks queued via
+ * g_task_run_in_thread_sync(), you should not assume that it will
+ * always do this. If you have a very large number of tasks to run,
+ * but don't want them to all run at once, you should only queue a
+ * limited number of them at a time.
+ *
  * Since: 2.36
  */
 void
@@ -1794,14 +1848,6 @@ g_task_compare_priority (gconstpointer a,
   const GTask *tb = b;
   gboolean a_cancelled, b_cancelled;
 
-  /* Tasks that are causing other tasks to block have higher
-   * priority.
-   */
-  if (ta->blocking_other_task && !tb->blocking_other_task)
-    return -1;
-  else if (tb->blocking_other_task && !ta->blocking_other_task)
-    return 1;
-
   /* Let already-cancelled tasks finish right away */
   a_cancelled = (ta->check_cancellable &&
                  g_cancellable_is_cancelled (ta->cancellable));
@@ -1816,14 +1862,36 @@ g_task_compare_priority (gconstpointer a,
   return ta->priority - tb->priority;
 }
 
+static gboolean
+trivial_source_dispatch (GSource     *source,
+                         GSourceFunc  callback,
+                         gpointer     user_data)
+{
+  return callback (user_data);
+}
+
+GSourceFuncs trivial_source_funcs = {
+  NULL, /* prepare */
+  NULL, /* check */
+  trivial_source_dispatch,
+  NULL
+};
+
 static void
 g_task_thread_pool_init (void)
 {
   task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL,
-                                 10, FALSE, NULL);
+                                 G_TASK_POOL_SIZE, FALSE, NULL);
   g_assert (task_pool != NULL);
 
   g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
+
+  task_pool_manager = g_source_new (&trivial_source_funcs, sizeof (GSource));
+  g_source_set_callback (task_pool_manager, task_pool_manager_timeout, NULL, NULL);
+  g_source_set_ready_time (task_pool_manager, -1);
+  g_source_attach (task_pool_manager,
+                   GLIB_PRIVATE_CALL (g_get_worker_context ()));
+  g_source_unref (task_pool_manager);
 }
 
 static void
diff --git a/gio/tests/task.c b/gio/tests/task.c
index 9493fed..c4e209c 100644
--- a/gio/tests/task.c
+++ b/gio/tests/task.c
@@ -10,6 +10,7 @@
  */
 
 #include <gio/gio.h>
+#include <string.h>
 
 static GMainLoop *loop;
 static GThread *main_thread;
@@ -1097,6 +1098,95 @@ test_run_in_thread_nested (void)
   unclog_thread_pool ();
 }
 
+/* test_run_in_thread_overflow: if you queue lots and lots and lots of
+ * tasks, they won't all run at once.
+ */
+static GMutex overflow_mutex;
+
+static void
+run_overflow_task_thread (GTask        *task,
+                          gpointer      source_object,
+                          gpointer      task_data,
+                          GCancellable *cancellable)
+{
+  gchar *result = task_data;
+
+  if (g_task_return_error_if_cancelled (task))
+    {
+      *result = 'X';
+      return;
+    }
+
+  /* Block until the main thread is ready. */
+  g_mutex_lock (&overflow_mutex);
+  g_mutex_unlock (&overflow_mutex);
+
+  *result = '.';
+
+  g_task_return_boolean (task, TRUE);
+}
+
+#define NUM_OVERFLOW_TASKS 1024
+
+static void
+test_run_in_thread_overflow (void)
+{
+  GCancellable *cancellable;
+  GTask *task;
+  gchar buf[NUM_OVERFLOW_TASKS + 1];
+  gint i;
+
+  /* Queue way too many tasks and then sleep for a bit. The first 10
+   * tasks will be dispatched to threads and will then block on
+   * overflow_mutex, so more threads will be created while this thread
+   * is sleeping. Then we cancel the cancellable, unlock the mutex,
+   * wait for all of the tasks to complete, and make sure that we got
+   * the behavior we expected.
+   */
+
+  memset (buf, 0, sizeof (buf));
+  cancellable = g_cancellable_new ();
+
+  g_mutex_lock (&overflow_mutex);
+
+  for (i = 0; i < NUM_OVERFLOW_TASKS; i++)
+    {
+      task = g_task_new (NULL, cancellable, NULL, NULL);
+      g_task_set_task_data (task, buf + i, NULL);
+      g_task_run_in_thread (task, run_overflow_task_thread);
+      g_object_unref (task);
+    }
+
+  if (g_test_slow ())
+    g_usleep (5000000); /* 5 s */
+  else
+    g_usleep (500000);  /* 0.5 s */
+  g_cancellable_cancel (cancellable);
+  g_object_unref (cancellable);
+
+  g_mutex_unlock (&overflow_mutex);
+
+  /* Wait for all tasks to complete. */
+  while (!buf[NUM_OVERFLOW_TASKS - 1])
+    g_usleep (1000);
+
+  i = strspn (buf, ".");
+  /* Given the sleep times above, i should be 14 for normal, 40 for
+   * slow. If the machine is too slow/busy, though, the scheduling
+   * might get messed up and we'll get more or fewer threads than
+   * expected. Still, there are limits to how messed up it could
+   * plausibly get (and we hope that if gtask is actually broken then
+   * it will exceed those limits).
+   */
+  g_assert_cmpint (i, >=, 10);
+  if (g_test_slow ())
+    g_assert_cmpint (i, <, 50);
+  else
+    g_assert_cmpint (i, <, 20);
+
+  g_assert_cmpint (i + strspn (buf + i, "X"), ==, NUM_OVERFLOW_TASKS);
+}
+
 /* test_return_on_cancel */
 
 GMutex roc_init_mutex, roc_finish_mutex;
@@ -1893,6 +1983,7 @@ main (int argc, char **argv)
   g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync);
   g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
   g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
+  g_test_add_func ("/gtask/run-in-thread-overflow", test_run_in_thread_overflow);
   g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
   g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
   g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]