[tracker/queue: 3/3] Use queues for all requests



commit af84a0201badaf7347deac71c773989876920bd3
Author: Jürg Billeter <j bitron ch>
Date:   Wed Apr 7 15:13:54 2010 +0200

    Use queues for all requests

 src/tracker-store/tracker-resources.c |  173 +++++++++++++++------------
 src/tracker-store/tracker-store.c     |  213 +++++++++++++++++++++++----------
 src/tracker-store/tracker-store.h     |   39 ++++--
 3 files changed, 272 insertions(+), 153 deletions(-)
---
diff --git a/src/tracker-store/tracker-resources.c b/src/tracker-store/tracker-resources.c
index 2b964f6..4f2e437 100644
--- a/src/tracker-store/tracker-resources.c
+++ b/src/tracker-store/tracker-resources.c
@@ -178,16 +178,43 @@ tracker_resources_load (TrackerResources         *object,
 	g_object_unref (file);
 }
 
+static void
+query_callback (TrackerDBResultSet *result_set, GError *error, gpointer user_data)
+{
+	TrackerDBusMethodInfo *info = user_data;
+	GPtrArray *values;
+
+	if (error) {
+		tracker_dbus_request_failed (info->request_id,
+		                             info->context,
+		                             &error,
+		                             NULL);
+		dbus_g_method_return_error (info->context, error);
+		return;
+	}
+
+	tracker_dbus_request_success (info->request_id,
+	                              info->context);
+
+	values = tracker_dbus_query_result_to_ptr_array (result_set);
+
+	dbus_g_method_return (info->context, values);
+
+	tracker_dbus_results_ptr_array_free (&values);
+}
+
 void
 tracker_resources_sparql_query (TrackerResources         *self,
                                 const gchar              *query,
                                 DBusGMethodInvocation    *context,
                                 GError                  **error)
 {
-	TrackerDBResultSet   *result_set;
-	GError               *actual_error = NULL;
+	TrackerDBusMethodInfo   *info;
+	TrackerResourcesPrivate *priv;
 	guint                 request_id;
-	GPtrArray            *values;
+	gchar                 *sender;
+
+	priv = TRACKER_RESOURCES_GET_PRIVATE (self);
 
 	request_id = tracker_dbus_get_next_request_id ();
 
@@ -199,28 +226,37 @@ tracker_resources_sparql_query (TrackerResources         *self,
 	                          __FUNCTION__,
 	                          query);
 
-	result_set = tracker_store_sparql_query (query, &actual_error);
+	info = g_slice_new (TrackerDBusMethodInfo);
 
-	if (actual_error) {
-		tracker_dbus_request_failed (request_id,
-		                             context,
-		                             &actual_error,
-		                             NULL);
-		dbus_g_method_return_error (context, actual_error);
-		g_error_free (actual_error);
-		return;
-	}
+	info->request_id = request_id;
+	info->context = context;
 
-	values = tracker_dbus_query_result_to_ptr_array (result_set);
+	sender = dbus_g_method_get_sender (context);
 
-	tracker_dbus_request_success (request_id, context);
-	dbus_g_method_return (context, values);
+	tracker_store_sparql_query (query, TRACKER_STORE_PRIORITY_HIGH,
+	                            query_callback, sender,
+	                            info, destroy_method_info);
 
-	tracker_dbus_results_ptr_array_free (&values);
+	g_free (sender);
+}
+
+static void
+update_callback (GError *error, gpointer user_data)
+{
+	TrackerDBusMethodInfo *info = user_data;
 
-	if (result_set) {
-		g_object_unref (result_set);
+	if (error) {
+		tracker_dbus_request_failed (info->request_id,
+		                             info->context,
+		                             &error,
+		                             NULL);
+		dbus_g_method_return_error (info->context, error);
+		return;
 	}
+
+	tracker_dbus_request_success (info->request_id,
+	                              info->context);
+	dbus_g_method_return (info->context);
 }
 
 void
@@ -229,9 +265,10 @@ tracker_resources_sparql_update (TrackerResources        *self,
                                  DBusGMethodInvocation   *context,
                                  GError                 **error)
 {
+	TrackerDBusMethodInfo   *info;
 	TrackerResourcesPrivate *priv;
-	GError                       *actual_error = NULL;
 	guint                 request_id;
+	gchar                 *sender;
 
 	priv = TRACKER_RESOURCES_GET_PRIVATE (self);
 
@@ -245,20 +282,37 @@ tracker_resources_sparql_update (TrackerResources        *self,
 	                          __FUNCTION__,
 	                          update);
 
-	tracker_store_sparql_update (update, &actual_error);
+	info = g_slice_new (TrackerDBusMethodInfo);
 
-	if (actual_error) {
-		tracker_dbus_request_failed (request_id,
-		                             context,
-		                             &actual_error,
+	info->request_id = request_id;
+	info->context = context;
+
+	sender = dbus_g_method_get_sender (context);
+
+	tracker_store_sparql_update (update, TRACKER_STORE_PRIORITY_HIGH, FALSE,
+	                             update_callback, sender,
+	                             info, destroy_method_info);
+
+	g_free (sender);
+}
+
+static void
+update_blank_callback (GPtrArray *blank_nodes, GError *error, gpointer user_data)
+{
+	TrackerDBusMethodInfo *info = user_data;
+
+	if (error) {
+		tracker_dbus_request_failed (info->request_id,
+		                             info->context,
+		                             &error,
 		                             NULL);
-		dbus_g_method_return_error (context, actual_error);
-		g_error_free (actual_error);
+		dbus_g_method_return_error (info->context, error);
 		return;
 	}
 
-	tracker_dbus_request_success (request_id, context);
-	dbus_g_method_return (context);
+	tracker_dbus_request_success (info->request_id,
+	                              info->context);
+	dbus_g_method_return (info->context, blank_nodes);
 }
 
 void
@@ -267,10 +321,10 @@ tracker_resources_sparql_update_blank (TrackerResources       *self,
                                        DBusGMethodInvocation  *context,
                                        GError                **error)
 {
+	TrackerDBusMethodInfo   *info;
 	TrackerResourcesPrivate *priv;
-	GError                       *actual_error = NULL;
 	guint                 request_id;
-	GPtrArray            *blank_nodes;
+	gchar                 *sender;
 
 	priv = TRACKER_RESOURCES_GET_PRIVATE (self);
 
@@ -284,35 +338,18 @@ tracker_resources_sparql_update_blank (TrackerResources       *self,
 	                          __FUNCTION__,
 	                          update);
 
-	blank_nodes = tracker_store_sparql_update_blank (update, &actual_error);
-
-	if (actual_error) {
-		tracker_dbus_request_failed (request_id,
-		                             context,
-		                             &actual_error,
-		                             NULL);
-		dbus_g_method_return_error (context, actual_error);
-		g_error_free (actual_error);
-		return;
-	}
+	info = g_slice_new (TrackerDBusMethodInfo);
 
-	if (!blank_nodes) {
-		/* Create empty GPtrArray for dbus-glib to be happy */
-		blank_nodes = g_ptr_array_new ();
-	}
+	info->request_id = request_id;
+	info->context = context;
 
-	tracker_dbus_request_success (request_id, context);
-	dbus_g_method_return (context, blank_nodes);
+	sender = dbus_g_method_get_sender (context);
 
-	if (blank_nodes) {
-		gint i;
+	tracker_store_sparql_update_blank (update, TRACKER_STORE_PRIORITY_HIGH,
+	                                   update_blank_callback, sender,
+	                                   info, destroy_method_info);
 
-		for (i = 0; i < blank_nodes->len; i++) {
-			g_ptr_array_foreach (blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
-			g_ptr_array_free (blank_nodes->pdata[i], TRUE);
-		}
-		g_ptr_array_free (blank_nodes, TRUE);
-	}
+	g_free (sender);
 }
 
 void
@@ -335,25 +372,6 @@ tracker_resources_sync (TrackerResources        *self,
 	dbus_g_method_return (context);
 }
 
-static void
-batch_update_callback (GError *error, gpointer user_data)
-{
-	TrackerDBusMethodInfo *info = user_data;
-
-	if (error) {
-		tracker_dbus_request_failed (info->request_id,
-		                             info->context,
-		                             &error,
-		                             NULL);
-		dbus_g_method_return_error (info->context, error);
-		return;
-	}
-
-	tracker_dbus_request_success (info->request_id,
-	                              info->context);
-	dbus_g_method_return (info->context);
-}
-
 void
 tracker_resources_batch_sparql_update (TrackerResources          *self,
                                        const gchar               *update,
@@ -384,8 +402,9 @@ tracker_resources_batch_sparql_update (TrackerResources          *self,
 
 	sender = dbus_g_method_get_sender (context);
 
-	tracker_store_queue_sparql_update (update, batch_update_callback,
-	                                   sender, info, destroy_method_info);
+	tracker_store_sparql_update (update, TRACKER_STORE_PRIORITY_LOW, TRUE,
+	                             update_callback, sender,
+	                             info, destroy_method_info);
 
 	g_free (sender);
 }
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index 280f2e4..63f7364 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -45,9 +45,11 @@ typedef struct {
 } TrackerStorePrivate;
 
 typedef enum {
-	TRACKER_STORE_TASK_TYPE_UPDATE = 0,
-	TRACKER_STORE_TASK_TYPE_COMMIT = 1,
-	TRACKER_STORE_TASK_TYPE_TURTLE = 2,
+	TRACKER_STORE_TASK_TYPE_QUERY,
+	TRACKER_STORE_TASK_TYPE_UPDATE,
+	TRACKER_STORE_TASK_TYPE_UPDATE_BLANK,
+	TRACKER_STORE_TASK_TYPE_COMMIT,
+	TRACKER_STORE_TASK_TYPE_TURTLE,
 } TrackerStoreTaskType;
 
 typedef struct {
@@ -55,6 +57,10 @@ typedef struct {
 	union {
 		struct {
 			gchar                   *query;
+		} query;
+		struct {
+			gchar                   *query;
+			gboolean                 batch;
 			gchar                   *client_id;
 		} update;
 		struct {
@@ -65,9 +71,11 @@ typedef struct {
 	gpointer                   user_data;
 	GDestroyNotify             destroy;
 	union {
-		TrackerStoreSparqlUpdateCallback update_callback;
-		TrackerStoreCommitCallback       commit_callback;
-		TrackerStoreTurtleCallback       turtle_callback;
+		TrackerStoreSparqlQueryCallback       query_callback;
+		TrackerStoreSparqlUpdateCallback      update_callback;
+		TrackerStoreSparqlUpdateBlankCallback update_blank_callback;
+		TrackerStoreCommitCallback            commit_callback;
+		TrackerStoreTurtleCallback            turtle_callback;
 	} callback;
 } TrackerStoreTask;
 
@@ -199,18 +207,44 @@ queue_idle_handler (gpointer user_data)
 	}
 	g_return_val_if_fail (task != NULL, FALSE);
 
-	if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
+	if (task->type == TRACKER_STORE_TASK_TYPE_QUERY) {
 		GError *error = NULL;
+		TrackerDBResultSet *result_set;
 
-		begin_batch (private);
+		result_set = tracker_data_query_sparql (task->data.query.query, &error);
+
+		if (task->callback.query_callback) {
+			task->callback.query_callback (result_set, error, task->user_data);
+		}
+
+		if (result_set) {
+			g_object_unref (result_set);
+		}
+
+		if (error) {
+			g_clear_error (&error);
+		}
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE) {
+		GError *error = NULL;
+
+		if (task->data.update.batch) {
+			begin_batch (private);
+		} else {
+			end_batch (private);
+			tracker_data_begin_db_transaction ();
+		}
 
 		tracker_data_update_sparql (task->data.update.query, &error);
 
-		if (!error) {
-			private->batch_count++;
-			if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
-				end_batch (private);
+		if (task->data.update.batch) {
+			if (!error) {
+				private->batch_count++;
+				if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
+					end_batch (private);
+				}
 			}
+		} else {
+			tracker_data_commit_db_transaction ();
 		}
 
 		if (task->callback.update_callback) {
@@ -220,6 +254,52 @@ queue_idle_handler (gpointer user_data)
 		if (error) {
 			g_clear_error (&error);
 		}
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_UPDATE_BLANK) {
+		GError *error = NULL;
+		GPtrArray *blank_nodes;
+
+		if (task->data.update.batch) {
+			begin_batch (private);
+		} else {
+			end_batch (private);
+			tracker_data_begin_db_transaction ();
+		}
+
+		blank_nodes = tracker_data_update_sparql_blank (task->data.update.query, &error);
+
+		if (task->data.update.batch) {
+			if (!error) {
+				private->batch_count++;
+				if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
+					end_batch (private);
+				}
+			}
+		} else {
+			tracker_data_commit_db_transaction ();
+		}
+
+		if (task->callback.update_blank_callback) {
+			if (!blank_nodes) {
+				/* Hand dbus-glib an empty GPtrArray rather than NULL */
+				blank_nodes = g_ptr_array_new ();
+			}
+
+			task->callback.update_blank_callback (blank_nodes, error, task->user_data);
+		}
+
+		if (blank_nodes) {
+			gint i;
+
+			for (i = 0; i < blank_nodes->len; i++) {
+				g_ptr_array_foreach (blank_nodes->pdata[i], (GFunc) g_hash_table_unref, NULL);
+				g_ptr_array_free (blank_nodes->pdata[i], TRUE);
+			}
+			g_ptr_array_free (blank_nodes, TRUE);
+		}
+
+		if (error) {
+			g_clear_error (&error);
+		}
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
 		end_batch (private);
 
@@ -379,13 +459,13 @@ tracker_store_queue_commit (TrackerStoreCommitCallback callback,
 	}
 }
 
-
 void
-tracker_store_queue_sparql_update (const gchar *sparql,
-                                   TrackerStoreSparqlUpdateCallback callback,
-                                   const gchar *client_id,
-                                   gpointer user_data,
-                                   GDestroyNotify destroy)
+tracker_store_sparql_query (const gchar *sparql,
+                            TrackerStorePriority priority,
+                            TrackerStoreSparqlQueryCallback callback,
+                            const gchar *client_id,
+                            gpointer user_data,
+                            GDestroyNotify destroy)
 {
 	TrackerStorePrivate *private;
 	TrackerStoreTask    *task;
@@ -396,14 +476,14 @@ tracker_store_queue_sparql_update (const gchar *sparql,
 	g_return_if_fail (private != NULL);
 
 	task = g_slice_new0 (TrackerStoreTask);
-	task->type = TRACKER_STORE_TASK_TYPE_UPDATE;
+	task->type = TRACKER_STORE_TASK_TYPE_QUERY;
 	task->data.update.query = g_strdup (sparql);
 	task->user_data = user_data;
-	task->callback.update_callback = callback;
+	task->callback.query_callback = callback;
 	task->destroy = destroy;
 	task->data.update.client_id = g_strdup (client_id);
 
-	g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
+	g_queue_push_tail (private->queues[priority], task);
 
 	if (!private->have_handler) {
 		start_handler (private);
@@ -411,27 +491,32 @@ tracker_store_queue_sparql_update (const gchar *sparql,
 }
 
 void
-tracker_store_queue_turtle_import (GFile                      *file,
-                                   TrackerStoreTurtleCallback  callback,
-                                   gpointer                    user_data,
-                                   GDestroyNotify              destroy)
+tracker_store_sparql_update (const gchar *sparql,
+                             TrackerStorePriority priority,
+                             gboolean batch,
+                             TrackerStoreSparqlUpdateCallback callback,
+                             const gchar *client_id,
+                             gpointer user_data,
+                             GDestroyNotify destroy)
 {
 	TrackerStorePrivate *private;
 	TrackerStoreTask    *task;
 
-	g_return_if_fail (G_IS_FILE (file));
+	g_return_if_fail (sparql != NULL);
 
 	private = g_static_private_get (&private_key);
 	g_return_if_fail (private != NULL);
 
 	task = g_slice_new0 (TrackerStoreTask);
-	task->type = TRACKER_STORE_TASK_TYPE_TURTLE;
-	task->data.turtle.path = g_file_get_path (file);
+	task->type = TRACKER_STORE_TASK_TYPE_UPDATE;
+	task->data.update.query = g_strdup (sparql);
+	task->data.update.batch = batch;
 	task->user_data = user_data;
 	task->callback.update_callback = callback;
 	task->destroy = destroy;
+	task->data.update.client_id = g_strdup (client_id);
 
-	g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
+	g_queue_push_tail (private->queues[priority], task);
 
 	if (!private->have_handler) {
 		start_handler (private);
@@ -439,60 +524,62 @@ tracker_store_queue_turtle_import (GFile                      *file,
 }
 
 void
-tracker_store_sparql_update (const gchar *sparql,
-                             GError     **error)
+tracker_store_sparql_update_blank (const gchar *sparql,
+                                   TrackerStorePriority priority,
+                                   TrackerStoreSparqlUpdateBlankCallback callback,
+                                   const gchar *client_id,
+                                   gpointer user_data,
+                                   GDestroyNotify destroy)
 {
 	TrackerStorePrivate *private;
+	TrackerStoreTask    *task;
 
 	g_return_if_fail (sparql != NULL);
 
 	private = g_static_private_get (&private_key);
 	g_return_if_fail (private != NULL);
 
-	if (private->batch_mode) {
-		/* commit pending batch items */
-		tracker_data_commit_db_transaction ();
-		private->batch_mode = FALSE;
-		private->batch_count = 0;
-	}
+	task = g_slice_new0 (TrackerStoreTask);
+	task->type = TRACKER_STORE_TASK_TYPE_UPDATE_BLANK;
+	task->data.update.query = g_strdup (sparql);
+	task->user_data = user_data;
+	task->callback.update_blank_callback = callback;
+	task->destroy = destroy;
+	task->data.update.client_id = g_strdup (client_id);
 
-	tracker_data_begin_db_transaction ();
-	tracker_data_update_sparql (sparql, error);
-	tracker_data_commit_db_transaction ();
+	g_queue_push_tail (private->queues[priority], task);
 
+	if (!private->have_handler) {
+		start_handler (private);
+	}
 }
 
-GPtrArray *
-tracker_store_sparql_update_blank (const gchar *sparql,
-                                   GError     **error)
+void
+tracker_store_queue_turtle_import (GFile                      *file,
+                                   TrackerStoreTurtleCallback  callback,
+                                   gpointer                    user_data,
+                                   GDestroyNotify              destroy)
 {
 	TrackerStorePrivate *private;
-	GPtrArray *blank_nodes;
+	TrackerStoreTask    *task;
 
-	g_return_val_if_fail (sparql != NULL, NULL);
+	g_return_if_fail (G_IS_FILE (file));
 
 	private = g_static_private_get (&private_key);
-	g_return_val_if_fail (private != NULL, NULL);
-
-	if (private->batch_mode) {
-		/* commit pending batch items */
-		tracker_data_commit_db_transaction ();
-		private->batch_mode = FALSE;
-		private->batch_count = 0;
-	}
+	g_return_if_fail (private != NULL);
 
-	tracker_data_begin_db_transaction ();
-	blank_nodes = tracker_data_update_sparql_blank (sparql, error);
-	tracker_data_commit_db_transaction ();
+	task = g_slice_new0 (TrackerStoreTask);
+	task->type = TRACKER_STORE_TASK_TYPE_TURTLE;
+	task->data.turtle.path = g_file_get_path (file);
+	task->user_data = user_data;
+	task->callback.update_callback = callback;
+	task->destroy = destroy;
 
-	return blank_nodes;
-}
+	g_queue_push_tail (private->queues[TRACKER_STORE_PRIORITY_LOW], task);
 
-TrackerDBResultSet*
-tracker_store_sparql_query (const gchar *sparql,
-                            GError     **error)
-{
-	return tracker_data_query_sparql (sparql, error);
+	if (!private->have_handler) {
+		start_handler (private);
+	}
 }
 
 guint
diff --git a/src/tracker-store/tracker-store.h b/src/tracker-store/tracker-store.h
index c74e965..28368eb 100644
--- a/src/tracker-store/tracker-store.h
+++ b/src/tracker-store/tracker-store.h
@@ -35,11 +35,17 @@ typedef enum {
 	TRACKER_STORE_N_PRIORITIES
 } TrackerStorePriority;
 
-typedef void (* TrackerStoreSparqlUpdateCallback) (GError          *error,
-                                                   gpointer         user_data);
-typedef void (* TrackerStoreCommitCallback)       (gpointer         user_data);
-typedef void (* TrackerStoreTurtleCallback)       (GError          *error,
-                                                   gpointer         user_data);
+typedef void (* TrackerStoreSparqlQueryCallback)       (TrackerDBResultSet *result_set,
+                                                        GError          *error,
+                                                        gpointer         user_data);
+typedef void (* TrackerStoreSparqlUpdateCallback)      (GError          *error,
+                                                        gpointer         user_data);
+typedef void (* TrackerStoreSparqlUpdateBlankCallback) (GPtrArray       *blank_nodes,
+                                                        GError          *error,
+                                                        gpointer         user_data);
+typedef void (* TrackerStoreCommitCallback)            (gpointer         user_data);
+typedef void (* TrackerStoreTurtleCallback)            (GError          *error,
+                                                        gpointer         user_data);
 
 void         tracker_store_init                   (void);
 void         tracker_store_shutdown               (void);
@@ -47,22 +53,29 @@ void         tracker_store_queue_commit           (TrackerStoreCommitCallback ca
                                                    const gchar   *client_id,
                                                    gpointer       user_data,
                                                    GDestroyNotify destroy);
-void         tracker_store_queue_sparql_update    (const gchar   *sparql,
+void         tracker_store_sparql_query           (const gchar   *sparql,
+                                                   TrackerStorePriority priority,
+                                                   TrackerStoreSparqlQueryCallback callback,
+                                                   const gchar   *client_id,
+                                                   gpointer       user_data,
+                                                   GDestroyNotify destroy);
+void         tracker_store_sparql_update          (const gchar   *sparql,
+                                                   TrackerStorePriority priority,
+                                                   gboolean       batch,
                                                    TrackerStoreSparqlUpdateCallback callback,
                                                    const gchar   *client_id,
                                                    gpointer       user_data,
                                                    GDestroyNotify destroy);
+void         tracker_store_sparql_update_blank    (const gchar   *sparql,
+                                                   TrackerStorePriority priority,
+                                                   TrackerStoreSparqlUpdateBlankCallback callback,
+                                                   const gchar   *client_id,
+                                                   gpointer       user_data,
+                                                   GDestroyNotify destroy);
 void         tracker_store_queue_turtle_import    (GFile         *file,
                                                    TrackerStoreTurtleCallback callback,
                                                    gpointer       user_data,
                                                    GDestroyNotify destroy);
-void         tracker_store_sparql_update          (const gchar   *sparql,
-                                                   GError       **error);
-GPtrArray *  tracker_store_sparql_update_blank    (const gchar   *sparql,
-                                                   GError       **error);
-TrackerDBResultSet*
-tracker_store_sparql_query           (const gchar   *sparql,
-                                      GError       **error);
 
 guint        tracker_store_get_queue_size         (void);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]