[tracker] rss: Perform extraction/insertion of feed items at once



commit 5ed1528f58e8c94092c1381a5aa9c07fff319b1e
Author: Carlos Garnacho <carlosg gnome org>
Date:   Sat Jul 25 19:30:54 2015 +0200

    rss: Perform extraction/insertion of feed items at once
    
    The per-item async ASK/INSERT pattern was vulnerable to nie:url collisions
    if we happened to process several feed items pointing to the same URL
    simultaneously.
    
    Change this so we first query the previously stored items, then check
    whether each one is really up to date. Only the newest of several
    occurrences of a URL is inserted, and the database is updated only if
    the content really changed (e.g. the post was updated). All of this is
    done in a single update_array operation for all feed items in a channel.

 src/miners/rss/tracker-miner-rss.c |  310 +++++++++++++++++++++---------------
 1 files changed, 184 insertions(+), 126 deletions(-)
---
diff --git a/src/miners/rss/tracker-miner-rss.c b/src/miners/rss/tracker-miner-rss.c
index 6fdfb87..d302de8 100644
--- a/src/miners/rss/tracker-miner-rss.c
+++ b/src/miners/rss/tracker-miner-rss.c
@@ -46,7 +46,6 @@ struct _TrackerMinerRSSPrivate {
        GDBusConnection *connection;
        guint graph_updated_id;
 
-       GList *item_inserts;
        GHashTable *channel_updates;
        GHashTable *channels;
 
@@ -67,6 +66,12 @@ typedef struct {
        GCancellable *cancellable;
 } FeedItemInsertData;
 
+typedef struct { /* state for one channel's batched item check, see check_feed_items () */
+       TrackerMinerRSS *miner;
+       GrssFeedChannel *channel;
+       GHashTable *items; /* message URL (gchar *) -> GrssFeedItem *, one entry per URL */
+} FeedItemListInsertData;
+
 static void         graph_updated_cb                (GDBusConnection       *connection,
                                                      const gchar           *sender_name,
                                                      const gchar           *object_path,
@@ -82,7 +87,6 @@ static void         retrieve_and_schedule_feeds     (TrackerMinerRSS       *mine
                                                      GArray                *channel_ids);
 static gboolean     feed_channel_changed_timeout_cb (gpointer               user_data);
 static void         feed_channel_update_data_free   (FeedChannelUpdateData *fcud);
-static void         feed_item_insert_data_free      (FeedItemInsertData    *fiid);
 static void         feed_fetching_cb                (GrssFeedsPool             *pool,
                                                      GrssFeedChannel           *feed,
                                                      gpointer               user_data);
@@ -177,9 +181,6 @@ tracker_miner_rss_finalize (GObject *object)
        g_dbus_connection_signal_unsubscribe (priv->connection, priv->graph_updated_id);
        g_object_unref (priv->connection);
 
-       g_list_foreach (priv->item_inserts, (GFunc) feed_item_insert_data_free, NULL);
-       g_list_free (priv->item_inserts);
-
        g_hash_table_unref (priv->channel_updates);
        g_hash_table_unref (priv->channels);
 
@@ -476,41 +477,57 @@ feed_channel_update_data_free (FeedChannelUpdateData *fcud)
        g_slice_free (FeedChannelUpdateData, fcud);
 }
 
-static FeedItemInsertData *
-feed_item_insert_data_new (TrackerMinerRSS *miner,
-                           GrssFeedItem    *item)
+/* Builds the per-channel state used by check_feed_items (). @items is
+ * reduced to one GrssFeedItem per message URL, keeping the one with the
+ * latest publish time (the earliest in the list on ties). @miner and
+ * @channel are referenced since the data outlives this call.
+ */
+static FeedItemListInsertData *
+feed_item_list_insert_data_new (TrackerMinerRSS *miner,
+                                GrssFeedChannel *channel,
+                                GList           *items)
 {
-       FeedItemInsertData *fiid;
+       FeedItemListInsertData *data;
+       GrssFeedItem *prev;
+       const gchar *url;
+       GList *l;
+
+       data = g_slice_new0 (FeedItemListInsertData);
+       data->channel = channel ? g_object_ref (channel) : NULL;
+       data->miner = g_object_ref (miner);
+       data->items = g_hash_table_new_full (g_str_hash, g_str_equal,
+                                            (GDestroyNotify) g_free,
+                                            (GDestroyNotify) g_object_unref);
+
+       /* Make items unique, keep most recent */
+       for (l = items; l; l = l->next) {
+               url = get_message_url (l->data);
+               prev = g_hash_table_lookup (data->items, url);
+
+               if (prev) {
+                       /* Compare publish times */
+                       if (grss_feed_item_get_publish_time (l->data) <=
+                           grss_feed_item_get_publish_time (prev))
+                               continue;
+               }
 
-       fiid = g_slice_new0 (FeedItemInsertData);
-       fiid->miner = g_object_ref (miner);
-       fiid->item = g_object_ref (item);
-       fiid->cancellable = g_cancellable_new ();
+               g_hash_table_insert (data->items, g_strdup (url),
+                                    g_object_ref (l->data));
+       }
 
-       return fiid;
+       return data;
 }
 
+/* Releases the item table and the references taken by
+ * feed_item_list_insert_data_new ().
+ */
 static void
-feed_item_insert_data_free (FeedItemInsertData *fiid)
+feed_item_list_insert_data_free (FeedItemListInsertData *data)
 {
-       if (!fiid) {
-               return;
-       }
-
-       if (fiid->cancellable) {
-               g_cancellable_cancel (fiid->cancellable);
-               g_object_unref (fiid->cancellable);
-       }
-
-       if (fiid->item) {
-               g_object_unref (fiid->item);
-       }
-
-       if (fiid->miner) {
-               g_object_unref (fiid->miner);
-       }
-
-       g_slice_free (FeedItemInsertData, fiid);
+       g_hash_table_destroy (data->items);
+       g_clear_object (&data->channel);
+       g_object_unref (data->miner);
+       g_slice_free (FeedItemListInsertData, data);
 }
 
 static void
@@ -585,16 +592,16 @@ feed_channel_changed_timeout_cb (gpointer user_data)
 }
 
 static void
