[glib/mcatanzaro/#1346] gtask: ensure task is destroyed on its context's thread



commit 201935a9ccb01026d5cf46b3e505e031070eecc0
Author: Michael Catanzaro <mcatanzaro igalia com>
Date:   Sat May 25 18:30:12 2019 -0500

    gtask: ensure task is destroyed on its context's thread
    
    The GTask must be destroyed on the thread that is running its
    GMainContext, i.e. the thread that started the task. It must never be
    destroyed on the actual task thread when running with
    g_task_run_in_thread(), because when it is destroyed, it will unref its
    source object and destroy its user data (if a GDestroyNotify was set for
    the data using g_task_set_task_data()). The source object and task data
    might not be safe to destroy on a secondary thread, though, so this is
    incorrect. We have to ensure they are destroyed on the task's context's
    thread.
    
    There are different ways we could do this, but the simplest by far is to
    ensure the task thread has unreffed the task before the context's thread
    executes the callback. And that is simple enough to do using a condition
    variable. We have to keep a static global map of all GTasks with
    outstanding task threads, which is slightly unfortunate, but we already
    have a bunch of global data in this file for managing the thread pool,
    and the map will only contain tasks that are currently running in
    threads, so it should be small.
    
    Fixes #1346

 gio/gtask.c      | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 gio/tests/task.c |  84 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 187 insertions(+), 1 deletion(-)
---
diff --git a/gio/gtask.c b/gio/gtask.c
index f6c89c974..6c1a39152 100644
--- a/gio/gtask.c
+++ b/gio/gtask.c
@@ -616,6 +616,46 @@ static GSource *task_pool_manager;
 static guint64 task_wait_time;
 static gint tasks_running;
 
+static GHashTable *completion_synchronizer_table; /* GTask -> ThreadedTaskCompletionSynchronizer */
+static GMutex completion_synchronizer_table_mutex;
+
+typedef struct {  /* completion handshake between a task thread and complete_in_idle_cb() */
+  GMutex mutex;  /* locked around every access to `finished` */
+  GCond condition;  /* signalled once `finished` has been set to TRUE */
+  gboolean finished;  /* starts FALSE; set after the task thread drops its task reference */
+  gatomicrefcount refcount;  /* initialized in _new(), then adjusted by the _ref()/_unref() helpers */
+} ThreadedTaskCompletionSynchronizer;
+
+/* Create a synchronizer that is not yet finished; free it with threaded_task_completion_synchronizer_unref(). */
+static ThreadedTaskCompletionSynchronizer *
+threaded_task_completion_synchronizer_new (void)
+{
+  ThreadedTaskCompletionSynchronizer *fresh = g_new (ThreadedTaskCompletionSynchronizer, 1);
+
+  g_atomic_ref_count_init (&fresh->refcount);
+  fresh->finished = FALSE;
+  g_cond_init (&fresh->condition);
+  g_mutex_init (&fresh->mutex);
+  return fresh;
+}
+
+static void
+threaded_task_completion_synchronizer_ref (ThreadedTaskCompletionSynchronizer *synchronizer)
+{
+  g_atomic_ref_count_inc (&synchronizer->refcount);  /* balanced by threaded_task_completion_synchronizer_unref() */
+}
+
+/* Drop a reference; once the count reports no references left, clear and free the synchronizer. */
+static void
+threaded_task_completion_synchronizer_unref (ThreadedTaskCompletionSynchronizer *synchronizer)
+{
+  if (!g_atomic_ref_count_dec (&synchronizer->refcount))
+    return;
+  g_cond_clear (&synchronizer->condition);
+  g_mutex_clear (&synchronizer->mutex);
+  g_free (synchronizer);
+}
+
 /* When the task pool fills up and blocks, and the program keeps
  * queueing more tasks, we will slowly add more threads to the pool
  * (in case the existing tasks are trying to queue subtasks of their
@@ -1218,8 +1258,33 @@ g_task_return_now (GTask *task)
 }
 
 static gboolean
-complete_in_idle_cb (gpointer task)
+complete_in_idle_cb (gpointer user_data)
 {
+  GTask *task = user_data;
+
+  if (G_TASK_IS_THREADED (task))
+    {
+      ThreadedTaskCompletionSynchronizer *synchronizer;
+
+      /* Here we ensure that the task has already been unreffed on the task
+       * thread, to ensure we never destroy the task's source object or user
+       * data on the task thread.
+       */
+      g_mutex_lock (&completion_synchronizer_table_mutex);
+      synchronizer = g_hash_table_lookup (completion_synchronizer_table, task);
+      g_mutex_unlock (&completion_synchronizer_table_mutex);
+
+      g_assert (synchronizer != NULL);
+      g_mutex_lock (&synchronizer->mutex);
+      while (!synchronizer->finished)
+        g_cond_wait (&synchronizer->condition, &synchronizer->mutex);
+      g_mutex_unlock (&synchronizer->mutex);
+
+      g_mutex_lock (&completion_synchronizer_table_mutex);
+      g_hash_table_remove (completion_synchronizer_table, task);
+      g_mutex_unlock (&completion_synchronizer_table_mutex);
+    }
+
   g_task_return_now (task);
   g_object_unref (task);
   return FALSE;
