[tracker] TrackerMiner: Make SPARQL API fully asynchronous.



commit 8f9cd2ddb26087950d6a5f83070532663d94aaff
Author: Carlos Garnacho <carlos lanedo com>
Date:   Fri Oct 30 16:53:15 2009 +0100

    TrackerMiner: Make SPARQL API fully asynchronous.
    
    Callers have been modified in TrackerMinerFS and TrackerMinerFiles.

 src/libtracker-miner/tracker-miner-fs.c    |  271 ++++++++++++++++-----
 src/libtracker-miner/tracker-miner.c       |  361 +++++++++++++++++++++-------
 src/libtracker-miner/tracker-miner.h       |   41 +++-
 src/tracker-miner-fs/tracker-miner-files.c |  101 +++++---
 4 files changed, 570 insertions(+), 204 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index ccd3ed0..7d7ef67 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -57,6 +57,19 @@ typedef struct {
 	TrackerSparqlBuilder *builder;
 } ProcessData;
 
+typedef struct {
+	GMainLoop *main_loop;
+	gboolean value;
+} SparqlQueryData;
+
+typedef struct {
+	GMainLoop *main_loop;
+	gint       level;
+	GString   *sparql;
+	const gchar *source_uri;
+	const gchar *uri;
+} RecursiveMoveData;
+
 struct TrackerMinerFSPrivate {
 	TrackerMonitor *monitor;
 	TrackerCrawler *crawler;
@@ -192,6 +205,10 @@ static void           crawl_directories_stop       (TrackerMinerFS *fs);
 
 static void           item_queue_handlers_set_up   (TrackerMinerFS *fs);
 
+static void           item_update_uri_recursively (TrackerMinerFS    *fs,
+						   RecursiveMoveData *data,
+						   const gchar       *source_uri,
+						   const gchar       *uri);
 
 static guint signals[LAST_SIGNAL] = { 0, };
 
@@ -436,10 +453,16 @@ process_data_new (GFile                *file,
 {
 	ProcessData *data;
 
-	data = g_slice_new (ProcessData);
+	data = g_slice_new0 (ProcessData);
 	data->file = g_object_ref (file);
-	data->cancellable = g_object_ref (cancellable);
-	data->builder = g_object_ref (builder);
+
+	if (cancellable) {
+		data->cancellable = g_object_ref (cancellable);
+	}
+
+	if (builder) {
+		data->builder = g_object_ref (builder);
+	}
 
 	return data;
 }
@@ -448,8 +471,15 @@ static void
 process_data_free (ProcessData *data)
 {
 	g_object_unref (data->file);
-	g_object_unref (data->cancellable);
-	g_object_unref (data->builder);
+
+	if (data->cancellable) {
+		g_object_unref (data->cancellable);
+	}
+
+	if (data->builder) {
+		g_object_unref (data->builder);
+	}
+
 	g_slice_free (ProcessData, data);
 }
 
@@ -689,12 +719,22 @@ process_print_stats (TrackerMinerFS *fs)
 }
 
 static void
+commit_cb (TrackerMiner *miner,
+	   const GError *error,
+	   gpointer      user_data)
+{
+	if (error) {
+		g_critical ("Could not commit: %s", error->message);
+	}
+}
+
+static void
 process_stop (TrackerMinerFS *fs) 
 {
 	/* Now we have finished crawling, print stats and enable monitor events */
 	process_print_stats (fs);
 
-	tracker_miner_commit (TRACKER_MINER (fs));
+	tracker_miner_commit (TRACKER_MINER (fs), NULL, commit_cb, NULL);
 
 	g_message ("Idle");
 
@@ -745,6 +785,48 @@ item_moved_data_free (ItemMovedData *data)
 }
 
 static void
+sparql_update_cb (TrackerMiner *miner,
+		  const GError *error,
+		  gpointer      user_data)
+{
+	TrackerMinerFS *fs;
+	TrackerMinerFSPrivate *priv;
+	ProcessData *data;
+
+	fs = TRACKER_MINER_FS (miner);
+	priv = fs->private;
+	data = user_data;
+
+	if (error) {
+		g_critical ("Could not execute sparql: %s", error->message);
+	} else {
+		if (fs->private->been_crawled) {
+			/* Only commit immediately for
+			 * changes after initial crawling.
+			 */
+			tracker_miner_commit (TRACKER_MINER (fs), NULL, commit_cb, NULL);
+		}
+	}
+
+	priv->processing_pool = g_list_remove (priv->processing_pool, data);
+	process_data_free (data);
+
+	item_queue_handlers_set_up (fs);
+}
+
+static void
+sparql_query_cb (TrackerMiner *miner,
+		 GPtrArray    *result,
+		 const GError *error,
+		 gpointer      user_data)
+{
+	SparqlQueryData *data = user_data;
+
+	data->value = result && result->len == 1;
+	g_main_loop_quit (data->main_loop);
+}
+
+static void
 item_add_or_update_cb (TrackerMinerFS *fs,
 		       ProcessData    *data,
 		       const GError   *error)
@@ -755,6 +837,12 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 
 	if (error) {
 		g_message ("Could not process '%s': %s", uri, error->message);
+
+		fs->private->processing_pool =
+			g_list_remove (fs->private->processing_pool, data);
+		process_data_free (data);
+
+		item_queue_handlers_set_up (fs);
 	} else {
 		gchar *full_sparql;
 
@@ -763,24 +851,15 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 		full_sparql = g_strdup_printf ("DROP GRAPH <%s> %s",
 					       uri, tracker_sparql_builder_get_result (data->builder));
 
-		tracker_miner_execute_batch_update (TRACKER_MINER (fs), full_sparql, NULL);
+		tracker_miner_execute_batch_update (TRACKER_MINER (fs),
+						    full_sparql,
+						    NULL,
+						    sparql_update_cb,
+						    data);
 		g_free (full_sparql);
-
-		if (fs->private->been_crawled) {
-			/* Only commit immediately for
-			 * changes after initial crawling.
-			 */
-			tracker_miner_commit (TRACKER_MINER (fs));
-		}
 	}
 
-	fs->private->processing_pool =
-		g_list_remove (fs->private->processing_pool, data);
-	process_data_free (data);
-
 	g_free (uri);
-
-	item_queue_handlers_set_up (fs);
 }
 
 static gboolean
