r7492 - dumbhippo/trunk/firehose/firehose/jobs



Author: walters
Date: 2008-07-02 11:04:48 -0500 (Wed, 02 Jul 2008)
New Revision: 7492

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/master.py
   dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Reboot (exit the process via os._exit) when we detect deadlock, and fix setDaemon call (it must be made before start())


Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-06-25 14:29:01 UTC (rev 7491)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-07-02 16:04:48 UTC (rev 7492)
@@ -173,7 +173,7 @@
         _logger.debug("%d queued tasks", len(self.__tasks_queue))
         
         poll_sqs_thread.start()
-    
+
     def __add_task_keys_unlocked(self, keys):
         for key in keys:
             if key == '':
@@ -436,7 +436,11 @@
             for worker,taskset in zip(self.__worker_endpoints,tasksets):
                 self.__enqueue_taskset(worker, taskset)
             self.__requeue_poll()
-        _logger.debug("%d pending tasksets", len(self.__pending_tasksets))
+        pending_count = len(self.__pending_tasksets)
+        _logger.debug("%d pending tasksets", pending_count)
+        if pending_count > 8:
+            logger.warning("DEADLOCK DETECTED, rebooting")
+            os._exit(1)
 
     def requeue(self):
         self.__requeue_poll(immediate=True)

Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-06-25 14:29:01 UTC (rev 7491)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-07-02 16:04:48 UTC (rev 7492)
@@ -302,7 +302,7 @@
             received_count += 1
             if result is not None:
                 results.append(result)     
-        _logger.debug("sending %d results", len(results)) 
+        _logger.debug("sending %d successful results (%d total)", len(results), resultcount) 
         self.__send_results(results, masterhost, serial)
         
     @log_except(_logger)        
@@ -340,5 +340,5 @@
             thread.setDaemon(True)
             thread.start()
         collector = threading.Thread(target=self.__run_collect_tasks, args=(taskcount,resultqueue,masterhost,serial))
+        collector.setDaemon(True)
         collector.start()
-        collector.setDaemon(True)



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]