[tracker-miners/wip/carlosg/batches-and-resources: 20/31] libtracker-miner: Make SPARQL flush async, not push




commit b645a693ebcbd47eb93bca6579b46338b524550c
Author: Carlos Garnacho <carlosg gnome org>
Date:   Wed Dec 2 18:06:20 2020 +0100

    libtracker-miner: Make SPARQL flush async, not push
    
    We currently create a GTask per file, only to dispatch them all en masse
    once the SPARQL buffer flush finishes. Move the async handling one level
    up, so that a single task is dispatched per flush().

 src/libtracker-miner/tracker-miner-fs.c      | 82 +++++++++++++++------------
 src/libtracker-miner/tracker-sparql-buffer.c | 83 ++++++----------------------
 src/libtracker-miner/tracker-sparql-buffer.h | 14 ++---
 3 files changed, 73 insertions(+), 106 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 2e39047e6..ce33b4ede 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1228,49 +1228,62 @@ item_queue_is_blocked_by_file (TrackerMinerFS *fs,
 }
 
+/* Completion handler for tracker_sparql_buffer_flush(): reports the
+ * outcome for each file in the flushed batch, lifts the item queue
+ * blocker if its file was part of the batch, and may then flush again
+ * or set the item queue handlers back up. */
 static void
-sparql_buffer_task_finished_cb (GObject      *object,
-                                GAsyncResult *result,
-                                gpointer      user_data)
+sparql_buffer_flush_cb (GObject      *object,
+                        GAsyncResult *result,
+                        gpointer      user_data)
 {
-       TrackerMinerFS *fs;
-       TrackerMinerFSPrivate *priv;
+       TrackerMinerFS *fs = user_data;
+       TrackerMinerFSPrivate *priv = fs->priv;
+       GPtrArray *tasks;
+       GError *error = NULL;
        TrackerTask *task;
        GFile *task_file;
-       GError *error = NULL;
-
-       fs = user_data;
-       priv = fs->priv;
+       guint i;
 
-       task = tracker_sparql_buffer_push_finish (TRACKER_SPARQL_BUFFER (object),
-                                                 result, &error);
-       task_file = tracker_task_get_file (task);
+       tasks = tracker_sparql_buffer_flush_finish (TRACKER_SPARQL_BUFFER (object),
+                                                   result, &error);
 
        if (error) {
                g_warning ("Could not execute sparql: %s", error->message);
-               tracker_error_report (task_file, error->message,
-                                     tracker_sparql_task_get_sparql (task));
-               priv->total_files_notified_error++;
-               g_error_free (error);
        }
 
-       tracker_error_report_delete (task_file);
+       /* tracker_sparql_buffer_flush_finish() may return NULL when the flush
+        * failed; there are then no per-file results to walk. */
+       for (i = 0; tasks != NULL && i < tasks->len; i++) {
+               task = g_ptr_array_index (tasks, i);
+               task_file = tracker_task_get_file (task);
 
-       if (item_queue_is_blocked_by_file (fs, task_file)) {
-               g_object_unref (priv->item_queue_blocker);
-               priv->item_queue_blocker = NULL;
+               if (error) {
+                       tracker_error_report (task_file, error->message,
+                                             tracker_sparql_task_get_sparql (task));
+                       priv->total_files_notified_error++;
+               } else {
+                       tracker_error_report_delete (task_file);
+               }
+
+               if (item_queue_is_blocked_by_file (fs, task_file))
+                       g_clear_object (&priv->item_queue_blocker);
        }
 
        if (priv->item_queue_blocker != NULL) {
                if (tracker_task_pool_get_size (TRACKER_TASK_POOL (object)) > 0) {
                        tracker_sparql_buffer_flush (TRACKER_SPARQL_BUFFER (object),
-                                                    "Item queue still blocked after flush");
+                                                    "Item queue still blocked after flush",
+                                                    sparql_buffer_flush_cb,
+                                                    fs);
 
                        /* Check if we've finished inserting for given prefixes ... */
                        notify_roots_finished (fs, TRUE);
                }
        } else if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (object))) {
                tracker_sparql_buffer_flush (TRACKER_SPARQL_BUFFER (object),
-                                            "SPARQL buffer limit reached");
+                                            "SPARQL buffer limit reached",
+                                            sparql_buffer_flush_cb,
+                                            fs);
 
                /* Check if we've finished inserting for given prefixes ... */
                notify_roots_finished (fs, TRUE);
