r7347 - dumbhippo/trunk/firehose/firehose/jobs



Author: walters
Date: 2008-03-03 14:37:01 -0600 (Mon, 03 Mar 2008)
New Revision: 7347

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/master.py
   dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Check taskset status every 15s by default (TASKSET_POLL_CHECK_SECS lowered from 2 minutes).

More cleanly handle multiple tasksets in flight by associating them with a serial.

Report the time delta we took to process each taskset.


Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-02-29 22:38:18 UTC (rev 7346)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-03-03 20:37:01 UTC (rev 7347)
@@ -27,10 +27,10 @@
 aws_config_path = os.path.expanduser('~/.aws')
 execfile(aws_config_path)
 
-DEFAULT_POLL_TIME_SECS = 45 * 60 # 45 minutes
+DEFAULT_POLL_PERIODICITY_SECS = 45 * 60 # 45 minutes
 MIN_POLL_TIME_SECS = 15
 TASKSET_TIMEOUT_SECS = 7 * 60 # 7 minutes
-TASKSET_POLL_CHECK_SECS = 2 * 60 # 2 minutes
+TASKSET_POLL_CHECK_SECS = 15
 MAX_TASKSET_SIZE = 30
 MAX_TASKSET_WORKERS = 1
 
@@ -72,6 +72,10 @@
         self.__poll_task = None        
         self.__task_lock = threading.Lock()
         
+        self.__pending_tasksets = {} # map id => timestamp
+        self.__pending_tasksets_serial = 0
+        self.__pending_tasksets_lock = threading.Lock()
+        
         self.__client_url = config.get('firehose.clienturl')
         
         import socket
@@ -253,9 +257,23 @@
             self.__task_lock.release()
 
     @log_except(_logger)
-    def taskset_status(self, results):
-        _logger.info("got %d results", len(results))
+    def taskset_status(self, status):
+        results = status['results']
+        serial = int(status['serial'])
+        time_delta = -1
+        try:
+            self.__pending_tasksets_lock.acquire()
+            try:
+                timestamp = self.__pending_tasksets[serial]
+                del self.__pending_tasksets[serial]
+                time_delta = time.time() - timestamp
+            except KeyError, e:
+                _logger.warn("unknown serial %s received!", serial)
+        finally:
+            self.__pending_tasksets_lock.release()      
+        _logger.info("got status for serial %s; timedelta: %s, %d results", serial, time_delta, len(results))
         changed = []
+
         try:
             self.__task_lock.acquire()
             for (taskkey, hashcode, timestamp) in results:
@@ -302,8 +320,16 @@
     @log_except(_logger)
     def __enqueue_taskset(self, worker, taskset):
         jsonstr = simplejson.dumps(taskset)
+        serial = None
+        try:
+            self.__pending_tasksets_lock.acquire()
+            serial = self.__pending_tasksets_serial 
+            self.__pending_tasksets_serial += 1
+            self.__pending_tasksets[serial] = time.time()
+        finally:
+            self.__pending_tasksets_lock.release()
         conn = httplib.HTTPConnection(worker)
-        conn.request('POST', '/?masterhost=%s' % (self.__hostport,), jsonstr)
+        conn.request('POST', '/?masterhost=%s&serial=%s' % (self.__hostport,serial), jsonstr)
         conn.close()
             
     def __do_poll(self):
@@ -330,7 +356,7 @@
                 else:
                     i += 1
                 eligible = qtask.eligibility < taskset_limit
-                qtask.eligibility = curtime + DEFAULT_POLL_TIME_SECS
+                qtask.eligibility = curtime + DEFAULT_POLL_PERIODICITY_SECS
                 heapq.heappush(self.__tasks_queue, qtask)                 
                 if eligible:
                     taskset.append((str(qtask.task), qtask.task.prev_hash, qtask.task.prev_timestamp))
@@ -346,11 +372,12 @@
         if taskset_count > curworker_count:
             _logger.info("Need worker activation, current=%d, required=%d", curworker_count, taskset_count)
             self.__activate_workers()
-        _logger.info("have %d active tasksets", taskset_count)
+        _logger.info("have %d tasksets to be sent", taskset_count)
         if taskset_count > 0:
             for worker,taskset in zip(self.__worker_endpoints,tasksets):
                 self.__enqueue_taskset(worker, taskset)
             self.__requeue_poll()
+        _logger.debug("%d pending tasksets", len(self.__pending_tasksets))
 
     def requeue(self):
         self.__requeue_poll(immediate=True)

Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-02-29 22:38:18 UTC (rev 7346)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-03-03 20:37:01 UTC (rev 7347)
@@ -85,10 +85,12 @@
         def parsequery(s):
             args = s.split('&')
             return dict(map(lambda x: map(urllib.unquote, x.split('=',1)), args))
-        masterhost = parsequery(urlparse.urlparse(self.path)[4])['masterhost']
+        args = parsequery(urlparse.urlparse(self.path)[4])
+        masterhost = args['masterhost']
+        serial = args['serial']
         taskids = simplejson.load(self.rfile)
         poller = TaskPoller.get()
-        poller.poll_tasks(taskids, masterhost)
+        poller.poll_tasks(taskids, masterhost, serial)
         
 _instance = None
 class TaskPoller(object):   
@@ -114,15 +116,17 @@
     def run(self):
         self.__server.serve_forever()
         
-    def __send_results(self, results, masterhost):
-        dumped_results = simplejson.dumps(results)
+    def __send_results(self, results, masterhost, serial):
+        data = {'results': results,
+                'serial': serial}
+        dumped_results = simplejson.dumps(data)
         _logger.debug("opening connection to %r" % (masterhost,))
         connection = httplib.HTTPConnection(masterhost)
         connection.request('POST', '/taskset_status', dumped_results,
                            headers={'Content-Type': 'text/javascript'})
         
     @log_except(_logger)        
-    def __run_collect_tasks(self, resultcount, resultqueue, masterhost):
+    def __run_collect_tasks(self, resultcount, resultqueue, masterhost, serial):
         results = []
         received_count = 0
         _logger.debug("expecting %r results", resultcount)
@@ -131,8 +135,8 @@
             received_count += 1
             if result is not None:
                 results.append(result)     
-        _logger.debug("sending %d results", len(results))            
-        self.__send_results(results, masterhost)
+        _logger.debug("sending %d results", len(results)) 
+        self.__send_results(results, masterhost, serial)
         
     @log_except(_logger)        
     def __run_task(self, taskid, prev_hash, prev_timestamp, resultqueue):
@@ -153,12 +157,12 @@
         else:
             resultqueue.put(None)
         
-    def poll_tasks(self, tasks, masterhost):
+    def poll_tasks(self, tasks, masterhost, serial):
         taskcount = 0
         resultqueue = Queue.Queue()
         for (taskid, prev_hash, prev_timestamp) in tasks:
             taskcount += 1
             thread = threading.Thread(target=self.__run_task, args=(taskid, prev_hash, prev_timestamp, resultqueue))
             thread.start()
-        collector = threading.Thread(target=self.__run_collect_tasks, args=(taskcount,resultqueue,masterhost))
+        collector = threading.Thread(target=self.__run_collect_tasks, args=(taskcount,resultqueue,masterhost,serial))
         collector.start()



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]