[tracker/decorator-memory-reduction: 1/3] libtracker-miner: Reduce TrackerDecorator file cache size



commit 8a607959a5289050e9003c64a85e15b71f6695bb
Author: Carlos Garnacho <carlosg gnome org>
Date:   Sun Nov 30 13:29:51 2014 +0100

    libtracker-miner: Reduce TrackerDecorator file cache size
    
    Instead of keeping all items in memory, a small cache is kept of upcoming
    files to be processed, querying the database again for files missing the
    nie:dataSource when the cache has been flushed. When no further items are
    processed, the miner is stopped. Whenever the prioritized rdf:types list
    changes, the cache is dropped and the database is queried again with the
    new order taken into account.
    
    This reduces the potential memory usage of tracker-extract, which would
    previously grow linearly with the number of files left to be inspected
    and could cause memory pressure in certain scenarios.
    
    On the downside, this obviously makes the decorator a little bit slower,
    due to more queries happening (as opposed to a huge one), and at times the
    decorator is idle waiting to know what else to do.

 src/libtracker-miner/Makefile.am                  |    1 -
 src/libtracker-miner/tracker-decorator-fs.c       |   26 +-
 src/libtracker-miner/tracker-decorator-internal.h |   32 -
 src/libtracker-miner/tracker-decorator.c          | 1321 ++++++++++++---------
 4 files changed, 758 insertions(+), 622 deletions(-)
---
diff --git a/src/libtracker-miner/Makefile.am b/src/libtracker-miner/Makefile.am
index 5df74a3..eafb3ca 100644
--- a/src/libtracker-miner/Makefile.am
+++ b/src/libtracker-miner/Makefile.am
@@ -65,7 +65,6 @@ miner_sources =                                      \
        tracker-data-provider.h                        \
        tracker-decorator.c                            \
        tracker-decorator.h                            \
-       tracker-decorator-internal.h                   \
        tracker-decorator-fs.c                         \
        tracker-decorator-fs.h                         \
        tracker-enumerator.c                           \
diff --git a/src/libtracker-miner/tracker-decorator-fs.c b/src/libtracker-miner/tracker-decorator-fs.c
index d2e4470..7ed097e 100644
--- a/src/libtracker-miner/tracker-decorator-fs.c
+++ b/src/libtracker-miner/tracker-decorator-fs.c
@@ -24,7 +24,6 @@
 #include <libtracker-sparql/tracker-sparql.h>
 
 #include "tracker-decorator-fs.h"
-#include "tracker-decorator-internal.h"
 
 #define TRACKER_DECORATOR_FS_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_DECORATOR_FS, 
TrackerDecoratorFSPrivate))
 
@@ -130,6 +129,31 @@ remove_files_cb (GObject *object,
 }
 
 static void
+_tracker_decorator_query_append_rdf_type_filter (TrackerDecorator *decorator,
+                                                 GString          *query)
+{
+       const gchar **class_names;
+       gint i = 0;
+
+       class_names = tracker_decorator_get_class_names (decorator);
+
+       if (!class_names || !*class_names)
+               return;
+
+       g_string_append (query, "&& ?type IN (");
+
+       while (class_names[i]) {
+               if (i != 0)
+                       g_string_append (query, ",");
+
+               g_string_append (query, class_names[i]);
+               i++;
+       }
+
+       g_string_append (query, ") ");
+}
+
+static void
 check_files (TrackerDecorator    *decorator,
              const gchar         *mount_point_urn,
              gboolean             available,
diff --git a/src/libtracker-miner/tracker-decorator.c b/src/libtracker-miner/tracker-decorator.c
index 28cdf4a..5832207 100644
--- a/src/libtracker-miner/tracker-decorator.c
+++ b/src/libtracker-miner/tracker-decorator.c
@@ -20,11 +20,10 @@
 #include "config.h"
 
 #include "tracker-decorator.h"
-#include "tracker-decorator-internal.h"
 #include "tracker-priority-queue.h"
 
 #define QUERY_BATCH_SIZE 100
-#define DEFAULT_BATCH_SIZE 100
+#define DEFAULT_BATCH_SIZE 200
 
 #define TRACKER_DECORATOR_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_DECORATOR, 
TrackerDecoratorPrivate))
 
@@ -43,42 +42,59 @@
 **/
 
 typedef struct _TrackerDecoratorPrivate TrackerDecoratorPrivate;
-typedef struct _ElemNode ElemNode;
+typedef struct _SparqlUpdate SparqlUpdate;
+typedef struct _ClassInfo ClassInfo;
 
 struct _TrackerDecoratorInfo {
        GTask *task;
        gchar *urn;
        gchar *url;
        gchar *mimetype;
+       gint id;
        gint ref_count;
 };
 
