[gtk/headless-tests] Try to use headless mutter for some tests




commit 48676f7551edb240c31d386378a1587bd0c32f10
Author: Matthias Clasen <mclasen redhat com>
Date:   Sat Mar 20 16:23:36 2021 -0400

    Try to use headless mutter for some tests
    
    This is not very successful yet.

 tests/headless-tests.py     | 207 ++++++++++++++++++++++++++++++++++++++++++++
 tests/run-headless-tests.sh |  17 ++++
 2 files changed, 224 insertions(+)
---
diff --git a/tests/headless-tests.py b/tests/headless-tests.py
new file mode 100644
index 0000000000..a8e756cb88
--- /dev/null
+++ b/tests/headless-tests.py
@@ -0,0 +1,207 @@
+import subprocess
+import gi
+
+gi.require_version('Gst', '1.0')
+gi.require_version('Gdk', '4.0')
+gi.require_version('Gtk', '4.0')
+
+from gi.repository import GLib, Gst, Gdk
+from pydbus import SessionBus
+
+Gst.init(None)
+
+screen_cast = None
+monitors = {}
+done = False
+
+def terminate():
+    print("terminating")
+    if session != None:
+        session.Stop()
+
+def stream_added_closure(name):
+    def stream_added(node_id):
+        print(f'Stream added, node id: {node_id}')
+
+        monitor = monitors[name];
+
+        freq = monitor['freq'];
+        width = monitor['width'];
+        height = monitor['height'];
+
+        pipeline_desc = f'pipewiresrc path={node_id} ! video/x-raw,max-framerate={freq}/1,width={width},height={height} ! videoconvert ! glimagesink'
+
+        pipeline = Gst.parse_launch(pipeline_desc)
+        pipeline.set_state(Gst.State.PLAYING)
+
+        monitor['pipeline'] = pipeline
+
+    return stream_added
+
+def stream_added(node_id):
+    print(f'stream added, node id: {node_id}')
+
+def add_monitor(name, width, height, freq):
+    print(f'adding monitor "{name}": {width}x{height}, {freq}Hz')
+    session_path = screen_cast.CreateSession({})
+    print(f'Session {session_path}')
+    session = bus.get('org.gnome.Mutter.ScreenCast', session_path)
+    monitors[name] = {
+        "session": session,
+        "width": width,
+        "height": height,
+        "freq": freq
+    }
+    stream_path = session.RecordVirtual({})
+    print(f'Stream path {stream_path}')
+    stream = bus.get('org.gnome.Mutter.ScreenCast', stream_path)
+    stream.onPipeWireStreamAdded = stream_added_closure(name)
+    session.Start()
+
+def remove_monitor(name):
+    print(f'removing monitor "{name}"')
+    monitor = monitors[name];
+    if monitor == None:
+        print("monitor not found")
+    else:
+        try:
+            pipeline = monitor['pipeline']
+            pipeline.send_event(Gst.Event.new_eos())
+            pipeline.set_state(Gst.State.NULL)
+
+            session = monitor['session']
+            session.Stop()
+        except KeyError:
+            print("no session")
+        monitors[name] = None
+
+def quit_cb(loop):
+    loop.quit()
+
+def wait(millis):
+    loop = GLib.MainLoop()
+    GLib.timeout_add(millis, quit_cb, loop)
+    loop.run()
+
+def report_monitor(monitor):
+    manufacturer = monitor.get_manufacturer()
+    model = monitor.get_model()
+    geometry = monitor.get_geometry()
+    scale = monitor.get_scale_factor()
+    freq = monitor.get_refresh_rate()
+    width = monitor.get_width_mm()
+    height = monitor.get_height_mm()
+    connector = monitor.get_connector()
+    print(f'  manufacturer: {manufacturer}')
+    print(f'  model: {model}')
+    print(f'  geometry: {geometry.x} {geometry.y} {geometry.width} {geometry.height}')
+    print(f'  scale {scale}')
+    print(f'  frequency {freq}')
+    print(f'  size {width} x {height}')
+    print(f'  connector {connector}')
+
+def monitor_changed(monitor, pspec):
+    print('monitor changed')
+    if pspec.name == 'manufacturer':
+        manufacturer = monitor.get_manufacturer()
+        print(f'  manufacturer: {manufacturer}')
+    elif pspec.name == 'model':
+        model = monitor.get_model()
+        print(f'  model: {model}')
+    elif pspec.name == 'geometry':
+        geometry = monitor.get_geometry()
+        print(f'  geometry: {geometry.x} {geometry.y} {geometry.width} {geometry.height}')
+    elif pspec.name == 'scale-factor':
+        scale = monitor.get_scale_factor()
+        print(f'  scale {scale}')
+    elif pspec.name == 'refresh-rate':
+        freq = monitor.get_refresh_rate()
+        print(f'  frequency {freq}')
+    elif pspec.name == 'width-mm' or pspec.name == 'height-mm':
+        width = monitor.get_width_mm()
+        height = monitor.get_height_mm()
+        print(f'  size {width} x {height}')
+    elif pspec.name == 'connector':
+        connector = monitor.get_connector()
+        print(f'  connector {connector}')
+
+expected_change = None
+
+def monitors_changed(monitors, position, removed, added):
+    global expected_change
+
+    assert expected_change != None, "No change expected"
+    assert position == expected_change['position'], "Unexpected position in monitors-changed"
+    assert removed == expected_change['removed'], "Unexpected removed in monitors-changed"
+    assert added == expected_change['added'], "Unexpected added in monitors-changed"
+
+    expected_change = None
+
+    print(f'monitors changed: {position}, {removed}, {added}')
+    for i in range(position, position + added):
+        print(f'new monitor {i}:')
+        monitor = monitors.get_item(i)
+        report_monitor(monitor)
+        monitor.connect('notify', monitor_changed)
+
+def launch_observer():
+    Gdk.set_allowed_backends('wayland')
+    display = Gdk.Display.open('gtk-test')
+
+    monitors = display.get_monitors()
+    if monitors.get_n_items() > 0:
+        print('initial monitors')
+        for i in range(0, monitors.get_n_items()):
+            monitor = monitors.get_item(i)
+            report_monitor(monitor)
+            monitor.connect('notify', monitor_changed)
+    monitors.connect('items-changed', monitors_changed)
+
+def expect_monitors_changed(position, removed, added, timeout):
+    print("setting expected change")
+    global expected_change
+    expected_change = {
+        'position' : position,
+        'removed' : removed,
+        'added' : added
+    }
+    wait(timeout)
+    assert expected_change == None, "Expected change did not happen"
+
+def run_commands():
+    launch_observer()
+    wait(500)
+    add_monitor("0", 100, 100, 60)
+    expect_monitors_changed(0, 0, 1, 500)
+    add_monitor("1", 1024, 768, 144)
+    expect_monitors_changed(1, 0, 1, 500)
+    remove_monitor("0")
+    expect_monitors_changed(0, 1, 0, 11000)
+    remove_monitor("1")
+    expect_monitors_changed(0, 1, 0, 11000)
+
+def mutter_appeared(name):
+    print("mutter entered the bus")
+    global screen_cast
+    global done
+
+    screen_cast = bus.get('org.gnome.Mutter.ScreenCast',
+                          '/org/gnome/Mutter/ScreenCast')
+    run_commands()
+    done = True
+
+def mutter_vanished():
+    if screen_cast == None:
+        print("mutter is not on the bus. Waiting...")
+    else:
+        print("mutter left the bus")
+        terminate()
+
+bus = SessionBus()
+bus.watch_name('org.gnome.Mutter.ScreenCast', 0, mutter_appeared, mutter_vanished)
+
+try:
+    while not done:
+      GLib.MainContext.default().iteration(True)
+except KeyboardInterrupt:
+    print('Interrupted')
diff --git a/tests/run-headless-tests.sh b/tests/run-headless-tests.sh
new file mode 100755
index 0000000000..6cf625c713
--- /dev/null
+++ b/tests/run-headless-tests.sh
@@ -0,0 +1,17 @@
+#! /bin/sh
+
+dbus-run-session sh <<EOF
+
+# echo DBUS_SESSION_BUS_ADDRESS=\$DBUS_SESSION_BUS_ADDRESS
+# echo WAYLAND_DISPLAY=gtk-test
+
+mutter --headless --wayland-display gtk-test >&mutter.log &
+
+export WAYLAND_DISPLAY=gtk-test
+export GDK_BACKEND=wayland
+# export WAYLAND_DEBUG=1
+# export LD_PRELOAD=`pwd`/build/gtk/libgtk-4.so
+
+python tests/headless-tests.py
+
+EOF


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]