[tracker] rss: Perform extraction/insertion of feed items at once
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker] rss: Perform extraction/insertion of feed items at once
- Date: Sun, 26 Jul 2015 14:22:56 +0000 (UTC)
commit 5ed1528f58e8c94092c1381a5aa9c07fff319b1e
Author: Carlos Garnacho <carlosg gnome org>
Date: Sat Jul 25 19:30:54 2015 +0200
rss: Perform extraction/insertion of feed items at once
The per-item async ASK/INSERT pattern was vulnerable to nie:url collisions
if we happened to process several feed items pointing to the same URL
simultaneously.
Change this so we first query the already-stored items, then check whether
each item is really up to date, inserting only the newest of duplicate
occurrences and updating the database only if the content actually changed
(e.g. the post was updated).
This is done in a single update_array operation for all feed items in a
channel.
src/miners/rss/tracker-miner-rss.c | 310 +++++++++++++++++++++---------------
1 files changed, 184 insertions(+), 126 deletions(-)
---
diff --git a/src/miners/rss/tracker-miner-rss.c b/src/miners/rss/tracker-miner-rss.c
index 6fdfb87..d302de8 100644
--- a/src/miners/rss/tracker-miner-rss.c
+++ b/src/miners/rss/tracker-miner-rss.c
@@ -46,7 +46,6 @@ struct _TrackerMinerRSSPrivate {
GDBusConnection *connection;
guint graph_updated_id;
- GList *item_inserts;
GHashTable *channel_updates;
GHashTable *channels;
@@ -67,6 +66,12 @@ typedef struct {
GCancellable *cancellable;
} FeedItemInsertData;
+typedef struct {
+ TrackerMinerRSS *miner;
+ GrssFeedChannel *channel;
+ GHashTable *items;
+} FeedItemListInsertData;
+
static void graph_updated_cb (GDBusConnection *connection,
const gchar *sender_name,
const gchar *object_path,
@@ -82,7 +87,6 @@ static void retrieve_and_schedule_feeds (TrackerMinerRSS *mine
GArray *channel_ids);
static gboolean feed_channel_changed_timeout_cb (gpointer user_data);
static void feed_channel_update_data_free (FeedChannelUpdateData *fcud);
-static void feed_item_insert_data_free (FeedItemInsertData *fiid);
static void feed_fetching_cb (GrssFeedsPool *pool,
GrssFeedChannel *feed,
gpointer user_data);
@@ -177,9 +181,6 @@ tracker_miner_rss_finalize (GObject *object)
g_dbus_connection_signal_unsubscribe (priv->connection, priv->graph_updated_id);
g_object_unref (priv->connection);
- g_list_foreach (priv->item_inserts, (GFunc) feed_item_insert_data_free, NULL);
- g_list_free (priv->item_inserts);
-
g_hash_table_unref (priv->channel_updates);
g_hash_table_unref (priv->channels);
@@ -476,41 +477,47 @@ feed_channel_update_data_free (FeedChannelUpdateData *fcud)
g_slice_free (FeedChannelUpdateData, fcud);
}
-static FeedItemInsertData *
-feed_item_insert_data_new (TrackerMinerRSS *miner,
- GrssFeedItem *item)
+static FeedItemListInsertData *
+feed_item_list_insert_data_new (TrackerMinerRSS *miner,
+ GrssFeedChannel *channel,
+ GList *items)
{
- FeedItemInsertData *fiid;
+ FeedItemListInsertData *data;
+ GrssFeedItem *prev;
+ const gchar *url;
+ GList *l;
+
+ data = g_slice_new0 (FeedItemListInsertData);
+ data->channel = channel;
+ data->miner = miner;
+ data->items = g_hash_table_new_full (g_str_hash, g_str_equal,
+ (GDestroyNotify) g_free,
+ (GDestroyNotify) g_object_unref);
+
+ /* Make items unique, keep most recent */
+ for (l = items; l; l = l->next) {
+ url = get_message_url (l->data);
+ prev = g_hash_table_lookup (data->items, url);
+
+ if (prev) {
+ /* Compare publish times */
+ if (grss_feed_item_get_publish_time (l->data) <=
+ grss_feed_item_get_publish_time (prev))
+ continue;
+ }
- fiid = g_slice_new0 (FeedItemInsertData);
- fiid->miner = g_object_ref (miner);
- fiid->item = g_object_ref (item);
- fiid->cancellable = g_cancellable_new ();
+ g_hash_table_insert (data->items, g_strdup (url),
+ g_object_ref (l->data));
+ }
- return fiid;
+ return data;
}
static void
-feed_item_insert_data_free (FeedItemInsertData *fiid)
+feed_item_list_insert_data_free (FeedItemListInsertData *data)
{
- if (!fiid) {
- return;
- }
-
- if (fiid->cancellable) {
- g_cancellable_cancel (fiid->cancellable);
- g_object_unref (fiid->cancellable);
- }
-
- if (fiid->item) {
- g_object_unref (fiid->item);
- }
-
- if (fiid->miner) {
- g_object_unref (fiid->miner);
- }
-
- g_slice_free (FeedItemInsertData, fiid);
+ g_hash_table_destroy (data->items);
+ g_slice_free (FeedItemListInsertData, data);
}
static void
@@ -585,16 +592,16 @@ feed_channel_changed_timeout_cb (gpointer user_data)
}
static void
-feed_channel_change_updated_time (FeedItemInsertData *fiid)
+feed_channel_change_updated_time (TrackerMinerRSS *miner,
+ GrssFeedChannel *channel)
{
TrackerMinerRSSPrivate *priv;
- GrssFeedChannel *channel;
FeedChannelUpdateData *fcud;
- priv = TRACKER_MINER_RSS_GET_PRIVATE (fiid->miner);
+ if (!channel)
+ return;
- /* Check we don't already have an update request for this channel */
- channel = grss_feed_item_get_parent (fiid->item);
+ priv = TRACKER_MINER_RSS_GET_PRIVATE (miner);
fcud = g_hash_table_lookup (priv->channel_updates, channel);
if (fcud) {
@@ -607,7 +614,7 @@ feed_channel_change_updated_time (FeedItemInsertData *fiid)
fcud);
} else {
/* This is a new update for this channel */
- fcud = feed_channel_update_data_new (fiid->miner, channel);
+ fcud = feed_channel_update_data_new (miner, channel);
g_hash_table_insert (priv->channel_updates,
fcud->channel,
fcud);
@@ -642,32 +649,6 @@ feed_fetching_cb (GrssFeedsPool *pool,
g_object_set (miner, "progress", prog, "status", "Fetching…", NULL);
}
-static void
-feed_item_insert_cb (GObject *source,
- GAsyncResult *result,
- gpointer user_data)
-{
- FeedItemInsertData *fiid;
- GError *error;
- const gchar *title;
-
- fiid = user_data;
- title = grss_feed_item_get_title (fiid->item);
- error = NULL;
-
- tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (source), result, &error);
- if (error != NULL) {
- g_critical ("Could not insert feed information for message titled:'%s', %s",
- title,
- error->message);
- g_error_free (error);
- } else {
- feed_channel_change_updated_time (fiid);
- }
-
- feed_item_insert_data_free (fiid);
-}
-
static gchar *
sparql_add_website (TrackerSparqlBuilder *sparql,
const gchar *uri)
@@ -725,6 +706,16 @@ sparql_add_contact (TrackerSparqlBuilder *sparql,
}
}
+static gchar *
+feed_message_create_delete_properties_query (const gchar *item_urn)
+{
+ return g_strdup_printf ("DELETE { <%s> ?p ?o }"
+ "WHERE { <%s> a mfo:FeedMessage ;"
+ " ?p ?o ."
+ " FILTER (?p != rdf:type)"
+ "}", item_urn, item_urn);
+}
+
static TrackerSparqlBuilder *
feed_message_create_insert_builder (TrackerMinerRSS *miner,
GrssFeedItem *item,
@@ -912,94 +903,166 @@ feed_message_create_insert_builder (TrackerMinerRSS *miner,
}
static void
-feed_item_check_exists_cb (GObject *source_object,
- GAsyncResult *res,
- gpointer user_data)
+feed_channel_content_update_cb (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ TrackerSparqlConnection *connection;
+ GPtrArray *errors, *array = user_data;
+ GError *error = NULL;
+ guint i;
+
+ connection = TRACKER_SPARQL_CONNECTION (source);
+ errors = tracker_sparql_connection_update_array_finish (connection,
+ result, &error);
+
+ if (error) {
+ g_warning ("Could not update feed items: %s",
+ error->message);
+ g_error_free (error);
+ } else {
+ for (i = 0; i < errors->len; i++) {
+ GError *error = g_ptr_array_index (errors, i);
+
+ if (!error)
+ continue;
+
+ g_warning ("Error in item %d of update: %s\nQuery: %s", i,
+ error->message, (gchar *) g_ptr_array_index (array, i));
+ }
+
+ g_ptr_array_unref (errors);
+ }
+
+ g_ptr_array_unref (array);
+}
+
+static void
+check_feed_items_cb (GObject *source_object,
+ GAsyncResult *res,
+ gpointer user_data)
{
TrackerSparqlConnection *connection;
- FeedItemInsertData *fiid;
- TrackerSparqlCursor *cursor;
- GError *error;
TrackerSparqlBuilder *sparql;
+ FeedItemListInsertData *data;
+ TrackerSparqlCursor *cursor;
+ GrssFeedItem *item;
+ GError *error = NULL;
+ GHashTableIter iter;
+ GPtrArray *array;
+ const gchar *str;
- fiid = user_data;
+ data = user_data;
connection = TRACKER_SPARQL_CONNECTION (source_object);
- error = NULL;
cursor = tracker_sparql_connection_query_finish (connection, res, &error);
+ array = g_ptr_array_new_with_free_func ((GDestroyNotify) g_free);
- if (error != NULL) {
- g_message ("Could not verify feed existance, %s", error->message);
- g_error_free (error);
+ while (!error && tracker_sparql_cursor_next (cursor, NULL, &error)) {
+ const gchar *urn, *url, *date;
+ time_t time;
- if (cursor) {
- g_object_unref (cursor);
- }
+ urn = tracker_sparql_cursor_get_string (cursor, 0, NULL);
+ url = tracker_sparql_cursor_get_string (cursor, 1, NULL);
+ date = tracker_sparql_cursor_get_string (cursor, 2, NULL);
+ time = (time_t) tracker_string_to_date (date, NULL, NULL);
- feed_item_insert_data_free (fiid);
+ item = g_hash_table_lookup (data->items, url);
- return;
- }
+ if (!item)
+ continue;
- if (!tracker_sparql_cursor_next (cursor, NULL, NULL)) {
- g_message ("No data in query response??");
+ if (time <= grss_feed_item_get_publish_time (item)) {
+ g_debug ("Item '%s' already up to date", url);
+ } else {
+ g_debug ("Updating item '%s'", url);
- if (cursor) {
- g_object_unref (cursor);
+ g_ptr_array_add (array, feed_message_create_delete_properties_query (urn));
+
+ sparql = feed_message_create_insert_builder (data->miner,
+ item, urn);
+ str = tracker_sparql_builder_get_result (sparql);
+ g_ptr_array_add (array, g_strdup (str));
+ g_object_unref (sparql);
}
- feed_item_insert_data_free (fiid);
+ g_hash_table_remove (data->items, url);
+ }
- return;
+ if (cursor) {
+ g_object_unref (cursor);
}
- if (tracker_sparql_cursor_get_boolean (cursor, 0)) {
- g_debug (" Item already exists '%s'",
- grss_feed_item_get_title (fiid->item));
+ if (error) {
+ g_message ("Could check feed items, %s", error->message);
+ g_error_free (error);
+ feed_item_list_insert_data_free (data);
+ g_ptr_array_unref (array);
+ return;
+ }
- if (cursor) {
- g_object_unref (cursor);
- }
+ g_hash_table_iter_init (&iter, data->items);
- feed_item_insert_data_free (fiid);
+ /* Insert all remaining items as new */
+ while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &item)) {
+ sparql = feed_message_create_insert_builder (data->miner,
+ item, NULL);
+ str = tracker_sparql_builder_get_result (sparql);
+ g_ptr_array_add (array, g_strdup (str));
+ g_object_unref (sparql);
+ }
+ if (array->len == 0) {
+ feed_item_list_insert_data_free (data);
+ g_ptr_array_unref (array);
return;
}
- sparql = feed_message_create_insert_builder (fiid->miner, fiid->item, NULL);
- tracker_sparql_connection_update_async (connection,
- tracker_sparql_builder_get_result (sparql),
- G_PRIORITY_DEFAULT,
- fiid->cancellable,
- feed_item_insert_cb,
- fiid);
- g_object_unref (sparql);
- g_object_unref (cursor);
+ tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER
(data->miner)),
+ (gchar **) array->pdata,
+ array->len,
+ G_PRIORITY_DEFAULT, NULL,
+ feed_channel_content_update_cb,
+ array);
+ feed_channel_change_updated_time (data->miner, data->channel);
+ feed_item_list_insert_data_free (data);
}
static void
-feed_item_check_exists (TrackerMinerRSS *miner,
- GrssFeedItem *item)
+check_feed_items (TrackerMinerRSS *miner,
+ GrssFeedChannel *channel,
+ GList *items)
{
- FeedItemInsertData *fiid;
- gchar *query;
+ FeedItemListInsertData *data;
+ GHashTableIter iter;
+ GrssFeedItem *item;
+ gboolean first = TRUE;
const gchar *url;
+ GString *query;
- url = get_message_url (item);
+ data = feed_item_list_insert_data_new (miner, channel, items);
+ g_hash_table_iter_init (&iter, data->items);
+
+ query = g_string_new ("SELECT ?msg nie:url(?msg)"
+ " nie:contentCreated(?msg) {"
+ " ?msg a rdfs:Resource ."
+ " FILTER (nie:url(?msg) IN (");
- query = g_strdup_printf ("ASK {"
- " ?message a mfo:FeedMessage ;"
- " nie:url \"%s\""
- "}",
- url);
+ while (g_hash_table_iter_next (&iter, NULL, (gpointer *) &item)) {
+ if (!first)
+ g_string_append_c (query, ',');
- fiid = feed_item_insert_data_new (miner, item);
+ url = get_message_url (item);
+ g_string_append_printf (query, "\"%s\"", url);
+ first = FALSE;
+ }
+
+ g_string_append (query, "))}");
tracker_sparql_connection_query_async (tracker_miner_get_connection (TRACKER_MINER (miner)),
- query,
- fiid->cancellable,
- feed_item_check_exists_cb,
- fiid);
- g_free (query);
+ query->str, NULL,
+ check_feed_items_cb,
+ data);
+ g_string_free (query, TRUE);
}
static void
@@ -1077,7 +1140,6 @@ feed_ready_cb (GrssFeedsPool *pool,
{
TrackerMinerRSS *miner;
TrackerMinerRSSPrivate *priv;
- GList *iter;
miner = TRACKER_MINER_RSS (user_data);
priv = TRACKER_MINER_RSS_GET_PRIVATE (miner);
@@ -1100,11 +1162,7 @@ feed_ready_cb (GrssFeedsPool *pool,
g_message ("Verifying channel:'%s' is up to date",
grss_feed_channel_get_title (channel));
- for (iter = items; iter; iter = iter->next) {
- GrssFeedItem *item = iter->data;
-
- feed_item_check_exists (miner, item);
- }
+ check_feed_items (miner, channel, items);
}
static void
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]