[libgxps] regtest: implement parallel testing using Python's Queue class



commit 41a13771ca76b1ee0b2f036a27de374cd13af0cc
Author: Adam Reichold <adamreichold myopera com>
Date:   Tue Jun 23 17:25:43 2015 +0200

    regtest: implement parallel testing using Python's Queue class

 regtest/Printer.py        |   35 +++++++++++++++++++-------------
 regtest/Test.py           |   25 +---------------------
 regtest/TestReferences.py |   22 +++++++++++++++++++-
 regtest/TestRun.py        |   48 ++++++++++++++++++++++++++++++++++++--------
 regtest/main.py           |    9 +++++++-
 5 files changed, 91 insertions(+), 48 deletions(-)
---
diff --git a/regtest/Printer.py b/regtest/Printer.py
index 008f46b..025f697 100644
--- a/regtest/Printer.py
+++ b/regtest/Printer.py
@@ -18,6 +18,7 @@
 
 import sys
 from Config import Config
+from threading import RLock
 
 class Printer:
 
@@ -32,6 +33,8 @@ class Printer:
         self._rewrite = self._stream.isatty() and not self._verbose
         self._current_line = None
 
+        self._lock = RLock()
+
         Printer.__single = self
 
     def _erase_current_line(self):
@@ -52,27 +55,31 @@ class Printer:
         self._stream.flush()
 
     def printout(self, msg):
-        self._erase_current_line()
-        self._print(msg)
-        self._current_line = msg[msg.rfind('\n') + 1:]
+        with self._lock:
+            self._erase_current_line()
+            self._print(msg)
+            self._current_line = msg[msg.rfind('\n') + 1:]
 
     def printout_update(self, msg):
-        if self._rewrite and self._current_line is not None:
-            msg = self._current_line + msg
-        elif not self._rewrite:
-            msg = self._ensure_new_line(msg)
-        self.printout(msg)
+        with self._lock:
+            if self._rewrite and self._current_line is not None:
+                msg = self._current_line + msg
+            elif not self._rewrite:
+                msg = self._ensure_new_line(msg)
+            self.printout(msg)
 
     def printout_ln(self, msg):
-        if self._current_line is not None:
-            self._current_line = None
-            msg = '\n' + msg
+        with self._lock:
+            if self._current_line is not None:
+                self._current_line = None
+                msg = '\n' + msg
 
-        self._print(self._ensure_new_line(msg))
+            self._print(self._ensure_new_line(msg))
 
     def printerr(self, msg):
-        self.stderr.write(self._ensure_new_line(msg))
-        self.stderr.flush()
+        with self._lock:
+            self.stderr.write(self._ensure_new_line(msg))
+            self.stderr.flush()
 
     def print_test_start(self, msg):
         self.printout(msg)
diff --git a/regtest/Test.py b/regtest/Test.py
index fb86dbe..3e22e74 100644
--- a/regtest/Test.py
+++ b/regtest/Test.py
@@ -191,26 +191,6 @@ class Test:
 
         return True
 
-    def _check_exit_status2(self, p1, p2, out_path):
-        p1_stderr = p1.stderr.read()
-        status1 = p1.wait()
-        p2_stderr = p2.stderr.read()
-        status2 = p2.wait()
-
-        if p1_stderr or p2_stderr:
-            self.__create_stderr_file(p1_stderr + p2_stderr, out_path)
-
-        if not os.WIFEXITED(status1) or not os.WIFEXITED(status2):
-            open(os.path.join(out_path, 'crashed'), 'w').close()
-            return False
-
-        if self.__create_failed_file_if_needed(status1, out_path):
-            return False
-        if self.__create_failed_file_if_needed(status2, out_path):
-            return False
-
-        return True
-
     def _create_diff(self, ref_path, result_path):
         try:
             from PIL import Image, ImageChops
@@ -224,8 +204,7 @@ class Test:
 
     def create_refs(self, doc_path, refs_path):
         out_path = os.path.join(refs_path, 'page')
-        p1 = subprocess.Popen([self._xpstopng, '-r', '72', '-e', doc_path, out_path], stderr = subprocess.PIPE)
-        p2 = subprocess.Popen([self._xpstopng, '-r', '72', '-o', doc_path, out_path], stderr = subprocess.PIPE)
+        p = subprocess.Popen([self._xpstopng, '-r', '72', doc_path, out_path], stderr = subprocess.PIPE)
+        return self._check_exit_status(p, refs_path)
 
-        return self._check_exit_status2(p1, p2, refs_path)
 
diff --git a/regtest/TestReferences.py b/regtest/TestReferences.py
index 5d114b1..60d91e4 100644
--- a/regtest/TestReferences.py
+++ b/regtest/TestReferences.py
@@ -22,6 +22,8 @@ from Test import Test
 from Config import Config
 from Printer import get_printer
 from Utils import get_document_paths_from_dir, get_skipped_tests
+from Queue import Queue
+from threading import Thread
 
 class TestReferences:
 
@@ -33,6 +35,8 @@ class TestReferences:
         self.config = Config()
         self.printer = get_printer()
 
+        self._queue = Queue()
+
         try:
             os.makedirs(self._refsdir)
         except OSError as e:
@@ -64,9 +68,25 @@ class TestReferences:
         if self._test.create_refs(doc_path, refs_path):
             self._test.create_checksums(refs_path, self.config.checksums_only)
 
+    def _worker_thread(self):
+        while True:
+            doc, n_doc, total_docs = self._queue.get()
+            self.create_refs_for_file(doc, n_doc, total_docs)
+            self._queue.task_done()
+
     def create_refs(self):
         docs, total_docs = get_document_paths_from_dir(self._docsdir)
