[ocrfeeder/new_fixes: 4/10] Change the async worker to use threads for its job execution



commit 6ee1bc5fa106725fd3c8bd304cb400b49d97e446
Author: Jan Losinski <losinski wh2 tu-dresden de>
Date:   Thu Jul 26 20:41:23 2012 +0200

    Change the async worker to use threads for its job execution
    
    The async worker used to be a single thread that decoupled job execution
    from the UI. This introduces per-job threads within the async worker,
    which allows it to execute more than one job at a time. The maximum
    number of concurrently running job threads is controlled by a semaphore.
    The limit is currently hardcoded (parallel=4 in recognizeDocument).
    
    Signed-off-by: Jan Losinski <losinski wh2 tu-dresden de>

 src/ocrfeeder/studio/widgetModeler.py   |    4 +-
 src/ocrfeeder/studio/widgetPresenter.py |    4 +-
 src/ocrfeeder/util/asyncworker.py       |   70 +++++++++++++++++++++++--------
 3 files changed, 56 insertions(+), 22 deletions(-)
---
diff --git a/src/ocrfeeder/studio/widgetModeler.py b/src/ocrfeeder/studio/widgetModeler.py
index 0e3bcd6..7d27554 100644
--- a/src/ocrfeeder/studio/widgetModeler.py
+++ b/src/ocrfeeder/studio/widgetModeler.py
@@ -585,7 +585,7 @@ class ImageReviewer_Controler:
 
     def recognizeDocument(self):
         pages = self.source_images_selector_widget.getAllPages()
-        dialog = QueuedEventsProgressDialog(self.main_window.window)
+        dialog = QueuedEventsProgressDialog(self.main_window.window, parallel=4)
         items = []
         i = 1
         total = len(pages)
@@ -645,7 +645,7 @@ class ImageReviewer_Controler:
                                               pages_to_process,
                                               data_boxes, error):
         page.data_boxes = data_boxes
-        if page == pages_to_process[-1]:
+        if dialog.worker.done:
             dialog.cancel()
             self.__updateImageReviewers()
 
diff --git a/src/ocrfeeder/studio/widgetPresenter.py b/src/ocrfeeder/studio/widgetPresenter.py
index c8e1dc3..cd970cf 100644
--- a/src/ocrfeeder/studio/widgetPresenter.py
+++ b/src/ocrfeeder/studio/widgetPresenter.py
@@ -1170,12 +1170,12 @@ class CommandProgressBarDialog(gtk.Dialog):
 
 class QueuedEventsProgressDialog(gtk.Dialog):
 
-    def __init__(self, parent, items_list = []):
+    def __init__(self, parent, items_list = [], parallel=1):
         super(QueuedEventsProgressDialog, self).__init__(parent = parent,
                                                        flags = gtk.DIALOG_MODAL)
         self.set_icon_from_file(WINDOW_ICON)
         self.info_list = []
-        self.worker = AsyncWorker()
+        self.worker = AsyncWorker(parallel)
         self.setItemsList(items_list)
         self.label = gtk.Label()
         self.__makeProgressBar()
diff --git a/src/ocrfeeder/util/asyncworker.py b/src/ocrfeeder/util/asyncworker.py
index e1ad5a1..b2731a6 100644
--- a/src/ocrfeeder/util/asyncworker.py
+++ b/src/ocrfeeder/util/asyncworker.py
@@ -22,11 +22,12 @@
 #    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 ###########################################################################
 
-from threading import Thread
+from threading import Thread, BoundedSemaphore, RLock
 import Queue
 import gobject
 from lib import debug
 
+
 class AsyncItem(object):
 
     def __init__(self, target_method, target_method_args, finish_callback = None, finish_callback_args = ()):
@@ -54,32 +55,87 @@ class AsyncItem(object):
     def cancel(self):
         self.canceled = True
 
+
 class AsyncWorker(Thread):
 
-    def __init__(self):
+    def __init__(self, parallel=1):
         Thread.__init__(self)
         self.queue = Queue.Queue(0)
         self.stopped = False
-        self.async_item = None
         self.item_number = -1
 
+        self.parallel = parallel
+        self.running_items = []
+        self.worker_threads = []
+        self.thread_sem = BoundedSemaphore(value=parallel)
+        # Guards running_items, queue_processing and done. Kept per
+        # instance so that independent workers do not share one lock.
+        self.ready_lock = RLock()
+        self.done = False
+        self.queue_processing = True
+
     def run(self):
-        while not self.stopped:
-            if self.queue.empty():
-                self.stop()
-                break
-            try:
-                self.async_item = self.queue.get()
-                self.item_number += 1
-                self.async_item.run()
-                self.queue.task_done()
-                self.async_item = None
-            except Exception, exception:
-                debug(str(exception))
-                self.stop()
+        try:
+            # Dispatch only while not stopped; an empty queue ends the
+            # loop through Queue.Empty.
+            while not self.stopped:
+                try:
+                    async_item = self.queue.get(False)
+                except Queue.Empty:
+                    break
+                self.item_number += 1
+
+                # Blocks while `parallel` jobs are already running.
+                self.thread_sem.acquire()
+                if self.stopped:
+                    # stop() was called while waiting for a free slot.
+                    self.thread_sem.release()
+                    self.queue.task_done()
+                    break
+
+                thread = Thread(target=self._run_item, args=(async_item, ))
+                try:
+                    thread.start()
+                except Exception, exception:
+                    self.thread_sem.release()
+                    self.queue.task_done()
+                    debug(str(exception))
+                    self.stop()
+                    break
+                self.worker_threads.append(thread)
+        finally:
+            with self.ready_lock:
+                self.queue_processing = False
+            for thread in self.worker_threads:
+                thread.join()
+            # Every job thread has finished here; this also covers the case
+            # where the last job ended before queue_processing was cleared.
+            with self.ready_lock:
+                self.done = True
+            self.stopped = True
 
     def stop(self):
         self.stopped = True
-        if self.async_item:
-            self.async_item.cancel()
+        # Cancel a snapshot so job threads may update the list meanwhile.
+        with self.ready_lock:
+            running_items = list(self.running_items)
+        for async_item in running_items:
+            async_item.cancel()
 
+    def _run_item(self, async_item):
+        with self.ready_lock:
+            self.running_items.append(async_item)
+        # stop() may have taken its snapshot before the append above.
+        if self.stopped:
+            async_item.cancel()
+        try:
+            async_item.run()
+        finally:
+            # Always free the slot, even if the job raised; otherwise run()
+            # would block forever in thread_sem.acquire().
+            self.queue.task_done()
+            with self.ready_lock:
+                self.running_items.remove(async_item)
+                if not self.queue_processing and not self.running_items:
+                    self.done = True
+            self.thread_sem.release()



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]