r7277 - in firehose: . firehose firehose/jobs
- From: commits mugshot org
- To: online-desktop-list gnome org
- Subject: r7277 - in firehose: . firehose firehose/jobs
- Date: Wed, 30 Jan 2008 16:33:57 -0600 (CST)
Author: walters
Date: 2008-01-30 16:33:57 -0600 (Wed, 30 Jan 2008)
New Revision: 7277
Modified:
firehose/dev.cfg
firehose/firehose/controllers.py
firehose/firehose/jobs/master.py
firehose/firehose/jobs/poller.py
firehose/start-firehose.py
Log:
Closer to running.
Modified: firehose/dev.cfg
===================================================================
--- firehose/dev.cfg 2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/dev.cfg 2008-01-30 22:33:57 UTC (rev 7277)
@@ -11,7 +11,8 @@
# sqlobject.dburi="mysql://username:password hostname:port/databasename"
# sqlobject.dburi="sqlite:///file_name_and_path"
-firehose.taskdbpath="'%(current_dir_uri)s/dev-tasks.sqlite"
+firehose.taskdbpath="%(current_dir_uri)s/dev-tasks.sqlite"
+firehose.masterhost="localhost"
firehose.slaveport="8090"
# if you are using a database or table type without transactions
Modified: firehose/firehose/controllers.py
===================================================================
--- firehose/firehose/controllers.py 2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/firehose/controllers.py 2008-01-30 22:33:57 UTC (rev 7277)
@@ -1,4 +1,5 @@
from turbogears import controllers, expose, flash
+import cherrypy
# from model import *
# import logging
# log = logging.getLogger("firehose.controllers")
@@ -11,9 +12,23 @@
flash("Your application is now running")
return dict(now=time.ctime())
- @expose("json", as_format="json", accept_format="text/javascript")
- def addtask(self, taskid):
+ @expose("json")
+ def addtask(self, taskid=None):
+ if cherrypy.request.method != 'POST':
+ raise Exception("Must invoke this method using POST")
+ if taskid is None:
+ return {}
+
from firehose.jobs.master import MasterPoller
master = MasterPoller.get()
master.add_task(taskid)
- return {}
\ No newline at end of file
+ return {}
+
+ @expose("json")
+ def taskset_status(self, results=None):
+ if cherrypy.request.method != 'POST':
+ raise Exception("Must invoke this method using POST")
+ from firehose.jobs.master import MasterPoller
+ master = MasterPoller.get()
+ master.taskset_status(results)
+ return {}
\ No newline at end of file
Modified: firehose/firehose/jobs/master.py
===================================================================
--- firehose/firehose/jobs/master.py 2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/firehose/jobs/master.py 2008-01-30 22:33:57 UTC (rev 7277)
@@ -1,6 +1,6 @@
#!/usr/bin/python
-import os,sys,re,heapq,time,httplib
+import os,sys,re,heapq,time,httplib,logging,threading
if sys.version_info[0] < 2 or sys.version_info[1] < 5:
from pysqlite2 import dbapi2 as sqlite3
@@ -13,7 +13,7 @@
from boto.s3.key import Key
from turbogears import config
-import turbogears.jsonify
+import turbojson.jsonify
from tasks import TaskKey
@@ -28,6 +28,14 @@
MAX_TASKSET_SIZE = 30
MAX_TASKSET_WORKERS = 1
+class TaskStatusHandler(BaseHTTPServer.BaseHTTPRequestHandler):
+ def do_POST(self):
+ _logger.debug("handling POST")
+ data = self.rfile.read()
+ taskset_results = simplejson.load(data)
+ poller = MasterPoller.get()
+ poller.taskset_status(taskset_results)
+
class QueuedTask(object):
__slots__ = ['eligibility', 'task']
def __init__(self, eligibility, task):
@@ -60,13 +68,19 @@
self.__task_lock.release()
def add_task(self, taskkey):
- cursor = self.__conn.cursor()
- cursor.execute('''INSERT INTO Tasks VALUES (?)''',
- taskkey)
- self.__add_task_for_key(taskkey)
+ cursor = self.__conn.cursor()
+ self.__set_task_status(cursor, taskkey, None, None)
+ self.__add_task_for_key(taskkey)
- def taskset_status(self, status_str):
- pass
+ def __set_task_status(self, cursor, taskkey, hashcode, timestamp):
+ cursor.execute('''INSERT INTO Tasks VALUES (?, ?, ?)''',
+ taskkey, hashcode, timestamp)
+
+ def taskset_status(self, results):
+ _logger.debug("got %d results", len(results))
+ cursor = self.__conn.cursor()
+ for (taskkey,hashcode,timestamp) in results:
+ self.__set_task_status(cursor, taskkey, hashcode, timestamp)
def __init__(self):
self.__tasks = []
@@ -74,19 +88,25 @@
self.__task_lock = threading.Lock()
# Default to one slave on localhost
- self.__worker_urls = ['localhost:%d' + int(config.get('firehose.slaveport'))]
+ self.__worker_urls = ['localhost:%d' % (int(config.get('firehose.slaveport')),)]
+ _logger.debug("worker urls are %r", self.__worker_urls)
path = config.get('firehose.taskdbpath')
+ _logger.debug("connecting to %r", path)
self.__conn = sqlite3.connect(path, isolation_level=None)
cursor = self.__conn.cursor()
- cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (tid INTEGER PRIMARY KEY AUTOINCREMENT,
- key TEXT UNIQUE IGNORE)''')
+ cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE ON CONFLICT IGNORE,
+ resulthash TEXT,
+ resulttime DATETIME)''')
cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
curtime = time.time()
for v in cursor.execute('''SELECT key from Tasks'''):
task = QueuedTask(curtime, TaskKey(v))
heapq.heappush(self.__tasks, task)
+
+ def start(self):
+ self.__requeue_poll()
def __unset_poll(self):
try:
Modified: firehose/firehose/jobs/poller.py
===================================================================
--- firehose/firehose/jobs/poller.py 2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/firehose/jobs/poller.py 2008-01-30 22:33:57 UTC (rev 7277)
@@ -64,7 +64,7 @@
class TaskRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_POST(self):
_logger.debug("handling POST")
- data = rfile.read()
+ data = self.rfile.read()
taskids = simplejson.load(data)
poller = TaskPoller.get()
poller.poll_tasks(taskids)
@@ -76,18 +76,23 @@
def get():
global _instance
if _instance is None:
- _instance = MasterPoller()
+ _instance = TaskPoller()
return _instance
def __init__(self):
bindport = int(config.get('firehose.slaveport'))
self.__server = BaseHTTPServer.HTTPServer(('', bindport), TaskRequestHandler)
self.__active_collectors = set()
+ self.__master_hostport = (config.get('firehose.masterhost'), 8080)
def run(self):
self.__server.serve_forever()
- def __send_results(self, ):
+ def __send_results(self, results):
+ dumped_results = simplejson.dumps(results)
+ connection = httplib.HTTPConnection(*(self.__master_hostport))
+ connection.request('POST', '/taskset-status', dumped_results,
+ headers={'Content-Type': 'text/javascript'})
def __run_collect_tasks(self, taskqueue, resultqueue):
_logger.debug("doing join on taskqueue")
Modified: firehose/start-firehose.py
===================================================================
--- firehose/start-firehose.py 2008-01-30 20:23:26 UTC (rev 7276)
+++ firehose/start-firehose.py 2008-01-30 22:33:57 UTC (rev 7277)
@@ -21,5 +21,8 @@
update_config(configfile="prod.cfg",modulename="firehose.config")
config.update(dict(package="firehose"))
+from firehose.jobs.master import MasterPoller
+MasterPoller.get().start()
+
from firehose.controllers import Root
start_server(Root())
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]