[tracker/multi-insert] libtracker-miner: Use new sparql_update_array API



commit 87497ecc527f4c731c0f7c10f88e8c41a7d1d6f0
Author: Aleksander Morgado <aleksander lanedo com>
Date:   Mon Oct 11 16:21:08 2010 +0200

    libtracker-miner: Use new sparql_update_array API
    
     * Note: right now, the maximum number of items batched into a single
       SPARQL array update is equal to 'process-pool-limit', which defaults
       to 10 in tracker-miner-files and to 1 in tracker-miner-applications.

 src/libtracker-miner/tracker-miner-fs.c |  285 ++++++++++++++++++++++++++-----
 1 files changed, 243 insertions(+), 42 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 47965b3..bef412c 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -38,6 +38,9 @@
 /* If defined will print contents of populated mtime cache while running */
 #undef PRINT_MTIME_CACHE_CONTENTS
 
+/* Maximum time (seconds) before forcing a sparql buffer flush */
+#define MAX_SPARQL_BUFFER_TIME  15
+
 /**
  * SECTION:tracker-miner-fs
  * @short_description: Abstract base class for filesystem miners
@@ -66,6 +69,7 @@ typedef struct {
 	GFile *file;
 	gchar *urn;
 	gchar *parent_urn;
+	gchar *sparql;
 	GCancellable *cancellable;
 	TrackerSparqlBuilder *builder;
 	TrackerMiner *miner;
@@ -143,6 +147,11 @@ struct _TrackerMinerFSPrivate {
 	/* Files to check if no longer exist */
 	GHashTable     *check_removed;
 
+	/* SPARQL buffer to pile up several UPDATEs */
+	GPtrArray      *sparql_buffer;
+	GFile          *sparql_buffer_current_parent;
+	time_t          sparql_buffer_start_time;
+
 	/* Status */
 	guint           been_started : 1;
 	guint           been_crawled : 1;
@@ -661,6 +670,8 @@ process_data_free (ProcessData *data)
 		g_object_unref (data->builder);
 	}
 
+	g_free (data->sparql);
+
 	g_slice_free (ProcessData, data);
 }
 
@@ -734,6 +745,14 @@ fs_finalize (GObject *object)
 		g_list_free (priv->config_directories);
 	}
 
+	if (priv->sparql_buffer_current_parent) {
+		g_object_unref (priv->sparql_buffer_current_parent);
+	}
+
+	if (priv->sparql_buffer) {
+		g_ptr_array_free (priv->sparql_buffer, TRUE);
+	}
+
 	g_queue_foreach (priv->crawled_directories, (GFunc) crawled_directory_data_free, NULL);
 	g_queue_free (priv->crawled_directories);
 
@@ -1059,6 +1078,10 @@ sparql_update_cb (GObject      *object,
 
 	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object), result, &error);
 
+	/* If update was done when crawling finished, no data will be given */
+	if (!user_data)
+		return;
+
 	data = user_data;
 	fs = TRACKER_MINER_FS (data->miner);
 	priv = fs->private;
@@ -1509,12 +1532,144 @@ do_process_file (TrackerMinerFS *fs,
 }
 
 static void
