[gnome-settings-daemon/benzea/tests-output-checker: 3/5] tests: Add new OutputChecker class
- From: Benjamin Berg <bberg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gnome-settings-daemon/benzea/tests-output-checker: 3/5] tests: Add new OutputChecker class
- Date: Mon, 15 Feb 2021 16:29:32 +0000 (UTC)
commit d26394ae051dd7854e81bff88f2d10899f986bea
Author: Benjamin Berg <bberg redhat com>
Date: Wed Dec 2 17:55:37 2020 +0100
tests: Add new OutputChecker class
This improves logging because everything just goes to stdout directly
which means that the logs are properly interleaved. It also avoids the
need for temporary files.
tests/output_checker.py | 178 ++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 178 insertions(+)
---
diff --git a/tests/output_checker.py b/tests/output_checker.py
new file mode 100644
index 00000000..3f5a1e1c
--- /dev/null
+++ b/tests/output_checker.py
@@ -0,0 +1,178 @@
+#! /usr/bin/env python3
+# Copyright © 2020, RedHat Inc.
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library. If not, see <http://www.gnu.org/licenses/>.
+# Authors:
+# Benjamin Berg <bberg redhat com>
+
+import os
+import sys
+import fcntl
+import io
+import re
+import time
+import threading
+import select
+import errno
+
+class OutputChecker(object):
+    """Copy everything written to a pipe onto a stream and record the lines.
+
+    The constructor creates a pipe and starts a thread that reads from the
+    pipe, writes the raw bytes to ``out`` and stores every complete line
+    (as ``bytes``, without the trailing newline) so that the ``check_*``
+    methods can look for expected or unexpected output.
+
+    Intended call order, as far as the methods here show: hand ``fd`` to
+    whatever produces the output, call ``writer_attached()`` to drop the
+    local copy of the write end, and finish with ``assert_closed()`` or
+    ``force_close()``.
+    """
+
+    def __init__(self, out=sys.stdout):
+        """Create the pipe and start the copier thread.
+
+        ``out`` must provide ``fileno()``; captured data is written to
+        that descriptor unmodified.
+        """
+        self._output = out
+        self._pipe_fd_r, self._pipe_fd_w = os.pipe()
+        self._partial_buf = b''
+        self._lines_sem = threading.Semaphore()
+        self._lines = []
+        self._reader_io = io.StringIO()
+
+        # os.pipe() already returns non-inheritable (close-on-exec)
+        # descriptors, and close-on-exec cannot be changed via F_SETFL
+        # anyway. Only the read end needs adjusting: it is made non-blocking
+        # so that the copier thread never gets stuck inside os.read().
+        os.set_blocking(self._pipe_fd_r, False)
+
+        # Start copier thread
+        self._thread = threading.Thread(target=self._copy)
+        self._thread.start()
+
+    def _copy(self):
+        """Thread body: forward pipe data to the output and collect lines.
+
+        Returns on end-of-file (all write ends closed) or once the read
+        end has been closed/invalidated by force_close().
+        """
+        while True:
+            # force_close() resets the attribute to -1 from another thread.
+            # select() raises ValueError (not OSError) for negative
+            # descriptors, so that case is handled explicitly.
+            fd = self._pipe_fd_r
+            if fd < 0:
+                return
+
+            try:
+                # Be lazy and wake up occasionally in case _pipe_fd_r became
+                # invalid. The reason to do this is because os.read() will
+                # *not* return if the FD is forcefully closed.
+                select.select([fd], [], [], 0.1)
+
+                # Deliberately re-read the attribute: if the descriptor was
+                # closed in the meantime this reads from -1 and fails with
+                # EBADF rather than touching a reused descriptor number.
+                r = os.read(self._pipe_fd_r, 1024)
+                if not r:
+                    return
+            except OSError as e:
+                if e.errno == errno.EWOULDBLOCK:
+                    continue
+
+                # We get a bad file descriptor error when the outside closes
+                # the FD
+                return
+
+            l = r.split(b'\n')
+            l[0] = self._partial_buf + l[0]
+            self._lines.extend(l[:-1])
+            self._partial_buf = l[-1]
+
+            # Wake up a check_* call that may be waiting for new data
+            self._lines_sem.release()
+
+            os.write(self._output.fileno(), r)
+
+    def _next_line(self, deadline):
+        """Pop the oldest recorded line, waiting until ``deadline`` if needed.
+
+        ``deadline`` is an absolute time.time() value. Returns the line as
+        ``bytes``, or ``None`` if no line became available in time.
+        """
+        while True:
+            try:
+                return self._lines.pop(0)
+            except IndexError:
+                # Nothing buffered; wait for the copier thread to signal new
+                # data. The remaining time is clamped so that an expired
+                # deadline becomes a plain non-blocking attempt.
+                remaining = max(0.0, deadline - time.time())
+                if not self._lines_sem.acquire(timeout=remaining):
+                    return None
+
+    def check_line_re(self, needle_re, timeout=0, failmsg=None):
+        """Consume lines until one matches the regular expression.
+
+        ``needle_re`` may be ``str`` (encoded as ASCII) or ``bytes``.
+        Returns the list of consumed lines, the matching line last.
+        Raises AssertionError (with ``failmsg`` if given) when no matching
+        line shows up within ``timeout`` seconds.
+        """
+        deadline = time.time() + timeout
+
+        if isinstance(needle_re, str):
+            needle_re = needle_re.encode('ascii')
+
+        r = re.compile(needle_re)
+        ret = []
+
+        while True:
+            l = self._next_line(deadline)
+            if l is None:
+                if failmsg:
+                    raise AssertionError(failmsg)
+                raise AssertionError('Timed out waiting for needle %s (timeout: %0.2f)' % (str(needle_re), timeout))
+
+            ret.append(l)
+            if r.search(l):
+                return ret
+
+    def check_line(self, needle, timeout=0, failmsg=None):
+        """Like check_line_re(), but ``needle`` is matched literally."""
+        if isinstance(needle, str):
+            needle = needle.encode('ascii')
+
+        needle_re = re.escape(needle)
+
+        return self.check_line_re(needle_re, timeout=timeout, failmsg=failmsg)
+
+    def check_no_line_re(self, needle_re, wait=0, failmsg=None):
+        """Consume lines for ``wait`` seconds and fail if one matches.
+
+        ``needle_re`` may be ``str`` (encoded as ASCII) or ``bytes``.
+        Returns the list of consumed lines when none of them matched.
+        Raises AssertionError (with ``failmsg`` if given) as soon as a
+        matching line is seen.
+        """
+        deadline = time.time() + wait
+
+        if isinstance(needle_re, str):
+            needle_re = needle_re.encode('ascii')
+
+        r = re.compile(needle_re)
+        ret = []
+
+        while True:
+            l = self._next_line(deadline)
+            if l is None:
+                # Timed out, so everything is good
+                break
+
+            ret.append(l)
+            if r.search(l):
+                if failmsg:
+                    raise AssertionError(failmsg)
+                raise AssertionError('Found needle %s but shouldn\'t have been there (timeout: %0.2f)' % (str(needle_re), wait))
+
+        return ret
+
+    def check_no_line(self, needle, wait=0, failmsg=None):
+        """Like check_no_line_re(), but ``needle`` is matched literally."""
+        if isinstance(needle, str):
+            needle = needle.encode('ascii')
+
+        needle_re = re.escape(needle)
+
+        return self.check_no_line_re(needle_re, wait=wait, failmsg=failmsg)
+
+    def clear(self):
+        """Discard all recorded lines and return them as a list."""
+        ret = self._lines
+        self._lines = []
+        return ret
+
+    def assert_closed(self, timeout=1):
+        """Wait up to ``timeout`` seconds for the copier thread to finish.
+
+        Raises AssertionError if the thread is still running afterwards.
+        """
+        self._thread.join(timeout)
+
+        if self._thread.is_alive():
+            raise AssertionError("OutputCheck: Write side has not been closed yet!")
+
+    def force_close(self):
+        """Close the read end of the pipe and wait for the copier thread."""
+        # Invalidate the attribute first so the copier thread can no longer
+        # pick up the descriptor number once it has been closed.
+        fd = self._pipe_fd_r
+        self._pipe_fd_r = -1
+        os.close(fd)
+
+        self._thread.join()
+
+    @property
+    def fd(self):
+        """File descriptor of the write end of the pipe (-1 once closed)."""
+        return self._pipe_fd_w
+
+    def writer_attached(self):
+        """Close the local write end of the pipe.
+
+        Afterwards the copier thread sees end-of-file as soon as every
+        other holder of the write end has closed it as well.
+        """
+        os.close(self._pipe_fd_w)
+        self._pipe_fd_w = -1
+
+    def __del__(self):
+        """Close whichever pipe ends are still open."""
+        # 0 is a valid descriptor number; only -1 marks an end as closed.
+        if self._pipe_fd_r >= 0:
+            os.close(self._pipe_fd_r)
+        if self._pipe_fd_w >= 0:
+            os.close(self._pipe_fd_w)
+
+        assert not self._thread.is_alive()
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]