-feed_channel_change_updated_time (FeedItemInsertData *fiid)
+feed_channel_change_updated_time (TrackerMinerRSS *miner,
+                                  GrssFeedChannel *channel)
 {
        TrackerMinerRSSPrivate *priv;
-       GrssFeedChannel *channel;
        FeedChannelUpdateData *fcud;
 
-       priv = TRACKER_MINER_RSS_GET_PRIVATE (fiid->miner);
+       if (!channel)
+               return;
 
-       /* Check we don't already have an update request for this channel */
-       channel = grss_feed_item_get_parent (fiid->item);
+       priv = TRACKER_MINER_RSS_GET_PRIVATE (miner);
 
        fcud = g_hash_table_lookup (priv->channel_updates, channel);
        if (fcud) {
@@ -607,7 +614,7 @@ feed_channel_change_updated_time (FeedItemInsertData *fiid)
                                                          fcud);
        } else {
                /* This is a new update for this channel */
-               fcud = feed_channel_update_data_new (fiid->miner, channel);
+               fcud = feed_channel_update_data_new (miner, channel);
                g_hash_table_insert (priv->channel_updates,
                                     fcud->channel,
                                     fcud);
@@ -642,32 +649,6 @@ feed_fetching_cb (GrssFeedsPool   *pool,
        g_object_set (miner, "progress", prog, "status", "Fetching…", NULL);
 }
 
-static void
-feed_item_insert_cb (GObject      *source,
-                     GAsyncResult *result,
-                     gpointer      user_data)
-{
-       FeedItemInsertData *fiid;
-       GError *error;
-       const gchar *title;
-
-       fiid = user_data;
-       title = grss_feed_item_get_title (fiid->item);
-       error = NULL;
-
-       tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (source), result, &error);
-       if (error != NULL) {
-               g_critical ("Could not insert feed information for message titled:'%s', %s",
-                           title,
-                           error->message);
-               g_error_free (error);
-       } else {
-               feed_channel_change_updated_time (fiid);
-       }
-
-       feed_item_insert_data_free (fiid);
-}
-
 static gchar *
 sparql_add_website (TrackerSparqlBuilder *sparql,
                     const gchar          *uri)
@@ -725,6 +706,16 @@ sparql_add_contact (TrackerSparqlBuilder *sparql,
        }
 }
 