+sparql_update_array_cb (GObject      *object,
+                        GAsyncResult *result,
+                        gpointer      user_data)
+{
+	/* Completion callback of an array-update. user_data is the GPtrArray of
+	 * ProcessData items whose SPARQL was sent; it is unreffed here. */
+	TrackerMinerFS *fs;
+	GError *global_error = NULL;
+	gboolean batch_failed = FALSE;
+	GPtrArray *sparql_array_errors;
+	GPtrArray *sparql_array = user_data;
+	guint i;
+
+	/* Get the array of per-item errors for the queries */
+	sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
+	                                                                     result,
+	                                                                     &global_error);
+
+	/* The miner is taken from the first item and must be known before the
+	 * global error path below dereferences fs->private. */
+	g_return_if_fail (sparql_array->len > 0);
+	fs = TRACKER_MINER_FS (((ProcessData *) g_ptr_array_index (sparql_array, 0))->miner);
+
+	if (global_error) {
+		g_critical ("Could not execute array-update with '%u' items: %s",
+		            sparql_array->len,
+		            global_error->message);
+		fs->private->total_files_notified_error += sparql_array->len;
+		g_error_free (global_error);
+		batch_failed = TRUE;
+	}
+
+	for (i = 0; i < sparql_array->len; i++) {
+		ProcessData *data = g_ptr_array_index (sparql_array, i);
+
+		if (!batch_failed) {
+			GError *error;
+
+			error = g_ptr_array_index (sparql_array_errors, i);
+			if (error) {
+				g_critical ("Could not execute sparql (%s): %s",
+				            data->sparql,
+				            error->message);
+				fs->private->total_files_notified_error++;
+			} else if (fs->private->current_iri_cache_parent) {
+				GFile *parent;
+
+				/* Note: parent may be NULL if the file represents
+				 * the root directory of the file system (applies to
+				 * .gvfs mounts also!) */
+				parent = g_file_get_parent (data->file);
+
+				if (parent) {
+					if (g_file_equal (parent, fs->private->current_iri_cache_parent) &&
+					    g_hash_table_lookup (fs->private->iri_cache, data->file) == NULL) {
+						/* Item is processed, add an empty element for the processed GFile,
+						 * in case it is again processed before the cache expires
+						 */
+						g_hash_table_insert (fs->private->iri_cache,
+						                     g_object_ref (data->file),
+						                     NULL);
+					}
+
+					g_object_unref (parent);
+				}
+			}
+		}
+
+		/* For each reply, remove it from the processing pool */
+		fs->private->processing_pool = g_list_remove (fs->private->processing_pool, data);
+	}
+
+	/* Unref the arrays of errors and queries */
+	if (sparql_array_errors)
+		g_ptr_array_unref (sparql_array_errors);
+	g_ptr_array_unref (sparql_array);
+
+	item_queue_handlers_set_up (fs);
+}
+
+static void
+sparql_buffer_flush (TrackerMinerFS *fs)
+{
+	/* Send every SPARQL string currently held in the buffer as a single
+	 * array-update and reset the buffer state. When no buffer exists,
+	 * only a debug message is logged. */
+	TrackerMinerFSPrivate *priv = fs->private;
+	GPtrArray *pending = priv->sparql_buffer;
+	GPtrArray *strings;
+	guint n;
+
+	if (!pending) {
+		g_debug ("(Sparql buffer) Nothing to flush");
+		return;
+	}
+
+	/* Collect the buffered strings; the original pointers held by each
+	 * ProcessData are added, not duplicates */
+	strings = g_ptr_array_new ();
+	for (n = 0; n < pending->len; n++) {
+		ProcessData *item = g_ptr_array_index (pending, n);
+
+		g_ptr_array_add (strings, item->sparql);
+	}
+
+	g_debug ("(Sparql buffer) Flushing buffer with '%u' items",
+	         pending->len);
+	tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
+	                                              (gchar **)(strings->pdata),
+	                                              strings->len,
+	                                              G_PRIORITY_DEFAULT,
+	                                              NULL,
+	                                              sparql_update_array_cb,
+	                                              pending);
+
+	/* Clear current parent */
+	if (priv->sparql_buffer_current_parent) {
+		g_object_unref (priv->sparql_buffer_current_parent);
+		priv->sparql_buffer_current_parent = NULL;
+	}
+
+	/* Only the temporary array is released here; the buffer itself was
+	 * handed to sparql_update_array_cb(), which unrefs it */
+	g_ptr_array_free (strings, TRUE);
+	priv->sparql_buffer_start_time = 0;
+	priv->sparql_buffer = NULL;
+}
+
+static void
 item_add_or_update_cb (TrackerMinerFS *fs,
                        ProcessData    *data,
                        const GError   *error)
 {
+	TrackerMinerFSPrivate *priv;
 	gchar *uri;
+	GFile *parent;
 
+	priv = fs->private;
 	uri = g_file_get_uri (data->file);
 
 	if (error) {
@@ -1534,7 +1689,8 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 		    (error->code == DBUS_GERROR_NO_REPLY ||
 		     error->code == DBUS_GERROR_TIMEOUT ||
 		     error->code == DBUS_GERROR_TIMED_OUT)) {
-			g_debug ("  Got DBus timeout error on '%s', but it could not be caused by it. Retrying file.", uri);
+			g_debug ("  Got DBus timeout error on '%s', but it could not "
+			         "be caused by it. Retrying file.", uri);
 
 			/* Reset the TrackerSparqlBuilder */
 			g_object_unref (data->builder);
@@ -1551,50 +1707,79 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 
 			item_queue_handlers_set_up (fs);
 		}
-	} else {
-		gchar *full_sparql;
-
-		if (data->urn) {
-			gboolean attribute_update_only;
-
-			attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (data->file),
-			                                                               fs->private->quark_attribute_updated));
-			g_debug ("Updating item '%s' with urn '%s'%s",
-			         uri,
-			         data->urn,
-			         attribute_update_only ? " (attributes only)" : "");
-
-			if (!attribute_update_only) {
-				/* update, delete all statements inserted by miner
-				 * except for rdf:type statements as they could cause implicit deletion of user data */
-				full_sparql = g_strdup_printf ("DELETE { GRAPH <%s> { <%s> ?p ?o } } "
-				                               "WHERE { GRAPH <%s> { <%s> ?p ?o FILTER (?p != rdf:type) } } %s",
-				                               TRACKER_MINER_FS_GRAPH_URN,
-				                               data->urn,
-				                               TRACKER_MINER_FS_GRAPH_URN,
-				                               data->urn,
-				                               tracker_sparql_builder_get_result (data->builder));
-			} else {
-				/* Do not drop graph if only updating attributes, the SPARQL builder
-				 * will already contain the necessary DELETE statements for the properties
-				 * being updated */
-				full_sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
-			}
-		} else {
-			g_debug ("Creating new item '%s'", uri);
+		g_free (uri);
+		return;
+	}
 