@@ -1398,6 +1463,7 @@ g_task_thread_pool_thread (gpointer thread_data,
                            gpointer pool_data)
 {
   GTask *task = thread_data;
+  ThreadedTaskCompletionSynchronizer *synchronizer;
 
   g_task_thread_setup ();
 
@@ -1406,6 +1472,26 @@ g_task_thread_pool_thread (gpointer thread_data,
   g_task_thread_complete (task);
   g_object_unref (task);
 
+  /* Now we have to ensure that the task is not destroyed in this thread, since
+   * that would cause its source object and user data to be destroyed, and that
+   * needs to happen back in the original thread. So signal to the original
+   * thread that we are done. Note we cannot use task at this point except for
+   * its address.
+   */
+  g_mutex_lock (&completion_synchronizer_table_mutex);
+  synchronizer = g_hash_table_lookup (completion_synchronizer_table, task);
+  g_mutex_unlock (&completion_synchronizer_table_mutex);
+
+  g_assert (synchronizer != NULL);
+  threaded_task_completion_synchronizer_ref (synchronizer);
+
+  g_mutex_lock (&synchronizer->mutex);
+  synchronizer->finished = TRUE;
+  g_cond_signal (&synchronizer->condition);
+  g_mutex_unlock (&synchronizer->mutex);
+
+  threaded_task_completion_synchronizer_unref (synchronizer);
+
   g_task_thread_cleanup ();
 }
 
@@ -1448,6 +1534,18 @@ static void
 g_task_start_task_thread (GTask           *task,
                           GTaskThreadFunc  task_func)
 {
+  g_mutex_lock (&completion_synchronizer_table_mutex);
+  if (g_hash_table_contains (completion_synchronizer_table, task))
+    {
+      g_critical ("Task is already running in thread");
+      g_mutex_unlock (&completion_synchronizer_table_mutex);
+      return;
+    }
+
+  g_hash_table_insert (completion_synchronizer_table, task,
+                       threaded_task_completion_synchronizer_new ());
+  g_mutex_unlock (&completion_synchronizer_table_mutex);
+
   g_mutex_init (&task->lock);
   g_cond_init (&task->cond);
 
@@ -2061,6 +2159,10 @@ g_task_thread_pool_init (void)
   g_source_attach (task_pool_manager,
                    GLIB_PRIVATE_CALL (g_get_worker_context ()));
   g_source_unref (task_pool_manager);
+
+  completion_synchronizer_table = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+                                                         NULL,
+                                                         (GDestroyNotify)threaded_task_completion_synchronizer_unref);
 }
 
 static void
