r7467 - dumbhippo/trunk/firehose/firehose/jobs



Author: walters
Date: 2008-05-01 17:31:11 -0500 (Thu, 01 May 2008)
New Revision: 7467

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Processors can't be singletons and have state, duh
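
The old interface buffered data on the processor instance itself (feed() plus
get_value()), which breaks when a processor such as the shared rss_eater is
created once at module level and reused for every poll. Roughly what goes
wrong, sketched with a made-up StatefulProcessor rather than the real classes:

    # Hypothetical illustration of the bug being fixed: a module-level,
    # stateful processor keeps appending input from every feed it sees.
    from StringIO import StringIO

    class StatefulProcessor(object):
        def __init__(self):
            self._data = StringIO()
        def feed(self, data):
            self._data.write(data)
        def get_value(self):
            return self._data.getvalue()

    shared = StatefulProcessor()          # one instance shared by all feeds
    shared.feed('<rss>poll one</rss>')
    shared.feed('<rss>poll two</rss>')    # a later poll appends to the first
    print shared.get_value()              # both polls' bytes, not just the latest

The patch below drops the accumulated state and gives every processor a single
stateless process(data) -> data method.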


Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-05-01 21:08:06 UTC (rev 7466)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-05-01 22:31:11 UTC (rev 7467)
@@ -40,22 +40,15 @@
         raise NotImplementedError() 
     
 class FeedPostProcessor(object):
-    def __init__(self):
-        self._data = StringIO()
-    
-    def feed(self, data):
-        self._data.write(data)
-        
-    def get_value(self):
-        return self._data.getvalue()
+    def process(self, data):
+        return data
 
 class XmlElementEater(FeedPostProcessor):
     def __init__(self, deletepaths=[]):
         super(XmlElementEater, self).__init__()
         self.__deletepaths = deletepaths
         
-    def get_value(self):
-        data = super(XmlElementEater, self).get_value()
+    def process(self, data):
         import lxml.etree
         tree = lxml.etree.parse(StringIO(data))
         root = tree.getroot()
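
A processor on the new interface is a pure transform of its input. As a sketch
of the lxml-based element removal used above, with an illustrative class name
rather than the real XmlElementEater:

    # Sketch: drop elements named by XPath expressions such as
    # '/rss/channel/lastBuildDate', then reserialize the document.
    from StringIO import StringIO
    import lxml.etree

    class ElementStripper(object):
        def __init__(self, deletepaths=[]):
            self._deletepaths = deletepaths

        def process(self, data):
            tree = lxml.etree.parse(StringIO(data))
            for path in self._deletepaths:
                for node in tree.getroot().xpath(path):
                    node.getparent().remove(node)
            return lxml.etree.tostring(tree)
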
@@ -70,10 +63,11 @@
     
 class RegexpEater(FeedPostProcessor):
     def __init__(self, regexps):
+        super(RegexpEater, self).__init__()
         self.__regexps = map(re.compile, regexps)
         
-    def get_value(self):
-        value = StringIO(super(RegexpEater, self).get_value())
+    def process(self, data):
+        value = StringIO(data)
         outvalue = StringIO()
         for line in value:
             for regexp in self.__regexps:
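
The regexp-based processor gets the same treatment: one call, data in,
filtered data out. Roughly the line filtering it implements (an approximation,
not the exact code):

    # Approximation: drop any line that matches one of the given regexps.
    import re

    def filter_lines(data, patterns):
        regexps = [re.compile(p) for p in patterns]
        kept = []
        for line in data.splitlines(True):
            if not any(r.search(line) for r in regexps):
                kept.append(line)
        return ''.join(kept)
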
@@ -89,16 +83,11 @@
             processors = [FeedPostProcessor()]
         self._processors = processors
         
-    def feed(self, data):
-        self._processors[0].feed(data)
-        
-    def get_value(self):
-        buf = self._processors[0].get_value()
-        for processor in self._processors[1:]:
-            processor.feed(buf)
-            buf = processor.get_value()
-        return buf
-    
+    def process(self, data):
+        for processor in self._processors:
+            data = processor.process(data)
+        return data
+
 # Define a shared eater for rss which has a lastBuildDate
 rss_eater = XmlElementEater(['/rss/channel/lastBuildDate', '/rss/channel/pubDate'])
 # This maps from a regular expression matching a URL to a list of processors
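
With the whole chain reduced to one pass, calling code just hands the complete
document to process(); a usage sketch (feed_body here is a placeholder for the
downloaded feed text):

    # Each processor transforms the full document and passes it to the next.
    chain = ChainedProcessors([rss_eater, FeedPostProcessor()])
    cleaned = chain.process(feed_body)
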
@@ -148,14 +137,17 @@
                 outpath_tmpname = None
                 outfile = None
             processor = ChainedProcessors(transformlist)
+            data = StringIO()
             buf = response.read(8192)
             while buf:
                 if outfile is not None:
                     outfile.write(buf)
-                processor.feed(buf)
+                data.write(buf)
                 buf = response.read(8192)
+            datavalue = data.getvalue()
+            processed = processor.process(datavalue)
             hash = sha.new()
-            hash.update(processor.get_value())
+            hash.update(processed)
             hash_hex = hash.hexdigest()
             if outfile is not None:
                 outfile.close()
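
In the poller itself the body is now buffered, post-processed once, and only
then hashed; previously chunks were fed into the first processor as they
arrived. A condensed sketch of the new flow, with response and transformlist
standing in for the real objects:

    # Buffer the HTTP body, run it through the chain once, hash the result.
    from StringIO import StringIO
    import sha

    data = StringIO()
    buf = response.read(8192)
    while buf:
        data.write(buf)
        buf = response.read(8192)
    processed = ChainedProcessors(transformlist).process(data.getvalue())
    hash_hex = sha.new(processed).hexdigest()
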
@@ -256,7 +248,7 @@
         try:
             (new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp, **kwargs)            
         except Exception, e:
-            _logger.error("Failed task id %r: %s", tid, e)
+            _logger.exception("Failed task id %r: %s", tid, e)
             (new_hash, new_timestamp) = (None, None)
         if new_hash is not None:
             resultqueue.put((taskid, new_hash, new_timestamp))
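
The logging change is small but useful: Logger.exception() records the message
at error level and appends the active traceback, which .error() does not. A
minimal illustration with a throwaway logger (not project code):

    import logging
    logging.basicConfig()
    _log = logging.getLogger('example')
    try:
        raise ValueError('boom')
    except Exception, e:
        # Same signature as .error(), but the current traceback is logged too.
        _log.exception("Failed task id %r: %s", 'some-task', e)
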
@@ -311,9 +303,7 @@
     '''
     transformers = get_transformations('http://digg.com/users/jdhore/history/diggs.rss')
     processor = ChainedProcessors(transformers)
-    processor.feed(testdata)
-    print processor.get_value()
+    print processor.process(testdata)
     processor = ChainedProcessors([])
-    processor.feed(testdata)
-    print processor.get_value()
+    print processor.process(testdata)
     
\ No newline at end of file


