[glib/wip/task] GTask: Add g_task_set_scheduling()
- From: Colin Walters <walters src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib/wip/task] GTask: Add g_task_set_scheduling()
- Date: Sat, 15 Dec 2012 14:56:21 +0000 (UTC)
commit a676dec260563d844fbc8529e859d693900e3579
Author: Colin Walters <walters verbum org>
Date: Thu Dec 13 17:32:22 2012 -0500
GTask: Add g_task_set_scheduling()
My application (OSTree) has suffered a really bad performance
degradation recently. After some analysis of the problem, I think a
lot of it stems from the recent order-of-magnitude increase in the
default number of GLib worker threads.
OSTree does a *lot* of async I/O (directory enumeration, calling
link(), reading/writing files, etc.). It very quickly reaches 100
threads, and then there is up/down thread churn: the process becomes
I/O bound, so threads complete and are starved of work, but then the
main thread manages to fill the queue again, suddenly spinning up lots
of worker threads once more.
Now, as this patch makes explicit, there are several distinct
classes of tasks. It basically never makes sense to run more
CPU-bound threads than there are CPUs. Spawning 90+ threads to do
SHA256 calculations, for example, just creates pointless extra
contention on a dual-processor system.
Therefore, the limit of the thread pool for G_TASK_THREAD_KIND_CPU is
the number of processors.
Similarly, there's a real limit to how much I/O traffic it makes sense
to schedule simultaneously. I need to do more research here - what
makes sense for my laptop with 1 magnetic hard drive is likely
different from a fast server with RAID on top of SSDs. For the
moment, I've chosen to limit I/O bound threads to 4.
The limit for _DEFAULT remains 100 - but we can solve this better if
we have e.g. a way for tasks to express dependencies, among other things.
But for now, with this patch, and OSTree modified to use
G_TASK_THREAD_KIND_IO for its rename()/link() threads, I am seeing
better performance.
https://bugzilla.gnome.org/show_bug.cgi?id=687223
gio/gfile.c | 30 ++++++------
gio/gfileenumerator.c | 4 +-
gio/gfileinputstream.c | 2 +-
gio/gfileoutputstream.c | 2 +-
gio/gio.symbols | 1 +
gio/gioenums.h | 18 +++++++
gio/gtask.c | 114 +++++++++++++++++++++++++++++++++++++++++++----
gio/gtask.h | 4 ++
gio/tests/task.c | 17 +++++++
9 files changed, 164 insertions(+), 28 deletions(-)
---
diff --git a/gio/gfile.c b/gio/gfile.c
index 74c0f9b..fcc7908 100644
--- a/gio/gfile.c
+++ b/gio/gfile.c
@@ -5007,7 +5007,7 @@ g_file_real_query_info_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, data, (GDestroyNotify)query_info_data_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, query_info_async_thread);
g_object_unref (task);
}
@@ -5051,7 +5051,7 @@ g_file_real_query_filesystem_info_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, g_strdup (attributes), g_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, query_filesystem_info_async_thread);
g_object_unref (task);
}
@@ -5101,7 +5101,7 @@ g_file_real_enumerate_children_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, data, (GDestroyNotify)query_info_data_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, enumerate_children_async_thread);
g_object_unref (task);
}
@@ -5153,7 +5153,7 @@ g_file_real_read_async (GFile *file,
GTask *task;
task = g_task_new (file, cancellable, callback, user_data);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, open_read_async_thread);
g_object_unref (task);
}
@@ -5204,7 +5204,7 @@ g_file_real_append_to_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, data, g_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, append_to_async_thread);
g_object_unref (task);
@@ -5256,7 +5256,7 @@ g_file_real_create_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, data, g_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, create_async_thread);
g_object_unref (task);
@@ -5334,7 +5334,7 @@ g_file_real_replace_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, data, (GDestroyNotify)replace_async_data_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, replace_async_thread);
g_object_unref (task);
@@ -5380,7 +5380,7 @@ g_file_real_delete_async (GFile *file,
GTask *task;
task = g_task_new (file, cancellable, callback, user_data);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, delete_async_thread);
g_object_unref (task);
}
@@ -5432,7 +5432,7 @@ g_file_real_open_readwrite_async (GFile *file,
GTask *task;
task = g_task_new (file, cancellable, callback, user_data);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, open_readwrite_async_thread);
g_object_unref (task);
@@ -5492,7 +5492,7 @@ g_file_real_create_readwrite_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, data, g_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, create_readwrite_async_thread);
g_object_unref (task);
@@ -5567,7 +5567,7 @@ g_file_real_replace_readwrite_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, data, (GDestroyNotify)replace_rw_async_data_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, replace_readwrite_async_thread);
g_object_unref (task);
@@ -5613,7 +5613,7 @@ g_file_real_set_display_name_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, g_strdup (display_name), g_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, set_display_name_async_thread);
g_object_unref (task);
@@ -5680,7 +5680,7 @@ g_file_real_set_attributes_async (GFile *file,
task = g_task_new (file, cancellable, callback, user_data);
g_task_set_task_data (task, data, (GDestroyNotify)set_info_data_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, set_info_async_thread);
g_object_unref (task);
@@ -5734,7 +5734,7 @@ g_file_real_find_enclosing_mount_async (GFile *file,
GTask *task;
task = g_task_new (file, cancellable, callback, user_data);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, find_enclosing_mount_async_thread);
g_object_unref (task);
@@ -5853,7 +5853,7 @@ g_file_real_copy_async (GFile *source,
task = g_task_new (source, cancellable, callback, user_data);
g_task_set_task_data (task, data, (GDestroyNotify)copy_async_data_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, copy_async_thread);
g_object_unref (task);
}
diff --git a/gio/gfileenumerator.c b/gio/gfileenumerator.c
index d2779ce..ae661c5 100644
--- a/gio/gfileenumerator.c
+++ b/gio/gfileenumerator.c
@@ -678,7 +678,7 @@ g_file_enumerator_real_next_files_async (GFileEnumerator *enumerator,
task = g_task_new (enumerator, cancellable, callback, user_data);
g_task_set_task_data (task, GINT_TO_POINTER (num_files), NULL);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, next_files_thread);
g_object_unref (task);
@@ -723,7 +723,7 @@ g_file_enumerator_real_close_async (GFileEnumerator *enumerator,
GTask *task;
task = g_task_new (enumerator, cancellable, callback, user_data);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, close_async_thread);
g_object_unref (task);
diff --git a/gio/gfileinputstream.c b/gio/gfileinputstream.c
index 73e7b78..ff89ef3 100644
--- a/gio/gfileinputstream.c
+++ b/gio/gfileinputstream.c
@@ -417,7 +417,7 @@ g_file_input_stream_real_query_info_async (GFileInputStream *stream,
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_task_data (task, g_strdup (attributes), g_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, query_info_async_thread);
g_object_unref (task);
diff --git a/gio/gfileoutputstream.c b/gio/gfileoutputstream.c
index 7b77ba5..3d9bbdd 100644
--- a/gio/gfileoutputstream.c
+++ b/gio/gfileoutputstream.c
@@ -520,7 +520,7 @@ g_file_output_stream_real_query_info_async (GFileOutputStream *stream,
task = g_task_new (stream, cancellable, callback, user_data);
g_task_set_task_data (task, g_strdup (attributes), g_free);
- g_task_set_priority (task, io_priority);
+ g_task_set_scheduling (task, io_priority, G_TASK_THREAD_KIND_IO);
g_task_run_in_thread (task, query_info_async_thread);
g_object_unref (task);
diff --git a/gio/gio.symbols b/gio/gio.symbols
index 3096755..19fc9ac 100644
--- a/gio/gio.symbols
+++ b/gio/gio.symbols
@@ -1793,6 +1793,7 @@ g_task_run_in_thread
g_task_run_in_thread_sync
g_task_set_check_cancellable
g_task_set_priority
+g_task_set_scheduling
g_task_set_return_on_cancel
g_task_set_source_tag
g_task_set_task_data
diff --git a/gio/gioenums.h b/gio/gioenums.h
index 39c43c0..4030501 100644
--- a/gio/gioenums.h
+++ b/gio/gioenums.h
@@ -1655,6 +1655,24 @@ typedef enum /*< flags >*/ {
G_TEST_DBUS_NONE = 0
} GTestDBusFlags;
+/**
+ * GTaskThreadKind:
+ * @G_TASK_THREAD_KIND_DEFAULT: Work with unknown performance characteristics
+ * @G_TASK_THREAD_KIND_IO: Mostly performs synchronous I/O (e.g. on local files)
+ * @G_TASK_THREAD_KIND_CPU: Mostly uses the processor
+ * @G_TASK_THREAD_KIND_MIXED: Alternates between I/O and CPU
+ *
+ * Describes the performance characteristics of a #GTask's thread
+ * function, so that GLib can run it in an appropriate thread pool.
+ * This is a plain enumeration, not a set of flags; pass exactly one
+ * value to g_task_set_scheduling().
+ *
+ * Since: 2.36
+ */
+typedef enum {
+  G_TASK_THREAD_KIND_DEFAULT = 0,
+  G_TASK_THREAD_KIND_IO,
+  G_TASK_THREAD_KIND_CPU,
+  G_TASK_THREAD_KIND_MIXED
+} GTaskThreadKind;
+
G_END_DECLS
#endif /* __GIO_ENUMS_H__ */
diff --git a/gio/gtask.c b/gio/gtask.c
index 0a80c24..b72e5ac 100644
--- a/gio/gtask.c
+++ b/gio/gtask.c
@@ -24,6 +24,10 @@
#include "gasyncresult.h"
#include "gcancellable.h"
+#ifdef G_OS_UNIX
+#include <unistd.h>
+#include <errno.h>
+#endif
/**
* SECTION:gtask
@@ -578,6 +582,7 @@ struct _GTask {
gpointer callback_data;
GTaskThreadFunc task_func;
+ GTaskThreadKind kind;
GMutex lock;
GCond cond;
gboolean return_on_cancel;
@@ -612,12 +617,16 @@ G_DEFINE_TYPE_WITH_CODE (GTask, g_task, G_TYPE_OBJECT,
g_task_async_result_iface_init);
g_task_thread_pool_init ();)
-static GThreadPool *task_pool;
+/* One pool per GTaskThreadKind, all created in g_task_thread_pool_init();
+ * g_task_start_task_thread() picks one of them according to task->kind. */
+static GThreadPool *task_pool_default;
+static GThreadPool *task_pool_cpu;
+static GThreadPool *task_pool_io;
+static GThreadPool *task_pool_mixed;
static void
g_task_init (GTask *task)
{
task->check_cancellable = TRUE;
+  /* Until g_task_set_scheduling() says otherwise, g_task_start_task_thread()
+   * pushes this task to task_pool_default. */
+ task->kind = G_TASK_THREAD_KIND_DEFAULT;
}
static void
@@ -825,6 +834,34 @@ g_task_set_priority (GTask *task,
}
/**
+ * g_task_set_scheduling:
+ * @task: the #GTask
+ * @priority: the <link linkend="io-priority">priority</link>
+ *   of the request.
+ * @kind: the #GTaskThreadKind describing the performance
+ *   characteristics of @task's thread function
+ *
+ * Sets @task's priority exactly like g_task_set_priority(), and
+ * additionally records @kind, which selects the thread pool that
+ * #GTask uses when it runs @task in a thread. This allows GLib to
+ * schedule its internal threads more efficiently. For example, if you
+ * set %G_TASK_THREAD_KIND_CPU, the system will try to ensure that the
+ * number of such threads does not significantly exceed the number of
+ * processors available.
+ *
+ * If @task's thread function synchronously invokes any other
+ * GLib-based API, you must use %G_TASK_THREAD_KIND_DEFAULT; otherwise,
+ * there is the possibility of deadlock.
+ *
+ * This function must be called before g_task_run_in_thread() or
+ * similar. The pool is chosen when the thread is started, so changing
+ * @kind afterwards has no effect on where @task runs.
+ *
+ * Since: 2.36
+ */
+void
+g_task_set_scheduling (GTask           *task,
+                       gint             priority,
+                       GTaskThreadKind  kind)
+{
+  g_return_if_fail (G_IS_TASK (task));
+  /* Reject out-of-range kinds here rather than hitting g_assert (pool)
+   * much later, when the task is started. */
+  g_return_if_fail ((guint) kind <= G_TASK_THREAD_KIND_MIXED);
+
+  g_task_set_priority (task, priority);
+  task->kind = kind;
+}
+
+/**
* g_task_set_check_cancellable:
* @task: the #GTask
* @check_cancellable: whether #GTask will check the state of
@@ -1267,6 +1304,8 @@ static void
g_task_start_task_thread (GTask *task,
GTaskThreadFunc task_func)
{
+ GThreadPool *pool = NULL;
+
g_mutex_init (&task->lock);
g_cond_init (&task->cond);
@@ -1274,6 +1313,23 @@ g_task_start_task_thread (GTask *task,
task->task_func = task_func;
+ switch (task->kind)
+ {
+ case G_TASK_THREAD_KIND_DEFAULT:
+ pool = task_pool_default;
+ break;
+ case G_TASK_THREAD_KIND_CPU:
+ pool = task_pool_cpu;
+ break;
+ case G_TASK_THREAD_KIND_IO:
+ pool = task_pool_io;
+ break;
+ case G_TASK_THREAD_KIND_MIXED:
+ pool = task_pool_mixed;
+ break;
+ }
+ g_assert (pool);
+
if (task->cancellable)
{
if (task->return_on_cancel &&
@@ -1281,7 +1337,7 @@ g_task_start_task_thread (GTask *task,
&task->error))
{
task->thread_cancelled = task->thread_complete = TRUE;
- g_thread_pool_push (task_pool, g_object_ref (task), NULL);
+ g_thread_pool_push (pool, g_object_ref (task), NULL);
return;
}
@@ -1291,7 +1347,7 @@ g_task_start_task_thread (GTask *task,
task_thread_cancelled_disconnect_notify, 0);
}
- g_thread_pool_push (task_pool, g_object_ref (task), &task->error);
+ g_thread_pool_push (pool, g_object_ref (task), &task->error);
if (task->error)
task->thread_complete = TRUE;
}
@@ -1299,11 +1355,20 @@ g_task_start_task_thread (GTask *task,
/**
* g_task_run_in_thread:
* @task: a #GTask
* @task_func: a #GTaskThreadFunc
*
* Runs @task_func in another thread. When @task_func returns, @task's
* #GAsyncReadyCallback will be invoked in @task's #GMainContext.
*
+ * The thread pool that runs @task_func is chosen by the
+ * #GTaskThreadKind set with g_task_set_scheduling()
+ * (%G_TASK_THREAD_KIND_DEFAULT if it was never called). If @task_func
+ * synchronously invokes any other GLib-based API, you must use
+ * %G_TASK_THREAD_KIND_DEFAULT; otherwise, there is the possibility of
+ * deadlock.
+ *
+ * For example, GLib will try to avoid running significantly more
+ * CPU-bound threads than there are system processors.
+ *
* This takes a ref on @task until the task completes.
*
* See #GTaskThreadFunc for more details about how @task_func is handled.
@@ -1762,20 +1827,51 @@ g_task_compare_priority (gconstpointer a,
return ta->priority - tb->priority;
}
+/* Returns the number of online processors, or a fallback of 2 when it
+ * cannot be determined. The result is always at least 1, so it is safe
+ * to use as a thread pool's max_threads: 0 would never start a thread
+ * and -1 would mean "no limit".
+ */
+static gint
+get_nproc_onln (void)
+{
+#if defined (G_OS_UNIX) && defined (_SC_NPROCESSORS_ONLN)
+  long result;
+
+  /* sysconf() returns -1 both on error and when the value is
+   * indeterminate (leaving errno untouched in that case), so inspecting
+   * errno is not enough; treat any non-positive value as unknown. */
+  result = sysconf (_SC_NPROCESSORS_ONLN);
+  if (G_UNLIKELY (result < 1))
+    return 2;
+
+  return (gint) MIN (result, G_MAXINT);
+#else
+  return 2;
+#endif
+}
+
+/* Creates a shared (non-exclusive) GTask worker pool that runs at most
+ * @max_threads tasks concurrently (-1 for no limit), ordered by
+ * g_task_compare_priority(). */
+static GThreadPool *
+generic_pool_new (gint max_threads)
+{
+  GThreadPool *result;
+
+  /* 0 would freeze the pool forever, so only accept -1 or a positive bound. */
+  g_assert (max_threads == -1 || max_threads > 0);
+
+  result = g_thread_pool_new (g_task_thread_pool_thread, NULL,
+                              max_threads, FALSE, NULL);
+  g_assert (result != NULL);
+
+  g_thread_pool_set_sort_function (result, g_task_compare_priority, NULL);
+  return result;
+}
+
static void
g_task_thread_pool_init (void)
{
- task_pool = g_thread_pool_new (g_task_thread_pool_thread, NULL,
- 100, FALSE, NULL);
- g_assert (task_pool != NULL);
-
- g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
+  /* Upper bounds on concurrently running threads, per pool. The I/O
+   * bound was picked out of thin air; the default bound keeps the
+   * previous value (see https://bugzilla.gnome.org/show_bug.cgi?id=687223).
+   * Mixed work gets room for both CPU-bound and I/O-bound threads. */
+  const gint n_cpu_threads = get_nproc_onln ();
+  const gint n_io_threads = 4;
+  const gint n_default_threads = 100;
+
+  task_pool_default = generic_pool_new (n_default_threads);
+  task_pool_cpu = generic_pool_new (n_cpu_threads);
+  task_pool_io = generic_pool_new (n_io_threads);
+  task_pool_mixed = generic_pool_new (n_cpu_threads + n_io_threads);
}
static void
g_task_thread_pool_resort (void)
{
- g_thread_pool_set_sort_function (task_pool, g_task_compare_priority, NULL);
+  /* A queued task may be waiting in any of the per-kind pools, so
+   * re-sort all of them rather than only the default one. */
+  g_thread_pool_set_sort_function (task_pool_default, g_task_compare_priority, NULL);
+  g_thread_pool_set_sort_function (task_pool_cpu, g_task_compare_priority, NULL);
+  g_thread_pool_set_sort_function (task_pool_io, g_task_compare_priority, NULL);
+  g_thread_pool_set_sort_function (task_pool_mixed, g_task_compare_priority, NULL);
}
static void
diff --git a/gio/gtask.h b/gio/gtask.h
index 2cc5f65..d4e9cd9 100644
--- a/gio/gtask.h
+++ b/gio/gtask.h
@@ -71,6 +71,10 @@ GLIB_AVAILABLE_IN_2_36
void g_task_set_priority (GTask *task,
gint priority);
GLIB_AVAILABLE_IN_2_36
+void             g_task_set_scheduling        (GTask           *task,
+                                               gint             priority,
+                                               GTaskThreadKind  kind);
+GLIB_AVAILABLE_IN_2_36
void g_task_set_check_cancellable (GTask *task,
gboolean check_cancellable);
GLIB_AVAILABLE_IN_2_36
diff --git a/gio/tests/task.c b/gio/tests/task.c
index 8065807..db2eeca 100644
--- a/gio/tests/task.c
+++ b/gio/tests/task.c
@@ -897,6 +897,22 @@ test_run_in_thread_priority (void)
g_mutex_unlock (&fake_task_mutex);
}
+static void
+test_run_in_thread_kind (void)
+{
+  GTask *task;
+  gint seq = 42;
+
+  /* No real semantics here; this mostly gives g_task_set_scheduling()
+   * and the non-default thread pools basic code coverage.
+   *
+   * Run synchronously: set_sequence_number_thread() is handed &seq,
+   * which points into this stack frame, so the thread must finish
+   * before we return. Having no completion callback also avoids a
+   * stray callback firing later inside another test's main loop.
+   */
+  task = g_task_new (NULL, NULL, NULL, NULL);
+  g_task_set_task_data (task, &seq, NULL);
+  g_task_set_scheduling (task, G_PRIORITY_DEFAULT, G_TASK_THREAD_KIND_IO);
+  g_task_run_in_thread_sync (task, set_sequence_number_thread);
+  g_assert (!g_task_had_error (task));
+  g_object_unref (task);
+}
+
/* test_return_on_cancel */
GMutex roc_init_mutex, roc_finish_mutex;
@@ -1652,6 +1668,7 @@ main (int argc, char **argv)
g_test_add_func ("/gtask/run-in-thread", test_run_in_thread);
g_test_add_func ("/gtask/run-in-thread-sync", test_run_in_thread_sync);
g_test_add_func ("/gtask/run-in-thread-priority", test_run_in_thread_priority);
+ g_test_add_func ("/gtask/run-in-thread-kind", test_run_in_thread_kind);
g_test_add_func ("/gtask/return-on-cancel", test_return_on_cancel);
g_test_add_func ("/gtask/return-on-cancel-sync", test_return_on_cancel_sync);
g_test_add_func ("/gtask/return-on-cancel-atomic", test_return_on_cancel_atomic);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]