diff --git a/gio/tests/task.c b/gio/tests/task.c
index 9b2c6912c..8f9b10e15 100644
--- a/gio/tests/task.c
+++ b/gio/tests/task.c
@@ -1408,6 +1408,89 @@ test_run_in_thread_overflow (void)
   g_assert_cmpint (i + strspn (buf + i, "X"), ==, NUM_OVERFLOW_TASKS);
 }
 
+/* test_run_in_thread_destruction: source object and user data must be
+ * destroyed on the original thread, not the task thread
+ */
+
+#define SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER (source_object_destruction_thread_checker_get_type())
+
+G_DECLARE_FINAL_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker, SOURCE_OBJECT, DESTRUCTION_THREAD_CHECKER, GObject)
+
+struct _SourceObjectDestructionThreadChecker
+{
+  GObject parent_instance;  /* only member: the type adds no state of its own */
+};
+
+G_DEFINE_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker, G_TYPE_OBJECT)
+
+static void
+source_object_destruction_thread_checker_finalize (GObject *object)
+{
+  g_assert_true (main_thread == g_thread_self ());  /* fail if finalized on any thread other than main_thread (defined outside this hunk) */
+
+  G_OBJECT_CLASS (source_object_destruction_thread_checker_parent_class)->finalize (object);  /* chain up to the parent class */
+}
+
+static void
+source_object_destruction_thread_checker_class_init (SourceObjectDestructionThreadCheckerClass *klass)
+{
+  /* Install the finalize override, which asserts on the thread that
+   * destroys each instance. */
+  G_OBJECT_CLASS (klass)->finalize = source_object_destruction_thread_checker_finalize;
+}
+
+static void
+source_object_destruction_thread_checker_init (SourceObjectDestructionThreadChecker *self)
+{  /* nothing to initialize: the instance struct holds only its parent */
+}
+
+static void
+user_data_destruction_thread_checker (gpointer user_data)
+{
+  g_assert_true (main_thread == g_thread_self ());  /* passed to g_task_set_task_data() as the destroy notify; must run on main_thread */
+}
+
+static void
+destruction_thread_test_cb (GObject      *source_object,
+                            GAsyncResult *res,
+                            gpointer      user_data)
+{
+  g_main_loop_quit ((GMainLoop *)user_data);  /* user_data is the GMainLoop the test handed to g_task_new() */
+}
+
+static void
+destruction_thread_test_thread (GTask        *task,
+                                gpointer      source_object,
+                                gpointer      task_data,
+                                GCancellable *cancellable)
+{
+  g_assert_false (main_thread == g_thread_self ());  /* the task function must run off main_thread */
+  g_object_unref (source_object);  /* NOTE(review): presumably drops the reference test_run_in_thread_destruction() got from g_object_new() and never releases itself - confirm */
+}
+
+/* Run many threaded tasks whose source object and task data assert on their destruction thread. */
+static void
+test_run_in_thread_destruction (void)
+{
+  SourceObjectDestructionThreadChecker *checker;
+  GMainLoop *loop = g_main_loop_new (NULL, FALSE);
+  GTask *task;
+  int remaining;
+
+  /* Each round waits for the completion callback to quit the loop before starting the next task. */
+  for (remaining = 1000000; remaining > 0; remaining--)
+    {
+      checker = g_object_new (SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER, NULL);
+      task = g_task_new (checker, NULL, destruction_thread_test_cb, loop);
+      g_task_set_task_data (task, NULL, user_data_destruction_thread_checker);
+      g_task_run_in_thread (task, destruction_thread_test_thread);
+      g_main_loop_run (loop);
+      g_object_unref (task);
+    }
+
+  g_main_loop_unref (loop);
+}
+
 /* test_return_on_cancel */
 
 GMutex roc_init_mutex, roc_finish_mutex;
@@ -2345,6 +2428,7 @@ main (int argc, char **argv)
   g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
   g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
   g_test_add_func ("/gtask/run-in-thread-overflow", test_run_in_thread_overflow);
+  g_test_add_func ("/gtask/run-in-thread-destruction", test_run_in_thread_destruction);
   g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
   g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
   g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]