[tracker/decorator-memory-reduction: 1/3] libtracker-miner: Reduce TrackerDecorator file cache size
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/decorator-memory-reduction: 1/3] libtracker-miner: Reduce TrackerDecorator file cache size
- Date: Sun, 7 Dec 2014 22:15:56 +0000 (UTC)
commit 8a607959a5289050e9003c64a85e15b71f6695bb
Author: Carlos Garnacho <carlosg gnome org>
Date: Sun Nov 30 13:29:51 2014 +0100
libtracker-miner: Reduce TrackerDecorator file cache size
Instead of keeping all items in memory, a small cache is kept of upcoming
files to be processed, querying the database again for files missing the
nie:dataSource when the cache has been flushed. When no further items are
processed, the miner is stopped. Whenever the prioritized rdf:types list
changes, the cache is dropped and the database is queried again with the new
order taken into account.
This reduces the potential memory usage of tracker-extract, which would
previously grow linearly with the number of files left to be
inspected, and which might bring in some memory pressure in certain scenarios.
On the downside, this obviously makes the decorator a little bit slower,
due to more queries happening (as opposed to a huge one), and at times the
decorator is idle waiting to know what else to do.
src/libtracker-miner/Makefile.am | 1 -
src/libtracker-miner/tracker-decorator-fs.c | 26 +-
src/libtracker-miner/tracker-decorator-internal.h | 32 -
src/libtracker-miner/tracker-decorator.c | 1321 ++++++++++++---------
4 files changed, 758 insertions(+), 622 deletions(-)
---
diff --git a/src/libtracker-miner/Makefile.am b/src/libtracker-miner/Makefile.am
index 5df74a3..eafb3ca 100644
--- a/src/libtracker-miner/Makefile.am
+++ b/src/libtracker-miner/Makefile.am
@@ -65,7 +65,6 @@ miner_sources = \
tracker-data-provider.h \
tracker-decorator.c \
tracker-decorator.h \
- tracker-decorator-internal.h \
tracker-decorator-fs.c \
tracker-decorator-fs.h \
tracker-enumerator.c \
diff --git a/src/libtracker-miner/tracker-decorator-fs.c b/src/libtracker-miner/tracker-decorator-fs.c
index d2e4470..7ed097e 100644
--- a/src/libtracker-miner/tracker-decorator-fs.c
+++ b/src/libtracker-miner/tracker-decorator-fs.c
@@ -24,7 +24,6 @@
#include <libtracker-sparql/tracker-sparql.h>
#include "tracker-decorator-fs.h"
-#include "tracker-decorator-internal.h"
#define TRACKER_DECORATOR_FS_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_DECORATOR_FS,
TrackerDecoratorFSPrivate))
@@ -130,6 +129,31 @@ remove_files_cb (GObject *object,
}
static void
+_tracker_decorator_query_append_rdf_type_filter (TrackerDecorator *decorator,
+ GString *query)
+{
+ const gchar **class_names;
+ gint i = 0;
+
+ class_names = tracker_decorator_get_class_names (decorator);
+
+ if (!class_names || !*class_names)
+ return;
+
+ g_string_append (query, "&& ?type IN (");
+
+ while (class_names[i]) {
+ if (i != 0)
+ g_string_append (query, ",");
+
+ g_string_append (query, class_names[i]);
+ i++;
+ }
+
+ g_string_append (query, ") ");
+}
+
+static void
check_files (TrackerDecorator *decorator,
const gchar *mount_point_urn,
gboolean available,
diff --git a/src/libtracker-miner/tracker-decorator.c b/src/libtracker-miner/tracker-decorator.c
index 28cdf4a..5832207 100644
--- a/src/libtracker-miner/tracker-decorator.c
+++ b/src/libtracker-miner/tracker-decorator.c
@@ -20,11 +20,10 @@
#include "config.h"
#include "tracker-decorator.h"
-#include "tracker-decorator-internal.h"
#include "tracker-priority-queue.h"
#define QUERY_BATCH_SIZE 100
-#define DEFAULT_BATCH_SIZE 100
+#define DEFAULT_BATCH_SIZE 200
#define TRACKER_DECORATOR_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_DECORATOR,
TrackerDecoratorPrivate))
@@ -43,42 +42,59 @@
**/
typedef struct _TrackerDecoratorPrivate TrackerDecoratorPrivate;
-typedef struct _ElemNode ElemNode;
+typedef struct _SparqlUpdate SparqlUpdate;
+typedef struct _ClassInfo ClassInfo;
struct _TrackerDecoratorInfo {
GTask *task;
gchar *urn;
gchar *url;
gchar *mimetype;
+ gint id;
gint ref_count;
};
-struct _ElemNode {
- TrackerDecoratorInfo *info;
+struct _ClassInfo {
+ gchar *class_name;
+ gint class_id;
+ gint priority;
+};
+
+struct _SparqlUpdate {
+ gchar *sparql;
gint id;
- gint class_name_id;
- gboolean prepend;
};
struct _TrackerDecoratorPrivate {
guint graph_updated_signal_id;
gchar *data_source;
- GStrv class_names;
- TrackerPriorityQueue *elem_queue;
- GHashTable *elems;
- GPtrArray *sparql_buffer;
+ GArray *classes; /* Array of ClassInfo */
+ gchar **class_names;
+
+ gssize n_remaining_items;
+ gssize n_processed_items;
+
+ GQueue item_cache; /* Queue of TrackerDecoratorInfo */
+
+ /* Arrays of tracker IDs */
+ GArray *prepended_ids;
+ GSequence *blacklist_items;
+
+ GHashTable *tasks; /* Associative array of GTasks */
+ GArray *sparql_buffer; /* Array of SparqlUpdate */
+ GArray *commit_buffer; /* Array of SparqlUpdate */
GTimer *timer;
- GQueue next_elem_queue;
+ GQueue next_elem_queue; /* Queue of incoming tasks */
- GArray *class_name_ids;
- GArray *priority_class_name_ids;
gint rdf_type_id;
gint nie_data_source_id;
gint data_source_id;
gint batch_size;
- gint stats_n_elems;
+ guint processing : 1;
+ guint querying : 1;
+ guint validated : 1;
};
enum {
@@ -99,22 +115,41 @@ static GInitableIface *parent_initable_iface;
static void tracker_decorator_initable_iface_init (GInitableIface *iface);
+static void decorator_task_done (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data);
+static void decorator_cache_next_items (TrackerDecorator *decorator);
+static gboolean decorator_check_commit (TrackerDecorator *decorator);
+
G_DEFINE_QUARK (TrackerDecoratorError, tracker_decorator_error)
G_DEFINE_ABSTRACT_TYPE_WITH_CODE (TrackerDecorator, tracker_decorator, TRACKER_TYPE_MINER,
G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
tracker_decorator_initable_iface_init))
static TrackerDecoratorInfo *
-tracker_decorator_info_new (TrackerSparqlCursor *cursor)
+tracker_decorator_info_new (TrackerDecorator *decorator,
+ TrackerSparqlCursor *cursor)
{
+ TrackerSparqlBuilder *sparql;
TrackerDecoratorInfo *info;
+ GCancellable *cancellable;
info = g_slice_new0 (TrackerDecoratorInfo);
info->urn = g_strdup (tracker_sparql_cursor_get_string (cursor, 0, NULL));
+ info->id = tracker_sparql_cursor_get_integer (cursor, 1);
info->url = g_strdup (tracker_sparql_cursor_get_string (cursor, 2, NULL));
info->mimetype = g_strdup (tracker_sparql_cursor_get_string (cursor, 3, NULL));
info->ref_count = 1;
+ cancellable = g_cancellable_new ();
+ info->task = g_task_new (decorator, cancellable,
+ decorator_task_done, info);
+ g_object_unref (cancellable);
+
+ sparql = tracker_sparql_builder_new_update ();
+ g_task_set_task_data (info->task, sparql,
+ (GDestroyNotify) g_object_unref);
+
return info;
}
@@ -144,6 +179,46 @@ G_DEFINE_BOXED_TYPE (TrackerDecoratorInfo,
tracker_decorator_info_ref,
tracker_decorator_info_unref)
+static gint
+sequence_compare_func (gconstpointer data1,
+ gconstpointer data2,
+ gpointer user_data)
+{
+ return GPOINTER_TO_INT (data1) - GPOINTER_TO_INT (data2);
+}
+
+static void
+decorator_blacklist_add (TrackerDecorator *decorator,
+ gint id)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ GSequenceIter *iter;
+
+ iter = g_sequence_search (priv->blacklist_items,
+ GINT_TO_POINTER (id),
+ sequence_compare_func,
+ NULL);
+
+ if (g_sequence_iter_is_end (iter) ||
+ g_sequence_get (g_sequence_iter_prev (iter)) != GINT_TO_POINTER (id))
+ g_sequence_insert_before (iter, GINT_TO_POINTER (id));
+}
+
+static void
+decorator_blacklist_remove (TrackerDecorator *decorator,
+ gint id)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ GSequenceIter *iter;
+
+ iter = g_sequence_lookup (priv->blacklist_items,
+ GINT_TO_POINTER (id),
+ sequence_compare_func,
+ NULL);
+ if (iter)
+ g_sequence_remove (iter);
+}
+
static void
decorator_update_state (TrackerDecorator *decorator,
const gchar *message,
@@ -152,27 +227,24 @@ decorator_update_state (TrackerDecorator *decorator,
TrackerDecoratorPrivate *priv;
gint remaining_time = -1;
gdouble progress = 1;
- guint length;
+ gsize total_items;
priv = decorator->priv;
- length = tracker_priority_queue_get_length (priv->elem_queue);
+ remaining_time = 0;
+ total_items = priv->n_remaining_items + priv->n_processed_items;
- if (length > 0) {
- progress = 1 - ((gdouble) length / priv->stats_n_elems);
- remaining_time = 0;
- }
+ if (priv->n_remaining_items > 0)
+ progress = ((gdouble) priv->n_processed_items / total_items);
if (priv->timer && estimate_time &&
!tracker_miner_is_paused (TRACKER_MINER (decorator))) {
gdouble elapsed;
- gint elems_done;
/* FIXME: Quite naive calculation */
elapsed = g_timer_elapsed (priv->timer, NULL);
- elems_done = priv->stats_n_elems - length;
- if (elems_done > 0)
- remaining_time = (length * elapsed) / elems_done;
+ if (priv->n_processed_items > 0)
+ remaining_time = (priv->n_remaining_items * elapsed) / priv->n_processed_items;
}
g_object_set (decorator,
@@ -184,270 +256,609 @@ decorator_update_state (TrackerDecorator *decorator,
g_object_set (decorator, "status", message, NULL);
}
-static gboolean
-class_name_array_contains (GArray *array,
- gint id)
+static void
+decorator_commit_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
+ TrackerSparqlConnection *conn;
+ TrackerDecoratorPrivate *priv;
+ TrackerDecorator *decorator;
+ GError *error = NULL;
+ GPtrArray *errors;
guint i;
- for (i = 0; i < array->len; i++) {
- if (id == g_array_index (array, gint, i))
- return TRUE;
+ decorator = user_data;
+ priv = decorator->priv;
+ conn = TRACKER_SPARQL_CONNECTION (object);
+ errors = tracker_sparql_connection_update_array_finish (conn, result, &error);
+
+ if (error) {
+ g_warning ("There was an error pushing metadata: %s\n", error->message);
}
- return FALSE;
+ if (errors) {
+ for (i = 0; i < errors->len; i++) {
+ SparqlUpdate *update;
+ GError *child_error;
+
+ child_error = g_ptr_array_index (errors, i);
+ update = &g_array_index (priv->commit_buffer, SparqlUpdate, i);
+
+ if (child_error) {
+ decorator_blacklist_add (decorator, update->id);
+ g_warning ("Task %d, error: %s\n"
+ "Sparql was:\n%s\n",
+ i, child_error->message,
+ update->sparql);
+ }
+ }
+
+ g_ptr_array_unref (errors);
+ }
+
+ g_clear_pointer (&priv->commit_buffer, (GDestroyNotify) g_array_unref);
+
+ if (!decorator_check_commit (decorator))
+ decorator_cache_next_items (decorator);
}
-static gint
-elem_node_get_priority (TrackerDecorator *decorator,
- ElemNode *node)
+static void
+sparql_update_clear (SparqlUpdate *update)
{
+ g_free (update->sparql);
+}
+
+static GArray *
+sparql_buffer_new (void)
+{
+ GArray *array;
+
+ array = g_array_new (FALSE, FALSE, sizeof (SparqlUpdate));
+ g_array_set_clear_func (array, (GDestroyNotify) sparql_update_clear);
+
+ return array;
+}
+
+static gboolean
+decorator_commit_info (TrackerDecorator *decorator)
+{
+ TrackerSparqlConnection *sparql_conn;
TrackerDecoratorPrivate *priv;
- gboolean prior;
+ GPtrArray *array;
+ gint i;
priv = decorator->priv;
- /* We want [prepend and prior, prior, prepend, the rest] */
- prior = class_name_array_contains (priv->priority_class_name_ids,
- node->class_name_id);
- if (prior && node->prepend)
- return G_PRIORITY_HIGH - 1;
- else if (prior)
- return G_PRIORITY_HIGH;
- else if (node->prepend)
- return G_PRIORITY_HIGH + 1;
+ if (!priv->sparql_buffer || priv->sparql_buffer->len == 0)
+ return FALSE;
+
+ if (priv->commit_buffer)
+ return FALSE;
+
+ /* Move sparql buffer to commit buffer */
+ priv->commit_buffer = priv->sparql_buffer;
+ priv->sparql_buffer = NULL;
+ array = g_ptr_array_new ();
+
+ for (i = 0; i < priv->commit_buffer->len; i++) {
+ SparqlUpdate *update;
+
+ update = &g_array_index (priv->commit_buffer, SparqlUpdate, i);
+ g_ptr_array_add (array, update->sparql);
+ }
- return G_PRIORITY_DEFAULT;
+ sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
+ tracker_sparql_connection_update_array_async (sparql_conn,
+ (gchar **) array->pdata,
+ array->len,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ decorator_commit_cb,
+ decorator);
+
+ decorator_update_state (decorator, NULL, TRUE);
+ g_ptr_array_unref (array);
+ return TRUE;
}
-static void
-element_add (TrackerDecorator *decorator,
- gint id,
- gint class_name_id,
- gboolean prepend)
+static gboolean
+decorator_check_commit (TrackerDecorator *decorator)
{
TrackerDecoratorPrivate *priv;
- gboolean first_elem;
- ElemNode *node;
- GList *elem;
priv = decorator->priv;
- if (g_hash_table_contains (priv->elems, GINT_TO_POINTER (id)))
+ if (!priv->sparql_buffer ||
+ (priv->n_remaining_items > 0 &&
+ priv->sparql_buffer->len < (guint) priv->batch_size))
+ return FALSE;
+
+ return decorator_commit_info (decorator);
+}
+
+static void
+decorator_notify_task_error (TrackerDecorator *decorator,
+ GError *error)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ GTask *task;
+
+ while (!g_queue_is_empty (&priv->next_elem_queue)) {
+ task = g_queue_pop_head (&priv->next_elem_queue);
+ g_task_return_error (task, g_error_copy (error));
+ g_object_unref (task);
+ }
+}
+
+static void
+decorator_notify_empty (TrackerDecorator *decorator)
+{
+ GError *error;
+
+ error = g_error_new (tracker_decorator_error_quark (),
+ TRACKER_DECORATOR_ERROR_EMPTY,
+ "There are no items left");
+ decorator_notify_task_error (decorator, error);
+ g_error_free (error);
+}
+
+static void
+decorator_start (TrackerDecorator *decorator)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+
+ if (priv->processing)
return;
- first_elem = g_hash_table_size (priv->elems) == 0;
- node = g_new0 (ElemNode, 1);
- node->id = id;
- node->class_name_id = class_name_id;
- node->prepend = prepend;
+ priv->processing = TRUE;
+ g_signal_emit (decorator, signals[ITEMS_AVAILABLE], 0);
+ decorator_update_state (decorator, "Extracting metadata", TRUE);
+}
- elem = tracker_priority_queue_add (priv->elem_queue, node,
- elem_node_get_priority (decorator, node));
+static void
+decorator_finish (TrackerDecorator *decorator)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
- g_hash_table_insert (priv->elems, GINT_TO_POINTER (id), elem);
- priv->stats_n_elems++;
+ if (!priv->processing)
+ return;
- if (first_elem) {
- g_signal_emit (decorator, signals[ITEMS_AVAILABLE], 0);
- decorator_update_state (decorator, "Extracting metadata", TRUE);
- }
+ priv->processing = FALSE;
+ priv->n_remaining_items = priv->n_processed_items = 0;
+ g_signal_emit (decorator, signals[FINISHED], 0);
+ decorator_commit_info (decorator);
+ decorator_notify_empty (decorator);
+ decorator_update_state (decorator, "Idle", FALSE);
}
+static void
+decorator_rebuild_cache (TrackerDecorator *decorator)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+
+ priv->n_remaining_items = 0;
+ g_queue_foreach (&priv->item_cache,
+ (GFunc) tracker_decorator_info_unref, NULL);
+ g_queue_clear (&priv->item_cache);
-static void decorator_commit_info (TrackerDecorator *decorator);
+ decorator_cache_next_items (decorator);
+}
+/* This function is called after the caller has completed the
+ * GTask given on the TrackerDecoratorInfo, this definitely removes
+ * the element being processed from queues.
+ */
static void
-element_remove_link (TrackerDecorator *decorator,
- GList *elem_link,
- gboolean emit)
+decorator_task_done (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
+ TrackerDecorator *decorator = TRACKER_DECORATOR (object);
+ TrackerDecoratorInfo *info = user_data;
TrackerDecoratorPrivate *priv;
- ElemNode *node;
priv = decorator->priv;
- node = elem_link->data;
- if (node->info && node->info->task) {
- /* A GTask is running on this element, cancel it
- * and wait for the task callback to delete this node.
- */
- g_cancellable_cancel (g_task_get_cancellable (node->info->task));
- return;
+ if (g_task_had_error (G_TASK (result))) {
+ GError *error = NULL;
+
+ /* Blacklist item */
+ decorator_blacklist_add (decorator, info->id);
+ g_task_propagate_pointer (G_TASK (result), &error);
+ g_warning ("Task for '%s' finished with error: %s\n",
+ info->url, error->message);
+ g_error_free (error);
+ } else {
+ TrackerSparqlBuilder *sparql;
+ SparqlUpdate update;
+
+ /* Add resulting sparql to buffer and check whether flushing */
+ sparql = g_task_get_task_data (G_TASK (result));
+ update.sparql = g_strdup (tracker_sparql_builder_get_result (sparql));
+ update.id = info->id;
+
+ if (!priv->sparql_buffer)
+ priv->sparql_buffer = sparql_buffer_new ();
+
+ g_array_append_val (priv->sparql_buffer, update);
}
- tracker_priority_queue_remove_node (priv->elem_queue, elem_link);
- g_hash_table_remove (priv->elems, GINT_TO_POINTER (node->id));
+ g_hash_table_remove (priv->tasks, result);
- if (emit && g_hash_table_size (priv->elems) == 0) {
- /* Flush any remaining Sparql updates */
- decorator_commit_info (decorator);
+ if (priv->n_remaining_items > 0)
+ priv->n_remaining_items--;
+ priv->n_processed_items++;
- g_signal_emit (decorator, signals[FINISHED], 0);
- decorator_update_state (decorator, "Idle", FALSE);
- priv->stats_n_elems = 0;
+ decorator_check_commit (decorator);
+
+ if (priv->n_remaining_items == 0) {
+ decorator_finish (decorator);
+ decorator_rebuild_cache (decorator);
+ } else if (g_queue_is_empty (&priv->item_cache) &&
+ g_hash_table_size (priv->tasks) == 0 &&
+ (!priv->sparql_buffer || !priv->commit_buffer)) {
+ decorator_cache_next_items (decorator);
}
+}
+
+static void
+decorator_cancel_active_tasks (TrackerDecorator *decorator)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ GHashTableIter iter;
+ GTask *task;
- if (node->info)
- tracker_decorator_info_unref (node->info);
+ g_hash_table_iter_init (&iter, priv->tasks);
+
+ while (g_hash_table_iter_next (&iter, NULL, (gpointer*) &task)) {
+ g_cancellable_cancel (g_task_get_cancellable (task));
+ }
- g_free (node);
+ g_hash_table_remove_all (priv->tasks);
}
static void
-element_remove_by_id (TrackerDecorator *decorator,
- gint id)
+query_append_id (GString *string,
+ gint id)
{
- TrackerDecoratorPrivate *priv;
- GList *elem_link;
+ if (string->len > 1 && string->str[string->len - 1] != '(')
+ g_string_append_c (string, ',');
- priv = decorator->priv;
- elem_link = g_hash_table_lookup (priv->elems, GINT_TO_POINTER (id));
+ g_string_append_printf (string, "%d", id);
+}
+
+static void
+query_add_blacklisted_filter (TrackerDecorator *decorator,
+ GString *query)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ GSequenceIter *iter;
- if (!elem_link)
+ if (g_sequence_get_length (priv->blacklist_items) == 0)
return;
- element_remove_link (decorator, elem_link, TRUE);
+ g_string_append (query, "&& tracker:id(?urn) NOT IN (");
+
+ iter = g_sequence_get_begin_iter (priv->blacklist_items);
+
+ while (!g_sequence_iter_is_end (iter)) {
+ query_append_id (query, GPOINTER_TO_INT (g_sequence_get (iter)));
+ iter = g_sequence_iter_next (iter);
+ }
+
+ g_string_append (query, ")");
}
static void
-decorator_commit_cb (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
+query_add_update_buffer_ids (GString *query,
+ GArray *commit_buffer)
{
- TrackerSparqlConnection *conn;
- GPtrArray *errors, *sparql;
- GError *error = NULL;
- guint i;
+ SparqlUpdate *update;
+ gint i;
- sparql = user_data;
- conn = TRACKER_SPARQL_CONNECTION (object);
- errors = tracker_sparql_connection_update_array_finish (conn, result, &error);
+ for (i = 0; i < commit_buffer->len; i++) {
+ update = &g_array_index (commit_buffer, SparqlUpdate, i);
+ query_append_id (query, update->id);
+ }
+}
- if (error) {
- g_warning ("There was an error pushing metadata: %s\n", error->message);
+static void
+query_add_processing_filter (TrackerDecorator *decorator,
+ GString *query)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+
+ if ((!priv->sparql_buffer || priv->sparql_buffer->len == 0) &&
+ (!priv->commit_buffer || priv->commit_buffer->len == 0))
+ return;
+
+ g_string_append (query, "&& tracker:id(?urn) NOT IN (");
+
+ if (priv->sparql_buffer && priv->sparql_buffer->len > 0)
+ query_add_update_buffer_ids (query, priv->sparql_buffer);
+ if (priv->commit_buffer && priv->commit_buffer->len > 0)
+ query_add_update_buffer_ids (query, priv->commit_buffer);
+
+ g_string_append (query, ")");
+}
+
+static void
+query_add_id_filter (GString *query,
+ GArray *ids)
+{
+ gint i;
+
+ if (!ids || ids->len == 0)
+ return;
+
+ g_string_append (query, "&& tracker:id(?urn) IN (");
+
+ for (i = 0; i < ids->len; i++) {
+ if (i != 0)
+ g_string_append (query, ",");
+
+ g_string_append_printf (query, "%d",
+ g_array_index (ids, gint, i));
}
- if (errors) {
- for (i = 0; i < errors->len; i++) {
- GError *child_error;
+ g_string_append (query, ")");
+}
- child_error = g_ptr_array_index (errors, i);
+static void
+query_append_current_tasks_filter (TrackerDecorator *decorator,
+ GString *query)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ GHashTableIter iter;
+ gint i = 0, id;
+ GTask *task;
- if (child_error) {
- g_warning ("Task %d, error: %s", i, child_error->message);
- g_warning ("Sparql update was:\n%s\n",
- (gchar *) g_ptr_array_index (sparql, i));
- }
+ if (g_hash_table_size (priv->tasks) == 0)
+ return;
+
+ g_string_append (query, "&& tracker:id(?urn) NOT IN (");
+ g_hash_table_iter_init (&iter, priv->tasks);
+
+ while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &task)) {
+ if (i != 0)
+ g_string_append (query, ",");
+
+ id = GPOINTER_TO_INT (g_task_get_task_data (task));
+ g_string_append_printf (query, "%d", id);
+ i++;
+ }
+
+ g_string_append (query, ")");
+}
+
+static gchar *
+create_query_string (TrackerDecorator *decorator,
+ gchar **select_clauses,
+ gboolean for_prepended)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ ClassInfo *prev = NULL, *cur;
+ GString *query;
+ gint i;
+
+ if (!priv->validated)
+ return NULL;
+
+ query = g_string_new ("SELECT ");
+
+ for (i = 0; select_clauses[i]; i++) {
+ g_string_append_printf (query, "%s ", select_clauses[i]);
+ }
+
+ g_string_append (query, "{ SELECT ?urn WHERE {");
+
+ for (i = 0; i < priv->classes->len; i++) {
+ cur = &g_array_index (priv->classes, ClassInfo, i);
+
+ if (!prev || prev->priority != cur->priority) {
+ if (prev)
+ g_string_append (query, "))} UNION ");
+
+ g_string_append_printf (query,
+ "{ ?urn a rdfs:Resource;"
+ " a ?type ;"
+ " tracker:available true ."
+ " FILTER (! EXISTS { ?urn nie:dataSource <%s> } ",
+ priv->data_source);
+
+ query_add_blacklisted_filter (decorator, query);
+ query_add_processing_filter (decorator, query);
+
+ if (for_prepended && priv->prepended_ids->len > 0)
+ query_add_id_filter (query, priv->prepended_ids);
+
+ query_append_current_tasks_filter (decorator, query);
+ g_string_append (query, " && ?type IN (");
+ } else {
+ g_string_append (query, ",");
}
- g_ptr_array_unref (errors);
+ g_string_append_printf (query, "%s", cur->class_name);
+ prev = cur;
}
- g_ptr_array_unref (sparql);
+ g_string_append_printf (query, "))}}} LIMIT %d", QUERY_BATCH_SIZE);
+
+ return g_string_free (query, FALSE);
+}
+
+static gchar *
+create_remaining_items_query (TrackerDecorator *decorator)
+{
+ gchar *clauses[] = {
+ "?urn",
+ "tracker:id(?urn)",
+ "nie:url(?urn)",
+ "nie:mimeType(?urn)",
+ NULL
+ };
+
+ return create_query_string (decorator, clauses, TRUE);
}
static void
-decorator_commit_info (TrackerDecorator *decorator)
+decorator_query_remaining_items_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
- TrackerSparqlConnection *sparql_conn;
+ TrackerDecorator *decorator = user_data;
TrackerDecoratorPrivate *priv;
- GPtrArray *array;
+ TrackerSparqlCursor *cursor;
+ GError *error = NULL;
+ cursor = tracker_sparql_connection_query_finish (TRACKER_SPARQL_CONNECTION (object),
+ result, &error);
priv = decorator->priv;
+ priv->querying = FALSE;
- if (priv->sparql_buffer->len == 0)
+ if (error || !tracker_sparql_cursor_next (cursor, NULL, &error)) {
+ decorator_notify_task_error (decorator, error);
+ g_error_free (error);
return;
+ }
- array = priv->sparql_buffer;
- priv->sparql_buffer = g_ptr_array_new_with_free_func (g_free);
+ priv->n_remaining_items = g_queue_get_length (&priv->item_cache) +
+ tracker_sparql_cursor_get_integer (cursor, 0);
+ g_object_unref (cursor);
- sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
- tracker_sparql_connection_update_array_async (sparql_conn,
- (gchar **) array->pdata,
- array->len,
- G_PRIORITY_DEFAULT,
- NULL,
- decorator_commit_cb,
- array);
+ g_debug ("Found %ld items to extract", priv->n_remaining_items);
- decorator_update_state (decorator, NULL, TRUE);
+ if (priv->n_remaining_items > 0)
+ decorator_cache_next_items (decorator);
+ else
+ decorator_finish (decorator);
}
static void
-decorator_check_commit (TrackerDecorator *decorator)
+decorator_query_remaining_items (TrackerDecorator *decorator)
{
- TrackerDecoratorPrivate *priv;
+ gchar *query, *clauses[] = { "COUNT(?urn)", NULL };
+ TrackerSparqlConnection *sparql_conn;
- priv = decorator->priv;
+ query = create_query_string (decorator, clauses, FALSE);
- if (priv->sparql_buffer->len < (guint) priv->batch_size)
- return;
+ if (query) {
+ sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
+ tracker_sparql_connection_query_async (sparql_conn, query,
+ NULL, decorator_query_remaining_items_cb,
+ decorator);
+ g_free (query);
+ } else {
+ decorator_notify_empty (decorator);
+ }
+}
- decorator_commit_info (decorator);
+static void
+decorator_pair_tasks (TrackerDecorator *decorator)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ TrackerDecoratorInfo *info;
+ GTask *task;
+
+ while (!g_queue_is_empty (&priv->item_cache) &&
+ !g_queue_is_empty (&priv->next_elem_queue)) {
+ info = g_queue_pop_head (&priv->item_cache);
+ task = g_queue_pop_head (&priv->next_elem_queue);
+
+ g_task_set_task_data (task, GINT_TO_POINTER (info->id), NULL);
+
+ /* Pass ownership of info */
+ g_task_return_pointer (task, info,
+ (GDestroyNotify) tracker_decorator_info_unref);
+ g_object_unref (task);
+
+ /* Store the decorator-side task in the active task pool */
+ g_hash_table_add (priv->tasks, info->task);
+ }
}
-/* This function is called after the caller has completed the
- * GTask given on the TrackerDecoratorInfo, this definitely removes
- * the element being processed from queues.
- */
static void
-decorator_task_done (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
+decorator_item_cache_remove (TrackerDecorator *decorator,
+ gint id)
{
- TrackerDecorator *decorator = TRACKER_DECORATOR (object);
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ GList *item;
+
+ for (item = g_queue_peek_head_link (&priv->item_cache);
+ item; item = item->next) {
+ TrackerDecoratorInfo *info = item->data;
+
+ if (info->id != id)
+ continue;
+
+ g_queue_remove (&priv->item_cache, info);
+ tracker_decorator_info_unref (info);
+ }
+}
+
+static void
+decorator_cache_items_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ TrackerDecorator *decorator = user_data;
TrackerDecoratorPrivate *priv;
- ElemNode *node = user_data;
+ TrackerSparqlConnection *conn;
+ TrackerSparqlCursor *cursor;
+ TrackerDecoratorInfo *info;
+ GError *error = NULL;
+ conn = TRACKER_SPARQL_CONNECTION (object);
+ cursor = tracker_sparql_connection_query_finish (conn, result, &error);
priv = decorator->priv;
+ priv->querying = FALSE;
- if (g_task_had_error (G_TASK (result))) {
- GError *error = NULL;
-
- g_task_propagate_pointer (G_TASK (result), &error);
- g_warning ("Task for '%s' finished with error: %s\n",
- node->info->url, error->message);
+ if (error) {
+ decorator_notify_task_error (decorator, error);
g_error_free (error);
} else {
- TrackerSparqlBuilder *sparql;
-
- /* Add resulting sparql to buffer and check whether flushing */
- sparql = g_task_get_task_data (G_TASK (result));
- g_ptr_array_add (priv->sparql_buffer,
- g_strdup (tracker_sparql_builder_get_result (sparql)));
+ while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
+ info = tracker_decorator_info_new (decorator, cursor);
+ g_queue_push_tail (&priv->item_cache, info);
+ }
+ }
- decorator_check_commit (decorator);
+ if (!g_queue_is_empty (&priv->item_cache) && !priv->processing) {
+ decorator_start (decorator);
+ } else if (g_queue_is_empty (&priv->item_cache) && priv->processing) {
+ decorator_finish (decorator);
}
- /* Detach task first, so the node is removed for good */
- g_clear_object (&node->info->task);
- element_remove_by_id (decorator, node->id);
+ decorator_pair_tasks (decorator);
+ g_object_unref (cursor);
}
static void
-element_ensure_task (ElemNode *node,
- TrackerDecorator *decorator)
+decorator_cache_next_items (TrackerDecorator *decorator)
{
- TrackerSparqlBuilder *sparql;
- TrackerDecoratorInfo *info;
- GCancellable *cancellable;
-
- g_return_if_fail (node->info != NULL);
-
- info = node->info;
+ TrackerDecoratorPrivate *priv = decorator->priv;
- if (info->task)
+ if (priv->querying ||
+ g_hash_table_size (priv->tasks) > 0 ||
+ !g_queue_is_empty (&priv->item_cache))
return;
- cancellable = g_cancellable_new ();
- info->task = g_task_new (decorator, cancellable,
- decorator_task_done, node);
- g_object_unref (cancellable);
+ priv->querying = TRUE;
- sparql = tracker_sparql_builder_new_update ();
- g_task_set_task_data (info->task, sparql,
- (GDestroyNotify) g_object_unref);
+ if (priv->n_remaining_items == 0) {
+ decorator_query_remaining_items (decorator);
+ } else {
+ TrackerSparqlConnection *sparql_conn;
+ gchar *query;
+
+ sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
+ query = create_remaining_items_query (decorator);
+ tracker_sparql_connection_query_async (sparql_conn, query,
+ NULL, decorator_cache_items_cb,
+ decorator);
+ g_free (query);
+ }
}
static gint
@@ -486,51 +897,29 @@ get_class_id (TrackerSparqlConnection *conn,
}
static void
-tracker_decorator_validate_class_ids (TrackerDecorator *decorator,
- const GStrv class_names)
+tracker_decorator_validate_class_ids (TrackerDecorator *decorator)
{
TrackerSparqlConnection *sparql_conn;
TrackerDecoratorPrivate *priv;
- GPtrArray *strings;
+ ClassInfo *info;
+ GArray *array;
gint i = 0;
priv = decorator->priv;
sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
-
- if (!sparql_conn) {
- /* Copy as-is and postpone validation */
- g_strfreev (priv->class_names);
- priv->class_names = g_strdupv (class_names);
- return;
- }
-
- if (priv->class_name_ids->len > 0)
- g_array_remove_range (priv->class_name_ids, 0,
- priv->class_name_ids->len);
-
- strings = g_ptr_array_new ();
- if (class_names) {
- while (class_names[i]) {
- gchar *copy;
- gint id;
-
- id = get_class_id (sparql_conn, class_names[i], FALSE);
-
- if (id >= 0) {
- copy = g_strdup (class_names[i]);
- g_ptr_array_add (strings, copy);
- g_array_append_val (priv->class_name_ids, id);
- }
-
- i++;
+ array = g_array_new (TRUE, FALSE, sizeof (gchar*));
+
+ for (i = 0; i < priv->classes->len; i++) {
+ info = &g_array_index (priv->classes, ClassInfo, i);
+ info->class_id = get_class_id (sparql_conn,
+ info->class_name, FALSE);
+ if (info->class_id > 0) {
+ priv->validated = TRUE;
+ g_array_append_val (array, info->class_name);
}
}
- g_ptr_array_add (strings, NULL);
-
- g_strfreev (priv->class_names);
- priv->class_names = (GStrv) g_ptr_array_free (strings, FALSE);
- g_object_notify (G_OBJECT (decorator), "class-names");
+ priv->class_names = (gchar **) g_array_free (array, FALSE);
}
static void
@@ -559,6 +948,36 @@ tracker_decorator_get_property (GObject *object,
}
static void
+decorator_add_class (TrackerDecorator *decorator,
+ const gchar *class)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ ClassInfo info;
+
+ info.class_name = g_strdup (class);
+ info.class_id = -1;
+ info.priority = G_PRIORITY_DEFAULT;
+ g_array_append_val (priv->classes, info);
+}
+
+static void
+decorator_set_classes (TrackerDecorator *decorator,
+ const gchar **classes)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ gint i;
+
+ if (priv->classes->len > 0) {
+ g_array_remove_range (priv->classes, 0,
+ priv->classes->len);
+ }
+
+ for (i = 0; classes[i]; i++) {
+ decorator_add_class (decorator, classes[i]);
+ }
+}
+
+static void
tracker_decorator_set_property (GObject *object,
guint param_id,
const GValue *value,
@@ -574,8 +993,7 @@ tracker_decorator_set_property (GObject *object,
priv->data_source = g_value_dup_string (value);
break;
case PROP_CLASS_NAMES:
- tracker_decorator_validate_class_ids (decorator,
- g_value_get_boxed (value));
+ decorator_set_classes (decorator, g_value_get_boxed (value));
break;
case PROP_COMMIT_BATCH_SIZE:
priv->batch_size = g_value_get_int (value);
@@ -590,46 +1008,6 @@ tracker_decorator_set_property (GObject *object,
}
static void
-query_type_and_add_element (TrackerDecorator *decorator,
- gint subject)
-{
- TrackerSparqlConnection *sparql_conn;
- TrackerSparqlCursor *cursor;
- GString *query;
- GError *error = NULL;
-
- sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
-
- query = g_string_new (NULL);
- g_string_append_printf (query, "select tracker:id (?type) {"
- " ?urn a ?type . "
- " FILTER (tracker:id(?urn) = %d ", subject);
- _tracker_decorator_query_append_rdf_type_filter (decorator, query);
- g_string_append (query, ")}");
-
- cursor = tracker_sparql_connection_query (sparql_conn, query->str,
- NULL, &error);
- g_string_free (query, TRUE);
-
- if (error) {
- g_critical ("Could not get type ID for '%d': %s\n",
- subject, error->message);
- g_error_free (error);
- return;
- }
-
- if (!tracker_sparql_cursor_next (cursor, NULL, NULL)) {
- g_critical ("'%d' doesn't have a known type", subject);
- } else {
- element_add (decorator, subject,
- tracker_sparql_cursor_get_integer (cursor, 0),
- FALSE);
- }
-
- g_object_unref (cursor);
-}
-
-static void
handle_deletes (TrackerDecorator *decorator,
GVariantIter *iter)
{
@@ -640,9 +1018,10 @@ handle_deletes (TrackerDecorator *decorator,
while (g_variant_iter_loop (iter, "(iiii)",
&graph, &subject, &predicate, &object)) {
- if (predicate == priv->rdf_type_id)
- element_remove_by_id (decorator, subject);
- else if (predicate == priv->nie_data_source_id &&
+ if (predicate == priv->rdf_type_id) {
+ decorator_item_cache_remove (decorator, subject);
+ decorator_blacklist_remove (decorator, subject);
+ } else if (predicate == priv->nie_data_source_id &&
object == priv->data_source_id) {
/* If only the decorator datasource is removed,
* re-process the file from scratch if it's not already
@@ -650,40 +1029,20 @@ handle_deletes (TrackerDecorator *decorator,
* to query it first. This should be rare enough that
* it doesn't matter to accumulate them to query in
* batches. */
- if (!g_hash_table_contains (priv->elems,
- GINT_TO_POINTER (subject))) {
- query_type_and_add_element (decorator, subject);
- }
+ decorator_cache_next_items (decorator);
}
}
}
-static gboolean
-class_name_id_handled (TrackerDecorator *decorator,
- gint id)
-{
- TrackerDecoratorPrivate *priv;
-
- priv = decorator->priv;
-
- return class_name_array_contains (priv->class_name_ids, id);
-}
-
static void
handle_updates (TrackerDecorator *decorator,
GVariantIter *iter)
{
- gint graph, subject, predicate, object;
- TrackerDecoratorPrivate *priv;
-
- priv = decorator->priv;
-
- while (g_variant_iter_loop (iter, "(iiii)",
- &graph, &subject, &predicate, &object)) {
- if (predicate == priv->rdf_type_id &&
- class_name_id_handled (decorator, object))
- element_add (decorator, subject, object, FALSE);
- }
+ /* Merely use this as a hint that there is something
+ * left to be processed.
+ */
+ if (g_variant_iter_n_children (iter) > 0)
+ decorator_cache_next_items (decorator);
}
static void
@@ -695,13 +1054,16 @@ class_signal_cb (GDBusConnection *connection,
GVariant *parameters,
gpointer user_data)
{
+ TrackerDecorator *decorator = user_data;
GVariantIter *iter1, *iter2;
g_variant_get (parameters, "(&sa(iiii)a(iiii))", NULL, &iter1, &iter2);
- handle_deletes (user_data, iter1);
- handle_updates (user_data, iter2);
+ handle_deletes (decorator, iter1);
+ handle_updates (decorator, iter2);
g_variant_iter_free (iter1);
g_variant_iter_free (iter2);
+
+ decorator_query_remaining_items (decorator);
}
static gboolean
@@ -729,7 +1091,7 @@ tracker_decorator_initable_init (GInitable *initable,
priv->rdf_type_id = get_class_id (sparql_conn, "rdf:type", FALSE);
priv->nie_data_source_id = get_class_id (sparql_conn, "nie:dataSource", FALSE);
priv->data_source_id = get_class_id (sparql_conn, priv->data_source, TRUE);
- tracker_decorator_validate_class_ids (decorator, priv->class_names);
+ tracker_decorator_validate_class_ids (decorator);
priv->graph_updated_signal_id =
g_dbus_connection_signal_subscribe (conn,
@@ -742,6 +1104,7 @@ tracker_decorator_initable_init (GInitable *initable,
class_signal_cb,
initable, NULL);
decorator_update_state (decorator, "Idle", FALSE);
+ decorator_rebuild_cache (decorator);
return TRUE;
}
@@ -752,7 +1115,6 @@ tracker_decorator_initable_iface_init (GInitableIface *iface)
iface->init = tracker_decorator_initable_init;
}
-
static void
tracker_decorator_constructed (GObject *object)
{
@@ -770,7 +1132,6 @@ tracker_decorator_finalize (GObject *object)
TrackerDecoratorPrivate *priv;
TrackerDecorator *decorator;
GDBusConnection *conn;
- GList *l;
decorator = TRACKER_DECORATOR (object);
priv = decorator->priv;
@@ -781,80 +1142,33 @@ tracker_decorator_finalize (GObject *object)
priv->graph_updated_signal_id);
}
- while ((l = tracker_priority_queue_get_head (priv->elem_queue)))
- element_remove_link (decorator, l, FALSE);
-
- g_array_unref (priv->class_name_ids);
- tracker_priority_queue_unref (priv->elem_queue);
- g_array_unref (priv->priority_class_name_ids);
- g_hash_table_unref (priv->elems);
+ g_queue_foreach (&priv->item_cache,
+ (GFunc) tracker_decorator_info_unref,
+ NULL);
+ g_queue_clear (&priv->item_cache);
+
+ decorator_cancel_active_tasks (decorator);
+ decorator_notify_empty (decorator);
+
+ g_free (priv->class_names);
+ g_hash_table_destroy (priv->tasks);
+ g_array_unref (priv->classes);
+ g_array_unref (priv->prepended_ids);
+ g_clear_pointer (&priv->sparql_buffer, (GDestroyNotify) g_array_unref);
+ g_clear_pointer (&priv->commit_buffer, (GDestroyNotify) g_array_unref);
+ g_sequence_free (priv->blacklist_items);
g_free (priv->data_source);
- g_strfreev (priv->class_names);
g_timer_destroy (priv->timer);
- if (priv->sparql_buffer)
- g_ptr_array_unref (priv->sparql_buffer);
-
G_OBJECT_CLASS (tracker_decorator_parent_class)->finalize (object);
}
-void
-_tracker_decorator_query_append_rdf_type_filter (TrackerDecorator *decorator,
- GString *query)
-{
- const gchar **class_names;
- gint i = 0;
-
- class_names = tracker_decorator_get_class_names (decorator);
-
- if (!class_names || !*class_names)
- return;
-
- g_string_append (query, "&& ?type IN (");
-
- while (class_names[i]) {
- if (i != 0)
- g_string_append (query, ",");
-
- g_string_append (query, class_names[i]);
- i++;
- }
-
- g_string_append (query, ") ");
-}
-
-static void
-query_elements_cb (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
-{
- TrackerSparqlConnection *conn;
- TrackerSparqlCursor *cursor;
- GError *error = NULL;
-
- conn = TRACKER_SPARQL_CONNECTION (object);
- cursor = tracker_sparql_connection_query_finish (conn, result, &error);
-
- if (error) {
- g_critical ("Could not load files missing metadata: %s", error->message);
- g_error_free (error);
- return;
- }
-
- while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
- gint id = tracker_sparql_cursor_get_integer (cursor, 0);
- gint class_name_id = tracker_sparql_cursor_get_integer (cursor, 1);
- element_add (user_data, id, class_name_id, TRUE);
- }
-
- g_object_unref (cursor);
-}
-
static void
tracker_decorator_paused (TrackerMiner *miner)
{
TrackerDecoratorPrivate *priv;
+ decorator_cancel_active_tasks (TRACKER_DECORATOR (miner));
priv = TRACKER_DECORATOR (miner)->priv;
g_timer_stop (priv->timer);
}
@@ -864,6 +1178,7 @@ tracker_decorator_resumed (TrackerMiner *miner)
{
TrackerDecoratorPrivate *priv;
+ decorator_cache_next_items (TRACKER_DECORATOR (miner));
priv = TRACKER_DECORATOR (miner)->priv;
g_timer_continue (priv->timer);
}
@@ -873,6 +1188,7 @@ tracker_decorator_stopped (TrackerMiner *miner)
{
TrackerDecoratorPrivate *priv;
+ decorator_cancel_active_tasks (TRACKER_DECORATOR (miner));
priv = TRACKER_DECORATOR (miner)->priv;
g_timer_stop (priv->timer);
}
@@ -880,33 +1196,14 @@ tracker_decorator_stopped (TrackerMiner *miner)
static void
tracker_decorator_started (TrackerMiner *miner)
{
- TrackerSparqlConnection *sparql_conn;
TrackerDecoratorPrivate *priv;
TrackerDecorator *decorator;
- const gchar *data_source;
- GString *query;
decorator = TRACKER_DECORATOR (miner);
priv = decorator->priv;
g_timer_start (priv->timer);
- data_source = tracker_decorator_get_data_source (decorator);
- query = g_string_new ("SELECT tracker:id(?urn) tracker:id(?type) { "
- " ?urn a rdfs:Resource ; "
- " a ?type. ");
-
- g_string_append_printf (query,
- "FILTER (! EXISTS { ?urn nie:dataSource <%s> } ",
- data_source);
-
- _tracker_decorator_query_append_rdf_type_filter (decorator, query);
- g_string_append (query, "&& BOUND(tracker:available(?urn)))}");
-
- sparql_conn = tracker_miner_get_connection (miner);
- tracker_sparql_connection_query_async (sparql_conn, query->str,
- NULL, query_elements_cb,
- decorator);
- g_string_free (query, TRUE);
+ decorator_cache_next_items (decorator);
}
static void
@@ -994,18 +1291,27 @@ tracker_decorator_class_init (TrackerDecoratorClass *klass)
}
static void
+class_info_clear (ClassInfo *info)
+{
+ g_free (info->class_name);
+}
+
+static void
tracker_decorator_init (TrackerDecorator *decorator)
{
TrackerDecoratorPrivate *priv;
decorator->priv = priv = TRACKER_DECORATOR_GET_PRIVATE (decorator);
- priv->elems = g_hash_table_new (NULL, NULL);
- priv->elem_queue = tracker_priority_queue_new ();
- priv->class_name_ids = g_array_new (FALSE, FALSE, sizeof (gint));
- priv->priority_class_name_ids = g_array_new (FALSE, FALSE, sizeof (gint));
+ priv->classes = g_array_new (FALSE, FALSE, sizeof (ClassInfo));
+ g_array_set_clear_func (priv->classes, (GDestroyNotify) class_info_clear);
+ priv->blacklist_items = g_sequence_new (NULL);
+ priv->prepended_ids = g_array_new (FALSE, FALSE, sizeof (gint));
priv->batch_size = DEFAULT_BATCH_SIZE;
- priv->sparql_buffer = g_ptr_array_new_with_free_func (g_free);
priv->timer = g_timer_new ();
+
+ g_queue_init (&priv->next_elem_queue);
+ g_queue_init (&priv->item_cache);
+ priv->tasks = g_hash_table_new (NULL, NULL);
}
/**
@@ -1069,7 +1375,8 @@ tracker_decorator_get_n_items (TrackerDecorator *decorator)
g_return_val_if_fail (TRACKER_IS_DECORATOR (decorator), 0);
priv = decorator->priv;
- return g_hash_table_size (priv->elems);
+
+ return priv->n_remaining_items;
}
/**
@@ -1089,9 +1396,12 @@ tracker_decorator_prepend_id (TrackerDecorator *decorator,
gint id,
gint class_name_id)
{
+ TrackerDecoratorPrivate *priv;
+
g_return_if_fail (TRACKER_IS_DECORATOR (decorator));
- element_add (decorator, id, class_name_id, TRUE);
+ priv = decorator->priv;
+ g_array_append_val (priv->prepended_ids, id);
}
/**
@@ -1111,171 +1421,9 @@ tracker_decorator_delete_id (TrackerDecorator *decorator,
{
g_return_if_fail (TRACKER_IS_DECORATOR (decorator));
+#if 0
element_remove_by_id (decorator, id);
-}
-
-static void complete_tasks_or_query (TrackerDecorator *decorator);
-
-typedef struct {
- TrackerDecorator *decorator;
- GArray *ids;
-} QueryNextItemsData;
-
-static void
-query_next_items_cb (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
-{
- QueryNextItemsData *data = user_data;
- TrackerDecorator *decorator = data->decorator;
- TrackerDecoratorPrivate *priv;
- TrackerSparqlConnection *conn;
- TrackerSparqlCursor *cursor;
- GList *elem;
- ElemNode *node;
- gint id;
- guint i;
- GError *error = NULL;
-
- conn = TRACKER_SPARQL_CONNECTION (object);
- cursor = tracker_sparql_connection_query_finish (conn, result, &error);
- priv = decorator->priv;
-
- if (error) {
- GTask *task;
-
- while ((task = g_queue_pop_head (&priv->next_elem_queue))) {
- g_task_return_error (task, g_error_copy (error));
- g_object_unref (task);
- }
-
- g_clear_error (&error);
- goto out;
- }
-
- while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
- id = tracker_sparql_cursor_get_integer (cursor, 1);
- elem = g_hash_table_lookup (priv->elems, GINT_TO_POINTER (id));
- if (!elem)
- continue;
-
- node = elem->data;
- node->info = tracker_decorator_info_new (cursor);
- }
-
- /* Remove elements that we queried but we didn't get info */
- for (i = 0; i < data->ids->len; i++) {
- id = g_array_index (data->ids, gint, i);
- elem = g_hash_table_lookup (priv->elems, GINT_TO_POINTER (id));
- if (!elem)
- continue;
-
- node = elem->data;
- if (!node->info)
- element_remove_link (decorator, elem, TRUE);
- }
-
- complete_tasks_or_query (decorator);
-
-out:
- g_clear_object (&cursor);
- g_array_unref (data->ids);
- g_slice_free (QueryNextItemsData, data);
-}
-
-static void
-query_next_items (TrackerDecorator *decorator,
- GList *l)
-{
- TrackerSparqlConnection *sparql_conn;
- TrackerDecoratorPrivate *priv;
- GString *id_string;
- gchar *query;
- QueryNextItemsData *data;
-
- priv = decorator->priv;
-
- data = g_slice_new0 (QueryNextItemsData);
- data->decorator = decorator;
- data->ids = g_array_sized_new (FALSE, FALSE,
- sizeof (gint),
- QUERY_BATCH_SIZE);
-
- id_string = g_string_new (NULL);
- for (; l != NULL && data->ids->len < QUERY_BATCH_SIZE; l = l->next) {
- ElemNode *node = l->data;
-
- if (node->info)
- continue;
-
- if (id_string->len > 0)
- g_string_append_c (id_string, ',');
- g_string_append_printf (id_string, "%d", node->id);
- g_array_append_val (data->ids, node->id);
- }
-
- g_assert (data->ids->len > 0);
-
- query = g_strdup_printf ("SELECT ?urn"
- " tracker:id(?urn) "
- " nie:url(?urn) "
- " nie:mimeType(?urn) { "
- " ?urn tracker:available true . "
- " FILTER (tracker:id(?urn) IN (%s) && "
- " ! EXISTS { ?urn nie:dataSource <%s> })"
- "}", id_string->str, priv->data_source);
-
- sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
- tracker_sparql_connection_query_async (sparql_conn, query,
- NULL,
- query_next_items_cb, data);
- g_string_free (id_string, TRUE);
- g_free (query);
-}
-
-static void
-complete_tasks_or_query (TrackerDecorator *decorator)
-{
- TrackerDecoratorPrivate *priv;
- GList *l;
- GTask *task;
-
- priv = decorator->priv;
-
- for (l = tracker_priority_queue_get_head (priv->elem_queue);
- l != NULL;
- l = l->next) {
- ElemNode *node = l->data;
-
- /* The next item isn't queried yet, do it now */
- if (!node->info) {
- query_next_items (decorator, l);
- return;
- }
-
- /* If the item is not already being processed, we can complete a
- * task with it. */
- if (!node->info->task) {
- task = g_queue_pop_head (&priv->next_elem_queue);
- element_ensure_task (node, decorator);
- g_task_return_pointer (task,
- tracker_decorator_info_ref (node->info),
- (GDestroyNotify) tracker_decorator_info_unref);
- g_object_unref (task);
-
- if (g_queue_is_empty (&priv->next_elem_queue))
- return;
- }
- }
-
- /* There is no element left, or they are all being processed already */
- while ((task = g_queue_pop_head (&priv->next_elem_queue))) {
- g_task_return_new_error (task,
- tracker_decorator_error_quark (),
- TRACKER_DECORATOR_ERROR_EMPTY,
- "There are no items left");
- g_object_unref (task);
- }
+#endif
}
/**
@@ -1320,12 +1468,8 @@ tracker_decorator_next (TrackerDecorator *decorator,
return;
}
- /* Push the task in a queue, for the case this function is called
- * multiple before it finishes. */
g_queue_push_tail (&priv->next_elem_queue, task);
- if (g_queue_get_length (&priv->next_elem_queue) == 1) {
- complete_tasks_or_query (decorator);
- }
+ decorator_pair_tasks (decorator);
}
/**
@@ -1355,51 +1499,52 @@ tracker_decorator_next_finish (TrackerDecorator *decorator,
return g_task_propagate_pointer (G_TASK (result), error);
}
+static gint
+class_compare_func (const ClassInfo *a,
+ const ClassInfo *b)
+{
+ return b->priority - a->priority;
+}
+
+static void
+decorator_set_class_priority (TrackerDecorator *decorator,
+ const gchar *class,
+ gint priority)
+{
+ TrackerDecoratorPrivate *priv = decorator->priv;
+ ClassInfo *info;
+ gint i;
+
+ for (i = 0; i < priv->classes->len; i++) {
+ info = &g_array_index (priv->classes, ClassInfo, i);
+
+ if (strcmp (info->class_name, class) != 0)
+ continue;
+
+ info->priority = priority;
+ break;
+ }
+}
+
void
tracker_decorator_set_priority_rdf_types (TrackerDecorator *decorator,
const gchar * const *rdf_types)
{
TrackerDecoratorPrivate *priv;
- TrackerPriorityQueue *new_queue;
- guint i, j;
- GList *elem_link;
+ gint i;
g_return_if_fail (TRACKER_DECORATOR (decorator));
g_return_if_fail (rdf_types != NULL);
priv = decorator->priv;
- if (priv->priority_class_name_ids->len > 0)
- g_array_remove_range (priv->priority_class_name_ids, 0,
- priv->priority_class_name_ids->len);
-
- for (i = 0; rdf_types[i] != NULL; i++) {
- for (j = 0; priv->class_names[j] != NULL; j++) {
- gint id;
-
- if (!g_str_equal (rdf_types[i], priv->class_names[j]))
- continue;
-
- /* priv->class_names and priv->class_name_ids are in the
- * same order */
- id = g_array_index (priv->class_name_ids, gint, j);
- g_array_append_val (priv->priority_class_name_ids, id);
- break;
- }
+ for (i = 0; rdf_types[i]; i++) {
+ decorator_set_class_priority (decorator, rdf_types[i],
+ G_PRIORITY_HIGH);
}
- /* We have to re-evaluate the priority of each element. We also have to
- * keep the same elem_link because they are referenced in priv->elems.
- * So we create a new priority queue and transfer nodes one by one. */
- new_queue = tracker_priority_queue_new ();
- while ((elem_link = tracker_priority_queue_pop_node (priv->elem_queue, NULL))) {
- ElemNode *node = elem_link->data;
-
- tracker_priority_queue_add_node (new_queue, elem_link,
- elem_node_get_priority (decorator, node));
- }
- tracker_priority_queue_unref (priv->elem_queue);
- priv->elem_queue = new_queue;
+ g_array_sort (priv->classes, (GCompareFunc) class_compare_func);
+ decorator_rebuild_cache (decorator);
}
/**
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]