[file-roller/wip/jtojnar/dogtail-tests: 3/4] fixup! Add basic integration test




commit e61c79b6e874186cf797ad4ab78a4a0baff47c74
Author: Jan Tojnar <jtojnar gmail com>
Date:   Thu Apr 21 16:17:27 2022 +0200

    fixup! Add basic integration test

 default.nix       |  13 +++-
 tests/basic.py    |  16 ++--
 tests/meson.build |   7 +-
 tests/testutil.py | 220 +++++++++++++++++++++++++++++-------------------------
 4 files changed, 147 insertions(+), 109 deletions(-)
---
diff --git a/default.nix b/default.nix
index 07e774eb..b6d548a8 100644
--- a/default.nix
+++ b/default.nix
@@ -98,8 +98,19 @@ makeDerivation rec {
     pkg-config
 
     # For tests
-    dbus
+    (dbus.overrideAttrs (attrs: {
+      patches = attrs.patches ++ [
+        # Fix dbus-daemon crashing when running tests due to long XDG_DATA_DIRS.
+        # https://gitlab.freedesktop.org/dbus/dbus/-/merge_requests/302
+        (fetchpatch {
+          url = "https://gitlab.freedesktop.org/dbus/dbus/-/commit/b551b3e9737958216a1a9d359150a4110a9d0549.patch";
+          sha256 = "kOVjlklZzKvBZXmmrE1UiO4XWRoBLViGwdn6/eDH+DY=";
+        })
+      ];
+    }))
     python3.pkgs.dogtail
+    python3.pkgs.dbus-python
+    python3.pkgs.python-dbusmock
     python3.pkgs.pygobject3
     gobject-introspection # for finding typelibs
 
diff --git a/tests/basic.py b/tests/basic.py
index fe6b0019..687d0a6d 100644
--- a/tests/basic.py
+++ b/tests/basic.py
@@ -6,12 +6,18 @@ from testutil import test_file_roller, mocked_portal_open_uri
 from time import sleep
 
 
-with test_file_roller() as (app, app_tree), \
-    mocked_portal_open_uri(Gio.bus_get_sync(Gio.BusType.SESSION)) as open_uri_portal:
+with test_file_roller() as (bus, (mock_case, legacy_bus), app, app_tree), \
+    mocked_portal_open_uri(mock_case, legacy_bus) as open_uri_portal:
 
-    app.open_file(Path(__file__).parent / "data" / "texts.tar.gz")
+    open_uri_portal.AddMethod(
+        interface = "",
+        name = "OpenFile",
+        in_sig = "sha{sv}",
+        out_sig = "o",
+        code = "print(args)",
+    )
 
-    open_uri_portal.AddMethod("", "OpenFile", "", "", "")
+    # app.open_file(Path(__file__).parent / "data" / "texts.tar.gz")
 
     win = app_tree.get_window("texts.tar.gz")
 
@@ -30,4 +36,4 @@ with test_file_roller() as (app, app_tree), \
     hello_txt = file_listing.child("hello.txt")
     hello_txt.doubleClick()
 
-    sleep(5)
+    sleep(500)
diff --git a/tests/meson.build b/tests/meson.build
index 3f10182c..f46ce283 100644
--- a/tests/meson.build
+++ b/tests/meson.build
@@ -7,12 +7,15 @@ python3_for_dogtail_tests = pymod.find_installation(
     'dbusmock',
     'dogtail',
     'gi',
-    'pyatspi',
   ],
   required: get_option('dogtail'),
 )
+dbus_daemon = find_program(
+  'dbus-daemon',
+  required: get_option('dogtail'),
+)
 