@@ -1278,7 +1291,9 @@ sparql_buffer_task_finished_cb (GObject      *object,
                item_queue_handlers_set_up (fs);
        }
 
-       tracker_task_unref (task);
+       /* The array returned by flush_finish() is ours to release. */
+       g_clear_pointer (&tasks, g_ptr_array_unref);
+       g_clear_error (&error);
 }
 
 static void
@@ -1299,18 +1306,21 @@ push_sparql_task (TrackerMinerFS *fs,
 
        if (sparql_task) {
                tracker_sparql_buffer_push (fs->priv->sparql_buffer,
-                                           sparql_task,
-                                           sparql_buffer_task_finished_cb,
-                                           fs);
+                                           sparql_task);
 
                if (item_queue_is_blocked_by_file (fs, file)) {
-                       tracker_sparql_buffer_flush (fs->priv->sparql_buffer, "Current file is blocking item queue");
+                       tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+                                                    "Current file is blocking item queue",
+                                                    sparql_buffer_flush_cb,
+                                                    fs);
 
                        /* Check if we've finished inserting for given prefixes ... */
                        notify_roots_finished (fs, TRUE);
                } else if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
                        tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
-                                                    "SPARQL buffer limit reached");
+                                                    "SPARQL buffer limit reached",
+                                                    sparql_buffer_flush_cb,
+                                                    fs);
 
                        /* Check if we've finished inserting for given prefixes ... */
                        notify_roots_finished (fs, TRUE);
@@ -1597,9 +1607,7 @@ push_task (TrackerMinerFS *fs,
 
        task = tracker_sparql_task_new_take_sparql_str (file, sparql);
        tracker_sparql_buffer_push (fs->priv->sparql_buffer,
-                                   task,
-                                   sparql_buffer_task_finished_cb,
-                                   fs);
+                                   task);
        tracker_task_unref (task);
 }

@@ -1631,7 +1639,9 @@ miner_handle_next_item (TrackerMinerFS *fs)
                 * process now, we want it to get finished before we can go
                 * on with the queues... */
                tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
-                                            "Queue handlers WAIT");
+                                            "Queue handlers WAIT",
+                                            sparql_buffer_flush_cb,
+                                            fs);
 
                /* Check if we've finished inserting for given prefixes ... */
                notify_roots_finished (fs, TRUE);
@@ -1735,7 +1745,9 @@ miner_handle_next_item (TrackerMinerFS *fs)
                        } else {
                                /* Flush any possible pending update here */
                                tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
-                                                            "Queue handlers NONE");
+                                                            "Queue handlers NONE",
+                                                            sparql_buffer_flush_cb,
+                                                            fs);
 
                                /* Check if we've finished inserting for given prefixes ... */
                                notify_roots_finished (fs, TRUE);
@@ -1790,7 +1802,9 @@ miner_handle_next_item (TrackerMinerFS *fs)
 
        if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
                tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
-                                            "SPARQL buffer limit reached");
+                                            "SPARQL buffer limit reached",
+                                            sparql_buffer_flush_cb,
+                                            fs);
 
                /* Check if we've finished inserting for given prefixes ... */
                notify_roots_finished (fs, TRUE);
diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c
index 4418f3a71..eaea5ec84 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.c
+++ b/src/libtracker-miner/tracker-sparql-buffer.c
@@ -45,13 +45,13 @@ struct _TrackerSparqlBufferPrivate
 struct _SparqlTaskData
 {
        gchar *str;
-       GTask *async_task;
 };
 
 struct _UpdateArrayData {
        TrackerSparqlBuffer *buffer;
        GPtrArray *tasks;
        GArray *sparql_array;
+       GTask *async_task; /* returns the batch outcome to the flush() caller */
 };
 
 G_DEFINE_TYPE_WITH_PRIVATE (TrackerSparqlBuffer, tracker_sparql_buffer, TRACKER_TYPE_TASK_POOL)