-			/* new file */
-			full_sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
+	/* Get parent of this file we're updating/creating */
+	parent = g_file_get_parent (data->file);
+
+	/* Start buffer if not already done */
+	if (!priv->sparql_buffer) {
+		priv->sparql_buffer = g_ptr_array_new_with_free_func ((GDestroyNotify)process_data_free);
+		priv->sparql_buffer_start_time = time (NULL);
+	}
+
+	/* Append new SPARQL to buffer */
+	if (data->urn) {
+		gboolean attribute_update_only;
+
+		attribute_update_only = GPOINTER_TO_INT (g_object_steal_qdata (G_OBJECT (data->file),
+		                                                               fs->private->quark_attribute_updated));
+		g_debug ("(Sparql buffer) Appending item update: '%s' with urn '%s'%s",
+		         uri,
+		         data->urn,
+		         attribute_update_only ? " (attributes only)" : "");
+
+		if (!attribute_update_only) {
+			/* update, delete all statements inserted by miner
+			 * except for rdf:type statements as they could cause implicit deletion of user data */
+			data->sparql = g_strdup_printf ("DELETE { GRAPH <%s> { <%s> ?p ?o } } "
+			                                "WHERE { GRAPH <%s> { <%s> ?p ?o FILTER (?p != rdf:type) } } %s",
+			                                TRACKER_MINER_FS_GRAPH_URN,
+			                                data->urn,
+			                                TRACKER_MINER_FS_GRAPH_URN,
+			                                data->urn,
+			                                tracker_sparql_builder_get_result (data->builder));
+		} else {
+			/* Do not drop graph if only updating attributes, the SPARQL builder
+			 * will already contain the necessary DELETE statements for the properties
+			 * being updated */
+			data->sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
 		}
+	} else {
+		/* new file */
+		g_debug ("(Sparql buffer) Appending item creation: '%s'", uri);
+		data->sparql = g_strdup (tracker_sparql_builder_get_result (data->builder));
+	}
 
