[gnome-continuous-yocto/gnomeostree-3.28-rocko: 6573/8267] runcmd.py: unit testing for runCmd()
- From: Emmanuele Bassi <ebassi src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gnome-continuous-yocto/gnomeostree-3.28-rocko: 6573/8267] runcmd.py: unit testing for runCmd()
- Date: Sun, 17 Dec 2017 05:02:05 +0000 (UTC)
commit 8b6fde28e9fe7503f6946500e5b4ee30664501b9
Author: Patrick Ohly <patrick ohly intel com>
Date: Tue Jun 27 13:03:41 2017 +0200
runcmd.py: unit testing for runCmd()
This covers the traditional API as well as the new output_log feature.
While testing, it was noticed that killing hanging commands does not
work when a shell is used to run the command(s). This might be worth
fixing.
(From OE-Core rev: 62489e58ca9975f58b48fc2bd8cf27fd22e25564)
Signed-off-by: Patrick Ohly <patrick ohly intel com>
Signed-off-by: Ross Burton <ross burton intel com>
Signed-off-by: Richard Purdie <richard purdie linuxfoundation org>
meta/lib/oeqa/selftest/cases/runcmd.py | 117 ++++++++++++++++++++++++++++++++
1 files changed, 117 insertions(+), 0 deletions(-)
---
diff --git a/meta/lib/oeqa/selftest/cases/runcmd.py b/meta/lib/oeqa/selftest/cases/runcmd.py
new file mode 100644
index 0000000..6c785ca
--- /dev/null
+++ b/meta/lib/oeqa/selftest/cases/runcmd.py
@@ -0,0 +1,117 @@
+from oeqa.selftest.case import OESelftestTestCase
+from oeqa.utils.commands import runCmd
+from oeqa.utils import CommandError
+
+import subprocess
+import threading
+import time
+import signal
+
+class MemLogger(object):
+ def __init__(self):
+ self.info_msgs = []
+ self.error_msgs = []
+
+ def info(self, msg):
+ self.info_msgs.append(msg)
+
+ def error(self, msg):
+ self.error_msgs.append(msg)
+
+class RunCmdTests(OESelftestTestCase):
+ """ Basic tests for runCmd() utility function """
+
+ # The delta is intentionally smaller than the timeout, to detect cases where
+ # we incorrectly apply the timeout more than once.
+ TIMEOUT = 2
+ DELTA = 1
+
+ def test_result_okay(self):
+ result = runCmd("true")
+ self.assertEqual(result.status, 0)
+
+ def test_result_false(self):
+ result = runCmd("false", ignore_status=True)
+ self.assertEqual(result.status, 1)
+
+ def test_shell(self):
+ # A shell is used for all string commands.
+ result = runCmd("false; true", ignore_status=True)
+ self.assertEqual(result.status, 0)
+
+ def test_no_shell(self):
+ self.assertRaises(FileNotFoundError,
+ runCmd, "false; true", shell=False)
+
+ def test_list_not_found(self):
+ self.assertRaises(FileNotFoundError,
+ runCmd, ["false; true"])
+
+ def test_list_okay(self):
+ result = runCmd(["true"])
+ self.assertEqual(result.status, 0)
+
+ def test_result_assertion(self):
+ self.assertRaisesRegexp(AssertionError, "Command 'echo .* false' returned non-zero exit status 1:\nfoobar",
+ runCmd, "echo foobar >&2; false", shell=True)
+
+ def test_result_exception(self):
+ self.assertRaisesRegexp(CommandError, "Command 'echo .* false' returned non-zero exit status 1 with output: foobar",
+ runCmd, "echo foobar >&2; false", shell=True, assert_error=False)
+
+ def test_output(self):
+ result = runCmd("echo stdout; echo stderr >&2", shell=True)
+ self.assertEqual("stdout\nstderr", result.output)
+ self.assertEqual("", result.error)
+
+ def test_output_split(self):
+ result = runCmd("echo stdout; echo stderr >&2", shell=True, stderr=subprocess.PIPE)
+ self.assertEqual("stdout", result.output)
+ self.assertEqual("stderr", result.error)
+
+ def test_timeout(self):
+ numthreads = threading.active_count()
+ start = time.time()
+ # Killing a hanging process only works when not using a shell?!
+ result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True)
+ self.assertEqual(result.status, -signal.SIGTERM)
+ end = time.time()
+ self.assertLess(end - start, self.TIMEOUT + self.DELTA)
+ self.assertEqual(numthreads, threading.active_count())
+
+ def test_timeout_split(self):
+ numthreads = threading.active_count()
+ start = time.time()
+ # Killing a hanging process only works when not using a shell?!
+ result = runCmd(['sleep', '60'], timeout=self.TIMEOUT, ignore_status=True, stderr=subprocess.PIPE)
+ self.assertEqual(result.status, -signal.SIGTERM)
+ end = time.time()
+ self.assertLess(end - start, self.TIMEOUT + self.DELTA)
+ self.assertEqual(numthreads, threading.active_count())
+
+ def test_stdin(self):
+ numthreads = threading.active_count()
+ result = runCmd("cat", data=b"hello world", timeout=self.TIMEOUT)
+ self.assertEqual("hello world", result.output)
+ self.assertEqual(numthreads, threading.active_count())
+
+ def test_stdin_timeout(self):
+ numthreads = threading.active_count()
+ start = time.time()
+ result = runCmd(['sleep', '60'], data=b"hello world", timeout=self.TIMEOUT, ignore_status=True)
+ self.assertEqual(result.status, -signal.SIGTERM)
+ end = time.time()
+ self.assertLess(end - start, self.TIMEOUT + self.DELTA)
+ self.assertEqual(numthreads, threading.active_count())
+
+ def test_log(self):
+ log = MemLogger()
+ result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log)
+ self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout", "stderr"], log.info_msgs)
+ self.assertEqual([], log.error_msgs)
+
+ def test_log_split(self):
+ log = MemLogger()
+ result = runCmd("echo stdout; echo stderr >&2", shell=True, output_log=log, stderr=subprocess.PIPE)
+ self.assertEqual(["Running: echo stdout; echo stderr >&2", "stdout"], log.info_msgs)
+ self.assertEqual(["stderr"], log.error_msgs)
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]