[tracker/wip/carlosg/notifier-query-batches] libtracker-sparql: Query notifier data in batches




commit 2fd26f7d82ef2efb977b638dd30f6553bb9f55e0
Author: Carlos Garnacho <carlosg gnome org>
Date:   Tue Aug 4 23:48:16 2020 +0200

    libtracker-sparql: Query notifier data in batches
    
    We have limits with the services table, and other limits on the number
    of parameters higher up. We shouldn't be doing queries with an arbitrary
    number of those.
    
    Limit the batch size in queries for extra TrackerNotifier data, and
    adapt the code so that a single event batch may be resolved through
    several consecutive queries when it does not fit in one.
    
    It is now possible to initiate queries on a fresh
    TrackerNotifierEventCache while an older one is still being processed,
    and this can happen from different threads: the update thread for the
    initial query triggered from commit hooks, and the main context thread
    for later async processing. Since binding arguments for execution is
    not thread safe, protect statement execution with a mutex.

 src/libtracker-sparql/tracker-notifier.c | 64 ++++++++++++++++++--------------
 1 file changed, 36 insertions(+), 28 deletions(-)
---
diff --git a/src/libtracker-sparql/tracker-notifier.c b/src/libtracker-sparql/tracker-notifier.c
index bed40a5c8..80f5867f4 100644
--- a/src/libtracker-sparql/tracker-notifier.c
+++ b/src/libtracker-sparql/tracker-notifier.c
@@ -71,6 +71,7 @@ struct _TrackerNotifierPrivate {
        GCancellable *cancellable;
        TrackerSparqlStatement *local_statement;
        gint n_local_statement_slots;
+       GMutex mutex;
 };
 
 struct _TrackerNotifierEventCache {
@@ -78,6 +79,7 @@ struct _TrackerNotifierEventCache {
        gchar *graph;
        TrackerNotifier *notifier;
        GSequence *sequence;
+       GSequenceIter *first;
 };
 
 struct _TrackerNotifierEvent {
@@ -100,11 +102,16 @@ enum {
 
 static guint signals[N_SIGNALS] = { 0 };
 
+#define N_SLOTS 50 /* In sync with tracker-vtab-service.c parameters */
+
 #define DEFAULT_OBJECT_PATH "/org/freedesktop/Tracker3/Endpoint"
 
 G_DEFINE_TYPE_WITH_CODE (TrackerNotifier, tracker_notifier, G_TYPE_OBJECT,
                          G_ADD_PRIVATE (TrackerNotifier))
 
+static void tracker_notifier_query_extra_info (TrackerNotifier           *notifier,
+                                               TrackerNotifierEventCache *cache);
+
 static TrackerNotifierSubscription *
 tracker_notifier_subscription_new (TrackerNotifier *notifier,
                                    GDBusConnection *connection,
@@ -365,8 +372,7 @@ tracker_notifier_emit_events_in_idle (TrackerNotifierEventCache *cache)
 
 static gchar *
 create_extra_info_query (TrackerNotifier           *notifier,
-                         TrackerNotifierEventCache *cache,
-                         gint                       n_slots)
+                         TrackerNotifierEventCache *cache)
 {
        GString *sparql;
        gchar *service;
@@ -384,7 +390,7 @@ create_extra_info_query (TrackerNotifier           *notifier,
 
        g_string_append (sparql, "{ VALUES ?id { ");
 
-       for (i = 0; i < n_slots; i++) {
+       for (i = 0; i < N_SLOTS; i++) {
                g_string_append_printf (sparql, "~arg%d ", i + 1);
        }
 
@@ -405,33 +411,26 @@ create_extra_info_query (TrackerNotifier           *notifier,
 
 static TrackerSparqlStatement *
 ensure_extra_info_statement (TrackerNotifier           *notifier,
-                             TrackerNotifierEventCache *cache,
-                             gint                      *n_slots_out)
+                             TrackerNotifierEventCache *cache)
 {
        TrackerSparqlStatement **ptr;
        TrackerNotifierPrivate *priv;
        gchar *sparql;
-       gint *n_slots, new_slots;
        GError *error = NULL;
 
        priv = tracker_notifier_get_instance_private (notifier);
 
        if (cache->subscription) {
                ptr = &cache->subscription->statement;
-               n_slots = &cache->subscription->n_statement_slots;
        } else {
                ptr = &priv->local_statement;
-               n_slots = &priv->n_local_statement_slots;
        }
 
-       if (*ptr && *n_slots >= g_sequence_get_length (cache->sequence)) {
-               *n_slots_out = *n_slots;
+       if (*ptr) {
                return *ptr;
        }
 
-       g_clear_object (ptr);
-       new_slots = g_sequence_get_length (cache->sequence);
-       sparql = create_extra_info_query (notifier, cache, new_slots);
+       sparql = create_extra_info_query (notifier, cache);
        *ptr = tracker_sparql_connection_query_statement (priv->connection,
                                                          sparql,
                                                          priv->cancellable,
@@ -444,9 +443,6 @@ ensure_extra_info_statement (TrackerNotifier           *notifier,
                return NULL;
        }
 
-       *n_slots = new_slots;
-       *n_slots_out = new_slots;
-
        return *ptr;
 }
 
@@ -462,7 +458,7 @@ handle_cursor (GTask        *task,
        GSequenceIter *iter;
        gint64 id;
 
-       iter = g_sequence_get_begin_iter (cache->sequence);
+       iter = cache->first;
 
        /* We rely here in both the GPtrArray and the query items being
         * sorted by tracker:id, the former will be so because the way it's
@@ -487,8 +483,13 @@ handle_cursor (GTask        *task,
        }
 
        tracker_sparql_cursor_close (cursor);
+       cache->first = iter;
 
-       tracker_notifier_emit_events_in_idle (cache);
+       if (g_sequence_iter_is_end (cache->first)) {
+               tracker_notifier_emit_events_in_idle (cache);
+       } else {
+               tracker_notifier_query_extra_info (cache->notifier, cache);
+       }
 
        g_task_return_boolean (task, TRUE);
 }
@@ -549,15 +550,16 @@ query_extra_info_cb (GObject      *object,
 
 static void
 bind_arguments (TrackerSparqlStatement    *statement,
-                TrackerNotifierEventCache *cache,
-                gint                       n_slots)
+                TrackerNotifierEventCache *cache)
 {
        GSequenceIter *iter;
        gchar *arg_name;
        gint i = 0;
 
-       for (iter = g_sequence_get_begin_iter (cache->sequence);
-            !g_sequence_iter_is_end (iter);
+       tracker_sparql_statement_clear_bindings (statement);
+
+       for (iter = cache->first;
+            !g_sequence_iter_is_end (iter) && i < N_SLOTS;
             iter = g_sequence_iter_next (iter)) {
                TrackerNotifierEvent *event;
 
@@ -570,7 +572,7 @@ bind_arguments (TrackerSparqlStatement    *statement,
        }
 
        /* Fill in missing slots with 0's */
-       while (i < n_slots) {
+       while (i < N_SLOTS) {
                arg_name = g_strdup_printf ("arg%d", i + 1);
                tracker_sparql_statement_bind_int (statement, arg_name, 0);
                g_free (arg_name);
@@ -584,18 +586,23 @@ tracker_notifier_query_extra_info (TrackerNotifier           *notifier,
 {
        TrackerNotifierPrivate *priv;
        TrackerSparqlStatement *statement;
-       gint n_slots;
 
-       statement = ensure_extra_info_statement (notifier, cache, &n_slots);
+       priv = tracker_notifier_get_instance_private (notifier);
+
+       g_mutex_lock (&priv->mutex);
+
+       statement = ensure_extra_info_statement (notifier, cache);
        if (!statement)
-               return;
+               goto out;
 
-       bind_arguments (statement, cache, n_slots);
-       priv = tracker_notifier_get_instance_private (notifier);
+       bind_arguments (statement, cache);
        tracker_sparql_statement_execute_async (statement,
                                                priv->cancellable,
                                                query_extra_info_cb,
                                                cache);
+
+out:
+       g_mutex_unlock (&priv->mutex);
 }
 
 void
@@ -608,6 +615,7 @@ _tracker_notifier_event_cache_flush_events (TrackerNotifierEventCache *cache)
                return;
        }
 
+       cache->first = g_sequence_get_begin_iter (cache->sequence);
        tracker_notifier_query_extra_info (notifier, cache);
 }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]