[gnome-continuous-yocto/gnomeostree-3.28-rocko: 6046/8267] oeqa/core/threaded: Add support of OETestRunnerThreaded



commit 6632277986ad20ce0cbcb4432018156dfa3ba002
Author: Aníbal Limón <anibal limon linux intel com>
Date:   Fri May 26 15:37:37 2017 -0500

    oeqa/core/threaded: Add support of OETestRunnerThreaded
    
    The OETestRunnerThreaded overrides the run method of OETestRunner
    it recieves a list of suites to be executed by a ThreadPool.
    
    The new run method handles the ThreadPool creation and the
    OETestResultThreaded fill.
    
    [YOCTO #11450]
    
    (From OE-Core rev: 48b7a407d692e6c49c41b16f2bd11e8c3f47a421)
    
    Signed-off-by: Aníbal Limón <anibal limon linux intel com>
    Signed-off-by: Richard Purdie <richard purdie linuxfoundation org>

 meta/lib/oeqa/core/threaded.py |   75 +++++++++++++++++++++++++++++++++++++++-
 1 files changed, 74 insertions(+), 1 deletions(-)
---
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
index f216685..81df340 100644
--- a/meta/lib/oeqa/core/threaded.py
+++ b/meta/lib/oeqa/core/threaded.py
@@ -3,11 +3,13 @@
 
 import threading
 import multiprocessing
+import queue
+import time
 
 from unittest.suite import TestSuite
 
 from oeqa.core.loader import OETestLoader
-from oeqa.core.runner import OEStreamLogger, OETestResult
+from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner
 
 class OETestLoaderThreaded(OETestLoader):
     def __init__(self, tc, module_paths, modules, tests, modules_required,
@@ -185,3 +187,74 @@ class OETestResultThreaded(object):
         tid = list(self._results)[0]
         result = self._results[tid]['result']
         result.logDetails()
+
+class _Worker(threading.Thread):
+    """Thread executing tasks from a given tasks queue"""
+    def __init__(self, tasks, result, stream):
+        threading.Thread.__init__(self)
+        self.tasks = tasks
+
+        self.result = result
+        self.stream = stream
+
+    def run(self):
+        while True:
+            try:
+                func, args, kargs = self.tasks.get(block=False)
+            except queue.Empty:
+                break
+
+            try:
+                run_start_time = time.time()
+                rc = func(*args, **kargs)
+                run_end_time = time.time()
+                self.result.addResult(rc, run_start_time, run_end_time)
+                self.stream.finish()
+            except Exception as e:
+                print(e)
+            finally:
+                self.tasks.task_done()
+
+class _ThreadedPool:
+    """Pool of threads consuming tasks from a queue"""
+    def __init__(self, num_workers, num_tasks, stream=None, result=None):
+        self.tasks = queue.Queue(num_tasks)
+        self.workers = []
+
+        for _ in range(num_workers):
+            worker = _Worker(self.tasks, result, stream)
+            self.workers.append(worker)
+
+    def start(self):
+        for worker in self.workers:
+            worker.start()
+
+    def add_task(self, func, *args, **kargs):
+        """Add a task to the queue"""
+        self.tasks.put((func, args, kargs))
+
+    def wait_completion(self):
+        """Wait for completion of all the tasks in the queue"""
+        self.tasks.join()
+        for worker in self.workers:
+            worker.join()
+
+class OETestRunnerThreaded(OETestRunner):
+    streamLoggerClass = OEStreamLoggerThreaded
+
+    def __init__(self, tc, *args, **kwargs):
+        super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
+        self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
+
+    def run(self, suites):
+        result = OETestResultThreaded(self.tc)
+
+        pool = _ThreadedPool(len(suites), len(suites), stream=self.stream,
+                result=result)
+        for s in suites:
+            pool.add_task(super(OETestRunnerThreaded, self).run, s)
+        pool.start()
+        pool.wait_completion()
+        result._fill_tc_results()
+
+        return result


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]