[libgxps] regtest: implement parallel testing using Python's Queue class
- From: Carlos Garcia Campos <carlosgc src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libgxps] regtest: implement parallel testing using Python's Queue class
- Date: Tue, 23 Jun 2015 17:24:35 +0000 (UTC)
commit 41a13771ca76b1ee0b2f036a27de374cd13af0cc
Author: Adam Reichold <adamreichold myopera com>
Date: Tue Jun 23 17:25:43 2015 +0200
regtest: implement parallel testing using Python's Queue class
regtest/Printer.py | 35 +++++++++++++++++++-------------
regtest/Test.py | 25 +---------------------
regtest/TestReferences.py | 22 +++++++++++++++++++-
regtest/TestRun.py | 48 ++++++++++++++++++++++++++++++++++++--------
regtest/main.py | 9 +++++++-
5 files changed, 91 insertions(+), 48 deletions(-)
---
diff --git a/regtest/Printer.py b/regtest/Printer.py
index 008f46b..025f697 100644
--- a/regtest/Printer.py
+++ b/regtest/Printer.py
@@ -18,6 +18,7 @@
import sys
from Config import Config
+from threading import RLock
class Printer:
@@ -32,6 +33,8 @@ class Printer:
self._rewrite = self._stream.isatty() and not self._verbose
self._current_line = None
+ self._lock = RLock()
+
Printer.__single = self
def _erase_current_line(self):
@@ -52,27 +55,31 @@ class Printer:
self._stream.flush()
def printout(self, msg):
- self._erase_current_line()
- self._print(msg)
- self._current_line = msg[msg.rfind('\n') + 1:]
+ with self._lock:
+ self._erase_current_line()
+ self._print(msg)
+ self._current_line = msg[msg.rfind('\n') + 1:]
def printout_update(self, msg):
- if self._rewrite and self._current_line is not None:
- msg = self._current_line + msg
- elif not self._rewrite:
- msg = self._ensure_new_line(msg)
- self.printout(msg)
+ with self._lock:
+ if self._rewrite and self._current_line is not None:
+ msg = self._current_line + msg
+ elif not self._rewrite:
+ msg = self._ensure_new_line(msg)
+ self.printout(msg)
def printout_ln(self, msg):
- if self._current_line is not None:
- self._current_line = None
- msg = '\n' + msg
+ with self._lock:
+ if self._current_line is not None:
+ self._current_line = None
+ msg = '\n' + msg
- self._print(self._ensure_new_line(msg))
+ self._print(self._ensure_new_line(msg))
def printerr(self, msg):
- self.stderr.write(self._ensure_new_line(msg))
- self.stderr.flush()
+ with self._lock:
+ self.stderr.write(self._ensure_new_line(msg))
+ self.stderr.flush()
def print_test_start(self, msg):
self.printout(msg)
diff --git a/regtest/Test.py b/regtest/Test.py
index fb86dbe..3e22e74 100644
--- a/regtest/Test.py
+++ b/regtest/Test.py
@@ -191,26 +191,6 @@ class Test:
return True
- def _check_exit_status2(self, p1, p2, out_path):
- p1_stderr = p1.stderr.read()
- status1 = p1.wait()
- p2_stderr = p2.stderr.read()
- status2 = p2.wait()
-
- if p1_stderr or p2_stderr:
- self.__create_stderr_file(p1_stderr + p2_stderr, out_path)
-
- if not os.WIFEXITED(status1) or not os.WIFEXITED(status2):
- open(os.path.join(out_path, 'crashed'), 'w').close()
- return False
-
- if self.__create_failed_file_if_needed(status1, out_path):
- return False
- if self.__create_failed_file_if_needed(status2, out_path):
- return False
-
- return True
-
def _create_diff(self, ref_path, result_path):
try:
from PIL import Image, ImageChops
@@ -224,8 +204,7 @@ class Test:
def create_refs(self, doc_path, refs_path):
out_path = os.path.join(refs_path, 'page')
- p1 = subprocess.Popen([self._xpstopng, '-r', '72', '-e', doc_path, out_path], stderr = subprocess.PIPE)
- p2 = subprocess.Popen([self._xpstopng, '-r', '72', '-o', doc_path, out_path], stderr = subprocess.PIPE)
+ p = subprocess.Popen([self._xpstopng, '-r', '72', doc_path, out_path], stderr = subprocess.PIPE)
+ return self._check_exit_status(p, refs_path)
- return self._check_exit_status2(p1, p2, refs_path)
diff --git a/regtest/TestReferences.py b/regtest/TestReferences.py
index 5d114b1..60d91e4 100644
--- a/regtest/TestReferences.py
+++ b/regtest/TestReferences.py
@@ -22,6 +22,8 @@ from Test import Test
from Config import Config
from Printer import get_printer
from Utils import get_document_paths_from_dir, get_skipped_tests
+from Queue import Queue
+from threading import Thread
class TestReferences:
@@ -33,6 +35,8 @@ class TestReferences:
self.config = Config()
self.printer = get_printer()
+ self._queue = Queue()
+
try:
os.makedirs(self._refsdir)
except OSError as e:
@@ -64,9 +68,25 @@ class TestReferences:
if self._test.create_refs(doc_path, refs_path):
self._test.create_checksums(refs_path, self.config.checksums_only)
+ def _worker_thread(self):
+ while True:
+ doc, n_doc, total_docs = self._queue.get()
+ self.create_refs_for_file(doc, n_doc, total_docs)
+ self._queue.task_done()
+
def create_refs(self):
docs, total_docs = get_document_paths_from_dir(self._docsdir)
+
+ self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads))
+
+ for n_thread in range(self.config.threads):
+ thread = Thread(target=self._worker_thread)
+ thread.daemon = True
+ thread.start()
+
n_doc = 0
for doc in docs:
n_doc += 1
- self.create_refs_for_file(doc, n_doc, total_docs)
+ self._queue.put((doc, n_doc, total_docs))
+
+ self._queue.join()
diff --git a/regtest/TestRun.py b/regtest/TestRun.py
index 3f39c9f..dbf61af 100644
--- a/regtest/TestRun.py
+++ b/regtest/TestRun.py
@@ -23,6 +23,8 @@ from Printer import get_printer
import sys
import os
import errno
+from Queue import Queue
+from threading import Thread, RLock
class TestRun:
@@ -43,6 +45,9 @@ class TestRun:
self._failed_status_error = []
self._stderr = []
+ self._queue = Queue()
+ self._lock = RLock()
+
try:
os.makedirs(self._outdir);
except OSError as e:
@@ -60,21 +65,26 @@ class TestRun:
self.printer.print_default("Reference files not found, skipping '%s'" % (doc_path))
return
- self._n_tests += 1
+ with self._lock:
+ self._n_tests += 1
+
self.printer.print_test_start("Testing '%s' (%d/%d): " % (doc_path, n_doc, total_docs))
test_has_md5 = self._test.create_refs(doc_path, test_path)
if self._test.has_stderr(test_path):
- self._stderr.append(doc_path)
+ with self._lock:
+ self._stderr.append(doc_path)
if ref_has_md5 and test_has_md5:
if self._test.compare_checksums(refs_path, test_path, not self.config.keep_results, self.config.create_diffs, self.config.update_refs):
# FIXME: remove dir if it's empty?
self.printer.print_test_result("PASS")
- self._n_passed += 1
+ with self._lock:
+ self._n_passed += 1
else:
self.printer.print_test_result_ln("FAIL")
- self._failed.append(doc_path)
+ with self._lock:
+ self._failed.append(doc_path)
return
elif test_has_md5:
if ref_is_crashed:
@@ -87,24 +97,28 @@ class TestRun:
test_is_crashed = self._test.is_crashed(test_path)
if ref_is_crashed and test_is_crashed:
self.printer.print_test_result("PASS (Expected crash)")
- self._n_passed += 1
+ with self._lock:
+ self._n_passed += 1
return
test_is_failed = self._test.is_failed(test_path)
if ref_is_failed and test_is_failed:
# FIXME: compare status errors
self.printer.print_test_result("PASS (Expected fail with status error %d)" % (test_is_failed))
- self._n_passed += 1
+ with self._lock:
+ self._n_passed += 1
return
if test_is_crashed:
self.printer.print_test_result_ln("CRASH")
- self._crashed.append(doc_path)
+ with self._lock:
+ self._crashed.append(doc_path)
return
if test_is_failed:
self.printer.print_test_result_ln("FAIL (status error %d)" % (test_is_failed))
- self._failed_status_error(doc_path)
+ with self._lock:
+ self._failed_status_error(doc_path)
return
def run_test(self, filename, n_doc = 1, total_docs = 1):
@@ -129,12 +143,28 @@ class TestRun:
self.test(refs_path, doc_path, out_path, n_doc, total_docs)
+ def _worker_thread(self):
+ while True:
+ doc, n_doc, total_docs = self._queue.get()
+ self.run_test(doc, n_doc, total_docs)
+ self._queue.task_done()
+
def run_tests(self):
docs, total_docs = get_document_paths_from_dir(self._docsdir)
+
+ self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads))
+
+ for n_thread in range(self.config.threads):
+ thread = Thread(target=self._worker_thread)
+ thread.daemon = True
+ thread.start()
+
n_doc = 0
for doc in docs:
n_doc += 1
- self.run_test(doc, n_doc, total_docs)
+ self._queue.put((doc, n_doc, total_docs))
+
+ self._queue.join()
def summary(self):
if not self._n_tests:
diff --git a/regtest/main.py b/regtest/main.py
index d84524e..1a29bce 100644
--- a/regtest/main.py
+++ b/regtest/main.py
@@ -21,6 +21,7 @@ import argparse
import commands
import os
from Config import Config
+from multiprocessing import cpu_count
class ListAction(argparse.Action):
def __call__(self, parser, namespace, values, option_string = None):
@@ -57,13 +58,19 @@ def main(args):
parser.add_argument('--skip', metavar = 'FILE',
action = 'store', dest = 'skipped_file',
help = 'File containing tests to skip')
+ parser.add_argument('-t', '--threads',
+ action = 'store', dest = 'threads', type = int, default = 1,
+ help = 'Number of worker threads')
ns, args = parser.parse_known_args(args)
if not args:
parser.print_help()
sys.exit(0)
- Config(vars(ns))
+ config = Config(vars(ns))
+ if config.threads <= 0:
+ config.threads = cpu_count() - config.threads
+
try:
commands.run(args)
except commands.UnknownCommandError:
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]