[tracker/wip/carlosg/notifier-query-batches] libtracker-sparql: Query notifier data in batches
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/wip/carlosg/notifier-query-batches] libtracker-sparql: Query notifier data in batches
- Date: Mon, 10 Aug 2020 10:52:31 +0000 (UTC)
commit 2fd26f7d82ef2efb977b638dd30f6553bb9f55e0
Author: Carlos Garnacho <carlosg gnome org>
Date: Tue Aug 4 23:48:16 2020 +0200
libtracker-sparql: Query notifier data in batches
We have limits with the services table, and other limits on the number
of parameters higher up. We shouldn't be doing queries with an arbitrary
number of those.
Limit the batch size in queries for extra TrackerNotifier data, and
adapt code to maybe do extra queries for a single event batch.
It is now possible that we initiate queries on a fresh
TrackerNotifierEventCache while an old one is still being processed, and
this happens from different threads (the update thread for the initial
one triggered from commit hooks, the main context thread for later async
processing). Protect statement execution with a mutex, as binding
arguments for execution is not thread safe.
src/libtracker-sparql/tracker-notifier.c | 64 ++++++++++++++++++--------------
1 file changed, 36 insertions(+), 28 deletions(-)
---
diff --git a/src/libtracker-sparql/tracker-notifier.c b/src/libtracker-sparql/tracker-notifier.c
index bed40a5c8..80f5867f4 100644
--- a/src/libtracker-sparql/tracker-notifier.c
+++ b/src/libtracker-sparql/tracker-notifier.c
@@ -71,6 +71,7 @@ struct _TrackerNotifierPrivate {
GCancellable *cancellable;
TrackerSparqlStatement *local_statement;
gint n_local_statement_slots;
+ GMutex mutex;
};
struct _TrackerNotifierEventCache {
@@ -78,6 +79,7 @@ struct _TrackerNotifierEventCache {
gchar *graph;
TrackerNotifier *notifier;
GSequence *sequence;
+ GSequenceIter *first;
};
struct _TrackerNotifierEvent {
@@ -100,11 +102,16 @@ enum {
static guint signals[N_SIGNALS] = { 0 };
+#define N_SLOTS 50 /* In sync with tracker-vtab-service.c parameters */
+
#define DEFAULT_OBJECT_PATH "/org/freedesktop/Tracker3/Endpoint"
G_DEFINE_TYPE_WITH_CODE (TrackerNotifier, tracker_notifier, G_TYPE_OBJECT,
G_ADD_PRIVATE (TrackerNotifier))
+static void tracker_notifier_query_extra_info (TrackerNotifier *notifier,
+ TrackerNotifierEventCache *cache);
+
static TrackerNotifierSubscription *
tracker_notifier_subscription_new (TrackerNotifier *notifier,
GDBusConnection *connection,
@@ -365,8 +372,7 @@ tracker_notifier_emit_events_in_idle (TrackerNotifierEventCache *cache)
static gchar *
create_extra_info_query (TrackerNotifier *notifier,
- TrackerNotifierEventCache *cache,
- gint n_slots)
+ TrackerNotifierEventCache *cache)
{
GString *sparql;
gchar *service;
@@ -384,7 +390,7 @@ create_extra_info_query (TrackerNotifier *notifier,
g_string_append (sparql, "{ VALUES ?id { ");
- for (i = 0; i < n_slots; i++) {
+ for (i = 0; i < N_SLOTS; i++) {
g_string_append_printf (sparql, "~arg%d ", i + 1);
}
@@ -405,33 +411,26 @@ create_extra_info_query (TrackerNotifier *notifier,
static TrackerSparqlStatement *
ensure_extra_info_statement (TrackerNotifier *notifier,
- TrackerNotifierEventCache *cache,
- gint *n_slots_out)
+ TrackerNotifierEventCache *cache)
{
TrackerSparqlStatement **ptr;
TrackerNotifierPrivate *priv;
gchar *sparql;
- gint *n_slots, new_slots;
GError *error = NULL;
priv = tracker_notifier_get_instance_private (notifier);
if (cache->subscription) {
ptr = &cache->subscription->statement;
- n_slots = &cache->subscription->n_statement_slots;
} else {
ptr = &priv->local_statement;
- n_slots = &priv->n_local_statement_slots;
}
- if (*ptr && *n_slots >= g_sequence_get_length (cache->sequence)) {
- *n_slots_out = *n_slots;
+ if (*ptr) {
return *ptr;
}
- g_clear_object (ptr);
- new_slots = g_sequence_get_length (cache->sequence);
- sparql = create_extra_info_query (notifier, cache, new_slots);
+ sparql = create_extra_info_query (notifier, cache);
*ptr = tracker_sparql_connection_query_statement (priv->connection,
sparql,
priv->cancellable,
@@ -444,9 +443,6 @@ ensure_extra_info_statement (TrackerNotifier *notifier,
return NULL;
}
- *n_slots = new_slots;
- *n_slots_out = new_slots;
-
return *ptr;
}
@@ -462,7 +458,7 @@ handle_cursor (GTask *task,
GSequenceIter *iter;
gint64 id;
- iter = g_sequence_get_begin_iter (cache->sequence);
+ iter = cache->first;
/* We rely here in both the GPtrArray and the query items being
* sorted by tracker:id, the former will be so because the way it's
@@ -487,8 +483,13 @@ handle_cursor (GTask *task,
}
tracker_sparql_cursor_close (cursor);
+ cache->first = iter;
- tracker_notifier_emit_events_in_idle (cache);
+ if (g_sequence_iter_is_end (cache->first)) {
+ tracker_notifier_emit_events_in_idle (cache);
+ } else {
+ tracker_notifier_query_extra_info (cache->notifier, cache);
+ }
g_task_return_boolean (task, TRUE);
}
@@ -549,15 +550,16 @@ query_extra_info_cb (GObject *object,
static void
bind_arguments (TrackerSparqlStatement *statement,
- TrackerNotifierEventCache *cache,
- gint n_slots)
+ TrackerNotifierEventCache *cache)
{
GSequenceIter *iter;
gchar *arg_name;
gint i = 0;
- for (iter = g_sequence_get_begin_iter (cache->sequence);
- !g_sequence_iter_is_end (iter);
+ tracker_sparql_statement_clear_bindings (statement);
+
+ for (iter = cache->first;
+ !g_sequence_iter_is_end (iter) && i < N_SLOTS;
iter = g_sequence_iter_next (iter)) {
TrackerNotifierEvent *event;
@@ -570,7 +572,7 @@ bind_arguments (TrackerSparqlStatement *statement,
}
/* Fill in missing slots with 0's */
- while (i < n_slots) {
+ while (i < N_SLOTS) {
arg_name = g_strdup_printf ("arg%d", i + 1);
tracker_sparql_statement_bind_int (statement, arg_name, 0);
g_free (arg_name);
@@ -584,18 +586,23 @@ tracker_notifier_query_extra_info (TrackerNotifier *notifier,
{
TrackerNotifierPrivate *priv;
TrackerSparqlStatement *statement;
- gint n_slots;
- statement = ensure_extra_info_statement (notifier, cache, &n_slots);
+ priv = tracker_notifier_get_instance_private (notifier);
+
+ g_mutex_lock (&priv->mutex);
+
+ statement = ensure_extra_info_statement (notifier, cache);
if (!statement)
- return;
+ goto out;
- bind_arguments (statement, cache, n_slots);
- priv = tracker_notifier_get_instance_private (notifier);
+ bind_arguments (statement, cache);
tracker_sparql_statement_execute_async (statement,
priv->cancellable,
query_extra_info_cb,
cache);
+
+out:
+ g_mutex_unlock (&priv->mutex);
}
void
@@ -608,6 +615,7 @@ _tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache)
return;
}
+ cache->first = g_sequence_get_begin_iter (cache->sequence);
tracker_notifier_query_extra_info (notifier, cache);
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]