r7347 - dumbhippo/trunk/firehose/firehose/jobs
- From: commits@mugshot.org
- To: online-desktop-list@gnome.org
- Subject: r7347 - dumbhippo/trunk/firehose/firehose/jobs
- Date: Mon, 3 Mar 2008 14:37:01 -0600 (CST)
Author: walters
Date: 2008-03-03 14:37:01 -0600 (Mon, 03 Mar 2008)
New Revision: 7347
Modified:
dumbhippo/trunk/firehose/firehose/jobs/master.py
dumbhippo/trunk/firehose/firehose/jobs/poller.py
Log:
Poll every 15s by default.
More cleanly handle multiple tasksets in flight by associating them with a serial.
Report the time delta we took to process each taskset.
Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-02-29 22:38:18 UTC (rev 7346)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-03-03 20:37:01 UTC (rev 7347)
@@ -27,10 +27,10 @@
aws_config_path = os.path.expanduser('~/.aws')
execfile(aws_config_path)
-DEFAULT_POLL_TIME_SECS = 45 * 60 # 45 minutes
+DEFAULT_POLL_PERIODICITY_SECS = 45 * 60 # 45 minutes
MIN_POLL_TIME_SECS = 15
TASKSET_TIMEOUT_SECS = 7 * 60 # 7 minutes
-TASKSET_POLL_CHECK_SECS = 2 * 60 # 2 minutes
+TASKSET_POLL_CHECK_SECS = 15
MAX_TASKSET_SIZE = 30
MAX_TASKSET_WORKERS = 1
@@ -72,6 +72,10 @@
self.__poll_task = None
self.__task_lock = threading.Lock()
+ self.__pending_tasksets = {} # map id => timestamp
+ self.__pending_tasksets_serial = 0
+ self.__pending_tasksets_lock = threading.Lock()
+
self.__client_url = config.get('firehose.clienturl')
import socket
@@ -253,9 +257,23 @@
self.__task_lock.release()
@log_except(_logger)
- def taskset_status(self, results):
- _logger.info("got %d results", len(results))
+ def taskset_status(self, status):
+ results = status['results']
+ serial = int(status['serial'])
+ time_delta = -1
+ try:
+ self.__pending_tasksets_lock.acquire()
+ try:
+ timestamp = self.__pending_tasksets[serial]
+ del self.__pending_tasksets[serial]
+ time_delta = time.time() - timestamp
+ except KeyError, e:
+ _logger.warn("unknown serial %s received!", serial)
+ finally:
+ self.__pending_tasksets_lock.release()
+ _logger.info("got status for serial %s; timedelta: %s, %d results", serial, time_delta, len(results))
changed = []
+
try:
self.__task_lock.acquire()
for (taskkey, hashcode, timestamp) in results:
@@ -302,8 +320,16 @@
@log_except(_logger)
def __enqueue_taskset(self, worker, taskset):
jsonstr = simplejson.dumps(taskset)
+ serial = None
+ try:
+ self.__pending_tasksets_lock.acquire()
+ serial = self.__pending_tasksets_serial
+ self.__pending_tasksets_serial += 1
+ self.__pending_tasksets[serial] = time.time()
+ finally:
+ self.__pending_tasksets_lock.release()
conn = httplib.HTTPConnection(worker)
- conn.request('POST', '/?masterhost=%s' % (self.__hostport,), jsonstr)
+ conn.request('POST', '/?masterhost=%s&serial=%s' % (self.__hostport,serial), jsonstr)
conn.close()
def __do_poll(self):
@@ -330,7 +356,7 @@
else:
i += 1
eligible = qtask.eligibility < taskset_limit
- qtask.eligibility = curtime + DEFAULT_POLL_TIME_SECS
+ qtask.eligibility = curtime + DEFAULT_POLL_PERIODICITY_SECS
heapq.heappush(self.__tasks_queue, qtask)
if eligible:
taskset.append((str(qtask.task), qtask.task.prev_hash, qtask.task.prev_timestamp))
@@ -346,11 +372,12 @@
if taskset_count > curworker_count:
_logger.info("Need worker activation, current=%d, required=%d", curworker_count, taskset_count)
self.__activate_workers()
- _logger.info("have %d active tasksets", taskset_count)
+ _logger.info("have %d tasksets to be sent", taskset_count)
if taskset_count > 0:
for worker,taskset in zip(self.__worker_endpoints,tasksets):
self.__enqueue_taskset(worker, taskset)
self.__requeue_poll()
+ _logger.debug("%d pending tasksets", len(self.__pending_tasksets))
def requeue(self):
self.__requeue_poll(immediate=True)
Modified: dumbhippo/trunk/firehose/firehose/jobs/poller.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-02-29 22:38:18 UTC (rev 7346)
+++ dumbhippo/trunk/firehose/firehose/jobs/poller.py 2008-03-03 20:37:01 UTC (rev 7347)
@@ -85,10 +85,12 @@
def parsequery(s):
args = s.split('&')
return dict(map(lambda x: map(urllib.unquote, x.split('=',1)), args))
- masterhost = parsequery(urlparse.urlparse(self.path)[4])['masterhost']
+ args = parsequery(urlparse.urlparse(self.path)[4])
+ masterhost = args['masterhost']
+ serial = args['serial']
taskids = simplejson.load(self.rfile)
poller = TaskPoller.get()
- poller.poll_tasks(taskids, masterhost)
+ poller.poll_tasks(taskids, masterhost, serial)
_instance = None
class TaskPoller(object):
@@ -114,15 +116,17 @@
def run(self):
self.__server.serve_forever()
- def __send_results(self, results, masterhost):
- dumped_results = simplejson.dumps(results)
+ def __send_results(self, results, masterhost, serial):
+ data = {'results': results,
+ 'serial': serial}
+ dumped_results = simplejson.dumps(data)
_logger.debug("opening connection to %r" % (masterhost,))
connection = httplib.HTTPConnection(masterhost)
connection.request('POST', '/taskset_status', dumped_results,
headers={'Content-Type': 'text/javascript'})
@log_except(_logger)
- def __run_collect_tasks(self, resultcount, resultqueue, masterhost):
+ def __run_collect_tasks(self, resultcount, resultqueue, masterhost, serial):
results = []
received_count = 0
_logger.debug("expecting %r results", resultcount)
@@ -131,8 +135,8 @@
received_count += 1
if result is not None:
results.append(result)
- _logger.debug("sending %d results", len(results))
- self.__send_results(results, masterhost)
+ _logger.debug("sending %d results", len(results))
+ self.__send_results(results, masterhost, serial)
@log_except(_logger)
def __run_task(self, taskid, prev_hash, prev_timestamp, resultqueue):
@@ -153,12 +157,12 @@
else:
resultqueue.put(None)
- def poll_tasks(self, tasks, masterhost):
+ def poll_tasks(self, tasks, masterhost, serial):
taskcount = 0
resultqueue = Queue.Queue()
for (taskid, prev_hash, prev_timestamp) in tasks:
taskcount += 1
thread = threading.Thread(target=self.__run_task, args=(taskid, prev_hash, prev_timestamp, resultqueue))
thread.start()
- collector = threading.Thread(target=self.__run_collect_tasks, args=(taskcount,resultqueue,masterhost))
+ collector = threading.Thread(target=self.__run_collect_tasks, args=(taskcount,resultqueue,masterhost,serial))
collector.start()
[Date Prev] [Date Next] [Thread Prev] [Thread Next] [Thread Index] [Date Index] [Author Index]