[file-roller/wip/jtojnar/dogtail-tests: 2/4] fixup! Add basic integration test




commit 3e171fb8c6c43e67cb6ca08e7401a7e0545eaf64
Author: Jan Tojnar <jtojnar gmail com>
Date:   Wed Apr 20 10:22:50 2022 +0200

    fixup! Add basic integration test

 default.nix       |   1 +
 tests/basic.py    |  12 ++-
 tests/meson.build |   6 +-
 tests/testutil.py | 232 ++++++++++++++++++++++++++++++++++++++++++++++--------
 4 files changed, 215 insertions(+), 36 deletions(-)
---
diff --git a/default.nix b/default.nix
index 83109e2c..07e774eb 100644
--- a/default.nix
+++ b/default.nix
@@ -98,6 +98,7 @@ makeDerivation rec {
     pkg-config
 
     # For tests
+    dbus
     python3.pkgs.dogtail
     python3.pkgs.pygobject3
     gobject-introspection # for finding typelibs
diff --git a/tests/basic.py b/tests/basic.py
index 3f88ec23..fe6b0019 100644
--- a/tests/basic.py
+++ b/tests/basic.py
@@ -2,11 +2,17 @@
 
 from gi.repository import Gio, GLib
 from pathlib import Path
-from testutil import test_file_roller
+from testutil import test_file_roller, mocked_portal_open_uri
+from time import sleep
+
+
+with test_file_roller() as (app, app_tree), \
+    mocked_portal_open_uri(Gio.bus_get_sync(Gio.BusType.SESSION)) as open_uri_portal:
 
-with test_file_roller() as (app, app_tree):
     app.open_file(Path(__file__).parent / "data" / "texts.tar.gz")
 
+    open_uri_portal.AddMethod("", "OpenFile", "", "", "")
+
     win = app_tree.get_window("texts.tar.gz")
 
     file_listing = win.child(roleName="table")
@@ -23,3 +29,5 @@ with test_file_roller() as (app, app_tree):
 
     hello_txt = file_listing.child("hello.txt")
     hello_txt.doubleClick()
+
+    sleep(5)
diff --git a/tests/meson.build b/tests/meson.build
index e1b6c238..3f10182c 100644
--- a/tests/meson.build
+++ b/tests/meson.build
@@ -3,10 +3,11 @@ pymod = import('python')
 python3_for_dogtail_tests = pymod.find_installation(
   'python3',
   modules: [
+    'dbus',
+    'dbusmock',
     'dogtail',
-    'pyatspi',
     'gi',
-    'gi.repository.Gtk',
+    'pyatspi',
   ],
   required: get_option('dogtail'),
 )
@@ -30,6 +31,7 @@ if enable_dogtail_tests
     ],
     env: test_env,
     depends: [
+      fr_exe,
       compiled_schemas,
     ],
   )
diff --git a/tests/testutil.py b/tests/testutil.py
index 0579ab02..00128c74 100644
--- a/tests/testutil.py
+++ b/tests/testutil.py
@@ -5,16 +5,169 @@ from dogtail.utils import isA11yEnabled, enableA11y
 if not isA11yEnabled():
     enableA11y(True)
 
-from contextlib import contextmanager
-from enum import Enum
+from contextlib import AbstractContextManager, contextmanager
 from dogtail import tree
 from dogtail.predicate import Predicate
+from enum import Enum
+import errno
+import os
 from pathlib import Path
+import signal
+import subprocess
 from tempfile import TemporaryDirectory
 from textwrap import dedent
