[tracker-miners/wip/carlosg/batches-and-resources: 20/31] libtracker-miner: Make SPARQL flush async, not push
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker-miners/wip/carlosg/batches-and-resources: 20/31] libtracker-miner: Make SPARQL flush async, not push
- Date: Fri, 11 Dec 2020 10:55:03 +0000 (UTC)
commit b645a693ebcbd47eb93bca6579b46338b524550c
Author: Carlos Garnacho <carlosg gnome org>
Date: Wed Dec 2 18:06:20 2020 +0100
libtracker-miner: Make SPARQL flush async, not push
We used to create a GTask per file, only to dispatch them all en masse
once the SPARQL buffer had finished flushing. Do this one level up
instead, so we just need to dispatch a single task per flush().
src/libtracker-miner/tracker-miner-fs.c | 82 +++++++++++++++------------
src/libtracker-miner/tracker-sparql-buffer.c | 83 ++++++----------------------
src/libtracker-miner/tracker-sparql-buffer.h | 14 ++---
3 files changed, 73 insertions(+), 106 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 2e39047e6..ce33b4ede 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1228,49 +1228,56 @@ item_queue_is_blocked_by_file (TrackerMinerFS *fs,
}
static void
-sparql_buffer_task_finished_cb (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
+sparql_buffer_flush_cb (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
- TrackerMinerFS *fs;
- TrackerMinerFSPrivate *priv;
+ TrackerMinerFS *fs = user_data;
+ TrackerMinerFSPrivate *priv = fs->priv;
+ GPtrArray *tasks;
+ GError *error = NULL;
TrackerTask *task;
GFile *task_file;
- GError *error = NULL;
-
- fs = user_data;
- priv = fs->priv;
+ guint i;
- task = tracker_sparql_buffer_push_finish (TRACKER_SPARQL_BUFFER (object),
- result, &error);
- task_file = tracker_task_get_file (task);
+ tasks = tracker_sparql_buffer_flush_finish (TRACKER_SPARQL_BUFFER (object),
+ result, &error);
if (error) {
g_warning ("Could not execute sparql: %s", error->message);
- tracker_error_report (task_file, error->message,
- tracker_sparql_task_get_sparql (task));
- priv->total_files_notified_error++;
- g_error_free (error);
}
- tracker_error_report_delete (task_file);
+ for (i = 0; i < tasks->len; i++) {
+ task = g_ptr_array_index (tasks, i);
+ task_file = tracker_task_get_file (task);
- if (item_queue_is_blocked_by_file (fs, task_file)) {
- g_object_unref (priv->item_queue_blocker);
- priv->item_queue_blocker = NULL;
+ if (error) {
+ tracker_error_report (task_file, error->message,
+ tracker_sparql_task_get_sparql (task));
+ fs->priv->total_files_notified_error++;
+ } else {
+ tracker_error_report_delete (task_file);
+ }
+
+ if (item_queue_is_blocked_by_file (fs, task_file))
+ g_clear_object (&fs->priv->item_queue_blocker);
}
if (priv->item_queue_blocker != NULL) {
if (tracker_task_pool_get_size (TRACKER_TASK_POOL (object)) > 0) {
tracker_sparql_buffer_flush (TRACKER_SPARQL_BUFFER (object),
- "Item queue still blocked after flush");
+ "Item queue still blocked after flush",
+ sparql_buffer_flush_cb,
+ fs);
/* Check if we've finished inserting for given prefixes ... */
notify_roots_finished (fs, TRUE);
}
} else if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (object))) {
tracker_sparql_buffer_flush (TRACKER_SPARQL_BUFFER (object),
- "SPARQL buffer limit reached");
+ "SPARQL buffer limit reached",
+ sparql_buffer_flush_cb,
+ fs);
/* Check if we've finished inserting for given prefixes ... */
notify_roots_finished (fs, TRUE);
@@ -1278,7 +1285,7 @@ sparql_buffer_task_finished_cb (GObject *object,
item_queue_handlers_set_up (fs);
}
- tracker_task_unref (task);
+ g_clear_error (&error);
}
static void
@@ -1299,18 +1306,21 @@ push_sparql_task (TrackerMinerFS *fs,
if (sparql_task) {
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
- sparql_task,
- sparql_buffer_task_finished_cb,
- fs);
+ sparql_task);
if (item_queue_is_blocked_by_file (fs, file)) {
- tracker_sparql_buffer_flush (fs->priv->sparql_buffer, "Current file is blocking item queue");
+ tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
+ "Current file is blocking item queue",
+ sparql_buffer_flush_cb,
+ fs);
/* Check if we've finished inserting for given prefixes ... */
notify_roots_finished (fs, TRUE);
} else if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
- "SPARQL buffer limit reached");
+ "SPARQL buffer limit reached",
+ sparql_buffer_flush_cb,
+ fs);
/* Check if we've finished inserting for given prefixes ... */
notify_roots_finished (fs, TRUE);
@@ -1597,9 +1607,7 @@ push_task (TrackerMinerFS *fs,
task = tracker_sparql_task_new_take_sparql_str (file, sparql);
tracker_sparql_buffer_push (fs->priv->sparql_buffer,
- task,
- sparql_buffer_task_finished_cb,
- fs);
+ task);
tracker_task_unref (task);
}
@@ -1631,7 +1639,9 @@ miner_handle_next_item (TrackerMinerFS *fs)
* process now, we want it to get finished before we can go
* on with the queues... */
tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
- "Queue handlers WAIT");
+ "Queue handlers WAIT",
+ sparql_buffer_flush_cb,
+ fs);
/* Check if we've finished inserting for given prefixes ... */
notify_roots_finished (fs, TRUE);
@@ -1735,7 +1745,9 @@ miner_handle_next_item (TrackerMinerFS *fs)
} else {
/* Flush any possible pending update here */
tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
- "Queue handlers NONE");
+ "Queue handlers NONE",
+ sparql_buffer_flush_cb,
+ fs);
/* Check if we've finished inserting for given prefixes ... */
notify_roots_finished (fs, TRUE);
@@ -1790,7 +1802,9 @@ miner_handle_next_item (TrackerMinerFS *fs)
if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (fs->priv->sparql_buffer))) {
tracker_sparql_buffer_flush (fs->priv->sparql_buffer,
- "SPARQL buffer limit reached");
+ "SPARQL buffer limit reached",
+ sparql_buffer_flush_cb,
+ fs);
/* Check if we've finished inserting for given prefixes ... */
notify_roots_finished (fs, TRUE);
diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c
index 4418f3a71..eaea5ec84 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.c
+++ b/src/libtracker-miner/tracker-sparql-buffer.c
@@ -45,13 +45,13 @@ struct _TrackerSparqlBufferPrivate
struct _SparqlTaskData
{
gchar *str;
- GTask *async_task;
};
struct _UpdateArrayData {
TrackerSparqlBuffer *buffer;
GPtrArray *tasks;
GArray *sparql_array;
+ GTask *async_task;
};
G_DEFINE_TYPE_WITH_PRIVATE (TrackerSparqlBuffer, tracker_sparql_buffer, TRACKER_TYPE_TASK_POOL)
@@ -166,7 +166,7 @@ update_array_data_free (UpdateArrayData *update_data)
g_ptr_array_foreach (update_data->tasks,
(GFunc) remove_task_foreach,
update_data->buffer);
- g_ptr_array_free (update_data->tasks, TRUE);
+ g_ptr_array_unref (update_data->tasks);
g_slice_free (UpdateArrayData, update_data);
}
@@ -180,7 +180,6 @@ tracker_sparql_buffer_update_array_cb (GObject *object,
TrackerSparqlBuffer *buffer;
GError *error = NULL;
UpdateArrayData *update_data;
- gint i;
update_data = user_data;
buffer = TRACKER_SPARQL_BUFFER (update_data->buffer);
@@ -198,41 +197,23 @@ tracker_sparql_buffer_update_array_cb (GObject *object,
error->message);
}
- /* Report status on each task of the batch update */
- for (i = 0; i < update_data->tasks->len; i++) {
- TrackerTask *task;
- SparqlTaskData *task_data;
-
- task = g_ptr_array_index (update_data->tasks, i);
- task_data = tracker_task_get_data (task);
-
- /* Call finished handler with the error, if any */
- if (error) {
- g_task_return_error (task_data->async_task,
- g_error_copy (error));
- } else {
- g_task_return_pointer (task_data->async_task,
- tracker_task_ref (task),
- (GDestroyNotify) tracker_task_unref);
- }
-
- g_clear_object (&task_data->async_task);
-
- /* No need to deallocate the task here, it will be done when
- * unref-ing the UpdateArrayData below */
+ if (error) {
+ g_task_return_error (update_data->async_task, error);
+ } else {
+ g_task_return_pointer (update_data->async_task,
+ g_ptr_array_ref (update_data->tasks),
+ (GDestroyNotify) g_ptr_array_unref);
}
/* Note that tasks are actually deallocated here */
update_array_data_free (update_data);
-
- if (error) {
- g_error_free (error);
- }
}
gboolean
tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
- const gchar *reason)
+ const gchar *reason,
+ GAsyncReadyCallback cb,
+ gpointer user_data)
{
TrackerSparqlBufferPrivate *priv;
GArray *sparql_array;
@@ -268,6 +249,7 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
update_data->buffer = buffer;
update_data->tasks = g_ptr_array_ref (priv->tasks);
update_data->sparql_array = sparql_array;
+ update_data->async_task = g_task_new (buffer, NULL, cb, user_data);
/* Empty pool, update_data will keep
* references to the tasks to keep
@@ -309,28 +291,11 @@ sparql_buffer_push_to_pool (TrackerSparqlBuffer *buffer,
void
tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
- TrackerTask *task,
- GAsyncReadyCallback cb,
- gpointer user_data)
+ TrackerTask *task)
{
- SparqlTaskData *data;
-
g_return_if_fail (TRACKER_IS_SPARQL_BUFFER (buffer));
g_return_if_fail (task != NULL);
- /* NOTE: We don't own the task and if we want it we have to
- * reference it, each function below references task in
- * different ways.
- */
- data = tracker_task_get_data (task);
-
- if (!data->async_task) {
- data->async_task = g_task_new (buffer, NULL, cb, user_data);
- g_task_set_task_data (data->async_task,
- tracker_task_ref (task),
- (GDestroyNotify) tracker_task_unref);
- }
-
sparql_buffer_push_to_pool (buffer, task);
}
@@ -350,11 +315,6 @@ static void
sparql_task_data_free (SparqlTaskData *data)
{
g_free (data->str);
-
- if (data->async_task) {
- g_object_unref (data->async_task);
- }
-
g_slice_free (SparqlTaskData, data);
}
@@ -390,23 +350,16 @@ tracker_sparql_task_get_sparql (TrackerTask *task)
return task_data->str;
}
-TrackerTask *
-tracker_sparql_buffer_push_finish (TrackerSparqlBuffer *buffer,
- GAsyncResult *res,
- GError **error)
+GPtrArray *
+tracker_sparql_buffer_flush_finish (TrackerSparqlBuffer *buffer,
+ GAsyncResult *res,
+ GError **error)
{
- TrackerTask *task;
-
g_return_val_if_fail (TRACKER_IS_SPARQL_BUFFER (buffer), NULL);
g_return_val_if_fail (G_IS_ASYNC_RESULT (res), NULL);
g_return_val_if_fail (!error || !*error, NULL);
- task = g_task_propagate_pointer (G_TASK (res), error);
-
- if (!task)
- task = g_object_ref (g_task_get_task_data (G_TASK (res)));
-
- return task;
+ return g_task_propagate_pointer (G_TASK (res), error);
}
static gboolean
diff --git a/src/libtracker-miner/tracker-sparql-buffer.h b/src/libtracker-miner/tracker-sparql-buffer.h
index 7f5f53515..0d9c5fce2 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.h
+++ b/src/libtracker-miner/tracker-sparql-buffer.h
@@ -65,16 +65,16 @@ TrackerSparqlBuffer *tracker_sparql_buffer_new (TrackerSparqlConnection *conne
guint limit);
gboolean tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
- const gchar *reason);
-
-void tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
- TrackerTask *task,
+ const gchar *reason,
GAsyncReadyCallback cb,
gpointer user_data);
-TrackerTask * tracker_sparql_buffer_push_finish (TrackerSparqlBuffer *buffer,
- GAsyncResult *res,
- GError **error);
+GPtrArray * tracker_sparql_buffer_flush_finish (TrackerSparqlBuffer *buffer,
+ GAsyncResult *res,
+ GError **error);
+
+void tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
+ TrackerTask *task);
TrackerSparqlBufferState tracker_sparql_buffer_get_state (TrackerSparqlBuffer *buffer,
GFile *file);
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]