[tracker/miner-twitter: 1/4] Add Twitter/Identi.ca miner



commit 1cb450f4afb68bbe62ab6284939b180eed6d59e0
Author: Adrien Bustany <abustany gnome org>
Date:   Tue Mar 2 09:39:27 2010 -0300

    Add Twitter/Identi.ca miner

 configure.ac                                       |   33 ++
 src/Makefile.am                                    |    3 +
 src/tracker-miner-twitter/Makefile.am              |   64 +++
 src/tracker-miner-twitter/query-queue.vala         |   58 +++
 .../tracker-miner-twitter.vala                     |  437 ++++++++++++++++++++
 5 files changed, 595 insertions(+), 0 deletions(-)
---
diff --git a/configure.ac b/configure.ac
index 5d32778..b6b0d9d 100644
--- a/configure.ac
+++ b/configure.ac
@@ -158,6 +158,8 @@ LIBSTREAMANALYZER_REQUIRED=0.7.0
 GEE_REQUIRED=0.3
 ID3LIB_REQUIRED=3.8.3
 GNOME_KEYRING_REQUIRED=2.26
+REST_REQUIRED=0.6
+TWITTER_GLIB_REQUIRED=0.9
 
 # Library Checks
 PKG_CHECK_MODULES(GLIB2, [glib-2.0 >= $GLIB_REQUIRED])
@@ -792,6 +794,32 @@ fi
 
 AM_CONDITIONAL(HAVE_GNOME_KEYRING, test "x$have_gnome_keyring" = "xyes")
 
