r7281 - in firehose: . firehose firehose/jobs
From: commits@mugshot.org
To: online-desktop-list@gnome.org
Subject: r7281 - in firehose: . firehose firehose/jobs
Date: Fri, 1 Feb 2008 10:48:16 -0600 (CST)
Author: walters
Date: 2008-02-01 10:48:15 -0600 (Fri, 01 Feb 2008)
New Revision: 7281
Modified:
firehose/dev.cfg
firehose/firehose/controllers.py
firehose/firehose/jobs/master.py
firehose/firehose/jobs/poller.py
firehose/firehose/jobs/tasks.py
firehose/start-firehose.py
Log:
Boots, runs.
Modified: firehose/dev.cfg
===================================================================
--- firehose/dev.cfg 2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/dev.cfg 2008-02-01 16:48:15 UTC (rev 7281)
@@ -13,7 +13,7 @@
firehose.taskdbpath="%(current_dir_uri)s/dev-tasks.sqlite"
firehose.masterhost="localhost"
-firehose.slaveport="8090"
+firehose.slaveport="6677"
# if you are using a database or table type without transactions
# (MySQL default, for example), you should turn off transactions
@@ -31,11 +31,11 @@
# Enable the debug output at the end on pages.
# log_debug_info_filter.on = False
-server.environment="development"
-autoreload.package="firehose"
+#server.environment="development"
+#autoreload.package="firehose"
# Auto-Reload after code modification
-# autoreload.on = True
+autoreload.on = False
# Set to True if you'd like to abort execution if a controller gets an
# unexpected parameter. False by default
@@ -55,6 +55,7 @@
level='DEBUG'
qualname='firehose'
handlers=['debug_out']
+propagate=0
[[[allinfo]]]
level='INFO'
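
The new propagate=0 line under the qualname='firehose' stanza keeps records handled by the 'firehose' logger from also bubbling up to the root logger's handlers, which would otherwise emit each debug line twice. For reference, a minimal runtime equivalent (a sketch, not part of the commit):

import logging

# What propagate=0 under qualname='firehose' amounts to at runtime:
# handle 'firehose.*' records locally, do not forward them to root.
logging.getLogger('firehose').propagate = 0
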
Modified: firehose/firehose/controllers.py
===================================================================
--- firehose/firehose/controllers.py 2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/firehose/controllers.py 2008-02-01 16:48:15 UTC (rev 7281)
@@ -1,9 +1,12 @@
+import logging
+
+import simplejson
from turbogears import controllers, expose, flash
import cherrypy
# from model import *
-# import logging
-# log = logging.getLogger("firehose.controllers")
+_logger = logging.getLogger("firehose.controllers")
+
class Root(controllers.RootController):
@expose(template="firehose.templates.welcome")
def index(self):
@@ -13,22 +16,33 @@
return dict(now=time.ctime())
@expose("json")
- def addtask(self, taskid=None):
+ def addfeed(self, feedurl=None):
if cherrypy.request.method != 'POST':
raise Exception("Must invoke this method using POST")
- if taskid is None:
+ if feedurl is None:
+ _logger.debug("no feed url specified")
return {}
from firehose.jobs.master import MasterPoller
master = MasterPoller.get()
- master.add_task(taskid)
+ master.add_feed(feedurl)
return {}
@expose("json")
- def taskset_status(self, results=None):
+ def requeue(self):
if cherrypy.request.method != 'POST':
raise Exception("Must invoke this method using POST")
from firehose.jobs.master import MasterPoller
master = MasterPoller.get()
- master.taskset_status(results)
+ master.requeue()
+ return {}
+
+ @expose("json")
+ def taskset_status(self):
+ if cherrypy.request.method != 'POST':
+ raise Exception("Must invoke this method using POST")
+ from firehose.jobs.master import MasterPoller
+ master = MasterPoller.get()
+ status = simplejson.load(cherrypy.request.body)
+ master.taskset_status(status)
return {}
\ No newline at end of file
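
Three POST-only JSON endpoints come out of this controller change: /addfeed takes a feedurl form parameter, /requeue takes nothing, and /taskset_status reads a JSON array straight from the request body. A hedged client sketch using the same Python 2 modules as the commit; the localhost:8080 master address mirrors the hardcoded port in poller.py below, and the feed URL and result values are illustrative:

import httplib, urllib, simplejson

# /addfeed expects an ordinary form-encoded 'feedurl' parameter.
params = urllib.urlencode({'feedurl': 'http://example.com/feed.rss'})
conn = httplib.HTTPConnection('localhost', 8080)
conn.request('POST', '/addfeed', params,
             headers={'Content-Type': 'application/x-www-form-urlencoded'})
print conn.getresponse().status

# /taskset_status reads a JSON list of (key, hash, timestamp) triples
# from the raw body, via simplejson.load(cherrypy.request.body).
results = [('feed/http%3A//example.com/feed.rss', 'abc123', 1201880895)]
conn = httplib.HTTPConnection('localhost', 8080)
conn.request('POST', '/taskset_status', simplejson.dumps(results),
             headers={'Content-Type': 'text/javascript'})
print conn.getresponse().status
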
Modified: firehose/firehose/jobs/master.py
===================================================================
--- firehose/firehose/jobs/master.py 2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/firehose/jobs/master.py 2008-02-01 16:48:15 UTC (rev 7281)
@@ -1,23 +1,27 @@
#!/usr/bin/python
import os,sys,re,heapq,time,httplib,logging,threading
+import traceback
+import BaseHTTPServer,urllib
if sys.version_info[0] < 2 or sys.version_info[1] < 5:
from pysqlite2 import dbapi2 as sqlite3
else:
import sqlite3
+import simplejson
import boto
-
from boto.sqs.connection import SQSConnection
from boto.s3.connection import S3Connection
from boto.s3.key import Key
+import turbogears
from turbogears import config
-import turbojson.jsonify
-from tasks import TaskKey
+from tasks import TaskEntry
+from firehose.jobs.poller import TaskPoller
_logger = logging.getLogger('firehose.Master')
+_logger.debug("hello master!")
aws_config_path = os.path.expanduser('~/.aws')
execfile(aws_config_path)
@@ -25,6 +29,7 @@
DEFAULT_POLL_TIME_SECS = 45 * 60 # 45 minutes
MIN_POLL_TIME_SECS = 15
TASKSET_TIMEOUT_SECS = 7 * 60 # 7 minutes
+TASKSET_POLL_CHECK_SECS = 2 * 60 # 2 minutes
MAX_TASKSET_SIZE = 30
MAX_TASKSET_WORKERS = 1
@@ -37,7 +42,7 @@
poller.taskset_status(taskset_results)
class QueuedTask(object):
- __slots__ = ['eligibility', 'task']
+ __slots__ = ['eligibility', 'task',]
def __init__(self, eligibility, task):
self.eligibility = eligibility
self.task = task
@@ -52,61 +57,89 @@
def get():
global _instance
if _instance is None:
+ _logger.debug("Constructing master poller instance")
_instance = MasterPoller()
return _instance
+
+ def __init__(self):
+ global _instance
+ assert _instance is None
+ self.__tasks = []
+ self.__poll_task = None
+ self.__task_lock = threading.Lock()
+
+ # Default to one slave on localhost
+ self.__worker_endpoints = ['localhost:%d' % (int(config.get('firehose.slaveport')),)]
+ _logger.debug("worker endpoints are %r", self.__worker_endpoints)
+ for bind in self.__worker_endpoints:
+ (host,port) = bind.split(':')
+ if host == 'localhost':
+ _logger.debug("found localhost worker, starting it")
+ self.__local_handler = TaskPoller.get()
+ self.__local_handler.run_async()
+
+ path = self.__path = config.get('firehose.taskdbpath')
+ _logger.debug("connecting to %r", path)
+ conn = sqlite3.connect(path, isolation_level=None)
+ cursor = conn.cursor()
+ cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE,
+ prev_hash TEXT,
+ prev_time DATETIME)''')
+ cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
+
+ curtime = time.time()
+ for key,prev_hash,prev_time in cursor.execute('''SELECT key,prev_hash,prev_time from Tasks'''):
+ task = QueuedTask(curtime, TaskEntry(key, prev_hash, prev_time))
+ heapq.heappush(self.__tasks, task)
+ conn.close()
+ _logger.debug("%d queued tasks", len(self.__tasks))
def __add_task_for_key(self, key):
try:
self.__task_lock.acquire()
- taskkey = TaskKey(key)
+ task = TaskEntry(key, None, None)
for qtask in self.__tasks:
- if qtask.task == taskkey:
+ if qtask.task == task:
return qtask
- newtask = QueuedTask(time.time(), taskkey)
- self.__tasks.append(newtask)
+ qtask = QueuedTask(time.time(), task)
+ self.__tasks.append(qtask)
finally:
self.__task_lock.release()
+
+ def add_feed(self, feedurl):
+ taskkey = 'feed/' + urllib.quote(feedurl)
+ self.add_task(taskkey)
def add_task(self, taskkey):
- cursor = self.__conn.cursor()
- self.__set_task_status(cursor, taskkey, None, None)
+ _logger.debug("adding task key=%r", taskkey)
self.__add_task_for_key(taskkey)
+ try:
+ conn = sqlite3.connect(self.__path, isolation_level=None)
+ cursor = conn.cursor()
+ self.__set_task_status(cursor, taskkey, None, None)
+ finally:
+ conn.close()
def __set_task_status(self, cursor, taskkey, hashcode, timestamp):
- cursor.execute('''INSERT INTO Tasks VALUES (?, ?, ?)''',
- taskkey, hashcode, timestamp)
-
+ _logger.debug("updating task %r values (%r %r)", taskkey, hashcode, timestamp)
+ cursor.execute('''INSERT OR REPLACE INTO Tasks VALUES (?, ?, ?)''',
+ (taskkey, hashcode, timestamp))
+
def taskset_status(self, results):
- _logger.debug("got %d results", len(results))
- cursor = self.__conn.cursor()
- for (taskkey,hashcode,timestamp) in results:
- self.__set_task_status(cursor, taskkey, hashcode, timestamp)
-
- def __init__(self):
- self.__tasks = []
- self.__poll_task = None
- self.__task_lock = threading.Lock()
-
- # Default to one slave on localhost
- self.__worker_urls = ['localhost:%d' % (int(config.get('firehose.slaveport')),)]
- _logger.debug("worker urls are %r", self.__worker_urls)
-
- path = config.get('firehose.taskdbpath')
- _logger.debug("connecting to %r", path)
- self.__conn = sqlite3.connect(path, isolation_level=None)
- cursor = self.__conn.cursor()
- cursor.execute('''CREATE TABLE IF NOT EXISTS Tasks (key TEXT UNIQUE ON CONFLICT IGNORE,
- resulthash TEXT,
- resulttime DATETIME)''')
- cursor.execute('''CREATE INDEX IF NOT EXISTS TasksIdx on Tasks (key)''')
-
- curtime = time.time()
- for v in cursor.execute('''SELECT key from Tasks'''):
- task = QueuedTask(curtime, TaskKey(v))
- heapq.heappush(self.__tasks, task)
+ _logger.info("got %d results", len(results))
+ _logger.debug("results: %r", results )
+ try:
+ conn = sqlite3.connect(self.__path, isolation_level=None)
+ cursor = conn.cursor()
+ cursor.execute('''BEGIN''')
+ for (taskkey,hashcode,timestamp) in results:
+ self.__set_task_status(cursor, taskkey, hashcode, timestamp)
+ cursor.execute('''COMMIT''')
+ finally:
+ conn.close()
def start(self):
- self.__requeue_poll()
+ self.__requeue_poll(immediate=True)
def __unset_poll(self):
try:
@@ -122,13 +155,13 @@
raise NotImplementedError()
def __enqueue_taskset(self, worker, taskset):
- jsonstr = turbogears.jsonify.encode(taskset)
+ jsonstr = simplejson.dumps(taskset)
conn = httplib.HTTPConnection(worker)
- req = conn.request('POST', '/', jsonstr)
+ conn.request('POST', '/', jsonstr)
+ conn.close()
def __do_poll(self):
_logger.debug("in poll")
- self.__unset_poll()
tasksets = []
curtime = time.time()
@@ -136,27 +169,50 @@
try:
self.__task_lock.acquire()
taskset = []
- for i,task in enumerate(self.__tasks):
+ i = 0
+ while True:
+ try:
+ task = heapq.heappop(self.__tasks)
+ except IndexError, e:
+ break
if i >= MAX_TASKSET_SIZE:
if len(tasksets) >= MAX_TASKSET_WORKERS:
break
tasksets.append(taskset)
taskset = []
- if task.eligibility < taskset_limit:
- taskset.extend(task)
+ i = 0
+ else:
+ i += 1
+ eligible = task.eligibility < taskset_limit
+ task.eligibility = curtime + DEFAULT_POLL_TIME_SECS
+ heapq.heappush(self.__tasks, task)
+ if eligible:
+ taskset.append((str(task.task), task.task.prev_hash, task.task.prev_timestamp))
+ else:
+ break
+ if len(taskset) > 0:
+ tasksets.append(taskset)
+ taskset = None
finally:
self.__task_lock.release()
taskset_count = len(tasksets)
- if taskset_count > len(self.__worker_urls):
- self.__activate_workers()
- for worker,taskset in zip(tasksets, self.__worker_urls):
- self.__enqueue_taskset(worker, taskset)
+ curworker_count = len(self.__worker_endpoints)
+ if taskset_count > curworker_count:
+ _logger.info("Need worker activation, current=%d, required=%d", curworker_count, taskset_count)
+ self.__activate_workers()
+ _logger.info("have %d active tasksets", taskset_count)
+ if taskset_count > 0:
+ for worker,taskset in zip(self.__worker_endpoints,tasksets):
+ self.__enqueue_taskset(worker, taskset)
+ self.__requeue_poll()
+
+ def requeue(self):
self.__requeue_poll()
- def __requeue_poll(self):
+ def __requeue_poll(self, immediate=False):
_logger.debug("doing poll requeue")
+ self.__unset_poll()
try:
- self.__unset_poll()
self.__task_lock.acquire()
assert self.__poll_task is None
@@ -165,9 +221,12 @@
return
curtime = time.time()
next_timeout = self.__tasks[0].eligibility - curtime
- if next_timeout < MIN_POLL_TIME_SECS:
+ if immediate:
+ next_timeout = 1
+ elif (next_timeout < MIN_POLL_TIME_SECS):
next_timeout = MIN_POLL_TIME_SECS
- _logger.debug("requeuing check for %r", next_timeout)
- self.__poll_task = turbogears.scheduler.add_interval_task(action=self.__do_poll, taskname='FirehoseMasterPoll', initialdelay=next_timeout, interval=1)
+ _logger.debug("requeuing check for %r secs (%r mins)", next_timeout, next_timeout/60.0)
+ self.__poll_task = turbogears.scheduler.add_interval_task(action=self.__do_poll, taskname='FirehoseMasterPoll',
+ initialdelay=next_timeout, interval=TASKSET_POLL_CHECK_SECS)
finally:
self.__task_lock.release()
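
The heart of the master change is the rewritten __do_poll: it drains the task heap with heappop, batches up to MAX_TASKSET_SIZE eligible tasks per taskset, reschedules each popped task DEFAULT_POLL_TIME_SECS into the future, and stops at the first task that is not yet due. A simplified sketch of that heap discipline, using (eligibility, key) tuples instead of QueuedTask objects and building a single taskset; unlike the commit, it pushes an ineligible task back with its eligibility unchanged:

import heapq, time

DEFAULT_POLL_TIME_SECS = 45 * 60
MAX_TASKSET_SIZE = 30

def drain_eligible(tasks, curtime):
    # Pop in eligibility order; collect due tasks and reschedule them,
    # stopping at the first task whose eligibility is still in the future.
    taskset = []
    while tasks and len(taskset) < MAX_TASKSET_SIZE:
        (eligibility, key) = heapq.heappop(tasks)
        if eligibility < curtime:
            taskset.append(key)
            heapq.heappush(tasks, (curtime + DEFAULT_POLL_TIME_SECS, key))
        else:
            heapq.heappush(tasks, (eligibility, key))
            break
    return taskset

tasks = []
heapq.heappush(tasks, (time.time() - 1, 'feed/http%3A//example.com/a.rss'))
heapq.heappush(tasks, (time.time() + 60, 'feed/http%3A//example.com/b.rss'))
print drain_eligible(tasks, time.time())   # only the overdue key
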
Modified: firehose/firehose/jobs/poller.py
===================================================================
--- firehose/firehose/jobs/poller.py 2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/firehose/jobs/poller.py 2008-02-01 16:48:15 UTC (rev 7281)
@@ -1,8 +1,9 @@
#!/usr/bin/python
-import os,sys,re,heapq,time,Queue,sha
-import BaseHTTPServer,httplib,urlparse
-from email.Utils import formatdate,parsedate
+import os,sys,re,heapq,time,Queue,sha,threading
+import BaseHTTPServer,httplib,urlparse,urllib
+from email.utils import formatdate,parsedate
+import logging
import boto
@@ -19,6 +20,9 @@
aws_config_path = os.path.expanduser('~/.aws')
execfile(aws_config_path)
+# Global hash mapping family to class
+_task_families = {}
+
class TaskHandler(object):
FAMILY = None
@@ -28,25 +32,26 @@
raise NotImplementedError()
class FeedTaskHandler(object):
- FAMILY = 'http-feed'
+ FAMILY = 'feed'
def run(self, id, prev_hash, prev_timestamp):
targeturl = urllib.unquote(id)
parsedurl = urlparse.urlparse(targeturl)
try:
- connection = httplib.HTTPConnection(parsedurl.host, parsedurl.port)
+ _logger.info('Connecting to %r', targeturl)
+ connection = httplib.HTTPConnection(parsedurl.hostname, parsedurl.port)
connection.request('GET', parsedurl.path,
headers={'If-Modified-Since':
formatdate(prev_timestamp)})
response = connection.getresponse()
if response.status == 304:
+ _logger.info("Got 304 Unmodified for %r", targeturl)
return (prev_hash, prev_timestamp)
data = response.read()
- new_timestamp = resp_headers
hash = sha.new()
hash.update(data)
hash_hex = hash.hexdigest()
- timestamp_str = response.getheader('Last-Modified')
+ timestamp_str = response.getheader('Last-Modified', None)
if timestamp_str is not None:
timestamp = parsedate(timestamp_str)
else:
@@ -60,22 +65,22 @@
connection.close()
except:
pass
+_task_families[FeedTaskHandler.FAMILY] = FeedTaskHandler
class TaskRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler):
def do_POST(self):
_logger.debug("handling POST")
- data = self.rfile.read()
- taskids = simplejson.load(data)
+ taskids = simplejson.load(self.rfile)
poller = TaskPoller.get()
poller.poll_tasks(taskids)
_instance = None
-class TaskPoller(SimpleHTTPServer.BaseHTTPServer):
-
+class TaskPoller(object):
@staticmethod
def get():
global _instance
if _instance is None:
+ _logger.debug("constructing task poller")
_instance = TaskPoller()
return _instance
@@ -85,13 +90,18 @@
self.__active_collectors = set()
self.__master_hostport = (config.get('firehose.masterhost'), 8080)
+ def run_async(self):
+ thr = threading.Thread(target=self.run)
+ thr.setDaemon(True)
+ thr.start()
+
def run(self):
self.__server.serve_forever()
def __send_results(self, results):
dumped_results = simplejson.dumps(results)
connection = httplib.HTTPConnection(*(self.__master_hostport))
- connection.request('POST', '/taskset-status', dumped_results,
+ connection.request('POST', '/taskset_status', dumped_results,
headers={'Content-Type': 'text/javascript'})
def __run_collect_tasks(self, taskqueue, resultqueue):
@@ -107,15 +117,25 @@
break
self.__send_results(results)
- def poll_tasks(self, taskids):
+ def __run_task(self, taskid, prev_hash, prev_timestamp, taskqueue, resultqueue):
+ (family, tid) = taskid.split('/', 1)
+ try:
+ fclass = _task_families[family]
+ except KeyError, e:
+ _logger.exception("Failed to find family for task %r", taskid)
+ return
+ inst = fclass()
+ (new_hash, new_timestamp) = inst.run(tid, prev_hash, prev_timestamp)
+ _logger.info("Result hash:%r ts:%r", new_hash, new_timestamp)
+ resultqueue.put((taskid, new_hash, new_timestamp))
+ taskqueue.task_done()
+
+ def poll_tasks(self, tasks):
taskqueue = Queue.Queue()
resultqueue = Queue.Queue()
- for task in taskids:
- taskqueue.put(task)
- thread = threading.Thread(target=self.__run_task, args=(task,resultqueue))
+ for (taskid, prev_hash, prev_timestamp) in tasks:
+ taskqueue.put(taskid)
+ thread = threading.Thread(target=self.__run_task, args=(taskid, prev_hash, prev_timestamp, taskqueue, resultqueue))
thread.start()
collector = threading.Thread(target=self.__run_collect_tasks, args=(taskqueue,resultqueue))
collector.start()
-
- def run(self):
- pass
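
Worth noting in the poller change: task keys are 'family/id' strings ('feed/' + urllib.quote(url) on the master side), handler classes self-register in the module-global _task_families dict, and __run_task splits the key on the first '/' to pick the class. A stand-alone sketch of that dispatch convention with a stub handler (the stub body is illustrative, not the commit's fetch logic):

import urllib

_task_families = {}

class FeedTaskHandler(object):
    FAMILY = 'feed'
    def run(self, id, prev_hash, prev_timestamp):
        # The real handler unquotes id, fetches the feed with
        # If-Modified-Since, and returns (sha_hex, last_modified).
        targeturl = urllib.unquote(id)
        return (prev_hash, prev_timestamp)

_task_families[FeedTaskHandler.FAMILY] = FeedTaskHandler

def dispatch(taskkey, prev_hash=None, prev_timestamp=None):
    # Mirrors __run_task: family is everything before the first '/'.
    (family, tid) = taskkey.split('/', 1)
    return _task_families[family]().run(tid, prev_hash, prev_timestamp)

key = 'feed/' + urllib.quote('http://example.com/blog.rss')
print dispatch(key)
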
Modified: firehose/firehose/jobs/tasks.py
===================================================================
--- firehose/firehose/jobs/tasks.py 2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/firehose/jobs/tasks.py 2008-02-01 16:48:15 UTC (rev 7281)
@@ -2,11 +2,15 @@
import os,sys,re
-class TaskKey(object):
+class TaskEntry(object):
family = property(lambda self: self._family)
id = property(lambda self: self._id)
- def __init__(self, keystr):
+ prev_hash = property(lambda self: self._prev_hash)
+ prev_timestamp = property(lambda self: self._prev_timestamp)
+ def __init__(self, keystr, prev_hash, prev_ts):
+ super(TaskEntry, self).__init__()
(self._family, self._id) = keystr.split('/', 1)
+ (self._prev_hash, self._prev_timestamp) = prev_hash, prev_ts
def __cmp__(self, other):
v = cmp(self.family, other.family)
@@ -14,6 +18,9 @@
return v
return cmp(self.id, other.id)
+ def __str__(self):
+ return self.family + '/' + self.id
+
def __json__(self):
return {
"family" : self.family,
Modified: firehose/start-firehose.py
===================================================================
--- firehose/start-firehose.py 2008-01-31 19:28:09 UTC (rev 7280)
+++ firehose/start-firehose.py 2008-02-01 16:48:15 UTC (rev 7281)
@@ -21,8 +21,13 @@
update_config(configfile="prod.cfg",modulename="firehose.config")
config.update(dict(package="firehose"))
+import firehose.jobs.master
from firehose.jobs.master import MasterPoller
-MasterPoller.get().start()
+import logging
+l = logging.getLogger('firehose.Startup')
+l.debug("Startup!")
+master = MasterPoller.get()
+master.start()
from firehose.controllers import Root
start_server(Root())