[tracker/wip/miner-priority-queues: 14/19] libtracker-miner: Use TrackerSparqlBuffer for metadata insertions
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/wip/miner-priority-queues: 14/19] libtracker-miner: Use TrackerSparqlBuffer for metadata insertions
- Date: Wed, 6 Jul 2011 16:28:15 +0000 (UTC)
commit bc47535616f903383400ec417168683945fc0d17
Author: Carlos Garnacho <carlos lanedo com>
Date: Tue Jul 5 18:20:55 2011 +0200
libtracker-miner: Use TrackerSparqlBuffer for metadata insertions
src/libtracker-miner/tracker-miner-fs.c | 234 +++++++++++++++++-------------
1 files changed, 133 insertions(+), 101 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 77b7534..81ce440 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -31,9 +31,9 @@
#include "tracker-monitor.h"
#include "tracker-utils.h"
#include "tracker-thumbnailer.h"
-#include "tracker-miner-fs-processing-pool.h"
#include "tracker-priority-queue.h"
#include "tracker-task-pool.h"
+#include "tracker-sparql-buffer.h"
/* If defined will print the tree from GNode while running */
#ifdef CRAWLED_TREE_ENABLE_TRACE
@@ -205,7 +205,8 @@ struct _TrackerMinerFSPrivate {
TrackerTaskPool *task_pool;
/* Sparql insertion tasks */
- TrackerProcessingPool *processing_pool;
+ TrackerSparqlBuffer *sparql_buffer;
+ guint sparql_buffer_limit;
/* URI mtime cache */
GFile *current_mtime_cache_parent;
@@ -282,6 +283,8 @@ enum {
PROP_INITIAL_CRAWLING
};
+static void miner_fs_initable_iface_init (GInitableIface *iface);
+
static void fs_finalize (GObject *object);
static void fs_set_property (GObject *object,
guint prop_id,
@@ -373,9 +376,12 @@ static void task_pool_cancel_foreach (gpointer d
gpointer user_data);
+static GInitableIface* miner_fs_initable_parent_iface;
static guint signals[LAST_SIGNAL] = { 0, };
-G_DEFINE_ABSTRACT_TYPE (TrackerMinerFS, tracker_miner_fs, TRACKER_TYPE_MINER)
+G_DEFINE_ABSTRACT_TYPE_WITH_CODE (TrackerMinerFS, tracker_miner_fs, TRACKER_TYPE_MINER,
+ G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
+ miner_fs_initable_iface_init));
static void
tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
@@ -696,10 +702,6 @@ tracker_miner_fs_init (TrackerMinerFS *object)
/* Create processing pools */
priv->task_pool = tracker_task_pool_new (DEFAULT_WAIT_POOL_LIMIT);
- priv->processing_pool = tracker_processing_pool_new (object,
- DEFAULT_WAIT_POOL_LIMIT,
- DEFAULT_READY_POOL_LIMIT,
- DEFAULT_N_REQUESTS_POOL_LIMIT);
/* Set up the crawlers now we have config and hal */
priv->crawler = tracker_crawler_new ();
@@ -758,6 +760,33 @@ tracker_miner_fs_init (TrackerMinerFS *object)
priv->dirs_without_parent = NULL;
}
+static gboolean
+miner_fs_initable_init (GInitable *initable,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerMinerFSPrivate *priv;
+ guint limit;
+
+ if (!miner_fs_initable_parent_iface->init (initable, cancellable, error)) {
+ return FALSE;
+ }
+
+ priv = TRACKER_MINER_FS_GET_PRIVATE (initable);
+
+ g_object_get (initable, "processing-pool-ready-limit", &limit, NULL);
+ priv->sparql_buffer = tracker_sparql_buffer_new (tracker_miner_get_connection (TRACKER_MINER (initable)),
+ limit);
+ return TRUE;
+}
+
+static void
+miner_fs_initable_iface_init (GInitableIface *iface)
+{
+ miner_fs_initable_parent_iface = g_type_interface_peek_parent (iface);
+ iface->init = miner_fs_initable_init;
+}
+
static void
fs_finalize (GObject *object)
{
@@ -806,7 +835,9 @@ fs_finalize (GObject *object)
NULL);
g_object_unref (priv->task_pool);
- tracker_processing_pool_free (priv->processing_pool);
+ if (priv->sparql_buffer) {
+ g_object_unref (priv->sparql_buffer);
+ }
tracker_priority_queue_foreach (priv->items_moved,
(GFunc) item_moved_data_free,
@@ -871,12 +902,18 @@ fs_set_property (GObject *object,
g_value_get_uint (value));
break;
case PROP_READY_POOL_LIMIT:
- tracker_processing_pool_set_ready_limit (fs->priv->processing_pool,
- g_value_get_uint (value));
+ fs->priv->sparql_buffer_limit = g_value_get_uint (value);
+
+ if (fs->priv->sparql_buffer) {
+ tracker_task_pool_set_limit (TRACKER_TASK_POOL (fs->priv->sparql_buffer),
+ fs->priv->sparql_buffer_limit);
+ }
break;
case PROP_N_REQUESTS_POOL_LIMIT:
+#if 0
tracker_processing_pool_set_n_requests_limit (fs->priv->processing_pool,
g_value_get_uint (value));
+#endif
break;
case PROP_MTIME_CHECKING:
fs->priv->mtime_checking = g_value_get_boolean (value);
@@ -909,12 +946,13 @@ fs_get_property (GObject *object,
tracker_task_pool_get_limit (fs->priv->task_pool));
break;
case PROP_READY_POOL_LIMIT:
- g_value_set_uint (value,
- tracker_processing_pool_get_ready_limit (fs->priv->processing_pool));
+ g_value_set_uint (value, fs->priv->sparql_buffer_limit);
break;
case PROP_N_REQUESTS_POOL_LIMIT:
+#if 0
g_value_set_uint (value,
tracker_processing_pool_get_n_requests_limit (fs->priv->processing_pool));
+#endif
break;
case PROP_MTIME_CHECKING:
g_value_set_boolean (value, fs->priv->mtime_checking);
@@ -1150,48 +1188,50 @@ item_moved_data_free (ItemMovedData *data)
}
static void
-processing_pool_task_finished_cb (TrackerProcessingTask *task,
- gpointer user_data,
- const GError *error)
+sparql_buffer_task_finished_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
TrackerMinerFS *fs;
TrackerMinerFSPrivate *priv;
+ TrackerTask *task;
+ GError *error = NULL;
fs = user_data;
priv = fs->priv;
- if (error) {
+ if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (result),
+ &error)) {
g_critical ("Could not execute sparql: %s", error->message);
priv->total_files_notified_error++;
- } else {
- if (fs->priv->current_iri_cache_parent) {
- GFile *parent;
- GFile *task_file;
-
- task_file = tracker_processing_task_get_file (task);
-
- /* Note: parent may be NULL if the file represents
- * the root directory of the file system (applies to
- * .gvfs mounts also!) */
- parent = g_file_get_parent (task_file);
-
- if (parent) {
- if (g_file_equal (parent, fs->priv->current_iri_cache_parent) &&
- g_hash_table_lookup (fs->priv->iri_cache, task_file) == NULL) {
- /* Item is processed, add an empty element for the processed GFile,
- * in case it is again processed before the cache expires
- */
- g_hash_table_insert (fs->priv->iri_cache,
- g_object_ref (task_file),
- NULL);
- }
+ g_error_free (error);
+ } else if (fs->priv->current_iri_cache_parent) {
+ GFile *parent;
+ GFile *task_file;
+
+ task = g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (result));
+ task_file = tracker_task_get_file (task);
- g_object_unref (parent);
+ /* Note: parent may be NULL if the file represents
+ * the root directory of the file system (applies to
+ * .gvfs mounts also!) */
+ parent = g_file_get_parent (task_file);
+
+ if (parent) {
+ if (g_file_equal (parent, fs->priv->current_iri_cache_parent) &&
+ g_hash_table_lookup (fs->priv->iri_cache, task_file) == NULL) {
+ /* Item is processed, add an empty element for the processed GFile,
+ * in case it is again processed before the cache expires
+ */
+ g_hash_table_insert (fs->priv->iri_cache,
+ g_object_ref (task_file),
+ NULL);
}
+
+ g_object_unref (parent);
}
}
- tracker_processing_task_unref (task);
item_queue_handlers_set_up (fs);
}
@@ -1697,7 +1737,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
const GError *error)
{
UpdateProcessingTaskContext *ctxt;
- TrackerProcessingTask *task;
+ TrackerTask *sparql_task;
GFile *task_file;
gchar *uri;
@@ -1742,8 +1782,6 @@ item_add_or_update_cb (TrackerMinerFS *fs,
}
}
- task = tracker_processing_task_new (task_file);
-
if (ctxt->urn) {
gboolean attribute_update_only;
@@ -1789,13 +1827,12 @@ item_add_or_update_cb (TrackerMinerFS *fs,
ctxt->urn, ctxt->urn,
tracker_sparql_builder_get_result (ctxt->builder));
- /* Note that set_sparql_string() takes ownership of the passed string */
- tracker_processing_task_set_sparql_string (task, full_sparql);
+ sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
} else {
/* Do not drop graph if only updating attributes, the SPARQL builder
* will already contain the necessary DELETE statements for the properties
* being updated */
- tracker_processing_task_set_sparql (task, ctxt->builder);
+ sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
} else {
if (error != NULL) {
@@ -1806,15 +1843,15 @@ item_add_or_update_cb (TrackerMinerFS *fs,
g_debug ("Creating new item '%s'", uri);
}
- tracker_processing_task_set_sparql (task, ctxt->builder);
+ sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
- /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
- * and in this case we need to setup queue handlers again */
- if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs)) {
+ tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+ sparql_task,
+ sparql_buffer_task_finished_cb,
+ fs);
+
+ if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
item_queue_handlers_set_up (fs);
}
@@ -1915,7 +1952,7 @@ item_remove (TrackerMinerFS *fs,
{
gchar *uri;
gchar *mime = NULL;
- TrackerProcessingTask *task;
+ TrackerTask *task;
iri_cache_invalidate (fs, file);
uri = g_file_get_uri (file);
@@ -1943,22 +1980,17 @@ item_remove (TrackerMinerFS *fs,
* device). */
/* Add new task to processing pool */
- task = tracker_processing_task_new (file);
- tracker_processing_task_set_bulk_operation (task,
- "DELETE { "
- " ?f tracker:available true "
- "}",
- TRACKER_BULK_MATCH_EQUALS |
- TRACKER_BULK_MATCH_CHILDREN);
-
- /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
- * and in this case we need to setup queue handlers again */
- if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs)) {
- item_queue_handlers_set_up (fs);
- }
+ task = tracker_sparql_task_new_bulk (file,
+ "DELETE { "
+ " ?f tracker:available true "
+ "}",
+ TRACKER_BULK_MATCH_EQUALS |
+ TRACKER_BULK_MATCH_CHILDREN);
+
+ tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+ task,
+ sparql_buffer_task_finished_cb,
+ fs);
/* SECOND:
* Actually remove all resources. This operation is the one which may take
@@ -1966,20 +1998,19 @@ item_remove (TrackerMinerFS *fs,
*/
/* Add new task to processing pool */
- task = tracker_processing_task_new (file);
- tracker_processing_task_set_bulk_operation (task,
- "DELETE { "
- " ?f a rdfs:Resource "
- "}",
- TRACKER_BULK_MATCH_EQUALS |
- TRACKER_BULK_MATCH_CHILDREN);
-
- /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
- * and in this case we need to setup queue handlers again */
- if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs)) {
+ task = tracker_sparql_task_new_bulk (file,
+ "DELETE { "
+ " ?f a rdfs:Resource "
+ "}",
+ TRACKER_BULK_MATCH_EQUALS |
+ TRACKER_BULK_MATCH_CHILDREN);
+
+ tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+ task,
+ sparql_buffer_task_finished_cb,
+ fs);
+
+ if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
item_queue_handlers_set_up (fs);
}
@@ -2172,7 +2203,7 @@ item_move (TrackerMinerFS *fs,
GFileInfo *file_info;
GString *sparql;
RecursiveMoveData move_data;
- TrackerProcessingTask *task;
+ TrackerTask *task;
gchar *source_iri;
gchar *display_name;
gboolean source_exists;
@@ -2318,16 +2349,15 @@ item_move (TrackerMinerFS *fs,
g_main_loop_unref (move_data.main_loop);
/* Add new task to processing pool */
- task = tracker_processing_task_new (file);
- /* Note that set_sparql_string() takes ownership of the passed string */
- tracker_processing_task_set_sparql_string (task,
- g_string_free (sparql, FALSE));
- /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
- * and in this case we need to setup queue handlers again */
- if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs)) {
+ task = tracker_sparql_task_new_take_sparql_str (file,
+ g_string_free (sparql,
+ FALSE));
+ tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+ task,
+ sparql_buffer_task_finished_cb,
+ fs);
+
+ if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
item_queue_handlers_set_up (fs);
}
@@ -2748,8 +2778,8 @@ item_queue_handlers_cb (gpointer user_data)
* if there was a previous task on the same file we want to
* process now, we want it to get finished before we can go
* on with the queues... */
- tracker_processing_pool_buffer_flush (fs->priv->processing_pool,
- "Queue handlers WAIT");
+ tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+ "Queue handlers WAIT");
return FALSE;
}
@@ -2859,13 +2889,13 @@ item_queue_handlers_cb (gpointer user_data)
/* Print stats and signal finished */
if (!fs->priv->is_crawling &&
tracker_task_pool_get_size (fs->priv->task_pool) == 0 &&
- tracker_processing_pool_get_total_task_count (fs->priv->processing_pool) == 0) {
+ tracker_task_pool_get_size (TRACKER_TASK_POOL (fs->priv->sparql_buffer)) == 0) {
process_stop (fs);
}
/* Flush any possible pending update here */
- tracker_processing_pool_buffer_flush (fs->priv->processing_pool,
- "Queue handlers NONE");
+ tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+ "Queue handlers NONE");
tracker_thumbnailer_send ();
tracker_albumart_check_cleanup (tracker_miner_get_connection (TRACKER_MINER (fs)));
@@ -2957,6 +2987,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
return;
}
+#if 0
/* Already sent max number of requests to tracker-store?
* In this case, we also slow down the processing of items, as we don't
* want to keep on extracting if the communication with tracker-store is
@@ -2966,6 +2997,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
if (tracker_processing_pool_n_requests_limit_reached (fs->priv->processing_pool)) {
return;
}
+#endif
if (!fs->priv->is_crawling) {
gchar *status;
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]