-		tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (fs)),
-		                                        full_sparql,
-		                                        G_PRIORITY_DEFAULT,
-		                                        NULL,
-		                                        sparql_update_cb, data);
-		g_free (full_sparql);
+	/* Set current parent if not set already */
+	if (!priv->sparql_buffer_current_parent && parent) {
+		priv->sparql_buffer_current_parent = g_object_ref (parent);
 	}
 
+	/* Add processing element to array */
+	g_ptr_array_add (priv->sparql_buffer, data);
+
+	/* Flush buffer if:
+	 *  - Last item has no parent
+	 *  - Parent change was detected
+	 *  - 'process-pool-limit' items reached
+	 *  - Not flushed in the last MAX_SPARQL_BUFFER_TIME seconds
+	 */
+	if (!parent ||
+	    !g_file_equal (parent, priv->sparql_buffer_current_parent) ||
+	    priv->sparql_buffer->len >= fs->private->pool_limit ||
+	    (time (NULL) - priv->sparql_buffer_start_time > MAX_SPARQL_BUFFER_TIME)) {
+		/* Flush! */
+		sparql_buffer_flush (fs);
+	} else {
+		/* If we didn't do the flush after the append, just re-setup handlers */
+		item_queue_handlers_set_up (fs);
+	}
+
+	if (parent)
+		g_object_unref (parent);
 	g_free (uri);
 }
 
@@ -1699,6 +1884,9 @@ item_remove (TrackerMinerFS *fs,
 		return TRUE;
 	}
 
+	/* Before dumping the DELETE event, flush any possible CREATED/UPDATED event */
+	sparql_buffer_flush (fs);
+
 	tracker_thumbnailer_remove_add (uri, mime);
 
 	g_free (mime);
@@ -1731,7 +1919,8 @@ item_remove (TrackerMinerFS *fs,
 	                                        sparql->str,
 	                                        G_PRIORITY_DEFAULT,
 	                                        NULL,
-	                                        sparql_update_cb, data);
+	                                        sparql_update_cb,
+	                                        data);
 
 	g_string_free (sparql, TRUE);
 	g_free (uri);
@@ -1984,6 +2173,9 @@ item_move (TrackerMinerFS *fs,
 		return retval;
 	}
 
+	/* Before dumping the MOVE event, flush any possible CREATED/UPDATED event */
+	sparql_buffer_flush (fs);
+
 	g_debug ("Moving item from '%s' to '%s'",
 	         source_uri,
 	         uri);
@@ -2047,7 +2239,8 @@ item_move (TrackerMinerFS *fs,
 	                                        sparql->str,
 	                                        G_PRIORITY_DEFAULT,
 	                                        NULL,
-	                                        sparql_update_cb, data);
+	                                        sparql_update_cb,
+	                                        data);
 
 	g_free (uri);
 	g_free (source_uri);
@@ -2377,6 +2570,10 @@ item_queue_handlers_cb (gpointer user_data)
 		 * the next directories batch.
 		 */
 		fs->private->item_queues_handler_id = 0;
+
+		/* Flush any possible pending update here */
+		sparql_buffer_flush (fs);
+
 		return FALSE;
 	}
 
@@ -2449,6 +2646,10 @@ item_queue_handlers_cb (gpointer user_data)
 			process_stop (fs);
 		}
 
+		/* Flush any possible pending update here */
+		g_debug ("(Sparql buffer) No more items in queue, flushing...");
+		sparql_buffer_flush (fs);
+
 		tracker_thumbnailer_send ();
 		/* No more files left to process */
 		keep_processing = FALSE;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]