r7297 - dumbhippo/trunk/firehose/firehose/jobs
- From: commits mugshot org
- To: online-desktop-list gnome org
- Subject: r7297 - dumbhippo/trunk/firehose/firehose/jobs
- Date: Thu, 7 Feb 2008 14:47:16 -0600 (CST)
Author: walters
Date: 2008-02-07 14:47:16 -0600 (Thu, 07 Feb 2008)
New Revision: 7297
Modified:
dumbhippo/trunk/firehose/firehose/jobs/master.py
dumbhippo/trunk/firehose/firehose/jobs/poller.py
dumbhippo/trunk/firehose/firehose/jobs/tasks.py
Log:
Actually update in-memory objects.
Logging tweaks.
Set things up so that add_tasks triggers an immediate poll.
Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-02-07 19:00:11 UTC (rev 7296)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-02-07 20:47:16 UTC (rev 7297)
@@ -95,8 +95,9 @@
curtime = time.time()
for key,prev_hash,prev_time in cursor.execute('''SELECT key,prev_hash,prev_time from Tasks'''):
- task = QueuedTask(curtime, TaskEntry(key, prev_hash, prev_time))
- heapq.heappush(self.__tasks_queue, task)
+ task = TaskEntry(key, prev_hash, prev_time)
+ qtask = QueuedTask(curtime, task)
+ heapq.heappush(self.__tasks_queue, qtask)
self.__tasks_map[key] = task
conn.close()
_logger.debug("%d queued tasks", len(self.__tasks_queue))
@@ -107,7 +108,7 @@
continue
task = TaskEntry(key, None, None)
qtask = QueuedTask(time.time(), task)
- self.__tasks_queue.append(qtask)
+ heapq.heappush(self.__tasks_queue, qtask)
self.__tasks_map[key] = task
def __add_task_keys(self, keys):
@@ -133,9 +134,9 @@
def set_tasks(self, taskkeys):
# For now we don't support resetting the list; just append
- self.add_tasks(taskkeys)
+ self.add_tasks(taskkeys, immediate=False)
- def add_tasks(self, taskkeys):
+ def add_tasks(self, taskkeys, immediate=True):
_logger.debug("adding %d task keys", len(taskkeys))
# Append them to the in-memory state
self.__add_task_keys(taskkeys)
@@ -148,7 +149,7 @@
(taskkey, None, None))
finally:
conn.close()
- self.__requeue_poll()
+ self.__requeue_poll(immediate=immediate)
def __set_task_status(self, cursor, taskkey, hashcode, timestamp):
_logger.debug("updating task %r values (%r %r)", taskkey, hashcode, timestamp)
@@ -189,6 +190,7 @@
finally:
self.__task_lock.release()
+ @log_except(_logger)
def taskset_status(self, results):
_logger.info("got %d results", len(results))
changed = []
@@ -200,10 +202,12 @@
except KeyError, e:
_logger.exception("failed to find task key %r", taskkey)
continue
- if curtask.task.prev_hash != hashcode:
+ if curtask.prev_hash != hashcode:
_logger.debug("task %r: new hash %r differs from prev %r",
- taskkey, hashcode, curtask.task.prev_hash)
+ taskkey, hashcode, curtask.prev_hash)
changed.append(taskkey)
+ curtask.prev_hash = hashcode
+ curtask.prev_timestamp = timestamp
finally:
self.__task_lock.release()
self.__append_changed(changed)
@@ -252,7 +256,7 @@
i = 0
while True:
try:
- task = heapq.heappop(self.__tasks_queue)
+ qtask = heapq.heappop(self.__tasks_queue)
except IndexError, e:
break
if i >= MAX_TASKSET_SIZE:
@@ -263,11 +267,11 @@
i = 0
else:
i += 1
- eligible = task.eligibility < taskset_limit
- task.eligibility = curtime + DEFAULT_POLL_TIME_SECS
- heapq.heappush(self.__tasks_queue, task)
+ eligible = qtask.eligibility < taskset_limit
+ qtask.eligibility = curtime + DEFAULT_POLL_TIME_SECS
+ heapq.heappush(self.__tasks_queue, qtask)
if eligible:
- taskset.append((str(task.task), task.task.prev_hash, task.task.prev_timestamp))
+ taskset.append((str(qtask.task), qtask.task.prev_hash, qtask.task.prev_timestamp))
else:
break
if len(taskset) > 0:
Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-02-07 19:00:11 UTC (rev 7296)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-02-07 20:47:16 UTC (rev 7297)
@@ -63,7 +63,9 @@
_logger.debug("no last-modified for %r", targeturl)
timestamp = time.time()
if prev_hash != hash_hex:
+ _logger.info("Got new hash:%r (prev:%r) ts:%r for url %r", hash_hex, prev_hash, timestamp, targeturl)
return (hash_hex, timestamp)
+ _logger.info("Fetched full unmodified content for %r", targeturl)
return (prev_hash, prev_timestamp)
finally:
try:
@@ -135,7 +137,6 @@
return
inst = fclass()
(new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp)
- _logger.info("Result hash:%r ts:%r", new_hash, new_timestamp)
resultqueue.put((taskid, new_hash, new_timestamp))
taskqueue.task_done()
Modified: dumbhippo/trunk/firehose/firehose/jobs/tasks.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/tasks.py 2008-02-07 19:00:11 UTC (rev 7296)
+++ dumbhippo/trunk/firehose/firehose/jobs/tasks.py 2008-02-07 20:47:16 UTC (rev 7297)
@@ -3,10 +3,22 @@
import os,sys,re
class TaskEntry(object):
- family = property(lambda self: self._family)
- id = property(lambda self: self._id)
- prev_hash = property(lambda self: self._prev_hash)
- prev_timestamp = property(lambda self: self._prev_timestamp)
+ def set_family(self, v):
+ self._family = v
+ family = property(lambda self: self._family, set_family)
+
+ def set_id(self, v):
+ self._id = v
+ id = property(lambda self: self._id, set_id)
+
+ def set_hash(self, v):
+ self._hash = v
+ prev_hash = property(lambda self: self._prev_hash, set_hash)
+
+ def set_timestamp(self, v):
+ self._timestamp = v
+ prev_timestamp = property(lambda self: self._prev_timestamp, set_timestamp)
+
def __init__(self, keystr, prev_hash, prev_ts):
super(TaskEntry, self).__init__()
(self._family, self._id) = keystr.split('/', 1)
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]