r7478 - dumbhippo/trunk/firehose/firehose/jobs

Author: walters
Date: 2008-05-12 14:31:27 -0500 (Mon, 12 May 2008)
New Revision: 7478

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/master.py
   dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Switch to saving feed caches in S3.

Storing the cached feed data in S3 instead of under a local data directory
keeps the poller hosts from running out of disk space.
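
For reference, the upload pattern the poller now uses boils down to roughly
the following (a minimal sketch against the boto S3 API the code already
relies on; the 'feedcache.' bucket prefix and the URL-plus-timestamp key
naming mirror the diff below, while the helper name and example values are
invented for illustration):

    import time
    from boto.s3.connection import S3Connection
    from boto.s3.key import Key

    def store_feed_snapshot(s3_conn, base_bucket_name, feed_url, local_path):
        # Feed snapshots live in a dedicated "feedcache." bucket,
        # keyed by the feed URL plus the fetch timestamp.
        bucket = s3_conn.get_bucket('feedcache.' + base_bucket_name)
        k = Key(bucket)
        k.key = feed_url + ('.%d' % (int(time.time()),))
        k.set_contents_from_filename(local_path)
        return k.key

    # e.g. store_feed_snapshot(S3Connection(access_id, secret_key),
    #                          'example-firehose', 'http://example.com/feed',
    #                          '/tmp/feed-snapshot.xml')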


Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-05-12 17:55:08 UTC (rev 7477)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-05-12 19:31:27 UTC (rev 7478)
@@ -94,16 +94,6 @@
         
         import socket
         self.__hostport = '%s:%s' % (socket.gethostname(), config.get('server.socket_port'))
-        
-        # Default to one slave on localhost
-        self.__worker_endpoints = ['localhost:%d' % (int(config.get('firehose.localslaveport')),)]
-        _logger.debug("worker endpoints are %r", self.__worker_endpoints)
-        for bind in self.__worker_endpoints:
-            (host,port) = bind.split(':')
-            if host == 'localhost':
-                _logger.debug("found localhost worker, starting it")
-                self.__local_handler = TaskPoller.get()
-                self.__local_handler.run_async()
                 
         aws_accessid = config.get('firehose.awsAccessKeyId')
         if aws_accessid is None:
@@ -114,7 +104,20 @@
         self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
         
         self.__sqs_incoming_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsIncomingName'))
-        self.__sqs_outgoing_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsOutgoingName'))                
+        self.__sqs_outgoing_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsOutgoingName'))
+        
+        mainbucket_name = config.get('firehose.awsS3Bucket')   
+        
+        # Default to one slave on localhost
+        self.__worker_endpoints = ['localhost:%d' % (int(config.get('firehose.localslaveport')),)]
+        _logger.debug("worker endpoints are %r", self.__worker_endpoints)
+        for bind in self.__worker_endpoints:
+            (host,port) = bind.split(':')
+            if host == 'localhost':
+                _logger.debug("found localhost worker, starting it")
+                self.__local_handler = TaskPoller.get()
+                self.__local_handler.run_async()        
+                        
         poll_sqs_thread = threading.Thread(target=self.__poll_sqs)
         poll_sqs_thread.setDaemon(True)     
         
@@ -126,15 +129,14 @@
                                                             prev_hash TEXT,
                                                             prev_time DATETIME)''')
         cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
+
+        tasklist_key = config.get('firehose.awsS3Key') 
+        _logger.debug("retrieving current task set from bucket: %r  key: %r", mainbucket_name, tasklist_key)
+        bucket = self.__s3_conn.get_bucket(mainbucket_name)
         
-        bname = config.get('firehose.awsS3Bucket')
-        kname = config.get('firehose.awsS3Key')
-        _logger.debug("retrieving current task set from bucket: %r  key: %r", bname, kname)
-        bucket = self.__s3_conn.get_bucket(bname)
-        
         # FIXME should stream this
         current_task_keys = {}        
-        key = bucket.get_key(kname)
+        key = bucket.get_key(tasklist_key)
         if key is not None:
             contents = key.get_contents_as_string()
             if contents is not None:
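
On the master side, the current task set still lives in S3 and is read back
at startup; the read pattern above amounts to roughly this (a sketch, with
the function name invented for illustration):

    def load_task_list(s3_conn, bucket_name, key_name):
        # Returns the stored task list as a string, or None if the
        # key does not exist yet (e.g. on a fresh deployment).
        bucket = s3_conn.get_bucket(bucket_name)
        key = bucket.get_key(key_name)
        if key is None:
            return None
        return key.get_contents_as_string()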

Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-05-12 17:55:08 UTC (rev 7477)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-05-12 19:31:27 UTC (rev 7478)
@@ -2,6 +2,7 @@
 
 import os,sys,re,heapq,time,Queue,sha,threading
 import BaseHTTPServer,httplib,urlparse,urllib
+import tempfile
 from email.Utils import formatdate,parsedate_tz,mktime_tz
 from BeautifulSoup import BeautifulSoup,Comment
 import logging
@@ -143,8 +144,11 @@
 
 class FeedTaskHandler(object):
     FAMILY = 'FEED'
+    
+    def __init__(self, feedcache_bucket=None):
+        self.__feedcache_bucket = feedcache_bucket
 
-    def run(self, id, prev_hash, prev_timestamp, cachedir=None):
+    def run(self, id, prev_hash, prev_timestamp):
         targeturl = id
         transformlist = get_transformations(targeturl)
         parsedurl = urlparse.urlparse(targeturl)
@@ -165,14 +169,11 @@
             if response.status == 304:
                 _logger.info("Got 304 Unmodified for %r", targeturl)
                 return (prev_hash, prev_timestamp)
-            if cachedir is not None:
-                quotedname = urllib.quote_plus(targeturl)
-                ts = int(time.time())
-                outpath = os.path.join(cachedir, quotedname + '.' + unicode(ts))
-                outpath_tmpname = outpath + '.tmp'
-                outfile = open(outpath_tmpname, 'w')
+            if self.__feedcache_bucket is not None:
+                (tempfd, temppath) = tempfile.mkstemp()
+                outfile = os.fdopen(tempfd, 'w')
             else:
-                outpath_tmpname = None
+                (tempfd, temppath) = (None, None)
                 outfile = None
             rawhash = sha.new()
             data = StringIO()
@@ -192,10 +193,14 @@
             hash_hex = hash.hexdigest()
             if outfile is not None:
                 outfile.close()
-                if prev_hash != hash_hex:                
-                    os.rename(outpath_tmpname, outpath)
+                if prev_hash != hash_hex:
+                    k = Key(self.__feedcache_bucket)
+                    ts = int(time.time())                    
+                    k.key = targeturl + ('.%d' % (ts,))
+                    _logger.debug("storing to bucket %s key %s", self.__feedcache_bucket.name, k.key)      
+                    k.set_contents_from_filename(temppath)
                 else:
-                    os.unlink(outpath_tmpname)
+                    os.unlink(temppath)
             timestamp_str = response.getheader('Last-Modified', None)
             if timestamp_str is not None:
                 timestamp = mktime_tz(parsedate_tz(timestamp_str))
@@ -243,7 +248,13 @@
         self.__savefetches = config.get('firehose.savefetches') == "true"
         self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
         self.__active_collectors = set()
+        aws_accessid = config.get('firehose.awsAccessKeyId')
+        aws_secretkey = config.get('firehose.awsSecretAccessKey')       
+        self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
         
+        bname = config.get('firehose.awsS3Bucket')
+        self.__feedcache_bucket = self.__s3_conn.get_bucket('feedcache.' + bname)              
+        
     def run_async(self):
         thr = threading.Thread(target=self.run)
         thr.setDaemon(True)
@@ -283,11 +294,12 @@
         except KeyError, e:
             _logger.exception("Failed to find family for task %r", taskid)
             return
-        inst = fclass()
-        kwargs = {}
         if self.__savefetches:
-            outpath = os.path.join(os.getcwd(), 'data', 'feedcache')
-            kwargs['cachedir'] = outpath       
+            inst_kwargs = {'feedcache_bucket': self.__feedcache_bucket}
+        else:
+            inst_kwargs = {}
+        inst = fclass(**inst_kwargs)
+        kwargs = {}     
         try:
             (new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp, **kwargs)            
         except Exception, e:


