[glib/mcatanzaro/#1346] gtask: ensure task is destroyed on its context's thread
- From: Michael Catanzaro <mcatanzaro src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib/mcatanzaro/#1346] gtask: ensure task is destroyed on its context's thread
- Date: Sat, 25 May 2019 23:40:41 +0000 (UTC)
commit 201935a9ccb01026d5cf46b3e505e031070eecc0
Author: Michael Catanzaro <mcatanzaro igalia com>
Date: Sat May 25 18:30:12 2019 -0500
gtask: ensure task is destroyed on its context's thread
The GTask must be destroyed on the thread that is running its
GMainContext, i.e. the thread that started the task. It must never be
destroyed on the actual task thread when running with
g_task_run_in_thread(), because when it is destroyed, it will unref its
source object and destroy its user data (if a GDestroyNotify was set for
the data using g_task_set_task_data()). The source object and task data
might not be safe to destroy on a secondary thread, though, so this is
incorrect. We have to ensure they are destroyed on the task's context's
thread.
There are different ways we could do this, but the simplest by far is to
ensure the task thread has unreffed the task before the context's thread
executes the callback. And that is simple enough to do using a condition
variable. We have to keep a static global map of all GTasks with
outstanding task threads, which is slightly unfortunate, but we already
have a bunch of global data in this file for managing the thread pool,
and the map will only contain tasks that are currently running in
threads, so it should be small.
Fixes #1346
gio/gtask.c | 104 ++++++++++++++++++++++++++++++++++++++++++++++++++++++-
gio/tests/task.c | 84 ++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 187 insertions(+), 1 deletion(-)
---
diff --git a/gio/gtask.c b/gio/gtask.c
index f6c89c974..6c1a39152 100644
--- a/gio/gtask.c
+++ b/gio/gtask.c
@@ -616,6 +616,46 @@ static GSource *task_pool_manager;
static guint64 task_wait_time;
static gint tasks_running;
+static GHashTable *completion_synchronizer_table; /* GTask -> ThreadedTaskCompletionSynchronizer */
+static GMutex completion_synchronizer_table_mutex;
+
+typedef struct {
+ GMutex mutex;
+ GCond condition;
+ gboolean finished;
+ gatomicrefcount refcount;
+} ThreadedTaskCompletionSynchronizer;
+
+static ThreadedTaskCompletionSynchronizer *
+threaded_task_completion_synchronizer_new (void)
+{
+ ThreadedTaskCompletionSynchronizer *synchronizer = g_new (ThreadedTaskCompletionSynchronizer, 1);
+
+ g_mutex_init (&synchronizer->mutex);
+ g_cond_init (&synchronizer->condition);
+ synchronizer->finished = FALSE;
+ g_atomic_ref_count_init (&synchronizer->refcount);
+
+ return synchronizer;
+}
+
+static void
+threaded_task_completion_synchronizer_ref (ThreadedTaskCompletionSynchronizer *synchronizer)
+{
+ g_atomic_ref_count_inc (&synchronizer->refcount);
+}
+
+static void
+threaded_task_completion_synchronizer_unref (ThreadedTaskCompletionSynchronizer *synchronizer)
+{
+ if (g_atomic_ref_count_dec (&synchronizer->refcount))
+ {
+ g_mutex_clear (&synchronizer->mutex);
+ g_cond_clear (&synchronizer->condition);
+ g_free (synchronizer);
+ }
+}
+
/* When the task pool fills up and blocks, and the program keeps
* queueing more tasks, we will slowly add more threads to the pool
* (in case the existing tasks are trying to queue subtasks of their
@@ -1218,8 +1258,33 @@ g_task_return_now (GTask *task)
}
static gboolean
-complete_in_idle_cb (gpointer task)
+complete_in_idle_cb (gpointer user_data)
{
+ GTask *task = user_data;
+
+ if (G_TASK_IS_THREADED (task))
+ {
+ ThreadedTaskCompletionSynchronizer *synchronizer;
+
+ /* Here we ensure that the task has already been unreffed on the task
+ * thread, to ensure we never destroy the task's source object or user
+ * data on the task thread.
+ */
+ g_mutex_lock (&completion_synchronizer_table_mutex);
+ synchronizer = g_hash_table_lookup (completion_synchronizer_table, task);
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+
+ g_assert (synchronizer != NULL);
+ g_mutex_lock (&synchronizer->mutex);
+ while (!synchronizer->finished)
+ g_cond_wait (&synchronizer->condition, &synchronizer->mutex);
+ g_mutex_unlock (&synchronizer->mutex);
+
+ g_mutex_lock (&completion_synchronizer_table_mutex);
+ g_hash_table_remove (completion_synchronizer_table, task);
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+ }
+
g_task_return_now (task);
g_object_unref (task);
return FALSE;
@@ -1398,6 +1463,7 @@ g_task_thread_pool_thread (gpointer thread_data,
gpointer pool_data)
{
GTask *task = thread_data;
+ ThreadedTaskCompletionSynchronizer *synchronizer;
g_task_thread_setup ();
@@ -1406,6 +1472,26 @@ g_task_thread_pool_thread (gpointer thread_data,
g_task_thread_complete (task);
g_object_unref (task);
+ /* Now we have to ensure that the task is not destroyed in this thread, since
+ * that would cause its source object and user data to be destroyed, and that
+ * needs to happen back in the original thread. So signal to the original
+ * thread that we are done. Note we cannot use task at this point except for
+ * its address.
+ */
+ g_mutex_lock (&completion_synchronizer_table_mutex);
+ synchronizer = g_hash_table_lookup (completion_synchronizer_table, task);
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+
+ g_assert (synchronizer != NULL);
+ threaded_task_completion_synchronizer_ref (synchronizer);
+
+ g_mutex_lock (&synchronizer->mutex);
+ synchronizer->finished = TRUE;
+ g_cond_signal (&synchronizer->condition);
+ g_mutex_unlock (&synchronizer->mutex);
+
+ threaded_task_completion_synchronizer_unref (synchronizer);
+
g_task_thread_cleanup ();
}
@@ -1448,6 +1534,18 @@ static void
g_task_start_task_thread (GTask *task,
GTaskThreadFunc task_func)
{
+ g_mutex_lock (&completion_synchronizer_table_mutex);
+ if (g_hash_table_contains (completion_synchronizer_table, task))
+ {
+ g_critical ("Task is already running in thread");
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+ return;
+ }
+
+ g_hash_table_insert (completion_synchronizer_table, task,
+ threaded_task_completion_synchronizer_new ());
+ g_mutex_unlock (&completion_synchronizer_table_mutex);
+
g_mutex_init (&task->lock);
g_cond_init (&task->cond);
@@ -2061,6 +2159,10 @@ g_task_thread_pool_init (void)
g_source_attach (task_pool_manager,
GLIB_PRIVATE_CALL (g_get_worker_context ()));
g_source_unref (task_pool_manager);
+
+ completion_synchronizer_table = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+ NULL,
+
(GDestroyNotify)threaded_task_completion_synchronizer_unref);
}
static void
diff --git a/gio/tests/task.c b/gio/tests/task.c
index 9b2c6912c..8f9b10e15 100644
--- a/gio/tests/task.c
+++ b/gio/tests/task.c
@@ -1408,6 +1408,89 @@ test_run_in_thread_overflow (void)
g_assert_cmpint (i + strspn (buf + i, "X"), ==, NUM_OVERFLOW_TASKS);
}
+/* test_run_in_thread_destruction: source object and user data must be
+ * destroyed on the original thread, not the task thread
+ */
+
+#define SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER (source_object_destruction_thread_checker_get_type())
+
+G_DECLARE_FINAL_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker,
SOURCE_OBJECT, DESTRUCTION_THREAD_CHECKER, GObject)
+
+struct _SourceObjectDestructionThreadChecker
+{
+ GObject parent_instance;
+};
+
+G_DEFINE_TYPE (SourceObjectDestructionThreadChecker, source_object_destruction_thread_checker, G_TYPE_OBJECT)
+
+static void
+source_object_destruction_thread_checker_finalize (GObject *object)
+{
+ g_assert_true (main_thread == g_thread_self ());
+
+ G_OBJECT_CLASS (source_object_destruction_thread_checker_parent_class)->finalize (object);
+}
+
+static void
+source_object_destruction_thread_checker_class_init (SourceObjectDestructionThreadCheckerClass *klass)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+ object_class->finalize = source_object_destruction_thread_checker_finalize;
+}
+
+static void
+source_object_destruction_thread_checker_init (SourceObjectDestructionThreadChecker *self)
+{
+}
+
+static void
+user_data_destruction_thread_checker (gpointer user_data)
+{
+ g_assert_true (main_thread == g_thread_self ());
+}
+
+static void
+destruction_thread_test_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ g_main_loop_quit ((GMainLoop *)user_data);
+}
+
+static void
+destruction_thread_test_thread (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ g_assert_false (main_thread == g_thread_self ());
+ g_object_unref (source_object);
+}
+
+static void
+test_run_in_thread_destruction (void)
+{
+ SourceObjectDestructionThreadChecker *source_object;
+ GMainLoop *main_loop;
+ GTask *task;
+ int i;
+
+ main_loop = g_main_loop_new (NULL, FALSE);
+
+ for (i = 0; i < 1000000; i++)
+ {
+ source_object = g_object_new (SOURCE_OBJECT_TYPE_DESTRUCTION_THREAD_CHECKER, NULL);
+ task = g_task_new (source_object, NULL, destruction_thread_test_cb, main_loop);
+ g_task_set_task_data (task, NULL, user_data_destruction_thread_checker);
+ g_task_run_in_thread (task, destruction_thread_test_thread);
+ g_main_loop_run (main_loop);
+ g_object_unref (task);
+ }
+
+ g_main_loop_unref (main_loop);
+}
+
/* test_return_on_cancel */
GMutex roc_init_mutex, roc_finish_mutex;
@@ -2345,6 +2428,7 @@ main (int argc, char **argv)
g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
g_test_add_func ("/gtask/run-in-thread-nested", test_run_in_thread_nested);
g_test_add_func ("/gtask/run-in-thread-overflow", test_run_in_thread_overflow);
+ g_test_add_func ("/gtask/run-in-thread-destruction", test_run_in_thread_destruction);
g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]