[libgrss] Improvements in FeedsPublisher API, now able to export a feed as file and directly deliver contents
- From: Roberto Guido <rguido src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libgrss] Improvements in FeedsPublisher API, now able to export a feed as file and directly deliver contents
- Date: Sat, 26 Jun 2010 18:49:32 +0000 (UTC)
commit 76a444be09421f25670124fd98824fd276a1a58f
Author: Roberto Guido <bob4mail gmail com>
Date: Sat Jun 26 20:49:01 2010 +0200
Improvements in FeedsPublisher API, now able to export a feed as file and
directly deliver contents with built-in webserver. Warning: yet to be tested
NEWS | 1 +
src/feeds-publisher.c | 463 +++++++++++++++++++++++++++++++++++++++++--------
src/feeds-publisher.h | 10 +-
src/utils.c | 11 ++
src/utils.h | 2 +
5 files changed, 410 insertions(+), 77 deletions(-)
---
diff --git a/NEWS b/NEWS
index 49bea78..05f897b 100644
--- a/NEWS
+++ b/NEWS
@@ -2,6 +2,7 @@ libgrss 0.5 (UNRELEASED)
==============================================================================
- Added XOXO files parser
- Added XBEL files parser
+- Added FeedsPublisher to expose feeds contents as files or by local server
- First test implementation of PubSubHub hub
- Added functions feed_channel_fetch_async() and feed_channel_new_from_file()
diff --git a/src/feeds-publisher.c b/src/feeds-publisher.c
index 618f899..ecdcca7 100644
--- a/src/feeds-publisher.c
+++ b/src/feeds-publisher.c
@@ -31,14 +31,17 @@
/**
* SECTION: feeds-publisher
- * @short_description: PubSubHubbub publisher and hub
+ * @short_description: feed writer and PubSubHubbub publisher
*
- * #FeedsPublisher implements a server able to receive subscriptions by
- * PubSubHubbub clients and deliver them new contents.
- * For more information look at http://code.google.com/p/pubsubhubbub/
+ * #FeedsPublisher may be used to expose contents for any given #FeedChannel,
+ * either by writing a file to be served by a local webserver or by serving
+ * it itself, and implements a hub able to receive subscriptions from
+ * PubSubHubbub clients and deliver new contents to them in
+ * real-time.
*/
static void subscribe_verify_cb (SoupSession *session, SoupMessage *msg, gpointer user_data);
+static void verify_delivery_cb (SoupSession *session, SoupMessage *msg, gpointer user_data);
struct _FeedsPublisherPrivate {
gboolean running;
@@ -57,6 +60,8 @@ struct _FeedsPublisherPrivate {
typedef struct {
FeedChannel *channel;
GList *subscribers;
+ GList *items_delivered;
+ guint resend_handler;
} ValidTopic;
typedef enum {
@@ -68,12 +73,14 @@ typedef struct {
FeedsPublisher *parent;
SUBSCRIBER_STATUS status;
gchar *topic;
+ ValidTopic *topic_struct;
gchar *callback;
gchar *challenge;
gint64 lease_interval;
time_t first_contact_time;
time_t registration_time;
SoupMessage *registration_msg;
+ gchar *to_be_resent;
} RemoteSubscriber;
enum {
@@ -112,6 +119,13 @@ destroy_topic (gpointer user_data)
g_list_free (topic->subscribers);
}
+ if (topic->items_delivered != NULL) {
+ for (iter = topic->items_delivered; iter; iter = iter->next)
+ g_object_unref (iter->data);
+
+ g_list_free (topic->items_delivered);
+ }
+
g_free (topic);
}
@@ -133,7 +147,7 @@ feeds_publisher_finalize (GObject *obj)
FeedsPublisher *pub;
pub = FEEDS_PUBLISHER (obj);
- feeds_publisher_switch (pub, FALSE);
+ feeds_publisher_hub_switch (pub, FALSE);
remove_current_topics (pub);
g_hash_table_destroy (pub->priv->topics);
@@ -196,6 +210,338 @@ feeds_publisher_new ()
return g_object_new (FEEDS_PUBLISHER_TYPE, NULL);
}
+/*
+	Appends "<tag>value</tag>" to @text, preceded by @indent. @value is
+	escaped so that characters such as '&' or '<' cannot break the document.
+	Nothing is appended when @value is NULL
+*/
+static void
+atom_append_element (GString *text, const gchar *indent, const gchar *tag, const gchar *value)
+{
+	gchar *escaped;
+
+	if (value == NULL)
+		return;
+
+	escaped = g_markup_escape_text (value, -1);
+	g_string_append_printf (text, "%s<%s>%s</%s>\n", indent, tag, escaped, tag);
+	g_free (escaped);
+}
+
+/*
+	Appends an Atom Person construct ("author", "contributor"): the name
+	has to be wrapped in a <name> child element
+*/
+static void
+atom_append_person (GString *text, const gchar *indent, const gchar *tag, const gchar *name)
+{
+	gchar *escaped;
+
+	if (name == NULL)
+		return;
+
+	escaped = g_markup_escape_text (name, -1);
+	g_string_append_printf (text, "%s<%s><name>%s</name></%s>\n", indent, tag, escaped, tag);
+	g_free (escaped);
+}
+
+/*
+	Appends a <link> element pointing to @href, escaped to be safely used
+	as attribute value
+*/
+static void
+atom_append_link (GString *text, const gchar *indent, const gchar *href)
+{
+	gchar *escaped;
+
+	if (href == NULL)
+		return;
+
+	escaped = g_markup_escape_text (href, -1);
+	g_string_append_printf (text, "%s<link href=\"%s\" />\n", indent, escaped);
+	g_free (escaped);
+}
+
+/*
+	Appends a date element. The string returned by date_to_ISO8601() is
+	stripped, so that no stray whitespace ends up in the timestamp
+*/
+static void
+atom_append_date (GString *text, const gchar *indent, const gchar *tag, time_t date)
+{
+	gchar *formatted;
+
+	formatted = date_to_ISO8601 (date);
+	if (formatted == NULL)
+		return;
+
+	atom_append_element (text, indent, tag, g_strstrip (formatted));
+	g_free (formatted);
+}
+
+/*
+	Serializes @channel and the given @items as an Atom document.
+	@pub is not used by the current implementation.
+	Returns a newly allocated string, to be freed with g_free()
+*/
+static gchar*
+format_feed_text (FeedsPublisher *pub, FeedChannel *channel, GList *items)
+{
+	const GList *list;
+	GList *iter;
+	GString *text;
+	FeedItem *item;
+
+	text = g_string_new ("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<feed xmlns=\"http://www.w3.org/2005/Atom\">\n");
+
+	atom_append_element (text, "", "title", feed_channel_get_title (channel));
+	atom_append_element (text, "", "subtitle", feed_channel_get_description (channel));
+	atom_append_link (text, "", feed_channel_get_homepage (channel));
+
+	/*
+		The editor is the person responsible for the feed, the copyright
+		is the rights statement: not the other way around
+	*/
+	atom_append_person (text, "", "author", feed_channel_get_editor (channel));
+	atom_append_element (text, "", "rights", feed_channel_get_copyright (channel));
+	atom_append_element (text, "", "generator", feed_channel_get_generator (channel));
+
+	for (list = feed_channel_get_contributors (channel); list != NULL; list = list->next)
+		atom_append_person (text, "", "contributor", list->data);
+
+	atom_append_date (text, "", "updated", feed_channel_get_update_time (channel));
+	atom_append_element (text, "", "icon", feed_channel_get_icon (channel));
+	atom_append_element (text, "", "logo", feed_channel_get_image (channel));
+
+	for (iter = items; iter; iter = iter->next) {
+		item = iter->data;
+
+		g_string_append (text, "\t<entry>\n");
+
+		atom_append_element (text, "\t", "title", feed_item_get_title (item));
+		atom_append_element (text, "\t", "id", feed_item_get_id (item));
+		atom_append_link (text, "\t", feed_item_get_source (item));
+		atom_append_element (text, "\t", "summary", feed_item_get_description (item));
+		atom_append_person (text, "\t", "author", feed_item_get_author (item));
+		atom_append_element (text, "\t", "rights", feed_item_get_copyright (item));
+
+		for (list = feed_item_get_contributors (item); list != NULL; list = list->next)
+			atom_append_person (text, "\t", "contributor", list->data);
+
+		atom_append_date (text, "\t", "published", feed_item_get_publish_time (item));
+
+		g_string_append (text, "\t</entry>\n");
+	}
+
+	g_string_append (text, "</feed>");
+	return g_string_free (text, FALSE);
+}
+
+/*
+	Timeout callback: posts again the pending payload to every subscriber
+	of the topic (passed as @user_data) which has not acknowledged it yet.
+	Returns TRUE to be invoked again as long as at least one delivery has
+	been queued, FALSE (after having reset the handler in the topic) when
+	nothing is left to send
+*/
+static gboolean
+resend_deliver_to_subscribers (gpointer user_data)
+{
+	gboolean pending;
+	GList *iter;
+	SoupMessage *msg;
+	RemoteSubscriber *client;
+	ValidTopic *topic;
+
+	topic = user_data;
+	pending = FALSE;
+
+	for (iter = topic->subscribers; iter; iter = iter->next) {
+		client = iter->data;
+		if (client->to_be_resent == NULL)
+			continue;
+
+		msg = soup_message_new ("POST", client->callback);
+		if (msg == NULL) {
+			g_warning ("Unable to contact subscriber, invalid callback URL %s", client->callback);
+			continue;
+		}
+
+		/*
+			The payload is copied: to_be_resent may be freed (by
+			verify_delivery_cb() or by a new delivery) while this
+			message is still waiting in the session queue
+		*/
+		soup_message_set_request (msg, "application/atom+xml", SOUP_MEMORY_COPY,
+					  client->to_be_resent, strlen (client->to_be_resent));
+		soup_session_queue_message (client->parent->priv->soupsession, msg, verify_delivery_cb, client);
+		pending = TRUE;
+	}
+
+	if (pending == FALSE) {
+		/*
+			(guint) -1 is the value used to mark "no handler installed"
+		*/
+		topic->resend_handler = (guint) -1;
+		return FALSE;
+	}
+
+	return TRUE;
+}
+
+/*
+	Invoked when a content delivery to a subscriber (passed as @user_data)
+	is completed. On success the pending payload, if any, is dropped. On
+	failure the payload is saved in the subscriber and the periodic resend
+	handler of the topic is installed, unless already active
+*/
+static void
+verify_delivery_cb (SoupSession *session, SoupMessage *msg, gpointer user_data)
+{
+	guint status;
+	SoupBuffer *payload;
+	RemoteSubscriber *client;
+
+	client = user_data;
+	status = 0;
+	g_object_get (msg, "status-code", &status, NULL);
+
+	if (status >= 200 && status <= 299) {
+		g_free (client->to_be_resent);
+		client->to_be_resent = NULL;
+		return;
+	}
+
+	if (client->to_be_resent == NULL) {
+		/*
+			The body is explicitly flattened: its "data" field is not
+			guaranteed to be filled if the request never reached the
+			network
+		*/
+		payload = soup_message_body_flatten (msg->request_body);
+
+		if (payload != NULL) {
+			client->to_be_resent = g_strndup (payload->data, payload->length);
+			soup_buffer_free (payload);
+		}
+	}
+
+	/*
+		(guint) -1 is the value used to mark "no handler installed"
+	*/
+	if (client->topic_struct->resend_handler == (guint) -1)
+		client->topic_struct->resend_handler = g_timeout_add_seconds (60 * VERIFICATION_INTERVAL_MINUTES,
+									      resend_deliver_to_subscribers, client->topic_struct);
+}
+
+/*
+	Tells if an item with the given source @link has already been
+	delivered to the subscribers of @topic
+*/
+static gboolean
+item_already_delivered (ValidTopic *topic, const gchar *link)
+{
+	const gchar *known;
+	GList *iter;
+
+	for (iter = topic->items_delivered; iter; iter = iter->next) {
+		known = feed_item_get_source (iter->data);
+
+		if (known != NULL && strcmp (known, link) == 0)
+			return TRUE;
+	}
+
+	return FALSE;
+}
+
+/*
+	Posts to all subscribers of the topic matching @channel an Atom document
+	with the elements of @items which have not been delivered before. Items
+	are identified by their source link; those without a link are skipped.
+	Any pending resend is dropped, being superseded by the new delivery
+*/
+static void
+deliver_to_subscribers (FeedsPublisher *pub, FeedChannel *channel, GList *items)
+{
+	gchar *text;
+	gsize length;
+	const gchar *source;
+	const gchar *link;
+	GList *iter;
+	GList *to_deliver;
+	SoupMessage *msg;
+	RemoteSubscriber *client;
+	ValidTopic *topic;
+
+	source = feed_channel_get_source (channel);
+	if (source == NULL)
+		return;
+
+	topic = g_hash_table_lookup (pub->priv->topics, source);
+	if (topic == NULL || topic->subscribers == NULL)
+		return;
+
+	to_deliver = NULL;
+
+	for (iter = items; iter; iter = iter->next) {
+		link = feed_item_get_source (iter->data);
+
+		/*
+			A reference is taken for each retained item: the list
+			of delivered items is unreferenced element by element
+			when the topic is destroyed
+		*/
+		if (link != NULL && item_already_delivered (topic, link) == FALSE)
+			to_deliver = g_list_prepend (to_deliver, g_object_ref (iter->data));
+	}
+
+	if (to_deliver == NULL)
+		return;
+
+	/*
+		(guint) -1 is the value used to mark "no handler installed"
+	*/
+	if (topic->resend_handler != (guint) -1) {
+		g_source_remove (topic->resend_handler);
+		topic->resend_handler = (guint) -1;
+	}
+
+	to_deliver = g_list_reverse (to_deliver);
+	text = format_feed_text (pub, channel, to_deliver);
+	length = strlen (text);
+
+	for (iter = topic->subscribers; iter; iter = iter->next) {
+		client = iter->data;
+
+		g_free (client->to_be_resent);
+		client->to_be_resent = NULL;
+
+		msg = soup_message_new ("POST", client->callback);
+		if (msg == NULL) {
+			g_warning ("Unable to contact subscriber, invalid callback URL %s", client->callback);
+			continue;
+		}
+
+		soup_message_set_request (msg, "application/atom+xml", SOUP_MEMORY_COPY, text, length);
+		soup_session_queue_message (client->parent->priv->soupsession, msg, verify_delivery_cb, client);
+	}
+
+	/*
+		Each message owns a copy of the text
+	*/
+	g_free (text);
+
+	topic->items_delivered = g_list_concat (to_deliver, topic->items_delivered);
+}
+
+/*
+	Handler of the local web server for a published feed: @user_data is the
+	GFile to serve. Replies with the contents of the file as
+	application/atom+xml, with status 404 if the file cannot be opened or
+	500 if it cannot be read
+*/
+static void
+feed_required_by_web_cb (SoupServer *server, SoupMessage *msg, const char *path,
+			 GHashTable *query, SoupClientContext *context, gpointer user_data)
+{
+	gchar *uri;
+	gchar *text;
+	guint64 size;
+	gsize read;
+	GError *error;
+	GFileInfo *info;
+	GFileInputStream *stream;
+
+	error = NULL;
+	stream = g_file_read (user_data, NULL, &error);
+
+	if (stream == NULL) {
+		uri = g_file_get_uri (user_data);
+		g_warning ("Unable to open required feed in %s: %s.", uri, error->message);
+		g_free (uri);
+		g_error_free (error);
+		soup_message_set_status (msg, 404);
+		return;
+	}
+
+	info = g_file_input_stream_query_info (stream, G_FILE_ATTRIBUTE_STANDARD_SIZE, NULL, &error);
+
+	if (info == NULL) {
+		uri = g_file_get_uri (user_data);
+		g_warning ("Unable to retrieve size of required feed in %s: %s.", uri, error->message);
+		g_free (uri);
+		g_error_free (error);
+		g_object_unref (stream);
+		soup_message_set_status (msg, 500);
+		return;
+	}
+
+	size = g_file_info_get_attribute_uint64 (info, G_FILE_ATTRIBUTE_STANDARD_SIZE);
+	g_object_unref (info);
+
+	/*
+		One extra byte, so the buffer is never NULL (even for an empty
+		file) and is always NUL-terminated
+	*/
+	text = g_malloc0 (size + 1);
+	read = 0;
+
+	if (g_input_stream_read_all (G_INPUT_STREAM (stream), text, size, &read, NULL, &error) == FALSE) {
+		uri = g_file_get_uri (user_data);
+		g_warning ("Unable to read required feed in %s: %s.", uri, error->message);
+		g_free (uri);
+		g_error_free (error);
+		g_free (text);
+		g_object_unref (stream);
+		soup_message_set_status (msg, 500);
+		return;
+	}
+
+	/*
+		The message takes ownership of the buffer
+	*/
+	soup_message_set_response (msg, "application/atom+xml", SOUP_MEMORY_TAKE, text, read);
+	soup_message_set_status (msg, 200);
+
+	g_object_unref (stream);
+}
+
+/**
+ * feeds_publisher_publish:
+ * @pub: a #FeedsPublisher
+ * @channel: the #FeedChannel to dump in the file
+ * @items: list of #FeedItems to be added in the feed
+ * @id: name used in the external URL of the feed
+ *
+ * If the local web server has been started (with
+ * feeds_publisher_hub_switch()) this function exposes the given @channel as
+ * an Atom formatted file available at http://[LOCAL_IP:DEFINED_PORT]/@id .
+ * A leading "/" in @id is accepted but not required. If the server is not
+ * running a warning is emitted and nothing is published
+ */
+void
+feeds_publisher_publish (FeedsPublisher *pub, FeedChannel *channel, GList *items, const gchar *id)
+{
+	gchar *dirpath;
+	gchar *handler_path;
+	gchar *uri;
+	const gchar *name;
+	GError *error;
+	GFile *dir;
+	GFile *file;
+
+	if (pub->priv->server == NULL) {
+		g_warning ("Local web server is not running, unable to expose required contents");
+		return;
+	}
+
+	if (id == NULL) {
+		g_warning ("No identifier provided, unable to expose required contents");
+		return;
+	}
+
+	/*
+		Handlers are matched against the path of the incoming request,
+		which always starts with "/"
+	*/
+	name = id;
+	while (*name == '/')
+		name++;
+
+	handler_path = g_strconcat ("/", name, NULL);
+	soup_server_remove_handler (pub->priv->server, handler_path);
+
+	/*
+		The folder holding the published feeds has to exist before the
+		file can be written
+	*/
+	dirpath = g_build_filename (g_get_tmp_dir (), "libgrss", "activefeeds", NULL);
+	dir = g_file_new_for_path (dirpath);
+	g_free (dirpath);
+
+	error = NULL;
+
+	if (g_file_make_directory_with_parents (dir, NULL, &error) == FALSE) {
+		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_EXISTS) == FALSE) {
+			g_warning ("Unable to create folder for published feeds: %s.", error->message);
+			g_error_free (error);
+			g_object_unref (dir);
+			g_free (handler_path);
+			return;
+		}
+
+		g_error_free (error);
+	}
+
+	file = g_file_get_child (dir, name);
+	uri = g_file_get_uri (file);
+
+	/*
+		PubSubHubbub notifications are already delivered by
+		feeds_publisher_publish_file()
+	*/
+	feeds_publisher_publish_file (pub, channel, items, uri);
+
+	/*
+		The handler owns the reference to the file, released when the
+		handler is removed
+	*/
+	soup_server_add_handler (pub->priv->server, handler_path, feed_required_by_web_cb, file, g_object_unref);
+
+	g_free (uri);
+	g_free (handler_path);
+	g_object_unref (dir);
+}
+
+/**
+ * feeds_publisher_publish_file:
+ * @pub: a #FeedsPublisher
+ * @channel: the #FeedChannel to dump in the file
+ * @items: list of #FeedItems to be added in the feed
+ * @uri: URI of the file to write
+ *
+ * Dumps the given @channel in an Atom formatted file at @uri, replacing any
+ * previous contents. If the local PubSubHubbub hub has been activated (with
+ * feeds_publisher_hub_switch()) remote subscribers are notified about the
+ * new items which have been added since the previous invocation of this
+ * function for the same #FeedChannel. If the file cannot be written a
+ * warning is emitted and no notification is sent
+ */
+void
+feeds_publisher_publish_file (FeedsPublisher *pub, FeedChannel *channel, GList *items, const gchar *uri)
+{
+	gboolean written;
+	gchar *text;
+	GError *error;
+	GFile *file;
+	GFileOutputStream *stream;
+
+	error = NULL;
+	file = g_file_new_for_uri (uri);
+
+	/*
+		The file is replaced, not appended: it has to contain a single
+		Atom document, the one describing the current state of the feed
+	*/
+	stream = g_file_replace (file, NULL, FALSE, G_FILE_CREATE_NONE, NULL, &error);
+
+	if (stream == NULL) {
+		g_warning ("Unable to write feed in %s: %s.", uri, error->message);
+		g_error_free (error);
+		g_object_unref (file);
+		return;
+	}
+
+	text = format_feed_text (pub, channel, items);
+
+	written = g_output_stream_write_all (G_OUTPUT_STREAM (stream), text, strlen (text), NULL, NULL, &error);
+	if (written == TRUE)
+		written = g_output_stream_close (G_OUTPUT_STREAM (stream), NULL, &error);
+
+	if (written == FALSE) {
+		g_warning ("Unable to write feed in %s: %s.", uri, error->message);
+		g_error_free (error);
+	}
+	else if (pub->priv->server != NULL) {
+		deliver_to_subscribers (pub, channel, items);
+	}
+
+	g_free (text);
+	g_object_unref (stream);
+	g_object_unref (file);
+}
+
static void
add_client_to_topic (FeedsPublisher *pub, RemoteSubscriber *client)
{
@@ -389,6 +735,7 @@ handle_incoming_requests_cb (SoupServer *server, SoupMessage *msg, const char *p
SoupMessage *verify_msg;
FeedsPublisher *pub;
RemoteSubscriber *client;
+ ValidTopic *topic_struct;
pub = user_data;
@@ -427,21 +774,26 @@ handle_incoming_requests_cb (SoupServer *server, SoupMessage *msg, const char *p
}
else {
if (strcmp (mode, "subscribe") == 0) {
- /*
- TODO What to do if an invalid topic is required?
- */
- client = g_new0 (RemoteSubscriber, 1);
- client->parent = pub;
- client->first_contact_time = time (NULL);
- client->topic = g_strdup (topic);
- client->callback = g_strdup (callback);
-
- if (lease != NULL)
- client->lease_interval = strtoll (lease, NULL, 10);
- else
- client->lease_interval = DEFAULT_LEASE_INTERVAL;
+ topic_struct = g_hash_table_lookup (pub->priv->topics, topic);
- client->status = REMOTE_SUBSCRIBING;
+ if (topic_struct == NULL) {
+ soup_message_set_status (msg, 404);
+ }
+ else {
+ client = g_new0 (RemoteSubscriber, 1);
+ client->parent = pub;
+ client->first_contact_time = time (NULL);
+ client->topic = g_strdup (topic);
+ client->topic_struct = topic_struct;
+ client->callback = g_strdup (callback);
+
+ if (lease != NULL)
+ client->lease_interval = strtoll (lease, NULL, 10);
+ else
+ client->lease_interval = DEFAULT_LEASE_INTERVAL;
+
+ client->status = REMOTE_SUBSCRIBING;
+ }
}
else if (strcmp (mode, "unsubscribe") == 0) {
client = search_subscriber_by_topic_and_callback (pub, topic, callback);
@@ -479,37 +831,15 @@ close_server (FeedsPublisher *pub)
}
}
-static GInetAddress*
-my_detect_internet_address (FeedsPublisher *pub)
-{
- if (pub->priv->local_addr == NULL)
- pub->priv->local_addr = detect_internet_address ();
-
- return pub->priv->local_addr;
-}
-
static void
create_and_run_server (FeedsPublisher *pub)
{
- gchar *ip;
- struct sockaddr_in low_addr;
SoupAddress *soup_addr;
- GInetAddress *my_addr;
-
- my_addr = my_detect_internet_address (pub);
- if (my_addr == NULL)
- return;
if (pub->priv->soupsession == NULL)
pub->priv->soupsession = soup_session_async_new ();
- low_addr.sin_family = AF_INET;
- low_addr.sin_port = htons (pub->priv->port);
- ip = g_inet_address_to_string (my_addr);
- inet_pton (AF_INET, ip, &low_addr.sin_addr);
- g_free (ip);
-
- soup_addr = soup_address_new_from_sockaddr ((struct sockaddr*) &low_addr, sizeof (low_addr));
+ soup_addr = soup_address_new_any (SOUP_ADDRESS_FAMILY_IPV4, pub->priv->port);
pub->priv->server = soup_server_new ("port", pub->priv->port, "interface", soup_addr, NULL);
g_object_unref (soup_addr);
@@ -518,39 +848,41 @@ create_and_run_server (FeedsPublisher *pub)
}
/**
- * feeds_publisher_set_port:
+ * feeds_publisher_hub_set_port:
* @pub: a #FeedsPublisher
* @port: new listening port for the server
*
- * To customize the port opened by the local server to catch incoming
- * subscriptions. By default this is 80. Changing the port while the
- * subscriber is running imply restart the local server
+ * Customizes the port opened by the local server to deliver feeds and
+ * catch incoming subscriptions. By default this is 80. Changing the port
+ * while the hub is running implies a restart of the local server
*/
void
-feeds_publisher_set_port (FeedsPublisher *pub, int port)
+feeds_publisher_hub_set_port (FeedsPublisher *pub, int port)
{
if (port != pub->priv->port) {
pub->priv->port = port;
if (pub->priv->running == TRUE) {
- feeds_publisher_switch (pub, FALSE);
- feeds_publisher_switch (pub, TRUE);
+ feeds_publisher_hub_switch (pub, FALSE);
+ feeds_publisher_hub_switch (pub, TRUE);
}
}
}
/**
- * feeds_publisher_set_topics:
+ * feeds_publisher_hub_set_topics:
* @pub: a #FeedsPublisher
* @topics: a list of #FeedChannels
*
* To define a list of valid "topics" for which the #FeedsPublisher will
* deliver contents. Sources of those channels, as retrieved by
- * feed_channel_get_source(), would be found as hub.topic parameter in
- * registration requests from remote subscribers
+ * feed_channel_get_source(), are accepted as "hub.topic" parameter in
+ * PubSubHubbub registration requests from remote subscribers.
+ * Note that subscription requests for any other topic are
+ * now rejected
*/
void
-feeds_publisher_set_topics (FeedsPublisher *pub, GList *topics)
+feeds_publisher_hub_set_topics (FeedsPublisher *pub, GList *topics)
{
GList *iter;
ValidTopic *topic;
@@ -560,6 +892,7 @@ feeds_publisher_set_topics (FeedsPublisher *pub, GList *topics)
for (iter = topics; iter; iter = iter->next) {
topic = g_new0 (ValidTopic, 1);
topic->channel = g_object_ref (iter->data);
+ topic->resend_handler = -1;
g_hash_table_insert (pub->priv->topics, (gpointer) feed_channel_get_source (iter->data), topic);
}
}
@@ -611,14 +944,14 @@ remove_refresh_handler (FeedsPublisher *pub)
}
/**
- * feeds_publisher_switch:
+ * feeds_publisher_hub_switch:
* @pub: a #FeedsPublisher
- * @run: TRUE to run the subscriber, FALSE to pause it
+ * @run: TRUE to run the local server, FALSE to stop it
*
- * Permits to pause or resume @pub listening for events
+ * Starts or stops the webserver implemented by this object
*/
void
-feeds_publisher_switch (FeedsPublisher *pub, gboolean run)
+feeds_publisher_hub_switch (FeedsPublisher *pub, gboolean run)
{
if (pub->priv->running != run) {
pub->priv->running = run;
@@ -633,19 +966,3 @@ feeds_publisher_switch (FeedsPublisher *pub, gboolean run)
}
}
}
-
-/**
- * feeds_publisher_publish:
- * @pub: a #FeedsPublisher
- * @item: item to publish
- *
- * Deliver the specified @item to all subscribers registered for the parent's
- * channel (defined as "topic")
- */
-void
-feeds_publisher_publish (FeedsPublisher *pub, FeedItem *item)
-{
- /*
- TODO
- */
-}
diff --git a/src/feeds-publisher.h b/src/feeds-publisher.h
index a21134e..001896d 100644
--- a/src/feeds-publisher.h
+++ b/src/feeds-publisher.h
@@ -49,9 +49,11 @@ GType feeds_publisher_get_type () G_GNUC_CONST;
FeedsPublisher* feeds_publisher_new ();
-void feeds_publisher_set_port (FeedsPublisher *pub, int port);
-void feeds_publisher_set_topics (FeedsPublisher *pub, GList *topics);
-void feeds_publisher_switch (FeedsPublisher *pub, gboolean run);
-void feeds_publisher_publish (FeedsPublisher *pub, FeedItem *item);
+void feeds_publisher_publish (FeedsPublisher *pub, FeedChannel *channel, GList *items, const gchar *id);
+void feeds_publisher_publish_file (FeedsPublisher *pub, FeedChannel *channel, GList *items, const gchar *uri);
+
+void feeds_publisher_hub_set_port (FeedsPublisher *pub, int port);
+void feeds_publisher_hub_set_topics (FeedsPublisher *pub, GList *topics);
+void feeds_publisher_hub_switch (FeedsPublisher *pub, gboolean run);
#endif /* __FEEDS_PUBLISHER_H__ */
diff --git a/src/utils.c b/src/utils.c
index 35a5fa8..a2f5bb2 100644
--- a/src/utils.c
+++ b/src/utils.c
@@ -547,6 +547,17 @@ date_parse_ISO8601 (const gchar *date)
return 0;
}
+/*
+	Formats @date as a UTC timestamp in the form YYYY-MM-DDTHH:MM:SSZ.
+	Returns a newly allocated string, to be freed with g_free(); the string
+	is empty if the date cannot be represented
+*/
+gchar*
+date_to_ISO8601 (time_t date)
+{
+	gchar text [32];
+	struct tm broken;
+
+	if (gmtime_r (&date, &broken) == NULL)
+		return g_strdup ("");
+
+	/*
+		No "%t" here: in strftime() it emits a TAB character, it is
+		meaningful only as whitespace matcher in strptime() patterns
+	*/
+	if (strftime (text, sizeof (text), "%Y-%m-%dT%H:%M:%SZ", &broken) == 0)
+		return g_strdup ("");
+
+	return g_strdup (text);
+}
+
/*
Inspired by:
GNet - Networking library
diff --git a/src/utils.h b/src/utils.h
index 3d6ba2d..b7d1178 100644
--- a/src/utils.h
+++ b/src/utils.h
@@ -31,6 +31,7 @@
#include <locale.h>
#include <ctype.h>
#include <unistd.h>
+#include <errno.h>
#include "libgrss.h"
@@ -48,6 +49,7 @@ xmlDocPtr file_to_xml (const gchar *path);
time_t date_parse_RFC822 (const gchar *date);
time_t date_parse_ISO8601 (const gchar *date);
+gchar* date_to_ISO8601 (time_t date);
GInetAddress* detect_internet_address ();
gboolean address_seems_public (GInetAddress *addr);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]