@@ -849,29 +928,39 @@ item_query_exists (TrackerMinerFS *miner,
 {
 	gboolean   result;
 	gchar     *sparql, *uri;
-	GPtrArray *sparql_result;
+	SparqlQueryData data;
 
 	uri = g_file_get_uri (file);
 	sparql = g_strdup_printf ("SELECT ?s WHERE { ?s a rdfs:Resource . FILTER (?s = <%s>) }",
 	                          uri);
 
-	sparql_result = tracker_miner_execute_sparql (TRACKER_MINER (miner), sparql, NULL);
+	data.main_loop = g_main_loop_new (NULL, FALSE);
+	data.value = FALSE;
+
+	tracker_miner_execute_sparql (TRACKER_MINER (miner),
+				      sparql,
+				      NULL,
+				      sparql_query_cb,
+				      &data);
 
-	result = (sparql_result && sparql_result->len == 1);
+	g_main_loop_run (data.main_loop);
+	result = data.value;
+
+	g_main_loop_unref (data.main_loop);
 
-	tracker_dbus_results_ptr_array_free (&sparql_result);
 	g_free (sparql);
 	g_free (uri);
 
 	return result;
 }
 
-static void
+static gboolean
 item_remove (TrackerMinerFS *fs,
 	     GFile          *file)
 {
 	GString *sparql;
 	gchar *uri, *slash_uri;
+	ProcessData *data;
 
 	uri = g_file_get_uri (file);
 
@@ -880,7 +969,7 @@ item_remove (TrackerMinerFS *fs,
 
 	if (!item_query_exists (fs, file)) {
 		g_debug ("  File does not exist anyway (uri:'%s')", uri);
-		return;
+		return TRUE;
 	}
 
 	if (!g_str_has_suffix (uri, "/")) {
@@ -902,59 +991,84 @@ item_remove (TrackerMinerFS *fs,
 				"DELETE { <%s> a rdfs:Resource }",
 				uri);
 
-	tracker_miner_execute_batch_update (TRACKER_MINER (fs), sparql->str, NULL);
+	data = process_data_new (file, NULL, NULL);
+	fs->private->processing_pool = g_list_prepend (fs->private->processing_pool, data);
+
+	tracker_miner_execute_batch_update (TRACKER_MINER (fs),
+					    sparql->str,
+					    NULL,
+					    sparql_update_cb,
+					    data);
 
 	g_string_free (sparql, TRUE);
 	g_free (slash_uri);
 	g_free (uri);
+
+	return FALSE;
 }
 
 static void
-item_update_uri_recursively (TrackerMinerFS *fs,
-			     GString        *sparql_update,
-			     const gchar    *source_uri,
-			     const gchar    *uri)
+item_update_uri_recursively_cb (TrackerMiner *miner,
+				GPtrArray    *result,
+				const GError *error,
+				gpointer      user_data)
 {
-	gchar *sparql;
-	GPtrArray *result_set;
+	TrackerMinerFS *fs = TRACKER_MINER_FS (miner);
+	RecursiveMoveData *data = user_data;
 
-	g_debug ("Moving item from '%s' to '%s'",
-		 source_uri,
-		 uri);
-
-	g_string_append_printf (sparql_update, " <%s> tracker:uri <%s> .", source_uri, uri);
-
-	sparql = g_strdup_printf ("SELECT ?child WHERE { ?child nfo:belongsToContainer <%s> }", source_uri);
-	result_set = tracker_miner_execute_sparql (TRACKER_MINER (fs), sparql, NULL);
-	g_free (sparql);
-
-	if (result_set) {
+	if (result) {
 		gint i;
 
-		for (i = 0; i < result_set->len; i++) {
+		for (i = 0; i < result->len; i++) {
 			gchar **child_source_uri, *child_uri;
 
-			child_source_uri = g_ptr_array_index (result_set, i);
+			child_source_uri = g_ptr_array_index (result, i);
 
-			if (!g_str_has_prefix (*child_source_uri, source_uri)) {
+			if (!g_str_has_prefix (*child_source_uri, data->source_uri)) {
 				g_warning ("Child URI '%s' does not start with parent URI '%s'",
 				           *child_source_uri,
-				           source_uri);
+				           data->source_uri);
 				continue;
 			}
 
-			child_uri = g_strdup_printf ("%s%s", uri, *child_source_uri + strlen (source_uri));
+			child_uri = g_strdup_printf ("%s%s", data->uri, *child_source_uri + strlen (data->source_uri));
 
-			item_update_uri_recursively (fs, sparql_update, *child_source_uri, child_uri);
+			item_update_uri_recursively (fs, data, *child_source_uri, child_uri);
 
-			g_free (child_source_uri);
 			g_free (child_uri);
 		}
+	}
 
-		g_ptr_array_free (result_set, TRUE);
+	data->level--;
+
+	g_assert (data->level >= 0);
+
+	if (data->level == 0) {
+		g_main_loop_quit (data->main_loop);
 	}
 }
 
+static void
+item_update_uri_recursively (TrackerMinerFS    *fs,
+			     RecursiveMoveData *move_data,
+			     const gchar       *source_uri,
+			     const gchar       *uri)
+{
+	gchar *sparql;
+
+	move_data->level++;
+
+	g_string_append_printf (move_data->sparql, " <%s> tracker:uri <%s> .", source_uri, uri);
+
+	sparql = g_strdup_printf ("SELECT ?child WHERE { ?child nfo:belongsToContainer <%s> }", source_uri);
+	tracker_miner_execute_sparql (TRACKER_MINER (fs),
+				      sparql,
+				      NULL,
+				      item_update_uri_recursively_cb,
+				      move_data);
+	g_free (sparql);
+}
+
 static gboolean
 item_move (TrackerMinerFS *fs,
 	   GFile          *file,
@@ -963,6 +1077,8 @@ item_move (TrackerMinerFS *fs,
 	gchar     *uri, *source_uri, *escaped_filename;
 	GFileInfo *file_info;
 	GString   *sparql;
+	RecursiveMoveData move_data;
+	ProcessData *data;
 
 	uri = g_file_get_uri (file);
 	source_uri = g_file_get_uri (source_file);
@@ -987,15 +1103,21 @@ item_move (TrackerMinerFS *fs,
 				       NULL, NULL);
 
 	if (!file_info) {
+		gboolean retval;
+
 		/* Destination file has gone away, ignore dest file and remove source if any */
-		item_remove (fs, source_file);
+		retval = item_remove (fs, source_file);
 
 		g_free (source_uri);
 		g_free (uri);
 
-		return TRUE;
+		return retval;
 	}
 
+	g_debug ("Moving item from '%s' to '%s'",
+		 source_uri,
+		 uri);
+
 	sparql = g_string_new ("");
 
 	g_string_append_printf (sparql,
@@ -1008,11 +1130,28 @@ item_move (TrackerMinerFS *fs,
 
 	g_string_append_printf (sparql, " <%s> nfo:fileName \"%s\" .", source_uri, escaped_filename);
 
-	item_update_uri_recursively (fs, sparql, source_uri, uri);
+	move_data.main_loop = g_main_loop_new (NULL, FALSE);
+	move_data.level = 0;
+	move_data.sparql = sparql;
+	move_data.source_uri = source_uri;
+	move_data.uri = uri;
+
+	item_update_uri_recursively (fs, &move_data, source_uri, uri);
+
+	g_main_loop_run (move_data.main_loop);
+
+	g_main_loop_unref (move_data.main_loop);
 
 	g_string_append (sparql, " }");
 
-	tracker_miner_execute_batch_update (TRACKER_MINER (fs), sparql->str, NULL);
+	data = process_data_new (file, NULL, NULL);
+	fs->private->processing_pool = g_list_prepend (fs->private->processing_pool, data);
+
+	tracker_miner_execute_batch_update (TRACKER_MINER (fs),
+					    sparql->str,
+					    NULL,
+					    sparql_update_cb,
+					    data);
 
 	g_free (uri);
 	g_free (source_uri);
@@ -1136,7 +1275,7 @@ item_queue_handlers_cb (gpointer user_data)
 		keep_processing = item_move (fs, file, source_file);
 		break;
 	case QUEUE_DELETED:
-		item_remove (fs, file);
+		keep_processing = item_remove (fs, file);
 		break;
 	case QUEUE_CREATED:
 	case QUEUE_UPDATED:
@@ -1162,7 +1301,7 @@ item_queue_handlers_cb (gpointer user_data)
 			/* Only commit immediately for
 			 * changes after initial crawling.
 			 */
-			tracker_miner_commit (TRACKER_MINER (fs));
+			tracker_miner_commit (TRACKER_MINER (fs), NULL, commit_cb, NULL);
 		}
 
 		return TRUE;
@@ -1224,12 +1363,12 @@ should_change_index_for_file (TrackerMinerFS *fs,
 			      GFile          *file)
 {
 	gboolean            uptodate;
-	GPtrArray          *sparql_result;
 	GFileInfo          *file_info;
 	guint64             time;
 	time_t              mtime;
 	struct tm           t;
 	gchar              *query, *uri;
+	SparqlQueryData     data;
 
 	file_info = g_file_query_info (file, 
 				       G_FILE_ATTRIBUTE_TIME_MODIFIED, 
@@ -1259,12 +1398,20 @@ should_change_index_for_file (TrackerMinerFS *fs,
 				 t.tm_min,
 				 t.tm_sec,
 				 uri);
-	sparql_result = tracker_miner_execute_sparql (TRACKER_MINER (fs), query, NULL);
 
-	uptodate = sparql_result && sparql_result->len == 1;
+	data.main_loop = g_main_loop_new (NULL, FALSE);
+	data.value = FALSE;
+
+	tracker_miner_execute_sparql (TRACKER_MINER (fs),
+						      query,
+						      NULL,
+						      sparql_query_cb,
+						      &data);
 
-	tracker_dbus_results_ptr_array_free (&sparql_result);
+	g_main_loop_run (data.main_loop);
+	uptodate = data.value;
 
+	g_main_loop_unref (data.main_loop);
 	g_free (query);
 	g_free (uri);
 
diff --git a/src/libtracker-miner/tracker-miner.c b/src/libtracker-miner/tracker-miner.c
index eec2d56..3c6f43c 100644
--- a/src/libtracker-miner/tracker-miner.c
+++ b/src/libtracker-miner/tracker-miner.c
@@ -44,6 +44,8 @@
 
 #define TRACKER_MINER_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_MINER, TrackerMinerPrivate))
 
+static GQuark miner_error_quark = 0;
+
 struct TrackerMinerPrivate {
 	TrackerClient *client;
 	
@@ -56,6 +58,8 @@ struct TrackerMinerPrivate {
 	gdouble progress;
 
 	gint availability_cookie;
+
+	GPtrArray *async_calls;
 };
 
 typedef struct {
@@ -70,6 +74,17 @@ typedef struct {
 	gchar *reason;	
 } PauseData;
 
+typedef struct {
+	TrackerMiner *miner;
+	GCancellable *cancellable;
+	gpointer callback;
+	gpointer user_data;
+
+	guint id;
+	guint signal_id;
+	gboolean update;
+} AsyncCallData;
+
 enum {
 	PROP_0,
 	PROP_NAME,
@@ -109,6 +124,12 @@ static void       pause_data_destroy (gpointer      data);
 static PauseData *pause_data_new     (const gchar  *application,
 				      const gchar  *reason);
 
+static void       async_call_notify_error (AsyncCallData *data,
+					   gint           code,
+					   const gchar   *message);
+static void       async_call_data_destroy (AsyncCallData *data);
+
+
 G_DEFINE_ABSTRACT_TYPE (TrackerMiner, tracker_miner, G_TYPE_OBJECT)
 
 static void
@@ -250,6 +271,8 @@ tracker_miner_class_init (TrackerMinerClass *klass)
 							      G_PARAM_READWRITE));
 
 	g_type_class_add_private (object_class, sizeof (TrackerMinerPrivate));
+
+	miner_error_quark = g_quark_from_static_string ("TrackerMiner");
 }
 
 static void
@@ -266,6 +289,7 @@ tracker_miner_init (TrackerMiner *miner)
 					      g_direct_equal,
 					      NULL,
 					      pause_data_destroy);
+	priv->async_calls = g_ptr_array_new ();
 }
 
 static void
@@ -350,6 +374,14 @@ miner_get_property (GObject    *object,
 }
 
 static void
+async_call_finalize_foreach (AsyncCallData *data,
+			     gpointer       user_data)
+{
+	async_call_notify_error (data, 0, "Miner is being finalized");
+	async_call_data_destroy (data);
+}
+
+static void
 miner_finalize (GObject *object)
 {
 	TrackerMiner *miner = TRACKER_MINER (object);
@@ -367,6 +399,11 @@ miner_finalize (GObject *object)
 
 	g_hash_table_unref (miner->private->pauses);
 
+	g_ptr_array_foreach (miner->private->async_calls,
+			     (GFunc) async_call_finalize_foreach,
+			     object);
+	g_ptr_array_free (miner->private->async_calls, TRUE);
+
 	G_OBJECT_CLASS (tracker_miner_parent_class)->finalize (object);
 }
 
@@ -688,151 +725,293 @@ tracker_miner_is_started (TrackerMiner  *miner)
 	return miner->private->started;
 }
 
+static void
+async_call_data_destroy (AsyncCallData *data)
+{
+	TrackerMiner *miner = data->miner;
+
+	if (data->cancellable) {
+		if (data->signal_id) {
+			g_signal_handler_disconnect (data->cancellable, data->signal_id);
+		}
+
+		g_object_unref (data->cancellable);
+	}
+
+	if (data->id != 0) {
+		tracker_cancel_call (miner->private->client, data->id);
+		data->id = 0;
+	}
+
+	g_slice_free (AsyncCallData, data);
+}
+
+static void
+run_update_callback (AsyncCallData *data,
+		     const GError  *error)
+{
+	TrackerMinerUpdateCallback callback;
+
+	callback = data->callback;
+
+	if (callback) {
+		(callback) (data->miner, error, data->user_data);
+	}
+}
+
+static void
+run_query_callback (AsyncCallData *data,
+		    GPtrArray     *result,
+		    const GError  *error)
+{
+	TrackerMinerQueryCallback callback;
+
+	callback = data->callback;
+
+	if (callback) {
+		(callback) (data->miner, result, error, data->user_data);
+	}
+}
+
+static void
+sparql_update_cb (GError   *error,
+		  gpointer  user_data)
+{
+	AsyncCallData *data = user_data;
+
+	run_update_callback (data, error);
+
+	async_call_data_destroy (data);
+}
+
+static void
+sparql_query_cb (GPtrArray *result,
+		 GError    *error,
+		 gpointer   user_data)
+{
+	AsyncCallData *data = user_data;
+
+	run_query_callback (data, result, error);
+
+	if (result) {
+		tracker_dbus_results_ptr_array_free (&result);
+	}
+
+	async_call_data_destroy (data);
+}
+
+static void
+sparql_cancelled_cb (GCancellable  *cancellable,
+		     AsyncCallData *data)
+{
+	TrackerMinerPrivate *priv;
+
+	async_call_notify_error (data, 0, "SPARQL operation was cancelled");
+
+	priv = TRACKER_MINER_GET_PRIVATE (data->miner);
+
+	g_ptr_array_remove (priv->async_calls, data);
+	async_call_data_destroy (data);
+}
+
+static AsyncCallData *
+async_call_data_new (TrackerMiner *miner,
+		     GCancellable *cancellable,
+		     gpointer      callback,
+		     gpointer      user_data,
+		     gboolean      update)
+{
+	AsyncCallData *data;
+
+	data = g_slice_new0 (AsyncCallData);
+	data->miner = miner;
+	data->callback = callback;
+	data->user_data = user_data;
+	data->update = update;
+
+	if (cancellable) {
+		data->cancellable = g_object_ref (cancellable);
+
+		data->signal_id = g_signal_connect (cancellable, "cancelled",
+						    G_CALLBACK (sparql_cancelled_cb), data);
+	}
+
+	g_ptr_array_add (miner->private->async_calls, data);
+
+	return data;
+}
+
+static void
+async_call_notify_error (AsyncCallData *data,
+			 gint           code,
+			 const gchar   *message)
+{
+	TrackerMiner *miner;
+	GError *error;
+
+	miner = data->miner;
+
+	if (data->id != 0) {
+		tracker_cancel_call (miner->private->client, data->id);
+		data->id = 0;
+	}
+
+	if (data->callback) {
+		error = g_error_new_literal (miner_error_quark, code, message);
+
+		if (data->update) {
+			run_update_callback (data, error);
+		} else {
+			run_query_callback (data, NULL, error);
+		}
+
+		g_error_free (error);
+	}
+}
+
 /**
  * tracker_miner_execute_update:
  * @miner: a #TrackerMiner
  * @sparql: a SPARQL query
- * @error: return location for errors
+ * @cancellable: a #GCancellable to control the operation
+ * @callback: a #TrackerMinerUpdateCallback to call when the operation is finished
+ * @user_data: data to pass to @callback
  *
  * Executes an update SPARQL query on tracker-store, use this
  * whenever you want to perform data insertions or modifications.
  *
- * Returns: #TRUE if the SPARQL query was executed successfully.
+ * When the operation is finished, @callback will be called, providing the error, if any.
+ * If the operation is cancelled, @callback will be called anyways, with error filled in.
  **/
-gboolean
-tracker_miner_execute_update (TrackerMiner  *miner,
-			      const gchar   *sparql,
-			      GError       **error)
+void
+tracker_miner_execute_update (TrackerMiner               *miner,
+			      const gchar                *sparql,
+			      GCancellable               *cancellable,
+			      TrackerMinerUpdateCallback  callback,
+			      gpointer                    user_data)
 {
-	GError *internal_error = NULL;
-
-	g_return_val_if_fail (TRACKER_IS_MINER (miner), FALSE);
-
-	tracker_resources_sparql_update (miner->private->client,
-					 sparql, 
-					 &internal_error);
+	TrackerMinerPrivate *priv;
+	AsyncCallData *data;
 
-	if (!internal_error) {
-		return TRUE;
-	}
+	g_return_if_fail (TRACKER_IS_MINER (miner));
+	g_return_if_fail (sparql != NULL);
+	g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable));
 
-	if (error) {
-		g_propagate_error (error, internal_error);
-	} else {
-		g_warning ("Error running sparql queries: %s", internal_error->message);
-		g_error_free (internal_error);
-	}
+	priv = TRACKER_MINER_GET_PRIVATE (miner);
+	data = async_call_data_new (miner, cancellable, callback, user_data, TRUE);
 
-	return FALSE;
+	data->id = tracker_resources_sparql_update_async (miner->private->client,
+							  sparql, sparql_update_cb,
+							  data);
 }
 
 /**
  * tracker_miner_execute_sparql:
  * @miner: a #TrackerMiner
  * @sparql: a SPARQL query
- * @error: return location for errors
+ * @cancellable: a #GCancellable to control the operation
+ * @callback: a #TrackerMinerQueryCallback to call when the operation is finished
+ * @user_data: data to pass to @callback
  *
- * Executes the SPARQL query on tracker-store and returns the
- * queried data. Use this whenever you need to get data from
+ * Executes the SPARQL query on tracker-store and returns asynchronously
+ * the queried data. Use this whenever you need to get data from
  * already stored information.
  *
- * Returns: a #GPtrArray with the returned data.
+ * When the operation is finished, @callback will be called, providing the queried data,
+ * or the error, if any. If the operation is cancelled, @callback will be called with the
+ * error parameter filled in.
  **/
-GPtrArray *
-tracker_miner_execute_sparql (TrackerMiner  *miner,
-			      const gchar   *sparql,
-			      GError       **error)
+void
+tracker_miner_execute_sparql (TrackerMiner              *miner,
+			      const gchar               *sparql,
+			      GCancellable              *cancellable,
+			      TrackerMinerQueryCallback  callback,
+			      gpointer                   user_data)
 {
-	GError *internal_error = NULL;
-	GPtrArray *res;
-
-	g_return_val_if_fail (TRACKER_IS_MINER (miner), FALSE);
-
-	res = tracker_resources_sparql_query (miner->private->client,
-					      sparql, 
-					      &internal_error);
+	TrackerMinerPrivate *priv;
+	AsyncCallData *data;
 
-	if (!internal_error) {
-		return res;
-	}
+	g_return_if_fail (TRACKER_IS_MINER (miner));
+	g_return_if_fail (sparql != NULL);
+	g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable));
+	g_return_if_fail (callback != NULL);
 
-	if (error) {
-		g_propagate_error (error, internal_error);
-	} else {
-		g_warning ("Error running sparql queries: %s", internal_error->message);
-		g_error_free (internal_error);
-	}
+	priv = TRACKER_MINER_GET_PRIVATE (miner);
+	data = async_call_data_new (miner, cancellable, callback, user_data, FALSE);
 
-	return res;
+	data->id = tracker_resources_sparql_query_async (miner->private->client,
+							 sparql, sparql_query_cb,
+							 data);
 }
 
 /**
  * tracker_miner_execute_batch_update:
  * @miner: a #TrackerMiner
  * @sparql: a set of SPARQL updates
- * @error: return location for errors
+ * @cancellable: a #GCancellable to control the operation
+ * @callback: a #TrackerMinerUpdateCallback to call when the operation is finished
+ * @user_data: data to pass to @callback
  *
  * Executes a batch of update SPARQL queries on tracker-store, use this
  * whenever you want to perform data insertions or modifications in
  * batches.
  *
- * Returns: #TRUE if the SPARQL query was executed successfully.
+ * When the operation is finished, @callback will be called, providing the error, if any.
+ * If the operation is cancelled, @callback will be called anyways, with error filled in.
  **/
-gboolean
-tracker_miner_execute_batch_update (TrackerMiner  *miner,
-				    const gchar   *sparql,
-				    GError       **error)
+void
+tracker_miner_execute_batch_update (TrackerMiner               *miner,
+				    const gchar                *sparql,
+				    GCancellable               *cancellable,
+				    TrackerMinerUpdateCallback  callback,
+				    gpointer                    user_data)
 {
-	GError *internal_error = NULL;
-
-	g_return_val_if_fail (TRACKER_IS_MINER (miner), FALSE);
+	TrackerMinerPrivate *priv;
+	AsyncCallData *data;
 
-	tracker_resources_batch_sparql_update (miner->private->client,
-					       sparql, 
-					       &internal_error);
-	if (!internal_error) {
-		return TRUE;
-	}
+	g_return_if_fail (TRACKER_IS_MINER (miner));
+	g_return_if_fail (sparql != NULL);
+	g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable));
 
-	if (error) {
-		g_propagate_error (error, internal_error);
-	} else {
-		g_warning ("Error running sparql queries: %s", internal_error->message);
-		g_error_free (internal_error);
-	}
+	priv = TRACKER_MINER_GET_PRIVATE (miner);
+	data = async_call_data_new (miner, cancellable, callback, user_data, TRUE);
 
-	return FALSE;
+	data->id = tracker_resources_batch_sparql_update_async (miner->private->client,
+								sparql, sparql_update_cb,
+								data);
 }
 
 /**
  * tracker_miner_commit:
  * @miner: a #TrackerMiner
+ * @cancellable: a #GCancellable to control the operation
+ * @callback: a #TrackerMinerUpdateCallback to call when the operation is finished
+ * @user_data: data to pass to @callback
  *
- * Commits all pending batch updates. see tracker_miner_execute_batch_update()
- *
- * Returns: #TRUE if the data was committed successfully.
+ * Commits all pending batch updates. see tracker_miner_execute_batch_update(). When
+ * the operation is finished, @callback will be called, with the error parameter filled
+ * in, if any. If the operation is cancelled through @cancellable, the callback will be
+ * called anyways with error filled in.
  **/
-gboolean
-tracker_miner_commit (TrackerMiner *miner)
-{
-	GError *error = NULL;
-
-	g_return_val_if_fail (TRACKER_IS_MINER (miner), FALSE);
+void
+tracker_miner_commit (TrackerMiner               *miner,
+		      GCancellable               *cancellable,
+		      TrackerMinerUpdateCallback  callback,
+		      gpointer                    user_data)
 
-	if (g_hash_table_size (miner->private->pauses) > 0) {
-		g_warning ("Can not commit while miner is paused");
-		return FALSE;
-	}
+{
+	TrackerMinerPrivate *priv;
+	AsyncCallData *data;
 
-	tracker_resources_batch_commit (miner->private->client, &error);
+	g_return_if_fail (TRACKER_IS_MINER (miner));
+	g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable));
 
-	if (error) {
-		g_critical ("Could not commit: %s", error->message);
-		g_error_free (error);
-		return FALSE;
-	}
+	priv = TRACKER_MINER_GET_PRIVATE (miner);
+	data = async_call_data_new (miner, cancellable, callback, user_data, TRUE);
 
-	return TRUE;
+	data->id = tracker_resources_batch_commit_async (miner->private->client,
+							 sparql_update_cb,
+							 data);
 }
 
 static gint
