r7358 - dumbhippo/trunk/firehose/firehose/jobs



Author: walters
Date: 2008-03-11 20:32:52 -0500 (Tue, 11 Mar 2008)
New Revision: 7358

Modified:
   dumbhippo/trunk/firehose/firehose/jobs/master.py
Log:
Bump task size, document things more.

Also randomize the database load, to avoid all the items for,
say, digg ending up grouped together.
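
For context, the randomization here amounts to loading all of the task
rows first, shuffling them in memory, and only then pushing them onto the
scheduling heap. A minimal standalone sketch of that pattern (the
tuple-based entries and the build_task_queue name below are illustrative
stand-ins, not the real TaskEntry/QueuedTask code):

    import heapq
    import random
    import time

    def build_task_queue(rows):
        # rows: iterable of (key, prev_hash, prev_time) tuples as read
        # from the Tasks table
        entries = list(rows)
        # Shuffle so rows the database returns clustered by feed
        # (e.g. every digg item in one run) get spread across the queue
        random.shuffle(entries)
        curtime = time.time()
        queue = []
        for seq, entry in enumerate(entries):
            # heap entries sort by eligibility time, then by insertion
            # order, so equal times pop in the shuffled order
            heapq.heappush(queue, (curtime, seq, entry))
        return queue

Since every entry starts with the same eligibility time, the seq
tiebreaker is what makes the shuffled order carry through to the heap
pops.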


Modified: dumbhippo/trunk/firehose/firehose/jobs/master.py
===================================================================
--- dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-03-12 00:23:22 UTC (rev 7357)
+++ dumbhippo/trunk/firehose/firehose/jobs/master.py	2008-03-12 01:32:52 UTC (rev 7358)
@@ -1,6 +1,6 @@
 #!/usr/bin/python
 
-import os,sys,re,heapq,time,httplib,logging,threading
+import os,sys,re,heapq,time,httplib,logging,threading,random
 import traceback,urlparse,StringIO
 import BaseHTTPServer,urllib
 
@@ -27,11 +27,25 @@
 aws_config_path = os.path.expanduser('~/.aws')
 execfile(aws_config_path)
 
+# How often an individual task becomes eligible for polling
 DEFAULT_POLL_PERIODICITY_SECS = 45 * 60 # 45 minutes
+# Ignore this number
 MIN_POLL_TIME_SECS = 15
+
+# The size of the sliding window - each pass we gather all tasks
+# which will become ready inside this window.
 TASKSET_TIMEOUT_SECS = 7 * 60 # 7 minutes
-TASKSET_POLL_CHECK_SECS = 15
-MAX_TASKSET_SIZE = 30
+# Period at which we check for eligible tasks.
+TASKSET_POLL_CHECK_SECS = 30
+# FIXME - remove this and base on dynamic scaling
+# We base this number on our current total number of tasks, which is ~20000.
+# Let's say we want to take a maximum of an hour to update them all.  Then
+# we need to process 20000/(60*60) ~= 5.6 tasks/second.
+# So in every 30s check period we need to process about 6*30 = 180 tasks.
+# As mentioned above, we really want TASKSET_SIZE=1 and to dynamically scale
+# machines based on pending size
+MAX_TASKSET_SIZE = 200
+# Capped right now at 1 until we need a cluster
 MAX_TASKSET_WORKERS = 1
 
 class TaskStatusHandler(BaseHTTPServer.BaseHTTPRequestHandler):
@@ -101,11 +115,16 @@
         cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
         
         curtime = time.time()
+        taskentries = []
         for key,prev_hash,prev_time in cursor.execute('''SELECT key,prev_hash,prev_time from Tasks'''):
             task = TaskEntry(key, prev_hash, prev_time)
+            taskentries.append(task)
+        # We explicitly randomize to avoid database grouping
+        random.shuffle(taskentries)
+        for task in taskentries:
             qtask = QueuedTask(curtime, task)
             heapq.heappush(self.__tasks_queue, qtask)
-            self.__tasks_map[key] = task
+            self.__tasks_map[str(task)] = task
         conn.close()
         _logger.debug("%d queued tasks", len(self.__tasks_queue))
         
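
As a rough sanity check of the MAX_TASKSET_SIZE arithmetic in the comment
above (the ~20000 task count and the one-hour target are the assumptions
stated in the commit; the constant names below are just for illustration):

    import math

    TOTAL_TASKS = 20000        # approximate current number of tasks
    TARGET_SECS = 60 * 60      # want everything updated within an hour
    CHECK_PERIOD_SECS = 30     # matches TASKSET_POLL_CHECK_SECS

    tasks_per_sec = TOTAL_TASKS / float(TARGET_SECS)    # ~5.6 tasks/second
    per_check = int(math.ceil(tasks_per_sec)) * CHECK_PERIOD_SECS    # 6 * 30 = 180

MAX_TASKSET_SIZE = 200 sits a little above that 180 figure, leaving some
headroom until the dynamic-scaling FIXME is addressed.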


