r7281 - in firehose: . firehose firehose/jobs



Author: walters
Date: 2008-02-01 10:48:15 -0600 (Fri, 01 Feb 2008)
New Revision: 7281

Modified:
   firehose/dev.cfg
   firehose/firehose/controllers.py
   firehose/firehose/jobs/master.py
   firehose/firehose/jobs/poller.py
   firehose/firehose/jobs/tasks.py
   firehose/start-firehose.py
Log:
Boots, runs.


Modified: firehose/dev.cfg
===================================================================
--- firehose/dev.cfg	2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/dev.cfg	2008-02-01 16:48:15 UTC (rev 7281)
@@ -13,7 +13,7 @@
 
 firehose.taskdbpath="%(current_dir_uri)s/dev-tasks.sqlite"
 firehose.masterhost="localhost"
-firehose.slaveport="8090"
+firehose.slaveport="6677"
 
 # if you are using a database or table type without transactions
 # (MySQL default, for example), you should turn off transactions
@@ -31,11 +31,11 @@
 # Enable the debug output at the end on pages.
 # log_debug_info_filter.on = False
 
-server.environment="development"
-autoreload.package="firehose"
+#server.environment="development"
+#autoreload.package="firehose"
 
 # Auto-Reload after code modification
-# autoreload.on = True
+autoreload.on = False
 
 # Set to True if you'd like to abort execution if a controller gets an
 # unexpected parameter. False by default
@@ -55,6 +55,7 @@
 level='DEBUG'
 qualname='firehose'
 handlers=['debug_out']
+propagate=0
 
 [[[allinfo]]]
 level='INFO'
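
The new propagate=0 entry keeps records from the 'firehose' logger from also bubbling up to the root logger's handlers, which would otherwise print every message twice. A minimal sketch of the same effect in plain logging calls (the stdout handler stands in for the config's debug_out; names are illustrative):

    import logging, sys

    logging.basicConfig()                    # root handler, as the framework sets one up
    log = logging.getLogger('firehose')
    log.setLevel(logging.DEBUG)
    log.addHandler(logging.StreamHandler(sys.stdout))  # plays the role of debug_out
    log.propagate = 0                        # same effect as propagate=0 above

    log.debug('emitted once, by the firehose handler only')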

Modified: firehose/firehose/controllers.py
===================================================================
--- firehose/firehose/controllers.py	2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/firehose/controllers.py	2008-02-01 16:48:15 UTC (rev 7281)
@@ -1,9 +1,12 @@
+import logging
+
+import simplejson
 from turbogears import controllers, expose, flash
 import cherrypy
 # from model import *
-# import logging
-# log = logging.getLogger("firehose.controllers")
 
+_logger = logging.getLogger("firehose.controllers")
+
 class Root(controllers.RootController):
     @expose(template="firehose.templates.welcome")
     def index(self):
@@ -13,22 +16,33 @@
         return dict(now=time.ctime())
 
     @expose("json")
-    def addtask(self, taskid=None):
+    def addfeed(self, feedurl=None):
         if cherrypy.request.method != 'POST':
             raise Exception("Must invoke this method using POST")
-        if taskid is None:
+        if feedurl is None:
+            _logger.debug("no feed url specified")
             return {}
         
         from firehose.jobs.master import MasterPoller        
         master = MasterPoller.get()
-        master.add_task(taskid)
+        master.add_feed(feedurl)
         return {}
     
     @expose("json")
-    def taskset_status(self, results=None):
+    def requeue(self):
         if cherrypy.request.method != 'POST':
             raise Exception("Must invoke this method using POST")
         from firehose.jobs.master import MasterPoller        
         master = MasterPoller.get()
-        master.taskset_status(results)
+        master.requeue()
+        return {}        
+    
+    @expose("json")
+    def taskset_status(self):
+        if cherrypy.request.method != 'POST':
+            raise Exception("Must invoke this method using POST")
+        from firehose.jobs.master import MasterPoller        
+        master = MasterPoller.get()
+        status = simplejson.load(cherrypy.request.body)
+        master.taskset_status(status)
         return {}                
\ No newline at end of file
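
The reworked controller takes addfeed's feedurl as an ordinary form parameter, while taskset_status now reads raw JSON straight out of the request body with simplejson.load. A client-side sketch of both calls, assuming the server listens on localhost:8080 (the feed URL and hash value are illustrative):

    import httplib, urllib
    import simplejson

    # addfeed: feedurl travels as a form-encoded POST parameter
    conn = httplib.HTTPConnection('localhost', 8080)
    conn.request('POST', '/addfeed',
                 urllib.urlencode({'feedurl': 'http://example.com/feed.rss'}),
                 {'Content-Type': 'application/x-www-form-urlencoded'})
    print conn.getresponse().status
    conn.close()

    # taskset_status: the POST body itself is the JSON payload
    results = [('feed/http%3A//example.com/feed.rss', 'abc123', None)]
    conn = httplib.HTTPConnection('localhost', 8080)
    conn.request('POST', '/taskset_status', simplejson.dumps(results),
                 {'Content-Type': 'text/javascript'})
    print conn.getresponse().status
    conn.close()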

Modified: firehose/firehose/jobs/master.py
===================================================================
--- firehose/firehose/jobs/master.py	2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/firehose/jobs/master.py	2008-02-01 16:48:15 UTC (rev 7281)
@@ -1,23 +1,27 @@
 #!/usr/bin/python
 
 import os,sys,re,heapq,time,httplib,logging,threading
+import traceback
+import BaseHTTPServer,urllib
 
 if sys.version_info[0] < 2 or sys.version_info[1] < 5:
     from pysqlite2 import dbapi2 as sqlite3
 else:    
     import sqlite3
+import simplejson
 import boto
-
 from boto.sqs.connection import SQSConnection
 from boto.s3.connection import S3Connection
 from boto.s3.key import Key
 
+import turbogears
 from turbogears import config
-import turbojson.jsonify
 
-from tasks import TaskKey
+from tasks import TaskEntry
+from firehose.jobs.poller import TaskPoller
 
 _logger = logging.getLogger('firehose.Master')
+_logger.debug("hello master!")
 
 aws_config_path = os.path.expanduser('~/.aws')
 execfile(aws_config_path)
@@ -25,6 +29,7 @@
 DEFAULT_POLL_TIME_SECS = 45 * 60 # 45 minutes
 MIN_POLL_TIME_SECS = 15
 TASKSET_TIMEOUT_SECS = 7 * 60 # 7 minutes
+TASKSET_POLL_CHECK_SECS = 2 * 60 # 2 minutes
 MAX_TASKSET_SIZE = 30
 MAX_TASKSET_WORKERS = 1
 
@@ -37,7 +42,7 @@
         poller.taskset_status(taskset_results)
 
 class QueuedTask(object):
-    __slots__ = ['eligibility', 'task']
+    __slots__ = ['eligibility', 'task',]
     def __init__(self, eligibility, task):
         self.eligibility = eligibility
         self.task = task
@@ -52,61 +57,89 @@
     def get():
         global _instance
         if _instance is None:
+            _logger.debug("Constructing master poller instance")            
             _instance = MasterPoller()
         return _instance
+
+    def __init__(self):
+        global _instance
+        assert _instance is None
+        self.__tasks = []
+        self.__poll_task = None        
+        self.__task_lock = threading.Lock()
+        
+        # Default to one slave on localhost
+        self.__worker_endpoints = ['localhost:%d' % (int(config.get('firehose.slaveport')),)]
+        _logger.debug("worker endpoints are %r", self.__worker_endpoints)
+        for bind in self.__worker_endpoints:
+            (host,port) = bind.split(':')
+            if host == 'localhost':
+                _logger.debug("found localhost worker, starting it")
+                self.__local_handler = TaskPoller.get()
+                self.__local_handler.run_async()
+        
+        path = self.__path = config.get('firehose.taskdbpath')
+        _logger.debug("connecting to %r", path)
+        conn = sqlite3.connect(path, isolation_level=None)
+        cursor = conn.cursor()
+        cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE,
+                                                            prev_hash TEXT,
+                                                            prev_time DATETIME)''')
+        cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
+        
+        curtime = time.time()
+        for key,prev_hash,prev_time in cursor.execute('''SELECT key,prev_hash,prev_time from Tasks'''):
+            task = QueuedTask(curtime, TaskEntry(key, prev_hash, prev_time))
+            heapq.heappush(self.__tasks, task)
+        conn.close()
+        _logger.debug("%d queued tasks", len(self.__tasks))
     
     def __add_task_for_key(self, key):
         try:
             self.__task_lock.acquire()        
-            taskkey = TaskKey(key)
+            task = TaskEntry(key, None, None)
             for qtask in self.__tasks:
-                if qtask.task == taskkey:
+                if qtask.task == task:
                     return qtask
-            newtask = QueuedTask(time.time(), taskkey)
-            self.__tasks.append(newtask)
+            qtask = QueuedTask(time.time(), task)
+            self.__tasks.append(qtask)
         finally:
             self.__task_lock.release()
+            
+    def add_feed(self, feedurl):
+        taskkey = 'feed/' + urllib.quote(feedurl)
+        self.add_task(taskkey)
     
     def add_task(self, taskkey):
-        cursor = self.__conn.cursor()         
-        self.__set_task_status(cursor, taskkey, None, None)
+        _logger.debug("adding task key=%r", taskkey)
         self.__add_task_for_key(taskkey)
+        conn = sqlite3.connect(self.__path, isolation_level=None)
+        try:
+            cursor = conn.cursor()
+            self.__set_task_status(cursor, taskkey, None, None)
+        finally:
+            conn.close()
         
     def __set_task_status(self, cursor, taskkey, hashcode, timestamp):
-        cursor.execute('''INSERT INTO Tasks VALUES (?, ?, ?)''',
-                       taskkey, hashcode, timestamp)
-        
+        _logger.debug("updating task %r values (%r %r)", taskkey, hashcode, timestamp)
+        cursor.execute('''INSERT OR REPLACE INTO Tasks VALUES (?, ?, ?)''',
+                       (taskkey, hashcode, timestamp))
+    
     def taskset_status(self, results):
-        _logger.debug("got %d results", len(results))
-        cursor = self.__conn.cursor()        
-        for (taskkey,hashcode,timestamp) in results:
-            self.__set_task_status(cursor, taskkey, hashcode, timestamp)
-
-    def __init__(self):
-        self.__tasks = []
-        self.__poll_task = None        
-        self.__task_lock = threading.Lock()
-        
-        # Default to one slave on localhost
-        self.__worker_urls = ['localhost:%d' % (int(config.get('firehose.slaveport')),)]
-        _logger.debug("worker urls are %r", self.__worker_urls)
-        
-        path = config.get('firehose.taskdbpath')
-        _logger.debug("connecting to %r", path)
-        self.__conn = sqlite3.connect(path, isolation_level=None)
-        cursor = self.__conn.cursor()
-        cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE ON CONFLICT IGNORE,
-                                                            resulthash TEXT,
-                                                            resulttime DATETIME)''')
-        cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
-        
-        curtime = time.time()
-        for v in cursor.execute('''SELECT key from Tasks'''):
-            task = QueuedTask(curtime, TaskKey(v))
-            heapq.heappush(self.__tasks, task)
+        _logger.info("got %d results", len(results))
+        _logger.debug("results: %r", results    )
+        try:
+            conn = sqlite3.connect(self.__path, isolation_level=None)
+            cursor = conn.cursor()
+            cursor.execute('''BEGIN''')   
+            for (taskkey,hashcode,timestamp) in results:
+                self.__set_task_status(cursor, taskkey, hashcode, timestamp)
+            cursor.execute('''COMMIT''')
+        finally:
+            conn.close()
             
     def start(self):
-        self.__requeue_poll()
+        self.__requeue_poll(immediate=True)
 
     def __unset_poll(self):
         try:
@@ -122,13 +155,13 @@
         raise NotImplementedError()
     
     def __enqueue_taskset(self, worker, taskset):
-        jsonstr = turbogears.jsonify.encode(taskset)
+        jsonstr = simplejson.dumps(taskset)
         conn = httplib.HTTPConnection(worker)
-        req = conn.request('POST', '/', jsonstr)
+        conn.request('POST', '/', jsonstr)
+        conn.close()
             
     def __do_poll(self):
         _logger.debug("in poll")
-        self.__unset_poll()
         
         tasksets = []
         curtime = time.time()
@@ -136,27 +169,50 @@
         try:
             self.__task_lock.acquire()
             taskset = []
-            for i,task in enumerate(self.__tasks):
+            i = 0 
+            while True:          
+                try:
+                    task = heapq.heappop(self.__tasks)
+                except IndexError, e:
+                    break
                 if i >= MAX_TASKSET_SIZE:
                     if len(tasksets) >= MAX_TASKSET_WORKERS:
                         break
                     tasksets.append(taskset)
                     taskset = []
-                if task.eligibility < taskset_limit:
-                    taskset.extend(task)
+                    i = 0                    
+                else:
+                    i += 1
+                eligible = task.eligibility < taskset_limit
+                task.eligibility = curtime + DEFAULT_POLL_TIME_SECS
+                heapq.heappush(self.__tasks, task)                 
+                if eligible:
+                    taskset.append((str(task.task), task.task.prev_hash, task.task.prev_timestamp))
+                else:
+                    break
+            if len(taskset) > 0:
+                tasksets.append(taskset)
+            taskset = None
         finally:
             self.__task_lock.release()
         taskset_count = len(tasksets)
-        if taskset_count > len(self.__worker_urls):
-            self.__activate_workers()        
-        for worker,taskset in zip(tasksets, self.__worker_urls):
-            self.__enqueue_taskset(worker, taskset)
+        curworker_count = len(self.__worker_endpoints)
+        if taskset_count > curworker_count:
+            _logger.info("Need worker activation, current=%d, required=%d", curworker_count, taskset_count)
+            self.__activate_workers()
+        _logger.info("have %d active tasksets", taskset_count)
+        if taskset_count > 0:
+            for worker,taskset in zip(self.__worker_endpoints,tasksets):
+                self.__enqueue_taskset(worker, taskset)
+            self.__requeue_poll()
+
+    def requeue(self):
         self.__requeue_poll()
 
-    def __requeue_poll(self):
+    def __requeue_poll(self, immediate=False):
         _logger.debug("doing poll requeue")
+        self.__unset_poll()
         try:
-            self.__unset_poll()
             self.__task_lock.acquire()
                     
             assert self.__poll_task is None
@@ -165,9 +221,12 @@
                 return
             curtime = time.time()
             next_timeout = self.__tasks[0].eligibility - curtime
-            if next_timeout < MIN_POLL_TIME_SECS:
+            if immediate:
+                next_timeout = 1
+            elif (next_timeout < MIN_POLL_TIME_SECS):
                 next_timeout = MIN_POLL_TIME_SECS
-            _logger.debug("requeuing check for %r", next_timeout)
-            self.__poll_task = turbogears.scheduler.add_interval_task(action=self.__do_poll, taskname='FirehoseMasterPoll', initialdelay=next_timeout, interval=1)
+            _logger.debug("requeuing check for %r secs (%r mins)", next_timeout, next_timeout/60.0)
+            self.__poll_task = turbogears.scheduler.add_interval_task(action=self.__do_poll, taskname='FirehoseMasterPoll', 
+                                                                      initialdelay=next_timeout, interval=TASKSET_POLL_CHECK_SECS)
         finally:
             self.__task_lock.release()
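
__do_poll treats self.__tasks as a heapq keyed on eligibility time: it pops entries, reschedules each by DEFAULT_POLL_TIME_SECS, and pushes them back, batching the currently eligible keys into tasksets. The core pattern, reduced to a standalone sketch (Entry stands in for QueuedTask, whose comparison method is outside this diff):

    import heapq, time

    POLL_INTERVAL = 45 * 60          # mirrors DEFAULT_POLL_TIME_SECS

    class Entry(object):
        def __init__(self, eligibility, key):
            self.eligibility = eligibility
            self.key = key
        def __cmp__(self, other):    # heap is ordered by soonest eligibility
            return cmp(self.eligibility, other.eligibility)

    heap = []
    heapq.heappush(heap, Entry(time.time() - 1, 'feed/one'))    # already due
    heapq.heappush(heap, Entry(time.time() + 600, 'feed/two'))  # not yet due

    now = time.time()
    batch = []
    while heap and heap[0].eligibility < now:
        entry = heapq.heappop(heap)
        entry.eligibility = now + POLL_INTERVAL   # reschedule before re-pushing
        heapq.heappush(heap, entry)
        batch.append(entry.key)
    print batch    # ['feed/one']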

Modified: firehose/firehose/jobs/poller.py
===================================================================
--- firehose/firehose/jobs/poller.py	2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/firehose/jobs/poller.py	2008-02-01 16:48:15 UTC (rev 7281)
@@ -1,8 +1,9 @@
 #!/usr/bin/python
 
-import os,sys,re,heapq,time,Queue,sha
-import BaseHTTPServer,httplib,urlparse
-from email.Utils import formatdate,parsedate
+import os,sys,re,heapq,time,Queue,sha,threading
+import BaseHTTPServer,httplib,urlparse,urllib
+from email.utils import formatdate,parsedate
+import logging
 
 import boto
 
@@ -19,6 +20,9 @@
 aws_config_path = os.path.expanduser('~/.aws')
 execfile(aws_config_path)
 
+# Global hash mapping family to class
+_task_families = {}
+
 class TaskHandler(object):
     FAMILY = None
     
@@ -28,25 +32,26 @@
         raise NotImplementedError() 
 
 class FeedTaskHandler(object):
-    FAMILY = 'http-feed'
+    FAMILY = 'feed'
     
     def run(self, id, prev_hash, prev_timestamp):
         targeturl = urllib.unquote(id)
         parsedurl = urlparse.urlparse(targeturl)
         try:
-            connection = httplib.HTTPConnection(parsedurl.host, parsedurl.port)
+            _logger.info('Connecting to %r', targeturl)
+            connection = httplib.HTTPConnection(parsedurl.hostname, parsedurl.port)
             connection.request('GET', parsedurl.path,
                                headers={'If-Modified-Since':
                                         formatdate(prev_timestamp)})
             response = connection.getresponse()
             if response.status == 304:
+                _logger.info("Got 304 Unmodified for %r", targeturl)
                 return (prev_hash, prev_timestamp) 
             data = response.read()
-            new_timestamp = resp_headers
             hash = sha.new()
             hash.update(data)
             hash_hex = hash.hexdigest()
-            timestamp_str = response.getheader('Last-Modified')
+            timestamp_str = response.getheader('Last-Modified', None)
             if timestamp_str is not None:
                 timestamp = parsedate(timestamp_str)
             else:
@@ -60,22 +65,22 @@
                 connection.close()
             except:
                 pass
+_task_families[FeedTaskHandler.FAMILY] = FeedTaskHandler
 
 class TaskRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
     def do_POST(self):
         _logger.debug("handling POST")
-        data = self.rfile.read()
-        taskids = simplejson.load(data)
+        taskids = simplejson.load(self.rfile)
         poller = TaskPoller.get()
         poller.poll_tasks(taskids)
         
 _instance = None
-class TaskPoller(SimpleHTTPServer.BaseHTTPServer):
-    
+class TaskPoller(object):   
     @staticmethod
     def get():
         global _instance
         if _instance is None:
+            _logger.debug("constructing task poller")
             _instance = TaskPoller()
         return _instance
         
@@ -85,13 +90,18 @@
         self.__active_collectors = set()
         self.__master_hostport = (config.get('firehose.masterhost'), 8080)
         
+    def run_async(self):
+        thr = threading.Thread(target=self.run)
+        thr.setDaemon(True)
+        thr.start()
+        
     def run(self):
         self.__server.serve_forever()
         
     def __send_results(self, results):
         dumped_results = simplejson.dumps(results)
         connection = httplib.HTTPConnection(*(self.__master_hostport))
-        connection.request('POST', '/taskset-status', dumped_results,
+        connection.request('POST', '/taskset_status', dumped_results,
                            headers={'Content-Type': 'text/javascript'})
         
     def __run_collect_tasks(self, taskqueue, resultqueue):
@@ -107,15 +117,25 @@
                 break 
         self.__send_results(results)
         
-    def poll_tasks(self, taskids):
+    def __run_task(self, taskid, prev_hash, prev_timestamp, taskqueue, resultqueue):
+        (family, tid) = taskid.split('/', 1)
+        try:
+            fclass = _task_families[family]
+        except KeyError, e:
+            _logger.exception("Failed to find family for task %r", taskid)
+            return
+        inst = fclass()
+        (new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp)
+        _logger.info("Result hash:%r ts:%r", new_hash, new_timestamp)
+        resultqueue.put((taskid, new_hash, new_timestamp))     
+        taskqueue.task_done()   
+        
+    def poll_tasks(self, tasks):
         taskqueue = Queue.Queue()
         resultqueue = Queue.Queue()
-        for task in taskids:
-            taskqueue.put(task)
-            thread = threading.Thread(target=self.__run_task, args=(task,resultqueue))
+        for (taskid, prev_hash, prev_timestamp) in tasks:
+            taskqueue.put(taskid)
+            thread = threading.Thread(target=self.__run_task, args=(taskid, prev_hash, prev_timestamp, taskqueue, resultqueue))
             thread.start()
         collector = threading.Thread(target=self.__run_collect_tasks, args=(taskqueue,resultqueue))
         collector.start()
-    
-    def run(self):
-        pass
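
FeedTaskHandler's change detection is a conditional GET: it sends If-Modified-Since from the previous poll, treats a 304 as unchanged, and otherwise hashes the body so content changes are caught even when the server's timestamp is unreliable. The same technique as a standalone sketch (the URL is hypothetical):

    import httplib, sha, time, urlparse
    from email.utils import formatdate

    def poll_once(url, prev_timestamp):
        parsed = urlparse.urlparse(url)
        conn = httplib.HTTPConnection(parsed.hostname, parsed.port)
        try:
            conn.request('GET', parsed.path or '/',
                         headers={'If-Modified-Since': formatdate(prev_timestamp)})
            resp = conn.getresponse()
            if resp.status == 304:   # nothing changed; keep the previous state
                return None
            return sha.new(resp.read()).hexdigest()
        finally:
            conn.close()

    # poll as if the feed was last fetched an hour ago
    print poll_once('http://example.com/feed.rss', time.time() - 3600)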

Modified: firehose/firehose/jobs/tasks.py
===================================================================
--- firehose/firehose/jobs/tasks.py	2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/firehose/jobs/tasks.py	2008-02-01 16:48:15 UTC (rev 7281)
@@ -2,11 +2,15 @@
 
 import os,sys,re
 
-class TaskKey(object):
+class TaskEntry(object):
     family = property(lambda self: self._family)
     id = property(lambda self: self._id)
-    def __init__(self, keystr):
+    prev_hash = property(lambda self: self._prev_hash)
+    prev_timestamp = property(lambda self: self._prev_timestamp)    
+    def __init__(self, keystr, prev_hash, prev_ts):
+        super(TaskEntry, self).__init__()
         (self._family, self._id) = keystr.split('/', 1)
+        (self._prev_hash, self._prev_timestamp) = prev_hash, prev_ts
         
     def __cmp__(self, other):
         v = cmp(self.family, other.family)
@@ -14,6 +18,9 @@
             return v
         return cmp(self.id, other.id)
     
+    def __str__(self):
+        return self.family + '/' + self.id
+    
     def __json__(self):
         return {
           "family" : self.family,

Modified: firehose/start-firehose.py
===================================================================
--- firehose/start-firehose.py	2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/start-firehose.py	2008-02-01 16:48:15 UTC (rev 7281)
@@ -21,8 +21,13 @@
     update_config(configfile="prod.cfg",modulename="firehose.config")
 config.update(dict(package="firehose"))
 
+import firehose.jobs.master
 from firehose.jobs.master import MasterPoller
-MasterPoller.get().start()
+import logging
+l = logging.getLogger('firehose.Startup')
+l.debug("Startup!")
+master = MasterPoller.get()
+master.start()
 
 from firehose.controllers import Root
 start_server(Root())


