[kupfer] task: Implement a threaded task



commit be62e753ee54ee3db14cfd625a2b75f0e83bbdfd
Author: Ulrik Sverdrup <ulrik sverdrup gmail com>
Date:   Sat Aug 22 23:36:01 2009 +0200

    task: Implement a threaded task

 kupfer/task.py |   83 ++++++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 63 insertions(+), 20 deletions(-)
---
diff --git a/kupfer/task.py b/kupfer/task.py
index c546dca..6223b9f 100644
--- a/kupfer/task.py
+++ b/kupfer/task.py
@@ -1,3 +1,5 @@
+import threading
+
 from kupfer import scheduler, pretty
 
 class Task (object):
@@ -5,6 +7,9 @@ class Task (object):
 	def __init__(self, name):
 		self.name = name
 
+	def is_thread(self):
+		return False
+
 	def run(self):
 		raise NotImplementedError
 
@@ -26,35 +31,73 @@ class StepTask (Task):
 		finally:
 			self.finish()
 
+class ThreadTask (Task, threading.Thread):
+	"""Run in a thread"""
+	def __init__(self, name):
+		Task.__init__(self, name)
+		threading.Thread.__init__(self)
+		self.__thread_done = False
+		self.__thread_started = False
 
-def _step_task(task):
-	try:
-		task.next()
-	except StopIteration:
-		return False
-	else:
+	def is_thread(self):
 		return True
 
+	def thread_do(self):
+		"""Override this to run what should be done in the thread"""
+		raise NotImplementedError
+
+	def __thread_run(self):
+		try:
+			self.__retval = self.thread_do()
+		finally:
+			self.__thread_done = True
+
+	def run(self):
+		while True:
+			if not self.__thread_started:
+				# swizzle the methods for Thread
+				self.run = self.__thread_run
+				self.start()
+				self.__thread_started = True
+			elif self.__thread_done:
+				return
+			yield
+
 class TaskRunner (pretty.OutputMixin):
 	"""Run Tasks in the idle Loop"""
 	def __init__(self, end_on_finish):
-		scheduler.GetScheduler().connect("finish", self._on_finish)
 		self.task_iters = []
-		self.timer = scheduler.Timer(True)
-		self.end_on_finish = end_on_finish
+		self.thread_iters = []
+		self.idle_timer = scheduler.Timer(True)
+		if end_on_finish:
+			scheduler.GetScheduler().connect("finish", self._finish_cleanup)
+
 	def add_task(self, task):
 		"""Register @task to be run"""
-		self.task_iters.append(task.run())
-		self.timer.set_idle(self._step_tasks)
-	def _step_tasks(self):
-		for task in list(self.task_iters):
-			if not _step_task(task):
-				self.output_debug("Task done:", task)
-				self.task_iters.remove(task)
+		if task.is_thread():
+			# start thread
+			thread = task.run()
+			self.thread_iters.append(thread)
+			# run through all threads
+			self._step_tasks(self.thread_iters)
+		else:
+			self.task_iters.append(task.run())
+		self._setup_timers()
+
+	def _setup_timers(self):
 		if self.task_iters:
-			self.timer.set_idle(self._step_tasks)
+			self.idle_timer.set_idle(self._step_tasks, self.task_iters)
+
+	def _step_tasks(self, tasks):
+		for task in list(tasks):
+			try:
+				task.next()
+			except StopIteration:
+				self.output_debug("Task done:", task)
+				tasks.remove(task)
+		self._setup_timers()
 
-	def _on_finish(self, sched):
-		if self.end_on_finish:
-			del self.task_iters[:]
+	def _finish_cleanup(self, sched):
+		del self.task_iters[:]
+		del self.thread_iters[:]
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]