-struct _ElemNode {
-       TrackerDecoratorInfo *info;
+struct _ClassInfo {
+       gchar *class_name;
+       gint class_id;
+       gint priority;
+};
+
+struct _SparqlUpdate {
+       gchar *sparql;
        gint id;
-       gint class_name_id;
-       gboolean prepend;
 };
 
 struct _TrackerDecoratorPrivate {
        guint graph_updated_signal_id;
        gchar *data_source;
-       GStrv class_names;
 
-       TrackerPriorityQueue *elem_queue;
-       GHashTable *elems;
-       GPtrArray *sparql_buffer;
+       GArray *classes; /* Array of ClassInfo */
+       gchar **class_names;
+
+       gssize n_remaining_items;
+       gssize n_processed_items;
+
+       GQueue item_cache; /* Queue of TrackerDecoratorInfo */
+
+       /* Arrays of tracker IDs */
+       GArray *prepended_ids;
+       GSequence *blacklist_items;
+
+       GHashTable *tasks; /* Associative array of GTasks */
+       GArray *sparql_buffer; /* Array of SparqlUpdate */
+       GArray *commit_buffer; /* Array of SparqlUpdate */
        GTimer *timer;
-       GQueue next_elem_queue;
+       GQueue next_elem_queue; /* Queue of incoming tasks */
 
-       GArray *class_name_ids;
-       GArray *priority_class_name_ids;
        gint rdf_type_id;
        gint nie_data_source_id;
        gint data_source_id;
        gint batch_size;
 
-       gint stats_n_elems;
+       guint processing : 1;
+       guint querying   : 1;
+       guint validated  : 1;
 };
 
 enum {
@@ -99,22 +115,41 @@ static GInitableIface *parent_initable_iface;
 
 static void   tracker_decorator_initable_iface_init (GInitableIface   *iface);
 
+static void decorator_task_done (GObject      *object,
+                                 GAsyncResult *result,
+                                 gpointer      user_data);
+static void decorator_cache_next_items (TrackerDecorator *decorator);
+static gboolean decorator_check_commit (TrackerDecorator *decorator);
+
 G_DEFINE_QUARK (TrackerDecoratorError, tracker_decorator_error)
 
 G_DEFINE_ABSTRACT_TYPE_WITH_CODE (TrackerDecorator, tracker_decorator, TRACKER_TYPE_MINER,
                                   G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE, 
tracker_decorator_initable_iface_init))
 
 static TrackerDecoratorInfo *
-tracker_decorator_info_new (TrackerSparqlCursor *cursor)
+tracker_decorator_info_new (TrackerDecorator    *decorator,
+                            TrackerSparqlCursor *cursor)
 {
+       TrackerSparqlBuilder *sparql;
        TrackerDecoratorInfo *info;
+       GCancellable *cancellable;
 
        info = g_slice_new0 (TrackerDecoratorInfo);
        info->urn = g_strdup (tracker_sparql_cursor_get_string (cursor, 0, NULL));
+       info->id = tracker_sparql_cursor_get_integer (cursor, 1);
        info->url = g_strdup (tracker_sparql_cursor_get_string (cursor, 2, NULL));
        info->mimetype = g_strdup (tracker_sparql_cursor_get_string (cursor, 3, NULL));
        info->ref_count = 1;
 
+       cancellable = g_cancellable_new ();
+       info->task = g_task_new (decorator, cancellable,
+                                decorator_task_done, info);
+       g_object_unref (cancellable);
+
+       sparql = tracker_sparql_builder_new_update ();
+       g_task_set_task_data (info->task, sparql,
+                             (GDestroyNotify) g_object_unref);
+
        return info;
 }
 
@@ -144,6 +179,46 @@ G_DEFINE_BOXED_TYPE (TrackerDecoratorInfo,
                      tracker_decorator_info_ref,
                      tracker_decorator_info_unref)
 
+static gint
+sequence_compare_func (gconstpointer data1,
+                       gconstpointer data2,
+                       gpointer      user_data)
+{
+       return GPOINTER_TO_INT (data1) - GPOINTER_TO_INT (data2);
+}
+
+static void
+decorator_blacklist_add (TrackerDecorator *decorator,
+                         gint              id)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       GSequenceIter *iter;
+
+       iter = g_sequence_search (priv->blacklist_items,
+                                 GINT_TO_POINTER (id),
+                                 sequence_compare_func,
+                                 NULL);
+
+       if (g_sequence_iter_is_end (iter) ||
+           g_sequence_get (g_sequence_iter_prev (iter)) != GINT_TO_POINTER (id))
+               g_sequence_insert_before (iter, GINT_TO_POINTER (id));
+}
+
+static void
+decorator_blacklist_remove (TrackerDecorator *decorator,
+                            gint              id)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       GSequenceIter *iter;
+
+       iter = g_sequence_lookup (priv->blacklist_items,
+                                 GINT_TO_POINTER (id),
+                                 sequence_compare_func,
+                                 NULL);
+       if (iter)
+               g_sequence_remove (iter);
+}
+
 static void
 decorator_update_state (TrackerDecorator *decorator,
                         const gchar      *message,
@@ -152,27 +227,24 @@ decorator_update_state (TrackerDecorator *decorator,
        TrackerDecoratorPrivate *priv;
        gint remaining_time = -1;
        gdouble progress = 1;
-       guint length;
+       gsize total_items;
 
        priv = decorator->priv;
-       length = tracker_priority_queue_get_length (priv->elem_queue);
+       remaining_time = 0;
+       total_items = priv->n_remaining_items + priv->n_processed_items;
 
-       if (length > 0) {
-               progress = 1 - ((gdouble) length / priv->stats_n_elems);
-               remaining_time = 0;
-       }
+       if (priv->n_remaining_items > 0)
+               progress = ((gdouble) priv->n_processed_items / total_items);
 
        if (priv->timer && estimate_time &&
            !tracker_miner_is_paused (TRACKER_MINER (decorator))) {
                gdouble elapsed;
-               gint elems_done;
 
                /* FIXME: Quite naive calculation */
                elapsed = g_timer_elapsed (priv->timer, NULL);
-               elems_done = priv->stats_n_elems - length;
 
-               if (elems_done > 0)
-                       remaining_time = (length * elapsed) / elems_done;
+               if (priv->n_processed_items > 0)
+                       remaining_time = (priv->n_remaining_items * elapsed) / priv->n_processed_items;
        }
 
        g_object_set (decorator,
@@ -184,270 +256,609 @@ decorator_update_state (TrackerDecorator *decorator,
                g_object_set (decorator, "status", message, NULL);
 }
 
-static gboolean
-class_name_array_contains (GArray *array,
-                           gint    id)
+static void
+decorator_commit_cb (GObject      *object,
+                     GAsyncResult *result,
+                     gpointer      user_data)
 {
+       TrackerSparqlConnection *conn;
+       TrackerDecoratorPrivate *priv;
+       TrackerDecorator *decorator;
+       GError *error = NULL;
+       GPtrArray *errors;
        guint i;
 
-       for (i = 0; i < array->len; i++) {
-               if (id == g_array_index (array, gint, i))
-                       return TRUE;
+       decorator = user_data;
+       priv = decorator->priv;
+       conn = TRACKER_SPARQL_CONNECTION (object);
+       errors = tracker_sparql_connection_update_array_finish (conn, result, &error);
+
+       if (error) {
+               g_warning ("There was an error pushing metadata: %s\n", error->message);
        }
 
-       return FALSE;
+       if (errors) {
+               for (i = 0; i < errors->len; i++) {
+                       SparqlUpdate *update;
+                       GError *child_error;
+
+                       child_error = g_ptr_array_index (errors, i);
+                       update = &g_array_index (priv->commit_buffer, SparqlUpdate, i);
+
+                       if (child_error) {
+                               decorator_blacklist_add (decorator, update->id);
+                               g_warning ("Task %d, error: %s\n"
+                                          "Sparql was:\n%s\n",
+                                          i, child_error->message,
+                                          update->sparql);
+                       }
+               }
+
+               g_ptr_array_unref (errors);
+       }
+
+       g_clear_pointer (&priv->commit_buffer, (GDestroyNotify) g_array_unref);
+
+       if (!decorator_check_commit (decorator))
+               decorator_cache_next_items (decorator);
 }
 
-static gint
-elem_node_get_priority (TrackerDecorator *decorator,
-                        ElemNode         *node)
+static void
+sparql_update_clear (SparqlUpdate *update)
 {
+       g_free (update->sparql);
+}
+
+static GArray *
+sparql_buffer_new (void)
+{
+       GArray *array;
+
+       array = g_array_new (FALSE, FALSE, sizeof (SparqlUpdate));
+       g_array_set_clear_func (array, (GDestroyNotify) sparql_update_clear);
+
+       return array;
+}
+
+static gboolean
+decorator_commit_info (TrackerDecorator *decorator)
+{
+       TrackerSparqlConnection *sparql_conn;
        TrackerDecoratorPrivate *priv;
-       gboolean prior;
+       GPtrArray *array;
+       gint i;
 
        priv = decorator->priv;
 
-       /* We want [prepend and prior, prior, prepend, the rest] */
-       prior = class_name_array_contains (priv->priority_class_name_ids,
-                                          node->class_name_id);
-       if (prior && node->prepend)
-               return G_PRIORITY_HIGH - 1;
-       else if (prior)
-               return G_PRIORITY_HIGH;
-       else if (node->prepend)
-               return G_PRIORITY_HIGH + 1;
+       if (!priv->sparql_buffer || priv->sparql_buffer->len == 0)
+               return FALSE;
+
+       if (priv->commit_buffer)
+               return FALSE;
+
+       /* Move sparql buffer to commit buffer */
+       priv->commit_buffer = priv->sparql_buffer;
+       priv->sparql_buffer = NULL;
+       array = g_ptr_array_new ();
+
+       for (i = 0; i < priv->commit_buffer->len; i++) {
+               SparqlUpdate *update;
+
+               update = &g_array_index (priv->commit_buffer, SparqlUpdate, i);
+               g_ptr_array_add (array, update->sparql);
+       }
 
-       return G_PRIORITY_DEFAULT;
+       sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
+       tracker_sparql_connection_update_array_async (sparql_conn,
+                                                     (gchar **) array->pdata,
+                                                     array->len,
+                                                     G_PRIORITY_DEFAULT,
+                                                     NULL,
+                                                     decorator_commit_cb,
+                                                     decorator);
+
+       decorator_update_state (decorator, NULL, TRUE);
+       g_ptr_array_unref (array);
+       return TRUE;
 }
 
-static void
-element_add (TrackerDecorator *decorator,
-             gint              id,
-             gint              class_name_id,
-             gboolean          prepend)
+static gboolean
+decorator_check_commit (TrackerDecorator *decorator)
 {
        TrackerDecoratorPrivate *priv;
-       gboolean first_elem;
-       ElemNode *node;
-       GList *elem;
 
        priv = decorator->priv;
 
-       if (g_hash_table_contains (priv->elems, GINT_TO_POINTER (id)))
+       if (!priv->sparql_buffer ||
+           (priv->n_remaining_items > 0 &&
+            priv->sparql_buffer->len < (guint) priv->batch_size))
+               return FALSE;
+
+       return decorator_commit_info (decorator);
+}
+
+static void
+decorator_notify_task_error (TrackerDecorator *decorator,
+                             GError           *error)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       GTask *task;
+
+       while (!g_queue_is_empty (&priv->next_elem_queue)) {
+               task = g_queue_pop_head (&priv->next_elem_queue);
+               g_task_return_error (task, g_error_copy (error));
+               g_object_unref (task);
+       }
+}
+
+static void
+decorator_notify_empty (TrackerDecorator *decorator)
+{
+       GError *error;
+
+       error = g_error_new (tracker_decorator_error_quark (),
+                            TRACKER_DECORATOR_ERROR_EMPTY,
+                            "There are no items left");
+       decorator_notify_task_error (decorator, error);
+       g_error_free (error);
+}
+
+static void
+decorator_start (TrackerDecorator *decorator)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+
+       if (priv->processing)
                return;
 
-       first_elem = g_hash_table_size (priv->elems) == 0;
-       node = g_new0 (ElemNode, 1);
-       node->id = id;
-       node->class_name_id = class_name_id;
-       node->prepend = prepend;
+       priv->processing = TRUE;
+       g_signal_emit (decorator, signals[ITEMS_AVAILABLE], 0);
+       decorator_update_state (decorator, "Extracting metadata", TRUE);
+}
 
-       elem = tracker_priority_queue_add (priv->elem_queue, node,
-                                          elem_node_get_priority (decorator, node));
+static void
+decorator_finish (TrackerDecorator *decorator)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
 
-       g_hash_table_insert (priv->elems, GINT_TO_POINTER (id), elem);
-       priv->stats_n_elems++;
+       if (!priv->processing)
+               return;
 
-       if (first_elem) {
-               g_signal_emit (decorator, signals[ITEMS_AVAILABLE], 0);
-               decorator_update_state (decorator, "Extracting metadata", TRUE);
-       }
+       priv->processing = FALSE;
+       priv->n_remaining_items = priv->n_processed_items = 0;
+       g_signal_emit (decorator, signals[FINISHED], 0);
+       decorator_commit_info (decorator);
+       decorator_notify_empty (decorator);
+       decorator_update_state (decorator, "Idle", FALSE);
 }
 
+static void
+decorator_rebuild_cache (TrackerDecorator *decorator)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+
+       priv->n_remaining_items = 0;
+       g_queue_foreach (&priv->item_cache,
+                        (GFunc) tracker_decorator_info_unref, NULL);
+       g_queue_clear (&priv->item_cache);
 
-static void decorator_commit_info (TrackerDecorator *decorator);
+        decorator_cache_next_items (decorator);
+}
 