+
+        self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads))
+
+        for n_thread in range(self.config.threads):
+            thread = Thread(target=self._worker_thread)
+            thread.daemon = True
+            thread.start()
+
         n_doc = 0
         for doc in docs:
             n_doc += 1
-            self.create_refs_for_file(doc, n_doc, total_docs)
+            self._queue.put((doc, n_doc, total_docs))
+
+        self._queue.join()
diff --git a/regtest/TestRun.py b/regtest/TestRun.py
index 3f39c9f..dbf61af 100644
--- a/regtest/TestRun.py
+++ b/regtest/TestRun.py
@@ -23,6 +23,8 @@ from Printer import get_printer
 import sys
 import os
 import errno
+from Queue import Queue
+from threading import Thread, RLock
 
 class TestRun:
 
@@ -43,6 +45,9 @@ class TestRun:
         self._failed_status_error = []
         self._stderr = []
 
+        self._queue = Queue()
+        self._lock = RLock()
+
         try:
             os.makedirs(self._outdir);
         except OSError as e:
@@ -60,21 +65,26 @@ class TestRun:
             self.printer.print_default("Reference files not found, skipping '%s'" % (doc_path))
             return
 
-        self._n_tests += 1
+        with self._lock:
+            self._n_tests += 1
+
         self.printer.print_test_start("Testing '%s' (%d/%d): " % (doc_path, n_doc, total_docs))
         test_has_md5 = self._test.create_refs(doc_path, test_path)
 
         if self._test.has_stderr(test_path):
-            self._stderr.append(doc_path)
+            with self._lock:
+                self._stderr.append(doc_path)
 
         if ref_has_md5 and test_has_md5:
             if self._test.compare_checksums(refs_path, test_path, not self.config.keep_results, self.config.create_diffs, self.config.update_refs):
                 # FIXME: remove dir if it's empty?
                 self.printer.print_test_result("PASS")
-                self._n_passed += 1
+                with self._lock:
+                    self._n_passed += 1
             else:
                 self.printer.print_test_result_ln("FAIL")
-                self._failed.append(doc_path)
+                with self._lock:
+                    self._failed.append(doc_path)
             return
         elif test_has_md5:
             if ref_is_crashed:
@@ -87,24 +97,28 @@ class TestRun:
         test_is_crashed = self._test.is_crashed(test_path)
         if ref_is_crashed and test_is_crashed:
             self.printer.print_test_result("PASS (Expected crash)")
-            self._n_passed += 1
+            with self._lock:
+                self._n_passed += 1
             return
 
         test_is_failed = self._test.is_failed(test_path)
         if ref_is_failed and test_is_failed:
             # FIXME: compare status errors
             self.printer.print_test_result("PASS (Expected fail with status error %d)" % (test_is_failed))
-            self._n_passed += 1
+            with self._lock:
+                self._n_passed += 1
             return
 
         if test_is_crashed:
             self.printer.print_test_result_ln("CRASH")
-            self._crashed.append(doc_path)
+            with self._lock:
+                self._crashed.append(doc_path)
             return
 
         if test_is_failed:
             self.printer.print_test_result_ln("FAIL (status error %d)" % (test_is_failed))
-            self._failed_status_error(doc_path)
+            with self._lock:
+                self._failed_status_error(doc_path)
             return
 
     def run_test(self, filename, n_doc = 1, total_docs = 1):
@@ -129,12 +143,28 @@ class TestRun:
 
         self.test(refs_path, doc_path, out_path, n_doc, total_docs)
 
+    def _worker_thread(self):
+        while True:
+            doc, n_doc, total_docs = self._queue.get()
+            self.run_test(doc, n_doc, total_docs)
+            self._queue.task_done()
+
     def run_tests(self):
         docs, total_docs = get_document_paths_from_dir(self._docsdir)
+
+        self.printer.printout_ln('Process %d is spawning %d worker threads...' % (os.getpid(), self.config.threads))
+
+        for n_thread in range(self.config.threads):
+            thread = Thread(target=self._worker_thread)
+            thread.daemon = True
+            thread.start()
+
         n_doc = 0
         for doc in docs:
             n_doc += 1
-            self.run_test(doc, n_doc, total_docs)
+            self._queue.put((doc, n_doc, total_docs))
+
+        self._queue.join()
 
     def summary(self):
         if not self._n_tests:
diff --git a/regtest/main.py b/regtest/main.py
index d84524e..1a29bce 100644
--- a/regtest/main.py
+++ b/regtest/main.py
@@ -21,6 +21,7 @@ import argparse
 import commands
 import os
 from Config import Config
+from multiprocessing import cpu_count
 
 class ListAction(argparse.Action):
     def __call__(self, parser, namespace, values, option_string = None):
@@ -57,13 +58,19 @@ def main(args):
     parser.add_argument('--skip', metavar = 'FILE',
                         action = 'store', dest = 'skipped_file',
                         help = 'File containing tests to skip')
+    parser.add_argument('-t', '--threads',
+                        action = 'store', dest = 'threads', type = int, default = 1,
+                        help = 'Number of worker threads')
 
     ns, args = parser.parse_known_args(args)
     if not args:
         parser.print_help()
         sys.exit(0)
 
-    Config(vars(ns))
+    config = Config(vars(ns))
+    if config.threads <= 0:
+        config.threads = cpu_count() - config.threads
+
     try:
         commands.run(args)
     except commands.UnknownCommandError:


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]