[libgrss] First implementation of FeedsPublisher (subscription, content delivery yet TBD) Correction in FeedsS



commit 736fee805494b45cca5a770eea1214df293ee665
Author: Roberto Guido <bob4mail gmail com>
Date:   Fri Jun 25 01:40:39 2010 +0200

    First implementation of FeedsPublisher (subscription, content delivery yet TBD)
    Correction in FeedsSubscriber subscription refresh procedure

 NEWS                               |    1 +
 TODO                               |   10 -
 configure.ac                       |    2 +-
 doc/reference/libgrss-docs.sgml    |    1 +
 doc/reference/libgrss-sections.txt |   11 +
 src/Makefile.am                    |    2 +
 src/feed-channel.c                 |    8 +-
 src/feed-marshal.list              |    1 +
 src/feeds-publisher.c              |  651 ++++++++++++++++++++++++++++++++++++
 src/feeds-publisher.h              |   57 ++++
 src/feeds-subscriber.c             |  119 +------
 src/libgrss.h                      |    1 +
 src/ns-handler.c                   |    2 +-
 src/utils.c                        |   60 ++++
 src/utils.h                        |    2 +
 15 files changed, 812 insertions(+), 116 deletions(-)
---
diff --git a/NEWS b/NEWS
index 40d0ddb..49bea78 100644
--- a/NEWS
+++ b/NEWS
@@ -2,6 +2,7 @@ libgrss 0.5 (UNRELEASED)
 ==============================================================================
 - Added XOXO files parser
 - Added XBEL files parser
+- First test implementation of PubSubHub hub
 - Added functions feed_channel_fetch_async() and feed_channel_new_from_file()
 
 libgrss 0.4
