r7430 - dumbhippo/trunk/firehose/firehose/jobs
- From: commits mugshot org
- To: online-desktop-list gnome org
- Subject: r7430 - dumbhippo/trunk/firehose/firehose/jobs
- Date: Thu, 17 Apr 2008 13:12:25 -0500 (CDT)
Author: walters
Date: 2008-04-17 13:12:25 -0500 (Thu, 17 Apr 2008)
New Revision: 7430
Modified:
dumbhippo/trunk/firehose/firehose/jobs/master.py
Log:
Ensure the current task set is persisted to the task database before polling starts, as a sanity safeguard.
Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-04-16 23:08:19 UTC (rev 7429)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-04-17 18:12:25 UTC (rev 7430)
@@ -118,6 +118,15 @@
poll_sqs_thread = threading.Thread(target=self.__poll_sqs)
poll_sqs_thread.setDaemon(True)
+ path = self.__path = config.get('firehose.taskdbpath')
+ _logger.debug("connecting to %r", path)
+ conn = sqlite3.connect(path, isolation_level=None)
+ cursor = conn.cursor()
+ cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE,
+ prev_hash TEXT,
+ prev_time DATETIME)''')
+ cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
+
_logger.debug("retrieving current task set...")
bucket = self.__s3_conn.get_bucket(config.get('firehose.awsS3Bucket'))
key = bucket.get_key(config.get('firehose.awsS3Key'))
@@ -126,17 +135,9 @@
f = StringIO.StringIO(key.get_contents_as_string())
for line in f:
current_task_keys[line.strip()] = True
- _logger.debug("have %d tasks", len(current_task_keys))
+ _logger.debug("have %d tasks", len(current_task_keys))
+ self.__ensure_tasks_persisted(current_task_keys)
- path = self.__path = config.get('firehose.taskdbpath')
- _logger.debug("connecting to %r", path)
- conn = sqlite3.connect(path, isolation_level=None)
- cursor = conn.cursor()
- cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE,
- prev_hash TEXT,
- prev_time DATETIME)''')
- cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
-
curtime = time.time()
taskentries = []
dropped_task_keys = {}
@@ -201,21 +202,23 @@
# For now we don't support resetting the list; just append
self.add_tasks(taskkeys, immediate=False)
+ def __ensure_tasks_persisted(self, taskkeys):
+ conn = sqlite3.connect(self.__path, isolation_level=None)
+ try:
+ cursor = conn.cursor()
+ for taskkey in taskkeys:
+ cursor.execute('''INSERT OR IGNORE INTO Tasks VALUES (?, ?, ?)''',
+ (taskkey, None, None))
+ finally:
+ conn.close()
+
def add_tasks(self, taskkeys, immediate=True):
# Convert to a list, be sure to strip any trailing newlines
taskkeys = map(lambda x: x.strip(), taskkeys)
_logger.debug("adding %d task keys", len(taskkeys))
# Append them to the in-memory state
self.__add_task_keys(taskkeys)
- # Persist them
- try:
- conn = sqlite3.connect(self.__path, isolation_level=None)
- cursor = conn.cursor()
- for taskkey in taskkeys:
- cursor.execute('''INSERT OR IGNORE INTO Tasks VALUES (?, ?, ?)''',
- (taskkey, None, None))
- finally:
- conn.close()
+ self.__ensure_tasks_persisted(taskkeys)
self.__requeue_poll(immediate=immediate)
def __set_task_status(self, cursor, taskkey, hashcode, timestamp):
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]