r7478 - dumbhippo/trunk/firehose/firehose/jobs
From: commits@mugshot.org
To: online-desktop-list@gnome.org
Subject: r7478 - dumbhippo/trunk/firehose/firehose/jobs
Date: Mon, 12 May 2008 14:31:27 -0500 (CDT)
Author: walters
Date: 2008-05-12 14:31:27 -0500 (Mon, 12 May 2008)
New Revision: 7478
Modified:
dumbhippo/trunk/firehose/firehose/jobs/master.py
dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Switch to saving feed caches in S3.
This way we avoid running out of disk space.
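
For context, the boto S3 calls this change relies on follow the pattern
below. This is a minimal sketch, not code from the commit; the credentials,
bucket name, object key, and file path are all placeholders:

    from boto.s3.connection import S3Connection
    from boto.s3.key import Key

    # Connect with AWS credentials (the real ones come from the
    # firehose.awsAccessKeyId / firehose.awsSecretAccessKey config values).
    conn = S3Connection('ACCESS_KEY_ID', 'SECRET_ACCESS_KEY')
    # Look up an existing bucket by name; raises S3ResponseError if missing.
    bucket = conn.get_bucket('example-feedcache-bucket')
    # A Key names a single object within the bucket.
    k = Key(bucket)
    k.key = 'http://example.com/feed.xml.1210620687'
    # Upload a local file's contents under that key.
    k.set_contents_from_filename('/tmp/feed-snapshot.xml')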
Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-05-12 17:55:08 UTC (rev 7477)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-05-12 19:31:27 UTC (rev 7478)
@@ -94,16 +94,6 @@
import socket
self.__hostport = '%s:%s' % (socket.gethostname(), config.get('server.socket_port'))
-
- # Default to one slave on localhost
- self.__worker_endpoints = ['localhost:%d' % (int(config.get('firehose.localslaveport')),)]
- _logger.debug("worker endpoints are %r", self.__worker_endpoints)
- for bind in self.__worker_endpoints:
- (host,port) = bind.split(':')
- if host == 'localhost':
- _logger.debug("found localhost worker, starting it")
- self.__local_handler = TaskPoller.get()
- self.__local_handler.run_async()
aws_accessid = config.get('firehose.awsAccessKeyId')
if aws_accessid is None:
@@ -114,7 +104,20 @@
self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
self.__sqs_incoming_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsIncomingName'))
- self.__sqs_outgoing_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsOutgoingName'))
+ self.__sqs_outgoing_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsOutgoingName'))
+
+ mainbucket_name = config.get('firehose.awsS3Bucket')
+
+ # Default to one slave on localhost
+ self.__worker_endpoints = ['localhost:%d' % (int(config.get('firehose.localslaveport')),)]
+ _logger.debug("worker endpoints are %r", self.__worker_endpoints)
+ for bind in self.__worker_endpoints:
+ (host,port) = bind.split(':')
+ if host == 'localhost':
+ _logger.debug("found localhost worker, starting it")
+ self.__local_handler = TaskPoller.get()
+ self.__local_handler.run_async()
+
poll_sqs_thread = threading.Thread(target=self.__poll_sqs)
poll_sqs_thread.setDaemon(True)
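
(For context: the SQS queues touched above also come from boto. A minimal
sketch with placeholder credentials and queue name; create_queue returns the
existing queue when one with that name already exists:)

    from boto.sqs.connection import SQSConnection

    conn = SQSConnection('ACCESS_KEY_ID', 'SECRET_ACCESS_KEY')
    q = conn.create_queue('example-incoming-queue')
    # read() polls for one message and returns None if the queue is empty.
    msg = q.read()
    if msg is not None:
        q.delete_message(msg)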
@@ -126,15 +129,14 @@
prev_hash TEXT,
prev_time DATETIME)''')
cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
+
+ tasklist_key = config.get('firehose.awsS3Key')
+ _logger.debug("retrieving current task set from bucket: %r key: %r", mainbucket_name, tasklist_key)
+ bucket = self.__s3_conn.get_bucket(mainbucket_name)
- bname = config.get('firehose.awsS3Bucket')
- kname = config.get('firehose.awsS3Key')
- _logger.debug("retrieving current task set from bucket: %r key: %r", bname, kname)
- bucket = self.__s3_conn.get_bucket(bname)
-
# FIXME should stream this
current_task_keys = {}
- key = bucket.get_key(kname)
+ key = bucket.get_key(tasklist_key)
if key is not None:
contents = key.get_contents_as_string()
if contents is not None:
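
(The task-list read path above uses the corresponding boto download calls.
A minimal sketch, again with placeholder bucket and key names:)

    from boto.s3.connection import S3Connection

    conn = S3Connection('ACCESS_KEY_ID', 'SECRET_ACCESS_KEY')
    bucket = conn.get_bucket('example-tasks-bucket')
    # get_key returns None when no such object exists, which the master
    # treats as an empty current task set.
    key = bucket.get_key('example-tasklist-key')
    if key is not None:
        # Pulls the entire object into memory at once; the FIXME above
        # notes this should eventually be streamed.
        contents = key.get_contents_as_string()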
Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-05-12 17:55:08 UTC (rev 7477)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-05-12 19:31:27 UTC (rev 7478)
@@ -2,6 +2,7 @@
import os,sys,re,heapq,time,Queue,sha,threading
import BaseHTTPServer,httplib,urlparse,urllib
+import tempfile
from email.Utils import formatdate,parsedate_tz,mktime_tz
from BeautifulSoup import BeautifulSoup,Comment
import logging
@@ -143,8 +144,11 @@
class FeedTaskHandler(object):
FAMILY = 'FEED'
+
+ def __init__(self, feedcache_bucket=None):
+ self.__feedcache_bucket = feedcache_bucket
- def run(self, id, prev_hash, prev_timestamp, cachedir=None):
+ def run(self, id, prev_hash, prev_timestamp):
targeturl = id
transformlist = get_transformations(targeturl)
parsedurl = urlparse.urlparse(targeturl)
@@ -165,14 +169,11 @@
if response.status == 304:
_logger.info("Got 304 Unmodified for %r", targeturl)
return (prev_hash, prev_timestamp)
- if cachedir is not None:
- quotedname = urllib.quote_plus(targeturl)
- ts = int(time.time())
- outpath = os.path.join(cachedir, quotedname + '.' + unicode(ts))
- outpath_tmpname = outpath + '.tmp'
- outfile = open(outpath_tmpname, 'w')
+ if self.__feedcache_bucket is not None:
+ (tempfd, temppath) = tempfile.mkstemp()
+ outfile = os.fdopen(tempfd, 'w')
else:
- outpath_tmpname = None
+ (tempfd, temppath) = (None, None)
outfile = None
rawhash = sha.new()
data = StringIO()
@@ -192,10 +193,14 @@
hash_hex = hash.hexdigest()
if outfile is not None:
outfile.close()
- if prev_hash != hash_hex:
- os.rename(outpath_tmpname, outpath)
+ if prev_hash != hash_hex:
+ k = Key(self.__feedcache_bucket)
+ ts = int(time.time())
+ k.key = targeturl + ('.%d' % (ts,))
+ _logger.debug("storing to bucket %s key %s", self.__feedcache_bucket.name, k.key)
+ k.set_contents_from_filename(temppath)
else:
- os.unlink(outpath_tmpname)
+ os.unlink(temppath)
timestamp_str = response.getheader('Last-Modified', None)
if timestamp_str is not None:
timestamp = mktime_tz(parsedate_tz(timestamp_str))
@@ -243,7 +248,13 @@
self.__savefetches = config.get('firehose.savefetches') == "true"
self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
self.__active_collectors = set()
+ aws_accessid = config.get('firehose.awsAccessKeyId')
+ aws_secretkey = config.get('firehose.awsSecretAccessKey')
+ self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
+ bname = config.get('firehose.awsS3Bucket')
+ self.__feedcache_bucket = self.__s3_conn.get_bucket('feedcache.' + bname)
+
def run_async(self):
thr = threading.Thread(target=self.run)
thr.setDaemon(True)
@@ -283,11 +294,12 @@
except KeyError, e:
_logger.exception("Failed to find family for task %r", taskid)
return
- inst = fclass()
- kwargs = {}
if self.__savefetches:
- outpath = os.path.join(os.getcwd(), 'data', 'feedcache')
- kwargs['cachedir'] = outpath
+ inst_kwargs = {'feedcache_bucket': self.__feedcache_bucket}
+ else:
+ inst_kwargs = {}
+ inst = fclass(**inst_kwargs)
+ kwargs = {}
try:
(new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp, **kwargs)
except Exception, e:
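
(Putting the poller pieces together: the fetched feed body is spooled to a
temporary file, then either uploaded to the feedcache bucket or discarded.
A condensed sketch of that flow; the function name and arguments are
hypothetical, and unlike the hunk above this sketch also removes the
temporary file after a successful upload:)

    import os, time, tempfile
    from boto.s3.key import Key

    def cache_feed(feedcache_bucket, targeturl, body, changed):
        # Spool the response body to a secure temporary file.
        (tempfd, temppath) = tempfile.mkstemp()
        outfile = os.fdopen(tempfd, 'w')
        outfile.write(body)
        outfile.close()
        if changed:
            # Name the object after the feed URL plus a fetch timestamp,
            # matching the key scheme used in run() above.
            k = Key(feedcache_bucket)
            k.key = targeturl + ('.%d' % (int(time.time()),))
            k.set_contents_from_filename(temppath)
        # The local copy is no longer needed either way.
        os.unlink(temppath)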