r7277 - in firehose: . firehose firehose/jobs



Author: walters
Date: 2008-01-30 16:33:57 -0600 (Wed, 30 Jan 2008)
New Revision: 7277

Modified:
   firehose/dev.cfg
   firehose/firehose/controllers.py
   firehose/firehose/jobs/master.py
   firehose/firehose/jobs/poller.py
   firehose/start-firehose.py
Log:
Closer to running.


Modified: firehose/dev.cfg
===================================================================
--- firehose/dev.cfg	2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/dev.cfg	2008-01-30 22:33:57 UTC (rev 7277)
@@ -11,7 +11,8 @@
 # sqlobject.dburi="mysql://username:password hostname:port/databasename"
 # sqlobject.dburi="sqlite:///file_name_and_path"
 
-firehose.taskdbpath="'%(current_dir_uri)s/dev-tasks.sqlite"
+firehose.taskdbpath="%(current_dir_uri)s/dev-tasks.sqlite"
+firehose.masterhost="localhost"
 firehose.slaveport="8090"
 
 # if you are using a database or table type without transactions

Modified: firehose/firehose/controllers.py
===================================================================
--- firehose/firehose/controllers.py	2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/firehose/controllers.py	2008-01-30 22:33:57 UTC (rev 7277)
@@ -1,4 +1,5 @@
 from turbogears import controllers, expose, flash
+import cherrypy
 # from model import *
 # import logging
 # log = logging.getLogger("firehose.controllers")
@@ -11,9 +12,23 @@
         flash("Your application is now running")
         return dict(now=time.ctime())
 
-    @expose("json", as_format="json", accept_format="text/javascript")
-    def addtask(self, taskid):
+    @expose("json")
+    def addtask(self, taskid=None):
+        if cherrypy.request.method != 'POST':
+            raise Exception("Must invoke this method using POST")
+        if taskid is None:
+            return {}
+        
         from firehose.jobs.master import MasterPoller        
         master = MasterPoller.get()
         master.add_task(taskid)
-        return {}
\ No newline at end of file
+        return {}
+    
+    @expose("json")
+    def taskset_status(self, results=None):
+        if cherrypy.request.method != 'POST':
+            raise Exception("Must invoke this method using POST")
+        from firehose.jobs.master import MasterPoller        
+        master = MasterPoller.get()
+        master.taskset_status(results)
+        return {}                
\ No newline at end of file

Modified: firehose/firehose/jobs/master.py
===================================================================
--- firehose/firehose/jobs/master.py	2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/firehose/jobs/master.py	2008-01-30 22:33:57 UTC (rev 7277)
@@ -1,6 +1,6 @@
 #!/usr/bin/python
 
-import os,sys,re,heapq,time,httplib
+import os,sys,re,heapq,time,httplib,logging,threading
 
 if sys.version_info[0] < 2 or sys.version_info[1] < 5:
     from pysqlite2 import dbapi2 as sqlite3
@@ -13,7 +13,7 @@
 from boto.s3.key import Key
 
 from turbogears import config
-import turbogears.jsonify
+import turbojson.jsonify
 
 from tasks import TaskKey
 
@@ -28,6 +28,14 @@
 MAX_TASKSET_SIZE = 30
 MAX_TASKSET_WORKERS = 1
 
+class TaskStatusHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+    def do_POST(self):
+        _logger.debug("handling POST")
+        data = self.rfile.read()
+        taskset_results = simplejson.load(data)
+        poller = MasterPoller.get()
+        poller.taskset_status(taskset_results)
+
 class QueuedTask(object):
     __slots__ = ['eligibility', 'task']
     def __init__(self, eligibility, task):
@@ -60,13 +68,19 @@
             self.__task_lock.release()
     
     def add_task(self, taskkey):