+static gchar * /* newly allocated SPARQL string; caller frees with g_free () */
+feed_message_create_delete_properties_query (const gchar *item_urn)
+{
+       return g_strdup_printf ("DELETE { <%s> ?p ?o }" /* drop every property of item_urn... */
+                               "WHERE  { <%s> a mfo:FeedMessage ;" /* ...only if it is a mfo:FeedMessage... */
+                               "              ?p ?o ."
+                               "              FILTER (?p != rdf:type)" /* ...but keep its rdf:type */
+                               "}", item_urn, item_urn);
+}
+
 static TrackerSparqlBuilder *
 feed_message_create_insert_builder (TrackerMinerRSS    *miner,
                                     GrssFeedItem       *item,
@@ -912,94 +903,194 @@ feed_message_create_insert_builder (TrackerMinerRSS    *miner,
 }
 
+/* Completion callback for the batched update_array () of a channel's items.
+ * @user_data is the GPtrArray of SPARQL strings that was submitted; it is
+ * used to report which query failed, and is released here.
+ */
 static void
-feed_item_check_exists_cb (GObject      *source_object,
-                           GAsyncResult *res,
-                           gpointer      user_data)
+feed_channel_content_update_cb (GObject      *source,
+                                GAsyncResult *result,
+                                gpointer      user_data)
+{
+       TrackerSparqlConnection *connection;
+       GPtrArray *errors, *array = user_data;
+       GError *error = NULL;
+       guint i;
+
+       connection = TRACKER_SPARQL_CONNECTION (source);
+       errors = tracker_sparql_connection_update_array_finish (connection,
+                                                               result, &error);
+
+       if (error) {
+               g_warning ("Could not update feed items: %s",
+                          error->message);
+               g_error_free (error);
+       } else {
+               /* One entry per submitted query; NULL means it succeeded */
+               for (i = 0; i < errors->len; i++) {
+                       GError *item_error = g_ptr_array_index (errors, i);
+
+                       if (!item_error)
+                               continue;
+
+                       g_warning ("Error in item %u of update: %s\nQuery: %s", i,
+                                  item_error->message, (gchar *) g_ptr_array_index (array, i));
+               }
+
+               g_ptr_array_unref (errors);
+       }
+
+       g_ptr_array_unref (array);
+}
+
+/* Receives the stored resources whose nie:url matches one of the channel's
+ * items. Stored messages older than the feed's copy are rewritten (DELETE +
+ * INSERT); items without a stored match are inserted as new. Everything is
+ * submitted in a single update_array () call.
+ */
+static void
+check_feed_items_cb (GObject      *source_object,
+                     GAsyncResult *res,
+                     gpointer      user_data)
 {
        TrackerSparqlConnection *connection;
-       FeedItemInsertData *fiid;
-       TrackerSparqlCursor *cursor;
-       GError *error;
        TrackerSparqlBuilder *sparql;
+       FeedItemListInsertData *data;
+       TrackerSparqlCursor *cursor;
+       GrssFeedItem *item;
+       GError *error = NULL;
+       GHashTableIter iter;
+       GPtrArray *array;
+       const gchar *str;
 
-       fiid = user_data;
+       data = user_data;
        connection = TRACKER_SPARQL_CONNECTION (source_object);
-       error = NULL;
        cursor = tracker_sparql_connection_query_finish (connection, res, &error);
+       array = g_ptr_array_new_with_free_func ((GDestroyNotify) g_free);
 
-       if (error != NULL) {
-               g_message ("Could not verify feed existance, %s", error->message);
-               g_error_free (error);
+       while (!error && tracker_sparql_cursor_next (cursor, NULL, &error)) {
+               const gchar *urn, *url, *date;
+               time_t time;
 
-               if (cursor) {
-                       g_object_unref (cursor);
-               }
+               urn = tracker_sparql_cursor_get_string (cursor, 0, NULL);
+               url = tracker_sparql_cursor_get_string (cursor, 1, NULL);
+               date = tracker_sparql_cursor_get_string (cursor, 2, NULL);
+               time = (time_t) tracker_string_to_date (date, NULL, NULL);
 
-               feed_item_insert_data_free (fiid);
+               item = g_hash_table_lookup (data->items, url);
 
-               return;
-       }
+               if (!item)
+                       continue;
 
-       if (!tracker_sparql_cursor_next (cursor, NULL, NULL)) {
-               g_message ("No data in query response??");
+               /* Only rewrite the stored copy if the feed's one is newer */
+               if (time >= grss_feed_item_get_publish_time (item)) {
+                       g_debug ("Item '%s' already up to date", url);
+               } else {
+                       g_debug ("Updating item '%s'", url);
 
-               if (cursor) {
-                       g_object_unref (cursor);
+                       g_ptr_array_add (array, feed_message_create_delete_properties_query (urn));
+
+                       sparql = feed_message_create_insert_builder (data->miner,
+                                                                    item, urn);
+                       str = tracker_sparql_builder_get_result (sparql);
+                       g_ptr_array_add (array, g_strdup (str));
+                       g_object_unref (sparql);
                }
 
-               feed_item_insert_data_free (fiid);
+               /* Handled here; whatever remains in the table is new */
+               g_hash_table_remove (data->items, url);
+       }
 
-               return;
+       if (cursor) {
+               g_object_unref (cursor);
        }
 
-       if (tracker_sparql_cursor_get_boolean (cursor, 0)) {
-               g_debug ("  Item already exists '%s'",
-                        grss_feed_item_get_title (fiid->item));
+       if (error) {
+               g_message ("Could not check feed items, %s", error->message);
+               g_error_free (error);
+               feed_item_list_insert_data_free (data);
+               g_ptr_array_unref (array);
+               return;
+       }
 
-               if (cursor) {
-                       g_object_unref (cursor);
-               }
+       g_hash_table_iter_init (&iter, data->items);
 
-               feed_item_insert_data_free (fiid);
+       /* Insert all remaining items as new */
+       while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &item)) {
+               sparql = feed_message_create_insert_builder (data->miner,
+                                                            item, NULL);
+               str = tracker_sparql_builder_get_result (sparql);
+               g_ptr_array_add (array, g_strdup (str));
+               g_object_unref (sparql);
+       }
 
+       if (array->len == 0) {
+               feed_item_list_insert_data_free (data);
+               g_ptr_array_unref (array);
                return;
        }
 
-       sparql = feed_message_create_insert_builder (fiid->miner, fiid->item, NULL);
-       tracker_sparql_connection_update_async (connection,
-                                               tracker_sparql_builder_get_result (sparql),
-                                               G_PRIORITY_DEFAULT,
-                                               fiid->cancellable,
-                                               feed_item_insert_cb,
-                                               fiid);
-       g_object_unref (sparql);
-       g_object_unref (cursor);
+       /* @array is released by feed_channel_content_update_cb () */
+       tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (data->miner)),
+                                                     (gchar **) array->pdata,
+                                                     array->len,
+                                                     G_PRIORITY_DEFAULT, NULL,
+                                                     feed_channel_content_update_cb,
+                                                     array);
+       /* NOTE: this runs before the update above completes, even if it fails */
+       feed_channel_change_updated_time (data->miner, data->channel);
+       feed_item_list_insert_data_free (data);
 }
 