diff --git a/src/libtracker-miner/tracker-miner.h b/src/libtracker-miner/tracker-miner.h
index 3406947..af046e4 100644
--- a/src/libtracker-miner/tracker-miner.h
+++ b/src/libtracker-miner/tracker-miner.h
@@ -22,6 +22,7 @@
 #define __LIBTRACKERMINER_MINER_H__
 
 #include <glib-object.h>
+#include <gio/gio.h>
 #include <libtracker-client/tracker.h>
 
 G_BEGIN_DECLS
@@ -82,6 +83,14 @@ typedef struct {
 			     GError       *error);
 } TrackerMinerClass;
 
+typedef void (* TrackerMinerUpdateCallback) (TrackerMiner *miner,
+					     const GError *error,
+					     gpointer      user_data);
+typedef void (* TrackerMinerQueryCallback)  (TrackerMiner *miner,
+					     GPtrArray    *result,
+					     const GError *error,
+					     gpointer      user_data);
+
 GType          tracker_miner_get_type       (void) G_GNUC_CONST;
 GQuark	       tracker_miner_error_quark    (void);
 
@@ -90,18 +99,6 @@ void           tracker_miner_stop           (TrackerMiner  *miner);
 
 gboolean       tracker_miner_is_started     (TrackerMiner  *miner);
 
