r7410 - in dumbhippo/trunk: firehose/firehose/jobs server/src/com/dumbhippo/polling super/firehose/files/conf



Author: walters
Date: 2008-04-03 15:57:36 -0500 (Thu, 03 Apr 2008)
New Revision: 7410

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/master.py
   dumbhippo/trunk/server/src/com/dumbhippo/polling/SwarmPollingSystem.java
   dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg
Log:
Fix synchronization of the Mugshot task set with Firehose.

Ensure that we only send new tasks over SQS, not all of them.

Change the Firehose master to use the S3 store as its canonical task list,
and synchronize the SQLite hash cache with it.



Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-04-03 18:24:48 UTC (rev 7409)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-04-03 20:57:36 UTC (rev 7410)
@@ -104,7 +104,30 @@
                 _logger.debug("found localhost worker, starting it")
                 self.__local_handler = TaskPoller.get()
                 self.__local_handler.run_async()
+                
+        aws_accessid = config.get('firehose.awsAccessKeyId')
+        if aws_accessid is None:
+            raise ValueError("Must have AWS access key set")
+        aws_secretkey = config.get('firehose.awsSecretAccessKey')            
+        sqs_base_conn = SQSConnection(aws_accessid, aws_secretkey)
+        self.__sqs_conn = sqs_base_conn.get_query_connection(api_version=SQSConnection.Version20080101)         
+        self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
         
+        self.__sqs_incoming_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsIncomingName'))
+        self.__sqs_outgoing_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsOutgoingName'))                
+        poll_sqs_thread = threading.Thread(target=self.__poll_sqs)
+        poll_sqs_thread.setDaemon(True)     
+        
+        _logger.debug("retrieving current task set...")
+        bucket = self.__s3_conn.get_bucket(config.get('firehose.awsS3Bucket'))
+        key = bucket.get_key(config.get('firehose.awsS3Key'))
+        # FIXME should stream this
+        current_task_keys = {}
+        f = StringIO.StringIO(key.get_contents_as_string())
+        for line in f:
+            current_task_keys[line.strip()] = True
+        _logger.debug("have %d tasks", len(current_task_keys))
+        
         path = self.__path = config.get('firehose.taskdbpath')
         _logger.debug("connecting to %r", path)
         conn = sqlite3.connect(path, isolation_level=None)
@@ -116,9 +139,20 @@
         
         curtime = time.time()
         taskentries = []
+        dropped_task_keys = {}
         for key,prev_hash,prev_time in cursor.execute('''SELECT key,prev_hash,prev_time from Tasks'''):
+            if key not in current_task_keys:
+                dropped_task_keys[key] = True
+                continue        
+            del current_task_keys[key]            
             task = TaskEntry(key, prev_hash, prev_time)
             taskentries.append(task)
+        for key in current_task_keys:
+            task = TaskEntry(key, None, None)
+            taskentries.append(task)    
+        _logger.info("%d keys were new, %d task keys to drop", len(current_task_keys), len(dropped_task_keys))
+        for key in dropped_task_keys:
+            cursor.execute("DELETE FROM Tasks where key = ?", (key,))
         # We explicitly randomize to avoid database grouping
         random.shuffle(taskentries)
         for task in taskentries:
@@ -128,20 +162,7 @@
         conn.close()
         _logger.debug("%d queued tasks", len(self.__tasks_queue))
         
-        aws_accessid = config.get('firehose.awsAccessKeyId')
-        if aws_accessid is not None:
-            aws_secretkey = config.get('firehose.awsSecretAccessKey')            
-            sqs_base_conn = SQSConnection(aws_accessid, aws_secretkey)
-            self.__sqs_conn = sqs_base_conn.get_query_connection(api_version=SQSConnection.Version20080101)         
-            self.__s3_conn = S3Connection(aws_accessid, aws_secretkey)
-            
-            self.__sqs_incoming_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsIncomingName'))
-            self.__sqs_outgoing_q = self.__sqs_conn.create_queue(config.get('firehose.awsSqsOutgoingName'))                
-            t = threading.Thread(target=self.__poll_sqs)
-            t.setDaemon(True)
-            t.start()
-        else:
-            self.__sqs_conn = None        
+        poll_sqs_thread.start()
     
     def __add_task_keys_unlocked(self, keys):
         for key in keys:

Modified: dumbhippo/trunk/server/src/com/dumbhippo/polling/SwarmPollingSystem.java
===================================================================
--- dumbhippo/trunk/server/src/com/dumbhippo/polling/SwarmPollingSystem.java	2008-04-03 18:24:48 UTC (rev 7409)
+++ dumbhippo/trunk/server/src/com/dumbhippo/polling/SwarmPollingSystem.java	2008-04-03 20:57:36 UTC (rev 7410)
@@ -290,7 +290,7 @@
 				}
 				
 				if (!newExternalTasks.isEmpty())
-					notifyExternalTasks(isFirst);
+					notifyExternalTasks(newExternalTasks, isFirst);
 				
 				lastSeenTaskDatabaseId = loadResult.getLastDbId();
 				long totalLoaded = loadResult.getTasks().size();
@@ -440,11 +440,15 @@
 		}	
 	}
 	
-	private void notifyExternalTasks(final boolean overwrite) {		
+	private void notifyExternalTasks(final boolean overwrite) {
+		notifyExternalTasks(externalTasks.values(), overwrite);	
+	}
+	
+	private void notifyExternalTasks(final Collection<PollingTask> tasks, final boolean overwrite) {		
 		Thread t = new Thread(new Runnable() {
 			public void run() {
 				try {
-					storeExternalTaskSetsS3(externalTasks.values(), overwrite);
+					storeExternalTaskSetsS3(tasks, overwrite);
 				} catch (TransientServiceException e) {	
 					throw new RuntimeException(e);
 				}

Modified: dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg
===================================================================
--- dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg	2008-04-03 18:24:48 UTC (rev 7409)
+++ dumbhippo/trunk/super/firehose/files/conf/mugshot.cfg	2008-04-03 20:57:36 UTC (rev 7410)
@@ -18,6 +18,8 @@
 firehose.externalServiceKey="@@externalServiceKey@@"
 firehose.awsAccessKeyId="@@firehoseAwsAccessKeyId@@"
 firehose.awsSecretAccessKey="@@firehoseAwsSecretKey@@"
+firehose.awsS3Bucket="@@firehoseAwsS3Bucket@@"
+firehose.awsS3Key="@@firehoseAwsS3Key@@"
 firehose.awsSqsIncomingName="@@firehoseAwsSqsIncomingName@@"
 firehose.awsSqsOutgoingName="@@firehoseAwsSqsOutgoingName@@"
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]