[gnome-continuous-yocto/gnomeostree-3.28-rocko: 6046/8267] oeqa/core/threaded: Add support of OETestRunnerThreaded
- From: Emmanuele Bassi <ebassi src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gnome-continuous-yocto/gnomeostree-3.28-rocko: 6046/8267] oeqa/core/threaded: Add support of OETestRunnerThreaded
- Date: Sun, 17 Dec 2017 04:17:35 +0000 (UTC)
commit 6632277986ad20ce0cbcb4432018156dfa3ba002
Author: Aníbal Limón <anibal limon linux intel com>
Date: Fri May 26 15:37:37 2017 -0500
oeqa/core/threaded: Add support of OETestRunnerThreaded
The OETestRunnerThreaded overrides the run method of OETestRunner
it recieves a list of suites to be executed by a ThreadPool.
The new run method handles the ThreadPool creation and the
OETestResultThreaded fill.
[YOCTO #11450]
(From OE-Core rev: 48b7a407d692e6c49c41b16f2bd11e8c3f47a421)
Signed-off-by: Aníbal Limón <anibal limon linux intel com>
Signed-off-by: Richard Purdie <richard purdie linuxfoundation org>
meta/lib/oeqa/core/threaded.py | 75 +++++++++++++++++++++++++++++++++++++++-
1 files changed, 74 insertions(+), 1 deletions(-)
---
diff --git a/meta/lib/oeqa/core/threaded.py b/meta/lib/oeqa/core/threaded.py
index f216685..81df340 100644
--- a/meta/lib/oeqa/core/threaded.py
+++ b/meta/lib/oeqa/core/threaded.py
@@ -3,11 +3,13 @@
import threading
import multiprocessing
+import queue
+import time
from unittest.suite import TestSuite
from oeqa.core.loader import OETestLoader
-from oeqa.core.runner import OEStreamLogger, OETestResult
+from oeqa.core.runner import OEStreamLogger, OETestResult, OETestRunner
class OETestLoaderThreaded(OETestLoader):
def __init__(self, tc, module_paths, modules, tests, modules_required,
@@ -185,3 +187,74 @@ class OETestResultThreaded(object):
tid = list(self._results)[0]
result = self._results[tid]['result']
result.logDetails()
+
+class _Worker(threading.Thread):
+ """Thread executing tasks from a given tasks queue"""
+ def __init__(self, tasks, result, stream):
+ threading.Thread.__init__(self)
+ self.tasks = tasks
+
+ self.result = result
+ self.stream = stream
+
+ def run(self):
+ while True:
+ try:
+ func, args, kargs = self.tasks.get(block=False)
+ except queue.Empty:
+ break
+
+ try:
+ run_start_time = time.time()
+ rc = func(*args, **kargs)
+ run_end_time = time.time()
+ self.result.addResult(rc, run_start_time, run_end_time)
+ self.stream.finish()
+ except Exception as e:
+ print(e)
+ finally:
+ self.tasks.task_done()
+
+class _ThreadedPool:
+ """Pool of threads consuming tasks from a queue"""
+ def __init__(self, num_workers, num_tasks, stream=None, result=None):
+ self.tasks = queue.Queue(num_tasks)
+ self.workers = []
+
+ for _ in range(num_workers):
+ worker = _Worker(self.tasks, result, stream)
+ self.workers.append(worker)
+
+ def start(self):
+ for worker in self.workers:
+ worker.start()
+
+ def add_task(self, func, *args, **kargs):
+ """Add a task to the queue"""
+ self.tasks.put((func, args, kargs))
+
+ def wait_completion(self):
+ """Wait for completion of all the tasks in the queue"""
+ self.tasks.join()
+ for worker in self.workers:
+ worker.join()
+
+class OETestRunnerThreaded(OETestRunner):
+ streamLoggerClass = OEStreamLoggerThreaded
+
+ def __init__(self, tc, *args, **kwargs):
+ super(OETestRunnerThreaded, self).__init__(tc, *args, **kwargs)
+ self.resultclass = OETestResultThreadedInternal # XXX: XML reporting overrides at __init__
+
+ def run(self, suites):
+ result = OETestResultThreaded(self.tc)
+
+ pool = _ThreadedPool(len(suites), len(suites), stream=self.stream,
+ result=result)
+ for s in suites:
+ pool.add_task(super(OETestRunnerThreaded, self).run, s)
+ pool.start()
+ pool.wait_completion()
+ result._fill_tc_results()
+
+ return result
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]