[libgrss] First implementation of FeedsPublisher (subscription, content delivery yet TBD) Correction in FeedsS
- From: Roberto Guido <rguido src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libgrss] First implementation of FeedsPublisher (subscription, content delivery yet TBD) Correction in FeedsS
- Date: Thu, 24 Jun 2010 23:40:54 +0000 (UTC)
commit 736fee805494b45cca5a770eea1214df293ee665
Author: Roberto Guido <bob4mail gmail com>
Date: Fri Jun 25 01:40:39 2010 +0200
First implementation of FeedsPublisher (subscription, content delivery yet TBD)
Correction in FeedsSubscriber subscription refresh procedure
NEWS | 1 +
TODO | 10 -
configure.ac | 2 +-
doc/reference/libgrss-docs.sgml | 1 +
doc/reference/libgrss-sections.txt | 11 +
src/Makefile.am | 2 +
src/feed-channel.c | 8 +-
src/feed-marshal.list | 1 +
src/feeds-publisher.c | 651 ++++++++++++++++++++++++++++++++++++
src/feeds-publisher.h | 57 ++++
src/feeds-subscriber.c | 119 +------
src/libgrss.h | 1 +
src/ns-handler.c | 2 +-
src/utils.c | 60 ++++
src/utils.h | 2 +
15 files changed, 812 insertions(+), 116 deletions(-)
---
diff --git a/NEWS b/NEWS
index 40d0ddb..49bea78 100644
--- a/NEWS
+++ b/NEWS
@@ -2,6 +2,7 @@ libgrss 0.5 (UNRELEASED)
==============================================================================
- Added XOXO files parser
- Added XBEL files parser
+- First test implementation of PubSubHubbub hub
- Added functions feed_channel_fetch_async() and feed_channel_new_from_file()
libgrss 0.4
diff --git a/TODO b/TODO
index 40cec78..e69de29 100644
--- a/TODO
+++ b/TODO
@@ -1,10 +0,0 @@
-- dynamic loading for parsing modules
-
-- add RSSCloud support (http://rsscloud.org)
-
-- namespace OpenSearch (http://a9.com/-/spec/opensearchrss/1.0/)
-
-- add implementation of PubSubHubBub publisher
-- add a FeedsPool mode in which filter already parsed and exposed items for each cycle
-- use libedataserver/EAccount to describe people found in feeds
-- use libchamplain/ChamplainPoint to describe GeoRSS coordinates
diff --git a/configure.ac b/configure.ac
index c39e293..dfd4e61 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1,5 +1,5 @@
m4_define([libgrss_major_version], [0])
-m4_define([libgrss_minor_version], [4])
+m4_define([libgrss_minor_version], [5])
m4_define([libgrss_micro_version], [0])
m4_define([libgrss_version],
diff --git a/doc/reference/libgrss-docs.sgml b/doc/reference/libgrss-docs.sgml
index c9e7c2f..b8c22f3 100644
--- a/doc/reference/libgrss-docs.sgml
+++ b/doc/reference/libgrss-docs.sgml
@@ -39,6 +39,7 @@
<xi:include href="xml/feeds-pool.xml"/>
<xi:include href="xml/feeds-group.xml"/>
<xi:include href="xml/feeds-subscriber.xml"/>
+ <xi:include href="xml/feeds-publisher.xml"/>
<xi:include href="xml/feeds-store.xml"/>
</part>
diff --git a/doc/reference/libgrss-sections.txt b/doc/reference/libgrss-sections.txt
index c0a4bb2..5959410 100644
--- a/doc/reference/libgrss-sections.txt
+++ b/doc/reference/libgrss-sections.txt
@@ -45,6 +45,17 @@ feeds_subscriber_switch
</SECTION>
<SECTION>
+<FILE>feeds-publisher</FILE>
+<TITLE>FeedsPublisher</TITLE>
+FeedsPublisher
+feeds_publisher_new
+feeds_publisher_set_port
+feeds_publisher_set_topics
+feeds_publisher_switch
+feeds_publisher_publish
+</SECTION>
+
+<SECTION>
<FILE>feed-item</FILE>
<TITLE>FeedItem</TITLE>
FeedItem
diff --git a/src/Makefile.am b/src/Makefile.am
index 9495e76..ff2afe2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -30,6 +30,7 @@ sources_public_h = \
feed-parser.h \
feeds-group.h \
feeds-pool.h \
+ feeds-publisher.h \
feeds-store.h \
feeds-subscriber.h
@@ -47,6 +48,7 @@ sources_c = \
feeds-group-handler.c \
feeds-opml-group-handler.c \
feeds-pool.c \
+ feeds-publisher.c \
feeds-store.c \
feeds-subscriber.c \
feeds-xbel-group-handler.c \
diff --git a/src/feed-channel.c b/src/feed-channel.c
index ae49c2d..a6b3b72 100644
--- a/src/feed-channel.c
+++ b/src/feed-channel.c
@@ -410,7 +410,7 @@ feed_channel_get_category (FeedChannel *channel)
* @hub: hub for the feed, or NULL
* @self: target referencing the feed, or NULL
*
- * To set information about PubSubHub for the channel. Options can be set
+ * To set information about PubSubHubbub for the channel. Options can be set
* alternatively, only with hub != %NULL or self != %NULL, and are saved
* internally to the object: the hub is considered valid
* (feed_channel_get_pubsubhub() returns %TRUE) only when both parameters has
@@ -441,10 +441,10 @@ feed_channel_set_pubsubhub (FeedChannel *channel, gchar *hub, gchar *self)
* @hub: location for the hub string, or NULL
* @self: location for the reference to the feed, or NULL
*
- * Retrieves information about the PubSub hub of the channel
+ * Retrieves information about the PubSubHubbub hub of the channel
*
- * Return value: %TRUE if a valid PubSub hub has been set for the @channel,
- * %FALSE otherwise
+ * Return value: %TRUE if a valid PubSubHubbub hub has been set for the
+ * @channel, %FALSE otherwise
*/
gboolean
feed_channel_get_pubsubhub (FeedChannel *channel, gchar **hub, gchar **self)
diff --git a/src/feed-marshal.list b/src/feed-marshal.list
index 8bf98f6..73366cb 100644
--- a/src/feed-marshal.list
+++ b/src/feed-marshal.list
@@ -1,2 +1,3 @@
VOID:OBJECT,POINTER
+VOID:OBJECT,STRING
VOID:OBJECT,OBJECT
diff --git a/src/feeds-publisher.c b/src/feeds-publisher.c
new file mode 100644
index 0000000..618f899
--- /dev/null
+++ b/src/feeds-publisher.c
@@ -0,0 +1,651 @@
+/*
+ * Copyright (C) 2010, Roberto Guido <rguido src gnome org>
+ * Michele Tameni <michele amdplanet it>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include "feeds-publisher.h"
+#include "utils.h"
+#include "feed-marshal.h"
+
+/* Minutes to wait before a failed subscriber verification is tried again */
+#define VERIFICATION_INTERVAL_MINUTES 30
+/* Lease, in seconds, applied when a request carries no hub.lease_seconds */
+#define DEFAULT_LEASE_INTERVAL (60 * 60)
+/* Listening port used until feeds_publisher_set_port() is called */
+#define DEFAULT_SERVER_PORT 80
+/* Seconds between two scans of the registered subscribers */
+#define DEFAULT_REFRESH_CHECK_INTERVAL 60
+
+/* Shortcut to the private data attached to a FeedsPublisher instance */
+#define FEEDS_PUBLISHER_GET_PRIVATE(obj) (G_TYPE_INSTANCE_GET_PRIVATE ((obj), FEEDS_PUBLISHER_TYPE, FeedsPublisherPrivate))
+
+/**
+ * SECTION: feeds-publisher
+ * @short_description: PubSubHubbub publisher and hub
+ *
+ * #FeedsPublisher implements a server able to receive subscriptions by
+ * PubSubHubbub clients and deliver them new contents.
+ * For more information look at http://code.google.com/p/pubsubhubbub/
+ */
+
+static void subscribe_verify_cb (SoupSession *session, SoupMessage *msg, gpointer user_data);
+
+/*
+	Private state of a FeedsPublisher
+*/
+struct _FeedsPublisherPrivate {
+	/* TRUE between feeds_publisher_switch (TRUE) and feeds_publisher_switch (FALSE) */
+	gboolean running;
+
+	/* port and server receiving (un)subscription requests */
+	int port;
+	SoupServer *server;
+	/* cached result of detect_internet_address() */
+	GInetAddress *local_addr;
+
+	/* session used to send verification requests to subscribers' callbacks */
+	SoupSession *soupsession;
+
+	/* timestamp taken at the start of each refresh scan */
+	time_t current_time;
+	/* feed_channel_get_source() string -> ValidTopic */
+	GHashTable *topics;
+	/* id of the periodic source installed by g_timeout_add_seconds() */
+	guint refresh_handler;
+};
+
+/*
+	A channel accepted as "topic", with the list of RemoteSubscriber records
+	registered for it
+*/
+typedef struct {
+	FeedChannel *channel;
+	GList *subscribers;
+} ValidTopic;
+
+/*
+	Intent currently being verified for a RemoteSubscriber; it selects the
+	hub.mode value sent in the verification request
+*/
+typedef enum {
+	REMOTE_SUBSCRIBING,
+	REMOTE_UNSUBSCRIBING,
+} SUBSCRIBER_STATUS;
+
+/*
+	A remote client which asked to be (un)subscribed to a topic
+*/
+typedef struct {
+	/* publisher which received the request */
+	FeedsPublisher *parent;
+	SUBSCRIBER_STATUS status;
+	/* copies of the hub.topic and hub.callback values of the request */
+	gchar *topic;
+	gchar *callback;
+	/* random string the callback has to echo to pass the verification */
+	gchar *challenge;
+	/* lease duration, sent back as hub.lease_seconds */
+	gint64 lease_interval;
+	/* when the subscription request has been received */
+	time_t first_contact_time;
+	/* when the record has been (re)confirmed in its topic; 0 until then */
+	time_t registration_time;
+	/* paused request waiting for a synchronous verification, or NULL */
+	SoupMessage *registration_msg;
+} RemoteSubscriber;
+
+/*
+	Indexes of the signals registered in feeds_publisher_class_init()
+*/
+enum {
+	SUBSCRIPTION_ADDED,
+	SUBSCRIPTION_DELETED,
+	LAST_SIGNAL
+};
+
+/* ids returned by g_signal_new(), indexed by the enum above */
+static guint signals [LAST_SIGNAL] = {0};
+
+G_DEFINE_TYPE (FeedsPublisher, feeds_publisher, G_TYPE_OBJECT);
+
+/*
+	Frees a RemoteSubscriber together with the strings it owns, dropping the
+	reference to the pending registration message when there is one
+*/
+static void
+destroy_remote_subscriber (RemoteSubscriber *client)
+{
+	g_free (client->topic);
+	g_free (client->callback);
+	g_free (client->challenge);
+
+	if (client->registration_msg != NULL)
+		g_object_unref (client->registration_msg);
+
+	g_free (client);
+}
+
+/*
+	Value destroy notifier of the topics table: drops the reference to the
+	channel and frees every subscriber record attached to the topic
+*/
+static void
+destroy_topic (gpointer user_data)
+{
+	ValidTopic *entry = user_data;
+	GList *cursor;
+
+	g_object_unref (entry->channel);
+
+	/* an empty list is NULL: both the loop and g_list_free() cope with it */
+	for (cursor = entry->subscribers; cursor != NULL; cursor = cursor->next)
+		destroy_remote_subscriber (cursor->data);
+	g_list_free (entry->subscribers);
+
+	g_free (entry);
+}
+
+/*
+	Empties the topics table. Each removed ValidTopic is released by the
+	value destroy notifier installed in feeds_publisher_init()
+*/
+static void
+remove_current_topics (FeedsPublisher *pub)
+{
+	g_hash_table_remove_all (pub->priv->topics);
+}
+
+/*
+	GObject finalizer: stops the server, releases topics and cached address,
+	then chains up to the parent class
+*/
+static void
+feeds_publisher_finalize (GObject *obj)
+{
+	FeedsPublisher *pub;
+
+	pub = FEEDS_PUBLISHER (obj);
+	feeds_publisher_switch (pub, FALSE);
+
+	/* the value destroy notifier runs for every topic still in the table */
+	g_hash_table_destroy (pub->priv->topics);
+	pub->priv->topics = NULL;
+
+	FREE_OBJECT (pub->priv->local_addr);
+
+	/*
+		NOTE(review): priv->soupsession is not released here. Verification
+		requests possibly still queued on it carry RemoteSubscriber
+		pointers referring to this object; confirm how they have to be
+		cancelled before unreffing the session
+	*/
+
+	G_OBJECT_CLASS (feeds_publisher_parent_class)->finalize (obj);
+}
+
+/*
+	Class initializer: reserves the private data, installs the finalizer and
+	registers the signals of the publisher
+*/
+static void
+feeds_publisher_class_init (FeedsPublisherClass *klass)
+{
+	GObjectClass *gobject_class;
+
+	g_type_class_add_private (klass, sizeof (FeedsPublisherPrivate));
+
+	gobject_class = G_OBJECT_CLASS (klass);
+	gobject_class->finalize = feeds_publisher_finalize;
+
+	/**
+	 * FeedsPublisher::new-subscription:
+	 * @pub: the #FeedsPublisher emitting the signal
+	 * @topic: #FeedChannel for which subscription has been added
+	 * @callback: callback required for new subscriber
+	 *
+	 * Emitted when a new remote client subscribes to this publisher
+	 */
+	signals [SUBSCRIPTION_ADDED] = g_signal_new ("new-subscription", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0,
+	                                             NULL, NULL, feed_marshal_VOID__OBJECT_STRING,
+	                                             G_TYPE_NONE, 2, FEED_CHANNEL_TYPE, G_TYPE_STRING);
+
+	/**
+	 * FeedsPublisher::delete-subscription:
+	 * @pub: the #FeedsPublisher emitting the signal
+	 * @topic: #FeedChannel for which subscription has been removed
+	 * @callback: callback revoked by the subscriber
+	 *
+	 * Emitted when a remote client unsubscribes from this publisher
+	 */
+	signals [SUBSCRIPTION_DELETED] = g_signal_new ("delete-subscription", G_TYPE_FROM_CLASS (klass), G_SIGNAL_RUN_LAST, 0,
+	                                               NULL, NULL, feed_marshal_VOID__OBJECT_STRING,
+	                                               G_TYPE_NONE, 2, FEED_CHANNEL_TYPE, G_TYPE_STRING);
+}
+
+/*
+	Instance initializer: clears the private data, then sets the default
+	port and creates the (empty) topics table
+*/
+static void
+feeds_publisher_init (FeedsPublisher *node)
+{
+	FeedsPublisherPrivate *priv;
+
+	priv = FEEDS_PUBLISHER_GET_PRIVATE (node);
+	memset (priv, 0, sizeof (FeedsPublisherPrivate));
+
+	/* keys are borrowed strings, values are released by destroy_topic() */
+	priv->topics = g_hash_table_new_full (g_str_hash, g_str_equal, NULL, destroy_topic);
+	priv->port = DEFAULT_SERVER_PORT;
+
+	node->priv = priv;
+}
+
+/**
+ * feeds_publisher_new:
+ *
+ * Creates a #FeedsPublisher. The new object is not listening: use
+ * feeds_publisher_switch() to start it
+ *
+ * Return value: a new #FeedsPublisher
+ */
+FeedsPublisher*
+feeds_publisher_new ()
+{
+	FeedsPublisher *pub;
+
+	pub = g_object_new (FEEDS_PUBLISHER_TYPE, NULL);
+	return pub;
+}
+
+/*
+	Called when the verification of a subscription succeeds: links @client to
+	its topic (only once) and records the moment of the confirmation.
+	The record is destroyed when its topic is not among the valid ones, since
+	nothing else would own it
+*/
+static void
+add_client_to_topic (FeedsPublisher *pub, RemoteSubscriber *client)
+{
+	gboolean is_new;
+	ValidTopic *topic;
+
+	topic = NULL;
+	if (client->topic != NULL)
+		topic = g_hash_table_lookup (pub->priv->topics, client->topic);
+
+	if (topic == NULL) {
+		destroy_remote_subscriber (client);
+		return;
+	}
+
+	/*
+		Periodic lease refreshes pass again from here with a record which is
+		already in the list: it must not be linked a second time
+	*/
+	is_new = (g_list_find (topic->subscribers, client) == NULL);
+	if (is_new == TRUE)
+		topic->subscribers = g_list_prepend (topic->subscribers, client);
+
+	client->registration_time = time (NULL);
+
+	if (client->registration_msg != NULL) {
+		g_object_unref (client->registration_msg);
+		client->registration_msg = NULL;
+	}
+
+	if (is_new == TRUE)
+		g_signal_emit (pub, signals [SUBSCRIPTION_ADDED], 0, topic->channel, client->callback);
+}
+
+/*
+	Called when the verification of an unsubscription succeeds: unlinks
+	@client from its topic, emits "delete-subscription" and destroys the
+	record. Nothing happens if @client is not registered for its topic
+*/
+static void
+remove_client_to_topic (FeedsPublisher *pub, RemoteSubscriber *client)
+{
+	ValidTopic *topic;
+
+	if (client->topic == NULL)
+		return;
+
+	topic = g_hash_table_lookup (pub->priv->topics, client->topic);
+	if (topic == NULL || g_list_find (topic->subscribers, client) == NULL)
+		return;
+
+	/*
+		Every link pointing to the record is dropped before freeing it, and
+		the list is no longer walked after it has been modified
+	*/
+	topic->subscribers = g_list_remove_all (topic->subscribers, client);
+
+	/* the signal needs client->callback, so it is emitted before the destruction */
+	g_signal_emit (pub, signals [SUBSCRIPTION_DELETED], 0, topic->channel, client->callback);
+	destroy_remote_subscriber (client);
+}
+
+/*
+	Builds a random alphanumeric string of 49 characters, used as challenge in
+	verification requests. The result has to be freed with g_free().
+	GLib's generator is used instead of srand()/rand(): re-seeding with the
+	current time at every call produced the same string for all the calls
+	within the same second, and altered the rand() state of the whole process
+*/
+static gchar*
+random_string ()
+{
+	static const gchar chars [] = "qwertyuiopasdfghjklzxcvbnm1234567890QWERTYUIOPASDFGHJKLZXCVBNM";
+	guint i;
+	gchar str [50];
+
+	/* sizeof () counts also the terminator, which is not a valid choice */
+	for (i = 0; i < sizeof (str) - 1; i++)
+		str [i] = chars [g_random_int_range (0, sizeof (chars) - 1)];
+	str [i] = '\0';
+
+	return g_strdup (str);
+}
+
+/*
+	Builds the GET request used to verify the intent of @client, assigning it
+	a fresh challenge.
+	Returns NULL if the client has no callback, if its status is unknown or if
+	soup_message_new() is not able to handle the resulting URL
+*/
+static SoupMessage*
+verification_message_for_client (RemoteSubscriber *client)
+{
+	gchar separator;
+	gchar *mode;
+	gchar *url;
+	SoupMessage *ret;
+
+	FREE_STRING (client->challenge);
+	client->challenge = random_string ();
+
+	switch (client->status) {
+		case REMOTE_SUBSCRIBING:
+			mode = "subscribe";
+			break;
+
+		case REMOTE_UNSUBSCRIBING:
+			mode = "unsubscribe";
+			break;
+
+		default:
+			return NULL;
+			break;
+	}
+
+	if (client->callback == NULL)
+		return NULL;
+
+	/*
+		The callback may already carry its own query string, which has to be
+		preserved: in that case parameters are appended with '&'
+		Cfr. PubSubHubbub Core 0.3 Section 6.2.1
+	*/
+	separator = (strchr (client->callback, '?') != NULL) ? '&' : '?';
+
+	/*
+		NOTE(review): topic is inserted as it is stored in the record; if it
+		is kept un-escaped it should be URL-encoded here
+	*/
+	url = g_strdup_printf ("%s%chub.mode=%s&hub.topic=%s&hub.challenge=%s&hub.lease_seconds=%" G_GINT64_FORMAT,
+	                       client->callback, separator, mode, client->topic, client->challenge, client->lease_interval);
+
+	ret = soup_message_new ("GET", url);
+	g_free (url);
+
+	return ret;
+}
+
+/*
+	Timeout callback scheduled by verification_failed(): sends a new
+	verification request for the subscriber passed as @user_data.
+	Returns FALSE, so the source is executed only once
+*/
+static gboolean
+retry_subscriber_verification (gpointer user_data)
+{
+	RemoteSubscriber *subscriber = user_data;
+	SoupMessage *request;
+
+	request = verification_message_for_client (subscriber);
+	soup_session_queue_message (subscriber->parent->priv->soupsession, request,
+	                            subscribe_verify_cb, subscriber);
+
+	return FALSE;
+}
+
+/*
+	Handles a failed verification for @client.
+	A record already linked to its topic is never freed from here while it is
+	still in the list: doing so left a dangling pointer among the subscribers
+	of the topic
+*/
+static void
+verification_failed (RemoteSubscriber *client)
+{
+	gboolean registered;
+
+	/* registration_time is set only by add_client_to_topic(), and starts at 0 */
+	registered = (client->registration_time != 0);
+
+	if (client->registration_msg != NULL) {
+		soup_message_set_status (client->registration_msg, 404);
+		soup_server_unpause_message (client->parent->priv->server, client->registration_msg);
+
+		if (registered == TRUE) {
+			/* refused unsubscription: the subscription stays as it was */
+			g_object_unref (client->registration_msg);
+			client->registration_msg = NULL;
+			client->status = REMOTE_SUBSCRIBING;
+		}
+		else {
+			destroy_remote_subscriber (client);
+		}
+	}
+	else {
+		/*
+			If the verification fails for 6 hours, the procedure is dropped.
+			Otherwise it is re-scheduled each VERIFICATION_INTERVAL_MINUTES
+			minutes
+			Cfr. PubSubHubbub Core 0.3 Section 6.2.1
+		*/
+		if (time (NULL) - client->first_contact_time > (60 * 60 * 6)) {
+			if (client->status != REMOTE_SUBSCRIBING) {
+				/* abandoned unsubscription: the record is still subscribed */
+				client->status = REMOTE_SUBSCRIBING;
+			}
+			else if (registered == TRUE) {
+				/* subscription not renewed: unlink the record before freeing it */
+				remove_client_to_topic (client->parent, client);
+			}
+			else {
+				destroy_remote_subscriber (client);
+			}
+		}
+		else {
+			g_timeout_add_seconds (60 * VERIFICATION_INTERVAL_MINUTES, retry_subscriber_verification, client);
+		}
+	}
+}
+
+/*
+	Completion callback of verification requests. The verification succeeds
+	when the callback answers with a 2xx status and a body equal to the
+	challenge; then the pending synchronous request (if any) is answered with
+	204 and the (un)subscription is applied
+*/
+static void
+subscribe_verify_cb (SoupSession *session, SoupMessage *msg, gpointer user_data)
+{
+	guint code;
+	const char *answer;
+	RemoteSubscriber *subscriber;
+
+	subscriber = user_data;
+	g_object_get (msg, "status-code", &code, NULL);
+
+	if (code < 200 || code > 299) {
+		verification_failed (subscriber);
+		return;
+	}
+
+	answer = msg->response_body->data;
+	if (answer == NULL || strcmp (answer, subscriber->challenge) != 0) {
+		verification_failed (subscriber);
+		return;
+	}
+
+	if (subscriber->registration_msg != NULL) {
+		soup_message_set_status (subscriber->registration_msg, 204);
+		soup_server_unpause_message (subscriber->parent->priv->server, subscriber->registration_msg);
+	}
+
+	if (subscriber->status == REMOTE_SUBSCRIBING)
+		add_client_to_topic (subscriber->parent, subscriber);
+	else
+		remove_client_to_topic (subscriber->parent, subscriber);
+}
+
+/*
+	Looks for the subscriber registered for @topic with the given @callback.
+	Returns NULL if there is none, or if one of the two strings is NULL (as
+	it happens when the request misses hub.topic or hub.callback)
+*/
+static RemoteSubscriber*
+search_subscriber_by_topic_and_callback (FeedsPublisher *pub, gchar *topic, gchar *callback)
+{
+	GList *iter;
+	ValidTopic *topic_test;
+	RemoteSubscriber *client_test;
+
+	/* both g_str_hash() and strcmp() would dereference a NULL string */
+	if (topic == NULL || callback == NULL)
+		return NULL;
+
+	topic_test = g_hash_table_lookup (pub->priv->topics, topic);
+	if (topic_test == NULL)
+		return NULL;
+
+	for (iter = topic_test->subscribers; iter; iter = iter->next) {
+		client_test = iter->data;
+		if (client_test->callback != NULL && strcmp (callback, client_test->callback) == 0)
+			return client_test;
+	}
+
+	return NULL;
+}
+
+/*
+	Handler of the requests received by the local server. The form-encoded
+	body is expected to carry hub.mode, hub.topic and hub.callback, and
+	optionally hub.lease_seconds and hub.verify.
+	A valid request starts the verification of the intent: with
+	hub.verify=sync the request is paused and answered when the verification
+	ends, otherwise it is immediately answered with 202.
+	Everything else is refused with 404
+*/
+static void
+handle_incoming_requests_cb (SoupServer *server, SoupMessage *msg, const char *path,
+                             GHashTable *query, SoupClientContext *context, gpointer user_data)
+{
+	gboolean fresh;
+	gchar *mode;
+	gchar *topic;
+	gchar *lease;
+	gchar *callback;
+	gchar *verify;
+	GHashTable *params;
+	SoupMessage *verify_msg;
+	FeedsPublisher *pub;
+	RemoteSubscriber *client;
+
+	pub = user_data;
+
+	if (msg->request_body == NULL || msg->request_body->data == NULL) {
+		soup_message_set_status (msg, 404);
+		return;
+	}
+
+	/*
+		soup_form_decode() splits the name/value pairs and removes the URL
+		encoding, so that values can be compared with topics and used as
+		URLs. Strings are owned by the table: they are copied when stored
+	*/
+	params = soup_form_decode (msg->request_body->data);
+	if (params == NULL) {
+		soup_message_set_status (msg, 404);
+		return;
+	}
+
+	mode = g_hash_table_lookup (params, "hub.mode");
+	topic = g_hash_table_lookup (params, "hub.topic");
+	lease = g_hash_table_lookup (params, "hub.lease_seconds");
+	callback = g_hash_table_lookup (params, "hub.callback");
+	verify = g_hash_table_lookup (params, "hub.verify");
+
+	client = NULL;
+	fresh = FALSE;
+
+	if (mode != NULL && topic != NULL && callback != NULL) {
+		if (strcmp (mode, "subscribe") == 0) {
+			/*
+				TODO What to do if an invalid topic is required?
+			*/
+			client = g_new0 (RemoteSubscriber, 1);
+			client->parent = pub;
+			client->first_contact_time = time (NULL);
+			client->topic = g_strdup (topic);
+			client->callback = g_strdup (callback);
+
+			if (lease != NULL)
+				client->lease_interval = strtoll (lease, NULL, 10);
+			else
+				client->lease_interval = DEFAULT_LEASE_INTERVAL;
+
+			client->status = REMOTE_SUBSCRIBING;
+			fresh = TRUE;
+		}
+		else if (strcmp (mode, "unsubscribe") == 0) {
+			client = search_subscriber_by_topic_and_callback (pub, topic, callback);
+			if (client != NULL)
+				client->status = REMOTE_UNSUBSCRIBING;
+		}
+	}
+
+	verify_msg = NULL;
+	if (client != NULL)
+		verify_msg = verification_message_for_client (client);
+
+	if (verify_msg == NULL) {
+		/* incomplete request, unknown mode or subscriber, unusable callback */
+		if (client != NULL) {
+			if (fresh == TRUE)
+				destroy_remote_subscriber (client);
+			else
+				client->status = REMOTE_SUBSCRIBING;
+		}
+
+		soup_message_set_status (msg, 404);
+	}
+	else if (verify != NULL && strcmp (verify, "sync") == 0) {
+		/* the reference is released when the verification ends */
+		g_object_ref (msg);
+		client->registration_msg = msg;
+		soup_server_pause_message (server, msg);
+		soup_session_queue_message (pub->priv->soupsession, verify_msg, subscribe_verify_cb, client);
+	}
+	else {
+		soup_session_queue_message (pub->priv->soupsession, verify_msg, subscribe_verify_cb, client);
+		soup_message_set_status (msg, 202);
+	}
+
+	g_hash_table_destroy (params);
+}
+
+/*
+	Stops and releases the local server, if there is one
+*/
+static void
+close_server (FeedsPublisher *pub)
+{
+	SoupServer *server;
+
+	server = pub->priv->server;
+	if (server == NULL)
+		return;
+
+	soup_server_remove_handler (server, NULL);
+	soup_server_quit (server);
+	g_object_unref (server);
+	pub->priv->server = NULL;
+}
+
+/*
+	Returns the address provided by detect_internet_address(), which is
+	asked only until it gives a result: that is kept in the private data.
+	The returned object is owned by @pub
+*/
+static GInetAddress*
+my_detect_internet_address (FeedsPublisher *pub)
+{
+	FeedsPublisherPrivate *priv;
+
+	priv = pub->priv;
+	if (priv->local_addr != NULL)
+		return priv->local_addr;
+
+	priv->local_addr = detect_internet_address ();
+	return priv->local_addr;
+}
+
+/*
+	Opens the local server on the detected address and on the configured
+	port, and creates the session used for outgoing verification requests.
+	On failure a warning is printed and priv->server is left to NULL
+*/
+static void
+create_and_run_server (FeedsPublisher *pub)
+{
+	int converted;
+	gchar *ip;
+	struct sockaddr_in low_addr;
+	SoupAddress *soup_addr;
+	GInetAddress *my_addr;
+
+	my_addr = my_detect_internet_address (pub);
+	if (my_addr == NULL)
+		return;
+
+	if (pub->priv->soupsession == NULL)
+		pub->priv->soupsession = soup_session_async_new ();
+
+	/* no field of the structure has to be left uninitialized */
+	memset (&low_addr, 0, sizeof (low_addr));
+	low_addr.sin_family = AF_INET;
+	low_addr.sin_port = htons (pub->priv->port);
+	ip = g_inet_address_to_string (my_addr);
+	converted = inet_pton (AF_INET, ip, &low_addr.sin_addr);
+	g_free (ip);
+
+	if (converted <= 0) {
+		g_warning ("Unable to use detected address as IPv4 interface for the server");
+		return;
+	}
+
+	soup_addr = soup_address_new_from_sockaddr ((struct sockaddr*) &low_addr, sizeof (low_addr));
+	pub->priv->server = soup_server_new ("port", pub->priv->port, "interface", soup_addr, NULL);
+	g_object_unref (soup_addr);
+
+	if (pub->priv->server == NULL) {
+		g_warning ("Unable to open server on port %d", pub->priv->port);
+		return;
+	}
+
+	soup_server_add_handler (pub->priv->server, NULL, handle_incoming_requests_cb, pub, NULL);
+	soup_server_run_async (pub->priv->server);
+}
+
+/**
+ * feeds_publisher_set_port:
+ * @pub: a #FeedsPublisher
+ * @port: new listening port for the server
+ *
+ * To customize the port opened by the local server to catch incoming
+ * subscriptions. By default this is 80. Changing the port while the
+ * publisher is running implies a restart of the local server
+ */
+void
+feeds_publisher_set_port (FeedsPublisher *pub, int port)
+{
+	if (pub->priv->port == port)
+		return;
+
+	pub->priv->port = port;
+
+	if (pub->priv->running != TRUE)
+		return;
+
+	/* the server is re-created on the new port */
+	feeds_publisher_switch (pub, FALSE);
+	feeds_publisher_switch (pub, TRUE);
+}
+
+/**
+ * feeds_publisher_set_topics:
+ * @pub: a #FeedsPublisher
+ * @topics: a list of #FeedChannels
+ *
+ * To define the list of valid "topics" for which the #FeedsPublisher will
+ * deliver contents, replacing the previous one. Sources of those channels,
+ * as retrieved by feed_channel_get_source(), would be found as hub.topic
+ * parameter in registration requests from remote subscribers
+ */
+void
+feeds_publisher_set_topics (FeedsPublisher *pub, GList *topics)
+{
+	GList *cursor;
+
+	remove_current_topics (pub);
+
+	for (cursor = topics; cursor != NULL; cursor = cursor->next) {
+		ValidTopic *entry;
+
+		entry = g_new0 (ValidTopic, 1);
+		entry->channel = g_object_ref (cursor->data);
+
+		/* the key is not copied: it is the string returned by the channel */
+		g_hash_table_insert (pub->priv->topics, (gpointer) feed_channel_get_source (cursor->data), entry);
+	}
+}
+
+/*
+	g_hash_table_foreach() callback: asks again for verification to the
+	subscribers of a topic whose lease ends before the next scan.
+	The previous test was satisfied for the whole duration of the lease (and
+	no more once expired), so every subscriber was verified at every scan
+*/
+static void
+refresh_subscribers_in_topic (gpointer key, gpointer value, gpointer user_data)
+{
+	GList *iter;
+	SoupMessage *verify_msg;
+	FeedsPublisher *pub;
+	ValidTopic *topic;
+	RemoteSubscriber *client;
+
+	pub = user_data;
+	topic = value;
+
+	for (iter = topic->subscribers; iter; iter = iter->next) {
+		client = iter->data;
+
+		if (client->registration_time + client->lease_interval <= pub->priv->current_time + DEFAULT_REFRESH_CHECK_INTERVAL) {
+			verify_msg = verification_message_for_client (client);
+			if (verify_msg != NULL)
+				soup_session_queue_message (pub->priv->soupsession, verify_msg, subscribe_verify_cb, client);
+		}
+	}
+}
+
+/*
+	Periodic callback: takes the current time and scans all the topics with
+	refresh_subscribers_in_topic(). Returns TRUE to be called again
+*/
+static gboolean
+refresh_subscribers (gpointer user_data)
+{
+	FeedsPublisher *publisher = user_data;
+
+	publisher->priv->current_time = time (NULL);
+	g_hash_table_foreach (publisher->priv->topics, refresh_subscribers_in_topic, publisher);
+
+	return TRUE;
+}
+
+/*
+	Schedules refresh_subscribers() every DEFAULT_REFRESH_CHECK_INTERVAL
+	seconds, keeping the id of the source in the private data
+*/
+static void
+install_refresh_handler (FeedsPublisher *pub)
+{
+	guint source_id;
+
+	source_id = g_timeout_add_seconds (DEFAULT_REFRESH_CHECK_INTERVAL, refresh_subscribers, pub);
+	pub->priv->refresh_handler = source_id;
+}
+
+/*
+	Removes the periodic refresh, if installed. The id is reset so that it is
+	never passed twice to g_source_remove()
+*/
+static void
+remove_refresh_handler (FeedsPublisher *pub)
+{
+	if (pub->priv->refresh_handler != 0) {
+		g_source_remove (pub->priv->refresh_handler);
+		pub->priv->refresh_handler = 0;
+	}
+}
+
+/**
+ * feeds_publisher_switch:
+ * @pub: a #FeedsPublisher
+ * @run: TRUE to run the publisher, FALSE to pause it
+ *
+ * Permits to pause or resume @pub listening for events. Nothing happens if
+ * @pub is already in the required state
+ */
+void
+feeds_publisher_switch (FeedsPublisher *pub, gboolean run)
+{
+	if (pub->priv->running == run)
+		return;
+
+	pub->priv->running = run;
+
+	if (run != TRUE) {
+		remove_refresh_handler (pub);
+		close_server (pub);
+		return;
+	}
+
+	create_and_run_server (pub);
+	install_refresh_handler (pub);
+}
+
+/**
+ * feeds_publisher_publish:
+ * @pub: a #FeedsPublisher
+ * @item: item to publish
+ *
+ * Deliver the specified @item to all subscribers registered for the parent's
+ * channel (defined as "topic").
+ * Not yet implemented: at the moment this function does nothing
+ */
+void
+feeds_publisher_publish (FeedsPublisher *pub, FeedItem *item)
+{
+	/*
+		TODO	Content delivery to subscribers is still to be written
+	*/
+}
diff --git a/src/feeds-publisher.h b/src/feeds-publisher.h
new file mode 100644
index 0000000..a21134e
--- /dev/null
+++ b/src/feeds-publisher.h
@@ -0,0 +1,57 @@
+/*
+ * Copyright (C) 2010, Roberto Guido <rguido src gnome org>
+ * Michele Tameni <michele amdplanet it>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef __FEEDS_PUBLISHER_H__
+#define __FEEDS_PUBLISHER_H__
+
+#include "libgrss.h"
+
+#define FEEDS_PUBLISHER_TYPE (feeds_publisher_get_type())
+#define FEEDS_PUBLISHER(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), FEEDS_PUBLISHER_TYPE, FeedsPublisher))
+#define FEEDS_PUBLISHER_CLASS(c) (G_TYPE_CHECK_CLASS_CAST ((c), FEEDS_PUBLISHER_TYPE, FeedsPublisherClass))
+#define IS_FEEDS_PUBLISHER(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), FEEDS_PUBLISHER_TYPE))
+#define IS_FEEDS_PUBLISHER_CLASS(c) (G_TYPE_CHECK_CLASS_TYPE ((c), FEEDS_PUBLISHER_TYPE))
+#define FEEDS_PUBLISHER_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), FEEDS_PUBLISHER_TYPE, FeedsPublisherClass))
+
+typedef struct _FeedsPublisher FeedsPublisher;
+typedef struct _FeedsPublisherPrivate FeedsPublisherPrivate;
+
+/* Instance structure: fields are private, use the feeds_publisher_*() functions */
+struct _FeedsPublisher {
+	GObject parent;
+	FeedsPublisherPrivate *priv;
+};
+
+/*
+	Class structure, with one slot for each signal of the publisher.
+	NOTE(review): signals are registered in feeds-publisher.c with class
+	offset 0, so these slots do not seem to be invoked as default handlers;
+	to be confirmed
+*/
+typedef struct {
+	GObjectClass parent;
+
+	void (*new_subscription) (FeedsPublisher *pub, FeedChannel *topic, gchar *callback);
+	void (*delete_subscription) (FeedsPublisher *pub, FeedChannel *topic, gchar *callback);
+} FeedsPublisherClass;
+
+GType feeds_publisher_get_type () G_GNUC_CONST;
+
+FeedsPublisher* feeds_publisher_new ();
+
+void feeds_publisher_set_port (FeedsPublisher *pub, int port);
+void feeds_publisher_set_topics (FeedsPublisher *pub, GList *topics);
+void feeds_publisher_switch (FeedsPublisher *pub, gboolean run);
+void feeds_publisher_publish (FeedsPublisher *pub, FeedItem *item);
+
+#endif /* __FEEDS_PUBLISHER_H__ */
diff --git a/src/feeds-subscriber.c b/src/feeds-subscriber.c
index 419c198..a9547fe 100644
--- a/src/feeds-subscriber.c
+++ b/src/feeds-subscriber.c
@@ -29,10 +29,10 @@
/**
* SECTION: feeds-subscriber
- * @short_description: PubSubHub subscriber
+ * @short_description: PubSubHubbub subscriber
*
* #FeedsSubscriber is an alternative for #FeedsPool, able to receive
- * real-time notifications by feeds managed by a PubSubHub hub.
+ * real-time notifications by feeds managed by a PubSubHubbub hub.
* When the subscriber is executed (with feeds_subscriber_switch()) it opens
* a server on a local port (configurable with feeds_subscriber_set_port()),
* engage a subscription for each #FeedChannel passed with
@@ -41,6 +41,14 @@
* For more information look at http://code.google.com/p/pubsubhubbub/
*/
+/*
+	TODO	There was an error in refreshing subscriptions, since it is the
+		hub which must send refresh requests, not the subscriber.
+		That has been removed, but it would be better to provide a
+		control mechanism able to refresh subscriptions from the
+		client side in case of problems on the server side
+*/
+
typedef enum {
SUBSCRIBER_IS_IDLE,
SUBSCRIBER_TRYING_LOCAL_IP,
@@ -85,8 +93,6 @@ typedef struct {
int identifier;
gchar *path;
- gint64 lease_time;
-
FeedsSubscriber *sub;
} FeedChannelWrap;
@@ -201,7 +207,7 @@ create_listened (FeedsSubscriber *sub, GList *feeds)
feed = (FeedChannel*) iter->data;
if (feed_channel_get_pubsubhub (feed, NULL, NULL) == FALSE) {
- g_warning ("Feed at %s has not PubSubHub capability", feed_channel_get_source (feed));
+ g_warning ("Feed at %s has not PubSubHubbub capability", feed_channel_get_source (feed));
return FALSE;
}
}
@@ -215,7 +221,6 @@ create_listened (FeedsSubscriber *sub, GList *feeds)
g_object_ref (feed);
wrap->status = FEED_SUBSCRIPTION_IDLE;
wrap->path = NULL;
- wrap->lease_time = 60;
wrap->channel = feed;
wrap->sub = sub;
list = g_list_prepend (list, wrap);
@@ -298,9 +303,6 @@ handle_incoming_notification_cb (SoupServer *server, SoupMessage *msg, const cha
mode = (gchar*) g_hash_table_lookup (query, "hub.mode");
if (feed->status == FEED_SUBSCRIPTION_SUBSCRIBING && strcmp (mode, "subscribe") == 0) {
- feed->lease_time = strtoll ((gchar*) g_hash_table_lookup (query, "hub.lease_seconds"), NULL, 10);
- feed->status = FEED_SUBSCRIPTION_SUBSCRIBED;
-
challenge = g_strdup ((gchar*) g_hash_table_lookup (query, "hub.challenge"));
soup_message_set_response (msg, "application/x-www-form-urlencoded", SOUP_MEMORY_TAKE, challenge, strlen (challenge));
@@ -385,39 +387,11 @@ close_server (FeedsSubscriber *sub)
}
}
-static gboolean
-refresh_subscribtions (gpointer data)
-{
- FeedsSubscriber *sub;
-
- sub = (FeedsSubscriber*) data;
- subscribe_feeds (sub);
- return FALSE;
-}
-
-
static void
feeds_subscribed_cb (FeedsSubscriber *sub)
{
- gint64 min_lease;
- GList *iter;
- FeedChannelWrap *feed;
-
- if (sub->priv->has_errors_in_subscription == TRUE) {
+ if (sub->priv->has_errors_in_subscription == TRUE)
try_another_subscription_policy (sub);
- }
- else {
- min_lease = G_MAXINT64;
-
- for (iter = sub->priv->feeds_list; iter; iter = g_list_next (iter)) {
- feed = (FeedChannelWrap*) iter->data;
-
- if (min_lease > feed->lease_time)
- min_lease = feed->lease_time;
- }
-
- sub->priv->refresh_scheduler = g_timeout_add_seconds (min_lease, refresh_subscribtions, sub);
- }
}
static void
@@ -483,67 +457,12 @@ subscribe_feeds (FeedsSubscriber *sub)
}
}
-/*
- Inspired by:
- GNet - Networking library
- Copyright (C) 2000-2002 David Helder
- Copyright (C) 2000-2003 Andrew Lanoix
- Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
-*/
static GInetAddress*
-detect_internet_address (FeedsSubscriber *sub)
+my_detect_internet_address (FeedsSubscriber *sub)
{
- int sockfd;
- gchar ip [100];
- struct sockaddr_in serv_add;
- struct sockaddr_storage myaddr;
- socklen_t len;
-
- if (sub->priv->local_addr != NULL)
- return sub->priv->local_addr;
-
- /*
- TODO This is to be adapted to work also over IPv6
- */
-
- memset (&serv_add, 0, sizeof (serv_add));
- serv_add.sin_family = AF_INET;
- serv_add.sin_port = htons (80);
-
- /*
- This is the IP for slashdot.com
- */
- if ((inet_pton (AF_INET, "216.34.181.45", &serv_add.sin_addr)) <= 0)
- return NULL;
-
- sockfd = socket (AF_INET, SOCK_DGRAM, 0);
- if (!sockfd) {
- g_warning ("Unable to open a socket to detect interface exposed to Internet");
- return NULL;
- }
+ if (sub->priv->local_addr == NULL)
+ sub->priv->local_addr = detect_internet_address ();
- if (connect (sockfd, (struct sockaddr*) &serv_add, sizeof (serv_add)) == -1) {
- g_warning ("Unable to open a connection to detect interface exposed to Internet");
- close (sockfd);
- return NULL;
- }
-
- len = sizeof (myaddr);
- if (getsockname (sockfd, (struct sockaddr*) &myaddr, &len) != 0) {
- close (sockfd);
- g_warning ("Unable to obtain information about interface exposed to Internet");
- return NULL;
- }
-
- close (sockfd);
- memset (ip, 0, sizeof (char) * 100);
-
- if (inet_ntop (AF_INET, &(((struct sockaddr_in*) &myaddr)->sin_addr), ip, 100) == NULL) {
- g_warning ("Unable to obtain IP exposed to Internet");
- return NULL;
- }
-
- sub->priv->local_addr = g_inet_address_new_from_string (ip);
return sub->priv->local_addr;
}
@@ -555,7 +474,7 @@ create_and_run_server (FeedsSubscriber *sub)
SoupAddress *soup_addr;
GInetAddress *my_addr;
- my_addr = detect_internet_address (sub);
+ my_addr = my_detect_internet_address (sub);
if (my_addr == NULL)
return;
@@ -647,7 +566,7 @@ init_run_server (FeedsSubscriber *sub)
| has fixed hub | ---- YES ---> | is hub local | ----- YES ---+
+---------------+ +--------------+ |
| | |
- NO -----------------------------+ |
+ NO <----------------------------+ |
| |
+-------------------+ +-----------------+ |
| host seems public | -- YES -> | subscribe works | ---- YES ---+
@@ -675,14 +594,14 @@ init_run_server (FeedsSubscriber *sub)
if (sub->priv->hub != NULL) {
addr = g_inet_address_new_from_string (sub->priv->hub);
if (g_inet_address_get_is_link_local (addr) == TRUE) {
- sub->priv->external_addr = detect_internet_address (sub);
+ sub->priv->external_addr = my_detect_internet_address (sub);
done = TRUE;
create_and_run_server (sub);
}
}
if (done == FALSE) {
- addr = detect_internet_address (sub);
+ addr = my_detect_internet_address (sub);
if (address_seems_public (addr) == TRUE) {
sub->priv->external_addr = addr;
done = TRUE;
diff --git a/src/libgrss.h b/src/libgrss.h
index 51f45aa..1d5addc 100644
--- a/src/libgrss.h
+++ b/src/libgrss.h
@@ -38,6 +38,7 @@
#include "feeds-store.h"
#include "feeds-pool.h"
#include "feeds-subscriber.h"
+#include "feeds-publisher.h"
#include "feeds-group.h"
#endif /* __LIBGRSS_H__ */
diff --git a/src/ns-handler.c b/src/ns-handler.c
index ae20f6d..79cdfb4 100644
--- a/src/ns-handler.c
+++ b/src/ns-handler.c
@@ -464,7 +464,7 @@ static gboolean
ns_atom10_channel (FeedChannel *feed, xmlNodePtr cur)
{
/*
- Used to manage PubSubHub information in RSS feeds
+ Used to manage PubSubHubbub information in RSS feeds
*/
gchar *href;
diff --git a/src/utils.c b/src/utils.c
index 7d87c87..35a5fa8 100644
--- a/src/utils.c
+++ b/src/utils.c
@@ -547,6 +547,66 @@ date_parse_ISO8601 (const gchar *date)
return 0;
}
+/*
+ Inspired by:
+ GNet - Networking library
+ Copyright (C) 2000-2002 David Helder
+ Copyright (C) 2000-2003 Andrew Lanoix
+ Copyright (C) 2007 Tim-Philipp Müller <tim centricular net>
+*/
+/*
+	Detects the local address used to reach the Internet, asking it to a UDP
+	socket connected to a well known remote address.
+	Returns a new GInetAddress, to be released by the caller, or NULL on
+	failure
+*/
+GInetAddress*
+detect_internet_address ()
+{
+	int sockfd;
+	gchar ip [100];
+	struct sockaddr_in serv_add;
+	struct sockaddr_storage myaddr;
+	socklen_t len;
+
+	/*
+		TODO	This is to be adapted to work also over IPv6
+	*/
+
+	memset (&serv_add, 0, sizeof (serv_add));
+	serv_add.sin_family = AF_INET;
+	serv_add.sin_port = htons (80);
+
+	/*
+		This is the IP for slashdot.com
+	*/
+	if ((inet_pton (AF_INET, "216.34.181.45", &serv_add.sin_addr)) <= 0)
+		return NULL;
+
+	/* socket() reports failures with -1, while 0 is a valid descriptor */
+	sockfd = socket (AF_INET, SOCK_DGRAM, 0);
+	if (sockfd < 0) {
+		g_warning ("Unable to open a socket to detect interface exposed to Internet");
+		return NULL;
+	}
+
+	if (connect (sockfd, (struct sockaddr*) &serv_add, sizeof (serv_add)) == -1) {
+		g_warning ("Unable to open a connection to detect interface exposed to Internet");
+		close (sockfd);
+		return NULL;
+	}
+
+	len = sizeof (myaddr);
+	if (getsockname (sockfd, (struct sockaddr*) &myaddr, &len) != 0) {
+		close (sockfd);
+		g_warning ("Unable to obtain information about interface exposed to Internet");
+		return NULL;
+	}
+
+	close (sockfd);
+	memset (ip, 0, sizeof (char) * 100);
+
+	if (inet_ntop (AF_INET, &(((struct sockaddr_in*) &myaddr)->sin_addr), ip, 100) == NULL) {
+		g_warning ("Unable to obtain IP exposed to Internet");
+		return NULL;
+	}
+
+	return g_inet_address_new_from_string (ip);
+}
+
gboolean
address_seems_public (GInetAddress *addr)
{
diff --git a/src/utils.h b/src/utils.h
index a69f86b..3d6ba2d 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -37,6 +37,7 @@
#define PACKAGE "libgrss"
#define FREE_STRING(__str) if (__str) { g_free (__str); __str = NULL; }
+/* Unrefs a GObject and clears the variable; usable as a single statement also in if/else */
+#define FREE_OBJECT(__obj) do { if (__obj) { g_object_unref (__obj); (__obj) = NULL; } } while (0)
gchar* unhtmlize (gchar *string);
gchar* unxmlize (gchar * string);
@@ -48,6 +49,7 @@ xmlDocPtr file_to_xml (const gchar *path);
time_t date_parse_RFC822 (const gchar *date);
time_t date_parse_ISO8601 (const gchar *date);
+GInetAddress* detect_internet_address ();
gboolean address_seems_public (GInetAddress *addr);
#endif /* __UTILS_LIBGRSS_H__ */
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]