r7464 - dumbhippo/trunk/firehose/firehose/jobs



Author: walters
Date: 2008-05-01 15:10:26 -0500 (Thu, 01 May 2008)
New Revision: 7464

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Support for post-processing feed data

Add an initial set of heuristics for stripping elements of Digg RSS
feeds (such as lastBuildDate and pubDate) that change in uninteresting ways.



Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-04-30 21:57:15 UTC (rev 7463)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-05-01 20:10:26 UTC (rev 7464)
@@ -38,12 +38,72 @@
         """Receives a task id, result SHA1, and result timestamp integer.
 Should compute a new result (newhash, newtimestamp)"""
         raise NotImplementedError() 
+    
+class FeedPostProcessor(object):
+    def __init__(self):
+        self._data = StringIO()
+    
+    def feed(self, data):
+        self._data.write(data)
+        
+    def get_value(self):
+        return self._data.getvalue()
 
+class XmlElementEater(FeedPostProcessor):
+    def __init__(self, deletepaths=[]):
+        super(XmlElementEater, self).__init__()
+        self.__deletepaths = deletepaths
+        
+    def get_value(self):
+        data = super(XmlElementEater, self).get_value()
+        import lxml.etree
+        tree = lxml.etree.parse(StringIO(data))
+        root = tree.getroot()
+        for path in self.__deletepaths:
+            node = root.xpath(path)
+            if not node:
+                continue
+            node = node[0]
+            parent = node.getparent()
+            parent.remove(node)
+        return lxml.etree.tostring(tree, pretty_print=True)
+    
+class ChainedProcessors(object):
+    def __init__(self, processors):
+        super(ChainedProcessors, self).__init__()
+        if len(processors) == 0:
+            processors = [FeedPostProcessor()]
+        self._processors = processors
+        
+    def feed(self, data):
+        self._processors[0].feed(data)
+        
+    def get_value(self):
+        buf = self._processors[0].get_value()
+        for processor in self._processors[1:]:
+            processor.feed(buf)
+            buf = processor.get_value()
+        return buf
+    
+# This maps from a regular expression matching a URL to a list of processors
+feed_transformations = [
+  (r'digg.com/users/.*/history/diggs.rss', [XmlElementEater(['/rss/channel/lastBuildDate', '/rss/channel/pubDate'])]),
+]
+feed_transformations = [(re.compile(r'^https?://([A-Z0-9]+\.)*' + x[0]), x[1]) for x in feed_transformations]
+
+def get_transformations(url):
+    transformers = []
+    for (matcher, decepticons) in feed_transformations:
+         if matcher.search(url):
+             transformers.extend(decepticons)
+    return transformers
+
 class FeedTaskHandler(object):
     FAMILY = 'FEED'
-    
+
     def run(self, id, prev_hash, prev_timestamp, outpath=None):
         targeturl = id
+        transformlist = get_transformations(targeturl)
         parsedurl = urlparse.urlparse(targeturl)
         try:
             _logger.info('Connecting to %r', targeturl)
@@ -68,13 +128,15 @@
             else:
                 outpath_tmpname = None
                 outfile = None
-            hash = sha.new()
+            processor = ChainedProcessors(transformlist)
             buf = response.read(8192)
             while buf:
-                hash.update(buf)
                 if outfile is not None:
                     outfile.write(buf)
+                processor.feed(buf)
                 buf = response.read(8192)
+            hash = sha.new()
+            hash.feed(processor.get_value())
             hash_hex = hash.hexdigest()
             if outfile is not None:
                 outfile.close()
@@ -191,3 +253,45 @@
             thread.start()
         collector = threading.Thread(target=self.__run_collect_tasks, args=(taskcount,resultqueue,masterhost,serial))
         collector.start()
+
+if __name__ == '__main__':
+    testdata = '''<?xml version="1.0" encoding="UTF-8"?>
+<rss version="2.0">
+    <channel>
+        <title>digg / jdhore1 / history / diggs</title>
+        <link>http://digg.com/users/jdhore1/history/diggs</link>
+        <description>A history of jdhore1's diggs</description>
+        <language>en-us</language>
+        <pubDate>Wed, 30 Apr 2008 16:42:42 UTC</pubDate>
+        <lastBuildDate>Wed, 30 Apr 2008 16:42:42 UTC</lastBuildDate>
+        <generator>Digg.com</generator>
+        <item>
+            <title>Hans Reiser was convicted Monday of first degree murder in t</title>
+            <link>http://digg.com/linux_unix/Hans_Reiser_was_convicted_Monday_of_first_degree_murder_in_t</link>
+            <description><![CDATA[
+                 A jury has found an Oakland software programmer guilty in the death of his estranged wife.        
+    ]]></description>
+            <pubDate>Mon, 28 Apr 2008 23:41:43 UTC</pubDate>
+            <author>jdhore1</author>
+            <guid>http://digg.com/linux_unix/Hans_Reiser_was_convicted_Monday_of_first_degree_murder_in_t</guid>
+        </item>
+        <item>
+            <title>The Democrats Have a Nominee: It's Obama!</title>
+            <link>http://digg.com/political_opinion/The_Democrats_Have_a_Nominee_It_s_Obama</link>
+            <description><![CDATA[
+                Other than ensuring the Greatest Show on Earth will continue, does it matter that Hillary Clinton d
+efeated Barack Obama Tuesday in Pennsylvania by nine-plus points? Barack Obama is the nominee.
+
+            ]]></description>
+            <pubDate>Fri, 25 Apr 2008 21:23:01 UTC</pubDate>
+            <author>jdhore1</author>
+            <guid>http://digg.com/political_opinion/The_Democrats_Have_a_Nominee_It_s_Obama</guid>
+        </item>
+    </channel>
+</rss>
+    '''
+    transformers = get_transformations('http://digg.com/users/jdhore/history/diggs.rss')
+    processor = ChainedProcessors(transformers)
+    processor.feed(testdata)
+    print processor.get_value()
+    
\ No newline at end of file



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]