[tracker/tracker-0.10] tracker-miner-rss: Only change mfo:updatedTime if a feed is added



commit 6a44f873f748f1abd88fff60a37e946392d2ac6b
Author: Martyn Russell <martyn lanedo com>
Date:   Tue May 3 16:34:21 2011 +0100

    tracker-miner-rss: Only change mfo:updatedTime if a feed is added
    
    Also use a proper GCancellable for all SPARQL calls

 src/miners/rss/tracker-miner-rss.c |  358 +++++++++++++++++++++++++++---------
 1 files changed, 269 insertions(+), 89 deletions(-)
---
diff --git a/src/miners/rss/tracker-miner-rss.c b/src/miners/rss/tracker-miner-rss.c
index 7dc468d..6b35230 100644
--- a/src/miners/rss/tracker-miner-rss.c
+++ b/src/miners/rss/tracker-miner-rss.c
@@ -43,21 +43,47 @@ struct _TrackerMinerRSSPrivate {
 	gint now_fetching;
 	GDBusConnection *connection;
 	guint graph_updated_id;
+
+	/* Pending mfo:updatedTime writes: FeedChannel -> FeedChannelUpdateData */
+	GHashTable *channel_updates;
 };
 
-static void         miner_started               (TrackerMiner    *miner);
-static void         miner_stopped               (TrackerMiner    *miner);
-static void         miner_paused                (TrackerMiner    *miner);
-static void         miner_resumed               (TrackerMiner    *miner);
-static void         retrieve_and_schedule_feeds (TrackerMinerRSS *miner);
-static void         feed_fetching_cb            (FeedsPool       *pool,
-                                                 FeedChannel     *feed,
-                                                 gpointer         user_data);
-static void         feed_ready_cb               (FeedsPool       *pool,
-                                                 FeedChannel     *feed,
-                                                 GList           *items,
-                                                 gpointer         user_data);
-static const gchar *get_message_url             (FeedItem        *item);
+typedef struct {
+	TrackerMinerRSS *miner;
+	FeedChannel *channel;
+	guint timeout_id; /* GSource ID from g_timeout_add_seconds() */
+	GCancellable *cancellable;
+} FeedChannelUpdateData;
+
+typedef struct {
+	TrackerMinerRSS *miner;
+	FeedItem *item;
+	GCancellable *cancellable;
+} FeedItemInsertData;
+
+static void         graph_updated_cb                (GDBusConnection       *connection,
+                                                     const gchar           *sender_name,
+                                                     const gchar           *object_path,
+                                                     const gchar           *interface_name,
+                                                     const gchar           *signal_name,
+                                                     GVariant              *parameters,
+                                                     gpointer               user_data);
+static void         miner_started                   (TrackerMiner          *miner);
+static void         miner_stopped                   (TrackerMiner          *miner);
+static void         miner_paused                    (TrackerMiner          *miner);
+static void         miner_resumed                   (TrackerMiner          *miner);
+static void         retrieve_and_schedule_feeds     (TrackerMinerRSS       *miner);
+static gboolean     feed_channel_changed_timeout_cb (gpointer               user_data);
+static void         feed_channel_update_data_free   (FeedChannelUpdateData *fcud);
+static void         feed_item_insert_data_free      (FeedItemInsertData    *fiid);
+static void         feed_fetching_cb                (FeedsPool             *pool,
+                                                     FeedChannel           *feed,
+                                                     gpointer               user_data);
+static void         feed_ready_cb                   (FeedsPool             *pool,
+                                                     FeedChannel           *feed,
+                                                     GList                 *items,
+                                                     gpointer               user_data);
+static const gchar *get_message_url                 (FeedItem              *item);
 
 G_DEFINE_TYPE (TrackerMinerRSS, tracker_miner_rss, TRACKER_TYPE_MINER)
 
@@ -75,6 +101,8 @@ tracker_miner_rss_finalize (GObject *object)
 	g_dbus_connection_signal_unsubscribe (priv->connection, priv->graph_updated_id);
 	g_object_unref (priv->connection);
 