+/* This function is called after the caller has completed the
+ * GTask given on the TrackerDecoratorInfo, this definitely removes
+ * the element being processed from queues.
+ */
 static void
-element_remove_link (TrackerDecorator *decorator,
-                     GList            *elem_link,
-                     gboolean          emit)
+decorator_task_done (GObject      *object,
+                     GAsyncResult *result,
+                     gpointer      user_data)
 {
+       TrackerDecorator *decorator = TRACKER_DECORATOR (object);
+       TrackerDecoratorInfo *info = user_data;
        TrackerDecoratorPrivate *priv;
-       ElemNode *node;
 
        priv = decorator->priv;
-       node = elem_link->data;
 
-       if (node->info && node->info->task) {
-               /* A GTask is running on this element, cancel it
-                * and wait for the task callback to delete this node.
-                */
-               g_cancellable_cancel (g_task_get_cancellable (node->info->task));
-               return;
+       if (g_task_had_error (G_TASK (result))) {
+               GError *error = NULL;
+
+               /* Blacklist item */
+               decorator_blacklist_add (decorator, info->id);
+               g_task_propagate_pointer (G_TASK (result), &error);
+               g_warning ("Task for '%s' finished with error: %s\n",
+                          info->url, error->message);
+               g_error_free (error);
+       } else {
+               TrackerSparqlBuilder *sparql;
+               SparqlUpdate update;
+
+               /* Add resulting sparql to buffer and check whether flushing */
+               sparql = g_task_get_task_data (G_TASK (result));
+               update.sparql = g_strdup (tracker_sparql_builder_get_result (sparql));
+               update.id = info->id;
+
+               if (!priv->sparql_buffer)
+                       priv->sparql_buffer = sparql_buffer_new ();
+
+               g_array_append_val (priv->sparql_buffer, update);
        }
 
-       tracker_priority_queue_remove_node (priv->elem_queue, elem_link);
-       g_hash_table_remove (priv->elems, GINT_TO_POINTER (node->id));
+       g_hash_table_remove (priv->tasks, result);
 
-       if (emit && g_hash_table_size (priv->elems) == 0) {
-               /* Flush any remaining Sparql updates */
-               decorator_commit_info (decorator);
+       if (priv->n_remaining_items > 0)
+               priv->n_remaining_items--;
+       priv->n_processed_items++;
 
-               g_signal_emit (decorator, signals[FINISHED], 0);
-               decorator_update_state (decorator, "Idle", FALSE);
-               priv->stats_n_elems = 0;
+       decorator_check_commit (decorator);
+
+       if (priv->n_remaining_items == 0) {
+               decorator_finish (decorator);
+               decorator_rebuild_cache (decorator);
+       } else if (g_queue_is_empty (&priv->item_cache) &&
+                  g_hash_table_size (priv->tasks) == 0 &&
+                  (!priv->sparql_buffer || !priv->commit_buffer)) {
+               decorator_cache_next_items (decorator);
        }
+}
+
+static void
+decorator_cancel_active_tasks (TrackerDecorator *decorator)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       GHashTableIter iter;
+       GTask *task;
 
-       if (node->info)
-               tracker_decorator_info_unref (node->info);
+       g_hash_table_iter_init (&iter, priv->tasks);
+
+       while (g_hash_table_iter_next (&iter, NULL, (gpointer*) &task)) {
+               g_cancellable_cancel (g_task_get_cancellable (task));
+       }
 
-       g_free (node);
+       g_hash_table_remove_all (priv->tasks);
 }
 
 static void
-element_remove_by_id (TrackerDecorator *decorator,
-                      gint              id)
+query_append_id (GString *string,
+                 gint     id)
 {
-       TrackerDecoratorPrivate *priv;
-       GList *elem_link;
+       if (string->len > 1 && string->str[string->len - 1] != '(')
+               g_string_append_c (string, ',');
 
-       priv = decorator->priv;
-       elem_link = g_hash_table_lookup (priv->elems, GINT_TO_POINTER (id));
+       g_string_append_printf (string, "%d", id);
+}
+
+static void
+query_add_blacklisted_filter (TrackerDecorator *decorator,
+                              GString          *query)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       GSequenceIter *iter;
 
-       if (!elem_link)
+       if (g_sequence_get_length (priv->blacklist_items) == 0)
                return;
 
-       element_remove_link (decorator, elem_link, TRUE);
+       g_string_append (query, "&& tracker:id(?urn) NOT IN (");
+
+       iter = g_sequence_get_begin_iter (priv->blacklist_items);
+
+       while (!g_sequence_iter_is_end (iter)) {
+               query_append_id (query, GPOINTER_TO_INT (g_sequence_get (iter)));
+               iter = g_sequence_iter_next (iter);
+       }
+
+       g_string_append (query, ")");
 }
 
 static void
-decorator_commit_cb (GObject      *object,
-                     GAsyncResult *result,
-                     gpointer      user_data)
+query_add_update_buffer_ids (GString *query,
+                             GArray  *commit_buffer)
 {
-       TrackerSparqlConnection *conn;
-       GPtrArray *errors, *sparql;
-       GError *error = NULL;
-       guint i;
+       SparqlUpdate *update;
+       gint i;
 
-       sparql = user_data;
-       conn = TRACKER_SPARQL_CONNECTION (object);
-       errors = tracker_sparql_connection_update_array_finish (conn, result, &error);
+       for (i = 0; i < commit_buffer->len; i++) {
+               update = &g_array_index (commit_buffer, SparqlUpdate, i);
+               query_append_id (query, update->id);
+       }
+}
 