-enable_dogtail_tests = python3_for_dogtail_tests.found()
+enable_dogtail_tests = python3_for_dogtail_tests.found() and dbus_daemon.found()
 
 if enable_dogtail_tests
   test_env = [
diff --git a/tests/testutil.py b/tests/testutil.py
index 00128c74..0a49e9d5 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -1,12 +1,13 @@
 from gi.repository import GLib, Gio
 
+print('foo')
 from dogtail.utils import isA11yEnabled, enableA11y
 
 if not isA11yEnabled():
     enableA11y(True)
 
 from contextlib import AbstractContextManager, contextmanager
-from dogtail import tree
+print('foobat')
 from dogtail.predicate import Predicate
 from enum import Enum
 import errno
@@ -19,52 +20,53 @@ from textwrap import dedent
 import time
 from typing import Tuple
 
-# import dbus
-# from dbus.mainloop.glib import DBusGMainLoop
+import dbus
+import dbusmock
+from dbus.mainloop.glib import DBusGMainLoop
 
 
-# @contextmanager
-# def mocked_bus(system_bus: bool = False) -> dbus.Bus:
-#     try:
-#         case = dbusmock.DBusTestCase()
-#         case.start_session_bus()
-#         DBusGMainLoop(set_as_default=True)
-#         dbus.bus.BusConnection(os.environ['DBUS_SESSION_BUS_ADDRESS'])
-#         yield case, case.get_dbus(system_bus=system_bus)
-#     finally:
-#         case.tearDown()
+@contextmanager
+def bus_for_testing(system_bus: bool = False) -> dbus.Bus:
+    try:
+        case = dbusmock.DBusTestCase()
+        case.start_session_bus()
+        DBusGMainLoop(set_as_default=True)
+        dbus.bus.BusConnection(os.environ['DBUS_SESSION_BUS_ADDRESS'])
+        yield case, case.get_dbus(system_bus=system_bus)
+    finally:
+        case.tearDown()
 
 
-# @contextmanager
-# def mocked_portal_open_uri(bus: dbus.Bus):
-#     try:
-#         p_mock = bus.spawn_server(
-#             "org.freedesktop.portal.Desktop",
-#             "/org/freedesktop/portal/desktop",
-#             "org.freedesktop.portal.OpenURI",
-#             # stdout=subprocess.PIPE,
-#         )
-
-#         # Get a proxy for the Desktop object's Mock interface
-#         desktop_portal_mock = dbus.Interface(
-#             bus.get_object(
-#                 "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop"
-#             ),
-#             dbusmock.MOCK_IFACE,
-#         )
-
-#         yield desktop_portal_mock
+@contextmanager
+def mocked_portal_open_uri(mock_case: dbusmock.DBusTestCase, bus: dbus.Bus):
+    p_mock = mock_case.spawn_server(
+        "org.freedesktop.portal.Desktop",
+        "/org/freedesktop/portal/desktop",
+        "org.freedesktop.portal.OpenURI",
+        # stdout=subprocess.PIPE,
+    )
 
-#     finally:
-#         # p_mock.stdout.close()
-#         p_mock.terminate()
-#         p_mock.wait()
+    try:
+        # Get a proxy for the Desktop object's Mock interface
+        desktop_portal_mock = dbus.Interface(
+            bus.get_object(
+                "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop"
+            ),
+            dbusmock.MOCK_IFACE,
+        )
+
+        yield desktop_portal_mock
+
+    finally:
+        # p_mock.stdout.close()
+        p_mock.terminate()
+        p_mock.wait()
 
-#     def test_suspend_on_idle(self):
-#         # run your program in a way that should trigger one suspend call
+    def test_suspend_on_idle(self):
+        # run your program in a way that should trigger one suspend call
 
-#         # now check the log that we got one OpenFile() call
-#         self.assertRegex(self.p_mock.stdout.readline(), b"^[0-9.]+ Suspend$")
+        # now check the log that we got one OpenFile() call
+        self.assertRegex(self.p_mock.stdout.readline(), b"^[0-9.]+ Suspend$")
 
 
 def start_dbus_session() -> Tuple[int, str]:
@@ -78,8 +80,12 @@ def start_dbus_session() -> Tuple[int, str]:
         "--fork",
         "--print-address=1",
         "--print-pid=1",
-        "--session",
     ]