+/* Queries which of the channel's @items are already stored (matched by
+ * nie:url) and lets check_feed_items_cb () decide what to insert/update.
+ */
 static void
-feed_item_check_exists (TrackerMinerRSS *miner,
-                        GrssFeedItem    *item)
+check_feed_items (TrackerMinerRSS *miner,
+                  GrssFeedChannel *channel,
+                  GList           *items)
 {
-       FeedItemInsertData *fiid;
-       gchar *query;
+       FeedItemListInsertData *data;
+       GHashTableIter iter;
+       GrssFeedItem *item;
+       gboolean first = TRUE;
        const gchar *url;
+       GString *query;
+       gchar *escaped;
 
-       url = get_message_url (item);
+       data = feed_item_list_insert_data_new (miner, channel, items);
+
+       /* Nothing to check; this also avoids building an invalid "IN ()" */
+       if (g_hash_table_size (data->items) == 0) {
+               feed_item_list_insert_data_free (data);
+               return;
+       }
+
+       g_hash_table_iter_init (&iter, data->items);
+
+       query = g_string_new ("SELECT ?msg nie:url(?msg)"
+                             "       nie:contentCreated(?msg) {"
+                             "  ?msg a rdfs:Resource ."
+                             "       FILTER (nie:url(?msg) IN (");
 
-       query = g_strdup_printf ("ASK {"
-                                "  ?message a mfo:FeedMessage ;"
-                                "             nie:url \"%s\""
-                                "}",
-                                url);
+       while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &item)) {
+               if (!first)
+                       g_string_append_c (query, ',');
 
-       fiid = feed_item_insert_data_new (miner, item);
+               /* URLs come from the remote feed: escape them for the literal */
+               url = get_message_url (item);
+               escaped = tracker_sparql_escape_string (url);
+               g_string_append_printf (query, "\"%s\"", escaped);
+               g_free (escaped);
+               first = FALSE;
+       }
+
+       g_string_append (query, "))}");
 
        tracker_sparql_connection_query_async (tracker_miner_get_connection (TRACKER_MINER (miner)),
-                                              query,
-                                              fiid->cancellable,
-                                              feed_item_check_exists_cb,
-                                              fiid);
-       g_free (query);
+                                              query->str, NULL,
+                                              check_feed_items_cb,
+                                              data);
+       g_string_free (query, TRUE);
 }
 
 static void
@@ -1077,7 +1140,6 @@ feed_ready_cb (GrssFeedsPool   *pool,
 {
        TrackerMinerRSS *miner;
        TrackerMinerRSSPrivate *priv;
-       GList *iter;
 
        miner = TRACKER_MINER_RSS (user_data);
        priv = TRACKER_MINER_RSS_GET_PRIVATE (miner);
@@ -1100,11 +1162,7 @@ feed_ready_cb (GrssFeedsPool   *pool,
        g_message ("Verifying channel:'%s' is up to date",
                   grss_feed_channel_get_title (channel));
 
-       for (iter = items; iter; iter = iter->next) {
-               GrssFeedItem *item = iter->data;
-
-               feed_item_check_exists (miner, item);
-       }
+       check_feed_items (miner, channel, items);
 }
 
 static void


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]