@@ -166,7 +166,7 @@ update_array_data_free (UpdateArrayData *update_data)
        g_ptr_array_foreach (update_data->tasks,
                             (GFunc) remove_task_foreach,
                             update_data->buffer);
-       g_ptr_array_free (update_data->tasks, TRUE);
+       g_ptr_array_unref (update_data->tasks);
 
        g_slice_free (UpdateArrayData, update_data);
 }
@@ -180,7 +180,6 @@ tracker_sparql_buffer_update_array_cb (GObject      *object,
        TrackerSparqlBuffer *buffer;
        GError *error = NULL;
        UpdateArrayData *update_data;
-       gint i;
 
        update_data = user_data;
        buffer = TRACKER_SPARQL_BUFFER (update_data->buffer);
@@ -198,41 +197,27 @@ tracker_sparql_buffer_update_array_cb (GObject      *object,
                            error->message);
        }
 
-       /* Report status on each task of the batch update */
-       for (i = 0; i < update_data->tasks->len; i++) {
-               TrackerTask *task;
-               SparqlTaskData *task_data;
-
-               task = g_ptr_array_index (update_data->tasks, i);
-               task_data = tracker_task_get_data (task);
-
-               /* Call finished handler with the error, if any */
-               if (error) {
-                       g_task_return_error (task_data->async_task,
-                                            g_error_copy (error));
-               } else {
-                       g_task_return_pointer (task_data->async_task,
-                                              tracker_task_ref (task),
-                                              (GDestroyNotify) tracker_task_unref);
-               }
-
-               g_clear_object (&task_data->async_task);
-
-               /* No need to deallocate the task here, it will be done when
-                * unref-ing the UpdateArrayData below */
+       if (error) {
+               g_task_return_error (update_data->async_task, error);
+       } else {
+               g_task_return_pointer (update_data->async_task,
+                                      g_ptr_array_ref (update_data->tasks),
+                                      (GDestroyNotify) g_ptr_array_unref);
        }
+
+       /* The reference returned by g_task_new() in tracker_sparql_buffer_flush()
+        * is not consumed by g_task_return_*(); drop it now the result is set. */
+       g_object_unref (update_data->async_task);
 
        /* Note that tasks are actually deallocated here */
        update_array_data_free (update_data);
-
-       if (error) {
-               g_error_free (error);
-       }
 }
 
 gboolean
 tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
-                             const gchar         *reason)
+                             const gchar         *reason,
+                             GAsyncReadyCallback  cb,
+                             gpointer             user_data)
 {
        TrackerSparqlBufferPrivate *priv;
        GArray *sparql_array;
@@ -268,6 +249,13 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
        update_data->buffer = buffer;
        update_data->tasks = g_ptr_array_ref (priv->tasks);
        update_data->sparql_array = sparql_array;
+       update_data->async_task = g_task_new (buffer, NULL, cb, user_data);
+       /* Also attach the task list to the GTask, so that
+        * tracker_sparql_buffer_flush_finish() can return it to the caller
+        * when the update fails and no result pointer is set. */
+       g_task_set_task_data (update_data->async_task,
+                             g_ptr_array_ref (update_data->tasks),
+                             (GDestroyNotify) g_ptr_array_unref);
 
        /* Empty pool, update_data will keep
         * references to the tasks to keep
@@ -309,28 +297,11 @@ sparql_buffer_push_to_pool (TrackerSparqlBuffer *buffer,
 
 void
 tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
-                            TrackerTask         *task,
-                            GAsyncReadyCallback  cb,
-                            gpointer             user_data)
+                            TrackerTask         *task)
 {
-       SparqlTaskData *data;
-
        g_return_if_fail (TRACKER_IS_SPARQL_BUFFER (buffer));
        g_return_if_fail (task != NULL);
 
-       /* NOTE: We don't own the task and if we want it we have to
-        * reference it, each function below references task in
-        * different ways.
-        */
-       data = tracker_task_get_data (task);
-
-       if (!data->async_task) {
-               data->async_task = g_task_new (buffer, NULL, cb, user_data);
-               g_task_set_task_data (data->async_task,
-                                     tracker_task_ref (task),
-                                     (GDestroyNotify) tracker_task_unref);
-       }
-
        sparql_buffer_push_to_pool (buffer, task);
 }
 