+##################################################################
+# Twitter/Identi.ca miner (optional; needs rest and twitter-glib)
+##################################################################
+
+AC_ARG_ENABLE(miner_twitter,
+	      AS_HELP_STRING([--enable-miner-twitter],
+			     [enable Twitter miner [[default=auto]]]),,
+	      [enable_miner_twitter=auto])
+
+if test "x$enable_miner_twitter" != "xno"; then
+	PKG_CHECK_MODULES(MINER_TWITTER,
+					  [ rest-0.6 >= $REST_REQUIRED twitter-glib-1.0 >= $TWITTER_GLIB_REQUIRED ],
+					  [have_miner_twitter_deps=yes],
+					  [have_miner_twitter_deps=no])
+	AC_SUBST(MINER_TWITTER_LIBS)
+	AC_SUBST(MINER_TWITTER_CFLAGS)
+else
+	have_miner_twitter_deps=no # disabled: keeps AM_CONDITIONAL and the summary well-defined
+fi
+
+if test "x$enable_miner_twitter" = "xyes" && test "x$have_miner_twitter_deps" != "xyes"; then # an explicit --enable is a hard requirement
+   AC_MSG_ERROR([Couldn't find the required dependencies for the Twitter miner: rest-0.6 >= $REST_REQUIRED, twitter-glib-1.0 >= $TWITTER_GLIB_REQUIRED.])
+fi
+
+AM_CONDITIONAL(HAVE_MINER_TWITTER, test "x$have_miner_twitter_deps" = "xyes")
+
 ####################################################################
 # Mail miners
 ####################################################################
@@ -1678,6 +1706,7 @@ AC_CONFIG_FILES([
 	src/tracker-control/Makefile
 	src/tracker-extract/Makefile
 	src/tracker-miner-fs/Makefile
+	src/tracker-miner-twitter/Makefile
 	src/tracker-preferences/Makefile
 	src/tracker-preferences/tracker-preferences.desktop.in
 	src/tracker-search-bar/Makefile
@@ -1795,6 +1824,10 @@ Plugins/Extensions:
 	Evolution plugin    (data-push):	$enable_evolution_miner
 	KMail plugin        (data-push):	$enable_kmail_miner
 
+Extra miners:
+
+	Twitter miner				$have_miner_twitter_deps
+
 Writeback:
 
 	MP3 writeback:                          $have_id3lib
diff --git a/src/Makefile.am b/src/Makefile.am
index 2f5ff33..f4ee63d 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -39,3 +39,6 @@ if HAVE_TRACKER_EXPLORER
 SUBDIRS += tracker-explorer
 endif
 
+if HAVE_MINER_TWITTER
+SUBDIRS += tracker-miner-twitter
+endif
diff --git a/src/tracker-miner-twitter/Makefile.am b/src/tracker-miner-twitter/Makefile.am
new file mode 100644
index 0000000..9e26a00
--- /dev/null
+++ b/src/tracker-miner-twitter/Makefile.am
@@ -0,0 +1,70 @@
+include $(top_srcdir)/Makefile.decl
+
+INCLUDES =								\
+	-Wall								\
+	-DSHAREDIR=\""$(datadir)"\"					\
+	-DPKGLIBDIR=\""$(libdir)/tracker"\"				\
+	-DLOCALEDIR=\""$(localedir)"\"					\
+	-DLIBEXEC_PATH=\""$(libexecdir)"\"				\
+	-DG_LOG_DOMAIN=\"Tracker\"					\
+	-DTRACKER_COMPILATION						\
+	-I$(top_srcdir)/src						\
+	$(WARN_CFLAGS)							\
+	$(GMODULE_CFLAGS)						\
+	$(PANGO_CFLAGS)							\
+	$(DBUS_CFLAGS)							\
+	$(MINER_TWITTER_CFLAGS)						\
+	$(GCOV_CFLAGS)
+
+VALAFLAGS =								\
+	--pkg gio-2.0							\
+	--pkg twitter-glib-1.0						\
+	--pkg posix							\
+	--thread
+
+libexec_PROGRAMS = tracker-miner-twitter
+
+# Only .vala files belong in this list: the .vala=.c substitutions below
+# derive the generated C sources (which maintainer-clean deletes) from it,
+# and a .vapi would pass through those substitutions unchanged.
+tracker_miner_twitter_VALASOURCES =					\
+	tracker-miner-twitter.vala					\
+	query-queue.vala
+
+tracker_miner_twitter_SOURCES =						\
+	$(tracker_miner_twitter_VALASOURCES:.vala=.c)
+
+tracker_miner_twitter_LDADD =						\
+	$(top_builddir)/src/libtracker-miner/libtracker-miner-@TRACKER_API_VERSION@.la	\
+	$(top_builddir)/src/libtracker-client/libtracker-client-@TRACKER_API_VERSION@.la	\
+	$(DBUS_LIBS)							\
+	$(GMODULE_LIBS)							\
+	$(GTHREAD_LIBS)							\
+	$(GIO_LIBS)							\
+	$(GCOV_LIBS)							\
+	$(GLIB2_LIBS)							\
+	$(MINER_TWITTER_LIBS)						\
+	-lz								\
+	-lm
+
+# Bindings for the Tracker libraries: passed to valac, not compiled here.
+vapi_sources =								\
+	$(top_srcdir)/src/libtracker-miner/tracker-miner-@TRACKER_API_VERSION@.vapi	\
+	$(top_srcdir)/src/vapi/tracker-client-0.7.vapi
+
+# valac regenerates all the .c files in one run; the stamp marks that run.
+tracker-miner-twitter.vala.stamp: $(tracker_miner_twitter_VALASOURCES) $(vapi_sources)
+	$(AM_V_GEN)$(VALAC) $(GCOV_VALAFLAGS) -C $(VALAFLAGS) $^
+	touch $@
+
+BUILT_SOURCES = tracker-miner-twitter.vala.stamp
+
+MAINTAINERCLEANFILES =							\
+	tracker-miner-twitter.vala.stamp				\
+	$(tracker_miner_twitter_VALASOURCES:.vala=.c)
+
+# tracker-client-0.7.vapi is listed explicitly so it is still distributed from here.
+EXTRA_DIST =								\
+	$(tracker_miner_twitter_VALASOURCES)				\
+	$(top_srcdir)/src/vapi/tracker-client-0.7.vapi			\
+	tracker-miner-twitter.vala.stamp
diff --git a/src/tracker-miner-twitter/query-queue.vala b/src/tracker-miner-twitter/query-queue.vala
new file mode 100644
index 0000000..ea3ac73
--- /dev/null
+++ b/src/tracker-miner-twitter/query-queue.vala
@@ -0,0 +1,85 @@
+/*
+ * Funnels the miner's SPARQL updates through Tracker.Miner batch updates
+ * and commits them on flush ().
+ */
+public class QueryQueue : GLib.Object {
+	/* Updates sent to the store whose reply has not arrived yet, keyed by cookie */
+	private HashTable<uint, string> queue;
+	private uint cookie;
+
+	/* Set by append (), cleared when flush () starts a commit. Unlike queue,
+	 * it stays set after an update completes, until that update is committed */
+	private bool needs_commit;
+
+	/* Makes a flush () re-entered from the nested main loop return early */
+	private Mutex flush_mutex;
+
+	private Tracker.Miner miner;
+
+	public QueryQueue (Tracker.Miner parent) {
+		miner = parent;
+
+		queue = new HashTable<uint, string> (direct_hash, direct_equal);
+		cookie = 0;
+		needs_commit = false;
+
+		flush_mutex = new Mutex ();
+	}
+
+	/* Sends query to the store as a batch update. Failures are logged with
+	 * warning () rather than propagated. */
+	public async void append (string query) {
+		uint current_cookie = cookie ++;
+		queue.insert (current_cookie, query);
+		needs_commit = true;
+
+		message ("SPARQL query: %s", query);
+
+		try {
+			yield miner.execute_batch_update (query);
+		} catch (Error tracker_error) {
+			warning ("BatchUpdate query failed: %s", tracker_error.message);
+		}
+
+		queue.remove (current_cookie);
+	}
+
+	/* Commits the batch and quits wait_loop once the commit has finished.
+	 * Errors are reported here, where the commit result is available. */
+	private async void commit_and_quit (MainLoop wait_loop) {
+		try {
+			yield miner.commit (null);
+		} catch (Error tracker_error) {
+			warning ("Commit query failed: %s", tracker_error.message);
+		}
+
+		wait_loop.quit ();
+	}
+
+	/* BLOCKING flush: commits the batch updates sent since the last commit,
+	 * running a nested main loop until the commit completes. Returns
+	 * immediately if another flush is already running. */
+	public void flush () {
+		if (!flush_mutex.trylock ()) {
+			message ("There's already a flush taking place");
+			return;
+		}
+
+		/* queue.size () alone is not enough: completed updates are removed
+		 * from queue but may still be waiting for a commit */
+		if (needs_commit || queue.size () > 0) {
+			needs_commit = false;
+
+			var wait_loop = new MainLoop (null, false);
+			commit_and_quit (wait_loop);
+			wait_loop.run ();
+		}
+
+		flush_mutex.unlock ();
+	}
+
+	/* Number of updates still waiting for a reply from the store */
+	public uint size () {
+		return queue.size ();
+	}
+}
diff --git a/src/tracker-miner-twitter/tracker-miner-twitter.vala b/src/tracker-miner-twitter/tracker-miner-twitter.vala
new file mode 100644
index 0000000..0abf85b
--- /dev/null
+++ b/src/tracker-miner-twitter/tracker-miner-twitter.vala
@@ -0,0 +1,502 @@
+/*
+ * Copyright (C) 2010, Adrien Bustany <abustany gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+namespace Tracker {
+
+/*
+ * Web miner for one microblogging provider (Twitter or Identi.ca, chosen by
+ * the construct-only "provider" property). While associated it pulls the
+ * friends timeline every PULL_INTERVAL seconds and inserts every status as
+ * an mfo:FeedMessage of the provider's mfo:FeedChannel.
+ */
+public class MinerTwitter : Tracker.MinerWeb {
+	public Twitter.Provider provider { get; construct; }
+	private string friendly_name; /* Human readable name of the provider */
+	private string channel_urn = null; /* Set by init_feed_channel () */
+
+	private Twitter.Client service;
+
+	private static const string DATASOURCE_URN = "urn:103e7d6e-2334-4cd2-b0a5-f1b0c8bb10ef";
+	private static const string SERVICE_DESCRIPTION = "Tracker miner for %s";
+
+	private static const uint PULL_INTERVAL = 60; /* seconds */
+	private uint pull_timeout_handle; /* Periodic pull source id, 0 when none is scheduled */
+
+	private QueryQueue query_queue;
+
+	/* Used to store state information such as the timestamp of the last pulled status */
+	private static const string STATE_FILE_NAME = "tracker-miner-%s.state";
+	private static const string STATE_FILE_GROUP = "General";
+	private KeyFile state_file;
+
+	/* Newest status creation time seen so far, in Unix seconds; saved as "since" */
+	private uint last_status_timestamp;
+
+	private static MainLoop main_loop;
+
+	/* Names the miner after the provider, sets up the client, loads the
+	 * persisted state and starts looking up the feed channel. */
+	construct {
+		if (provider == Twitter.Provider.DEFAULT_PROVIDER) {
+			set ("name", "Twitter");
+			friendly_name = "Twitter";
+		} else if (provider == Twitter.Provider.IDENTI_CA) {
+			set ("name", "Identica");
+			friendly_name = "Identi.ca";
+		} else {
+			assert_not_reached ();
+		}
+
+		set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+
+		service = new Twitter.Client.full (provider, null, null, null);
+		service.status_received.connect (status_received_cb);
+		service.timeline_complete.connect (timeline_complete_cb);
+
+		state_file = new KeyFile ();
+		load_state_file ();
+
+		pull_timeout_handle = 0;
+		this.notify["association-status"].connect (association_status_changed);
+
+		query_queue = new QueryQueue (this);
+
+		init_feed_channel ();
+	}
+
+	/* Marks the miner unassociated (stopping periodic pulls) and saves the state file. */
+	public void shutdown () {
+		set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+		save_state_file ();
+	}
+
+	/* SPARQL selecting the mfo:FeedChannel whose mfo:type is named after
+	 * friendly_name. */
+	private string feed_channel_query () {
+		return "select ?c where { ?c a mfo:FeedChannel ; mfo:type ?type ."
+		       + " ?type mfo:name ?typeName . "
+		       + " FILTER (?typeName = \"%s\") }".printf (friendly_name);
+	}
+
+	/* Looks up this provider's feed channel, creating it if missing, and
+	 * stores its URN in channel_urn. Failures are logged with critical (). */
+	private async void init_feed_channel () {
+		unowned PtrArray results;
+
+		try {
+			results = yield execute_sparql (feed_channel_query ());
+
+			if (results.len == 0) {
+				/* Not optimal, but we're waiting for blank node support in TrackerMiner */
+				string sparql = "insert { _:channel a mfo:FeedChannel ; rdfs:label \"%s home timeline\";".printf (friendly_name)
+				                + " mfo:type [ a mfo:FeedType ; mfo:name \"%s\" ] }".printf (friendly_name);
+				yield execute_update (sparql);
+				results = yield execute_sparql (feed_channel_query ());
+			}
+
+			/* Don't abort the miner on an unexpected store state: without a
+			 * channel, pull_timeout_cb () simply keeps skipping its cycles. */
+			if (results.len == 0) {
+				critical ("Couldn't initialize feed channel: no channel found after creating it");
+				return;
+			}
+
+			/* If several matching channels exist, use the first one */
+			channel_urn = ((string[][])results.pdata)[0][0];
+		} catch (Error tracker_error) {
+			critical ("Couldn't initialize feed channel: %s", tracker_error.message);
+		}
+	}
+
+	/* Handler for Twitter.Client::status-received: turns one status into a
+	 * SPARQL insert (author contact, message, optional reply target) and
+	 * queues it. NOT_MODIFIED errors are ignored, other errors only logged. */
+	private void status_received_cb (ulong handle, Twitter.Status? status, Error error) {
+		SparqlBuilder builder;
+		TimeVal tv = TimeVal ();
+
+		if (error != null) {
+			if (!(error is Twitter.Error.NOT_MODIFIED)) {
+				warning ("An error occurred while pulling statuses: %s", error.message);
+			}
+			return;
+		}
+
+		builder = new SparqlBuilder.update ();
+
+		builder.insert_open (DATASOURCE_URN);
+		builder.subject ("_:author");
+		builder.predicate ("a");
+		builder.object ("nco:Contact");
+		builder.predicate ("nco:fullname");
+		builder.object_string (status.user.name);
+		builder.predicate ("nco:photo");
+		builder.object_blank_open ();
+		builder.predicate ("a");
+		builder.object ("nfo:RemoteDataObject");
+		builder.predicate ("nie:url");
+		builder.object_string (status.user.profile_image_url);
+		builder.object_blank_close (); /* nco:photo */
+
+		builder.subject ("_:message");
+		builder.predicate ("a");
+		builder.object ("mfo:FeedMessage");
+
+		builder.predicate ("a");
+		builder.object ("nfo:RemoteDataObject");
+		builder.predicate ("nie:url");
+		builder.object_string (status.url);
+
+		builder.predicate ("nmo:communicationChannel");
+		builder.object_iri (channel_urn);
+
+		builder.predicate ("nmo:messageId");
+		builder.object_string (rdf_message_id (status.id));
+
+		builder.predicate ("nmo:from");
+		builder.object ("_:author");
+
+		builder.predicate ("nie:plainTextContent");
+		builder.object_string (status.text);
+
+		if (status.reply_to_status != 0) {
+			/* Describe the replied-to message (a blank node) by channel and message id */
+			builder.predicate ("nmo:inReplyTo");
+			builder.object_blank_open ();
+			builder.predicate ("a");
+			builder.object ("mfo:FeedMessage");
+			builder.predicate ("nmo:communicationChannel");
+			builder.object_iri (channel_urn);
+			builder.predicate ("nmo:messageId");
+			builder.object_string (rdf_message_id (status.reply_to_status));
+			builder.object_blank_close ();
+		}
+
+		if (Twitter.date_to_time_val (status.created_at, out tv)) {
+			builder.predicate ("nmo:receivedDate");
+			builder.object_string (tv.to_iso8601 ());
+
+			/* Keep the newest timestamp whatever order the statuses
+			 * arrive in, so the persisted "since" never moves backwards */
+			if ((uint) tv.tv_sec > last_status_timestamp) {
+				last_status_timestamp = (uint) tv.tv_sec;
+			}
+		}
+
+		tv.get_current_time ();
+		builder.predicate ("mfo:downloadedTime");
+		builder.object_string (tv.to_iso8601 ());
+
+		builder.insert_close ();
+
+		query_queue.append (builder.get_result ());
+	}
+
+	/* Handler for Twitter.Client::timeline-complete: commits the queued
+	 * updates and records the newest status timestamp as "since". */
+	private void timeline_complete_cb () {
+		message ("Timeline downloaded");
+
+		query_queue.flush ();
+		state_file.set_integer (STATE_FILE_GROUP, "since", (int)last_status_timestamp);
+	}
+
+	/* Builds the nmo:messageId for a status: "feed:<miner name>:<status id>" */
+	private string rdf_message_id (uint status_id)
+	{
+		string name;
+
+		get ("name", out name);
+		return "feed:%s:%u".printf (name, status_id);
+	}
+
+	/* Starts the periodic pull (plus one immediate pull) when the miner
+	 * becomes associated, and removes it when it becomes unassociated. */
+	private void association_status_changed (Object source, ParamSpec pspec) {
+		MinerWebAssociationStatus status;
+
+		get ("association-status", out status);
+
+		switch (status) {
+			case MinerWebAssociationStatus.ASSOCIATED:
+				if (pull_timeout_handle != 0)
+					return;
+
+				message ("Miner is now associated. Initiating periodic pull.");
+				/* Keep the source id: it is what lets UNASSOCIATED stop the pull */
+				pull_timeout_handle = Timeout.add_seconds (PULL_INTERVAL, pull_timeout_cb);
+				Idle.add ( () => { pull_timeout_cb (); return false; });
+				break;
+			case MinerWebAssociationStatus.UNASSOCIATED:
+				if (pull_timeout_handle == 0)
+					return;
+
+				Source.remove (pull_timeout_handle);
+				pull_timeout_handle = 0;
+				break;
+		}
+	}
+
+	/* Periodic pull: requests the friends timeline, passing the saved
+	 * "since" value. Always returns true so the timeout keeps running. */
+	private bool pull_timeout_cb () {
+		int since;
+
+		if (channel_urn == null) {
+			message ("Feed channel not initialized yet, skipping this cycle");
+			return true;
+		}
+
+		message ("Pulling new data");
+		try {
+			since = state_file.get_integer (STATE_FILE_GROUP, "since");
+		} catch (Error error) {
+			critical ("Cannot load config variable: %s", error.message);
+			return true;
+		}
+
+		service.get_friends_timeline ("", since);
+		return true;
+	}
+
+	/* Location of the per-provider state file in the user cache directory */
+	private static string state_file_path (string name) {
+		return Path.build_filename (Environment.get_user_cache_dir (),
+		                            "tracker",
+		                            STATE_FILE_NAME.printf (name));
+	}
+
+	/* Loads the state file if it exists, guarantees a "since" key and
+	 * restores last_status_timestamp from it. */
+	private void load_state_file () {
+		string name;
+		int since;
+
+		get ("name", out name);
+
+		try {
+			state_file.load_from_file (state_file_path (name), KeyFileFlags.NONE);
+		} catch (Error error) {
+			message ("Couldn't load the state file: %s", error.message);
+		}
+
+		try {
+			since = state_file.get_integer (STATE_FILE_GROUP, "since");
+		} catch (Error error) {
+			since = 0;
+			state_file.set_integer (STATE_FILE_GROUP, "since", since);
+		}
+
+		/* Without this, the first timeline_complete_cb () of a session would
+		 * overwrite the saved "since" with 0 if no newer status was received */
+		last_status_timestamp = since > 0 ? (uint) since : 0;
+	}
+
+	/* Writes the state file, creating its directory when needed. */
+	private void save_state_file () {
+		string name;
+		string file_path;
+
+		get ("name", out name);
+		file_path = state_file_path (name);
+
+		/* FileUtils.set_contents () does not create missing parent directories */
+		DirUtils.create_with_parents (Path.get_dirname (file_path), 0755);
+
+		try {
+			FileUtils.set_contents (file_path, state_file.to_data ());
+		} catch (Error error) {
+			warning ("Couldn't save state file: %s", error.message);
+		}
+	}
+
+	// If we don't protect the function with a mutex, it could be called by the
+	// inner loop, starting at inner-inner one, and so on...
+	private Mutex authenticate_mutex = new Mutex ();
+
+	/* Reads the stored credentials and verifies them with the service,
+	 * blocking in a nested main loop until the answer arrives. Sets the
+	 * association status to ASSOCIATED on success.
+	 * Throws KEYRING / NO_CREDENTIALS for password store failures, SERVICE
+	 * for service errors and WRONG_CREDENTIALS if verification fails. */
+	public override void authenticate () throws MinerWebError {
+		PasswordProvider password_provider;
+		string name;
+		string username;
+		string password;
+		bool verified = false;
+		Error twitter_error = null;
+		ulong verified_handler;
+
+		if (!authenticate_mutex.trylock ()) {
+			warning ("authenticate called while it was still running");
+			return;
+		}
+
+		password_provider = PasswordProvider.get ();
+		get ("name", out name);
+
+		set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+
+		try {
+			password = password_provider.get_password (name, out username);
+		} catch (Error e) {
+			authenticate_mutex.unlock ();
+			if (e is PasswordProviderError.SERVICE) {
+				throw new MinerWebError.KEYRING (e.message);
+			}
+			if (e is PasswordProviderError.NOTFOUND) {
+				throw new MinerWebError.NO_CREDENTIALS ("Miner is not associated");
+			}
+
+			critical ("Internal error: %s", e.message);
+			return;
+		}
+
+		message ("Verifying username and password");
+		service.set_user (username, password);
+
+		var wait_loop = new MainLoop (null, false);
+		verified_handler = service.user_verified.connect ( (h, v, e) => {
+				verified = v;
+				twitter_error = e;
+				wait_loop.quit (); });
+		service.verify_user ();
+		wait_loop.run ();
+		/* One handler per call: disconnect it so handlers (and the loops they
+		 * reference) don't pile up across authenticate () calls */
+		SignalHandler.disconnect (service, verified_handler);
+		authenticate_mutex.unlock ();
+
+		if (twitter_error != null) {
+			throw new MinerWebError.SERVICE (twitter_error.message);
+		}
+
+		if (!verified) {
+			throw new MinerWebError.WRONG_CREDENTIALS ("Wrong username and/or password");
+		} else {
+			message ("Authentication sucessful");
+			set ("association-status", MinerWebAssociationStatus.ASSOCIATED);
+		}
+	}
+
+	/* Forgets the stored password and marks the miner unassociated. */
+	public override void dissociate () throws MinerWebError {
+		var password_provider = PasswordProvider.get ();
+		string name;
+		get ("name", out name);
+
+		try {
+			password_provider.forget_password (name);
+		} catch (Error e) {
+			if (e is PasswordProviderError.SERVICE) {
+				throw new MinerWebError.KEYRING (e.message);
+			}
+
+			critical ("Internal error: %s", e.message);
+			return;
+		}
+
+		set ("association-status", MinerWebAssociationStatus.UNASSOCIATED);
+	}
+
+	/* Stores the "username" and "password" entries of association_data in
+	 * the password provider. Missing entries are reported as
+	 * NO_CREDENTIALS instead of aborting the process. */
+	public override void associate (HashTable<string, string> association_data) throws Tracker.MinerWebError {
+		unowned string? username = association_data.lookup ("username");
+		unowned string? password = association_data.lookup ("password");
+
+		if (username == null || password == null) {
+			throw new MinerWebError.NO_CREDENTIALS ("Association data must contain a username and a password");
+		}
+
+		var password_provider = PasswordProvider.get ();
+		string name;
+		get ("name", out name);
+
+		try {
+			password_provider.store_password (name,
+			                                  SERVICE_DESCRIPTION.printf (name),
+			                                  username,
+			                                  password);
+		} catch (Error e) {
+			if (e is PasswordProviderError.SERVICE) {
+				throw new MinerWebError.KEYRING (e.message);
+			}
+
+			critical ("Internal error: %s", e.message);
+			return;
+		}
+	}
+
+	/* SIGINT/SIGTERM quit the main loop; a second such signal exits immediately. */
+	private static bool in_loop = false;
+	private static void signal_handler (int signo) {
+		if (in_loop) {
+			Posix.exit (Posix.EXIT_FAILURE);
+		}
+
+		switch (signo) {
+			case Posix.SIGINT:
+			case Posix.SIGTERM:
+				in_loop = true;
+				main_loop.quit ();
+				break;
+		}
+	}
+
+	/* Installs signal_handler for SIGTERM and SIGINT (compiled out when G_OS_WIN32 is defined). */
+	private static void init_signals () {
+#if G_OS_WIN32
+#else
+		Posix.sigaction_t act = Posix.sigaction_t ();
+		Posix.sigset_t    empty_mask = Posix.sigset_t ();
+		Posix.sigemptyset (empty_mask);
+		act.sa_handler = signal_handler;
+		act.sa_mask    = empty_mask;
+		act.sa_flags   = 0;
+
+		Posix.sigaction (Posix.SIGTERM, act, null);
+		Posix.sigaction (Posix.SIGINT, act, null);
+#endif
+	}
+
+	/* Runs one miner per provider until SIGINT/SIGTERM, then shuts both down. */
+	public static void main (string[] args) {
+		Environment.set_application_name ("Twitter/Identi.ca tracker miner");
+		MinerTwitter twitter_miner;
+		twitter_miner = Object.new (typeof (MinerTwitter),
+		                                    "provider", Twitter.Provider.DEFAULT_PROVIDER) as MinerTwitter;
+
+		MinerTwitter identica_miner;
+		identica_miner = Object.new (typeof (MinerTwitter),
+		                                     "provider", Twitter.Provider.IDENTI_CA) as MinerTwitter;
+
+		init_signals ();
+
+		main_loop = new MainLoop (null, false);
+		main_loop.run ();
+
+		twitter_miner.shutdown ();
+		identica_miner.shutdown ();
+	}
+}
+
+} // End namespace Tracker



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]