+	g_hash_table_unref (priv->channel_updates);
+
 	G_OBJECT_CLASS (tracker_miner_rss_parent_class)->finalize (object);
 }
 
@@ -94,6 +125,55 @@ tracker_miner_rss_class_init (TrackerMinerRSSClass *klass)
 	g_type_class_add_private (object_class, sizeof (TrackerMinerRSSPrivate));
 }
 
+static void
+tracker_miner_rss_init (TrackerMinerRSS *object)
+{
+	GError *error = NULL;
+	TrackerMinerRSSPrivate *priv;
+
+	g_message ("Initializing...");
+
+	priv = TRACKER_MINER_RSS_GET_PRIVATE (object);
+
+	/* Create this before anything that can fail, so finalize() can
+	 * always unref it. Keys are the FeedChannels; their reference is
+	 * owned, and dropped, by the FeedChannelUpdateData value.
+	 */
+	priv->channel_updates = g_hash_table_new_full (g_direct_hash,
+	                                               g_direct_equal,
+	                                               NULL,
+	                                               (GDestroyNotify) feed_channel_update_data_free);
+
+	priv->connection = g_bus_get_sync (G_BUS_TYPE_SESSION, NULL, &error);
+
+	if (!priv->connection) {
+		g_critical ("Could not connect to the D-Bus session bus, %s",
+			    error ? error->message : "no error given.");
+		g_clear_error (&error);
+		return;
+	}
+
+	priv->pool = feeds_pool_new ();
+	g_signal_connect (priv->pool, "feed-fetching", G_CALLBACK (feed_fetching_cb), object);
+	g_signal_connect (priv->pool, "feed-ready", G_CALLBACK (feed_ready_cb), object);
+	priv->now_fetching = 0;
+
+	g_message ("Listening for GraphUpdated changes on D-Bus interface...");
+	g_message ("  arg0:'%s'", TRACKER_MFO_PREFIX "FeedChannel");
+
+	priv->graph_updated_id =
+		g_dbus_connection_signal_subscribe  (priv->connection,
+		                                     "org.freedesktop.Tracker1",
+		                                     "org.freedesktop.Tracker1.Resources",
+		                                     "GraphUpdated",
+		                                     "/org/freedesktop/Tracker1/Resources",
+		                                     TRACKER_MFO_PREFIX "FeedChannel",
+		                                     G_DBUS_SIGNAL_FLAGS_NONE,
+		                                     graph_updated_cb,
+		                                     object,
+		                                     NULL);
+}
+
 static gboolean
 check_if_update_is_ours (TrackerSparqlConnection *con,
                          gint                     p)
@@ -184,74 +261,128 @@ graph_updated_cb (GDBusConnection *connection,
 	}
 }
 
+static FeedChannelUpdateData *
+feed_channel_update_data_new (TrackerMinerRSS *miner,
+                              FeedChannel     *channel)
+{
+	FeedChannelUpdateData *fcud;
+
+	fcud = g_slice_new0 (FeedChannelUpdateData);
+	fcud->miner = g_object_ref (miner);
+	fcud->channel = g_object_ref (channel);
+	fcud->timeout_id = g_timeout_add_seconds (2, feed_channel_changed_timeout_cb, fcud);
+	fcud->cancellable = g_cancellable_new ();
+
+	return fcud;
+}
+
 static void
