r7430 - dumbhippo/trunk/firehose/firehose/jobs



Author: walters
Date: 2008-04-17 13:12:25 -0500 (Thu, 17 Apr 2008)
New Revision: 7430

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/master.py
Log:
Be sure we persist new tasks before polling, for sanity.



Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-04-16 23:08:19 UTC (rev 7429)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-04-17 18:12:25 UTC (rev 7430)
@@ -118,6 +118,15 @@
         poll_sqs_thread = threading.Thread(target=self.__poll_sqs)
         poll_sqs_thread.setDaemon(True)     
         
+        path = self.__path = config.get('firehose.taskdbpath')
+        _logger.debug("connecting to %r", path)
+        conn = sqlite3.connect(path, isolation_level=None)
+        cursor = conn.cursor()
+        cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE,
+                                                            prev_hash TEXT,
+                                                            prev_time DATETIME)''')
+        cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
+        
         _logger.debug("retrieving current task set...")
         bucket = self.__s3_conn.get_bucket(config.get('firehose.awsS3Bucket'))
         key = bucket.get_key(config.get('firehose.awsS3Key'))
@@ -126,17 +135,9 @@
         f = StringIO.StringIO(key.get_contents_as_string())
         for line in f:
             current_task_keys[line.strip()] = True
-        _logger.debug("have %d tasks", len(current_task_keys))
+        _logger.debug("have %d tasks", len(current_task_keys))        
+        self.__ensure_tasks_persisted(current_task_keys)        
         
-        path = self.__path = config.get('firehose.taskdbpath')
-        _logger.debug("connecting to %r", path)
-        conn = sqlite3.connect(path, isolation_level=None)
-        cursor = conn.cursor()
-        cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE,
-                                                            prev_hash TEXT,
-                                                            prev_time DATETIME)''')
-        cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
-        
         curtime = time.time()
         taskentries = []
         dropped_task_keys = {}
@@ -201,21 +202,23 @@
         # For now we don't support resetting the list; just append
         self.add_tasks(taskkeys, immediate=False)
     
+    def __ensure_tasks_persisted(self, taskkeys):
+        conn = sqlite3.connect(self.__path, isolation_level=None)          
+        try:      
+            cursor = conn.cursor()
+            for taskkey in taskkeys:
+                cursor.execute('''INSERT OR IGNORE INTO Tasks VALUES (?, ?, ?)''',
+                       (taskkey, None, None))
+        finally:
+            conn.close()        
+    
     def add_tasks(self, taskkeys, immediate=True):
         # Convert to a list, be sure to strip any trailing newlines
         taskkeys = map(lambda x: x.strip(), taskkeys)
         _logger.debug("adding %d task keys", len(taskkeys))
         # Append them to the in-memory state
         self.__add_task_keys(taskkeys)
-        # Persist them
-        try:
-            conn = sqlite3.connect(self.__path, isolation_level=None)        
-            cursor = conn.cursor()
-            for taskkey in taskkeys:
-                cursor.execute('''INSERT OR IGNORE INTO Tasks VALUES (?, ?, ?)''',
-                       (taskkey, None, None))
-        finally:
-            conn.close()
+        self.__ensure_tasks_persisted(taskkeys)
         self.__requeue_poll(immediate=immediate)
         
     def __set_task_status(self, cursor, taskkey, hashcode, timestamp):



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]