[gnome-settings-daemon/benzea/tests-output-checker: 2/3] tests: Add new OutputChecker class




commit 2fc2c1832b89576af23624a66f26c6652e424686
Author: Benjamin Berg <bberg redhat com>
Date:   Wed Dec 2 17:55:37 2020 +0100

    tests: Add new OutputChecker class
    
    This improves logging because everything just goes to stdout directly
    which means that the logs are properly interleaved. It also avoids the
    need for temporary files.

 tests/output_checker.py | 132 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 132 insertions(+)
---
diff --git a/tests/output_checker.py b/tests/output_checker.py
new file mode 100644
index 00000000..46b9869c
--- /dev/null
+++ b/tests/output_checker.py
@@ -0,0 +1,132 @@
+#! /usr/bin/env python3
+# Copyright © 2020, RedHat Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+# Authors:
+#       Benjamin Berg <bberg redhat com>
+
+import os
+import sys
+import fcntl
+import io
+import re
+import time
+import threading
+
+class OutputChecker(object):
+
+    def __init__(self, out=sys.stdout):
+        self._output = out
+        self._pipe_fd_r, self._pipe_fd_w = os.pipe()
+        self._partial_buf = b''
+        self._lines_sem = threading.Semaphore()
+        self._lines = []
+        self._reader_io = io.StringIO()
+
+        # Just to be sure, shouldn't be a problem even if we didn't set it
+        fcntl.fcntl(self._pipe_fd_r, fcntl.F_SETFL,
+                    fcntl.fcntl(self._pipe_fd_r, fcntl.F_GETFL) | os.O_CLOEXEC)
+        fcntl.fcntl(self._pipe_fd_w, fcntl.F_SETFL,
+                    fcntl.fcntl(self._pipe_fd_w, fcntl.F_GETFL) | os.O_CLOEXEC)
+
+        # Start copier thread
+        self._thread = threading.Thread(target=self._copy)
+        self._thread.start()
+
+    def _copy(self):
+        while True:
+            r = os.read(self._pipe_fd_r, 1024)
+            if not r:
+                return
+
+            l = r.split(b'\n')
+            l[0] = self._partial_buf + l[0]
+            self._lines.extend(l[:-1])
+            self._partial_buf = l[-1]
+
+            self._lines_sem.release()
+
+            os.write(self._output.fileno(), r)
+
+    def check_line(self, needle_re, timeout=0, failmsg=None):
+        deadline = time.time() + timeout
+
+        if isinstance(needle_re, str):
+            needle_re = needle_re.encode('ascii')
+
+        r = re.compile(needle_re)
+        ret = []
+
+        while True:
+            try:
+                l = self._lines.pop(0)
+            except IndexError:
+                # Check if should wake up
+                if not self._lines_sem.acquire(timeout = deadline - time.time()):
+                    if failmsg:
+                        raise AssertionError(failmsg)
+                    else:
+                        raise AssertionError('Timed out waiting for needle %s (timeout: %0.2f)' % 
(str(needle_re), timeout))
+                continue
+
+            ret.append(l)
+            if r.search(l):
+                return ret
+
+    def check_no_line(self, needle_re, wait=0, failmsg=None):
+        deadline = time.time() + wait
+
+        if isinstance(needle_re, str):
+            needle_re = needle_re.encode('ascii')
+
+        r = re.compile(needle_re)
+        ret = []
+
+        while True:
+            try:
+                l = self._lines.pop(0)
+            except IndexError:
+                # Check if should wake up
+                if not self._lines_sem.acquire(timeout = deadline - time.time()):
+                    # Timed out, so everything is good
+                    break
+                continue
+
+            ret.append(l)
+            if r.search(l):
+                if failmsg:
+                    raise AssertionError(failmsg)
+                else:
+                    raise AssertionError('Found needle %s but shouldn\'t have been there (timeout: %0.2f)' % 
(str(needle_re), timeout))
+
+        return ret
+
+    def clear(self):
+        ret = self._lines
+        self._lines = []
+        return ret
+
+    def assert_closed(self, timeout=1):
+        self._thread.join(timeout)
+        if self._thread.is_alive() != False:
+            raise AssertionError("OutputCheck: Write side has not been closed yet!")
+
+    @property
+    def fd(self):
+        return self._pipe_fd_w
+
+    def writer_attached(self):
+        os.close(self._pipe_fd_w)
+        self._pipe_fd_w = -1
+


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]