[tracker] libtracker-miner: Process writeback events one by one



commit c12d41c4cd5b31b9378dee7de1722fcf65509256
Author: Carlos Garnacho <carlosg gnome org>
Date:   Mon Nov 28 21:48:05 2016 +0100

    libtracker-miner: Process writeback events one by one
    
    The idle function could result in an unrestricted number of
    writeback events performing queries simultaneously, which may
    result in more CPU spent than desirable.
    
    So serialize the whole thing, split the GVariant into multiple
    events that get queued and processed one by one, only when one
    event has been fully processed the next will be processed.
    
    This reopens the question of what to do with writeback events
    at finalize time though. Those now will be freed instead of
    silently ignored after we break the main loop.

 src/miners/fs/tracker-writeback-listener.c |  246 +++++++++++++++++-----------
 1 files changed, 151 insertions(+), 95 deletions(-)
---
diff --git a/src/miners/fs/tracker-writeback-listener.c b/src/miners/fs/tracker-writeback-listener.c
index 20e3d14..298a005 100644
--- a/src/miners/fs/tracker-writeback-listener.c
+++ b/src/miners/fs/tracker-writeback-listener.c
@@ -30,10 +30,19 @@
 #define TRACKER_INTERFACE_RESOURCES     "org.freedesktop.Tracker1.Resources"
 
 typedef struct {
+       gint32 subject_id;
+       gint32 *types;
+} WritebackEvent;
+
+typedef struct {
        TrackerMinerFiles *files_miner;
        GDBusConnection *d_connection;
        TrackerSparqlConnection *connection;
        guint d_signal;
+
+       GQueue *events;
+       guint event_dispatch_id;
+       guint querying : 1;
 } TrackerWritebackListenerPrivate;
 
 typedef struct {
@@ -41,11 +50,6 @@ typedef struct {
        GStrv rdf_types;
 } QueryData;
 
-typedef struct {
-       GVariantIter *iter1;
-       TrackerWritebackListener *self;
-} DelayedLoopData;
-
 enum {
        PROP_0,
        PROP_FILES_MINER
@@ -71,6 +75,9 @@ static void     on_writeback_cb                    (GDBusConnection      *connec
                                                     GVariant             *parameters,
                                                     gpointer              user_data);
 
+static void    check_start_idle                    (TrackerWritebackListener *self,
+                                                    gboolean                  force);
+
 static void
 writeback_listener_initable_iface_init (GInitableIface *iface)
 {
@@ -147,6 +154,13 @@ writeback_listener_get_property (GObject    *object,
 }
 
 static void
+free_event (WritebackEvent *event)
+{
+       g_free (event->types);
+       g_free (event);
+}
+
+static void
 writeback_listener_finalize (GObject *object)
 {
        TrackerWritebackListener *listener = TRACKER_WRITEBACK_LISTENER (object);
@@ -154,6 +168,12 @@ writeback_listener_finalize (GObject *object)
 
        priv = tracker_writeback_listener_get_instance_private (listener);
 
+       if (priv->event_dispatch_id) {
+               g_source_remove (priv->event_dispatch_id);
+       }
+
+       g_queue_free_full (priv->events, (GDestroyNotify) free_event);
+
        if (priv->connection && priv->d_signal) {
                g_dbus_connection_signal_unsubscribe (priv->d_connection, priv->d_signal);
        }
@@ -175,6 +195,11 @@ writeback_listener_finalize (GObject *object)
 static void
 tracker_writeback_listener_init (TrackerWritebackListener *object)
 {
+       TrackerWritebackListener *listener = TRACKER_WRITEBACK_LISTENER (object);
+       TrackerWritebackListenerPrivate *priv;
+
+       priv = tracker_writeback_listener_get_instance_private (listener);
+       priv->events = g_queue_new ();
 }
 
 static gboolean
@@ -288,6 +313,8 @@ sparql_query_cb (GObject      *object,
                                        g_message ("  No files qualify for updates");
                                        query_data_free (data);
                                        g_object_unref (cursor);
+
+                                       check_start_idle (self, TRUE);
                                        return;
                                }
                        }
@@ -330,6 +357,8 @@ sparql_query_cb (GObject      *object,
        }
 
        query_data_free (data);
+
+       check_start_idle (self, TRUE);
 }
 
 static void
@@ -343,56 +372,52 @@ rdf_types_to_uris_cb (GObject      *object,
        TrackerSparqlCursor *cursor;
        TrackerSparqlConnection *connection;
        GError *error = NULL;
+       gchar *query;
+       GArray *rdf_types;
+       gchar *subject = NULL;
 
        priv = tracker_writeback_listener_get_instance_private (self);
        connection = priv->connection;
 
        cursor = tracker_sparql_connection_query_finish (connection, result, &error);
 
-       if (!error) {
-               gchar *query;
-               GArray *rdf_types;
-               gchar *subject = NULL;
+       if (error)
+               goto trouble;
 
-               rdf_types = g_array_new (TRUE, TRUE, sizeof (gchar *));
+       rdf_types = g_array_new (TRUE, TRUE, sizeof (gchar *));
 
-               while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
-                       gchar *uri = g_strdup (tracker_sparql_cursor_get_string (cursor, 0, NULL));
-                       if (subject == NULL) {
-                               subject = g_strdup (tracker_sparql_cursor_get_string (cursor, 1, NULL));
-                       }
-                       g_array_append_val (rdf_types, uri);
+       while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
+               gchar *uri = g_strdup (tracker_sparql_cursor_get_string (cursor, 0, NULL));
+               if (subject == NULL) {
+                       subject = g_strdup (tracker_sparql_cursor_get_string (cursor, 1, NULL));
                }
+               g_array_append_val (rdf_types, uri);
+       }
 
-               g_object_unref (cursor);
-
-               data->rdf_types = (GStrv) rdf_types->data;
-               g_array_free (rdf_types, FALSE);
+       g_object_unref (cursor);
 
-               if (subject == NULL) {
-                       goto trouble;
-               }
+       data->rdf_types = (GStrv) rdf_types->data;
+       g_array_free (rdf_types, FALSE);
 
-               query = g_strdup_printf ("SELECT ?url '%s' ?predicate ?object { "
-                                           "<%s> a nfo:FileDataObject . "
-                                           "<%s> ?predicate ?object ; nie:url ?url . "
-                                           "?predicate tracker:writeback true . "
-                                           "FILTER (NOT EXISTS { GRAPH <" TRACKER_OWN_GRAPH_URN "> "
-                                             "{ <%s> ?predicate ?object } }) } ",
-                                        subject, subject, subject, subject);
+       if (subject == NULL)
+               goto trouble;
 
-               tracker_sparql_connection_query_async (connection,
-                                                      query,
-                                                      NULL,
-                                                      sparql_query_cb,
-                                                      data);
+       query = g_strdup_printf ("SELECT ?url '%s' ?predicate ?object { "
+                                "<%s> a nfo:FileDataObject . "
+                                "<%s> ?predicate ?object ; nie:url ?url . "
+                                "?predicate tracker:writeback true . "
+                                "FILTER (NOT EXISTS { GRAPH <" TRACKER_OWN_GRAPH_URN "> "
+                                "{ <%s> ?predicate ?object } }) } ",
+                                subject, subject, subject, subject);
 
-               g_free (subject);
-               g_free (query);
+       tracker_sparql_connection_query_async (connection,
+                                              query,
+                                              NULL,
+                                              sparql_query_cb,
+                                              data);
 
-       } else {
-               goto trouble;
-       }
+       g_free (subject);
+       g_free (query);
 
        return;
 
@@ -402,69 +427,86 @@ trouble:
                g_error_free (error);
        }
        query_data_free (data);
+
+       check_start_idle (self, TRUE);
 }
 
 static gboolean