-       if (error) {
-               g_warning ("There was an error pushing metadata: %s\n", error->message);
+static void
+query_add_processing_filter (TrackerDecorator *decorator,
+                             GString          *query)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+
+       if ((!priv->sparql_buffer || priv->sparql_buffer->len == 0) &&
+           (!priv->commit_buffer || priv->commit_buffer->len == 0))
+           return;
+
+       g_string_append (query, "&& tracker:id(?urn) NOT IN (");
+
+       if (priv->sparql_buffer && priv->sparql_buffer->len > 0)
+               query_add_update_buffer_ids (query, priv->sparql_buffer);
+       if (priv->commit_buffer && priv->commit_buffer->len > 0)
+               query_add_update_buffer_ids (query, priv->commit_buffer);
+
+       g_string_append (query, ")");
+}
+
+static void
+query_add_id_filter (GString  *query,
+                     GArray   *ids)
+{
+       gint i;
+
+       if (!ids || ids->len == 0)
+               return;
+
+       g_string_append (query, "&& tracker:id(?urn) IN (");
+
+       for (i = 0; i < ids->len; i++) {
+               if (i != 0)
+                       g_string_append (query, ",");
+
+               g_string_append_printf (query, "%d",
+                                       g_array_index (ids, gint, i));
        }
 
-       if (errors) {
-               for (i = 0; i < errors->len; i++) {
-                       GError *child_error;
+       g_string_append (query, ")");
+}
 
-                       child_error = g_ptr_array_index (errors, i);
+static void
+query_append_current_tasks_filter (TrackerDecorator *decorator,
+                                  GString          *query)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       GHashTableIter iter;
+       gint i = 0, id;
+       GTask *task;
 
-                       if (child_error) {
-                               g_warning ("Task %d, error: %s", i, child_error->message);
-                               g_warning ("Sparql update was:\n%s\n",
-                                          (gchar *) g_ptr_array_index (sparql, i));
-                       }
+       if (g_hash_table_size (priv->tasks) == 0)
+               return;
+
+       g_string_append (query, "&& tracker:id(?urn) NOT IN (");
+       g_hash_table_iter_init (&iter, priv->tasks);
+
+       while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &task)) {
+               if (i != 0)
+                       g_string_append (query, ",");
+
+               id = GPOINTER_TO_INT (g_task_get_task_data (task));
+               g_string_append_printf (query, "%d", id);
+               i++;
+       }
+
+       g_string_append (query, ")");
+}
+
+static gchar *
+create_query_string (TrackerDecorator  *decorator,
+                    gchar            **select_clauses,
+                     gboolean           for_prepended)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       ClassInfo *prev = NULL, *cur;
+       GString *query;
+       gint i;
+
+       if (!priv->validated)
+               return NULL;
+
+       query = g_string_new ("SELECT ");
+
+       for (i = 0; select_clauses[i]; i++) {
+               g_string_append_printf (query, "%s ", select_clauses[i]);
+       }
+
+       g_string_append (query, "{ SELECT ?urn WHERE {");
+
+       for (i = 0; i < priv->classes->len; i++) {
+               cur = &g_array_index (priv->classes, ClassInfo, i);
+
+               if (!prev || prev->priority != cur->priority) {
+                       if (prev)
+                               g_string_append (query, "))} UNION ");
+
+                       g_string_append_printf (query,
+                                               "{ ?urn a rdfs:Resource;"
+                                               "       a ?type ;"
+                                               "       tracker:available true ."
+                                               "  FILTER (! EXISTS { ?urn nie:dataSource <%s> } ",
+                                               priv->data_source);
+
+                       query_add_blacklisted_filter (decorator, query);
+                       query_add_processing_filter (decorator, query);
+
+                       if (for_prepended && priv->prepended_ids->len > 0)
+                               query_add_id_filter (query, priv->prepended_ids);
+
+                       query_append_current_tasks_filter (decorator, query);
+                       g_string_append (query, " && ?type IN (");
+               } else {
+                       g_string_append (query, ",");
                }
 
-               g_ptr_array_unref (errors);
+               g_string_append_printf (query, "%s", cur->class_name);
+               prev = cur;
        }
 
-       g_ptr_array_unref (sparql);
+       g_string_append_printf (query, "))}}} LIMIT %d", QUERY_BATCH_SIZE);
+
+       return g_string_free (query, FALSE);
+}
+
+static gchar *
+create_remaining_items_query (TrackerDecorator *decorator)
+{
+       gchar *clauses[] = {
+               "?urn",
+               "tracker:id(?urn)",
+               "nie:url(?urn)",
+               "nie:mimeType(?urn)",
+               NULL
+       };
+
+       return create_query_string (decorator, clauses, TRUE);
 }
 
 static void
-decorator_commit_info (TrackerDecorator *decorator)
+decorator_query_remaining_items_cb (GObject      *object,
+                                    GAsyncResult *result,
+                                    gpointer      user_data)
 {
-       TrackerSparqlConnection *sparql_conn;
+       TrackerDecorator *decorator = user_data;
        TrackerDecoratorPrivate *priv;
-       GPtrArray *array;
+       TrackerSparqlCursor *cursor;
+       GError *error = NULL;
 
+       cursor = tracker_sparql_connection_query_finish (TRACKER_SPARQL_CONNECTION (object),
+                                                        result, &error);
        priv = decorator->priv;
+        priv->querying = FALSE;
 
-       if (priv->sparql_buffer->len == 0)
+       if (error || !tracker_sparql_cursor_next (cursor, NULL, &error)) {
+               decorator_notify_task_error (decorator, error);
+               g_error_free (error);
                return;
+       }
 
-       array = priv->sparql_buffer;
-       priv->sparql_buffer = g_ptr_array_new_with_free_func (g_free);
+       priv->n_remaining_items = g_queue_get_length (&priv->item_cache) +
+               tracker_sparql_cursor_get_integer (cursor, 0);
+       g_object_unref (cursor);
 
-       sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
-       tracker_sparql_connection_update_array_async (sparql_conn,
-                                                     (gchar **) array->pdata,
-                                                     array->len,
-                                                     G_PRIORITY_DEFAULT,
-                                                     NULL,
-                                                     decorator_commit_cb,
-                                                     array);
+       g_debug ("Found %ld items to extract", priv->n_remaining_items);
 
-       decorator_update_state (decorator, NULL, TRUE);
+       if (priv->n_remaining_items > 0)
+               decorator_cache_next_items (decorator);
+       else
+               decorator_finish (decorator);
 }
 
 static void
-decorator_check_commit (TrackerDecorator *decorator)
+decorator_query_remaining_items (TrackerDecorator *decorator)
 {
-       TrackerDecoratorPrivate *priv;
+       gchar *query, *clauses[] = { "COUNT(?urn)", NULL };
+       TrackerSparqlConnection *sparql_conn;
 
-       priv = decorator->priv;
+       query = create_query_string (decorator, clauses, FALSE);
 
-       if (priv->sparql_buffer->len < (guint) priv->batch_size)
-               return;
+       if (query) {
+               sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
+               tracker_sparql_connection_query_async (sparql_conn, query,
+                                                      NULL, decorator_query_remaining_items_cb,
+                                                      decorator);
+               g_free (query);
+       } else {
+               decorator_notify_empty (decorator);
+       }
+}
 
-       decorator_commit_info (decorator);
+static void
+decorator_pair_tasks (TrackerDecorator *decorator)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       TrackerDecoratorInfo *info;
+       GTask *task;
+
+       /* Serve queued next() requests for as long as cached items remain */
+       while (!g_queue_is_empty (&priv->item_cache) &&
+              !g_queue_is_empty (&priv->next_elem_queue)) {
+               info = g_queue_pop_head (&priv->item_cache);
+               task = g_queue_pop_head (&priv->next_elem_queue);
+
+               /* Track the decorator-side task while info is still ours to read */
+               g_hash_table_add (priv->tasks, info->task);
+               g_task_set_task_data (task, GINT_TO_POINTER (info->id), NULL);
+
+               /* Pass ownership of info, it must not be touched past this point */
+               g_task_return_pointer (task, info,
+                                      (GDestroyNotify) tracker_decorator_info_unref);
+               g_object_unref (task);
+       }
 }
 