-        cursor = self.__conn.cursor()
-        cursor.execute('''INSERT INTO Tasks VALUES (?)''',
-                       taskkey)
-        self.__add_task_for_key(taskkey)        
+        cursor = self.__conn.cursor()         
+        self.__set_task_status(cursor, taskkey, None, None)
+        self.__add_task_for_key(taskkey)
         
-    def taskset_status(self, status_str):
-        pass
+    def __set_task_status(self, cursor, taskkey, hashcode, timestamp):
+        cursor.execute('''INSERT INTO Tasks VALUES (?, ?, ?)''',
+                       taskkey, hashcode, timestamp)
+        
+    def taskset_status(self, results):
+        _logger.debug("got %d results", len(results))
+        cursor = self.__conn.cursor()        
+        for (taskkey,hashcode,timestamp) in results:
+            self.__set_task_status(cursor, taskkey, hashcode, timestamp)
 
     def __init__(self):
         self.__tasks = []
@@ -74,19 +88,25 @@
         self.__task_lock = threading.Lock()
         
         # Default to one slave on localhost
-        self.__worker_urls = ['localhost:%d' + int(config.get('firehose.slaveport'))]
+        self.__worker_urls = ['localhost:%d' % (int(config.get('firehose.slaveport')),)]
+        _logger.debug("worker urls are %r", self.__worker_urls)
         
         path = config.get('firehose.taskdbpath')
+        _logger.debug("connecting to %r", path)
         self.__conn = sqlite3.connect(path, isolation_level=None)
         cursor = self.__conn.cursor()
-        cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (tid INTEGER PRIMARY KEY AUTOINCREMENT, 
-                                                            key TEXT UNIQUE IGNORE)''')
+        cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE ON CONFLICT IGNORE,
+                                                            resulthash TEXT,
+                                                            resulttime DATETIME)''')
         cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
         
         curtime = time.time()
         for v in cursor.execute('''SELECT key from Tasks'''):
             task = QueuedTask(curtime, TaskKey(v))
             heapq.heappush(self.__tasks, task)
+            
+    def start(self):
+        self.__requeue_poll()
 
     def __unset_poll(self):
         try:

Modified: firehose/firehose/jobs/poller.py
===================================================================
--- firehose/firehose/jobs/poller.py	2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/firehose/jobs/poller.py	2008-01-30 22:33:57 UTC (rev 7277)
@@ -64,7 +64,7 @@
 class TaskRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
     def do_POST(self):
         _logger.debug("handling POST")
-        data = rfile.read()
+        data = self.rfile.read()
         taskids = simplejson.load(data)
         poller = TaskPoller.get()
         poller.poll_tasks(taskids)
@@ -76,18 +76,23 @@
     def get():
         global _instance
         if _instance is None:
-            _instance = MasterPoller()
+            _instance = TaskPoller()
         return _instance
         
     def __init__(self):
         bindport = int(config.get('firehose.slaveport'))
         self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
         self.__active_collectors = set()
+        self.__master_hostport = (config.get('firehose.masterhost'), 8080)
         
     def run(self):
         self.__server.serve_forever()
         
-    def __send_results(self, ):
+    def __send_results(self, results):
+        dumped_results = simplejson.dumps(results)
+        connection = httplib.HTTPConnection(*(self.__master_hostport))
+        connection.request('POST', '/taskset-status', dumped_results,
+                           headers={'Content-Type': 'text/javascript'})
         
     def __run_collect_tasks(self, taskqueue, resultqueue):
         _logger.debug("doing join on taskqueue")

Modified: firehose/start-firehose.py
===================================================================
--- firehose/start-firehose.py	2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/start-firehose.py	2008-01-30 22:33:57 UTC (rev 7277)
@@ -21,5 +21,8 @@
     update_config(configfile="prod.cfg",modulename="firehose.config")
 config.update(dict(package="firehose"))
 
+from firehose.jobs.master import MasterPoller
+MasterPoller.get().start()
+
 from firehose.controllers import Root
 start_server(Root())



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]