r7410 - in dumbhippo/trunk: firehose/firehose/jobs server/src/com/dumbhippo/polling super/firehose/files/conf
- From: commits mugshot org
- To: online-desktop-list gnome org
- Subject: r7410 - in dumbhippo/trunk: firehose/firehose/jobs server/src/com/dumbhippo/polling super/firehose/files/conf
- Date: Thu, 3 Apr 2008 15:57:36 -0500 (CDT)
Author: walters
Date: 2008-04-03 15:57:36 -0500 (Thu, 03 Apr 2008)
New Revision: 7410
Modified:
dumbhippo/trunk/firehose/firehose/jobs/master.py
dumbhippo/trunk/server/src/com/dumbhippo/polling/SwarmPollingSystem.java
dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg
Log:
Fix synchronization of mugshot taskset with Firehose.
Ensure that we only send new tasks over SQS, not all of them.
Change Firehose master to use the S3 store as its canonical list,
and synchronize the sqlite hash cache with it.
Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-04-03 18:24:48 UTC (rev 7409)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-04-03 20:57:36 UTC (rev 7410)
@@ -104,7 +104,30 @@
_logger.debug("found localhost worker, starting it")
self.__local_handler = TaskPoller.get()
self.__local_handler.run_async()
+
+ aws_accessid = config.get('firehose.awsAccessKeyId')
+ if aws_accessid is None:
+ raise ValueError("Must have AWS access key set")
+ aws_secretkey = config.get('firehose.awsSecretAccessKey')
+ sqs_base_conn = SQSConnection(aws_accessid, aws_secretkey)
+ self.__sqs_conn = sqs_base_conn.get_query_connection(api_version=SQSConnection.Version20080101)
+ self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
+ self.__sqs_incoming_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsIncomingName'))
+ self.__sqs_outgoing_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsOutgoingName'))
+ poll_sqs_thread = threading.Thread(target=self.__poll_sqs)
+ poll_sqs_thread.setDaemon(True)
+
+ _logger.debug("retrieving current task set...")
+ bucket = self.__s3_conn.get_bucket(config.get('firehose.awsS3Bucket'))
+ key = bucket.get_key(config.get('firehose.awsS3Key'))
+ # FIXME should stream this
+ current_task_keys = {}
+ f = StringIO.StringIO(key.get_contents_as_string())
+ for line in f:
+ current_task_keys[line.strip()] = True
+ _logger.debug("have %d tasks", len(current_task_keys))
+
path = self.__path = config.get('firehose.taskdbpath')
_logger.debug("connecting to %r", path)
conn = sqlite3.connect(path, isolation_level=None)
@@ -116,9 +139,20 @@
curtime = time.time()
taskentries = []
+ dropped_task_keys = {}
for key,prev_hash,prev_time in cursor.execute('''SELECT key,prev_hash,prev_time from Tasks'''):
+ if key not in current_task_keys:
+ dropped_task_keys[key] = True
+ continue
+ del current_task_keys[key]
task = TaskEntry(key, prev_hash, prev_time)
taskentries.append(task)
+ for key in current_task_keys:
+ task = TaskEntry(key, None, None)
+ taskentries.append(task)
+ _logger.info("%d keys were new, %d task keys to drop", len(current_task_keys), len(dropped_task_keys))
+ for key in dropped_task_keys:
+ cursor.execute("DELETE FROM Tasks where key = ?", (key,))
# We explicitly randomize to avoid database grouping
random.shuffle(taskentries)
for task in taskentries:
@@ -128,20 +162,7 @@
conn.close()
_logger.debug("%d queued tasks", len(self.__tasks_queue))
- aws_accessid = config.get('firehose.awsAccessKeyId')
- if aws_accessid is not None:
- aws_secretkey = config.get('firehose.awsSecretAccessKey')
- sqs_base_conn = SQSConnection(aws_accessid, aws_secretkey)
- self.__sqs_conn = sqs_base_conn.get_query_connection(api_version=SQSConnection.Version20080101)
- self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
-
- self.__sqs_incoming_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsIncomingName'))
- self.__sqs_outgoing_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsOutgoingName'))
- t = threading.Thread(target=self.__poll_sqs)
- t.setDaemon(True)
- t.start()
- else:
- self.__sqs_conn = None
+ poll_sqs_thread.start()
def __add_task_keys_unlocked(self, keys):
for key in keys:
Modified: dumbhippo/trunk/server/src/com/dumbhippo/polling/SwarmPollingSystem.java
===================================================================
--- dumbhippo/trunk/server/src/com/dumbhippo/polling/SwarmPollingSystem.java 2008-04-03 18:24:48 UTC (rev 7409)
+++ dumbhippo/trunk/server/src/com/dumbhippo/polling/SwarmPollingSystem.java 2008-04-03 20:57:36 UTC (rev 7410)
@@ -290,7 +290,7 @@
}
if (!newExternalTasks.isEmpty())
- notifyExternalTasks(isFirst);
+ notifyExternalTasks(newExternalTasks, isFirst);
lastSeenTaskDatabaseId = loadResult.getLastDbId();
long totalLoaded = loadResult.getTasks().size();
@@ -440,11 +440,15 @@
}
}
- private void notifyExternalTasks(final boolean overwrite) {
+ private void notifyExternalTasks(final boolean overwrite) {
+ notifyExternalTasks(externalTasks.values(), overwrite);
+ }
+
+ private void notifyExternalTasks(final Collection<PollingTask> tasks, final boolean overwrite) {
Thread t = new Thread(new Runnable() {
public void run() {
try {
- storeExternalTaskSetsS3(externalTasks.values(), overwrite);
+ storeExternalTaskSetsS3(tasks, overwrite);
} catch (TransientServiceException e) {
throw new RuntimeException(e);
}
Modified: dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg
===================================================================
--- dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg 2008-04-03 18:24:48 UTC (rev 7409)
+++ dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg 2008-04-03 20:57:36 UTC (rev 7410)
@@ -18,6 +18,8 @@
firehose.externalServiceKey="@@externalServiceKey@@"
firehose.awsAccessKeyId="@@firehoseAwsAccessKeyId@@"
firehose.awsSecretAccessKey="@@firehoseAwsSecretKey@@"
+firehose.awsS3Bucket="@@firehoseAwsS3Bucket@@"
+firehose.awsS3Key="@@firehoseAwsS3Key@@"
firehose.awsSqsIncomingName="@@firehoseAwsSqsIncomingName@@"
firehose.awsSqsOutgoingName="@@firehoseAwsSqsOutgoingName@@"
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]