[tracker/multi-insert] libtracker-miner: Use new sparql_update_array API
- From: Aleksander Morgado <aleksm src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/multi-insert] libtracker-miner: Use new sparql_update_array API
- Date: Mon, 11 Oct 2010 15:57:06 +0000 (UTC)
commit 87497ecc527f4c731c0f7c10f88e8c41a7d1d6f0
Author: Aleksander Morgado <aleksander lanedo com>
Date: Mon Oct 11 16:21:08 2010 +0200
libtracker-miner: Use new sparql_update_array API
* Note. Right now, the max number of items to include in the same SPARQL
connection is equal to 'process-pool-limit', which defaults to 10 in
tracker-miner-files and to 1 in tracker-miner-applications.
src/libtracker-miner/tracker-miner-fs.c | 285 ++++++++++++++++++++++++++-----
1 files changed, 243 insertions(+), 42 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 47965b3..bef412c 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -38,6 +38,9 @@
/* If defined will print contents of populated mtime cache while running */
#undef PRINT_MTIME_CACHE_CONTENTS
+/* Maximum time (seconds) before forcing a sparql buffer flush */
+#define MAX_SPARQL_BUFFER_TIME 15
+
/**
* SECTION:tracker-miner-fs
* @short_description: Abstract base class for filesystem miners
@@ -66,6 +69,7 @@ typedef struct {
GFile *file;
gchar *urn;
gchar *parent_urn;
+ gchar *sparql;
GCancellable *cancellable;
TrackerSparqlBuilder *builder;
TrackerMiner *miner;
@@ -143,6 +147,11 @@ struct _TrackerMinerFSPrivate {
/* Files to check if no longer exist */
GHashTable *check_removed;
+ /* SPARQL buffer to pile up several UPDATEs */
+ GPtrArray *sparql_buffer;
+ GFile *sparql_buffer_current_parent;
+ time_t sparql_buffer_start_time;
+
/* Status */
guint been_started : 1;
guint been_crawled : 1;
@@ -661,6 +670,8 @@ process_data_free (ProcessData *data)
g_object_unref (data->builder);
}
+ g_free (data->sparql);
+
g_slice_free (ProcessData, data);
}
@@ -734,6 +745,14 @@ fs_finalize (GObject *object)
g_list_free (priv->config_directories);
}
+ if (priv->sparql_buffer_current_parent) {
+ g_object_unref (priv->sparql_buffer_current_parent);
+ }
+
+ if (priv->sparql_buffer) {
+ g_ptr_array_free (priv->sparql_buffer, TRUE);
+ }
+
g_queue_foreach (priv->crawled_directories, (GFunc) crawled_directory_data_free, NULL);
g_queue_free (priv->crawled_directories);
@@ -1059,6 +1078,10 @@ sparql_update_cb (GObject *object,
tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
+ /* If update was done when crawling finished, no data will be given */
+ if (!user_data)
+ return;
+
data = user_data;
fs = TRACKER_MINER_FS (data->miner);
priv = fs->private;
@@ -1509,12 +1532,144 @@ do_process_file (TrackerMinerFS *fs,
}
static void
+sparql_update_array_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ TrackerMinerFS *fs = NULL;
+ GError *global_error = NULL;
+ gboolean batch_failed = FALSE;
+ GPtrArray *sparql_array_errors;
+ GPtrArray *sparql_array;
+ guint i;
+
+ /* Get arrays of errors and queries */
+ sparql_array = user_data;
+ sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
+ result,
+ &global_error);
+ if (global_error) {
+ g_critical ("Could not execute array-update with '%u' items: %s",
+ sparql_array->len,
+ global_error->message);
+ fs->private->total_files_notified_error += sparql_array->len;
+ g_error_free (global_error);
+ batch_failed = TRUE;
+ }
+
+ for (i = 0; i < sparql_array->len; i++) {
+ ProcessData *data;
+
+ data = g_ptr_array_index (sparql_array, i);
+ if (!fs) {
+ fs = TRACKER_MINER_FS (data->miner);
+ }
+
+ if (!batch_failed) {
+ GError *error;
+
+ error = g_ptr_array_index (sparql_array_errors, i);
+ if (error) {
+ g_critical ("Could not execute sparql (%s): %s",
+ data->sparql,
+ error->message);
+ fs->private->total_files_notified_error++;
+ } else {
+ if (fs->private->current_iri_cache_parent) {
+ GFile *parent;
+
+ /* Note: parent may be NULL if the file represents
+ * the root directory of the file system (applies to
+ * .gvfs mounts also!) */
+ parent = g_file_get_parent (data->file);
+
+ if (parent) {
+ if (g_file_equal (parent, fs->private->current_iri_cache_parent) &&
+ g_hash_table_lookup (fs->private->iri_cache, data->file) == NULL) {
+ /* Item is processed, add an empty element for the processed GFile,
+ * in case it is again processed before the cache expires
+ */
+ g_hash_table_insert (fs->private->iri_cache,
+ g_object_ref (data->file),
+ NULL);
+ }
+
+ g_object_unref (parent);
+ }
+ }
+ }
+ }
+
+ /* For each reply, remove it from the processing pool */
+ fs->private->processing_pool = g_list_remove (fs->private->processing_pool, data);
+ }
+
+ /* Unref the arrays of errors and queries */
+ if (sparql_array_errors)
+ g_ptr_array_unref (sparql_array_errors);
+ g_ptr_array_unref (sparql_array);
+
+ item_queue_handlers_set_up (fs);
+}
+
+static void
+sparql_buffer_flush (TrackerMinerFS *fs)
+{
+ TrackerMinerFSPrivate *priv;
+
+ priv = fs->private;
+
+ if (priv->sparql_buffer) {
+ guint i;
+ GPtrArray *sparql_array;
+
+ /* Loop buffer and construct array of strings */
+ sparql_array = g_ptr_array_new ();
+ for (i = 0; i < priv->sparql_buffer->len; i++) {
+ ProcessData *data;
+
+ data = g_ptr_array_index (priv->sparql_buffer, i);
+ /* Add original string, not a duplicate */
+ g_ptr_array_add (sparql_array, data->sparql);
+ }
+
+ g_debug ("(Sparql buffer) Flushing buffer with '%u' items",
+ priv->sparql_buffer->len);
+ tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
+ (gchar **)(sparql_array->pdata),
+ sparql_array->len,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ sparql_update_array_cb,
+ priv->sparql_buffer);
+
+ /* Clear current parent */
+ if (priv->sparql_buffer_current_parent) {
+ g_object_unref (priv->sparql_buffer_current_parent);
+ priv->sparql_buffer_current_parent = NULL;
+ }
+ /* Clear buffer */
+ g_ptr_array_free (sparql_array, TRUE);
+ priv->sparql_buffer_start_time = 0;
+
+ /* Note the whole buffer is passed to the update_array callback,
+ * so no need to free it. */
+ priv->sparql_buffer = NULL;
+ } else {
+ g_debug ("(Sparql buffer) Nothing to flush");
+ }
+}
+
+static void
item_add_or_update_cb (TrackerMinerFS *fs,
ProcessData *data,
const GError *error)
{
+ TrackerMinerFSPrivate *priv;
gchar *uri;
+ GFile *parent;
+ priv = fs->private;
uri = g_file_get_uri (data->file);
if (error) {
@@ -1534,7 +1689,8 @@ item_add_or_update_cb (TrackerMinerFS *fs,
(error->code == DBUS_GERROR_NO_REPLY ||
error->code == DBUS_GERROR_TIMEOUT ||
error->code == DBUS_GERROR_TIMED_OUT)) {
- g_debug (" Got DBus timeout error on '%s', but it could not be caused by it. Retrying file.", uri);
+ g_debug (" Got DBus timeout error on '%s', but it could not "
+ "be caused by it. Retrying file.", uri);
/* Reset the TrackerSparqlBuilder */
g_object_unref (data->builder);
@@ -1551,50 +1707,79 @@ item_add_or_update_cb (TrackerMinerFS *fs,
item_queue_handlers_set_up (fs);
}
- } else {
- gchar *full_sparql;
-
- if (data->urn) {
- gboolean attribute_update_only;
-
- attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (data->file),
- fs->private->quark_attribute_updated));
- g_debug ("Updating item '%s' with urn '%s'%s",
- uri,
- data->urn,
- attribute_update_only ? " (attributes only)" : "");
-
- if (!attribute_update_only) {
- /* update, delete all statements inserted by miner
- * except for rdf:type statements as they could cause implicit deletion of user data */
- full_sparql = g_strdup_printf ("DELETE { GRAPH <%s> { <%s> ?p ?o } } "
- "WHERE { GRAPH <%s> { <%s> ?p ?o FILTER (?p != rdf:type) } } %s",
- TRACKER_MINER_FS_GRAPH_URN,
- data->urn,
- TRACKER_MINER_FS_GRAPH_URN,
- data->urn,
- tracker_sparql_builder_get_result (data->builder));
- } else {
- /* Do not drop graph if only updating attributes, the SPARQL builder
- * will already contain the necessary DELETE statements for the properties
- * being updated */
- full_sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
- }
- } else {
- g_debug ("Creating new item '%s'", uri);
+ g_free (uri);
+ return;
+ }
- /* new file */
- full_sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
+ /* Get parent of this file we're updating/creating */
+ parent = g_file_get_parent (data->file);
+
+ /* Start buffer if it not already done */
+ if (!priv->sparql_buffer) {
+ priv->sparql_buffer = g_ptr_array_new_with_free_func ((GDestroyNotify)process_data_free);
+ priv->sparql_buffer_start_time = time (NULL);
+ }
+
+ /* Append new SPARQL to buffer */
+ if (data->urn) {
+ gboolean attribute_update_only;
+
+ attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (data->file),
+ fs->private->quark_attribute_updated));
+ g_debug ("(Sparql buffer) Appending item update: '%s' with urn '%s'%s",
+ uri,
+ data->urn,
+ attribute_update_only ? " (attributes only)" : "");
+
+ if (!attribute_update_only) {
+ /* update, delete all statements inserted by miner
+ * except for rdf:type statements as they could cause implicit deletion of user data */
+ data->sparql = g_strdup_printf ("DELETE { GRAPH <%s> { <%s> ?p ?o } } "
+ "WHERE { GRAPH <%s> { <%s> ?p ?o FILTER (?p != rdf:type) } } %s",
+ TRACKER_MINER_FS_GRAPH_URN,
+ data->urn,
+ TRACKER_MINER_FS_GRAPH_URN,
+ data->urn,
+ tracker_sparql_builder_get_result (data->builder));
+ } else {
+ /* Do not drop graph if only updating attributes, the SPARQL builder
+ * will already contain the necessary DELETE statements for the properties
+ * being updated */
+ data->sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
}
+ } else {
+ /* new file */
+ g_debug ("(Sparql buffer) Appending item creation: '%s'", uri);
+ data->sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
+ }
- tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
- full_sparql,
- G_PRIORITY_DEFAULT,
- NULL,
- sparql_update_cb, data);
- g_free (full_sparql);
+ /* Set current parent if not set already */
+ if (!priv->sparql_buffer_current_parent && parent) {
+ priv->sparql_buffer_current_parent = g_object_ref (parent);
}
+ /* Add processing element to array */
+ g_ptr_array_add (priv->sparql_buffer, data);
+
+ /* Flush buffer if:
+ * - Last item has no parent
+ * - Parent change was detected
+ * - 'process-pool-limit' items reached
+ * - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
+ */
+ if (!parent ||
+ !g_file_equal (parent, priv->sparql_buffer_current_parent) ||
+ priv->sparql_buffer->len >= fs->private->pool_limit ||
+ (time (NULL) - priv->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME)) {
+ /* Flush! */
+ sparql_buffer_flush (fs);
+ } else {
+ /* If we didn't do the flush after the append, just re-setup handlers */
+ item_queue_handlers_set_up (fs);
+ }
+
+ if (parent)
+ g_object_unref (parent);
g_free (uri);
}
@@ -1699,6 +1884,9 @@ item_remove (TrackerMinerFS *fs,
return TRUE;
}
+ /* Before dumping the DELETE event, flush any possible CREATED/UPDATED event */
+ sparql_buffer_flush (fs);
+
tracker_thumbnailer_remove_add (uri, mime);
g_free (mime);
@@ -1731,7 +1919,8 @@ item_remove (TrackerMinerFS *fs,
sparql->str,
G_PRIORITY_DEFAULT,
NULL,
- sparql_update_cb, data);
+ sparql_update_cb,
+ data);
g_string_free (sparql, TRUE);
g_free (uri);
@@ -1984,6 +2173,9 @@ item_move (TrackerMinerFS *fs,
return retval;
}
+ /* Before dumping the MOVE event, flush any possible CREATED/UPDATED event */
+ sparql_buffer_flush (fs);
+
g_debug ("Moving item from '%s' to '%s'",
source_uri,
uri);
@@ -2047,7 +2239,8 @@ item_move (TrackerMinerFS *fs,
sparql->str,
G_PRIORITY_DEFAULT,
NULL,
- sparql_update_cb, data);
+ sparql_update_cb,
+ data);
g_free (uri);
g_free (source_uri);
@@ -2377,6 +2570,10 @@ item_queue_handlers_cb (gpointer user_data)
* the next directories batch.
*/
fs->private->item_queues_handler_id = 0;
+
+ /* Flush any possible pending update here */
+ sparql_buffer_flush (fs);
+
return FALSE;
}
@@ -2449,6 +2646,10 @@ item_queue_handlers_cb (gpointer user_data)
process_stop (fs);
}
+ /* Flush any possible pending update here */
+ g_debug ("(Sparql buffer) No more items in queue, flushing...");
+ sparql_buffer_flush (fs);
+
tracker_thumbnailer_send ();
/* No more files left to process */
keep_processing = FALSE;
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]