@@ -350,11 +321,6 @@ static void
 sparql_task_data_free (SparqlTaskData *data)
 {
        g_free (data->str);
-
-       if (data->async_task) {
-               g_object_unref (data->async_task);
-       }
-
        g_slice_free (SparqlTaskData, data);
 }
 
@@ -390,23 +356,26 @@ tracker_sparql_task_get_sparql (TrackerTask *task)
        return task_data->str;
 }
 
-TrackerTask *
-tracker_sparql_buffer_push_finish (TrackerSparqlBuffer  *buffer,
-                                   GAsyncResult         *res,
-                                   GError              **error)
+GPtrArray *
+tracker_sparql_buffer_flush_finish (TrackerSparqlBuffer  *buffer,
+                                    GAsyncResult         *res,
+                                    GError              **error)
 {
-       TrackerTask *task;
-
+       GPtrArray *tasks;
+
        g_return_val_if_fail (TRACKER_IS_SPARQL_BUFFER (buffer), NULL);
        g_return_val_if_fail (G_IS_ASYNC_RESULT (res), NULL);
        g_return_val_if_fail (!error || !*error, NULL);
 
-       task = g_task_propagate_pointer (G_TASK (res), error);
-
-       if (!task)
-               task = g_object_ref (g_task_get_task_data (G_TASK (res)));
-
-       return task;
+       tasks = g_task_propagate_pointer (G_TASK (res), error);
+
+       /* A failed update carries no result pointer; fall back to the task
+        * list stored as task data so the caller can still report errors
+        * per file and release any item queue blocker. */
+       if (!tasks)
+               tasks = g_ptr_array_ref (g_task_get_task_data (G_TASK (res)));
+
+       return tasks;
 }
 
 static gboolean
diff --git a/src/libtracker-miner/tracker-sparql-buffer.h b/src/libtracker-miner/tracker-sparql-buffer.h
index 7f5f53515..0d9c5fce2 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.h
+++ b/src/libtracker-miner/tracker-sparql-buffer.h
@@ -65,16 +65,22 @@ TrackerSparqlBuffer *tracker_sparql_buffer_new   (TrackerSparqlConnection *conne
                                                   guint                    limit);
 
+/* Sends the tasks queued in @buffer as a single batched update. @cb is
+ * called once with the outcome; call tracker_sparql_buffer_flush_finish()
+ * from it to retrieve the result. */
 gboolean             tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
-                                                  const gchar         *reason);
-
-void                 tracker_sparql_buffer_push  (TrackerSparqlBuffer *buffer,
-                                                  TrackerTask         *task,
+                                                  const gchar         *reason,
                                                   GAsyncReadyCallback  cb,
                                                   gpointer             user_data);
 
-TrackerTask *        tracker_sparql_buffer_push_finish (TrackerSparqlBuffer  *buffer,
-                                                        GAsyncResult         *res,
-                                                        GError              **error);
+/* Finishes tracker_sparql_buffer_flush(). Returns the tasks that were part
+ * of the flushed batch; the caller owns the returned reference and releases
+ * it with g_ptr_array_unref(). May return NULL when @error is set. */
+GPtrArray *          tracker_sparql_buffer_flush_finish (TrackerSparqlBuffer  *buffer,
+                                                         GAsyncResult         *res,
+                                                         GError              **error);
+
+void                 tracker_sparql_buffer_push  (TrackerSparqlBuffer *buffer,
+                                                  TrackerTask         *task);
 
 TrackerSparqlBufferState tracker_sparql_buffer_get_state (TrackerSparqlBuffer *buffer,
                                                           GFile               *file);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]