-delayed_loop_idle (gpointer user_data)
+process_event (gpointer user_data)
 {
-       DelayedLoopData *ldata = user_data;
-       GVariantIter *iter2;
-       gint subject_id = 0, rdf_type = 0;
-
-       if (g_variant_iter_next (ldata->iter1, "{iai}", &subject_id, &iter2)) {
-               TrackerWritebackListenerPrivate *priv;
-               QueryData *data = NULL;
-               GString *query;
-               gboolean comma = FALSE;
-
-               priv = tracker_writeback_listener_get_instance_private (ldata->self);
-
-               data = query_data_new (ldata->self);
-
-               /* Two queries are grouped together here to reduce the amount of
-                * queries that must be made. tracker:uri() is idd unrelated to
-                * the other part of the query (and is repeated each result, idd) */
-
-               query = g_string_new ("SELECT ");
-               g_string_append_printf (query, "?resource tracker:uri (%d) { "
-                                              "?resource a rdfs:Class . "
-                                              "FILTER (tracker:id (?resource) IN (",
-                                       subject_id);
-
-               while (g_variant_iter_loop (iter2, "i", &rdf_type)) {
-                       if (comma) {
-                               g_string_append_printf (query, ", %d", rdf_type);
-                       } else {
-                               g_string_append_printf (query, "%d", rdf_type);
-                               comma = TRUE;
-                       }
-               }
+       TrackerWritebackListener *self = user_data;
+       TrackerWritebackListenerPrivate *priv;
+       WritebackEvent *event;
+       QueryData *data = NULL;
+       GString *query;
+       gboolean comma = FALSE;
+       gint i;
 
-               g_string_append (query, ")) }");
+       priv = tracker_writeback_listener_get_instance_private (self);
+       event = g_queue_pop_head (priv->events);
+       priv->event_dispatch_id = 0;
 
-               tracker_sparql_connection_query_async (priv->connection,
-                                                      query->str,
-                                                      NULL,
-                                                      rdf_types_to_uris_cb,
-                                                      data);
+       if (!event)
+               return G_SOURCE_REMOVE;
 
-               g_string_free (query, TRUE);
+       data = query_data_new (self);
 
-               g_variant_iter_free (iter2);
+       /* Two queries are grouped together here to reduce the amount of
+        * queries that must be made. tracker:uri() is idd unrelated to
+        * the other part of the query (and is repeated each result, idd) */
+
+       query = g_string_new ("SELECT ");
+       g_string_append_printf (query, "?resource tracker:uri (%d) { "
+                                      "?resource a rdfs:Class . "
+                                      "FILTER (tracker:id (?resource) IN (",
+                               event->subject_id);
+
+       for (i = 0; event->types[i] != 0; i++) {
+               gint32 rdf_type = event->types[i];
 
-               return TRUE;
+               if (comma) {
+                       g_string_append_printf (query, ", %d", rdf_type);
+               } else {
+                       g_string_append_printf (query, "%d", rdf_type);
+                       comma = TRUE;
+               }
        }
 
-       return FALSE;
+       g_string_append (query, ")) }");
+
+       tracker_sparql_connection_query_async (priv->connection,
+                                              query->str,
+                                              NULL,
+                                              rdf_types_to_uris_cb,
+                                              data);
+       g_string_free (query, TRUE);
+       free_event (event);
+
+       return G_SOURCE_REMOVE;
 }
 
 static void
-delayed_loop_finished (gpointer user_data)
+check_start_idle (TrackerWritebackListener *self,
+                  gboolean                  force)
 {
-       DelayedLoopData *ldata = user_data;
-       g_variant_iter_free (ldata->iter1);
-       g_object_unref (ldata->self);
-       g_free (ldata);
+       TrackerWritebackListenerPrivate *priv;
+
+       priv = tracker_writeback_listener_get_instance_private (self);
+
+       if (priv->event_dispatch_id != 0)
+               return;
+       if (priv->querying && !force)
+               return;
+       if (g_queue_is_empty (priv->events)) {
+               priv->querying = FALSE;
+               return;
+       }
+
+       priv->querying = TRUE;
+       priv->event_dispatch_id =
+               g_idle_add_full (G_PRIORITY_LOW,
+                                process_event,
+                                self, NULL);
 }
 
 static void
@@ -477,13 +519,27 @@ on_writeback_cb (GDBusConnection      *connection,
                  gpointer              user_data)
 {
        TrackerWritebackListener *self = TRACKER_WRITEBACK_LISTENER (user_data);
-       DelayedLoopData *data = g_new (DelayedLoopData, 1);
+       TrackerWritebackListenerPrivate *priv;
+       GVariantIter *iter1, *iter2;
+       gint32 subject_id;
 
-       data->self = g_object_ref (self);
-       g_variant_get (parameters, "(a{iai})", &data->iter1);
+       priv = tracker_writeback_listener_get_instance_private (self);
+       g_variant_get (parameters, "(a{iai})", &iter1);
+
+       if (g_variant_iter_next (iter1, "{iai}", &subject_id, &iter2)) {
+               WritebackEvent *event = g_new (WritebackEvent, 1);
+               GArray *types = g_array_new (TRUE, TRUE, sizeof (gint32));
+               gint32 rdf_type;
+
+               while (g_variant_iter_loop (iter2, "i", &rdf_type))
+                       g_array_append_val (types, rdf_type);
+
+               g_variant_iter_free (iter2);
+               event->subject_id = subject_id;
+               event->types = (gint32 *) g_array_free (types, FALSE);
+               g_queue_push_tail (priv->events, event);
+       }
 
-       g_idle_add_full (G_PRIORITY_LOW,
-                        delayed_loop_idle,
-                        data,
-                        delayed_loop_finished);
+       g_variant_iter_free (iter1);
+       check_start_idle (self, FALSE);
 }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]