r7297 - dumbhippo/trunk/firehose/firehose/jobs



Author: walters
Date: 2008-02-07 14:47:16 -0600 (Thu, 07 Feb 2008)
New Revision: 7297

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/master.py
   dumbhippo/trunk/firehose/firehose/jobs/poller.py
   dumbhippo/trunk/firehose/firehose/jobs/tasks.py
Log:
Actually update in-memory objects.  

Logging tweaks.  

Set things up so that add_tasks triggers an immediate poll (set_tasks passes immediate=False).


Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-02-07 19:00:11 UTC (rev 7296)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-02-07 20:47:16 UTC (rev 7297)
@@ -95,8 +95,9 @@
         
         curtime = time.time()
         for key,prev_hash,prev_time in cursor.execute('''SELECT key,prev_hash,prev_time from Tasks'''):
-            task = QueuedTask(curtime, TaskEntry(key, prev_hash, prev_time))
-            heapq.heappush(self.__tasks_queue, task)
+            task = TaskEntry(key, prev_hash, prev_time)
+            qtask = QueuedTask(curtime, task)
+            heapq.heappush(self.__tasks_queue, qtask)
             self.__tasks_map[key] = task
         conn.close()
         _logger.debug("%d queued tasks", len(self.__tasks_queue))
@@ -107,7 +108,7 @@
                 continue 
             task = TaskEntry(key, None, None)
             qtask = QueuedTask(time.time(), task)
-            self.__tasks_queue.append(qtask)
+            heapq.heappush(self.__tasks_queue, qtask)
             self.__tasks_map[key] = task
             
     def __add_task_keys(self, keys):
@@ -133,9 +134,9 @@
         
     def set_tasks(self, taskkeys):
         # For now we don't support resetting the list; just append
-        self.add_tasks(taskkeys)
+        self.add_tasks(taskkeys, immediate=False)
     
-    def add_tasks(self, taskkeys):
+    def add_tasks(self, taskkeys, immediate=True):
         _logger.debug("adding %d task keys", len(taskkeys))
         # Append them to the in-memory state
         self.__add_task_keys(taskkeys)
@@ -148,7 +149,7 @@
                        (taskkey, None, None))
         finally:
             conn.close()
-        self.__requeue_poll()
+        self.__requeue_poll(immediate=immediate)
         
     def __set_task_status(self, cursor, taskkey, hashcode, timestamp):
         _logger.debug("updating task %r values (%r %r)", taskkey, hashcode, timestamp)
@@ -189,6 +190,7 @@
         finally:
             self.__task_lock.release()
 
+    @log_except(_logger)
     def taskset_status(self, results):
         _logger.info("got %d results", len(results))
         changed = []
@@ -200,10 +202,12 @@
                 except KeyError, e:
                     _logger.exception("failed to find task key %r", taskkey)
                     continue
-                if curtask.task.prev_hash != hashcode:
+                if curtask.prev_hash != hashcode:
                     _logger.debug("task %r: new hash %r differs from prev %r", 
-                                  taskkey, hashcode, curtask.task.prev_hash)
+                                  taskkey, hashcode, curtask.prev_hash)
                     changed.append(taskkey)
+                    curtask.prev_hash = hashcode
+                    curtask.prev_timestamp = timestamp
         finally:
             self.__task_lock.release()
         self.__append_changed(changed)            
@@ -252,7 +256,7 @@
             i = 0 
             while True:          
                 try:
-                    task = heapq.heappop(self.__tasks_queue)
+                    qtask = heapq.heappop(self.__tasks_queue)
                 except IndexError, e:
                     break
                 if i >= MAX_TASKSET_SIZE:
@@ -263,11 +267,11 @@
                     i = 0                    
                 else:
                     i += 1
-                eligible = task.eligibility < taskset_limit
-                task.eligibility = curtime + DEFAULT_POLL_TIME_SECS
-                heapq.heappush(self.__tasks_queue, task)                 
+                eligible = qtask.eligibility < taskset_limit
+                qtask.eligibility = curtime + DEFAULT_POLL_TIME_SECS
+                heapq.heappush(self.__tasks_queue, qtask)                 
                 if eligible:
-                    taskset.append((str(task.task), task.task.prev_hash, task.task.prev_timestamp))
+                    taskset.append((str(qtask.task), qtask.task.prev_hash, qtask.task.prev_timestamp))
                 else:
                     break
             if len(taskset) > 0:

Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-02-07 19:00:11 UTC (rev 7296)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-02-07 20:47:16 UTC (rev 7297)
@@ -63,7 +63,9 @@
                 _logger.debug("no last-modified for %r", targeturl)
                 timestamp = time.time()
             if prev_hash != hash_hex:
+                _logger.info("Got new hash:%r (prev:%r) ts:%r for url %r", hash_hex, prev_hash, timestamp, targeturl)                
                 return (hash_hex, timestamp)
+            _logger.info("Fetched full unmodified content for %r", targeturl)             
             return (prev_hash, prev_timestamp)
         finally:
             try:
@@ -135,7 +137,6 @@
             return
         inst = fclass()
         (new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp)
-        _logger.info("Result hash:%r ts:%r", new_hash, new_timestamp)
         resultqueue.put((taskid, new_hash, new_timestamp))     
         taskqueue.task_done()   
         

Modified: dumbhippo/trunk/firehose/firehose/jobs/tasks.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/tasks.py	2008-02-07 19:00:11 UTC (rev 7296)
+++ dumbhippo/trunk/firehose/firehose/jobs/tasks.py	2008-02-07 20:47:16 UTC (rev 7297)
@@ -3,10 +3,22 @@
 import os,sys,re
 
 class TaskEntry(object):
-    family = property(lambda self: self._family)
-    id = property(lambda self: self._id)
-    prev_hash = property(lambda self: self._prev_hash)
-    prev_timestamp = property(lambda self: self._prev_timestamp)    
+    def set_family(self, v):
+        self._family = v
+    family = property(lambda self: self._family, set_family)
+    
+    def set_id(self, v):
+        self._id = v
+    id = property(lambda self: self._id, set_id)
+    
+    def set_hash(self, v):
+        self._hash = v
+    prev_hash = property(lambda self: self._prev_hash, set_hash)
+    
+    def set_timestamp(self, v):
+        self._timestamp = v        
+    prev_timestamp = property(lambda self: self._prev_timestamp, set_timestamp)
+        
     def __init__(self, keystr, prev_hash, prev_ts):
         super(TaskEntry, self).__init__()
         (self._family, self._id) = keystr.split('/', 1)



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]