-/* This function is called after the caller has completed the
- * GTask given on the TrackerDecoratorInfo, this definitely removes
- * the element being processed from queues.
- */
 static void
-decorator_task_done (GObject      *object,
-                     GAsyncResult *result,
-                     gpointer      user_data)
+decorator_item_cache_remove (TrackerDecorator *decorator,
+                             gint              id)
 {
-       TrackerDecorator *decorator = TRACKER_DECORATOR (object);
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       GList *item, *next;
+
+       /* Drop cached items matching @id; a deleted link is freed, so save ->next first */
+       for (item = g_queue_peek_head_link (&priv->item_cache); item; item = next) {
+               TrackerDecoratorInfo *info = item->data;
+
+               next = item->next;
+               if (info->id != id)
+                       continue;
+               g_queue_delete_link (&priv->item_cache, item);
+               tracker_decorator_info_unref (info);
+       }
+}
+
+/* Query callback: caches one info per row, then updates processing state and pairs tasks */
+static void
+decorator_cache_items_cb (GObject      *object,
+                          GAsyncResult *result,
+                          gpointer      user_data)
+{
+       TrackerDecorator *decorator = user_data;
        TrackerDecoratorPrivate *priv;
-       ElemNode *node = user_data;
+       TrackerSparqlCursor *cursor;
+       TrackerDecoratorInfo *info;
+       GError *error = NULL;
 
+       cursor = tracker_sparql_connection_query_finish (TRACKER_SPARQL_CONNECTION (object),
+                                                        result, &error);
        priv = decorator->priv;
+       priv->querying = FALSE;
 
-       if (g_task_had_error (G_TASK (result))) {
-               GError *error = NULL;
-
-               g_task_propagate_pointer (G_TASK (result), &error);
-               g_warning ("Task for '%s' finished with error: %s\n",
-                          node->info->url, error->message);
+       if (error) {
+               decorator_notify_task_error (decorator, error);
                g_error_free (error);
        } else {
-               TrackerSparqlBuilder *sparql;
-
-               /* Add resulting sparql to buffer and check whether flushing */
-               sparql = g_task_get_task_data (G_TASK (result));
-               g_ptr_array_add (priv->sparql_buffer,
-                                g_strdup (tracker_sparql_builder_get_result (sparql)));
+               while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
+                       info = tracker_decorator_info_new (decorator, cursor);
+                       g_queue_push_tail (&priv->item_cache, info);
+               }
+       }
 
-               decorator_check_commit (decorator);
+       if (!g_queue_is_empty (&priv->item_cache) && !priv->processing) {
+               decorator_start (decorator);
+       } else if (g_queue_is_empty (&priv->item_cache) && priv->processing) {
+               decorator_finish (decorator);
        }
 
-       /* Detach task first, so the node is removed for good */
-       g_clear_object (&node->info->task);
-       element_remove_by_id (decorator, node->id);
+       decorator_pair_tasks (decorator);
+       g_clear_object (&cursor); /* cursor is NULL when the query failed */
 }
 
 static void
-element_ensure_task (ElemNode         *node,
-                     TrackerDecorator *decorator)
+decorator_cache_next_items (TrackerDecorator *decorator)
 {
-       TrackerSparqlBuilder *sparql;
-       TrackerDecoratorInfo *info;
-       GCancellable *cancellable;
-
-       g_return_if_fail (node->info != NULL);
-
-       info = node->info;
+       TrackerDecoratorPrivate *priv = decorator->priv;
 
-       if (info->task)
+       if (priv->querying ||
+           g_hash_table_size (priv->tasks) > 0 ||
+           !g_queue_is_empty (&priv->item_cache))
                return;
 
-       cancellable = g_cancellable_new ();
-       info->task = g_task_new (decorator, cancellable,
-                                decorator_task_done, node);
-       g_object_unref (cancellable);
+        priv->querying = TRUE;
 
-       sparql = tracker_sparql_builder_new_update ();
-       g_task_set_task_data (info->task, sparql,
-                             (GDestroyNotify) g_object_unref);
+       if (priv->n_remaining_items == 0) {
+               decorator_query_remaining_items (decorator);
+       } else {
+               TrackerSparqlConnection *sparql_conn;
+               gchar *query;
+
+               sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
+               query = create_remaining_items_query (decorator);
+               tracker_sparql_connection_query_async (sparql_conn, query,
+                                                      NULL, decorator_cache_items_cb,
+                                                      decorator);
+               g_free (query);
+       }
 }
 
 static gint
@@ -486,51 +897,29 @@ get_class_id (TrackerSparqlConnection *conn,
 }
 
 static void