-tracker_miner_rss_init (TrackerMinerRSS *object)
+feed_channel_update_data_free (FeedChannelUpdateData *fcud)
 {
-	GError *error = NULL;
-	TrackerMinerRSSPrivate *priv;
+	if (!fcud) {
+		return;
+	}
 
-	g_message ("Initializing...");
+	if (fcud->cancellable) {
+		g_cancellable_cancel (fcud->cancellable);
+		g_object_unref (fcud->cancellable);
+	}
 
-	priv = TRACKER_MINER_RSS_GET_PRIVATE (object);
+	if (fcud->timeout_id) {
+		g_source_remove (fcud->timeout_id);
+	}
 
-	priv->connection = g_bus_get_sync (G_BUS_TYPE_SESSION, NULL, &error);
+	if (fcud->channel) {
+		g_object_unref (fcud->channel);
+	}
 
-	if (!priv->connection) {
-		g_critical ("Could not connect to the D-Bus session bus, %s",
-			    error ? error->message : "no error given.");
-		g_error_free (error);
+	if (fcud->miner) {
+		g_object_unref (fcud->miner);
+	}
+
+	g_slice_free (FeedChannelUpdateData, fcud);
+}
+
+static FeedItemInsertData *
+feed_item_insert_data_new (TrackerMinerRSS *miner,
+                           FeedItem        *item)
+{
+	FeedItemInsertData *fiid;
+
+	fiid = g_slice_new0 (FeedItemInsertData);
+	fiid->miner = g_object_ref (miner);
+	fiid->item = g_object_ref (item);
+	fiid->cancellable = g_cancellable_new ();
+
+	return fiid;
+}
+
+static void
+feed_item_insert_data_free (FeedItemInsertData *fiid)
+{
+	if (!fiid) {
 		return;
 	}
 
-	priv->pool = feeds_pool_new ();
-	g_signal_connect (priv->pool, "feed-fetching", G_CALLBACK (feed_fetching_cb), object);
-	g_signal_connect (priv->pool, "feed-ready", G_CALLBACK (feed_ready_cb), object);
-	priv->now_fetching = 0;
+	if (fiid->cancellable) {
+		g_cancellable_cancel (fiid->cancellable);
+		g_object_unref (fiid->cancellable);
+	}
 
-	g_message ("Listening for GraphUpdated changes on D-Bus interface...");
-	g_message ("  arg0:'%s'", TRACKER_MFO_PREFIX "FeedChannel");
+	if (fiid->item) {
+		g_object_unref (fiid->item);
+	}
 
-	priv->graph_updated_id =
-		g_dbus_connection_signal_subscribe  (priv->connection,
-		                                     "org.freedesktop.Tracker1",
-		                                     "org.freedesktop.Tracker1.Resources",
-		                                     "GraphUpdated",
-		                                     "/org/freedesktop/Tracker1/Resources",
-		                                     TRACKER_MFO_PREFIX "FeedChannel",
-		                                     G_DBUS_SIGNAL_FLAGS_NONE,
-		                                     graph_updated_cb,
-		                                     object,
-		                                     NULL);
+	if (fiid->miner) {
+		g_object_unref (fiid->miner);
+	}
+
+	g_slice_free (FeedItemInsertData, fiid);
 }
 
 static void
-feed_change_updated_interval_cb (GObject      *source,
-                                 GAsyncResult *result,
-                                 gpointer      user_data)
+feed_channel_change_updated_time_cb (GObject      *source,
+                                     GAsyncResult *result,
+                                     gpointer      user_data)
 {
-	GError *error;
+	TrackerMinerRSSPrivate *priv;
+	FeedChannelUpdateData *fcud;
+	GError *error = NULL;
 
-	error = NULL;
+	fcud = user_data;
+	priv = TRACKER_MINER_RSS_GET_PRIVATE (fcud->miner);
 
 	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (source), result, &error);
 	if (error != NULL) {
-		g_critical ("Could not update channel information, %s", error->message);
+		g_critical ("Could not change feed channel updated time, %s", error->message);
 		g_error_free (error);
 	}
+
+	/* This will clean up the fcud data too */
+	g_hash_table_remove (priv->channel_updates, fcud->channel);
 }
 
