[tracker-miners/wip/carlosg/miner-files-queues: 3/10] libtracker-miner: Avoid stopping during flush if possible




commit a15b62dc784f3c1298cbf0982fe40425c7aebbca
Author: Carlos Garnacho <carlosg gnome org>
Date:   Wed Dec 15 20:32:10 2021 +0100

    libtracker-miner: Avoid stopping during flush if possible
    
    Due to the way we asked the store to create the file content URNs
    for us, we fairly often had to stop all the machinery while there was
    a SPARQL batch being inserted, since we need information on files
    that are in the batch being executed before proceeding with files
    being processed now (e.g. a nfo:belongsToContainer relation).
    
    Since the file content URNs are stable and quick to fetch from GIO,
    we no longer need to wait for anything here. File processing is
    able to continue now despite these "dependency" files being in a
    batch that is being inserted.
    
    Still, avoid having more than one batch in flight: if the SPARQL
    buffer gets full again before the prior batch has finished insertion,
    processing will stop until that happens. This still favors memory
    usage over parallelization; if batches are built up quicker than
    they are inserted in the database, there will always be two batches'
    worth of data in flight.

 src/libtracker-miner/tracker-miner-fs.c      | 182 ++++++---------------------
 src/libtracker-miner/tracker-sparql-buffer.c |  10 +-
 2 files changed, 42 insertions(+), 150 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 0c8dc8950..aaada76e7 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -111,7 +111,6 @@ struct _TrackerMinerFSPrivate {
        GHashTable *items_by_file;
 
        guint item_queues_handler_id;
-       GFile *item_queue_blocker;
 
        /* Root / tree / index */
        GFile *root;
@@ -140,6 +139,7 @@ struct _TrackerMinerFSPrivate {
                                     * done */
        guint shown_totals : 1;     /* TRUE if totals have been shown */
        guint is_paused : 1;        /* TRUE if miner is paused */
+       guint flushing : 1;         /* TRUE if flushing SPARQL */
 
        guint timer_stopped : 1;    /* TRUE if main timer is stopped */
        guint extraction_timer_stopped : 1; /* TRUE if the extraction
@@ -674,10 +674,6 @@ fs_finalize (GObject *object)
                priv->item_queues_handler_id = 0;
        }
 
-       if (priv->item_queue_blocker) {
-               g_object_unref (priv->item_queue_blocker);
-       }
-
        if (priv->file_notifier) {
                tracker_file_notifier_stop (priv->file_notifier);
        }
@@ -979,28 +975,12 @@ process_stop (TrackerMinerFS *fs)
        fs->priv->been_crawled = TRUE;
 }
 
-static gboolean
-item_queue_is_blocked_by_file (TrackerMinerFS *fs,
-                               GFile *file)
-{
-       g_return_val_if_fail (G_IS_FILE (file), FALSE);
-
-       if (fs->priv->item_queue_blocker != NULL &&
-           (fs->priv->item_queue_blocker == file ||
-            g_file_equal (fs->priv->item_queue_blocker, file))) {
-               return TRUE;
-       }
-
-       return FALSE;
-}
-
 static void
 sparql_buffer_flush_cb (GObject      *object,
                         GAsyncResult *result,
                         gpointer      user_data)
 {
        TrackerMinerFS *fs = user_data;
-       TrackerMinerFSPrivate *priv = fs->priv;
        GPtrArray *tasks;
        GError *error = NULL;
        TrackerTask *task;
@@ -1028,33 +1008,23 @@ sparql_buffer_flush_cb (GObject      *object,
                } else {
                        tracker_error_report_delete (task_file);
                }
-
-               if (item_queue_is_blocked_by_file (fs, task_file))
-                       g_clear_object (&fs->priv->item_queue_blocker);
        }
 
-       if (priv->item_queue_blocker != NULL) {
-               if (tracker_task_pool_get_size (TRACKER_TASK_POOL (object)) > 0) {
-                       tracker_sparql_buffer_flush (TRACKER_SPARQL_BUFFER (object),
-                                                    "Item queue still blocked after flush",
-                                                    sparql_buffer_flush_cb,
-                                                    fs);
+       fs->priv->flushing = FALSE;
 
-                       /* Check if we've finished inserting for given prefixes ... */
-                       notify_roots_finished (fs);
-               }
-       } else if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (object))) {
-               tracker_sparql_buffer_flush (TRACKER_SPARQL_BUFFER (object),
-                                            "SPARQL buffer limit reached",
-                                            sparql_buffer_flush_cb,
-                                            fs);
+       if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (object))) {
+               if (tracker_sparql_buffer_flush (TRACKER_SPARQL_BUFFER (object),
+                                                "SPARQL buffer again full after flush",
+                                                sparql_buffer_flush_cb,
+                                                fs))
+                       fs->priv->flushing = TRUE;
 
                /* Check if we've finished inserting for given prefixes ... */
                notify_roots_finished (fs);
-       } else {
-               item_queue_handlers_set_up (fs);
        }
 
