[glib] gtask: don't deadlock when tasks block on other tasks



commit 07bb8097e535eea27c5130c523f70759fb2ff45d
Author: Dan Winship <danw gnome org>
Date:   Sat Dec 15 11:44:59 2012 -0500

    gtask: don't deadlock when tasks block on other tasks
    
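    If tasks block waiting for other tasks to complete, then the system can
    end up starved for threads. Avoid this by bumping up max-threads
    whenever a task is started from within another task's thread, and
    dropping it back down when that task's thread completes.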
    
    This also reverts 7b1f8c58 and reverts max-threads for GTask's
    GThreadPool back to 10.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=687223

 gio/gtask.c      |   41 ++++++++++++++++++++++++--
 gio/tests/task.c |   83 +++++++++++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 114 insertions(+), 10 deletions(-)
---
diff --git a/gio/gtask.c b/gio/gtask.c
index 0a80c24..bdef1f4 100644
--- a/gio/gtask.c
+++ b/gio/gtask.c
@@ -584,6 +584,7 @@ struct _GTask {
   gboolean thread_cancelled;
   gboolean synchronous;
   gboolean thread_complete;
+  gboolean blocking_other_task;
 
   GError *error;
   union {
@@ -613,6 +614,8 @@ G_DEFINE_TYPE_WITH_CODE (GTask, g_task, G_TYPE_OBJECT,
                          g_task_thread_pool_init ();)
 
 static GThreadPool *task_pool;
+static GMutex task_pool_mutex;
+static GPrivate task_private = G_PRIVATE_INIT (NULL);
 
 static void
 g_task_init (GTask *task)
@@ -1208,6 +1211,15 @@ g_task_thread_complete (GTask *task)
     }
 
   task->thread_complete = TRUE;
+
+  if (task->blocking_other_task)
+    {
+      g_mutex_lock (&task_pool_mutex);
+      g_thread_pool_set_max_threads (task_pool,
+                                     g_thread_pool_get_max_threads (task_pool) - 1,
+                                     NULL);
+      g_mutex_unlock (&task_pool_mutex);
+    }
   g_mutex_unlock (&task->lock);
 
   if (task->cancellable)
@@ -1225,9 +1237,13 @@ g_task_thread_pool_thread (gpointer thread_data,
 {
   GTask *task = thread_data;
 
+  g_private_set (&task_private, task);
+
   task->task_func (task, task->source_object, task->task_data,
                    task->cancellable);
   g_task_thread_complete (task);
+
+  g_private_set (&task_private, NULL);
   g_object_unref (task);
 }
 
@@ -1294,6 +1310,18 @@ g_task_start_task_thread (GTask           *task,
   g_thread_pool_push (task_pool, g_object_ref (task), &task->error);
   if (task->error)
     task->thread_complete = TRUE;
+  else if (g_private_get (&task_private))
+    {
+      /* This task is being started from another GTask thread, so
+       * bump up max-threads so we don't starve.
+       */
+      g_mutex_lock (&task_pool_mutex);
+      if (g_thread_pool_set_max_threads (task_pool,
+                                         g_thread_pool_get_max_threads (task_pool) + 1,
+                                         NULL))
+        task->blocking_other_task = TRUE;
+      g_mutex_unlock (&task_pool_mutex);
+    }
 }
 
 /**
@@ -1747,12 +1775,19 @@ g_task_compare_priority (gconstpointer a,
   const GTask *tb = b;
   gboolean a_cancelled, b_cancelled;
 
+  /* Tasks that are causing other tasks to block have higher
+   * priority.
+   */
+  if (ta->blocking_other_task && !tb->blocking_other_task)
+    return -1;
+  else if (tb->blocking_other_task && !ta->blocking_other_task)
+    return 1;
+
+  /* Let already-cancelled tasks finish right away */
   a_cancelled = (ta->check_cancellable &&
                  g_cancellable_is_cancelled (ta->cancellable));
   b_cancelled = (tb->check_cancellable &&
                  g_cancellable_is_cancelled (tb->cancellable));
-
-  /* Let already-cancelled tasks finish right away */
   if (a_cancelled && !b_cancelled)
     return -1;
   else if (b_cancelled && !a_cancelled)
@@ -1766,7 +1801,7 @@ static void
 g_task_thread_pool_init (void)
 {
   task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL,
-                                 100, FALSE, NULL);
+                                 10, FALSE, NULL);
   g_assert (task_pool != NULL);
 
   g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
diff --git a/gio/tests/task.c b/gio/tests/task.c
index 8065807..6dbff5c 100644
--- a/gio/tests/task.c
+++ b/gio/tests/task.c
@@ -824,29 +824,37 @@ fake_task_thread (GTask        *task,
   g_task_return_boolean (task, TRUE);
 }
 
-#define G_TASK_THREAD_POOL_SIZE 100
+#define G_TASK_THREAD_POOL_SIZE 10
+static int fake_tasks_running;
 
 static void
-test_run_in_thread_priority (void)
+fake_task_callback (GObject      *source,
+                    GAsyncResult *result,
+                    gpointer      user_data)
+{
+  if (--fake_tasks_running == 0)
+    g_main_loop_quit (loop);
+}
+
+static void
+clog_up_thread_pool (void)
 {
   GTask *task;
-  GCancellable *cancellable;
-  int seq_a, seq_b, seq_c, seq_d;
   int i;
 
-  /* Flush the thread pool, then clog it up with junk tasks */
   g_thread_pool_stop_unused_threads ();
 
   g_mutex_lock (&fake_task_mutex);
   for (i = 0; i < G_TASK_THREAD_POOL_SIZE - 1; i++)
     {
-      task = g_task_new (NULL, NULL, NULL, NULL);
+      task = g_task_new (NULL, NULL, fake_task_callback, NULL);
       g_task_set_task_data (task, &fake_task_mutex, NULL);
       g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_DEFAULT);
       g_task_set_priority (task, G_PRIORITY_HIGH * 2);
       g_assert_cmpint (g_task_get_priority (task), ==, G_PRIORITY_HIGH * 2);
       g_task_run_in_thread (task, fake_task_thread);
       g_object_unref (task);
+      fake_tasks_running++;
     }
 
   g_mutex_lock (&last_fake_task_mutex);
@@ -855,6 +863,23 @@ test_run_in_thread_priority (void)
   g_task_set_priority (task, G_PRIORITY_HIGH * 2);
   g_task_run_in_thread (task, fake_task_thread);
   g_object_unref (task);
+}
+
+static void
+unclog_thread_pool (void)
+{
+  g_mutex_unlock (&fake_task_mutex);
+  g_main_loop_run (loop);
+}
+
+static void
+test_run_in_thread_priority (void)
+{
+  GTask *task;
+  GCancellable *cancellable;
+  int seq_a, seq_b, seq_c, seq_d;
+
+  clog_up_thread_pool ();
 
   /* Queue three more tasks that we'll arrange to have run serially */
   task = g_task_new (NULL, NULL, NULL, NULL);
@@ -894,7 +919,50 @@ test_run_in_thread_priority (void)
   g_assert_cmpint (seq_a, ==, 3);
   g_assert_cmpint (seq_b, ==, 4);
 
-  g_mutex_unlock (&fake_task_mutex);
+  unclog_thread_pool ();
+}
+
+/* test_run_in_thread_nested: task threads that block waiting on
+ * other task threads will not cause the thread pool to starve.
+ */
+
+static void
+run_nested_task_thread (GTask        *task,
+                        gpointer      source_object,
+                        gpointer      task_data,
+                        GCancellable *cancellable)
+{
+  GTask *nested;
+  int *nested_tasks_left = task_data;
+
+  if ((*nested_tasks_left)--)
+    {
+      nested = g_task_new (NULL, NULL, NULL, NULL);
+      g_task_set_task_data (nested, nested_tasks_left, NULL);
+      g_task_run_in_thread_sync (nested, run_nested_task_thread);
+      g_object_unref (nested);
+    }
+
+  g_task_return_boolean (task, TRUE);
+}
+
+static void
+test_run_in_thread_nested (void)
+{
+  GTask *task;
+  int nested_tasks_left = 2;
+
+  clog_up_thread_pool ();
+
+  task = g_task_new (NULL, NULL, quit_main_loop_callback, NULL);
+  g_task_set_task_data (task, &nested_tasks_left, NULL);
+  g_task_run_in_thread (task, run_nested_task_thread);
+  g_object_unref (task);
+
+  g_mutex_unlock (&last_fake_task_mutex);
+  g_main_loop_run (loop);
+
+  unclog_thread_pool ();
 }
 
 /* test_return_on_cancel */
@@ -1652,6 +1720,7 @@ main (int argc, char **argv)
   g_test_add_func ("/gtask/run-in-thread", test_run_in_thread);
   g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync);
   g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
+  g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
   g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
   g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
   g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]