+import time
+from typing import Tuple
+
+# import dbus
+# from dbus.mainloop.glib import DBusGMainLoop
+
+
+# @contextmanager
+# def mocked_bus(system_bus: bool = False) -> dbus.Bus:
+#     try:
+#         case = dbusmock.DBusTestCase()
+#         case.start_session_bus()
+#         DBusGMainLoop(set_as_default=True)
+#         dbus.bus.BusConnection(os.environ['DBUS_SESSION_BUS_ADDRESS'])
+#         yield case, case.get_dbus(system_bus=system_bus)
+#     finally:
+#         case.tearDown()
+
+
+# @contextmanager
+# def mocked_portal_open_uri(bus: dbus.Bus):
+#     try:
+#         p_mock = bus.spawn_server(
+#             "org.freedesktop.portal.Desktop",
+#             "/org/freedesktop/portal/desktop",
+#             "org.freedesktop.portal.OpenURI",
+#             # stdout=subprocess.PIPE,
+#         )
+
+#         # Get a proxy for the Desktop object's Mock interface
+#         desktop_portal_mock = dbus.Interface(
+#             bus.get_object(
+#                 "org.freedesktop.portal.Desktop", "/org/freedesktop/portal/desktop"
+#             ),
+#             dbusmock.MOCK_IFACE,
+#         )
+
+#         yield desktop_portal_mock
+
+#     finally:
+#         # p_mock.stdout.close()
+#         p_mock.terminate()
+#         p_mock.wait()
+
+#     def test_suspend_on_idle(self):
+#         # run your program in a way that should trigger one suspend call
+
+#         # now check the log that we got one OpenFile() call
+#         self.assertRegex(self.p_mock.stdout.readline(), b"^[0-9.]+ Suspend$")
+
+
+def start_dbus_session() -> Tuple[int, str]:
+    """Start a D-Bus daemon
+    Return (pid, address) pair.
+    Normally you do not need to call this directly. Use start_system_bus()
+    and start_session_bus() instead.
+    """
+    argv = [
+        "dbus-daemon",
+        "--fork",
+        "--print-address=1",
+        "--print-pid=1",
+        "--session",
+    ]
+    lines = subprocess.check_output(argv, universal_newlines=True).strip().splitlines()
+    assert len(lines) == 2, "expected exactly 2 lines of output from dbus-daemon"
+    # usually the first line is the address, but be lenient and accept any order
+    try:
+        return (int(lines[1]), lines[0])
+    except ValueError:
+        return (int(lines[0]), lines[1])
+
+
+def stop_dbus(pid: int) -> None:
+    """Stop a D-Bus daemon
+    Normally you do not need to call this directly. When you use
+    start_system_bus() and start_session_bus(), these buses are
+    automatically stopped in tearDownClass().
+    """
+    signal.signal(signal.SIGTERM, signal.SIG_IGN)
+    for _ in range(50):
+        try:
+            os.kill(pid, signal.SIGTERM)
+        except OSError as e:
+            if e.errno == errno.ESRCH:
+                break
+            raise
+        time.sleep(0.1)
+    else:
+        sys.stderr.write("ERROR: timed out waiting for bus process to terminate\n")
+        os.kill(pid, signal.SIGKILL)
+        time.sleep(0.5)
+    signal.signal(signal.SIGTERM, signal.SIG_DFL)
 
-import os
-import subprocess
+
+@contextmanager
+def session_bus_for_testing() -> AbstractContextManager[Gio.DBusConnection]:
+    original_bus_address = os.environ.get("DBUS_SESSION_BUS_ADDRESS", None)
+
+    pid, bus_address = start_dbus_session()
+    os.environ["DBUS_SESSION_BUS_ADDRESS"] = bus_address
+    print(bus_address)
+
+    try:
+        # bus = Gio.bus_get_sync(Gio.BusType.SESSION)
+        bus = Gio.DBusConnection.new_for_address_sync(bus_address, Gio.DBusConnectionFlags.NONE)
+
+        yield bus
+    finally:
+        stop_dbus(pid)
+
+        if original_bus_address is not None:
+            os.environ["DBUS_SESSION_BUS_ADDRESS"] = original_bus_address
+        else:
+            os.environ.pop("DBUS_SESSION_BUS_ADDRESS")
+
+
+@contextmanager
+def mocked_portal_open_uri(bus: Gio.DBusConnection) -> AbstractContextManager[Gio.DBusConnection]:
+    with open(Path(__file__).parent / "org.freedesktop.portal.OpenURI.xml") as introspection_xml:
+        introspection_data = Gio.dbus_node_info_new_for_xml(introspection_xml.read())
+        assert introspection_data is not None
+
+        def on_bus_acquired():
+            print("on_bus_acquired")
+            pass
+
+        def on_method_called():
+            print("on_method_called")
+            pass
+
+        try:
+            owner_id = Gio.bus_own_name_on_connection(
+                bus,
+                "org.freedesktop.portal.Desktop",
+                Gio.BusNameOwnerFlags.NONE,
+                on_bus_acquired,
+            )
+            print(2)
+
+            registration_id = Gio.dbus_connection_register_object(
+                connection,
+                "/org/freedesktop/portal/desktop",
+                introspection_data.get_interfaces()[0],
+                on_method_called,
+            )
+            assert registration_id > 0
+            print(3)
+            yield owner_id
+
+        finally:
+            Gio.bus_unown_name(owner_id)
 
 
 class WaitState(Enum):
