r7329 - dumbhippo/trunk/firehose/firehose/jobs
- From: commits@mugshot.org
- To: online-desktop-list@gnome.org
- Subject: r7329 - dumbhippo/trunk/firehose/firehose/jobs
- Date: Fri, 22 Feb 2008 11:14:24 -0600 (CST)
Author: walters
Date: 2008-02-22 11:14:21 -0600 (Fri, 22 Feb 2008)
New Revision: 7329
Modified:
dumbhippo/trunk/firehose/firehose/jobs/master.py
dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Avoid using Python 2.5 Queue bits.
Send result status back via SQS.
Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-02-21 20:36:48 UTC (rev 7328)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-02-22 17:14:21 UTC (rev 7329)
@@ -74,6 +74,9 @@
self.__client_url = config.get('firehose.clienturl')
+ import socket
+ self.__hostport = '%s:%s' % (socket.gethostname(), config.get('server.socket_port'))
+
# Default to one slave on localhost
self.__worker_endpoints = ['localhost:%d' % (int(config.get('firehose.localslaveport')),)]
_logger.debug("worker endpoints are %r", self.__worker_endpoints)
@@ -207,7 +210,6 @@
@log_except(_logger)
def __push_changed(self):
_logger.debug("doing change push")
- extkey = config.get('firehose.externalServiceKey')
try:
self.__task_lock.acquire()
self.__changed_thread_queued = False
@@ -215,14 +217,26 @@
self.__changed_buffer = []
finally:
self.__task_lock.release()
- update_obj = {'updated_keys': changed}
- jsonstr = simplejson.dumps(update_obj)
- parsed = urlparse.urlparse(self.__client_url)
- conn = httplib.HTTPConnection(parsed.hostname, parsed.port)
- path = parsed.path or '/'
- path += '?esk=' + extkey
- conn.request('POST', path, jsonstr)
- conn.close()
+
+ maxchars = 8*1024
+ remaining_bytes = maxchars
+ taskcount = 0
+ buf = "add\n"
+ _logger.debug("splitting %d tasks into messages", len(changed));
+ for task in changed:
+ buf += task
+ buf += '\n'
+ taskcount += 1
+ new_remaining_bytes = remaining_bytes - (len(task)+1);
+ if new_remaining_bytes < 0:
+ self.__sqs_outgoing_q.write(self.__sqs_outgoing_q.new_message(buf))
+ remaining_bytes = maxchars
+ taskcount = 0
+ buf = "add\n"
+ else:
+ remaining_bytes = new_remaining_bytes
+ if taskcount > 0:
+ self.__sqs_outgoing_q.write(self.__sqs_outgoing_q.new_message(buf))
def __append_changed(self, changed):
if len(changed) == 0:
@@ -289,7 +303,7 @@
def __enqueue_taskset(self, worker, taskset):
jsonstr = simplejson.dumps(taskset)
conn = httplib.HTTPConnection(worker)
- conn.request('POST', '/', jsonstr)
+ conn.request('POST', '/?masterhost=%s' % (self.__hostport,), jsonstr)
conn.close()
def __do_poll(self):
Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-02-21 20:36:48 UTC (rev 7328)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-02-22 17:14:21 UTC (rev 7329)
@@ -77,9 +77,13 @@
class TaskRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_POST(self):
_logger.debug("handling POST")
+ def parsequery(s):
+ args = s.split('&')
+ return dict(map(lambda x: map(urllib.unquote, x.split('=',1)), args))
+ masterhost = parsequery(urlparse.urlparse(self.path).query)['masterhost']
taskids = simplejson.load(self.rfile)
poller = TaskPoller.get()
- poller.poll_tasks(taskids)
+ poller.poll_tasks(taskids, masterhost)
_instance = None
class TaskPoller(object):
@@ -95,7 +99,6 @@
bindport = int(config.get('firehose.localslaveport'))
self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
self.__active_collectors = set()
- self.__master_hostport = config.get('firehose.masterhost')
def run_async(self):
thr = threading.Thread(target=self.run)
@@ -106,29 +109,26 @@
def run(self):
self.__server.serve_forever()
- def __send_results(self, results):
+ def __send_results(self, results, masterhost):
dumped_results = simplejson.dumps(results)
- connection = httplib.HTTPConnection(self.__master_hostport)
+ _logger.debug("opening connection to %r" % (masterhost,))
+ connection = httplib.HTTPConnection(masterhost)
connection.request('POST', '/taskset_status', dumped_results,
headers={'Content-Type': 'text/javascript'})
@log_except(_logger)
- def __run_collect_tasks(self, taskqueue, resultqueue):
- _logger.debug("doing join on taskqueue")
- taskqueue.join()
- _logger.debug("all tasks complete")
+ def __run_collect_tasks(self, resultcount, resultqueue, masterhost):
results = []
- while True:
- try:
- result = resultqueue.get(False)
- results.append(result)
- except Queue.Empty:
- break
+ _logger.debug("expecting %r results", resultcount)
+ while len(results) < resultcount:
+ result = resultqueue.get()
+ if result is not None:
+ results.append(result)
_logger.debug("sending %d results", len(results))
- self.__send_results(results)
+ self.__send_results(results, masterhost)
@log_except(_logger)
- def __run_task(self, taskid, prev_hash, prev_timestamp, taskqueue, resultqueue):
+ def __run_task(self, taskid, prev_hash, prev_timestamp, resultqueue):
(family, tid) = taskid.split('/', 1)
try:
fclass = _task_families[family]
@@ -142,15 +142,15 @@
_logger.exception("Failed task id %r", tid)
(new_hash, new_timestamp) = (None, None)
if new_hash is not None:
- resultqueue.put((taskid, new_hash, new_timestamp))
- taskqueue.task_done()
+ resultqueue.put((taskid, new_hash, new_timestamp))
+ resultqueue.put(None)
- def poll_tasks(self, tasks):
- taskqueue = Queue.Queue()
+ def poll_tasks(self, tasks, masterhost):
+ taskcount = 0
resultqueue = Queue.Queue()
for (taskid, prev_hash, prev_timestamp) in tasks:
- taskqueue.put(taskid)
- thread = threading.Thread(target=self.__run_task, args=(taskid, prev_hash, prev_timestamp, taskqueue, resultqueue))
+ taskcount += 1
+ thread = threading.Thread(target=self.__run_task, args=(taskid, prev_hash, prev_timestamp, resultqueue))
thread.start()
- collector = threading.Thread(target=self.__run_collect_tasks, args=(taskqueue,resultqueue))
+ collector = threading.Thread(target=self.__run_collect_tasks, args=(taskcount,resultqueue,masterhost))
collector.start()
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]