-tracker_decorator_validate_class_ids (TrackerDecorator *decorator,
-                                      const GStrv       class_names)
+tracker_decorator_validate_class_ids (TrackerDecorator *decorator)
 {
        TrackerSparqlConnection *sparql_conn;
        TrackerDecoratorPrivate *priv;
-       GPtrArray *strings;
+       ClassInfo *info;
+       GArray *array;
        gint i = 0;
 
        priv = decorator->priv;
        sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
-
-       if (!sparql_conn) {
-               /* Copy as-is and postpone validation */
-               g_strfreev (priv->class_names);
-               priv->class_names = g_strdupv (class_names);
-               return;
-       }
-
-       if (priv->class_name_ids->len > 0)
-               g_array_remove_range (priv->class_name_ids, 0,
-                                     priv->class_name_ids->len);
-
-       strings = g_ptr_array_new ();
-       if (class_names) {
-               while (class_names[i]) {
-                       gchar *copy;
-                       gint id;
-
-                       id = get_class_id (sparql_conn, class_names[i], FALSE);
-
-                       if (id >= 0) {
-                               copy = g_strdup (class_names[i]);
-                               g_ptr_array_add (strings, copy);
-                               g_array_append_val (priv->class_name_ids, id);
-                       }
-
-                       i++;
+       array = g_array_new (TRUE, FALSE, sizeof (gchar*));
+
+       for (i = 0; i < priv->classes->len; i++) {
+               info = &g_array_index (priv->classes, ClassInfo, i);
+               info->class_id = get_class_id (sparql_conn,
+                                              info->class_name, FALSE);
+               if (info->class_id > 0) {
+                       priv->validated = TRUE;
+                       g_array_append_val (array, info->class_name);
                }
        }
-       g_ptr_array_add (strings, NULL);
-
-       g_strfreev (priv->class_names);
-       priv->class_names = (GStrv) g_ptr_array_free (strings, FALSE);
 
-       g_object_notify (G_OBJECT (decorator), "class-names");
+       priv->class_names = (gchar **) g_array_free (array, FALSE);
 }
 
 static void
@@ -559,6 +948,36 @@ tracker_decorator_get_property (GObject    *object,
 }
 
 static void
+decorator_add_class (TrackerDecorator *decorator,
+                     const gchar      *class)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       ClassInfo info;
+
+       info.class_name = g_strdup (class); /* own copy, released by class_info_clear() */
+       info.class_id = -1; /* placeholder until tracker_decorator_validate_class_ids() runs */
+       info.priority = G_PRIORITY_DEFAULT;
+       g_array_append_val (priv->classes, info);
+}
+
+/* Replace the stored class list with @classes (NULL-terminated, may be NULL) */
+static void
+decorator_set_classes (TrackerDecorator  *decorator,
+                       const gchar      **classes)
+{
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       gint i;
+
+       /* Drop whatever was set before */
+       if (priv->classes->len > 0)
+               g_array_remove_range (priv->classes, 0, priv->classes->len);
+
+       /* A NULL boxed value means "no classes", do not dereference it */
+       for (i = 0; classes && classes[i]; i++)
+               decorator_add_class (decorator, classes[i]);
+}
+
+static void
 tracker_decorator_set_property (GObject      *object,
                                 guint         param_id,
                                 const GValue *value,
@@ -574,8 +993,7 @@ tracker_decorator_set_property (GObject      *object,
                priv->data_source = g_value_dup_string (value);
                break;
        case PROP_CLASS_NAMES:
-               tracker_decorator_validate_class_ids (decorator,
-                                                     g_value_get_boxed (value));
+               decorator_set_classes (decorator, g_value_get_boxed (value));
                break;
        case PROP_COMMIT_BATCH_SIZE:
                priv->batch_size = g_value_get_int (value);
@@ -590,46 +1008,6 @@ tracker_decorator_set_property (GObject      *object,
 }
 
 static void
-query_type_and_add_element (TrackerDecorator *decorator,
-                            gint subject)
-{
-       TrackerSparqlConnection *sparql_conn;
-       TrackerSparqlCursor *cursor;
-       GString *query;
-       GError *error = NULL;
-
-       sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
-
-       query = g_string_new (NULL);
-       g_string_append_printf (query, "select tracker:id (?type) {"
-                                      "  ?urn a ?type . "
-                                      "  FILTER (tracker:id(?urn) = %d ", subject);
-       _tracker_decorator_query_append_rdf_type_filter (decorator, query);
-       g_string_append (query, ")}");
-
-       cursor = tracker_sparql_connection_query (sparql_conn, query->str,
-                                                 NULL, &error);
-       g_string_free (query, TRUE);
-
-       if (error) {
-               g_critical ("Could not get type ID for '%d': %s\n",
-                           subject, error->message);
-               g_error_free (error);
-               return;
-       }
-
-       if (!tracker_sparql_cursor_next (cursor, NULL, NULL)) {
-               g_critical ("'%d' doesn't have a known type", subject);
-       } else {
-               element_add (decorator, subject,
-                            tracker_sparql_cursor_get_integer (cursor, 0),
-                            FALSE);
-       }
-
-       g_object_unref (cursor);
-}
-
-static void
 handle_deletes (TrackerDecorator *decorator,
                 GVariantIter     *iter)
 {
@@ -640,9 +1018,10 @@ handle_deletes (TrackerDecorator *decorator,
 
        while (g_variant_iter_loop (iter, "(iiii)",
                                    &graph, &subject, &predicate, &object)) {
-               if (predicate == priv->rdf_type_id)
-                       element_remove_by_id (decorator, subject);
-               else if (predicate == priv->nie_data_source_id &&
+               if (predicate == priv->rdf_type_id) {
+                       decorator_item_cache_remove (decorator, subject);
+                       decorator_blacklist_remove (decorator, subject);
+               } else if (predicate == priv->nie_data_source_id &&
                         object == priv->data_source_id) {
                        /* If only the decorator datasource is removed,
                         * re-process the file from scratch if it's not already
@@ -650,40 +1029,20 @@ handle_deletes (TrackerDecorator *decorator,
                         * to query it first. This should be rare enough that
                         * it doesn't matter to accumulate them to query in
                         * batches. */
-                       if (!g_hash_table_contains (priv->elems,
-                                                   GINT_TO_POINTER (subject))) {
-                               query_type_and_add_element (decorator, subject);
-                       }
+                       decorator_cache_next_items (decorator);
                }
        }
 }
 
-static gboolean
-class_name_id_handled (TrackerDecorator *decorator,
-                       gint              id)
-{
-       TrackerDecoratorPrivate *priv;
-
-       priv = decorator->priv;
-
-       return class_name_array_contains (priv->class_name_ids, id);
-}
-
 static void
 handle_updates (TrackerDecorator *decorator,
                 GVariantIter     *iter)
 {
-       gint graph, subject, predicate, object;
-       TrackerDecoratorPrivate *priv;
-
-       priv = decorator->priv;
-
-       while (g_variant_iter_loop (iter, "(iiii)",
-                                   &graph, &subject, &predicate, &object)) {
-               if (predicate == priv->rdf_type_id &&
-                   class_name_id_handled (decorator, object))
-                       element_add (decorator, subject, object, FALSE);
-       }
+       /* Merely use this as a hint that there is something
+        * left to be processed.
+        */
+       if (g_variant_iter_n_children (iter) > 0)
+               decorator_cache_next_items (decorator);
 }
 
 static void
@@ -695,13 +1054,16 @@ class_signal_cb (GDBusConnection *connection,
                  GVariant        *parameters,
                  gpointer         user_data)
 {
+       TrackerDecorator *decorator = user_data;
        GVariantIter *iter1, *iter2;
 
        g_variant_get (parameters, "(&sa(iiii)a(iiii))", NULL, &iter1, &iter2);
-       handle_deletes (user_data, iter1);
-       handle_updates (user_data, iter2);
+       handle_deletes (decorator, iter1);
+       handle_updates (decorator, iter2);
        g_variant_iter_free (iter1);
        g_variant_iter_free (iter2);
+
+       decorator_query_remaining_items (decorator);
 }
 
 static gboolean
@@ -729,7 +1091,7 @@ tracker_decorator_initable_init (GInitable     *initable,
        priv->rdf_type_id = get_class_id (sparql_conn, "rdf:type", FALSE);
        priv->nie_data_source_id = get_class_id (sparql_conn, "nie:dataSource", FALSE);
        priv->data_source_id = get_class_id (sparql_conn, priv->data_source, TRUE);
-       tracker_decorator_validate_class_ids (decorator, priv->class_names);
+       tracker_decorator_validate_class_ids (decorator);
 
        priv->graph_updated_signal_id =
                g_dbus_connection_signal_subscribe (conn,
@@ -742,6 +1104,7 @@ tracker_decorator_initable_init (GInitable     *initable,
                                                    class_signal_cb,
                                                    initable, NULL);
        decorator_update_state (decorator, "Idle", FALSE);
+       decorator_rebuild_cache (decorator);
        return TRUE;
 }
 
@@ -752,7 +1115,6 @@ tracker_decorator_initable_iface_init (GInitableIface *iface)
        iface->init = tracker_decorator_initable_init;
 }
 
-
 static void
 tracker_decorator_constructed (GObject *object)
 {
@@ -770,7 +1132,6 @@ tracker_decorator_finalize (GObject *object)
        TrackerDecoratorPrivate *priv;
        TrackerDecorator *decorator;
        GDBusConnection *conn;
-       GList *l;
 
        decorator = TRACKER_DECORATOR (object);
        priv = decorator->priv;
@@ -781,80 +1142,33 @@ tracker_decorator_finalize (GObject *object)
                                                      priv->graph_updated_signal_id);
        }
 
-       while ((l = tracker_priority_queue_get_head (priv->elem_queue)))
-               element_remove_link (decorator, l, FALSE);
-
-       g_array_unref (priv->class_name_ids);
-       tracker_priority_queue_unref (priv->elem_queue);
-       g_array_unref (priv->priority_class_name_ids);
-       g_hash_table_unref (priv->elems);
+       g_queue_foreach (&priv->item_cache,
+                        (GFunc) tracker_decorator_info_unref,
+                        NULL);
+       g_queue_clear (&priv->item_cache);
+
+       decorator_cancel_active_tasks (decorator);
+       decorator_notify_empty (decorator);
+
+       g_free (priv->class_names);
+       g_hash_table_destroy (priv->tasks);
+       g_array_unref (priv->classes);
+       g_array_unref (priv->prepended_ids);
+       g_clear_pointer (&priv->sparql_buffer, (GDestroyNotify) g_array_unref);
+       g_clear_pointer (&priv->commit_buffer, (GDestroyNotify) g_array_unref);
+       g_sequence_free (priv->blacklist_items);
        g_free (priv->data_source);
-       g_strfreev (priv->class_names);
        g_timer_destroy (priv->timer);
 
-       if (priv->sparql_buffer)
-               g_ptr_array_unref (priv->sparql_buffer);
-
        G_OBJECT_CLASS (tracker_decorator_parent_class)->finalize (object);
 }
 
-void
-_tracker_decorator_query_append_rdf_type_filter (TrackerDecorator *decorator,
-                                                 GString          *query)
-{
-       const gchar **class_names;
-       gint i = 0;
-
-       class_names = tracker_decorator_get_class_names (decorator);
-
-       if (!class_names || !*class_names)
-               return;
-
-       g_string_append (query, "&& ?type IN (");
-
-       while (class_names[i]) {
-               if (i != 0)
-                       g_string_append (query, ",");
-
-               g_string_append (query, class_names[i]);
-               i++;
-       }
-
-       g_string_append (query, ") ");
-}
-
-static void
-query_elements_cb (GObject      *object,
-                   GAsyncResult *result,
-                   gpointer      user_data)
-{
-       TrackerSparqlConnection *conn;
-       TrackerSparqlCursor *cursor;
-       GError *error = NULL;
-
-       conn = TRACKER_SPARQL_CONNECTION (object);
-       cursor = tracker_sparql_connection_query_finish (conn, result, &error);
-
-        if (error) {
-                g_critical ("Could not load files missing metadata: %s", error->message);
-                g_error_free (error);
-               return;
-       }
-
-       while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
-               gint id = tracker_sparql_cursor_get_integer (cursor, 0);
-               gint class_name_id = tracker_sparql_cursor_get_integer (cursor, 1);
-               element_add (user_data, id, class_name_id, TRUE);
-       }
-
-       g_object_unref (cursor);
-}
-
 static void
 tracker_decorator_paused (TrackerMiner *miner)
 {
        TrackerDecoratorPrivate *priv;
 
+       decorator_cancel_active_tasks (TRACKER_DECORATOR (miner));
        priv = TRACKER_DECORATOR (miner)->priv;
        g_timer_stop (priv->timer);
 }
@@ -864,6 +1178,7 @@ tracker_decorator_resumed (TrackerMiner *miner)
 {
        TrackerDecoratorPrivate *priv;
 
+       decorator_cache_next_items (TRACKER_DECORATOR (miner));
        priv = TRACKER_DECORATOR (miner)->priv;
        g_timer_continue (priv->timer);
 }
@@ -873,6 +1188,7 @@ tracker_decorator_stopped (TrackerMiner *miner)
 {
        TrackerDecoratorPrivate *priv;
 
+       decorator_cancel_active_tasks (TRACKER_DECORATOR (miner));
        priv = TRACKER_DECORATOR (miner)->priv;
        g_timer_stop (priv->timer);
 }
@@ -880,33 +1196,14 @@ tracker_decorator_stopped (TrackerMiner *miner)
 static void
 tracker_decorator_started (TrackerMiner *miner)
 {
-       TrackerSparqlConnection *sparql_conn;
        TrackerDecoratorPrivate *priv;
        TrackerDecorator *decorator;
-       const gchar *data_source;
-       GString *query;
 
        decorator = TRACKER_DECORATOR (miner);
        priv = decorator->priv;
 
        g_timer_start (priv->timer);
-       data_source = tracker_decorator_get_data_source (decorator);
-       query = g_string_new ("SELECT tracker:id(?urn) tracker:id(?type) { "
-                             "  ?urn a rdfs:Resource ; "
-                             "       a ?type. ");
-
-       g_string_append_printf (query,
-                               "FILTER (! EXISTS { ?urn nie:dataSource <%s> } ",
-                               data_source);
-
-       _tracker_decorator_query_append_rdf_type_filter (decorator, query);
-       g_string_append (query, "&& BOUND(tracker:available(?urn)))}");
-
-       sparql_conn = tracker_miner_get_connection (miner);
-       tracker_sparql_connection_query_async (sparql_conn, query->str,
-                                              NULL, query_elements_cb,
-                                              decorator);
-       g_string_free (query, TRUE);
+       decorator_cache_next_items (decorator);
 }
 
 static void
@@ -994,18 +1291,27 @@ tracker_decorator_class_init (TrackerDecoratorClass *klass)
 }
 
 static void
+class_info_clear (ClassInfo *info)
+{
+       g_free (info->class_name); /* the g_strdup() copy made in decorator_add_class() */
+}
+
+static void
 tracker_decorator_init (TrackerDecorator *decorator)
 {
        TrackerDecoratorPrivate *priv;
 
        decorator->priv = priv = TRACKER_DECORATOR_GET_PRIVATE (decorator);
-       priv->elems = g_hash_table_new (NULL, NULL);
-       priv->elem_queue = tracker_priority_queue_new ();
-       priv->class_name_ids = g_array_new (FALSE, FALSE, sizeof (gint));
-       priv->priority_class_name_ids = g_array_new (FALSE, FALSE, sizeof (gint));
+       priv->classes = g_array_new (FALSE, FALSE, sizeof (ClassInfo));
+       g_array_set_clear_func (priv->classes, (GDestroyNotify) class_info_clear);
+       priv->blacklist_items = g_sequence_new (NULL);
+       priv->prepended_ids = g_array_new (FALSE, FALSE, sizeof (gint));
        priv->batch_size = DEFAULT_BATCH_SIZE;
-       priv->sparql_buffer = g_ptr_array_new_with_free_func (g_free);
        priv->timer = g_timer_new ();
+
+       g_queue_init (&priv->next_elem_queue);
+       g_queue_init (&priv->item_cache);
+       priv->tasks = g_hash_table_new (NULL, NULL);
 }
 
 /**
@@ -1069,7 +1375,8 @@ tracker_decorator_get_n_items (TrackerDecorator *decorator)
        g_return_val_if_fail (TRACKER_IS_DECORATOR (decorator), 0);
 
        priv = decorator->priv;
-       return g_hash_table_size (priv->elems);
+
+       return priv->n_remaining_items;
 }
 
 /**
@@ -1089,9 +1396,12 @@ tracker_decorator_prepend_id (TrackerDecorator *decorator,
                               gint              id,
                               gint              class_name_id)
 {
+       TrackerDecoratorPrivate *priv;
+
        g_return_if_fail (TRACKER_IS_DECORATOR (decorator));
 
-       element_add (decorator, id, class_name_id, TRUE);
+       priv = decorator->priv;
+       g_array_append_val (priv->prepended_ids, id);
 }
 
 /**
@@ -1111,171 +1421,9 @@ tracker_decorator_delete_id (TrackerDecorator *decorator,
 {
        g_return_if_fail (TRACKER_IS_DECORATOR (decorator));
 
+#if 0
        element_remove_by_id (decorator, id);
-}
-
-static void complete_tasks_or_query (TrackerDecorator *decorator);
-
-typedef struct {
-       TrackerDecorator *decorator;
-       GArray *ids;
-} QueryNextItemsData;
-
-static void
-query_next_items_cb (GObject      *object,
-                     GAsyncResult *result,
-                     gpointer      user_data)
-{
-       QueryNextItemsData *data = user_data;
-       TrackerDecorator *decorator = data->decorator;
-       TrackerDecoratorPrivate *priv;
-       TrackerSparqlConnection *conn;
-       TrackerSparqlCursor *cursor;
-       GList *elem;
-       ElemNode *node;
-       gint id;
-       guint i;
-       GError *error = NULL;
-
-       conn = TRACKER_SPARQL_CONNECTION (object);
-       cursor = tracker_sparql_connection_query_finish (conn, result, &error);
-       priv = decorator->priv;
-
-       if (error) {
-               GTask *task;
-
-               while ((task = g_queue_pop_head (&priv->next_elem_queue))) {
-                       g_task_return_error (task, g_error_copy (error));
-                       g_object_unref (task);
-               }
-
-               g_clear_error (&error);
-               goto out;
-       }
-
-       while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
-               id = tracker_sparql_cursor_get_integer (cursor, 1);
-               elem = g_hash_table_lookup (priv->elems, GINT_TO_POINTER (id));
-               if (!elem)
-                       continue;
-
-               node = elem->data;
-               node->info = tracker_decorator_info_new (cursor);
-       }
-
-       /* Remove elements that we queried but we didn't get info */
-       for (i = 0; i < data->ids->len; i++) {
-               id = g_array_index (data->ids, gint, i);
-               elem = g_hash_table_lookup (priv->elems, GINT_TO_POINTER (id));
-               if (!elem)
-                       continue;
-
-               node = elem->data;
-               if (!node->info)
-                       element_remove_link (decorator, elem, TRUE);
-       }
-
-       complete_tasks_or_query (decorator);
-
-out:
-       g_clear_object (&cursor);
-       g_array_unref (data->ids);
-       g_slice_free (QueryNextItemsData, data);
-}
-
-static void
-query_next_items (TrackerDecorator *decorator,
-                  GList            *l)
-{
-       TrackerSparqlConnection *sparql_conn;
-       TrackerDecoratorPrivate *priv;
-       GString *id_string;
-       gchar *query;
-       QueryNextItemsData *data;
-
-       priv = decorator->priv;
-
-       data = g_slice_new0 (QueryNextItemsData);
-       data->decorator = decorator;
-       data->ids = g_array_sized_new (FALSE, FALSE,
-                                      sizeof (gint),
-                                      QUERY_BATCH_SIZE);
-
-       id_string = g_string_new (NULL);
-       for (; l != NULL && data->ids->len < QUERY_BATCH_SIZE; l = l->next) {
-               ElemNode *node = l->data;
-
-               if (node->info)
-                       continue;
-
-               if (id_string->len > 0)
-                       g_string_append_c (id_string, ',');
-               g_string_append_printf (id_string, "%d", node->id);
-               g_array_append_val (data->ids, node->id);
-       }
-
-       g_assert (data->ids->len > 0);
-
-       query = g_strdup_printf ("SELECT ?urn"
-                                "       tracker:id(?urn) "
-                                "       nie:url(?urn) "
-                                "       nie:mimeType(?urn) { "
-                                "  ?urn tracker:available true . "
-                                "  FILTER (tracker:id(?urn) IN (%s) && "
-                                "          ! EXISTS { ?urn nie:dataSource <%s> })"
-                                "}", id_string->str, priv->data_source);
-
-       sparql_conn = tracker_miner_get_connection (TRACKER_MINER (decorator));
-       tracker_sparql_connection_query_async (sparql_conn, query,
-                                              NULL,
-                                              query_next_items_cb, data);
-       g_string_free (id_string, TRUE);
-       g_free (query);
-}
-
-static void
-complete_tasks_or_query (TrackerDecorator *decorator)
-{
-       TrackerDecoratorPrivate *priv;
-       GList *l;
-       GTask *task;
-
-       priv = decorator->priv;
-
-       for (l = tracker_priority_queue_get_head (priv->elem_queue);
-            l != NULL;
-            l = l->next) {
-               ElemNode *node = l->data;
-
-               /* The next item isn't queried yet, do it now */
-               if (!node->info) {
-                       query_next_items (decorator, l);
-                       return;
-               }
-
-               /* If the item is not already being processed, we can complete a
-                * task with it. */
-               if (!node->info->task) {
-                       task = g_queue_pop_head (&priv->next_elem_queue);
-                       element_ensure_task (node, decorator);
-                       g_task_return_pointer (task,
-                                              tracker_decorator_info_ref (node->info),
-                                              (GDestroyNotify) tracker_decorator_info_unref);
-                       g_object_unref (task);
-
-                       if (g_queue_is_empty (&priv->next_elem_queue))
-                               return;
-               }
-       }
-
-       /* There is no element left, or they are all being processed already */
-       while ((task = g_queue_pop_head (&priv->next_elem_queue))) {
-               g_task_return_new_error (task,
-                                        tracker_decorator_error_quark (),
-                                        TRACKER_DECORATOR_ERROR_EMPTY,
-                                        "There are no items left");
-               g_object_unref (task);
-       }
+#endif
 }
 
 /**
@@ -1320,12 +1468,8 @@ tracker_decorator_next (TrackerDecorator    *decorator,
                return;
        }
 
-       /* Push the task in a queue, for the case this function is called
-        * multiple before it finishes. */
        g_queue_push_tail (&priv->next_elem_queue, task);