+
+    if "DBUS_SESSION_CONFIG" in os.environ:
+        argv.append("--config-file=" + os.environ["DBUS_SESSION_CONFIG"])
+    else:
+        argv.append("--session")
     lines = subprocess.check_output(argv, universal_newlines=True).strip().splitlines()
     assert len(lines) == 2, "expected exactly 2 lines of output from dbus-daemon"
     # usually the first line is the address, but be lenient and accept any order
@@ -111,63 +117,66 @@ def stop_dbus(pid: int) -> None:
     signal.signal(signal.SIGTERM, signal.SIG_DFL)
 
 
-@contextmanager
-def session_bus_for_testing() -> AbstractContextManager[Gio.DBusConnection]:
-    original_bus_address = os.environ.get("DBUS_SESSION_BUS_ADDRESS", None)
-
-    pid, bus_address = start_dbus_session()
-    os.environ["DBUS_SESSION_BUS_ADDRESS"] = bus_address
-    print(bus_address)
-
-    try:
-        # bus = Gio.bus_get_sync(Gio.BusType.SESSION)
-        bus = Gio.DBusConnection.new_for_address_sync(bus_address, Gio.DBusConnectionFlags.NONE)
-
-        yield bus
-    finally:
-        stop_dbus(pid)
-
-        if original_bus_address is not None:
-            os.environ["DBUS_SESSION_BUS_ADDRESS"] = original_bus_address
-        else:
-            os.environ.pop("DBUS_SESSION_BUS_ADDRESS")
-
+# @contextmanager
+# def session_bus_for_testing() -> AbstractContextManager[Gio.DBusConnection]:
+#     original_bus_address = os.environ.get("DBUS_SESSION_BUS_ADDRESS", None)
 
-@contextmanager
-def mocked_portal_open_uri(bus: Gio.DBusConnection) -> AbstractContextManager[Gio.DBusConnection]:
-    with open(Path(__file__).parent / "org.freedesktop.portal.OpenURI.xml") as introspection_xml:
-        introspection_data = Gio.dbus_node_info_new_for_xml(introspection_xml.read())
-        assert introspection_data is not None
+#     pid, bus_address = start_dbus_session()
+#     print(pid, bus_address, original_bus_address)
+#     os.environ["DBUS_SESSION_BUS_ADDRESS"] = bus_address
 
-        def on_bus_acquired():
-            print("on_bus_acquired")
-            pass
+#     try:
+#         # bus = Gio.bus_get_sync(Gio.BusType.SESSION)
+#         bus = Gio.DBusConnection.new_for_address_sync(bus_address, Gio.DBusConnectionFlags.AUTHENTICATION_CLIENT | Gio.DBusConnectionFlags.MESSAGE_BUS_CONNECTION)
 
-        def on_method_called():
-            print("on_method_called")
-            pass
+#         yield bus
+#     finally:
+#         stop_dbus(pid)
 
-        try:
-            owner_id = Gio.bus_own_name_on_connection(
-                bus,
-                "org.freedesktop.portal.Desktop",
-                Gio.BusNameOwnerFlags.NONE,
-                on_bus_acquired,
-            )
-            print(2)
+#         if original_bus_address is not None:
+#             os.environ["DBUS_SESSION_BUS_ADDRESS"] = original_bus_address
+#         else:
+#             os.environ.pop("DBUS_SESSION_BUS_ADDRESS")
 
-            registration_id = Gio.dbus_connection_register_object(
-                connection,
-                "/org/freedesktop/portal/desktop",
-                introspection_data.get_interfaces()[0],
-                on_method_called,
-            )
-            assert registration_id > 0
-            print(3)
-            yield owner_id
 
