r7462 - in dumbhippo/trunk: firehose/firehose/jobs super super/firehose/files/conf
- From: commits mugshot org
- To: online-desktop-list gnome org
- Subject: r7462 - in dumbhippo/trunk: firehose/firehose/jobs super super/firehose/files/conf
- Date: Tue, 29 Apr 2008 16:49:24 -0500 (CDT)
Author: walters
Date: 2008-04-29 16:49:24 -0500 (Tue, 29 Apr 2008)
New Revision: 7462
Modified:
dumbhippo/trunk/firehose/firehose/jobs/master.py
dumbhippo/trunk/firehose/firehose/jobs/poller.py
dumbhippo/trunk/super/base.conf
dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg
Log:
Support for saving fetched data, so we can compute diffs over it later
Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-04-29 18:24:58 UTC (rev 7461)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-04-29 21:49:24 UTC (rev 7462)
@@ -127,17 +127,24 @@
prev_time DATETIME)''')
cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
- _logger.debug("retrieving current task set...")
- bucket = self.__s3_conn.get_bucket(config.get('firehose.awsS3Bucket'))
- key = bucket.get_key(config.get('firehose.awsS3Key'))
+ bname = config.get('firehose.awsS3Bucket')
+ kname = config.get('firehose.awsS3Key')
+ _logger.debug("retrieving current task set from bucket: %r key: %r", bname, kname)
+ bucket = self.__s3_conn.get_bucket(bname)
+
# FIXME should stream this
- current_task_keys = {}
- f = StringIO.StringIO(key.get_contents_as_string())
- for line in f:
- current_task_keys[line.strip()] = True
- _logger.debug("have %d tasks", len(current_task_keys))
- self.__ensure_tasks_persisted(current_task_keys)
-
+ current_task_keys = {}
+ key = bucket.get_key(kname)
+ if key is not None:
+ contents = key.get_contents_as_string()
+ if contents is not None:
+ f = StringIO.StringIO(contents)
+ for line in f:
+ current_task_keys[line.strip()] = True
+ _logger.debug("have %d tasks", len(current_task_keys))
+ self.__ensure_tasks_persisted(current_task_keys)
+ else:
+ _logger.warn("no currently saved task set!")
curtime = time.time()
taskentries = []
dropped_task_keys = {}
Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-04-29 18:24:58 UTC (rev 7461)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-04-29 21:49:24 UTC (rev 7462)
@@ -4,6 +4,7 @@
import BaseHTTPServer,httplib,urlparse,urllib
from email.Utils import formatdate,parsedate_tz,mktime_tz
import logging
+from StringIO import StringIO
import boto
@@ -41,7 +42,7 @@
class FeedTaskHandler(object):
FAMILY = 'FEED'
- def run(self, id, prev_hash, prev_timestamp):
+ def run(self, id, prev_hash, prev_timestamp, outpath=None):
targeturl = id
parsedurl = urlparse.urlparse(targeturl)
try:
@@ -61,12 +62,23 @@
if response.status == 304:
_logger.info("Got 304 Unmodified for %r", targeturl)
return (prev_hash, prev_timestamp)
- hash = sha.new()
+ if outpath is not None:
+ outpath_tmpname = outpath+'.tmp'
+ outfile = open(outpath_tmpname, 'w')
+ else:
+ outpath_tmpname = None
+ outfile = None
+ hash = sha.new()
buf = response.read(8192)
while buf:
hash.update(buf)
+ if outfile is not None:
+ outfile.write(buf)
buf = response.read(8192)
hash_hex = hash.hexdigest()
+ if outfile is not None:
+ outfile.close()
+ os.rename(outpath_tmpname, outpath)
timestamp_str = response.getheader('Last-Modified', None)
if timestamp_str is not None:
timestamp = mktime_tz(parsedate_tz(timestamp_str))
@@ -110,6 +122,7 @@
def __init__(self):
bindport = int(config.get('firehose.localslaveport'))
+ self.__savefetches = config.get('firehose.savefetches') == "true"
self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
self.__active_collectors = set()
@@ -153,8 +166,14 @@
_logger.exception("Failed to find family for task %r", taskid)
return
inst = fclass()
+ kwargs = {}
+ if self.__savefetches:
+ quotedname = urllib.quote_plus(taskid)
+ ts = int(time.time())
+ outpath = os.path.join(os.getcwd(), 'data', quotedname + '.' + unicode(ts))
+ kwargs['outpath'] = outpath
try:
- (new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp)
+ (new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp, **kwargs)
except Exception, e:
_logger.error("Failed task id %r: %s", tid, e)
(new_hash, new_timestamp) = (None, None)
Modified: dumbhippo/trunk/super/base.conf
===================================================================
--- dumbhippo/trunk/super/base.conf 2008-04-29 18:24:58 UTC (rev 7461)
+++ dumbhippo/trunk/super/base.conf 2008-04-29 21:49:24 UTC (rev 7462)
@@ -508,7 +508,8 @@
<targetAttributes pattern="/run/*" ignore="yes"/>
<targetAttributes pattern="/data/*" ignore="yes" preserve="yes"/>
- <parameter name="firehoseLocalSlavePort">$((baseport+81))</parameter>
+ <parameter name="firehoseLocalSlavePort">$((baseport+81))</parameter>
+ <parameter name="firehoseSaveFetches">false</parameter>
<parameter name="startCommand">$targetdir/scripts/firehose-start.sh</parameter>
<parameter name="stopCommand">$targetdir/scripts/firehose-stop.sh</parameter>
Modified: dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg
===================================================================
--- dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg 2008-04-29 18:24:58 UTC (rev 7461)
+++ dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg 2008-04-29 21:49:24 UTC (rev 7462)
@@ -23,6 +23,8 @@
firehose.awsSqsIncomingName="@@firehoseAwsSqsIncomingName@@"
firehose.awsSqsOutgoingName="@@firehoseAwsSqsOutgoingName@@"
+firehose.savefetches="@@firehoseSaveFetches@@"
+
# if you are using a database or table type without transactions
# (MySQL default, for example), you should turn off transactions
# by prepending notrans_ on the uri
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]