+       item_queue_handlers_set_up (fs);
+
        g_ptr_array_unref (tasks);
        g_clear_error (&error);
 }
@@ -1177,35 +1147,6 @@ item_move (TrackerMinerFS *fs,
        return TRUE;
 }
 
-static gboolean
-should_wait (TrackerMinerFS *fs,
-             GFile          *file)
-{
-       GFile *parent;
-
-       /* Is the item already being processed? */
-       if (tracker_sparql_buffer_get_state (fs->priv->sparql_buffer, file) == TRACKER_BUFFER_STATE_FLUSHING) {
-               /* Yes, a previous event on same item currently
-                * being processed */
-               fs->priv->item_queue_blocker = g_object_ref (file);
-               return TRUE;
-       }
-
-       /* Is the item's parent being processed right now? */
-       parent = g_file_get_parent (file);
-       if (parent) {
-               if (tracker_sparql_buffer_get_state (fs->priv->sparql_buffer, parent) == TRACKER_BUFFER_STATE_FLUSHING) {
-                       /* Yes, a previous event on the parent of this item
-                        * currently being processed */
-                       fs->priv->item_queue_blocker = parent;
-                       return TRUE;
-               }
-
-               g_object_unref (parent);
-       }
-       return FALSE;
-}
-
 static gboolean
 maybe_remove_file_event_node (TrackerMinerFS *fs,
                               QueueEvent     *event)
@@ -1233,7 +1174,7 @@ remove_items_by_file_foreach (gpointer key,
        return queue_event_is_equal_or_descendant (link->data, file);
 }
 
-static gboolean
+static void
 item_queue_get_next_file (TrackerMinerFS           *fs,
                           GFile                   **file,
                           GFile                   **source_file,
@@ -1247,27 +1188,9 @@ item_queue_get_next_file (TrackerMinerFS           *fs,
        *file = NULL;
        *source_file = NULL;
 
-       if (tracker_file_notifier_is_active (fs->priv->file_notifier) ||
-           tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
-               if (!fs->priv->extraction_timer_stopped) {
-                       fs->priv->extraction_timer_stopped = TRUE;
-                       g_timer_stop (fs->priv->extraction_timer);
-               }
-
-               /* There are still pending items to crawl,
-                * or extract pool limit is reached
-                */
-               return FALSE;
-       }
-
        event = tracker_priority_queue_peek (fs->priv->items, NULL);
 
        if (event) {
-               if (should_wait (fs, event->file) ||
-                   (event->dest_file && should_wait (fs, event->dest_file))) {
-                       return FALSE;
-               }
-
                if (event->type == TRACKER_MINER_FS_EVENT_MOVED) {
                        g_set_object (file, event->dest_file);
                        g_set_object (source_file, event->file);
@@ -1284,8 +1207,6 @@ item_queue_get_next_file (TrackerMinerFS           *fs,
                queue_event_free (event);
                tracker_priority_queue_pop (fs->priv->items, NULL);
        }
-
-       return TRUE;
 }
 
 static gdouble
@@ -1332,31 +1253,8 @@ miner_handle_next_item (TrackerMinerFS *fs)
        TrackerMinerFSEventType type;
        GFileInfo *info = NULL;
 
-       if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
-               /* Task pool is full, give it a break */
-               return FALSE;
-       }
-
-       if (!item_queue_get_next_file (fs, &file, &source_file, &info, &type,
-                                      &attributes_update, &is_dir)) {
-               /* We should flush the processing pool buffer here, because
-                * if there was a previous task on the same file we want to
-                * process now, we want it to get finished before we can go
-                * on with the queues... */
-               tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
-                                            "Queue handlers WAIT",
-                                            sparql_buffer_flush_cb,
-                                            fs);
-
-               /* Check if we've finished inserting for given prefixes ... */
-               notify_roots_finished (fs);
-
-               /* Items are still being processed, so wait until
-                * the processing pool is cleared before starting with
-                * the next directories batch.
-                */
-               return FALSE;
-       }
+       item_queue_get_next_file (fs, &file, &source_file, &info, &type,
+                                 &attributes_update, &is_dir);
 
        if (file == NULL && !fs->priv->extraction_timer_stopped) {
                g_timer_stop (fs->priv->extraction_timer);
@@ -1443,15 +1341,17 @@ miner_handle_next_item (TrackerMinerFS *fs)
 
        if (file == NULL) {
                if (!tracker_file_notifier_is_active (fs->priv->file_notifier)) {
-                       if (tracker_task_pool_get_size (TRACKER_TASK_POOL (fs->priv->sparql_buffer)) == 0) {
+                       if (!fs->priv->flushing &&
+                           tracker_task_pool_get_size (TRACKER_TASK_POOL (fs->priv->sparql_buffer)) == 0) {
                                /* Print stats and signal finished */
                                process_stop (fs);
                        } else {
                                /* Flush any possible pending update here */
-                               tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
-                                                            "Queue handlers NONE",
-                                                            sparql_buffer_flush_cb,
-                                                            fs);
+                               if (tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+                                                                "Queue handlers NONE",
+                                                                sparql_buffer_flush_cb,
+                                                                fs))
+                                       fs->priv->flushing = TRUE;
 
                                /* Check if we've finished inserting for given prefixes ... */
                                notify_roots_finished (fs);
@@ -1480,26 +1380,25 @@ miner_handle_next_item (TrackerMinerFS *fs)
                g_assert_not_reached ();
        }
 
-       if (item_queue_is_blocked_by_file (fs, file)) {
-               tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
-                                            "Current file is blocking item queue",
-                                            sparql_buffer_flush_cb,
-                                            fs);
-
-               /* Check if we've finished inserting for given prefixes ... */
-               notify_roots_finished (fs);
-       } else if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
-               tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
-                                            "SPARQL buffer limit reached",
-                                            sparql_buffer_flush_cb,
-                                            fs);
+       if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
+               if (tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+                                                "SPARQL buffer limit reached",
+                                                sparql_buffer_flush_cb,
+                                                fs)) {
+                       fs->priv->flushing = TRUE;
+               } else {
+                       /* If we cannot flush, wait for the pending operations
+                        * to finish.
+                        */
+                       keep_processing = FALSE;
+               }
 
                /* Check if we've finished inserting for given prefixes ... */
                notify_roots_finished (fs);
