[tracker] TrackerMiner: Make SPARQL API fully asynchronous.
- From: Carlos Garnacho <carlosg src gnome org>
- To: svn-commits-list gnome org
- Cc:
- Subject: [tracker] TrackerMiner: Make SPARQL API fully asynchronous.
- Date: Mon, 2 Nov 2009 12:58:48 +0000 (UTC)
commit 8f9cd2ddb26087950d6a5f83070532663d94aaff
Author: Carlos Garnacho <carlos lanedo com>
Date: Fri Oct 30 16:53:15 2009 +0100
TrackerMiner: Make SPARQL API fully asynchronous.
Callers have been modified in TrackerMinerFS and TrackerMinerFiles.
src/libtracker-miner/tracker-miner-fs.c | 271 ++++++++++++++++-----
src/libtracker-miner/tracker-miner.c | 361 +++++++++++++++++++++-------
src/libtracker-miner/tracker-miner.h | 41 +++-
src/tracker-miner-fs/tracker-miner-files.c | 101 +++++---
4 files changed, 570 insertions(+), 204 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index ccd3ed0..7d7ef67 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -57,6 +57,19 @@ typedef struct {
TrackerSparqlBuilder *builder;
} ProcessData;
+typedef struct {
+ GMainLoop *main_loop;
+ gboolean value;
+} SparqlQueryData;
+
+typedef struct {
+ GMainLoop *main_loop;
+ gint level;
+ GString *sparql;
+ const gchar *source_uri;
+ const gchar *uri;
+} RecursiveMoveData;
+
struct TrackerMinerFSPrivate {
TrackerMonitor *monitor;
TrackerCrawler *crawler;
@@ -192,6 +205,10 @@ static void crawl_directories_stop (TrackerMinerFS *fs);
static void item_queue_handlers_set_up (TrackerMinerFS *fs);
+static void item_update_uri_recursively (TrackerMinerFS *fs,
+ RecursiveMoveData *data,
+ const gchar *source_uri,
+ const gchar *uri);
static guint signals[LAST_SIGNAL] = { 0, };
@@ -436,10 +453,16 @@ process_data_new (GFile *file,
{
ProcessData *data;
- data = g_slice_new (ProcessData);
+ data = g_slice_new0 (ProcessData);
data->file = g_object_ref (file);
- data->cancellable = g_object_ref (cancellable);
- data->builder = g_object_ref (builder);
+
+ if (cancellable) {
+ data->cancellable = g_object_ref (cancellable);
+ }
+
+ if (builder) {
+ data->builder = g_object_ref (builder);
+ }
return data;
}
@@ -448,8 +471,15 @@ static void
process_data_free (ProcessData *data)
{
g_object_unref (data->file);
- g_object_unref (data->cancellable);
- g_object_unref (data->builder);
+
+ if (data->cancellable) {
+ g_object_unref (data->cancellable);
+ }
+
+ if (data->builder) {
+ g_object_unref (data->builder);
+ }
+
g_slice_free (ProcessData, data);
}
@@ -689,12 +719,22 @@ process_print_stats (TrackerMinerFS *fs)
}
static void
+commit_cb (TrackerMiner *miner,
+ const GError *error,
+ gpointer user_data)
+{
+ if (error) {
+ g_critical ("Could not commit: %s", error->message);
+ }
+}
+
+static void
process_stop (TrackerMinerFS *fs)
{
/* Now we have finished crawling, print stats and enable monitor events */
process_print_stats (fs);
- tracker_miner_commit (TRACKER_MINER (fs));
+ tracker_miner_commit (TRACKER_MINER (fs), NULL, commit_cb, NULL);
g_message ("Idle");
@@ -745,6 +785,48 @@ item_moved_data_free (ItemMovedData *data)
}
static void
+sparql_update_cb (TrackerMiner *miner,
+ const GError *error,
+ gpointer user_data)
+{
+ TrackerMinerFS *fs;
+ TrackerMinerFSPrivate *priv;
+ ProcessData *data;
+
+ fs = TRACKER_MINER_FS (miner);
+ priv = fs->private;
+ data = user_data;
+
+ if (error) {
+ g_critical ("Could not execute sparql: %s", error->message);
+ } else {
+ if (fs->private->been_crawled) {
+ /* Only commit immediately for
+ * changes after initial crawling.
+ */
+ tracker_miner_commit (TRACKER_MINER (fs), NULL, commit_cb, NULL);
+ }
+ }
+
+ priv->processing_pool = g_list_remove (priv->processing_pool, data);
+ process_data_free (data);
+
+ item_queue_handlers_set_up (fs);
+}
+
+static void
+sparql_query_cb (TrackerMiner *miner,
+ GPtrArray *result,
+ const GError *error,
+ gpointer user_data)
+{
+ SparqlQueryData *data = user_data;
+
+ data->value = result && result->len == 1;
+ g_main_loop_quit (data->main_loop);
+}
+
+static void
item_add_or_update_cb (TrackerMinerFS *fs,
ProcessData *data,
const GError *error)
@@ -755,6 +837,12 @@ item_add_or_update_cb (TrackerMinerFS *fs,
if (error) {
g_message ("Could not process '%s': %s", uri, error->message);
+
+ fs->private->processing_pool =
+ g_list_remove (fs->private->processing_pool, data);
+ process_data_free (data);
+
+ item_queue_handlers_set_up (fs);
} else {
gchar *full_sparql;
@@ -763,24 +851,15 @@ item_add_or_update_cb (TrackerMinerFS *fs,
full_sparql = g_strdup_printf ("DROP GRAPH <%s> %s",
uri, tracker_sparql_builder_get_result (data->builder));
- tracker_miner_execute_batch_update (TRACKER_MINER (fs), full_sparql, NULL);
+ tracker_miner_execute_batch_update (TRACKER_MINER (fs),
+ full_sparql,
+ NULL,
+ sparql_update_cb,
+ data);
g_free (full_sparql);
-
- if (fs->private->been_crawled) {
- /* Only commit immediately for
- * changes after initial crawling.
- */
- tracker_miner_commit (TRACKER_MINER (fs));
- }
}
- fs->private->processing_pool =
- g_list_remove (fs->private->processing_pool, data);
- process_data_free (data);
-
g_free (uri);
-
- item_queue_handlers_set_up (fs);
}
static gboolean
@@ -849,29 +928,39 @@ item_query_exists (TrackerMinerFS *miner,
{
gboolean result;
gchar *sparql, *uri;
- GPtrArray *sparql_result;
+ SparqlQueryData data;
uri = g_file_get_uri (file);
sparql = g_strdup_printf ("SELECT ?s WHERE { ?s a rdfs:Resource . FILTER (?s = <%s>) }",
uri);
- sparql_result = tracker_miner_execute_sparql (TRACKER_MINER (miner), sparql, NULL);
+ data.main_loop = g_main_loop_new (NULL, FALSE);
+ data.value = FALSE;
+
+ tracker_miner_execute_sparql (TRACKER_MINER (miner),
+ sparql,
+ NULL,
+ sparql_query_cb,
+ &data);
- result = (sparql_result && sparql_result->len == 1);
+ g_main_loop_run (data.main_loop);
+ result = data.value;
+
+ g_main_loop_unref (data.main_loop);
- tracker_dbus_results_ptr_array_free (&sparql_result);
g_free (sparql);
g_free (uri);
return result;
}
-static void
+static gboolean
item_remove (TrackerMinerFS *fs,
GFile *file)
{
GString *sparql;
gchar *uri, *slash_uri;
+ ProcessData *data;
uri = g_file_get_uri (file);
@@ -880,7 +969,7 @@ item_remove (TrackerMinerFS *fs,
if (!item_query_exists (fs, file)) {
g_debug (" File does not exist anyway (uri:'%s')", uri);
- return;
+ return TRUE;
}
if (!g_str_has_suffix (uri, "/")) {
@@ -902,59 +991,84 @@ item_remove (TrackerMinerFS *fs,
"DELETE { <%s> a rdfs:Resource }",
uri);
- tracker_miner_execute_batch_update (TRACKER_MINER (fs), sparql->str, NULL);
+ data = process_data_new (file, NULL, NULL);
+ fs->private->processing_pool = g_list_prepend (fs->private->processing_pool, data);
+
+ tracker_miner_execute_batch_update (TRACKER_MINER (fs),
+ sparql->str,
+ NULL,
+ sparql_update_cb,
+ data);
g_string_free (sparql, TRUE);
g_free (slash_uri);
g_free (uri);
+
+ return FALSE;
}
static void
-item_update_uri_recursively (TrackerMinerFS *fs,
- GString *sparql_update,
- const gchar *source_uri,
- const gchar *uri)
+item_update_uri_recursively_cb (TrackerMiner *miner,
+ GPtrArray *result,
+ const GError *error,
+ gpointer user_data)
{
- gchar *sparql;
- GPtrArray *result_set;
+ TrackerMinerFS *fs = TRACKER_MINER_FS (miner);
+ RecursiveMoveData *data = user_data;
- g_debug ("Moving item from '%s' to '%s'",
- source_uri,
- uri);
-
- g_string_append_printf (sparql_update, " <%s> tracker:uri <%s> .", source_uri, uri);
-
- sparql = g_strdup_printf ("SELECT ?child WHERE { ?child nfo:belongsToContainer <%s> }", source_uri);
- result_set = tracker_miner_execute_sparql (TRACKER_MINER (fs), sparql, NULL);
- g_free (sparql);
-
- if (result_set) {
+ if (result) {
gint i;
- for (i = 0; i < result_set->len; i++) {
+ for (i = 0; i < result->len; i++) {
gchar **child_source_uri, *child_uri;
- child_source_uri = g_ptr_array_index (result_set, i);
+ child_source_uri = g_ptr_array_index (result, i);
- if (!g_str_has_prefix (*child_source_uri, source_uri)) {
+ if (!g_str_has_prefix (*child_source_uri, data->source_uri)) {
g_warning ("Child URI '%s' does not start with parent URI '%s'",
*child_source_uri,
- source_uri);
+ data->source_uri);
continue;
}
- child_uri = g_strdup_printf ("%s%s", uri, *child_source_uri + strlen (source_uri));
+ child_uri = g_strdup_printf ("%s%s", data->uri, *child_source_uri + strlen (data->source_uri));
- item_update_uri_recursively (fs, sparql_update, *child_source_uri, child_uri);
+ item_update_uri_recursively (fs, data, *child_source_uri, child_uri);
- g_free (child_source_uri);
g_free (child_uri);
}
+ }
- g_ptr_array_free (result_set, TRUE);
+ data->level--;
+
+ g_assert (data->level >= 0);
+
+ if (data->level == 0) {
+ g_main_loop_quit (data->main_loop);
}
}
+static void
+item_update_uri_recursively (TrackerMinerFS *fs,
+ RecursiveMoveData *move_data,
+ const gchar *source_uri,
+ const gchar *uri)
+{
+ gchar *sparql;
+
+ move_data->level++;
+
+ g_string_append_printf (move_data->sparql, " <%s> tracker:uri <%s> .", source_uri, uri);
+
+ sparql = g_strdup_printf ("SELECT ?child WHERE { ?child nfo:belongsToContainer <%s> }", source_uri);
+ tracker_miner_execute_sparql (TRACKER_MINER (fs),
+ sparql,
+ NULL,
+ item_update_uri_recursively_cb,
+ move_data);
+ g_free (sparql);
+}
+
static gboolean
item_move (TrackerMinerFS *fs,
GFile *file,
@@ -963,6 +1077,8 @@ item_move (TrackerMinerFS *fs,
gchar *uri, *source_uri, *escaped_filename;
GFileInfo *file_info;
GString *sparql;
+ RecursiveMoveData move_data;
+ ProcessData *data;
uri = g_file_get_uri (file);
source_uri = g_file_get_uri (source_file);
@@ -987,15 +1103,21 @@ item_move (TrackerMinerFS *fs,
NULL, NULL);
if (!file_info) {
+ gboolean retval;
+
/* Destination file has gone away, ignore dest file and remove source if any */
- item_remove (fs, source_file);
+ retval = item_remove (fs, source_file);
g_free (source_uri);
g_free (uri);
- return TRUE;
+ return retval;
}
+ g_debug ("Moving item from '%s' to '%s'",
+ source_uri,
+ uri);
+
sparql = g_string_new ("");
g_string_append_printf (sparql,
@@ -1008,11 +1130,28 @@ item_move (TrackerMinerFS *fs,
g_string_append_printf (sparql, " <%s> nfo:fileName \"%s\" .", source_uri, escaped_filename);
- item_update_uri_recursively (fs, sparql, source_uri, uri);
+ move_data.main_loop = g_main_loop_new (NULL, FALSE);
+ move_data.level = 0;
+ move_data.sparql = sparql;
+ move_data.source_uri = source_uri;
+ move_data.uri = uri;
+
+ item_update_uri_recursively (fs, &move_data, source_uri, uri);
+
+ g_main_loop_run (move_data.main_loop);
+
+ g_main_loop_unref (move_data.main_loop);
g_string_append (sparql, " }");
- tracker_miner_execute_batch_update (TRACKER_MINER (fs), sparql->str, NULL);
+ data = process_data_new (file, NULL, NULL);
+ fs->private->processing_pool = g_list_prepend (fs->private->processing_pool, data);
+
+ tracker_miner_execute_batch_update (TRACKER_MINER (fs),
+ sparql->str,
+ NULL,
+ sparql_update_cb,
+ data);
g_free (uri);
g_free (source_uri);
@@ -1136,7 +1275,7 @@ item_queue_handlers_cb (gpointer user_data)
keep_processing = item_move (fs, file, source_file);
break;
case QUEUE_DELETED:
- item_remove (fs, file);
+ keep_processing = item_remove (fs, file);
break;
case QUEUE_CREATED:
case QUEUE_UPDATED:
@@ -1162,7 +1301,7 @@ item_queue_handlers_cb (gpointer user_data)
/* Only commit immediately for
* changes after initial crawling.
*/
- tracker_miner_commit (TRACKER_MINER (fs));
+ tracker_miner_commit (TRACKER_MINER (fs), NULL, commit_cb, NULL);
}
return TRUE;
@@ -1224,12 +1363,12 @@ should_change_index_for_file (TrackerMinerFS *fs,
GFile *file)
{
gboolean uptodate;
- GPtrArray *sparql_result;
GFileInfo *file_info;
guint64 time;
time_t mtime;
struct tm t;
gchar *query, *uri;
+ SparqlQueryData data;
file_info = g_file_query_info (file,
G_FILE_ATTRIBUTE_TIME_MODIFIED,
@@ -1259,12 +1398,20 @@ should_change_index_for_file (TrackerMinerFS *fs,
t.tm_min,
t.tm_sec,
uri);
- sparql_result = tracker_miner_execute_sparql (TRACKER_MINER (fs), query, NULL);
- uptodate = sparql_result && sparql_result->len == 1;
+ data.main_loop = g_main_loop_new (NULL, FALSE);
+ data.value = FALSE;
+
+ tracker_miner_execute_sparql (TRACKER_MINER (fs),
+ query,
+ NULL,
+ sparql_query_cb,
+ &data);
- tracker_dbus_results_ptr_array_free (&sparql_result);
+ g_main_loop_run (data.main_loop);
+ uptodate = data.value;
+ g_main_loop_unref (data.main_loop);
g_free (query);
g_free (uri);
diff --git a/src/libtracker-miner/tracker-miner.c b/src/libtracker-miner/tracker-miner.c
index eec2d56..3c6f43c 100644
--- a/src/libtracker-miner/tracker-miner.c
+++ b/src/libtracker-miner/tracker-miner.c
@@ -44,6 +44,8 @@
#define TRACKER_MINER_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_MINER, TrackerMinerPrivate))
+static GQuark miner_error_quark = 0;
+
struct TrackerMinerPrivate {
TrackerClient *client;
@@ -56,6 +58,8 @@ struct TrackerMinerPrivate {
gdouble progress;
gint availability_cookie;
+
+ GPtrArray *async_calls;
};
typedef struct {
@@ -70,6 +74,17 @@ typedef struct {
gchar *reason;
} PauseData;
+typedef struct {
+ TrackerMiner *miner;
+ GCancellable *cancellable;
+ gpointer callback;
+ gpointer user_data;
+
+ guint id;
+ guint signal_id;
+ gboolean update;
+} AsyncCallData;
+
enum {
PROP_0,
PROP_NAME,
@@ -109,6 +124,12 @@ static void pause_data_destroy (gpointer data);
static PauseData *pause_data_new (const gchar *application,
const gchar *reason);
+static void async_call_notify_error (AsyncCallData *data,
+ gint code,
+ const gchar *message);
+static void async_call_data_destroy (AsyncCallData *data);
+
+
G_DEFINE_ABSTRACT_TYPE (TrackerMiner, tracker_miner, G_TYPE_OBJECT)
static void
@@ -250,6 +271,8 @@ tracker_miner_class_init (TrackerMinerClass *klass)
G_PARAM_READWRITE));
g_type_class_add_private (object_class, sizeof (TrackerMinerPrivate));
+
+ miner_error_quark = g_quark_from_static_string ("TrackerMiner");
}
static void
@@ -266,6 +289,7 @@ tracker_miner_init (TrackerMiner *miner)
g_direct_equal,
NULL,
pause_data_destroy);
+ priv->async_calls = g_ptr_array_new ();
}
static void
@@ -350,6 +374,14 @@ miner_get_property (GObject *object,
}
static void
+async_call_finalize_foreach (AsyncCallData *data,
+ gpointer user_data)
+{
+ async_call_notify_error (data, 0, "Miner is being finalized");
+ async_call_data_destroy (data);
+}
+
+static void
miner_finalize (GObject *object)
{
TrackerMiner *miner = TRACKER_MINER (object);
@@ -367,6 +399,11 @@ miner_finalize (GObject *object)
g_hash_table_unref (miner->private->pauses);
+ g_ptr_array_foreach (miner->private->async_calls,
+ (GFunc) async_call_finalize_foreach,
+ object);
+ g_ptr_array_free (miner->private->async_calls, TRUE);
+
G_OBJECT_CLASS (tracker_miner_parent_class)->finalize (object);
}
@@ -688,151 +725,293 @@ tracker_miner_is_started (TrackerMiner *miner)
return miner->private->started;
}
+static void
+async_call_data_destroy (AsyncCallData *data)
+{
+ TrackerMiner *miner = data->miner;
+
+ if (data->cancellable) {
+ if (data->signal_id) {
+ g_signal_handler_disconnect (data->cancellable, data->signal_id);
+ }
+
+ g_object_unref (data->cancellable);
+ }
+
+ if (data->id != 0) {
+ tracker_cancel_call (miner->private->client, data->id);
+ data->id = 0;
+ }
+
+ g_slice_free (AsyncCallData, data);
+}
+
+static void
+run_update_callback (AsyncCallData *data,
+ const GError *error)
+{
+ TrackerMinerUpdateCallback callback;
+
+ callback = data->callback;
+
+ if (callback) {
+ (callback) (data->miner, error, data->user_data);
+ }
+}
+
+static void
+run_query_callback (AsyncCallData *data,
+ GPtrArray *result,
+ const GError *error)
+{
+ TrackerMinerQueryCallback callback;
+
+ callback = data->callback;
+
+ if (callback) {
+ (callback) (data->miner, result, error, data->user_data);
+ }
+}
+
+static void
+sparql_update_cb (GError *error,
+ gpointer user_data)
+{
+ AsyncCallData *data = user_data;
+
+ run_update_callback (data, error);
+
+ async_call_data_destroy (data);
+}
+
+static void
+sparql_query_cb (GPtrArray *result,
+ GError *error,
+ gpointer user_data)
+{
+ AsyncCallData *data = user_data;
+
+ run_query_callback (data, result, error);
+
+ if (result) {
+ tracker_dbus_results_ptr_array_free (&result);
+ }
+
+ async_call_data_destroy (data);
+}
+
+static void
+sparql_cancelled_cb (GCancellable *cancellable,
+ AsyncCallData *data)
+{
+ TrackerMinerPrivate *priv;
+
+ async_call_notify_error (data, 0, "SPARQL operation was cancelled");
+
+ priv = TRACKER_MINER_GET_PRIVATE (data->miner);
+
+ g_ptr_array_remove (priv->async_calls, data);
+ async_call_data_destroy (data);
+}
+
+static AsyncCallData *
+async_call_data_new (TrackerMiner *miner,
+ GCancellable *cancellable,
+ gpointer callback,
+ gpointer user_data,
+ gboolean update)
+{
+ AsyncCallData *data;
+
+ data = g_slice_new0 (AsyncCallData);
+ data->miner = miner;
+ data->callback = callback;
+ data->user_data = user_data;
+ data->update = update;
+
+ if (cancellable) {
+ data->cancellable = g_object_ref (cancellable);
+
+ data->signal_id = g_signal_connect (cancellable, "cancelled",
+ G_CALLBACK (sparql_cancelled_cb), data);
+ }
+
+ g_ptr_array_add (miner->private->async_calls, data);
+
+ return data;
+}
+
+static void
+async_call_notify_error (AsyncCallData *data,
+ gint code,
+ const gchar *message)
+{
+ TrackerMiner *miner;
+ GError *error;
+
+ miner = data->miner;
+
+ if (data->id != 0) {
+ tracker_cancel_call (miner->private->client, data->id);
+ data->id = 0;
+ }
+
+ if (data->callback) {
+ error = g_error_new_literal (miner_error_quark, code, message);
+
+ if (data->update) {
+ run_update_callback (data, error);
+ } else {
+ run_query_callback (data, NULL, error);
+ }
+
+ g_error_free (error);
+ }
+}
+
/**
* tracker_miner_execute_update:
* @miner: a #TrackerMiner
* @sparql: a SPARQL query
- * @error: return location for errors
+ * @cancellable: a #GCancellable to control the operation
+ * @callback: a #TrackerMinerUpdateCallback to call when the operation is finished
+ * @user_data: data to pass to @callback
*
* Executes an update SPARQL query on tracker-store, use this
* whenever you want to perform data insertions or modifications.
*
- * Returns: #TRUE if the SPARQL query was executed successfully.
+ * When the operation is finished, @callback will be called, providing the error, if any.
+ * If the operation is cancelled, @callback will be called anyways, with error filled in.
**/
-gboolean
-tracker_miner_execute_update (TrackerMiner *miner,
- const gchar *sparql,
- GError **error)
+void
+tracker_miner_execute_update (TrackerMiner *miner,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ TrackerMinerUpdateCallback callback,
+ gpointer user_data)
{
- GError *internal_error = NULL;
-
- g_return_val_if_fail (TRACKER_IS_MINER (miner), FALSE);
-
- tracker_resources_sparql_update (miner->private->client,
- sparql,
- &internal_error);
+ TrackerMinerPrivate *priv;
+ AsyncCallData *data;
- if (!internal_error) {
- return TRUE;
- }
+ g_return_if_fail (TRACKER_IS_MINER (miner));
+ g_return_if_fail (sparql != NULL);
+ g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable));
- if (error) {
- g_propagate_error (error, internal_error);
- } else {
- g_warning ("Error running sparql queries: %s", internal_error->message);
- g_error_free (internal_error);
- }
+ priv = TRACKER_MINER_GET_PRIVATE (miner);
+ data = async_call_data_new (miner, cancellable, callback, user_data, TRUE);
- return FALSE;
+ data->id = tracker_resources_sparql_update_async (miner->private->client,
+ sparql, sparql_update_cb,
+ data);
}
/**
* tracker_miner_execute_sparql:
* @miner: a #TrackerMiner
* @sparql: a SPARQL query
- * @error: return location for errors
+ * @cancellable: a #GCancellable to control the operation
+ * @callback: a #TrackerMinerQueryCallback to call when the operation is finished
+ * @user_data: data to pass to @callback
*
- * Executes the SPARQL query on tracker-store and returns the
- * queried data. Use this whenever you need to get data from
+ * Executes the SPARQL query on tracker-store and returns asynchronously
+ * the queried data. Use this whenever you need to get data from
* already stored information.
*
- * Returns: a #GPtrArray with the returned data.
+ * When the operation is finished, @callback will be called, providing the queried data,
+ * or the error, if any. If the operation is cancelled, @callback will be called with the
+ * error parameter filled in.
**/
-GPtrArray *
-tracker_miner_execute_sparql (TrackerMiner *miner,
- const gchar *sparql,
- GError **error)
+void
+tracker_miner_execute_sparql (TrackerMiner *miner,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ TrackerMinerQueryCallback callback,
+ gpointer user_data)
{
- GError *internal_error = NULL;
- GPtrArray *res;
-
- g_return_val_if_fail (TRACKER_IS_MINER (miner), FALSE);
-
- res = tracker_resources_sparql_query (miner->private->client,
- sparql,
- &internal_error);
+ TrackerMinerPrivate *priv;
+ AsyncCallData *data;
- if (!internal_error) {
- return res;
- }
+ g_return_if_fail (TRACKER_IS_MINER (miner));
+ g_return_if_fail (sparql != NULL);
+ g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable));
+ g_return_if_fail (callback != NULL);
- if (error) {
- g_propagate_error (error, internal_error);
- } else {
- g_warning ("Error running sparql queries: %s", internal_error->message);
- g_error_free (internal_error);
- }
+ priv = TRACKER_MINER_GET_PRIVATE (miner);
+ data = async_call_data_new (miner, cancellable, callback, user_data, FALSE);
- return res;
+ data->id = tracker_resources_sparql_query_async (miner->private->client,
+ sparql, sparql_query_cb,
+ data);
}
/**
* tracker_miner_execute_batch_update:
* @miner: a #TrackerMiner
* @sparql: a set of SPARQL updates
- * @error: return location for errors
+ * @cancellable: a #GCancellable to control the operation
+ * @callback: a #TrackerMinerUpdateCallback to call when the operation is finished
+ * @user_data: data to pass to @callback
*
* Executes a batch of update SPARQL queries on tracker-store, use this
* whenever you want to perform data insertions or modifications in
* batches.
*
- * Returns: #TRUE if the SPARQL query was executed successfully.
+ * When the operation is finished, @callback will be called, providing the error, if any.
+ * If the operation is cancelled, @callback will be called anyways, with error filled in.
**/
-gboolean
-tracker_miner_execute_batch_update (TrackerMiner *miner,
- const gchar *sparql,
- GError **error)
+void
+tracker_miner_execute_batch_update (TrackerMiner *miner,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ TrackerMinerUpdateCallback callback,
+ gpointer user_data)
{
- GError *internal_error = NULL;
-
- g_return_val_if_fail (TRACKER_IS_MINER (miner), FALSE);
+ TrackerMinerPrivate *priv;
+ AsyncCallData *data;
- tracker_resources_batch_sparql_update (miner->private->client,
- sparql,
- &internal_error);
- if (!internal_error) {
- return TRUE;
- }
+ g_return_if_fail (TRACKER_IS_MINER (miner));
+ g_return_if_fail (sparql != NULL);
+ g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable));
- if (error) {
- g_propagate_error (error, internal_error);
- } else {
- g_warning ("Error running sparql queries: %s", internal_error->message);
- g_error_free (internal_error);
- }
+ priv = TRACKER_MINER_GET_PRIVATE (miner);
+ data = async_call_data_new (miner, cancellable, callback, user_data, TRUE);
- return FALSE;
+ data->id = tracker_resources_batch_sparql_update_async (miner->private->client,
+ sparql, sparql_update_cb,
+ data);
}
/**
* tracker_miner_commit:
* @miner: a #TrackerMiner
+ * @cancellable: a #GCancellable to control the operation
+ * @callback: a #TrackerMinerUpdateCallback to call when the operation is finished
+ * @user_data: data to pass to @callback
*
- * Commits all pending batch updates. see tracker_miner_execute_batch_update()
- *
- * Returns: #TRUE if the data was committed successfully.
+ * Commits all pending batch updates. see tracker_miner_execute_batch_update(). When
+ * the operation is finished, @callback will be called, with the error parameter filled
+ * in, if any. If the operation is cancelled through @cancellable, the callback will be
+ * called anyways with error filled in.
**/
-gboolean
-tracker_miner_commit (TrackerMiner *miner)
-{
- GError *error = NULL;
-
- g_return_val_if_fail (TRACKER_IS_MINER (miner), FALSE);
+void
+tracker_miner_commit (TrackerMiner *miner,
+ GCancellable *cancellable,
+ TrackerMinerUpdateCallback callback,
+ gpointer user_data)
- if (g_hash_table_size (miner->private->pauses) > 0) {
- g_warning ("Can not commit while miner is paused");
- return FALSE;
- }
+{
+ TrackerMinerPrivate *priv;
+ AsyncCallData *data;
- tracker_resources_batch_commit (miner->private->client, &error);
+ g_return_if_fail (TRACKER_IS_MINER (miner));
+ g_return_if_fail (!cancellable || G_IS_CANCELLABLE (cancellable));
- if (error) {
- g_critical ("Could not commit: %s", error->message);
- g_error_free (error);
- return FALSE;
- }
+ priv = TRACKER_MINER_GET_PRIVATE (miner);
+ data = async_call_data_new (miner, cancellable, callback, user_data, TRUE);
- return TRUE;
+ data->id = tracker_resources_batch_commit_async (miner->private->client,
+ sparql_update_cb,
+ data);
}
static gint
diff --git a/src/libtracker-miner/tracker-miner.h b/src/libtracker-miner/tracker-miner.h
index 3406947..af046e4 100644
--- a/src/libtracker-miner/tracker-miner.h
+++ b/src/libtracker-miner/tracker-miner.h
@@ -22,6 +22,7 @@
#define __LIBTRACKERMINER_MINER_H__
#include <glib-object.h>
+#include <gio/gio.h>
#include <libtracker-client/tracker.h>
G_BEGIN_DECLS
@@ -82,6 +83,14 @@ typedef struct {
GError *error);
} TrackerMinerClass;
+typedef void (* TrackerMinerUpdateCallback) (TrackerMiner *miner,
+ const GError *error,
+ gpointer user_data);
+typedef void (* TrackerMinerQueryCallback) (TrackerMiner *miner,
+ GPtrArray *result,
+ const GError *error,
+ gpointer user_data);
+
GType tracker_miner_get_type (void) G_GNUC_CONST;
GQuark tracker_miner_error_quark (void);
@@ -90,18 +99,6 @@ void tracker_miner_stop (TrackerMiner *miner);
gboolean tracker_miner_is_started (TrackerMiner *miner);
-gboolean tracker_miner_execute_update (TrackerMiner *miner,
- const gchar *sparql,
- GError **error);
-GPtrArray * tracker_miner_execute_sparql (TrackerMiner *miner,
- const gchar *sparql,
- GError **error);
-gboolean tracker_miner_execute_batch_update
- (TrackerMiner *miner,
- const gchar *sparql,
- GError **error);
-gboolean tracker_miner_commit (TrackerMiner *miner);
-
gint tracker_miner_pause (TrackerMiner *miner,
const gchar *reason,
GError **error);
@@ -109,6 +106,26 @@ gboolean tracker_miner_resume (TrackerMiner *miner,
gint cookie,
GError **error);
+void tracker_miner_execute_update (TrackerMiner *miner,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ TrackerMinerUpdateCallback callback,
+ gpointer user_data);
+void tracker_miner_execute_sparql (TrackerMiner *miner,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ TrackerMinerQueryCallback callback,
+ gpointer user_data);
+void tracker_miner_execute_batch_update (TrackerMiner *miner,
+ const gchar *sparql,
+ GCancellable *cancellable,
+ TrackerMinerUpdateCallback callback,
+ gpointer user_data);
+void tracker_miner_commit (TrackerMiner *miner,
+ GCancellable *cancellable,
+ TrackerMinerUpdateCallback callback,
+ gpointer user_data);
+
G_END_DECLS
diff --git a/src/tracker-miner-fs/tracker-miner-files.c b/src/tracker-miner-fs/tracker-miner-files.c
index 6b9ab4c..9f60d08 100644
--- a/src/tracker-miner-fs/tracker-miner-files.c
+++ b/src/tracker-miner-fs/tracker-miner-files.c
@@ -430,6 +430,21 @@ miner_files_constructed (GObject *object)
}
static void
+set_up_mount_point_cb (TrackerMiner *miner,
+ const GError *error,
+ gpointer user_data)
+{
+ gchar *removable_device_urn = user_data;
+
+ if (error) {
+ g_critical ("Could not set up mount point '%s': %s",
+ removable_device_urn, error->message);
+ }
+
+ g_free (removable_device_urn);
+}
+
+static void
set_up_mount_point (TrackerMinerFiles *miner,
const gchar *removable_device_urn,
const gchar *mount_point,
@@ -437,7 +452,6 @@ set_up_mount_point (TrackerMinerFiles *miner,
GString *accumulator)
{
GString *queries;
- GError *error = NULL;
g_debug ("Setting up mount point '%s'", removable_device_urn);
@@ -500,58 +514,59 @@ set_up_mount_point (TrackerMinerFiles *miner,
if (accumulator) {
g_string_append_printf (accumulator, "%s ", queries->str);
} else {
-
- tracker_miner_execute_update (TRACKER_MINER (miner), queries->str, &error);
-
- if (error) {
- g_critical ("Could not set up mount point '%s': %s",
- removable_device_urn, error->message);
- g_error_free (error);
- }
+ tracker_miner_execute_update (TRACKER_MINER (miner),
+ queries->str,
+ NULL,
+ set_up_mount_point_cb,
+ g_strdup (removable_device_urn));
}
g_string_free (queries, TRUE);
}
static void
-init_mount_points (TrackerMinerFiles *miner)
+init_mount_points_cb (TrackerMiner *miner,
+ const GError *error,
+ gpointer user_data)
+{
+ if (error) {
+ g_critical ("Could not initialize currently active mount points: %s",
+ error->message);
+ }
+}
+
+static void
+query_mount_points_cb (TrackerMiner *miner,
+ GPtrArray *result,
+ const GError *error,
+ gpointer user_data)
{
TrackerMinerFilesPrivate *priv;
GHashTable *volumes;
GHashTableIter iter;
gpointer key, value;
- GPtrArray *sparql_result;
- GError *error = NULL;
GString *accumulator;
gint i;
#ifdef HAVE_HAL
GSList *udis, *u;
#endif
- priv = TRACKER_MINER_FILES_GET_PRIVATE (miner);
+ if (error) {
+ g_critical ("Could not obtain the mounted volumes");
+ return;
+ }
- g_debug ("Initializing mount points");
+ priv = TRACKER_MINER_FILES_GET_PRIVATE (miner);
volumes = g_hash_table_new_full (g_str_hash, g_str_equal,
(GDestroyNotify) g_free,
NULL);
- /* First, get all mounted volumes, according to tracker-store */
- sparql_result = tracker_miner_execute_sparql (TRACKER_MINER (miner),
- "SELECT ?v WHERE { ?v a tracker:Volume ; tracker:isMounted true }",
- &error);
-
- if (error) {
- g_critical ("Could not obtain the mounted volumes");
- g_error_free (error);
- return;
- }
-
- for (i = 0; i < sparql_result->len; i++) {
+ for (i = 0; i < result->len; i++) {
gchar **row;
gint state;
- row = g_ptr_array_index (sparql_result, i);
+ row = g_ptr_array_index (result, i);
state = VOLUME_MOUNTED_IN_STORE;
if (strcmp (row[0], TRACKER_NON_REMOVABLE_MEDIA_DATASOURCE_URN) == 0) {
@@ -562,9 +577,6 @@ init_mount_points (TrackerMinerFiles *miner)
g_hash_table_insert (volumes, g_strdup (row[0]), GINT_TO_POINTER (state));
}
- g_ptr_array_foreach (sparql_result, (GFunc) g_strfreev, NULL);
- g_ptr_array_free (sparql_result, TRUE);
-
g_hash_table_replace (volumes, g_strdup (TRACKER_NON_REMOVABLE_MEDIA_DATASOURCE_URN),
GINT_TO_POINTER (VOLUME_MOUNTED));
@@ -613,28 +625,39 @@ init_mount_points (TrackerMinerFiles *miner)
g_debug ("URN '%s' (mount point: %s) was not reported to be mounted, but now it is, updating state",
mount_point, urn);
- set_up_mount_point (miner, urn, mount_point, TRUE, accumulator);
+ set_up_mount_point (TRACKER_MINER_FILES (miner), urn, mount_point, TRUE, accumulator);
} else if (!(state & VOLUME_MOUNTED) &&
(state & VOLUME_MOUNTED_IN_STORE)) {
g_debug ("URN '%s' was reported to be mounted, but it isn't anymore, updating state", urn);
- set_up_mount_point (miner, urn, NULL, FALSE, accumulator);
+ set_up_mount_point (TRACKER_MINER_FILES (miner), urn, NULL, FALSE, accumulator);
}
}
if (accumulator->str[0] != '\0') {
- tracker_miner_execute_update (TRACKER_MINER (miner), accumulator->str, &error);
-
- if (error) {
- g_critical ("Could not initialize currently active mount points: %s",
- error->message);
- g_error_free (error);
- }
+ tracker_miner_execute_update (miner,
+ accumulator->str,
+ NULL,
+ init_mount_points_cb,
+ NULL);
}
g_string_free (accumulator, TRUE);
g_hash_table_unref (volumes);
}
+static void
+init_mount_points (TrackerMinerFiles *miner)
+{
+ g_debug ("Initializing mount points");
+
+ /* First, get all mounted volumes, according to tracker-store */
+ tracker_miner_execute_sparql (TRACKER_MINER (miner),
+ "SELECT ?v WHERE { ?v a tracker:Volume ; tracker:isMounted true }",
+ NULL,
+ query_mount_points_cb,
+ NULL);
+}
+
#ifdef HAVE_HAL
static void
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]