-gboolean       tracker_miner_execute_update (TrackerMiner  *miner,
-					     const gchar   *sparql,
-					     GError       **error);
-GPtrArray *    tracker_miner_execute_sparql (TrackerMiner  *miner,
-					     const gchar   *sparql,
-					     GError       **error);
-gboolean       tracker_miner_execute_batch_update
-					    (TrackerMiner  *miner,
-					     const gchar   *sparql,
-					     GError       **error);
-gboolean       tracker_miner_commit         (TrackerMiner  *miner);
-
 gint           tracker_miner_pause          (TrackerMiner  *miner,
 					     const gchar   *reason,
 					     GError       **error);
@@ -109,6 +106,26 @@ gboolean       tracker_miner_resume         (TrackerMiner  *miner,
 					     gint           cookie,
 					     GError       **error);
 
+void           tracker_miner_execute_update       (TrackerMiner               *miner,
+						   const gchar                *sparql,
+						   GCancellable               *cancellable,
+						   TrackerMinerUpdateCallback  callback,
+						   gpointer                    user_data);
+void           tracker_miner_execute_sparql       (TrackerMiner               *miner,
+						   const gchar                *sparql,
+						   GCancellable               *cancellable,
+						   TrackerMinerQueryCallback   callback,
+						   gpointer                    user_data);
+void           tracker_miner_execute_batch_update (TrackerMiner               *miner,
+						   const gchar                *sparql,
+						   GCancellable               *cancellable,
+						   TrackerMinerUpdateCallback  callback,
+						   gpointer                    user_data);
+void           tracker_miner_commit               (TrackerMiner               *miner,
+						   GCancellable               *cancellable,
+						   TrackerMinerUpdateCallback  callback,
+						   gpointer                    user_data);
+
 
 G_END_DECLS
 