-        finally:
-            Gio.bus_unown_name(owner_id)
+# @contextmanager
+# def mocked_portal_open_uri(bus: Gio.DBusConnection) -> AbstractContextManager[Gio.DBusConnection]:
+#     with open(Path(__file__).parent / "org.freedesktop.portal.OpenURI.xml") as introspection_xml:
+#         introspection_data = Gio.DBusNodeInfo.new_for_xml(introspection_xml.read())
+#         assert introspection_data is not None
+
+#         def on_bus_lost():
+#             print("on_bus_lost")
+
+#         def on_bus_acquired():
+#             print("on_bus_acquired")
+#             registration_id = bus.register_object(
+#                 object_path="/org/freedesktop/portal/desktop",
+#                 interface_info=introspection_data.interfaces[0],
+#                 method_call_closure=on_method_called,
+#             )
+#             assert registration_id > 0
+
+#         def on_method_called():
+#             print("on_method_called")
+#             pass
+
+#         try:
+#             owner_id = Gio.bus_own_name_on_connection(
+#                 connection=bus,
+#                 name="org.freedesktop.portal.Desktop",
+#                 flags=Gio.BusNameOwnerFlags.REPLACE | Gio.BusNameOwnerFlags.DO_NOT_QUEUE,
+#                 name_acquired_closure=on_bus_acquired,
+#                 name_lost_closure=on_bus_lost,
+#             )
+#             print(2)
+
+#             print(3)
+#             yield owner_id
+
+#         finally:
+#             print('unownd')
+#             Gio.bus_unown_name(owner_id)
 
 
 class WaitState(Enum):
@@ -176,15 +185,18 @@ class WaitState(Enum):
     TIMEOUT = 3  # timed out before seeing it
 
 
-def wait_for_name(bus: Gio.DBusConnection, name: str, timeout: int = 10) -> bool:
+def wait_for_name(bus: Gio.DBusConnection, name: str, timeout: int = 100) -> bool:
     wait_state = WaitState.RUNNING
+    print(name)
 
     def wait_name_appeared_cb(connection: Gio.DBusConnection, name: str, name_owner: str):
         nonlocal wait_state
+        print('wait_name_appeared_cb')
         wait_state = WaitState.SUCCESS
 
     def wait_timeout_cb():
         nonlocal wait_state
+        print('wait_timeout_cb')
         wait_state = WaitState.TIMEOUT
 
     watch_id = Gio.bus_watch_name_on_connection(bus, name, Gio.BusNameWatcherFlags.NONE, wait_name_appeared_cb, None)
@@ -266,10 +278,14 @@ class FileRollerDogtail:
     def __init__(self, app_name):
         self.app_name = app_name
 
-    def get_application(self) -> tree.Application:
-        return tree.root.application(self.app_name)
+        # Importing it has side-effects.
+        from dogtail import tree
+        self.tree = tree
+
+    def get_application(self):
+        return self.tree.root.application(self.app_name)
 
-    def get_window(self, name: str) -> tree.Window:
+    def get_window(self, name: str) -> self.tree.Window:
         # Cannot use default window finder because the window name has extra space after the file name.
         return self.get_application().findChild(IsAWindowApproximatelyNamed(name), False)
 
@@ -291,7 +307,7 @@ def new_user_data_dir() -> AbstractContextManager[Path]:
 @contextmanager
 def test_file_roller():
     # Use custom user config and data directories to avoid changing user’s MIME associations.
-    with session_bus_for_testing() as bus, \
+    with bus_for_testing() as (mock_case, legacy_bus), \
         new_user_config_dir() as confdir, \
         new_user_data_dir() as datadir:
 
@@ -312,15 +328,17 @@ def test_file_roller():
         desktop = Gio.DesktopAppInfo.new_from_filename(str(desktop_file_path))
         desktop.set_as_default_for_type("text/plain")
 
-        try:
-            # Start the tested app.
-            dbus_app = FileRollerDbus(bus)
+        # Start the tested app.
+        bus = Gio.bus_get_sync(Gio.BusType.SESSION)
+        dbus_app = FileRollerDbus(bus)
 
+        try:
             assert wait_for_name(bus, dbus_app.APPLICATION_ID), f"Waiting for {dbus_app.APPLICATION_ID} should not time out."
             print(1)
 
             dogtail_app = FileRollerDogtail("file-roller")
+            print(1)
 
-            yield dbus_app, dogtail_app
+            yield bus, (mock_case, legacy_bus), dbus_app, dogtail_app
         finally:
             dbus_app.quit()


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]