[evolution-data-server] Add camel_service_queue_task().
- From: Matthew Barnes <mbarnes src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [evolution-data-server] Add camel_service_queue_task().
- Date: Thu, 5 Dec 2013 14:40:08 +0000 (UTC)
commit 3218a4014fedd97b576898fe2501eaa23256c8a0
Author: Matthew Barnes <mbarnes redhat com>
Date: Sat Nov 30 12:08:25 2013 -0500
Add camel_service_queue_task().
Adds a GTask to a queue of waiting tasks with the same source object.
Queued tasks execute one at a time from a separate thread in the order
they were added.
This is primarily intended for use by CamelStore, CamelTransport and
CamelFolder to achieve ordered invocation of synchronous class methods.
This will finally fix Camel's long-standing (mis)use of mutexes as a
dispatch mechanism, which doesn't preserve order and isn't cancellable.
camel/camel-service.c | 211 ++++++++++++++++++++++++++++++-
camel/camel-service.h | 4 +-
docs/reference/camel/camel-sections.txt | 1 +
3 files changed, 213 insertions(+), 3 deletions(-)
---
diff --git a/camel/camel-service.c b/camel/camel-service.c
index 269aeda..2fea2e8 100644
--- a/camel/camel-service.c
+++ b/camel/camel-service.c
@@ -52,8 +52,11 @@
(G_TYPE_INSTANCE_GET_PRIVATE \
((obj), CAMEL_TYPE_SERVICE, CamelServicePrivate))
+#define DISPATCH_DATA_KEY "camel-service-dispatch-data"
+
typedef struct _AsyncContext AsyncContext;
typedef struct _ConnectionOp ConnectionOp;
+typedef struct _DispatchData DispatchData;
struct _CamelServicePrivate {
GWeakRef session;
@@ -74,6 +77,10 @@ struct _CamelServicePrivate {
ConnectionOp *connection_op;
CamelServiceConnectionStatus status;
+ /* Queues of GTasks, by source object. */
+ GHashTable *task_table;
+ GMutex task_table_lock;
+
gboolean network_service_inited;
};
@@ -94,6 +101,12 @@ struct _ConnectionOp {
gulong cancel_id;
};
+/* Per-task bookkeeping that camel_service_queue_task() attaches to a
+ * GTask under DISPATCH_DATA_KEY until the task is dispatched. */
+struct _DispatchData {
+ /* Weak reference to the CamelService the task was queued on. */
+ GWeakRef service;
+ /* The task's "return-on-cancel" flag as it was before queuing;
+ * restored by service_task_dispatch(). */
+ gboolean return_on_cancel;
+ /* Caller-supplied function, invoked from service_task_thread(). */
+ GTaskThreadFunc task_func;
+};
+
enum {
PROP_0,
PROP_CONNECTION_STATUS,
@@ -107,8 +120,10 @@ enum {
};
/* Forward Declarations */
-void camel_network_service_init (CamelNetworkService *service);
-static void camel_service_initable_init (GInitableIface *interface);
+void camel_network_service_init (CamelNetworkService *service);
+static void camel_service_initable_init (GInitableIface *interface);
+static void service_task_dispatch (CamelService *service,
+ GTask *task);
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (
CamelService, camel_service, CAMEL_TYPE_OBJECT,
@@ -245,6 +260,135 @@ connection_op_complete_pending (ConnectionOp *op,
}
}
+/* Destroy notifier for the DispatchData stashed on a GTask: releases
+ * the weak reference to the service, then frees the slice allocation. */
+static void
+dispatch_data_free (DispatchData *dispatch_data)
+{
+	/* g_weak_ref_clear() is the documented teardown for a GWeakRef
+	 * that lives in heap memory about to be released. */
+	g_weak_ref_clear (&dispatch_data->service);
+
+	g_slice_free (DispatchData, dispatch_data);
+}
+
+/* Value destructor for the task table: frees the queue and drops the
+ * reference the queue holds on each GTask still in it. */
+static void
+task_queue_free (GQueue *task_queue)
+{
+ g_queue_free_full (task_queue, g_object_unref);
+}
+
+/* Appends @task to the queue of tasks sharing its source object,
+ * creating that queue on first use.  If the queue was empty before
+ * the push, @task is dispatched right away; otherwise it waits until
+ * service_task_table_done() reaches it. */
+static void
+service_task_table_push (CamelService *service,
+ GTask *task)
+{
+ GQueue *task_queue;
+ gpointer source_object;
+ gboolean queue_was_empty;
+
+ g_return_if_fail (CAMEL_IS_SERVICE (service));
+ g_return_if_fail (G_IS_TASK (task));
+
+ /* Tasks without a source object are queued under the service itself. */
+ source_object = g_task_get_source_object (task);
+ if (source_object == NULL)
+ source_object = service;
+
+ g_mutex_lock (&service->priv->task_table_lock);
+
+ task_queue = g_hash_table_lookup (
+ service->priv->task_table, source_object);
+
+ /* Create on demand. */
+ if (task_queue == NULL) {
+ task_queue = g_queue_new ();
+ g_hash_table_insert (
+ service->priv->task_table,
+ source_object, task_queue);
+ }
+
+ /* Sample emptiness before pushing: it decides whether this task
+ * must be started here.  The queue takes its own reference. */
+ queue_was_empty = g_queue_is_empty (task_queue);
+ g_queue_push_tail (task_queue, g_object_ref (task));
+
+ g_mutex_unlock (&service->priv->task_table_lock);
+
+ /* Dispatch only after the table lock has been released. */
+ if (queue_was_empty)
+ service_task_dispatch (service, task);
+}
+
+/* Called once @task has finished, or was found already cancelled at
+ * dispatch time.  Removes @task from the queue for its source object
+ * (dropping the queue's reference) and dispatches the task that is now
+ * at the head of that queue, if any.  Tasks without a source object
+ * are looked up under @service itself. */
+static void
+service_task_table_done (CamelService *service,
+                         GTask *task)
+{
+	GQueue *task_queue;
+	gpointer source_object;
+	GTask *next = NULL;
+
+	g_return_if_fail (CAMEL_IS_SERVICE (service));
+	g_return_if_fail (G_IS_TASK (task));
+
+	source_object = g_task_get_source_object (task);
+	if (source_object == NULL)
+		source_object = service;
+
+	g_mutex_lock (&service->priv->task_table_lock);
+
+	task_queue = g_hash_table_lookup (
+		service->priv->task_table, source_object);
+
+	if (task_queue != NULL) {
+		if (g_queue_remove (task_queue, task))
+			g_object_unref (task);
+
+		next = g_queue_peek_head (task_queue);
+		if (next != NULL) {
+			/* Keep the next task alive across the unlock. */
+			g_object_ref (next);
+		} else {
+			/* The queue is drained.  The table key is a bare
+			 * pointer with no reference on the source object,
+			 * so drop the entry now rather than keep an empty
+			 * queue under a key that may later dangle.  The
+			 * table's value destructor frees the queue. */
+			g_hash_table_remove (
+				service->priv->task_table, source_object);
+		}
+	}
+
+	g_mutex_unlock (&service->priv->task_table_lock);
+
+	/* Dispatch only after the table lock has been released. */
+	if (next != NULL) {
+		service_task_dispatch (service, next);
+		g_object_unref (next);
+	}
+}
+
+/* GTaskThreadFunc wrapper handed to g_task_run_in_thread(): runs the
+ * caller's task function stored in the task's DispatchData, then
+ * notifies the service so the next queued task can be dispatched. */
+static void
+service_task_thread (GTask *task,
+ gpointer source_object,
+ gpointer task_data,
+ GCancellable *cancellable)
+{
+ CamelService *service;
+ DispatchData *data;
+
+ data = g_object_get_data (G_OBJECT (task), DISPATCH_DATA_KEY);
+ g_return_if_fail (data != NULL);
+
+ /* Take a strong reference for the duration of the call. */
+ service = g_weak_ref_get (&data->service);
+ /* NOTE(review): on this early return task_func is never invoked;
+ * confirm whether the task can be left without a result here. */
+ g_return_if_fail (service != NULL);
+
+ data->task_func (task, source_object, task_data, cancellable);
+
+ service_task_table_done (service, task);
+
+ g_object_unref (service);
+}
+
+/* Starts @task, which must carry DispatchData under DISPATCH_DATA_KEY.
+ * The task is either run in a thread via service_task_thread(), or,
+ * if its return-on-cancel flag could not be restored, retired straight
+ * away through service_task_table_done(). */
+static void
+service_task_dispatch (CamelService *service,
+ GTask *task)
+{
+ DispatchData *data;
+
+ data = g_object_get_data (G_OBJECT (task), DISPATCH_DATA_KEY);
+ g_return_if_fail (data != NULL);
+
+ /* Restore the task's previous "return-on-cancel" flag.
+ * This returns FALSE if the task is already cancelled,
+ * in which case we skip calling g_task_run_in_thread()
+ * so the task doesn't complete twice. */
+ if (g_task_set_return_on_cancel (task, data->return_on_cancel))
+ g_task_run_in_thread (task, service_task_thread);
+ else
+ service_task_table_done (service, task);
+}
+
static gchar *
service_find_old_data_dir (CamelService *service)
{
@@ -698,6 +842,8 @@ service_dispose (GObject *object)
g_clear_object (&priv->settings);
g_clear_object (&priv->proxy_resolver);
+ g_hash_table_remove_all (priv->task_table);
+
/* Chain up to parent's dispose() method. */
G_OBJECT_CLASS (camel_service_parent_class)->dispose (object);
}
@@ -721,6 +867,9 @@ service_finalize (GObject *object)
g_warn_if_fail (priv->connection_op == NULL);
g_mutex_clear (&priv->connection_lock);
+ g_hash_table_destroy (priv->task_table);
+ g_mutex_clear (&priv->task_table_lock);
+
/* Chain up to parent's finalize() method. */
G_OBJECT_CLASS (camel_service_parent_class)->finalize (object);
}
@@ -929,6 +1078,14 @@ camel_service_initable_init (GInitableIface *interface)
static void
camel_service_init (CamelService *service)
{
+ GHashTable *task_table;
+
+ task_table = g_hash_table_new_full (
+ (GHashFunc) g_direct_hash,
+ (GEqualFunc) g_direct_equal,
+ (GDestroyNotify) NULL,
+ (GDestroyNotify) task_queue_free);
+
service->priv = CAMEL_SERVICE_GET_PRIVATE (service);
g_mutex_init (&service->priv->property_lock);
@@ -938,6 +1095,9 @@ camel_service_init (CamelService *service)
service->priv->proxy_resolver = g_proxy_resolver_get_default ();
if (service->priv->proxy_resolver != NULL)
g_object_ref (service->priv->proxy_resolver);
+
+ service->priv->task_table = task_table;
+ g_mutex_init (&service->priv->task_table_lock);
}
G_DEFINE_QUARK (camel-service-error-quark, camel_service_error)
@@ -1520,6 +1680,53 @@ camel_service_get_uid (CamelService *service)
}
/**
+ * camel_service_queue_task:
+ * @service: a #CamelService
+ * @task: a #GTask
+ * @task_func: function to call when @task is dispatched
+ *
+ * Adds @task to a queue of waiting tasks with the same source object.
+ * Queued tasks execute one at a time in the order they were added. When
+ * @task reaches the front of the queue, it will be dispatched by invoking
+ * @task_func in a separate thread. If @task is cancelled while queued,
+ * it will complete immediately with an appropriate error.
+ *
+ * This is primarily intended for use by #CamelStore, #CamelTransport and
+ * #CamelFolder to achieve ordered invocation of synchronous class methods.
+ *
+ * Since: 3.12
+ **/
+void
+camel_service_queue_task (CamelService *service,
+                          GTask *task,
+                          GTaskThreadFunc task_func)
+{
+	DispatchData *dispatch_data;
+
+	g_return_if_fail (CAMEL_IS_SERVICE (service));
+	g_return_if_fail (G_IS_TASK (task));
+	g_return_if_fail (task_func != NULL);
+
+	dispatch_data = g_slice_new0 (DispatchData);
+
+	/* A heap-allocated GWeakRef is initialised with g_weak_ref_init(),
+	 * which also points it at @service. */
+	g_weak_ref_init (&dispatch_data->service, service);
+
+	/* Remember the caller's flag before overriding it below, so it
+	 * can be restored when the task is dispatched. */
+	dispatch_data->return_on_cancel = g_task_get_return_on_cancel (task);
+	dispatch_data->task_func = task_func;
+
+	/* Complete immediately if cancelled while queued. */
+	g_task_set_return_on_cancel (task, TRUE);
+
+	/* Stash this until it's time to dispatch the GTask. */
+	g_object_set_data_full (
+		G_OBJECT (task), DISPATCH_DATA_KEY,
+		dispatch_data, (GDestroyNotify) dispatch_data_free);
+
+	service_task_table_push (service, task);
+}
+
+/**
* camel_service_connect_sync:
* @service: a #CamelService
* @cancellable: optional #GCancellable object, or %NULL
diff --git a/camel/camel-service.h b/camel/camel-service.h
index fd7c0a8..c44a241 100644
--- a/camel/camel-service.h
+++ b/camel/camel-service.h
@@ -162,7 +162,9 @@ CamelSettings * camel_service_ref_settings (CamelService *service);
void camel_service_set_settings (CamelService *service,
CamelSettings *settings);
const gchar * camel_service_get_uid (CamelService *service);
-
+void camel_service_queue_task (CamelService *service,
+ GTask *task,
+ GTaskThreadFunc task_func);
gboolean camel_service_connect_sync (CamelService *service,
GCancellable *cancellable,
GError **error);
diff --git a/docs/reference/camel/camel-sections.txt b/docs/reference/camel/camel-sections.txt
index 02aaa39..06afe5e 100644
--- a/docs/reference/camel/camel-sections.txt
+++ b/docs/reference/camel/camel-sections.txt
@@ -1871,6 +1871,7 @@ camel_service_ref_session
camel_service_ref_settings
camel_service_set_settings
camel_service_get_uid
+camel_service_queue_task
camel_service_connect_sync
camel_service_connect
camel_service_connect_finish
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]