diff --git a/TODO b/TODO
index 40cec78..e69de29 100644
--- a/TODO
+++ b/TODO
@@ -1,10 +0,0 @@
-- dynamic loading for parsing modules
-
-- add RSSCloud support (http://rsscloud.org)
-
-- namespace OpenSearch (http://a9.com/-/spec/opensearchrss/1.0/)
-
-- add implementation of PubSubHubBub publisher
-- add a FeedsPool mode in which filter already parsed and exposed items for each cycle
-- use libedataserver/EAccount to describe people found in feeds
-- use libchamplain/ChamplainPoint to describe GeoRSS coordinates
diff --git a/configure.ac b/configure.ac
index c39e293..dfd4e61 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,5 +1,5 @@
 m4_define([libgrss_major_version], [0])
-m4_define([libgrss_minor_version], [4])
+m4_define([libgrss_minor_version], [5])
 m4_define([libgrss_micro_version], [0])
 
 m4_define([libgrss_version],
diff --git a/doc/reference/libgrss-docs.sgml b/doc/reference/libgrss-docs.sgml
index c9e7c2f..b8c22f3 100644
--- a/doc/reference/libgrss-docs.sgml
+++ b/doc/reference/libgrss-docs.sgml
@@ -39,6 +39,7 @@
     <xi:include href="xml/feeds-pool.xml"/>
     <xi:include href="xml/feeds-group.xml"/>
     <xi:include href="xml/feeds-subscriber.xml"/>
+    <xi:include href="xml/feeds-publisher.xml"/>
     <xi:include href="xml/feeds-store.xml"/>
   </part>
 
diff --git a/doc/reference/libgrss-sections.txt b/doc/reference/libgrss-sections.txt
index c0a4bb2..5959410 100644
--- a/doc/reference/libgrss-sections.txt
+++ b/doc/reference/libgrss-sections.txt
@@ -45,6 +45,17 @@ feeds_subscriber_switch
 </SECTION>
 
 <SECTION>
+<FILE>feeds-publisher</FILE>
+<TITLE>FeedsPublisher</TITLE>
+FeedsPublisher
+feeds_publisher_new
+feeds_publisher_set_port
+feeds_publisher_set_topics
+feeds_publisher_switch
+feeds_publisher_publish
+</SECTION>
+
+<SECTION>
 <FILE>feed-item</FILE>
 <TITLE>FeedItem</TITLE>
 FeedItem
diff --git a/src/Makefile.am b/src/Makefile.am
index 9495e76..ff2afe2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -30,6 +30,7 @@ sources_public_h = \
 	feed-parser.h              \
 	feeds-group.h              \
 	feeds-pool.h               \
+	feeds-publisher.h          \
 	feeds-store.h              \
 	feeds-subscriber.h
 
@@ -47,6 +48,7 @@ sources_c = \
 	feeds-group-handler.c      \
 	feeds-opml-group-handler.c \
 	feeds-pool.c               \
+	feeds-publisher.c          \
 	feeds-store.c              \
 	feeds-subscriber.c         \
 	feeds-xbel-group-handler.c \
diff --git a/src/feed-channel.c b/src/feed-channel.c
index ae49c2d..a6b3b72 100644
--- a/src/feed-channel.c
+++ b/src/feed-channel.c
@@ -410,7 +410,7 @@ feed_channel_get_category (FeedChannel *channel)
  * @hub: hub for the feed, or NULL
  * @self: target referencing the feed, or NULL
  *
- * To set information about PubSubHub for the channel. Options can be set
+ * To set information about PubSubHubbub for the channel. Options can be set
  * alternatively, only with hub != %NULL or self != %NULL, and are saved
  * internally to the object: the hub is considered valid
  * (feed_channel_get_pubsubhub() returns %TRUE) only when both parameters has
@@ -441,10 +441,10 @@ feed_channel_set_pubsubhub (FeedChannel *channel, gchar *hub, gchar *self)
  * @hub: location for the hub string, or NULL
  * @self: location for the reference to the feed, or NULL
  *
- * Retrieves information about the PubSub hub of the channel
+ * Retrieves information about the PubSubHubbub hub of the channel
  *
- * Return value: %TRUE if a valid PubSub hub has been set for the @channel,
- * %FALSE otherwise
+ * Return value: %TRUE if a valid PubSubHubbub hub has been set for the
+ * @channel, %FALSE otherwise
  */
 gboolean
 feed_channel_get_pubsubhub (FeedChannel *channel, gchar **hub, gchar **self)
diff --git a/src/feed-marshal.list b/src/feed-marshal.list
index 8bf98f6..73366cb 100644
--- a/src/feed-marshal.list
+++ b/src/feed-marshal.list
@@ -1,2 +1,3 @@
 VOID:OBJECT,POINTER
+VOID:OBJECT,STRING
 VOID:OBJECT,OBJECT
diff --git a/src/feeds-publisher.c b/src/feeds-publisher.c
new file mode 100644
index 0000000..618f899
--- /dev/null
+++ b/src/feeds-publisher.c
@@ -0,0 +1,651 @@
+/*
+ * Copyright (C) 2010, Roberto Guido <rguido src gnome org>
+ *                     Michele Tameni <michele amdplanet it>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#include "feeds-publisher.h"
+#include "utils.h"
+#include "feed-marshal.h"
+
+#define VERIFICATION_INTERVAL_MINUTES	30
+#define DEFAULT_LEASE_INTERVAL		(60 * 60)
+#define DEFAULT_SERVER_PORT   		80
+#define DEFAULT_REFRESH_CHECK_INTERVAL	60
+
+#define FEEDS_PUBLISHER_GET_PRIVATE(obj)	(G_TYPE_INSTANCE_GET_PRIVATE ((obj), FEEDS_PUBLISHER_TYPE, FeedsPublisherPrivate))
+
+/**
+ * SECTION: feeds-publisher
+ * @short_description: PubSubHubbub publisher and hub
+ *
+ * #FeedsPublisher implements a server able to receive subscriptions by
+ * PubSubHubbub clients and deliver them new contents.
+ * For more information look at http://code.google.com/p/pubsubhubbub/
+ */
+
+static void	subscribe_verify_cb	(SoupSession *session, SoupMessage *msg, gpointer user_data);
+
+struct _FeedsPublisherPrivate {
+	gboolean		running;
+
+	int			port;
+	SoupServer		*server;
+	GInetAddress		*local_addr;
+
+	SoupSession		*soupsession;
+
+	time_t			current_time;
+	GHashTable		*topics;
+	guint			refresh_handler;
+};
+
+typedef struct {
+	FeedChannel		*channel;
+	GList			*subscribers;
+} ValidTopic;
+
+typedef enum {
+	REMOTE_SUBSCRIBING,
+	REMOTE_UNSUBSCRIBING,
+} SUBSCRIBER_STATUS;
+
+typedef struct {
+	FeedsPublisher		*parent;
+	SUBSCRIBER_STATUS	status;
+	gchar			*topic;
+	gchar			*callback;
+	gchar			*challenge;
+	gint64			lease_interval;
+	time_t			first_contact_time;
+	time_t			registration_time;
+	SoupMessage		*registration_msg;
+} RemoteSubscriber;
+
+enum {
+	SUBSCRIPTION_ADDED,
+	SUBSCRIPTION_DELETED,
+	LAST_SIGNAL
+};
+
+static guint signals [LAST_SIGNAL] = {0};
+
+G_DEFINE_TYPE (FeedsPublisher, feeds_publisher, G_TYPE_OBJECT);
+
+static void
+destroy_remote_subscriber (RemoteSubscriber *client)
+{
+	FREE_STRING (client->topic);
+	FREE_STRING (client->callback);
+	FREE_STRING (client->challenge);
+	FREE_OBJECT (client->registration_msg);
+	g_free (client);
+}
+
+static void
+destroy_topic (gpointer user_data)
+{
+	GList *iter;
+	ValidTopic *topic;
+
+	topic = user_data;
+	g_object_unref (topic->channel);
+
+	if (topic->subscribers != NULL) {
+		for (iter = topic->subscribers; iter; iter = iter->next)
+			destroy_remote_subscriber (iter->data);
+
+		g_list_free (topic->subscribers);
+	}
+
+	g_free (topic);
+}
+
+static gboolean
+topic_remove_helper (gpointer key, gpointer value, gpointer user_data)
+{
+	return TRUE;
+}
+
+static void
+remove_current_topics (FeedsPublisher *pub)
+{
+	g_hash_table_foreach_remove (pub->priv->topics, topic_remove_helper, NULL);
+}
+
+static void
+feeds_publisher_finalize (GObject *obj)
+{
+	FeedsPublisher *pub;
+
+	pub = FEEDS_PUBLISHER (obj);
+	feeds_publisher_switch (pub, FALSE);
+
+	remove_current_topics (pub);
+	g_hash_table_destroy (pub->priv->topics);
+}
+
+static void
+feeds_publisher_class_init (FeedsPublisherClass *klass)
+{
+	GObjectClass *gobject_class;
+
+	g_type_class_add_private (klass, sizeof (FeedsPublisherPrivate));
+
+	gobject_class = G_OBJECT_CLASS (klass);
+	gobject_class->finalize = feeds_publisher_finalize;
+
+	/**
+	 * FeedsSubscriber::new_subscription:
+	 * @pub: the #FeedsPublisher emitting the signal
+	 * @topic: #FeedChannel for which subscription has been added
+	 * @callback: callback required for new subscriber
+	 *
+	 * Emitted when a new remote client subscribes to this publisher
+	 */
+	signals [SUBSCRIPTION_ADDED] = g_signal_new ("new-subscription", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0,
+	                                             NULL, NULL, feed_marshal_VOID__OBJECT_STRING,
+	                                             G_TYPE_NONE, 2, FEED_CHANNEL_TYPE, G_TYPE_STRING);
+
+	/**
+	 * FeedsSubscriber::new_subscription:
+	 * @pub: the #FeedsPublisher emitting the signal
+	 * @topic: #FeedChannel for which subscription has been removed
+	 * @callback: callback revoked by the subscriber
+	 *
+	 * Emitted when a new remote client unsubscribes to this publisher
+	 */
+	signals [SUBSCRIPTION_DELETED] = g_signal_new ("delete-subscription", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0,
+	                                               NULL, NULL, feed_marshal_VOID__OBJECT_STRING,
+	                                               G_TYPE_NONE, 2, FEED_CHANNEL_TYPE, G_TYPE_STRING);
+}
+
+static void
+feeds_publisher_init (FeedsPublisher *node)
+{
+	node->priv = FEEDS_PUBLISHER_GET_PRIVATE (node);
+	memset (node->priv, 0, sizeof (FeedsPublisherPrivate));
+	node->priv->port = DEFAULT_SERVER_PORT;
+	node->priv->topics = g_hash_table_new_full (g_str_hash, g_str_equal, NULL, destroy_topic);
+}
+
+/**
+ * feeds_publisher_new:
+ *
+ * Allocates a new #FeedsPublisher
+ *
+ * Return value: a new #FeedsPublisher
+ */
+FeedsPublisher*
+feeds_publisher_new ()
+{
+	return g_object_new (FEEDS_PUBLISHER_TYPE, NULL);
+}
+
+static void
+add_client_to_topic (FeedsPublisher *pub, RemoteSubscriber *client)
+{
+	ValidTopic *topic;
+
+	topic = g_hash_table_lookup (pub->priv->topics, client->topic);
+	if (topic != NULL) {
+		topic->subscribers = g_list_prepend (topic->subscribers, client);
+		client->registration_time = time (NULL);
+
+		if (client->registration_msg != NULL) {
+			g_object_unref (client->registration_msg);
+			client->registration_msg = NULL;
+		}
+
+		g_signal_emit (pub, signals [SUBSCRIPTION_ADDED], 0, topic->channel, client->callback);
+	}
+}
+
+static void
+remove_client_to_topic (FeedsPublisher *pub, RemoteSubscriber *client)
+{
+	GList *iter;
+	ValidTopic *topic;
+
+	topic = g_hash_table_lookup (pub->priv->topics, client->topic);
+	if (topic != NULL) {
+		for (iter = topic->subscribers; iter; iter = iter->next) {
+			if (iter->data == client) {
+				topic->subscribers = g_list_delete_link (topic->subscribers, iter);
+				g_signal_emit (pub, signals [SUBSCRIPTION_DELETED], 0, topic->channel, client->callback);
+				destroy_remote_subscriber (client);
+			}
+		}
+	}
+}
+
+static gchar*
+random_string ()
+{
+	register int i;
+	gchar str [50];
+	gchar *chars = "qwertyuiopasdfghjklzxcvbnm1234567890QWERTYUIOPASDFGHJKLZXCVBNM";
+
+	srand (time (NULL));
+
+	for (i = 0; i < 49; i++)
+		str [i] = chars [rand () % 62];
+	str [i] = '\0';
+
+	return g_strdup (str);
+}
+
+static SoupMessage*
+verification_message_for_client (RemoteSubscriber *client)
+{
+	gchar *mode;
+	gchar *body;
+	SoupMessage *ret;
+
+	FREE_STRING (client->challenge);
+	client->challenge = random_string ();
+
+	switch (client->status) {
+		case REMOTE_SUBSCRIBING:
+			mode = "subscribe";
+			break;
+
+		case REMOTE_UNSUBSCRIBING:
+			mode = "unsubscribe";
+			break;
+
+		default:
+			return NULL;
+			break;
+	}
+
+	body = g_strdup_printf ("%s?hub.mode=%s&hub.topic=%s&hub.challenge=%s&hub.lease_seconds=%ld",
+				client->callback, mode, client->topic, client->challenge, client->lease_interval);
+
+	ret = soup_message_new ("GET", body);
+	g_free (body);
+
+	return ret;
+}
+
+static gboolean
+retry_subscriber_verification (gpointer user_data)
+{
+	SoupMessage *msg;
+	RemoteSubscriber *client;
+
+	client = user_data;
+	msg = verification_message_for_client (client);
+	soup_session_queue_message (client->parent->priv->soupsession, msg, subscribe_verify_cb, client);
+	return FALSE;
+}
+
+static void
+verification_failed (RemoteSubscriber *client)
+{
+	if (client->registration_msg != NULL) {
+		soup_message_set_status (client->registration_msg, 404);
+		soup_server_unpause_message (client->parent->priv->server, client->registration_msg);
+		destroy_remote_subscriber (client);
+	}
+	else {
+		/*
+			If the verification fails for 6 hours, the procedure is dropped.
+			Otherwise it is re-scheduled each VERIFICATION_INTERVAL_MINUTES
+			minutes
+			Cfr. PubSubHubbub Core 0.3 Section 6.2.1
+		*/
+		if (time (NULL) - client->first_contact_time > (60 * 60 * 6)) {
+			if (client->status == REMOTE_SUBSCRIBING)
+				destroy_remote_subscriber (client);
+		}
+		else {
+			g_timeout_add_seconds (60 * VERIFICATION_INTERVAL_MINUTES, retry_subscriber_verification, client);
+		}
+	}
+}
+
+static void
+subscribe_verify_cb (SoupSession *session, SoupMessage *msg, gpointer user_data)
+{
+	guint status;
+	RemoteSubscriber *client;
+
+	client = user_data;
+
+	g_object_get (msg, "status-code", &status, NULL);
+
+	if (status < 200 || status > 299) {
+		verification_failed (client);
+	}
+	else {
+		if (msg->response_body->data == NULL || strcmp (msg->response_body->data, client->challenge) != 0) {
+			verification_failed (client);
+		}
+		else {
+			if (client->registration_msg != NULL) {
+				soup_message_set_status (client->registration_msg, 204);
+				soup_server_unpause_message (client->parent->priv->server, client->registration_msg);
+			}
+
+			if (client->status == REMOTE_SUBSCRIBING)
+				add_client_to_topic (client->parent, client);
+			else
+				remove_client_to_topic (client->parent, client);
+		}
+	}
+}
+
+static RemoteSubscriber*
+search_subscriber_by_topic_and_callback (FeedsPublisher *pub, gchar *topic, gchar *callback)
+{
+	GList *iter;
+	ValidTopic *topic_test;
+	RemoteSubscriber *client_test;
+	RemoteSubscriber *ret;
+
+	ret = NULL;
+
+	topic_test = g_hash_table_lookup (pub->priv->topics, topic);
+	if (topic_test != NULL) {
+		for (iter = topic_test->subscribers; iter; iter = iter->next) {
+			client_test = iter->data;
+			if (strcmp (callback, client_test->callback) == 0) {
+				ret = iter->data;
+				break;
+			}
+		}
+	}
+
+	return ret;
+}
+
+static void
+handle_incoming_requests_cb (SoupServer *server, SoupMessage *msg, const char *path,
+                             GHashTable *query, SoupClientContext *context, gpointer user_data)
+{
+	int i;
+	int len;
+	gchar **contents;
+	gchar *mode;
+	gchar *topic;
+	gchar *lease;
+	gchar *callback;
+	gchar *verify;
+	SoupMessage *verify_msg;
+	FeedsPublisher *pub;
+	RemoteSubscriber *client;
+
+	pub = user_data;
+
+	contents = g_strsplit (msg->request_body->data, "hub.", -1);
+	if (contents == NULL) {
+		soup_message_set_status (msg, 404);
+		return;
+	}
+
+	client = NULL;
+	mode = NULL;
+	topic = NULL;
+	lease = NULL;
+	callback = NULL;
+	verify = NULL;
+
+	for (i = 0; contents [i] != NULL; i++) {
+		len = strlen (contents [i]) - 1;
+		if (contents [i] [len] == '&')
+			contents [i] [len] = '\0';
+
+		if (strncmp (contents [i], "mode", 4) == 0)
+			mode = contents [i] + 5;
+		else if (strncmp (contents [i], "topic", 5) == 0)
+			topic = contents [i] + 6;
+		else if (strncmp (contents [i], "lease_seconds", 13) == 0)
+			lease = contents [i] + 14;
+		else if (strncmp (contents [i], "callback", 8) == 0)
+			callback = contents [i] + 9;
+		else if (strncmp (contents [i], "verify", 6) == 0)
+			verify = contents [i] + 7;
+	}
+
+	if (mode == NULL) {
+		soup_message_set_status (msg, 404);
+	}
+	else {
+		if (strcmp (mode, "subscribe") == 0) {
+			/*
+				TODO	What to do if an invalid topic is required?
+			*/
+			client = g_new0 (RemoteSubscriber, 1);
+			client->parent = pub;
+			client->first_contact_time = time (NULL);
+			client->topic = g_strdup (topic);
+			client->callback = g_strdup (callback);
+
+			if (lease != NULL)
+				client->lease_interval = strtoll (lease, NULL, 10);
+			else
+				client->lease_interval = DEFAULT_LEASE_INTERVAL;
+
+			client->status = REMOTE_SUBSCRIBING;
+		}
+		else if (strcmp (mode, "unsubscribe") == 0) {
+			client = search_subscriber_by_topic_and_callback (pub, topic, callback);
+			if (client != NULL)
+				client->status = REMOTE_UNSUBSCRIBING;
+		}
+
+		if (client != NULL) {
+			verify_msg = verification_message_for_client (client);
+
+			if (strcmp (verify, "sync") == 0) {
+				g_object_ref (msg);
+				client->registration_msg = msg;
+				soup_server_pause_message (server, msg);
+				soup_session_queue_message (pub->priv->soupsession, verify_msg, subscribe_verify_cb, client);
+			}
+			else {
+				soup_session_queue_message (pub->priv->soupsession, verify_msg, subscribe_verify_cb, client);
+				soup_message_set_status (msg, 202);
+			}
+		}
+	}
+
+	g_strfreev (contents);
+}
+
+static void
+close_server (FeedsPublisher *pub)
+{
+	if (pub->priv->server != NULL) {
+		soup_server_remove_handler (pub->priv->server, NULL);
+		soup_server_quit (pub->priv->server);
+		g_object_unref (pub->priv->server);
+		pub->priv->server = NULL;
+	}
+}
+
+static GInetAddress*
+my_detect_internet_address (FeedsPublisher *pub)
+{
+	if (pub->priv->local_addr == NULL)
+		pub->priv->local_addr = detect_internet_address ();
+
+	return pub->priv->local_addr;
+}
+
+static void
+create_and_run_server (FeedsPublisher *pub)
+{
+	gchar *ip;
+	struct sockaddr_in low_addr;
+	SoupAddress *soup_addr;
+	GInetAddress *my_addr;
+
+	my_addr = my_detect_internet_address (pub);
+	if (my_addr == NULL)
+		return;
+
+	if (pub->priv->soupsession == NULL)
+		pub->priv->soupsession = soup_session_async_new ();
+
+	low_addr.sin_family = AF_INET;
+	low_addr.sin_port = htons (pub->priv->port);
+	ip = g_inet_address_to_string (my_addr);
+	inet_pton (AF_INET, ip, &low_addr.sin_addr);
+	g_free (ip);
+
+	soup_addr = soup_address_new_from_sockaddr ((struct sockaddr*) &low_addr, sizeof (low_addr));
+	pub->priv->server = soup_server_new ("port", pub->priv->port, "interface", soup_addr, NULL);
+	g_object_unref (soup_addr);
+
+	soup_server_add_handler (pub->priv->server, NULL, handle_incoming_requests_cb, pub, NULL);
+	soup_server_run_async (pub->priv->server);
+}
+
+/**
+ * feeds_publisher_set_port:
+ * @pub: a #FeedsPublisher
+ * @port: new listening port for the server
+ *
+ * To customize the port opened by the local server to catch incoming
+ * subscriptions. By default this is 80. Changing the port while the
+ * subscriber is running imply restart the local server
+ */
+void
+feeds_publisher_set_port (FeedsPublisher *pub, int port)
+{
+	if (port != pub->priv->port) {
+		pub->priv->port = port;
+
+		if (pub->priv->running == TRUE) {
+			feeds_publisher_switch (pub, FALSE);
+			feeds_publisher_switch (pub, TRUE);
+		}
+	}
+}
+
+/**
+ * feeds_publisher_set_topics:
+ * @pub: a #FeedsPublisher
+ * @topics: a list of #FeedChannels
+ *
+ * To define a list of valid "topics" for which the #FeedsPublisher will
+ * deliver contents. Sources of those channels, as retrieved by
+ * feed_channel_get_source(), would be found as hub.topic parameter in
+ * registration requests from remote subscribers
+ */
+void
+feeds_publisher_set_topics (FeedsPublisher *pub, GList *topics)
+{
+	GList *iter;
+	ValidTopic *topic;
+
+	remove_current_topics (pub);
+
+	for (iter = topics; iter; iter = iter->next) {
+		topic = g_new0 (ValidTopic, 1);
+		topic->channel = g_object_ref (iter->data);
+		g_hash_table_insert (pub->priv->topics, (gpointer) feed_channel_get_source (iter->data), topic);
+	}
+}
+
+static void
+refresh_subscribers_in_topic (gpointer key, gpointer value, gpointer user_data)
+{
+	GList *iter;
+	SoupMessage *verify_msg;
+	FeedsPublisher *pub;
+	ValidTopic *topic;
+	RemoteSubscriber *client;
+
+	pub = user_data;
+	topic = value;
+
+	for (iter = topic->subscribers; iter; iter = iter->next) {
+		client = iter->data;
+
+		if (client->registration_time + client->lease_interval + DEFAULT_REFRESH_CHECK_INTERVAL >= pub->priv->current_time) {
+			verify_msg = verification_message_for_client (client);
+			soup_session_queue_message (pub->priv->soupsession, verify_msg, subscribe_verify_cb, client);
+		}
+	}
+}
+
+static gboolean
+refresh_subscribers (gpointer user_data)
+{
+	FeedsPublisher *pub;
+
+	pub = user_data;
+	pub->priv->current_time = time (NULL);
+	g_hash_table_foreach (pub->priv->topics, refresh_subscribers_in_topic, pub);
+
+	return TRUE;
+}
+
+static void
+install_refresh_handler (FeedsPublisher *pub)
+{
+	pub->priv->refresh_handler = g_timeout_add_seconds (DEFAULT_REFRESH_CHECK_INTERVAL, refresh_subscribers, pub);
+}
+
+static void
+remove_refresh_handler (FeedsPublisher *pub)
+{
+	g_source_remove (pub->priv->refresh_handler);
+}
+
+/**
+ * feeds_publisher_switch:
+ * @pub: a #FeedsPublisher
+ * @run: TRUE to run the subscriber, FALSE to pause it
+ *
+ * Permits to pause or resume @pub listening for events
+ */
+void
+feeds_publisher_switch (FeedsPublisher *pub, gboolean run)
+{
+	if (pub->priv->running != run) {
+		pub->priv->running = run;
+
+		if (run == TRUE) {
+			create_and_run_server (pub);
+			install_refresh_handler (pub);
+		}
+		else {
+			remove_refresh_handler (pub);
+			close_server (pub);
+		}
+	}
+}
+
+/**
+ * feeds_publisher_publish:
+ * @pub: a #FeedsPublisher
+ * @item: item to publish
+ *
+ * Deliver the specified @item to all subscribers registered for the parent's
+ * channel (defined as "topic")
+ */
+void
+feeds_publisher_publish (FeedsPublisher *pub, FeedItem *item)
+{
+	/*
+		TODO
+	*/
+}
diff --git a/src/feeds-publisher.h b/src/feeds-publisher.h
new file mode 100644
index 0000000..a21134e
--- /dev/null
+++ b/src/feeds-publisher.h
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2010, Roberto Guido <rguido src gnome org>
+ *                     Michele Tameni <michele amdplanet it>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA  02110-1301, USA.
+ */
+
+#ifndef __FEEDS_PUBLISHER_H__
+#define __FEEDS_PUBLISHER_H__
+
+#include "libgrss.h"
+
+#define FEEDS_PUBLISHER_TYPE		(feeds_publisher_get_type())
+#define FEEDS_PUBLISHER(o)		(G_TYPE_CHECK_INSTANCE_CAST ((o), FEEDS_PUBLISHER_TYPE, FeedsPublisher))
+#define FEEDS_PUBLISHER_CLASS(c)	(G_TYPE_CHECK_CLASS_CAST ((c), FEEDS_PUBLISHER_TYPE, FeedsPublisherClass))
+#define IS_FEEDS_PUBLISHER(o)		(G_TYPE_CHECK_INSTANCE_TYPE ((o), FEEDS_PUBLISHER_TYPE))
+#define IS_FEEDS_PUBLISHER_CLASS(c)	(G_TYPE_CHECK_CLASS_TYPE ((c),  FEEDS_PUBLISHER_TYPE))
+#define FEEDS_PUBLISHER_GET_CLASS(o)	(G_TYPE_INSTANCE_GET_CLASS ((o), FEEDS_PUBLISHER_TYPE, FeedsPublisherClass))
+
+typedef struct _FeedsPublisher		FeedsPublisher;
+typedef struct _FeedsPublisherPrivate	FeedsPublisherPrivate;
+
+struct _FeedsPublisher {
+	GObject parent;
+	FeedsPublisherPrivate *priv;
+};
+
+typedef struct {
+	GObjectClass parent;
+
+	void (*new_subscription) (FeedsPublisher *pub, FeedChannel *topic, gchar *callback);
+	void (*delete_subscription) (FeedsPublisher *pub, FeedChannel *topic, gchar *callback);
+} FeedsPublisherClass;
+
+GType			feeds_publisher_get_type	() G_GNUC_CONST;
+
+FeedsPublisher*		feeds_publisher_new		();
+
+void			feeds_publisher_set_port	(FeedsPublisher *pub, int port);
+void			feeds_publisher_set_topics	(FeedsPublisher *pub, GList *topics);
+void			feeds_publisher_switch		(FeedsPublisher *pub, gboolean run);
+void			feeds_publisher_publish		(FeedsPublisher *pub, FeedItem *item);
+
+#endif /* __FEEDS_PUBLISHER_H__ */
diff --git a/src/feeds-subscriber.c b/src/feeds-subscriber.c
index 419c198..a9547fe 100644
--- a/src/feeds-subscriber.c
+++ b/src/feeds-subscriber.c
@@ -29,10 +29,10 @@
 
 /**
  * SECTION: feeds-subscriber
- * @short_description: PubSubHub subscriber
+ * @short_description: PubSubHubbub subscriber
  *
  * #FeedsSubscriber is an alternative for #FeedsPool, able to receive
- * real-time notifications by feeds managed by a PubSubHub hub.
+ * real-time notifications by feeds managed by a PubSubHubbub hub.
  * When the subscriber is executed (with feeds_subscriber_switch()) it opens
  * a server on a local port (configurable with feeds_subscriber_set_port()),
  * engage a subscription for each #FeedChannel passed with
@@ -41,6 +41,14 @@
  * For more information look at http://code.google.com/p/pubsubhubbub/
  */
 
+/*
+	TODO	There were an error in refreshing subscriptions, since is the
+		hub which must send refresh requests, not subscriber.
+		That has been removed, but it would be better to provide a
+		control mechanism able to refresh subscriptions from the
+		client side in case of problems by the server side
+*/
+
 typedef enum {
 	SUBSCRIBER_IS_IDLE,
 	SUBSCRIBER_TRYING_LOCAL_IP,
@@ -85,8 +93,6 @@ typedef struct {
 	int				identifier;
 	gchar				*path;
 
-	gint64				lease_time;
-
 	FeedsSubscriber			*sub;
 } FeedChannelWrap;
 
@@ -201,7 +207,7 @@ create_listened (FeedsSubscriber *sub, GList *feeds)
 		feed = (FeedChannel*) iter->data;
 
 		if (feed_channel_get_pubsubhub (feed, NULL, NULL) == FALSE) {
-			g_warning ("Feed at %s has not PubSubHub capability", feed_channel_get_source (feed));
+			g_warning ("Feed at %s has not PubSubHubbub capability", feed_channel_get_source (feed));
 			return FALSE;
 		}
 	}
@@ -215,7 +221,6 @@ create_listened (FeedsSubscriber *sub, GList *feeds)
 		g_object_ref (feed);
 		wrap->status = FEED_SUBSCRIPTION_IDLE;
 		wrap->path = NULL;
-		wrap->lease_time = 60;
 		wrap->channel = feed;
 		wrap->sub = sub;
 		list = g_list_prepend (list, wrap);
@@ -298,9 +303,6 @@ handle_incoming_notification_cb (SoupServer *server, SoupMessage *msg, const cha
 		mode = (gchar*) g_hash_table_lookup (query, "hub.mode");
 
 		if (feed->status == FEED_SUBSCRIPTION_SUBSCRIBING && strcmp (mode, "subscribe") == 0) {
-			feed->lease_time = strtoll ((gchar*) g_hash_table_lookup (query, "hub.lease_seconds"), NULL, 10);
-			feed->status = FEED_SUBSCRIPTION_SUBSCRIBED;
-
 			challenge = g_strdup ((gchar*) g_hash_table_lookup (query, "hub.challenge"));
 			soup_message_set_response (msg, "application/x-www-form-urlencoded", SOUP_MEMORY_TAKE, challenge, strlen (challenge));
 
@@ -385,39 +387,11 @@ close_server (FeedsSubscriber *sub)
 	}
 }
 
-static gboolean
-refresh_subscribtions (gpointer data)
-{
-	FeedsSubscriber *sub;
-
-	sub = (FeedsSubscriber*) data;
-	subscribe_feeds (sub);
-	return FALSE;
-}
-
-
 static void
 feeds_subscribed_cb (FeedsSubscriber *sub)
 {
-	gint64 min_lease;
-	GList *iter;
-	FeedChannelWrap *feed;
-
-	if (sub->priv->has_errors_in_subscription == TRUE) {
+	if (sub->priv->has_errors_in_subscription == TRUE)
 		try_another_subscription_policy (sub);
-	}
-	else {
-		min_lease = G_MAXINT64;
-
-		for (iter = sub->priv->feeds_list; iter; iter = g_list_next (iter)) {
-			feed = (FeedChannelWrap*) iter->data;
-
-			if (min_lease > feed->lease_time)
-				min_lease = feed->lease_time;
-		}
-
-		sub->priv->refresh_scheduler = g_timeout_add_seconds (min_lease, refresh_subscribtions, sub);
-	}
 }
 
 static void
@@ -483,67 +457,12 @@ subscribe_feeds (FeedsSubscriber *sub)
 	}
 }
 
-/*
-	Inspired by:
-	GNet - Networking library
-	Copyright (C) 2000-2002  David Helder
-	Copyright (C) 2000-2003  Andrew Lanoix
-	Copyright (C) 2007       Tim-Philipp Müller <tim centricular net>
-*/
 static GInetAddress*
-detect_internet_address (FeedsSubscriber *sub)
+my_detect_internet_address (FeedsSubscriber *sub)
 {
-	int sockfd;
-	gchar ip [100];
-	struct sockaddr_in serv_add;
-	struct sockaddr_storage myaddr;
-	socklen_t len;
-
-	if (sub->priv->local_addr != NULL)
-		return sub->priv->local_addr;
-
-	/*
-		TODO	This is to be adapted to work also over IPv6
-	*/
-
-	memset (&serv_add, 0, sizeof (serv_add));
-	serv_add.sin_family = AF_INET;
-	serv_add.sin_port = htons (80);
-
-	/*
-		This is the IP for slashdot.com
-	*/
-	if ((inet_pton (AF_INET, "216.34.181.45", &serv_add.sin_addr)) <= 0)
-		return NULL;
-
-	sockfd = socket (AF_INET, SOCK_DGRAM, 0);
-	if (!sockfd) {
-		g_warning ("Unable to open a socket to detect interface exposed to Internet");
-		return NULL;
-	}
+	if (sub->priv->local_addr == NULL)
+		sub->priv->local_addr = detect_internet_address ();
 
-	if (connect (sockfd, (struct sockaddr*) &serv_add, sizeof (serv_add)) == -1) {
-		g_warning ("Unable to open a connection to detect interface exposed to Internet");
-		close (sockfd);
-		return NULL;
-	}
-
-	len = sizeof (myaddr);
-	if (getsockname (sockfd, (struct sockaddr*) &myaddr, &len) != 0) {
-		close (sockfd);
-		g_warning ("Unable to obtain information about interface exposed to Internet");
-		return NULL;
-	}
-
-	close (sockfd);
-	memset (ip, 0, sizeof (char) * 100);
-
-	if (inet_ntop (AF_INET, &(((struct sockaddr_in*) &myaddr)->sin_addr), ip, 100) == NULL) {
-		g_warning ("Unable to obtain IP exposed to Internet");
-		return NULL;
-	}
-
-	sub->priv->local_addr = g_inet_address_new_from_string (ip);
 	return sub->priv->local_addr;
 }
 
@@ -555,7 +474,7 @@ create_and_run_server (FeedsSubscriber *sub)
 	SoupAddress *soup_addr;
 	GInetAddress *my_addr;
 
-	my_addr = detect_internet_address (sub);
+	my_addr = my_detect_internet_address (sub);
 	if (my_addr == NULL)
 		return;
 
@@ -647,7 +566,7 @@ init_run_server (FeedsSubscriber *sub)
 		  | has fixed hub | ---- YES ---> | is hub local | ----- YES ---+
 		  +---------------+               +--------------+              |
 		          |                               |                     |
-		          NO -----------------------------+                     |
+		          NO <----------------------------+                     |
 		          |                                                     |
 		+-------------------+           +-----------------+             |
 		| host seems public | -- YES -> | subscribe works | ---- YES ---+
@@ -675,14 +594,14 @@ init_run_server (FeedsSubscriber *sub)
 	if (sub->priv->hub != NULL) {
 		addr = g_inet_address_new_from_string (sub->priv->hub);
 		if (g_inet_address_get_is_link_local (addr) == TRUE) {
-			sub->priv->external_addr = detect_internet_address (sub);
+			sub->priv->external_addr = my_detect_internet_address (sub);
 			done = TRUE;
 			create_and_run_server (sub);
 		}
 	}
 
 	if (done == FALSE) {
-		addr = detect_internet_address (sub);
+		addr = my_detect_internet_address (sub);
 		if (address_seems_public (addr) == TRUE) {
 			sub->priv->external_addr = addr;
 			done = TRUE;
diff --git a/src/libgrss.h b/src/libgrss.h
index 51f45aa..1d5addc 100644
--- a/src/libgrss.h
+++ b/src/libgrss.h
@@ -38,6 +38,7 @@
 #include "feeds-store.h"
 #include "feeds-pool.h"
 #include "feeds-subscriber.h"
+#include "feeds-publisher.h"
 #include "feeds-group.h"
 
 #endif /* __LIBGRSS_H__ */
diff --git a/src/ns-handler.c b/src/ns-handler.c
index ae20f6d..79cdfb4 100644
--- a/src/ns-handler.c
+++ b/src/ns-handler.c
@@ -464,7 +464,7 @@ static gboolean
 ns_atom10_channel (FeedChannel *feed, xmlNodePtr cur)
 {
 	/*
-		Used to manage PubSubHub information in RSS feeds
+		Used to manage PubSubHubbub information in RSS feeds
 	*/
 
 	gchar *href;
diff --git a/src/utils.c b/src/utils.c
index 7d87c87..35a5fa8 100644
--- a/src/utils.c
+++ b/src/utils.c
@@ -547,6 +547,66 @@ date_parse_ISO8601 (const gchar *date)
 	return 0;
 }
 
+/*
+	Inspired by:
+	GNet - Networking library
+	Copyright (C) 2000-2002  David Helder
+	Copyright (C) 2000-2003  Andrew Lanoix
+	Copyright (C) 2007       Tim-Philipp Müller <tim centricular net>
+*/
+GInetAddress*
+detect_internet_address ()
+{
+	int sockfd;
+	gchar ip [100];
+	struct sockaddr_in serv_add;
+	struct sockaddr_storage myaddr;
+	socklen_t len;
+
+	/*
+		TODO	This is to be adapted to work also over IPv6
+	*/
+
+	memset (&serv_add, 0, sizeof (serv_add));
+	serv_add.sin_family = AF_INET;
+	serv_add.sin_port = htons (80);
+
+	/*
+		This is the IP for slashdot.com
+	*/
+	if ((inet_pton (AF_INET, "216.34.181.45", &serv_add.sin_addr)) <= 0)
+		return NULL;
+
+	sockfd = socket (AF_INET, SOCK_DGRAM, 0);
+	if (!sockfd) {
+		g_warning ("Unable to open a socket to detect interface exposed to Internet");
+		return NULL;
+	}
+
+	if (connect (sockfd, (struct sockaddr*) &serv_add, sizeof (serv_add)) == -1) {
+		g_warning ("Unable to open a connection to detect interface exposed to Internet");
+		close (sockfd);
+		return NULL;
+	}
+
+	len = sizeof (myaddr);
+	if (getsockname (sockfd, (struct sockaddr*) &myaddr, &len) != 0) {
+		close (sockfd);
+		g_warning ("Unable to obtain information about interface exposed to Internet");
+		return NULL;
+	}
+
+	close (sockfd);
+	memset (ip, 0, sizeof (char) * 100);
+
+	if (inet_ntop (AF_INET, &(((struct sockaddr_in*) &myaddr)->sin_addr), ip, 100) == NULL) {
+		g_warning ("Unable to obtain IP exposed to Internet");
+		return NULL;
+	}
+
+	return g_inet_address_new_from_string (ip);
+}
+
 gboolean
 address_seems_public (GInetAddress *addr)
 {
diff --git a/src/utils.h b/src/utils.h
index a69f86b..3d6ba2d 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -37,6 +37,7 @@
 #define PACKAGE			"libgrss"
 
 #define FREE_STRING(__str)	if (__str) { g_free (__str); __str = NULL; }
+#define FREE_OBJECT(__obj)	if (__obj) { g_object_unref (__obj); __obj = NULL; }
 
 gchar*		unhtmlize		(gchar *string);
 gchar*		unxmlize		(gchar * string);
@@ -48,6 +49,7 @@ xmlDocPtr	file_to_xml		(const gchar *path);
 time_t		date_parse_RFC822	(const gchar *date);
 time_t		date_parse_ISO8601	(const gchar *date);
 
+GInetAddress*	detect_internet_address	();
 gboolean	address_seems_public	(GInetAddress *addr);
 
 #endif /* __UTILS_LIBGRSS_H__ */



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]