-       if (g_queue_get_length (&priv->next_elem_queue) == 1) {
-               complete_tasks_or_query (decorator);
-       }
+       decorator_pair_tasks (decorator);
 }
 
 /**
@@ -1355,51 +1499,52 @@ tracker_decorator_next_finish (TrackerDecorator  *decorator,
        return g_task_propagate_pointer (G_TASK (result), error);
 }
 
+static gint
+class_compare_func (const ClassInfo *a,
+                    const ClassInfo *b)
+{ /* Sort comparator for ClassInfo entries: the larger priority value sorts first */
+       return (b->priority > a->priority) - (b->priority < a->priority); /* sign-only result, cannot overflow like b - a */
+}
+
+static void
+decorator_set_class_priority (TrackerDecorator *decorator,
+                              const gchar      *class,
+                              gint              priority)
+{ /* Store @priority on the priv->classes entry named @class; does nothing if no entry matches */
+       TrackerDecoratorPrivate *priv = decorator->priv;
+       ClassInfo *info;
+       guint i; /* unsigned, like the GArray len it is compared against */
+
+       for (i = 0; i < priv->classes->len; i++) {
+               info = &g_array_index (priv->classes, ClassInfo, i);
+
+               if (strcmp (info->class_name, class) != 0)
+                       continue;
+
+               info->priority = priority;
+               break; /* only the first matching entry is updated */
+       }
+}
+
 void
 tracker_decorator_set_priority_rdf_types (TrackerDecorator    *decorator,
                                           const gchar * const *rdf_types)
 {
        TrackerDecoratorPrivate *priv;
-       TrackerPriorityQueue *new_queue;
-       guint i, j;
-       GList *elem_link;
+       gint i;
 
        g_return_if_fail (TRACKER_DECORATOR (decorator));
        g_return_if_fail (rdf_types != NULL);
 
        priv = decorator->priv;
 
-       if (priv->priority_class_name_ids->len > 0)
-               g_array_remove_range (priv->priority_class_name_ids, 0,
-                                     priv->priority_class_name_ids->len);
-
-       for (i = 0; rdf_types[i] != NULL; i++) {
-               for (j = 0; priv->class_names[j] != NULL; j++) {
-                       gint id;
-
-                       if (!g_str_equal (rdf_types[i], priv->class_names[j]))
-                               continue;
-
-                       /* priv->class_names and priv->class_name_ids are in the
-                        * same order */
-                       id = g_array_index (priv->class_name_ids, gint, j);
-                       g_array_append_val (priv->priority_class_name_ids, id);
-                       break;
-               }
+       for (i = 0; rdf_types[i]; i++) { /* rdf_types is walked up to its NULL terminator */
+               decorator_set_class_priority (decorator, rdf_types[i],
+                                             G_PRIORITY_HIGH); /* NOTE(review): classes prioritized by an earlier call are not reset here - confirm intended */
        }
 
-       /* We have to re-evaluate the priority of each element. We also have to
-        * keep the same elem_link because they are referenced in priv->elems.
-        * So we create a new priority queue and transfer nodes one by one. */
-       new_queue = tracker_priority_queue_new ();
-       while ((elem_link = tracker_priority_queue_pop_node (priv->elem_queue, NULL))) {
-               ElemNode *node = elem_link->data;
-
-               tracker_priority_queue_add_node (new_queue, elem_link,
-                                                elem_node_get_priority (decorator, node));
-       }
-       tracker_priority_queue_unref (priv->elem_queue);
-       priv->elem_queue = new_queue;
+       g_array_sort (priv->classes, (GCompareFunc) class_compare_func); /* reorder the class list by the updated priorities */
+       decorator_rebuild_cache (decorator); /* per the commit message: drop the cache so items are queried again in the new order */
 }
 
 /**


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]