@@ -118,41 +271,56 @@ class FileRollerDogtail:
 
     def get_window(self, name: str) -> tree.Window:
         # Cannot use default window finder because the window name has extra space after the file name.
-        return self.get_application().findChild(
-            IsAWindowApproximatelyNamed(name), False
-        )
+        return self.get_application().findChild(IsAWindowApproximatelyNamed(name), False)
+
+
+@contextmanager
+def new_user_config_dir() -> AbstractContextManager[Path]:
+    with TemporaryDirectory() as confdir:
+        os.environ["XDG_CONFIG_HOME"] = confdir
+        yield Path(confdir)
+
+
+@contextmanager
+def new_user_data_dir() -> AbstractContextManager[Path]:
+    with TemporaryDirectory() as datadir:
+        os.environ["XDG_DATA_HOME"] = datadir
+        yield Path(datadir)
 
 
 @contextmanager
 def test_file_roller():
-    try:
-        dbus_app = None
-        _bus = Gio.bus_get_sync(Gio.BusType.SESSION)
-        with TemporaryDirectory() as confdir, TemporaryDirectory() as datadir:
-            os.environ["XDG_CONFIG_HOME"] = confdir
-            os.environ["XDG_DATA_HOME"] = datadir
-
-            desktop_file_path = Path(datadir) / "applications" / "dummy-editor.desktop"
-            desktop_file_path.parent.mkdir(parents=True, exist_ok=True)
-            with open(desktop_file_path, "w") as desktop_file:
-                desktop_file.write(
-                    dedent(
-                        """[Desktop Entry]
-                        Exec=sh -c 'exec 3< "$1"; gdbus call --session --dest org.freedesktop.portal.Desktop --object-path /org/freedesktop/portal/desktop --method org.freedesktop.portal.OpenURI.OpenFile --timeout 5 0 "3" "{}"' portal-open %u
-                        MimeType=text/plain;
-                        Terminal=false
-                        Type=Application
-                        """
-                    )
+    # Use custom user config and data directories to avoid changing user’s MIME associations.
+    with session_bus_for_testing() as bus, \
+        new_user_config_dir() as confdir, \
+        new_user_data_dir() as datadir:
+
+        # Redirect non-portal based gtk_show_uri_on_window to portal so that we can capture it by the mocked portal.
+        desktop_file_path = datadir / "applications" / "dummy-editor.desktop"
+        desktop_file_path.parent.mkdir(parents=True, exist_ok=True)
+        with open(desktop_file_path, "w") as desktop_file:
+            desktop_file.write(
+                dedent(
+                    """[Desktop Entry]
+                    Exec=sh -c 'exec 3< "$1"; gdbus call --session --dest org.freedesktop.portal.Desktop --object-path /org/freedesktop/portal/desktop --method org.freedesktop.portal.OpenURI.OpenFile --timeout 5 0 "3" "{}"' portal-open %u
+                    MimeType=text/plain;
+                    Terminal=false
+                    Type=Application
+                    """
                 )
+            )
+        desktop = Gio.DesktopAppInfo.new_from_filename(str(desktop_file_path))
+        desktop.set_as_default_for_type("text/plain")
+
+        try:
+            # Start the tested app.
+            dbus_app = FileRollerDbus(bus)
 
-            desktop = Gio.DesktopAppInfo.new_from_filename(str(desktop_file_path))
-            desktop.set_as_default_for_type("text/plain")
+            assert wait_for_name(bus, dbus_app.APPLICATION_ID), f"Waiting for {dbus_app.APPLICATION_ID} should not time out."
+            print(1)
 
-            dbus_app = FileRollerDbus(_bus)
-            assert wait_for_name(_bus, dbus_app.APPLICATION_ID), f"Waiting for {dbus_app.APPLICATION_ID} should not time out."
             dogtail_app = FileRollerDogtail("file-roller")
+
             yield dbus_app, dogtail_app
-    finally:
-        if dbus_app:
+        finally:
             dbus_app.quit()


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]