r7358 - dumbhippo/trunk/firehose/firehose/jobs
- From: commits@mugshot.org
- To: online-desktop-list@gnome.org
- Subject: r7358 - dumbhippo/trunk/firehose/firehose/jobs
- Date: Tue, 11 Mar 2008 20:32:53 -0500 (CDT)
Author: walters
Date: 2008-03-11 20:32:52 -0500 (Tue, 11 Mar 2008)
New Revision: 7358
Modified:
dumbhippo/trunk/firehose/firehose/jobs/master.py
Log:
Bump task size, document things more.
Also randomize the database load order to avoid all items for,
say, digg being grouped together.
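
The motivation is that SQLite tends to return rows in storage order, so tasks
inserted together (e.g. every digg feed) would otherwise be dispatched
back-to-back. A minimal sketch of the load-then-shuffle pattern, with an
illustrative Tasks table matching the columns used in the diff below:

import random
import sqlite3

def load_tasks_shuffled(db_path):
    # Read every task row first, then shuffle the list, so that rows the
    # database happens to store adjacently (e.g. all feeds for one site)
    # are not all dispatched in the same taskset.
    conn = sqlite3.connect(db_path)
    tasks = list(conn.execute('SELECT key, prev_hash, prev_time FROM Tasks'))
    conn.close()
    random.shuffle(tasks)
    return tasks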
Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-03-12 00:23:22 UTC (rev 7357)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py 2008-03-12 01:32:52 UTC (rev 7358)
@@ -1,6 +1,6 @@
#!/usr/bin/python
-import os,sys,re,heapq,time,httplib,logging,threading
+import os,sys,re,heapq,time,httplib,logging,threading,random
import traceback,urlparse,StringIO
import BaseHTTPServer,urllib
@@ -27,11 +27,25 @@
aws_config_path = os.path.expanduser('~/.aws')
execfile(aws_config_path)
+# How often an individual task becomes eligible to be polled
DEFAULT_POLL_PERIODICITY_SECS = 45 * 60 # 45 minutes
+# Ignore this number
MIN_POLL_TIME_SECS = 15
+
+# The size of the sliding window used - we gather all tasks that
+# will become ready inside this window.
TASKSET_TIMEOUT_SECS = 7 * 60 # 7 minutes
-TASKSET_POLL_CHECK_SECS = 15
-MAX_TASKSET_SIZE = 30
+# Period at which we check for eligible tasks.
+TASKSET_POLL_CHECK_SECS = 30
+# FIXME - remove this and base on dynamic scaling
+# We base this number on our current total number of tasks, which is ~20000.
+# Let's say we want to take a maximum of an hour to update them all. Then
+# we need to process 20000/(60*60) ~= 5.6 tasks/second.
+# So every 30s, we need to process roughly 6*30 = 180 tasks.
+# As mentioned above, we really want MAX_TASKSET_SIZE=1 and to dynamically
+# scale machines based on the pending queue size.
+MAX_TASKSET_SIZE = 200
+# Capped right now at 1 until we need a cluster
MAX_TASKSET_WORKERS = 1
class TaskStatusHandler(BaseHTTPServer.BaseHTTPRequestHandler):
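
For reference, the sizing arithmetic from the comment above, spelled out
(all numbers come from the comment; none of this code is in the real module):

import math

TOTAL_TASKS = 20000         # approximate current task count, per the comment
FULL_PASS_SECS = 60 * 60    # target: touch every task within an hour
CHECK_PERIOD_SECS = 30      # TASKSET_POLL_CHECK_SECS

rate = TOTAL_TASKS / float(FULL_PASS_SECS)       # ~5.6 tasks/second
per_check = math.ceil(rate) * CHECK_PERIOD_SECS  # 6/s * 30s = 180 tasks

That per-check figure of 180 is why MAX_TASKSET_SIZE lands just above it, at
a round 200.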
@@ -101,11 +115,16 @@
cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
curtime = time.time()
+ taskentries = []
for key,prev_hash,prev_time in cursor.execute('''SELECT key,prev_hash,prev_time from Tasks'''):
task = TaskEntry(key, prev_hash, prev_time)
+ taskentries.append(task)
+ # We explicitly randomize to avoid database grouping
+ random.shuffle(taskentries)
+ for task in taskentries:
qtask = QueuedTask(curtime, task)
heapq.heappush(self.__tasks_queue, qtask)
- self.__tasks_map[key] = task
+ self.__tasks_map[str(task)] = task
conn.close()
_logger.debug("%d queued tasks", len(self.__tasks_queue))
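
The consumption side isn't part of this diff, but given the constants above,
draining the heap amounts to popping every entry whose eligibility time falls
inside the seven-minute window, capped at the taskset size. A rough sketch
under those assumptions - the tuple layout and pop_ready_tasks name are
hypothetical, standing in for the real QueuedTask ordering:

import heapq
import time

TASKSET_TIMEOUT_SECS = 7 * 60
MAX_TASKSET_SIZE = 200

def pop_ready_tasks(task_queue):
    # task_queue is a heapq of (eligible_time, task) pairs; take everything
    # that becomes ready inside the sliding window, up to the size cap.
    horizon = time.time() + TASKSET_TIMEOUT_SECS
    ready = []
    while task_queue and len(ready) < MAX_TASKSET_SIZE:
        eligible_time, task = task_queue[0]
        if eligible_time > horizon:
            break
        heapq.heappop(task_queue)
        ready.append(task)
    return ready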