r7480 - dumbhippo/trunk/firehose/firehose/jobs
- From: commits@mugshot.org
- To: online-desktop-list@gnome.org
- Subject: r7480 - dumbhippo/trunk/firehose/firehose/jobs
- Date: Mon, 12 May 2008 17:36:48 -0500 (CDT)
Author: walters
Date: 2008-05-12 17:36:45 -0500 (Mon, 12 May 2008)
New Revision: 7480
Modified:
dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Discovered that S3 connections are not threadsafe.
Handle exceptions from S3 in a non-fatal way.
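In practice the patch below drops the shared S3Connection held by the poller and opens a short-lived connection inside each store call, logging (rather than propagating) any S3 failure. A minimal sketch of that pattern, using the same boto S3Connection/Key calls as the patch; store_snapshot and its parameters are illustrative names, not part of the commit:

import logging
import time

from boto.s3.connection import S3Connection
from boto.s3.key import Key

_logger = logging.getLogger(__name__)

def store_snapshot(accessid, secretkey, bucket_name, targeturl, temppath):
    # Open a short-lived connection inside the calling thread instead of
    # sharing one S3Connection object across threads.
    try:
        conn = S3Connection(accessid, secretkey)
        bucket = conn.get_bucket(bucket_name)
        k = Key(bucket)
        k.key = targeturl + ('.%d' % (int(time.time()),))
        k.set_contents_from_filename(temppath)
    except Exception:
        # A failed cache write is logged and swallowed; the poll itself
        # should still succeed.
        _logger.exception("failed to store to S3: key %s", targeturl)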
Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-05-12 20:23:06 UTC (rev 7479)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-05-12 22:36:45 UTC (rev 7480)
@@ -115,6 +115,11 @@
     def is_identity(self):
         return self.__is_identity
+
+    def size(self):
+        if self.is_identity():
+            return 0
+        return len(self._processors)
 
     def process(self, data):
         for processor in self._processors:
@@ -127,7 +132,7 @@
 # This maps from a regular expression matching a URL to a list of processors
 feed_transformations = [
     (r'digg.com/users/.*/history/diggs.rss', [rss_eater]),
-    (r'picasaweb.google.com.*feed.*base.*album', [rss_eater, atom_eater]),
+    (r'picasaweb.google.com/data/feed', [rss_eater, atom_eater]),
     (r'google.com/reader/public', [XmlElementEater(['/feed/updated'])]),
     (r'blogs.gnome.org', [RegexpEater(['<!--.*page served in.*seconds.*-->'])]),
     # We try to consume all HTML
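The feed_transformations table above maps URL regular expressions to processor lists; a minimal sketch of how such a table can be consulted, assuming a first-match re.search lookup (get_transformations here is a hypothetical helper, not shown in this patch):

import re

def get_transformations(url, feed_transformations):
    # Return the processors for the first pattern that matches the URL;
    # an empty list means the feed passes through untransformed.
    for pattern, processors in feed_transformations:
        if re.search(pattern, url):
            return processors
    return []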
@@ -145,8 +150,22 @@
 class FeedTaskHandler(object):
     FAMILY = 'FEED'
 
-    def __init__(self, feedcache_bucket=None):
-        self.__feedcache_bucket = feedcache_bucket
+    def __init__(self, aws_accessid=None, aws_secretkey=None, feedcache_bucket_name=None):
+        self.__aws_accessid = aws_accessid
+        self.__aws_secretkey = aws_secretkey
+        self.__feedcache_bucket_name = feedcache_bucket_name
+
+    def _store_to_s3(self, targeturl, temppath):
+        try:
+            s3_conn = S3Connection(self.__aws_accessid, self.__aws_secretkey)
+            feedcache_bucket = s3_conn.get_bucket(self.__feedcache_bucket_name)
+            k = Key(feedcache_bucket)
+            ts = int(time.time())
+            k.key = targeturl + ('.%d' % (ts,))
+            _logger.debug("storing to bucket %s key %s", feedcache_bucket.name, k.key)
+            k.set_contents_from_filename(temppath)
+        except:
+            _logger.exception("failed to store to S3: key %s", targeturl)
 
     def run(self, id, prev_hash, prev_timestamp):
         targeturl = id
@@ -169,7 +188,7 @@
         if response.status == 304:
             _logger.info("Got 304 Unmodified for %r", targeturl)
             return (prev_hash, prev_timestamp)
-        if self.__feedcache_bucket is not None:
+        if self.__feedcache_bucket_name is not None:
             (tempfd, temppath) = tempfile.mkstemp()
             outfile = os.fdopen(tempfd, 'w')
         else:
@@ -194,20 +213,15 @@
         if outfile is not None:
             outfile.close()
             if prev_hash != hash_hex:
-                k = Key(self.__feedcache_bucket)
-                ts = int(time.time())
-                k.key = targeturl + ('.%d' % (ts,))
-                _logger.debug("storing to bucket %s key %s", self.__feedcache_bucket.name, k.key)
-                k.set_contents_from_filename(temppath)
-            else:
-                os.unlink(temppath)
+                self._store_to_s3(targeturl, temppath)
+            os.unlink(temppath)
         timestamp_str = response.getheader('Last-Modified', None)
         if timestamp_str is not None:
             timestamp = mktime_tz(parsedate_tz(timestamp_str))
         else:
             _logger.debug("no last-modified for %r", targeturl)
             timestamp = time.time()
-        filters_applied = (not processor.is_identity()) and "(filters applied)" or ""
+        filters_applied = (not processor.is_identity()) and ("(%d filters applied)" % (processor.size())) or ""
         if prev_hash != hash_hex:
             _logger.info("Got new hash:%r (prev:%r) ts:%r %s for url %r", hash_hex, prev_hash, timestamp, filters_applied, targeturl)
         return (hash_hex, timestamp)
@@ -248,13 +262,10 @@
         self.__savefetches = config.get('firehose.savefetches') == "true"
         self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
         self.__active_collectors = set()
-        aws_accessid = config.get('firehose.awsAccessKeyId')
-        aws_secretkey = config.get('firehose.awsSecretAccessKey')
-        self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
+        self.__aws_accessid = config.get('firehose.awsAccessKeyId')
+        self.__aws_secretkey = config.get('firehose.awsSecretAccessKey')
+        self.__feedcache_bucket_name = 'feedcache.' + config.get('firehose.awsS3Bucket')
-        bname = config.get('firehose.awsS3Bucket')
-        self.__feedcache_bucket = self.__s3_conn.get_bucket('feedcache.' + bname)
-
     def run_async(self):
         thr = threading.Thread(target=self.run)
         thr.setDaemon(True)
@@ -295,7 +306,9 @@
             _logger.exception("Failed to find family for task %r", taskid)
             return
         if self.__savefetches:
-            inst_kwargs = {'feedcache_bucket': self.__feedcache_bucket}
+            inst_kwargs = {'aws_accessid': self.__aws_accessid,
+                           'aws_secretkey': self.__aws_secretkey,
+                           'feedcache_bucket_name': self.__feedcache_bucket_name}
         else:
             inst_kwargs = {}
         inst = fclass(**inst_kwargs)
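Net effect: the handler is now constructed from plain strings, so each task thread builds (and discards) its own S3 connection only when a changed feed actually needs caching. A hedged usage sketch, with placeholder credentials and feed URL:

# Instantiate per task; no S3 connection is created until a changed
# feed needs to be stored.
handler = FeedTaskHandler(aws_accessid='AKIAEXAMPLE',        # placeholder
                          aws_secretkey='secret-example',    # placeholder
                          feedcache_bucket_name='feedcache.example')
(new_hash, new_ts) = handler.run('http://example.com/feed.rss',
                                 prev_hash=None, prev_timestamp=None)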