[tracker/miner-priority-queues: 14/22] libtracker-miner: Use TrackerSparqlBuffer for metadata insertions
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/miner-priority-queues: 14/22] libtracker-miner: Use TrackerSparqlBuffer for metadata insertions
- Date: Mon, 18 Jul 2011 15:39:03 +0000 (UTC)
commit 5fc88a31334cc9c03f101c4d7568466be1d52af0
Author: Carlos Garnacho <carlos lanedo com>
Date: Tue Jul 5 18:20:55 2011 +0200
libtracker-miner: Use TrackerSparqlBuffer for metadata insertions
As a side effect, the TrackerMinerFS::processing-pool-requests-limit property
has been removed, as flow control has been simplified to not needing this third
queue.
src/libtracker-miner/tracker-miner-fs.c | 265 ++++++++++++++++---------------
src/miners/fs/tracker-miner-files.c | 1 -
2 files changed, 135 insertions(+), 131 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 2b5690d..1574eea 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -31,9 +31,9 @@
#include "tracker-monitor.h"
#include "tracker-utils.h"
#include "tracker-thumbnailer.h"
-#include "tracker-miner-fs-processing-pool.h"
#include "tracker-priority-queue.h"
#include "tracker-task-pool.h"
+#include "tracker-sparql-buffer.h"
/* If defined will print the tree from GNode while running */
#ifdef CRAWLED_TREE_ENABLE_TRACE
@@ -100,7 +100,6 @@ static gboolean miner_fs_queues_status_trace_timeout_cb (gpointer data);
/* Default processing pool limits to be set */
#define DEFAULT_WAIT_POOL_LIMIT 1
#define DEFAULT_READY_POOL_LIMIT 1
-#define DEFAULT_N_REQUESTS_POOL_LIMIT 10
/* Put tasks processing at a lower priority so other events
* (timeouts, monitor events, etc...) are guaranteed to be
@@ -205,7 +204,8 @@ struct _TrackerMinerFSPrivate {
TrackerTaskPool *task_pool;
/* Sparql insertion tasks */
- TrackerProcessingPool *processing_pool;
+ TrackerSparqlBuffer *sparql_buffer;
+ guint sparql_buffer_limit;
/* URI mtime cache */
GFile *current_mtime_cache_parent;
@@ -277,11 +277,12 @@ enum {
PROP_THROTTLE,
PROP_WAIT_POOL_LIMIT,
PROP_READY_POOL_LIMIT,
- PROP_N_REQUESTS_POOL_LIMIT,
PROP_MTIME_CHECKING,
PROP_INITIAL_CRAWLING
};
+static void miner_fs_initable_iface_init (GInitableIface *iface);
+
static void fs_finalize (GObject *object);
static void fs_set_property (GObject *object,
guint prop_id,
@@ -373,9 +374,12 @@ static void task_pool_cancel_foreach (gpointer d
gpointer user_data);
+static GInitableIface* miner_fs_initable_parent_iface;
static guint signals[LAST_SIGNAL] = { 0, };
-G_DEFINE_ABSTRACT_TYPE (TrackerMinerFS, tracker_miner_fs, TRACKER_TYPE_MINER)
+G_DEFINE_ABSTRACT_TYPE_WITH_CODE (TrackerMinerFS, tracker_miner_fs, TRACKER_TYPE_MINER,
+ G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
+ miner_fs_initable_iface_init));
static void
tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
@@ -423,14 +427,6 @@ tracker_miner_fs_class_init (TrackerMinerFSClass *klass)
1, G_MAXUINT, DEFAULT_READY_POOL_LIMIT,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
g_object_class_install_property (object_class,
- PROP_N_REQUESTS_POOL_LIMIT,
- g_param_spec_uint ("processing-pool-requests-limit",
- "Processing pool limit for number of requests",
- "Maximum number of SPARQL requests that can be sent "
- "to the store in parallel.",
- 1, G_MAXUINT, DEFAULT_N_REQUESTS_POOL_LIMIT,
- G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
- g_object_class_install_property (object_class,
PROP_MTIME_CHECKING,
g_param_spec_boolean ("mtime-checking",
"Mtime checking",
@@ -696,10 +692,6 @@ tracker_miner_fs_init (TrackerMinerFS *object)
/* Create processing pools */
priv->task_pool = tracker_task_pool_new (DEFAULT_WAIT_POOL_LIMIT);
- priv->processing_pool = tracker_processing_pool_new (object,
- DEFAULT_WAIT_POOL_LIMIT,
- DEFAULT_READY_POOL_LIMIT,
- DEFAULT_N_REQUESTS_POOL_LIMIT);
/* Set up the crawlers now we have config and hal */
priv->crawler = tracker_crawler_new ();
@@ -758,6 +750,33 @@ tracker_miner_fs_init (TrackerMinerFS *object)
priv->dirs_without_parent = NULL;
}
+static gboolean
+miner_fs_initable_init (GInitable *initable,
+ GCancellable *cancellable,
+ GError **error)
+{
+ TrackerMinerFSPrivate *priv;
+ guint limit;
+
+ if (!miner_fs_initable_parent_iface->init (initable, cancellable, error)) {
+ return FALSE;
+ }
+
+ priv = TRACKER_MINER_FS_GET_PRIVATE (initable);
+
+ g_object_get (initable, "processing-pool-ready-limit", &limit, NULL);
+ priv->sparql_buffer = tracker_sparql_buffer_new (tracker_miner_get_connection (TRACKER_MINER (initable)),
+ limit);
+ return TRUE;
+}
+
+static void
+miner_fs_initable_iface_init (GInitableIface *iface)
+{
+ miner_fs_initable_parent_iface = g_type_interface_peek_parent (iface);
+ iface->init = miner_fs_initable_init;
+}
+
static void
fs_finalize (GObject *object)
{
@@ -806,7 +825,9 @@ fs_finalize (GObject *object)
NULL);
g_object_unref (priv->task_pool);
- tracker_processing_pool_free (priv->processing_pool);
+ if (priv->sparql_buffer) {
+ g_object_unref (priv->sparql_buffer);
+ }
tracker_priority_queue_foreach (priv->items_moved,
(GFunc) item_moved_data_free,
@@ -871,12 +892,12 @@ fs_set_property (GObject *object,
g_value_get_uint (value));
break;
case PROP_READY_POOL_LIMIT:
- tracker_processing_pool_set_ready_limit (fs->priv->processing_pool,
- g_value_get_uint (value));
- break;
- case PROP_N_REQUESTS_POOL_LIMIT:
- tracker_processing_pool_set_n_requests_limit (fs->priv->processing_pool,
- g_value_get_uint (value));
+ fs->priv->sparql_buffer_limit = g_value_get_uint (value);
+
+ if (fs->priv->sparql_buffer) {
+ tracker_task_pool_set_limit (TRACKER_TASK_POOL (fs->priv->sparql_buffer),
+ fs->priv->sparql_buffer_limit);
+ }
break;
case PROP_MTIME_CHECKING:
fs->priv->mtime_checking = g_value_get_boolean (value);
@@ -909,12 +930,7 @@ fs_get_property (GObject *object,
tracker_task_pool_get_limit (fs->priv->task_pool));
break;
case PROP_READY_POOL_LIMIT:
- g_value_set_uint (value,
- tracker_processing_pool_get_ready_limit (fs->priv->processing_pool));
- break;
- case PROP_N_REQUESTS_POOL_LIMIT:
- g_value_set_uint (value,
- tracker_processing_pool_get_n_requests_limit (fs->priv->processing_pool));
+ g_value_set_uint (value, fs->priv->sparql_buffer_limit);
break;
case PROP_MTIME_CHECKING:
g_value_set_boolean (value, fs->priv->mtime_checking);
@@ -1150,48 +1166,50 @@ item_moved_data_free (ItemMovedData *data)
}
static void
-processing_pool_task_finished_cb (TrackerProcessingTask *task,
- gpointer user_data,
- const GError *error)
+sparql_buffer_task_finished_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
TrackerMinerFS *fs;
TrackerMinerFSPrivate *priv;
+ TrackerTask *task;
+ GError *error = NULL;
fs = user_data;
priv = fs->priv;
- if (error) {
+ if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (result),
+ &error)) {
g_critical ("Could not execute sparql: %s", error->message);
priv->total_files_notified_error++;
- } else {
- if (fs->priv->current_iri_cache_parent) {
- GFile *parent;
- GFile *task_file;
-
- task_file = tracker_processing_task_get_file (task);
-
- /* Note: parent may be NULL if the file represents
- * the root directory of the file system (applies to
- * .gvfs mounts also!) */
- parent = g_file_get_parent (task_file);
-
- if (parent) {
- if (g_file_equal (parent, fs->priv->current_iri_cache_parent) &&
- g_hash_table_lookup (fs->priv->iri_cache, task_file) == NULL) {
- /* Item is processed, add an empty element for the processed GFile,
- * in case it is again processed before the cache expires
- */
- g_hash_table_insert (fs->priv->iri_cache,
- g_object_ref (task_file),
- NULL);
- }
+ g_error_free (error);
+ } else if (fs->priv->current_iri_cache_parent) {
+ GFile *parent;
+ GFile *task_file;
+
+ task = g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (result));
+ task_file = tracker_task_get_file (task);
- g_object_unref (parent);
+ /* Note: parent may be NULL if the file represents
+ * the root directory of the file system (applies to
+ * .gvfs mounts also!) */
+ parent = g_file_get_parent (task_file);
+
+ if (parent) {
+ if (g_file_equal (parent, fs->priv->current_iri_cache_parent) &&
+ g_hash_table_lookup (fs->priv->iri_cache, task_file) == NULL) {
+ /* Item is processed, add an empty element for the processed GFile,
+ * in case it is again processed before the cache expires
+ */
+ g_hash_table_insert (fs->priv->iri_cache,
+ g_object_ref (task_file),
+ NULL);
}
+
+ g_object_unref (parent);
}
}
- tracker_processing_task_unref (task);
item_queue_handlers_set_up (fs);
}
@@ -1697,7 +1715,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
const GError *error)
{
UpdateProcessingTaskContext *ctxt;
- TrackerProcessingTask *task;
+ TrackerTask *sparql_task;
GFile *task_file;
gchar *uri;
@@ -1711,8 +1729,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
g_message ("Could not process '%s': %s", uri, error->message);
fs->priv->total_files_notified_error++;
-
- item_queue_handlers_set_up (fs);
+ sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
} else {
if (ctxt->urn) {
gboolean attribute_update_only;
@@ -1756,27 +1773,26 @@ item_add_or_update_cb (TrackerMinerFS *fs,
ctxt->urn, ctxt->urn,
tracker_sparql_builder_get_result (ctxt->builder));
- /* Note that set_sparql_string() takes ownership of the passed string */
- tracker_processing_task_set_sparql_string (task, full_sparql);
+ sparql_task = tracker_sparql_task_new_take_sparql_str (task_file, full_sparql);
} else {
/* Do not drop graph if only updating attributes, the SPARQL builder
* will already contain the necessary DELETE statements for the properties
* being updated */
- tracker_processing_task_set_sparql (task, ctxt->builder);
+ sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
} else {
g_debug ("Creating new item '%s'", uri);
- tracker_processing_task_set_sparql (task, ctxt->builder);
+ sparql_task = tracker_sparql_task_new_with_sparql (task_file, ctxt->builder);
}
+ }
- /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
- * and in this case we need to setup queue handlers again */
- if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs)) {
- item_queue_handlers_set_up (fs);
- }
+ tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+ sparql_task,
+ sparql_buffer_task_finished_cb,
+ fs);
+
+ if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
+ item_queue_handlers_set_up (fs);
}
tracker_task_unref (extraction_task);
@@ -1875,7 +1891,7 @@ item_remove (TrackerMinerFS *fs,
{
gchar *uri;
gchar *mime = NULL;
- TrackerProcessingTask *task;
+ TrackerTask *task;
iri_cache_invalidate (fs, file);
uri = g_file_get_uri (file);
@@ -1903,22 +1919,17 @@ item_remove (TrackerMinerFS *fs,
* device). */
/* Add new task to processing pool */
- task = tracker_processing_task_new (file);
- tracker_processing_task_set_bulk_operation (task,
- "DELETE { "
- " ?f tracker:available true "
- "}",
- TRACKER_BULK_MATCH_EQUALS |
- TRACKER_BULK_MATCH_CHILDREN);
-
- /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
- * and in this case we need to setup queue handlers again */
- if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs)) {
- item_queue_handlers_set_up (fs);
- }
+ task = tracker_sparql_task_new_bulk (file,
+ "DELETE { "
+ " ?f tracker:available true "
+ "}",
+ TRACKER_BULK_MATCH_EQUALS |
+ TRACKER_BULK_MATCH_CHILDREN);
+
+ tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+ task,
+ sparql_buffer_task_finished_cb,
+ fs);
/* SECOND:
* Actually remove all resources. This operation is the one which may take
@@ -1926,20 +1937,19 @@ item_remove (TrackerMinerFS *fs,
*/
/* Add new task to processing pool */
- task = tracker_processing_task_new (file);
- tracker_processing_task_set_bulk_operation (task,
- "DELETE { "
- " ?f a rdfs:Resource "
- "}",
- TRACKER_BULK_MATCH_EQUALS |
- TRACKER_BULK_MATCH_CHILDREN);
-
- /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
- * and in this case we need to setup queue handlers again */
- if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs)) {
+ task = tracker_sparql_task_new_bulk (file,
+ "DELETE { "
+ " ?f a rdfs:Resource "
+ "}",
+ TRACKER_BULK_MATCH_EQUALS |
+ TRACKER_BULK_MATCH_CHILDREN);
+
+ tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+ task,
+ sparql_buffer_task_finished_cb,
+ fs);
+
+ if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
item_queue_handlers_set_up (fs);
}
@@ -2132,7 +2142,7 @@ item_move (TrackerMinerFS *fs,
GFileInfo *file_info;
GString *sparql;
RecursiveMoveData move_data;
- TrackerProcessingTask *task;
+ TrackerTask *task;
gchar *source_iri;
gchar *display_name;
gboolean source_exists;
@@ -2278,16 +2288,15 @@ item_move (TrackerMinerFS *fs,
g_main_loop_unref (move_data.main_loop);
/* Add new task to processing pool */
- task = tracker_processing_task_new (file);
- /* Note that set_sparql_string() takes ownership of the passed string */
- tracker_processing_task_set_sparql_string (task,
- g_string_free (sparql, FALSE));
- /* If push_ready_task() returns FALSE, it means the actual db update was delayed,
- * and in this case we need to setup queue handlers again */
- if (!tracker_processing_pool_push_ready_task (fs->priv->processing_pool,
- task,
- processing_pool_task_finished_cb,
- fs)) {
+ task = tracker_sparql_task_new_take_sparql_str (file,
+ g_string_free (sparql,
+ FALSE));
+ tracker_sparql_buffer_push (fs->priv->sparql_buffer,
+ task,
+ sparql_buffer_task_finished_cb,
+ fs);
+
+ if (!tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
item_queue_handlers_set_up (fs);
}
@@ -2407,7 +2416,8 @@ should_wait (TrackerMinerFS *fs,
GFile *parent;
/* Is the item already being processed? */
- if (tracker_task_pool_find (fs->priv->task_pool, file)) {
+ if (tracker_task_pool_find (fs->priv->task_pool, file) ||
+ tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), file)) {
/* Yes, a previous event on same item currently
* being processed */
return TRUE;
@@ -2416,7 +2426,8 @@ should_wait (TrackerMinerFS *fs,
/* Is the item's parent being processed right now? */
parent = g_file_get_parent (file);
if (parent) {
- if (tracker_task_pool_find (fs->priv->task_pool, parent)) {
+ if (tracker_task_pool_find (fs->priv->task_pool, parent) ||
+ tracker_task_pool_find (TRACKER_TASK_POOL (fs->priv->sparql_buffer), parent)) {
/* Yes, a previous event on the parent of this item
* currently being processed */
g_object_unref (parent);
@@ -2708,8 +2719,8 @@ item_queue_handlers_cb (gpointer user_data)
* if there was a previous task on the same file we want to
* process now, we want it to get finished before we can go
* on with the queues... */
- tracker_processing_pool_buffer_flush (fs->priv->processing_pool,
- "Queue handlers WAIT");
+ tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+ "Queue handlers WAIT");
return FALSE;
}
@@ -2819,13 +2830,13 @@ item_queue_handlers_cb (gpointer user_data)
/* Print stats and signal finished */
if (!fs->priv->is_crawling &&
tracker_task_pool_get_size (fs->priv->task_pool) == 0 &&
- tracker_processing_pool_get_total_task_count (fs->priv->processing_pool) == 0) {
+ tracker_task_pool_get_size (TRACKER_TASK_POOL (fs->priv->sparql_buffer)) == 0) {
process_stop (fs);
}
/* Flush any possible pending update here */
- tracker_processing_pool_buffer_flush (fs->priv->processing_pool,
- "Queue handlers NONE");
+ tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+ "Queue handlers NONE");
tracker_thumbnailer_send ();
tracker_albumart_check_cleanup (tracker_miner_get_connection (TRACKER_MINER (fs)));
@@ -2917,13 +2928,7 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
return;
}
- /* Already sent max number of requests to tracker-store?
- * In this case, we also slow down the processing of items, as we don't
- * want to keep on extracting if the communication with tracker-store is
- * very busy. Note that this is not very likely to happen, as the bottleneck
- * during extraction is not the communication with tracker-store.
- */
- if (tracker_processing_pool_n_requests_limit_reached (fs->priv->processing_pool)) {
+ if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
return;
}
diff --git a/src/miners/fs/tracker-miner-files.c b/src/miners/fs/tracker-miner-files.c
index cb182ed..73cc62c 100644
--- a/src/miners/fs/tracker-miner-files.c
+++ b/src/miners/fs/tracker-miner-files.c
@@ -2537,7 +2537,6 @@ tracker_miner_files_new (TrackerConfig *config,
"config", config,
"processing-pool-wait-limit", 10,
"processing-pool-ready-limit", 100,
- "processing-pool-requests-limit", 10,
NULL);
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]