[tracker] libtracker-miner: Process writeback events one by one
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker] libtracker-miner: Process writeback events one by one
- Date: Thu, 8 Dec 2016 11:06:07 +0000 (UTC)
commit c12d41c4cd5b31b9378dee7de1722fcf65509256
Author: Carlos Garnacho <carlosg gnome org>
Date: Mon Nov 28 21:48:05 2016 +0100
libtracker-miner: Process writeback events one by one
The idle function could result in an unrestricted number of
writeback events performing queries simultaneously, which may
result in more CPU spent than desirable.
So serialize the whole thing: split the GVariant into multiple
events that get queued and processed one by one. The next event is
only processed once the previous one has been fully processed.
This reopens the question of what to do with writeback events
at finalize time, though. Those will now be freed instead of being
silently ignored after we break the main loop.
src/miners/fs/tracker-writeback-listener.c | 246 +++++++++++++++++-----------
1 files changed, 151 insertions(+), 95 deletions(-)
---
diff --git a/src/miners/fs/tracker-writeback-listener.c b/src/miners/fs/tracker-writeback-listener.c
index 20e3d14..298a005 100644
--- a/src/miners/fs/tracker-writeback-listener.c
+++ b/src/miners/fs/tracker-writeback-listener.c
@@ -30,10 +30,19 @@
#define TRACKER_INTERFACE_RESOURCES "org.freedesktop.Tracker1.Resources"
typedef struct {
+ gint32 subject_id;
+ gint32 *types;
+} WritebackEvent;
+
+typedef struct {
TrackerMinerFiles *files_miner;
GDBusConnection *d_connection;
TrackerSparqlConnection *connection;
guint d_signal;
+
+ GQueue *events;
+ guint event_dispatch_id;
+ guint querying : 1;
} TrackerWritebackListenerPrivate;
typedef struct {
@@ -41,11 +50,6 @@ typedef struct {
GStrv rdf_types;
} QueryData;
-typedef struct {
- GVariantIter *iter1;
- TrackerWritebackListener *self;
-} DelayedLoopData;
-
enum {
PROP_0,
PROP_FILES_MINER
@@ -71,6 +75,9 @@ static void on_writeback_cb (GDBusConnection *connec
GVariant *parameters,
gpointer user_data);
+static void check_start_idle (TrackerWritebackListener *self,
+ gboolean force);
+
static void
writeback_listener_initable_iface_init (GInitableIface *iface)
{
@@ -147,6 +154,13 @@ writeback_listener_get_property (GObject *object,
}
static void
+free_event (WritebackEvent *event)
+{
+ g_free (event->types);
+ g_free (event);
+}
+
+static void
writeback_listener_finalize (GObject *object)
{
TrackerWritebackListener *listener = TRACKER_WRITEBACK_LISTENER (object);
@@ -154,6 +168,12 @@ writeback_listener_finalize (GObject *object)
priv = tracker_writeback_listener_get_instance_private (listener);
+ if (priv->event_dispatch_id) {
+ g_source_remove (priv->event_dispatch_id);
+ }
+
+ g_queue_free_full (priv->events, (GDestroyNotify) free_event);
+
if (priv->connection && priv->d_signal) {
g_dbus_connection_signal_unsubscribe (priv->d_connection, priv->d_signal);
}
@@ -175,6 +195,11 @@ writeback_listener_finalize (GObject *object)
static void
tracker_writeback_listener_init (TrackerWritebackListener *object)
{
+ TrackerWritebackListener *listener = TRACKER_WRITEBACK_LISTENER (object);
+ TrackerWritebackListenerPrivate *priv;
+
+ priv = tracker_writeback_listener_get_instance_private (listener);
+ priv->events = g_queue_new ();
}
static gboolean
@@ -288,6 +313,8 @@ sparql_query_cb (GObject *object,
g_message (" No files qualify for updates");
query_data_free (data);
g_object_unref (cursor);
+
+ check_start_idle (self, TRUE);
return;
}
}
@@ -330,6 +357,8 @@ sparql_query_cb (GObject *object,
}
query_data_free (data);
+
+ check_start_idle (self, TRUE);
}
static void
@@ -343,56 +372,52 @@ rdf_types_to_uris_cb (GObject *object,
TrackerSparqlCursor *cursor;
TrackerSparqlConnection *connection;
GError *error = NULL;
+ gchar *query;
+ GArray *rdf_types;
+ gchar *subject = NULL;
priv = tracker_writeback_listener_get_instance_private (self);
connection = priv->connection;
cursor = tracker_sparql_connection_query_finish (connection, result, &error);
- if (!error) {
- gchar *query;
- GArray *rdf_types;
- gchar *subject = NULL;
+ if (error)
+ goto trouble;
- rdf_types = g_array_new (TRUE, TRUE, sizeof (gchar *));
+ rdf_types = g_array_new (TRUE, TRUE, sizeof (gchar *));
- while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
- gchar *uri = g_strdup (tracker_sparql_cursor_get_string (cursor, 0, NULL));
- if (subject == NULL) {
- subject = g_strdup (tracker_sparql_cursor_get_string (cursor, 1, NULL));
- }
- g_array_append_val (rdf_types, uri);
+ while (tracker_sparql_cursor_next (cursor, NULL, NULL)) {
+ gchar *uri = g_strdup (tracker_sparql_cursor_get_string (cursor, 0, NULL));
+ if (subject == NULL) {
+ subject = g_strdup (tracker_sparql_cursor_get_string (cursor, 1, NULL));
}
+ g_array_append_val (rdf_types, uri);
+ }
- g_object_unref (cursor);
-
- data->rdf_types = (GStrv) rdf_types->data;
- g_array_free (rdf_types, FALSE);
+ g_object_unref (cursor);
- if (subject == NULL) {
- goto trouble;
- }
+ data->rdf_types = (GStrv) rdf_types->data;
+ g_array_free (rdf_types, FALSE);
- query = g_strdup_printf ("SELECT ?url '%s' ?predicate ?object { "
- "<%s> a nfo:FileDataObject . "
- "<%s> ?predicate ?object ; nie:url ?url . "
- "?predicate tracker:writeback true . "
- "FILTER (NOT EXISTS { GRAPH <" TRACKER_OWN_GRAPH_URN "> "
- "{ <%s> ?predicate ?object } }) } ",
- subject, subject, subject, subject);
+ if (subject == NULL)
+ goto trouble;
- tracker_sparql_connection_query_async (connection,
- query,
- NULL,
- sparql_query_cb,
- data);
+ query = g_strdup_printf ("SELECT ?url '%s' ?predicate ?object { "
+ "<%s> a nfo:FileDataObject . "
+ "<%s> ?predicate ?object ; nie:url ?url . "
+ "?predicate tracker:writeback true . "
+ "FILTER (NOT EXISTS { GRAPH <" TRACKER_OWN_GRAPH_URN "> "
+ "{ <%s> ?predicate ?object } }) } ",
+ subject, subject, subject, subject);
- g_free (subject);
- g_free (query);
+ tracker_sparql_connection_query_async (connection,
+ query,
+ NULL,
+ sparql_query_cb,
+ data);
- } else {
- goto trouble;
- }
+ g_free (subject);
+ g_free (query);
return;
@@ -402,69 +427,86 @@ trouble:
g_error_free (error);
}
query_data_free (data);
+
+ check_start_idle (self, TRUE);
}
static gboolean
-delayed_loop_idle (gpointer user_data)
+process_event (gpointer user_data)
{
- DelayedLoopData *ldata = user_data;
- GVariantIter *iter2;
- gint subject_id = 0, rdf_type = 0;
-
- if (g_variant_iter_next (ldata->iter1, "{iai}", &subject_id, &iter2)) {
- TrackerWritebackListenerPrivate *priv;
- QueryData *data = NULL;
- GString *query;
- gboolean comma = FALSE;
-
- priv = tracker_writeback_listener_get_instance_private (ldata->self);
-
- data = query_data_new (ldata->self);
-
- /* Two queries are grouped together here to reduce the amount of
- * queries that must be made. tracker:uri() is idd unrelated to
- * the other part of the query (and is repeated each result, idd) */
-
- query = g_string_new ("SELECT ");
- g_string_append_printf (query, "?resource tracker:uri (%d) { "
- "?resource a rdfs:Class . "
- "FILTER (tracker:id (?resource) IN (",
- subject_id);
-
- while (g_variant_iter_loop (iter2, "i", &rdf_type)) {
- if (comma) {
- g_string_append_printf (query, ", %d", rdf_type);
- } else {
- g_string_append_printf (query, "%d", rdf_type);
- comma = TRUE;
- }
- }
+ TrackerWritebackListener *self = user_data;
+ TrackerWritebackListenerPrivate *priv;
+ WritebackEvent *event;
+ QueryData *data = NULL;
+ GString *query;
+ gboolean comma = FALSE;
+ gint i;
- g_string_append (query, ")) }");
+ priv = tracker_writeback_listener_get_instance_private (self);
+ event = g_queue_pop_head (priv->events);
+ priv->event_dispatch_id = 0;
- tracker_sparql_connection_query_async (priv->connection,
- query->str,
- NULL,
- rdf_types_to_uris_cb,
- data);
+ if (!event)
+ return G_SOURCE_REMOVE;
- g_string_free (query, TRUE);
+ data = query_data_new (self);
- g_variant_iter_free (iter2);
+ /* Two queries are grouped together here to reduce the amount of
+ * queries that must be made. tracker:uri() is idd unrelated to
+ * the other part of the query (and is repeated each result, idd) */
+
+ query = g_string_new ("SELECT ");
+ g_string_append_printf (query, "?resource tracker:uri (%d) { "
+ "?resource a rdfs:Class . "
+ "FILTER (tracker:id (?resource) IN (",
+ event->subject_id);
+
+ for (i = 0; event->types[i] != 0; i++) {
+ gint32 rdf_type = event->types[i];
- return TRUE;
+ if (comma) {
+ g_string_append_printf (query, ", %d", rdf_type);
+ } else {
+ g_string_append_printf (query, "%d", rdf_type);
+ comma = TRUE;
+ }
}
- return FALSE;
+ g_string_append (query, ")) }");
+
+ tracker_sparql_connection_query_async (priv->connection,
+ query->str,
+ NULL,
+ rdf_types_to_uris_cb,
+ data);
+ g_string_free (query, TRUE);
+ free_event (event);
+
+ return G_SOURCE_REMOVE;
}
static void
-delayed_loop_finished (gpointer user_data)
+check_start_idle (TrackerWritebackListener *self,
+ gboolean force)
{
- DelayedLoopData *ldata = user_data;
- g_variant_iter_free (ldata->iter1);
- g_object_unref (ldata->self);
- g_free (ldata);
+ TrackerWritebackListenerPrivate *priv;
+
+ priv = tracker_writeback_listener_get_instance_private (self);
+
+ if (priv->event_dispatch_id != 0)
+ return;
+ if (priv->querying && !force)
+ return;
+ if (g_queue_is_empty (priv->events)) {
+ priv->querying = FALSE;
+ return;
+ }
+
+ priv->querying = TRUE;
+ priv->event_dispatch_id =
+ g_idle_add_full (G_PRIORITY_LOW,
+ process_event,
+ self, NULL);
}
static void
@@ -477,13 +519,27 @@ on_writeback_cb (GDBusConnection *connection,
gpointer user_data)
{
TrackerWritebackListener *self = TRACKER_WRITEBACK_LISTENER (user_data);
- DelayedLoopData *data = g_new (DelayedLoopData, 1);
+ TrackerWritebackListenerPrivate *priv;
+ GVariantIter *iter1, *iter2;
+ gint32 subject_id;
- data->self = g_object_ref (self);
- g_variant_get (parameters, "(a{iai})", &data->iter1);
+ priv = tracker_writeback_listener_get_instance_private (self);
+ g_variant_get (parameters, "(a{iai})", &iter1);
+
+ if (g_variant_iter_next (iter1, "{iai}", &subject_id, &iter2)) {
+ WritebackEvent *event = g_new (WritebackEvent, 1);
+ GArray *types = g_array_new (TRUE, TRUE, sizeof (gint32));
+ gint32 rdf_type;
+
+ while (g_variant_iter_loop (iter2, "i", &rdf_type))
+ g_array_append_val (types, rdf_type);
+
+ g_variant_iter_free (iter2);
+ event->subject_id = subject_id;
+ event->types = (gint32 *) g_array_free (types, FALSE);
+ g_queue_push_tail (priv->events, event);
+ }
- g_idle_add_full (G_PRIORITY_LOW,
- delayed_loop_idle,
- data,
- delayed_loop_finished);
+ g_variant_iter_free (iter1);
+ check_start_idle (self, FALSE);
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]