[ocrfeeder/new_fixes: 9/10] add a multiprocess Pool to the async worker.



commit b45eb02ae0810ee28547b314ea321b6e3d4dd420
Author: Jan Losinski <losinski wh2 tu-dresden de>
Date:   Fri Jul 27 15:25:38 2012 +0200

    add a multiprocess Pool to the async worker.
    
    The multiprocessing.Pool is a process pool for doing parallel work
    without being limited by the GIL - so it's really parallel if you have
    more than one CPU.
    
    The Pool usage is limited to cases where the "parallel" argument of the
    AsyncWorker is >1 - in other cases it makes no sense and is unnecessary
    overhead.
    
    Another benefit is that there are fewer of the shared-state problems
    that threads normally introduce - because of the real process isolation.
    
    Signed-off-by: Jan Losinski <losinski wh2 tu-dresden de>

 src/ocrfeeder/util/asyncworker.py |   26 +++++++++++++++++++++++---
 1 files changed, 23 insertions(+), 3 deletions(-)
---
diff --git a/src/ocrfeeder/util/asyncworker.py b/src/ocrfeeder/util/asyncworker.py
index b2731a6..ffee92f 100644
--- a/src/ocrfeeder/util/asyncworker.py
+++ b/src/ocrfeeder/util/asyncworker.py
@@ -26,23 +26,29 @@ from threading import Thread, BoundedSemaphore, RLock
 import Queue
 import gobject
 from lib import debug
+from multiprocessing import Pool
 
 
 class AsyncItem(object):
 
-    def __init__(self, target_method, target_method_args, finish_callback = None, finish_callback_args = ()):
+    def __init__(self, target_method, target_method_args, finish_callback = None, finish_callback_args = (), process_pool=None):
         self.target_method = target_method
         self.target_method_args = target_method_args
         self.finish_callback = finish_callback
         self.finish_callback_args = finish_callback_args
         self.canceled = False
+        self.process_pool = process_pool
 
     def run(self):
         if self.canceled:
             return
         results = error = None
         try:
-            results = self.target_method(*self.target_method_args)
+            if self.process_pool is None:
+                results = self.target_method(*self.target_method_args)
+            else:
+                results = self.process_pool.apply(self.target_method, self.target_method_args)
+
         except Exception, exception:
             debug(str(exception))
             error = exception
@@ -64,6 +70,7 @@ class AsyncWorker(Thread):
     def __init__(self, parallel=1):
         Thread.__init__(self)
         self.queue = Queue.Queue(0)
+        self.daemon = True
         self.stopped = False
         self.item_number = -1
 
@@ -74,19 +81,28 @@ class AsyncWorker(Thread):
         self.done = False
         self.queue_processing = True
 
+        self.process_pool = None
+        if parallel > 1:
+            self.process_pool = Pool(parallel)
+
+
     def run(self):
         try:
-            while not self.stopped or not self.queue.empty():
+            while not self.stopped and not self.queue.empty():
                 try:
                     async_item = self.queue.get(False)
                     self.item_number += 1
 
                     thread = Thread(target=self._run_item, args=(async_item, ))
+                    self.running_items.append(async_item)
                     self.thread_sem.acquire()
+                    async_item.process_pool = self.process_pool
                     self.worker_threads.append(thread)
                     thread.start()
 
                 except Queue.Empty:
+                    if self.process_pool is not None:
+                        self.process_pool.close()
                     break
 
                 except Exception, exception:
@@ -98,6 +114,8 @@ class AsyncWorker(Thread):
             for thread in self.worker_threads:
                 thread.join()
             self.stopped = True
+            if self.process_pool is not None:
+                self.process_pool.terminate()
 
 
     def stop(self):
@@ -105,6 +123,8 @@ class AsyncWorker(Thread):
         if len(self.running_items):
             for async_item in self.running_items:
                 async_item.cancel()
+        if self.process_pool is not None:
+            self.process_pool.close()
 
 
     def _run_item(self, async_item):



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]