r7329 - dumbhippo/trunk/firehose/firehose/jobs



Author: walters
Date: 2008-02-22 11:14:21 -0600 (Fri, 22 Feb 2008)
New Revision: 7329

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/master.py
   dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Avoid using Python 2.5-only Queue features (join/task_done).

Send result status back via SQS instead of an HTTP POST to the client URL.


Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-02-21 20:36:48 UTC (rev 7328)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-02-22 17:14:21 UTC (rev 7329)
@@ -74,6 +74,9 @@
         
         self.__client_url = config.get('firehose.clienturl')
         
+        import socket
+        self.__hostport = '%s:%s' % (socket.gethostname(), config.get('server.socket_port'))
+        
         # Default to one slave on localhost
         self.__worker_endpoints = ['localhost:%d' % (int(config.get('firehose.localslaveport')),)]
         _logger.debug("worker endpoints are %r", self.__worker_endpoints)
@@ -207,7 +210,6 @@
     @log_except(_logger)
     def __push_changed(self):
         _logger.debug("doing change push")
-        extkey = config.get('firehose.externalServiceKey')
         try:
             self.__task_lock.acquire()
             self.__changed_thread_queued = False
@@ -215,14 +217,26 @@
             self.__changed_buffer = []
         finally:
             self.__task_lock.release()
-        update_obj = {'updated_keys': changed}
-        jsonstr = simplejson.dumps(update_obj)
-        parsed = urlparse.urlparse(self.__client_url)
-        conn = httplib.HTTPConnection(parsed.hostname, parsed.port)
-        path = parsed.path or '/'
-        path += '?esk=' + extkey
-        conn.request('POST', path, jsonstr)
-        conn.close()        
+            
+        maxchars = 8*1024
+        remaining_bytes = maxchars
+        taskcount = 0
+        buf = "add\n"
+        _logger.debug("splitting %d tasks into messages", len(changed));            
+        for task in changed:
+            buf += task
+            buf += '\n'
+            taskcount += 1
+            new_remaining_bytes = remaining_bytes - (len(task)+1);
+            if new_remaining_bytes < 0:
+                self.__sqs_outgoing_q.write(self.__sqs_outgoing_q.new_message(buf))
+                remaining_bytes = maxchars
+                taskcount = 0
+                buf = "add\n"                        
+            else:
+                remaining_bytes = new_remaining_bytes
+        if taskcount > 0:
+            self.__sqs_outgoing_q.write(self.__sqs_outgoing_q.new_message(buf))
 
     def __append_changed(self, changed):
         if len(changed) == 0:
@@ -289,7 +303,7 @@
     def __enqueue_taskset(self, worker, taskset):
         jsonstr = simplejson.dumps(taskset)
         conn = httplib.HTTPConnection(worker)
-        conn.request('POST', '/', jsonstr)
+        conn.request('POST', '/?masterhost=%s' % (self.__hostport,), jsonstr)
         conn.close()
             
     def __do_poll(self):

Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-02-21 20:36:48 UTC (rev 7328)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py	2008-02-22 17:14:21 UTC (rev 7329)
@@ -77,9 +77,13 @@
 class TaskRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
     def do_POST(self):
         _logger.debug("handling POST")
+        def parsequery(s):
+            args = s.split('&')
+            return dict(map(lambda x: map(urllib.unquote, x.split('=',1)), args))
+        masterhost = parsequery(urlparse.urlparse(self.path).query)['masterhost']
         taskids = simplejson.load(self.rfile)
         poller = TaskPoller.get()
-        poller.poll_tasks(taskids)
+        poller.poll_tasks(taskids, masterhost)
         
 _instance = None
 class TaskPoller(object):   
@@ -95,7 +99,6 @@
         bindport = int(config.get('firehose.localslaveport'))
         self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
         self.__active_collectors = set()
-        self.__master_hostport = config.get('firehose.masterhost')
         
     def run_async(self):
         thr = threading.Thread(target=self.run)
@@ -106,29 +109,26 @@
     def run(self):
         self.__server.serve_forever()
         
-    def __send_results(self, results):
+    def __send_results(self, results, masterhost):
         dumped_results = simplejson.dumps(results)
-        connection = httplib.HTTPConnection(self.__master_hostport)
+        _logger.debug("opening connection to %r" % (masterhost,))
+        connection = httplib.HTTPConnection(masterhost)
         connection.request('POST', '/taskset_status', dumped_results,
                            headers={'Content-Type': 'text/javascript'})
         
     @log_except(_logger)        
-    def __run_collect_tasks(self, taskqueue, resultqueue):
-        _logger.debug("doing join on taskqueue")
-        taskqueue.join()
-        _logger.debug("all tasks complete")
+    def __run_collect_tasks(self, resultcount, resultqueue, masterhost):
         results = []
-        while True:
-            try:
-                result = resultqueue.get(False)
-                results.append(result)
-            except Queue.Empty:
-                break
+        _logger.debug("expecting %r results", resultcount)
+        while len(results) < resultcount:
+            result = resultqueue.get()
+            if result is not None:
+                results.append(result)     
         _logger.debug("sending %d results", len(results))            
-        self.__send_results(results)
+        self.__send_results(results, masterhost)
         
     @log_except(_logger)        
-    def __run_task(self, taskid, prev_hash, prev_timestamp, taskqueue, resultqueue):
+    def __run_task(self, taskid, prev_hash, prev_timestamp, resultqueue):
         (family, tid) = taskid.split('/', 1)
         try:
             fclass = _task_families[family]
@@ -142,15 +142,15 @@
             _logger.exception("Failed task id %r", tid)
             (new_hash, new_timestamp) = (None, None)
         if new_hash is not None:
-            resultqueue.put((taskid, new_hash, new_timestamp))                 
-        taskqueue.task_done()   
+            resultqueue.put((taskid, new_hash, new_timestamp))
+        resultqueue.put(None)
         
-    def poll_tasks(self, tasks):
-        taskqueue = Queue.Queue()
+    def poll_tasks(self, tasks, masterhost):
+        taskcount = 0
         resultqueue = Queue.Queue()
         for (taskid, prev_hash, prev_timestamp) in tasks:
-            taskqueue.put(taskid)
-            thread = threading.Thread(target=self.__run_task, args=(taskid, prev_hash, prev_timestamp, taskqueue, resultqueue))
+            taskcount += 1
+            thread = threading.Thread(target=self.__run_task, args=(taskid, prev_hash, prev_timestamp, resultqueue))
             thread.start()
-        collector = threading.Thread(target=self.__run_collect_tasks, args=(taskqueue,resultqueue))
+        collector = threading.Thread(target=self.__run_collect_tasks, args=(taskcount,resultqueue,masterhost))
         collector.start()



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]