-       } else {
-               item_queue_handlers_set_up (fs);
        }
 
+       item_queue_handlers_set_up (fs);
+
        g_clear_object (&file);
        g_clear_object (&source_file);
        g_clear_object (&info);
@@ -1557,12 +1456,6 @@ item_queue_handlers_set_up (TrackerMinerFS *fs)
                return;
        }
 
-       if (fs->priv->item_queue_blocker) {
-               trace_eq ("   cancelled: item queue blocked waiting for file '%s'",
-                         g_file_get_uri (fs->priv->item_queue_blocker));
-               return;
-       }
-
        /* Already processing max number of sparql updates */
        if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
                trace_eq ("   cancelled: pool limit reached (sparql buffer: %u)",
@@ -1831,12 +1724,7 @@ file_notifier_finished (TrackerFileNotifier *notifier,
 {
        TrackerMinerFS *fs = user_data;
 
-       if (!tracker_miner_fs_has_items_to_process (fs)) {
-               g_info ("Finished all tasks");
-               process_stop (fs);
-       } else {
-               item_queue_handlers_set_up (fs);
-       }
+       item_queue_handlers_set_up (fs);
 }
 
 static void
diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c
index 3eddd3728..ca24cbbee 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.c
+++ b/src/libtracker-miner/tracker-sparql-buffer.c
@@ -174,9 +174,6 @@ update_batch_data_free (UpdateBatchData *batch_data)
 {
        g_object_unref (batch_data->batch);
 
-       g_ptr_array_foreach (batch_data->tasks,
-                            (GFunc) remove_task_foreach,
-                            batch_data->buffer);
        g_ptr_array_unref (batch_data->tasks);
 
        g_clear_object (&batch_data->async_task);
@@ -257,6 +254,13 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
        priv->n_updates++;
        g_clear_object (&priv->batch);
 
+       /* While flushing, remove the tasks from the task pool too, so it's
+        * hinted as below limits again.
+        */
+       g_ptr_array_foreach (update_data->tasks,
+                            (GFunc) remove_task_foreach,
+                            update_data->buffer);
+
        tracker_batch_execute_async (update_data->batch,
                                     NULL,
                                     batch_execute_cb,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]