-static void
-feed_change_updated_interval (TrackerMinerRSS *miner,
-                              FeedChannel     *feed)
+/* Timeout handler: writes "now" as mfo:updatedTime of fcud->channel */
+static gboolean
+feed_channel_changed_timeout_cb (gpointer user_data)
 {
 	TrackerSparqlBuilder *sparql;
+	FeedChannelUpdateData *fcud;
 	gchar *uri;
 	time_t now;
 
+	fcud = user_data;
+
+	/* Returning FALSE below destroys this source, so forget its ID:
+	 * the stale value must never reach g_source_remove(), and 0 tells
+	 * feed_channel_change_updated_time() the update is in flight.
+	 */
+	fcud->timeout_id = 0;
+
 	now = time (NULL);
-	uri = g_object_get_data (G_OBJECT (feed), "subject");
+	uri = g_object_get_data (G_OBJECT (fcud->channel), "subject");
 
-	g_message ("Updating mfo:updatedTime for channel '%s'", feed_channel_get_title (feed));
+	g_message ("Updating mfo:updatedTime for channel '%s'", feed_channel_get_title (fcud->channel));
 
 	/* I hope there will be soon a SPARQL command to just update a
 	 * value instead to delete and re-insert it
@@ -275,18 +406,57 @@ feed_change_updated_interval (TrackerMinerRSS *miner,
 	tracker_sparql_builder_object_date (sparql, &now);
 	tracker_sparql_builder_insert_close (sparql);
 
-	tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (miner)),
+	tracker_sparql_connection_update_async (tracker_miner_get_connection (TRACKER_MINER (fcud->miner)),
 	                                        tracker_sparql_builder_get_result (sparql),
 	                                        G_PRIORITY_DEFAULT,
-	                                        NULL,
-	                                        feed_change_updated_interval_cb,
-	                                        NULL);
+	                                        fcud->cancellable,
+	                                        feed_channel_change_updated_time_cb,
+	                                        fcud);
 	g_object_unref (sparql);
+
+	return FALSE;
+}
+
+/* Schedules, or postpones, an mfo:updatedTime write for fiid's channel */
+static void
+feed_channel_change_updated_time (FeedItemInsertData *fiid)
+{
+	TrackerMinerRSSPrivate *priv;
+	FeedChannel *channel;
+	FeedChannelUpdateData *fcud;
+
+	priv = TRACKER_MINER_RSS_GET_PRIVATE (fiid->miner);
+
+	/* Check we don't already have an update request for this channel */
+	channel = feed_item_get_parent (fiid->item);
+
+	fcud = g_hash_table_lookup (priv->channel_updates, channel);
+	if (fcud) {
+		/* We already had an update for this channel pending, so
+		 * just restart the timeout. A zero timeout_id means the
+		 * timeout already fired and the SPARQL update is in flight:
+		 * leave it alone, since a second update on this fcud could
+		 * still be running when the first one's callback removes
+		 * (and frees) fcud from channel_updates.
+		 */
+		if (fcud->timeout_id) {
+			g_source_remove (fcud->timeout_id);
+			fcud->timeout_id = g_timeout_add_seconds (2,
+			                                          feed_channel_changed_timeout_cb,
+			                                          fcud);
+		}
+	} else {
+		/* This is a new update for this channel */
+		fcud = feed_channel_update_data_new (fiid->miner, channel);
+		g_hash_table_insert (priv->channel_updates,
+		                     fcud->channel,
+		                     fcud);
+	}
 }
 
 static void
 feed_fetching_cb (FeedsPool   *pool,
-                  FeedChannel *feed,
+                  FeedChannel *channel,
                   gpointer     user_data)
 {
 	gint avail;
@@ -303,8 +461,8 @@ feed_fetching_cb (FeedsPool   *pool,
 	if (priv->now_fetching > avail)
 		priv->now_fetching = avail;
 
-	g_message ("Fetching channel '%s' (in progress: %d/%d)",
-	           feed_channel_get_source (feed),
+	g_message ("Fetching channel details, source:'%s' (in progress: %d/%d)",
+	           feed_channel_get_source (channel),
 	           priv->now_fetching,
 	           avail);
 
@@ -317,10 +475,12 @@ feed_item_insert_cb (GObject      *source,
                      GAsyncResult *result,
                      gpointer      user_data)
 {
+	FeedItemInsertData *fiid;
 	GError *error;
-	gchar *title;
+	const gchar *title;
 
-	title = user_data;
+	fiid = user_data;
+	title = feed_item_get_title (fiid->item);
 	error = NULL;
 
 	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (source), result, &error);
@@ -329,9 +489,12 @@ feed_item_insert_cb (GObject      *source,
 		            title,
 		            error->message);
 		g_error_free (error);
+	} else {
+		/* Only successful inserts schedule a channel mfo:updatedTime write */
+		feed_channel_change_updated_time (fiid);
 	}
 
-	g_free (title);
+	feed_item_insert_data_free (fiid);
 }
 
 static void
@@ -340,9 +504,9 @@ feed_item_check_exists_cb (GObject      *source_object,
                            gpointer      user_data)
 {
 	TrackerSparqlConnection *connection;
+	FeedItemInsertData *fiid;
 	time_t t;
 	gchar *uri;
-	gchar *title = NULL;
 	const gchar *str;
 	const gchar *url;
 	gdouble latitude;
@@ -351,10 +515,10 @@ feed_item_check_exists_cb (GObject      *source_object,
 	TrackerSparqlCursor *cursor;
 	GError *error;
 	TrackerSparqlBuilder *sparql;
-	FeedItem *item;
-	FeedChannel *feed;
+	FeedChannel *channel;
 	gboolean has_geolocation;
 
+	fiid = user_data;
 	connection = TRACKER_SPARQL_CONNECTION (source_object);
 	error = NULL;
 	cursor = tracker_sparql_connection_query_finish (connection, res, &error);
@@ -362,33 +526,50 @@ feed_item_check_exists_cb (GObject      *source_object,
 	if (error != NULL) {
 		g_message ("Could not verify feed existance, %s", error->message);
 		g_error_free (error);
+
 		if (cursor) {
 			g_object_unref (cursor);
 		}
+
+		feed_item_insert_data_free (fiid);
+
 		return;
 	}
 
 	if (!tracker_sparql_cursor_next (cursor, NULL, NULL)) {
 		g_message ("No data in query response??");
-		g_object_unref (cursor);
+
+		if (cursor) {
+			g_object_unref (cursor);
+		}
+
+		feed_item_insert_data_free (fiid);
+
 		return;
 	}
 
+	url = get_message_url (fiid->item);
+	channel = feed_item_get_parent (fiid->item);
+
 	str = tracker_sparql_cursor_get_string (cursor, 0, NULL);
 	if (str && g_ascii_strcasecmp (str, "true") == 0) {
-		g_object_unref (cursor);
-		return;
-	}
+		tmp_string = feed_item_get_title (fiid->item); /* may be NULL, see nie:title below */
+		g_message ("  Item already exists '%s'", tmp_string ? tmp_string : "");
 
-	item = user_data;
+		if (cursor) {
+			g_object_unref (cursor);
+		}
 
-	url = get_message_url (item);
+		feed_item_insert_data_free (fiid);
+
+		return;
+	}
 
-	g_message ("Updating feed information for '%s'", url);
+	g_message ("Inserting feed item for '%s'", url);
 
 	sparql = tracker_sparql_builder_new_update ();
 
-	has_geolocation = feed_item_get_geo_point (item, &latitude, &longitude);
+	has_geolocation = feed_item_get_geo_point (fiid->item, &latitude, &longitude);
 	tracker_sparql_builder_insert_open (sparql, NULL);
 
 	if (has_geolocation) {
@@ -416,17 +597,15 @@ feed_item_check_exists_cb (GObject      *source_object,
 		tracker_sparql_builder_object (sparql, "_:location");
 	}
 
-	tmp_string = feed_item_get_title (item);
+	tmp_string = feed_item_get_title (fiid->item);
 	if (tmp_string != NULL) {
 		g_message ("  Title:'%s'", tmp_string);
 
 		tracker_sparql_builder_predicate (sparql, "nie:title");
 		tracker_sparql_builder_object_unvalidated (sparql, tmp_string);
-
-		title = g_strdup (tmp_string);
 	}
 
-	tmp_string = feed_item_get_description (item);
+	tmp_string = feed_item_get_description (fiid->item);
 	if (tmp_string != NULL) {
 		tracker_sparql_builder_predicate (sparql, "nie:plainTextContent");
 		tracker_sparql_builder_object_unvalidated (sparql, tmp_string);
@@ -448,15 +627,14 @@ feed_item_check_exists_cb (GObject      *source_object,
 	tracker_sparql_builder_predicate (sparql, "mfo:downloadedTime");
 	tracker_sparql_builder_object_date (sparql, &t);
 
-	t = feed_item_get_publish_time (item);
+	t = feed_item_get_publish_time (fiid->item);
 	tracker_sparql_builder_predicate (sparql, "nie:contentCreated");
 	tracker_sparql_builder_object_date (sparql, &t);
 
 	tracker_sparql_builder_predicate (sparql, "nmo:isRead");
 	tracker_sparql_builder_object_boolean (sparql, FALSE);
 
-	feed = feed_item_get_parent (item);
-	uri = g_object_get_data (G_OBJECT (feed), "subject");
+	uri = g_object_get_data (G_OBJECT (channel), "subject");
 	tracker_sparql_builder_predicate (sparql, "nmo:communicationChannel");
 	tracker_sparql_builder_object_iri (sparql, uri);
 
@@ -465,9 +643,9 @@ feed_item_check_exists_cb (GObject      *source_object,
 	tracker_sparql_connection_update_async (connection,
 	                                        tracker_sparql_builder_get_result (sparql),
 	                                        G_PRIORITY_DEFAULT,
-	                                        NULL,
+	                                        fiid->cancellable,
 	                                        feed_item_insert_cb,
-	                                        title);
+	                                        fiid);
 
 	g_object_unref (cursor);
 	g_object_unref (sparql);
@@ -477,6 +655,7 @@ static void
 feed_item_check_exists (TrackerMinerRSS *miner,
                         FeedItem        *item)
 {
+	FeedItemInsertData *fiid;
 	FeedChannel *feed;
 	gchar *query;
 	gchar *communication_channel;
@@ -486,8 +665,6 @@ feed_item_check_exists (TrackerMinerRSS *miner,
 	feed = feed_item_get_parent (item);
 	communication_channel = g_object_get_data (G_OBJECT (feed), "subject");
 
-	g_debug ("Verifying feed '%s' is stored", url);
-
 	query = g_strdup_printf ("ASK {"
 	                         "  ?message a mfo:FeedMessage ;"
 	                         "             nie:url \"%s\";"
@@ -496,24 +673,25 @@ feed_item_check_exists (TrackerMinerRSS *miner,
 	                         url,
 	                         communication_channel);
 
+	fiid = feed_item_insert_data_new (miner, item);
+
 	tracker_sparql_connection_query_async (tracker_miner_get_connection (TRACKER_MINER (miner)),
 	                                       query,
-	                                       NULL,
+	                                       fiid->cancellable,
 	                                       feed_item_check_exists_cb,
-	                                       item);
+	                                       fiid);
 	g_free (query);
 }
 
 static void
 feed_ready_cb (FeedsPool   *pool,
-               FeedChannel *feed,
+               FeedChannel *channel,
                GList       *items,
                gpointer     user_data)
 {
 	TrackerMinerRSS *miner;
 	TrackerMinerRSSPrivate *priv;
 	GList *iter;
-	FeedItem *item;
 
 	miner = TRACKER_MINER_RSS (user_data);
 	priv = TRACKER_MINER_RSS_GET_PRIVATE (miner);
@@ -531,10 +709,12 @@ feed_ready_cb (FeedsPool   *pool,
 		return;
 	}
 
-	feed_change_updated_interval (miner, feed);
+	g_message ("Verifying channel:'%s' is up to date",
+	           feed_channel_get_title (channel));
 
 	for (iter = items; iter; iter = iter->next) {
-		item = iter->data;
+		FeedItem *item = iter->data;
+
 		feed_item_check_exists (miner, item);
 	}
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]