diff --git a/src/tracker-miner-fs/tracker-miner-files.c b/src/tracker-miner-fs/tracker-miner-files.c
index 6b9ab4c..9f60d08 100644
--- a/src/tracker-miner-fs/tracker-miner-files.c
+++ b/src/tracker-miner-fs/tracker-miner-files.c
@@ -430,6 +430,21 @@ miner_files_constructed (GObject *object)
 }
 
 static void
+set_up_mount_point_cb (TrackerMiner *miner,
+		       const GError *error,
+		       gpointer      user_data)
+{
+	gchar *removable_device_urn = user_data;
+
+	if (error) {
+		g_critical ("Could not set up mount point '%s': %s",
+			    removable_device_urn, error->message);
+	}
+
+	g_free (removable_device_urn);
+}
+
+static void
 set_up_mount_point (TrackerMinerFiles *miner,
                     const gchar       *removable_device_urn,
                     const gchar       *mount_point,
@@ -437,7 +452,6 @@ set_up_mount_point (TrackerMinerFiles *miner,
                     GString           *accumulator)
 {
 	GString *queries;
-	GError *error = NULL;
 
 	g_debug ("Setting up mount point '%s'", removable_device_urn);
 
@@ -500,58 +514,59 @@ set_up_mount_point (TrackerMinerFiles *miner,
 	if (accumulator) {
 		g_string_append_printf (accumulator, "%s ", queries->str);
 	} else {
-
-		tracker_miner_execute_update (TRACKER_MINER (miner), queries->str, &error);
-
-		if (error) {
-			g_critical ("Could not set up mount point '%s': %s",
-			            removable_device_urn, error->message);
-			g_error_free (error);
-		}
+		tracker_miner_execute_update (TRACKER_MINER (miner),
+					      queries->str,
+					      NULL,
+					      set_up_mount_point_cb,
+					      g_strdup (removable_device_urn));
 	}
 
 	g_string_free (queries, TRUE);
 }
 
 static void
-init_mount_points (TrackerMinerFiles *miner)
+init_mount_points_cb (TrackerMiner *miner,
+		      const GError *error,
+		      gpointer      user_data)
+{
+	if (error) {
+		g_critical ("Could not initialize currently active mount points: %s",
+			    error->message);
+	}
+}
+
+static void
+query_mount_points_cb (TrackerMiner *miner,
+		       GPtrArray    *result,
+		       const GError *error,
+		       gpointer      user_data)
 {
 	TrackerMinerFilesPrivate *priv;
 	GHashTable *volumes;
 	GHashTableIter iter;
 	gpointer key, value;
-	GPtrArray *sparql_result;
-	GError *error = NULL;
 	GString *accumulator;
 	gint i;
 #ifdef HAVE_HAL
 	GSList *udis, *u;
 #endif
 
-	priv = TRACKER_MINER_FILES_GET_PRIVATE (miner);
+	if (error) {
+		g_critical ("Could not obtain the mounted volumes");
+		return;
+	}
 
-	g_debug ("Initializing mount points");
+	priv = TRACKER_MINER_FILES_GET_PRIVATE (miner);
 
 	volumes = g_hash_table_new_full (g_str_hash, g_str_equal,
 	                                 (GDestroyNotify) g_free,
 	                                 NULL);
 
-	/* First, get all mounted volumes, according to tracker-store */
-	sparql_result = tracker_miner_execute_sparql (TRACKER_MINER (miner),
-	                                              "SELECT ?v WHERE { ?v a tracker:Volume ; tracker:isMounted true }",
-	                                              &error);
-
-	if (error) {
-		g_critical ("Could not obtain the mounted volumes");
-		g_error_free (error);
-		return;
-	}
-
-	for (i = 0; i < sparql_result->len; i++) {
+	for (i = 0; i < result->len; i++) {
 		gchar **row;
 		gint state;
 
-		row = g_ptr_array_index (sparql_result, i);
+		row = g_ptr_array_index (result, i);
 		state = VOLUME_MOUNTED_IN_STORE;
 
 		if (strcmp (row[0], TRACKER_NON_REMOVABLE_MEDIA_DATASOURCE_URN) == 0) {
@@ -562,9 +577,6 @@ init_mount_points (TrackerMinerFiles *miner)
 		g_hash_table_insert (volumes, g_strdup (row[0]), GINT_TO_POINTER (state));
 	}
 
-	g_ptr_array_foreach (sparql_result, (GFunc) g_strfreev, NULL);
-	g_ptr_array_free (sparql_result, TRUE);
-
 	g_hash_table_replace (volumes, g_strdup (TRACKER_NON_REMOVABLE_MEDIA_DATASOURCE_URN),
 	                      GINT_TO_POINTER (VOLUME_MOUNTED));
 
@@ -613,28 +625,39 @@ init_mount_points (TrackerMinerFiles *miner)
 
 			g_debug ("URN '%s' (mount point: %s) was not reported to be mounted, but now it is, updating state",
 			         mount_point, urn);
-			set_up_mount_point (miner, urn, mount_point, TRUE, accumulator);
+			set_up_mount_point (TRACKER_MINER_FILES (miner), urn, mount_point, TRUE, accumulator);
 		} else if (!(state & VOLUME_MOUNTED) &&
 			   (state & VOLUME_MOUNTED_IN_STORE)) {
 			g_debug ("URN '%s' was reported to be mounted, but it isn't anymore, updating state", urn);
-			set_up_mount_point (miner, urn, NULL, FALSE, accumulator);
+			set_up_mount_point (TRACKER_MINER_FILES (miner), urn, NULL, FALSE, accumulator);
 		}
 	}
 
 	if (accumulator->str[0] != '\0') {
-		tracker_miner_execute_update (TRACKER_MINER (miner), accumulator->str, &error);
-
-		if (error) {
-			g_critical ("Could not initialize currently active mount points: %s",
-			            error->message);
-			g_error_free (error);
-		}
+		tracker_miner_execute_update (miner,
+					      accumulator->str,
+					      NULL,
+					      init_mount_points_cb,
+					      NULL);
 	}
 
 	g_string_free (accumulator, TRUE);
 	g_hash_table_unref (volumes);
 }
 
+static void
+init_mount_points (TrackerMinerFiles *miner)
+{
+	g_debug ("Initializing mount points");
+
+	/* First, get all mounted volumes, according to tracker-store */
+	tracker_miner_execute_sparql (TRACKER_MINER (miner),
+				      "SELECT ?v WHERE { ?v a tracker:Volume ; tracker:isMounted true }",
+				      NULL,
+				      query_mount_points_cb,
+				      NULL);
+}
+
 #ifdef HAVE_HAL
 
 static void



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]