[pygobject/benzea/gio-asyncio] events: Implement asyncio event loop based on glib




commit 7acd0b0c1b7fbf9a1a02880a052f2f87e4c820b9
Author: Benjamin Berg <bberg redhat com>
Date:   Wed Oct 27 15:01:21 2021 +0200

    events: Implement asyncio event loop based on glib

 gi/events.py         | 499 +++++++++++++++++++++++++++++++++++++++++++++++++++
 gi/meson.build       |   1 +
 tests/test_events.py | 114 ++++++++++++
 3 files changed, 614 insertions(+)
---
diff --git a/gi/events.py b/gi/events.py
new file mode 100644
index 00000000..82cf5e10
--- /dev/null
+++ b/gi/events.py
@@ -0,0 +1,499 @@
+# -*- Mode: Python; py-indent-offset: 4 -*-
+# pygobject - Python bindings for the GObject library
+# Copyright (C) 2021 Benjamin Berg <bberg redhat com
+# Copyright (C) 2019 James Henstridge <james jamesh id au>
+#
+#   gi/asyncio.py: GObject asyncio integration
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, see <http://www.gnu.org/licenses/>.
+
+__all__ = ['EventLoopPolicy', 'EventLoop', 'ChildWatcher']
+
+import os
+import sys
+import asyncio
+from asyncio import events
+from asyncio import coroutines
+import signal
+import threading
+import selectors
+from . import _ossighelper
+
+from gi.repository import GLib
+
+try:
+    g_main_loop_run = super(GLib.MainLoop, GLib.MainLoop).run
+except AttributeError:
+    g_main_loop_run = GLib.MainLoop.run
+
+
+class EventLoop(asyncio.SelectorEventLoop):
+    """An asyncio event loop that runs the python mainloop inside GLib.
+
+    Based on the asyncio.SelectorEventLoop"""
+
+    # This is based on the selector event loop, but never actually runs select()
+    # in the strict sense.
+    # We use the selector to register all FDs with the main context using our
+    # own GSource. For python timeouts/idle equivalent, we directly query them
+    # from the context by providing the _get_timeout_ms function that the
+    # GSource uses. This in turn accesses _ready and _scheduled to calculate
+    # the timeout and whether python can dispatch anything non-FD based yet.
+    #
+    # To simplify matters, we call the normal _run_once method of the base
+    # class which will call select(). As we know that we are ready at the time
+    # that select() will return immediately with the FD information we have
+    # gathered already.
+    #
+    # With that, we just need to override and slightly modify the run_forever
+    # method so that it calls g_main_loop_run instead of looping _run_once.
+
+    def __init__(self, main_context):
+        # A mainloop in case we want to run our context
+        assert main_context is not None
+        self._context = main_context
+        self._main_loop = GLib.MainLoop.new(self._context, False)
+
+        # _UnixSelectorEventLoop uses _signal_handlers, we could do the same,
+        # with the difference that close() would clean up the handlers for us.
+        self.__signal_handlers = {}
+
+        selector = _Selector(self._context, self)
+        super().__init__(selector)
+
+        # Used by run_once to not busy loop if the timeout is floor'ed to zero
+        self._clock_resolution = 1e-3
+
+    def run_forever(self):
+        assert self._context.acquire()
+
+        self._check_closed()
+        try:
+            # New in 3.7
+            self._set_coroutine_origin_tracking(self._debug)
+            # New in 3.8
+            self. _check_running()
+        except AttributeError:
+            pass
+        self._thread_id = threading.get_ident()
+
+        old_agen_hooks = sys.get_asyncgen_hooks()
+        sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
+                               finalizer=self._asyncgen_finalizer_hook)
+        try:
+            events._set_running_loop(self)
+            g_main_loop_run(self._main_loop)
+        finally:
+            self._context.release()
+            self._thread_id = None
+            events._set_running_loop(None)
+            try:
+                self._set_coroutine_origin_tracking(False)
+            except AttributeError:
+                pass
+            sys.set_asyncgen_hooks(*old_agen_hooks)
+
+    def time(self):
+        return GLib.get_monotonic_time() / 1000000
+
+    def _get_timeout_ms(self):
+        if self._ready:
+            return 0
+
+        if self._scheduled:
+            timeout = (self._scheduled[0]._when - self.time()) * 1000
+            return timeout if timeout >= 0 else 0
+
+        return -1
+
+    def stop(self):
+        # Simply quit the mainloop
+        self._main_loop.quit()
+
+    def close(self):
+        super().close()
+        for s in list(self.__signal_handlers):
+            self.remove_signal_handler(s)
+
+    if sys.platform != "win32":
+        def add_signal_handler(self, sig, callback, *args):
+            """Add a handler for UNIX signal"""
+
+            if (coroutines.iscoroutine(callback) or
+                    coroutines.iscoroutinefunction(callback)):
+                raise TypeError("coroutines cannot be used "
+                                "with add_signal_handler()")
+            self._check_closed()
+
+            # Can be useful while testing failures
+            # assert sig != signal.SIGALRM
+
+            if sig not in {signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGUSR1, signal.SIGUSR2, 
signal.SIGWINCH}:
+                return super().add_signal_handler(sig, callback, *args)
+
+            # Pure python demands that there is only one signal handler
+            source, _, _ = self.__signal_handlers.get(sig, (None, None, None))
+            if source:
+                source.destroy()
+
+            source = GLib.unix_signal_source_new(sig)
+
+            source.set_name(f"asyncio signal watch for {sig}")
+            source.attach(GLib.MainContext.get_thread_default())
+            source.set_callback(self._signal_cb, sig)
+
+            self.__signal_handlers[sig] = (source, callback, args)
+            del source
+
+        def remove_signal_handler(self, sig):
+            if sig not in {signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGUSR1, signal.SIGUSR2, 
signal.SIGWINCH}:
+                return super().remove_signal_handler(sig)
+
+            try:
+                source, _, _ = self.__signal_handlers[sig]
+                del self.__signal_handlers[sig]
+                # Really unref the underlying GSource so that GLib resets the signal handler
+                source.destroy()
+                source._clear_boxed()
+
+                # GLib does not restore the original signal handler.
+                # Try to restore the python handler for SIGINT, this makes
+                # Ctrl+C work after the mainloop has quit.
+                if sig == signal.SIGINT and _ossighelper.PyOS_getsig(signal.SIGINT) == 0:
+                    if _ossighelper.startup_sigint_ptr > 0:
+                        _ossighelper.PyOS_setsig(signal.SIGINT, _ossighelper.startup_sigint_ptr)
+
+                return True
+            except KeyError:
+                return False
+
+        def _signal_cb(self, sig):
+            source, cb, args = self.__signal_handlers.get(sig)
+
+            # Pass over to python mainloop
+            self.call_soon(cb, *args)
+
+
+if sys.platform != "win32":
+    # Note: We could support windows
+    class ChildWatcher(asyncio.AbstractChildWatcher):
+        """An asyncio child watcher based on GLib.
+
+        This is similar to asyncio.MultiLoopChildWatcher using the GLib child
+        watching functionality.
+        """
+        def __init__(self):
+            super().__init__()
+            self._lock = threading.Lock()
+            self._sources = {}
+
+        def add_child_handler(self, pid, callback, *args):
+            # Python requires that calling this multiple times replaces any
+            # previous handler. As such, we need to keep track of what we are
+            # watching.
+            with self._lock:
+                source, _, _ = self._sources.get(pid, (None, None, None))
+                if source:
+                    source.destroy()
+                source = GLib.child_watch_source_new(pid)
+                source.set_name(f"asyncio child watch for pid {pid}")
+                source.attach(GLib.MainContext.get_thread_default())
+
+                source.set_callback(self._child_watch_func)
+
+                self._sources[pid] = (source, callback, args)
+
+        def remove_child_handler(self, pid):
+            with self.lock:
+                try:
+                    source = self._sources.pop(pid)
+                    source.destroy()
+                    return True
+                except KeyError:
+                    return False
+
+        def attach_loop(self, loop):
+            # Nothing needs to be done to attach a loop
+            pass
+
+        def close(self):
+            for s, _, _ in self._sources.values():
+                s.destroy()
+            self._sources.clear()
+
+        def is_active(self):
+            # Assume that GLib owns SIGCHLD.
+            return True
+
+        def __enter__(self):
+            return self
+
+        def __exit__(self, a, b, c):
+            return
+
+        def _child_watch_func(self, pid, status):
+            with self._lock:
+                source, cb, args = self._sources[pid]
+
+                returncode = os.waitstatus_to_exitcode(status)
+
+                # Pass over to python mainloop
+                loop = asyncio.get_event_loop()
+                loop.call_soon(cb, pid, returncode, *args)
+
+                source.destroy()
+                del self._sources[pid]
+
+
+class EventLoopPolicy(events.AbstractEventLoopPolicy):
+    """An asyncio event loop policy that runs the GLib main loop.
+
+    The policy allows creating a new EventLoop for threads other than the main
+    thread. For the main thread, you can use get_event_loop() to retrieve the
+    correct mainloop and run it.
+
+    Note that, unlike GLib, python does not support running the EventLoop
+    recursively. You should never iterate the GLib.MainContext from within
+    the python EventLoop as doing so prevents asyncio events from being
+    dispatched.
+
+    As such, do not use API such as GLib.MainLoop.run or Gtk.Dialog.run.
+    Instead use the proper asynchronous patterns to prevent entirely blocking
+    asyncio.
+    """
+
+    _loops = {}
+    _child_watcher = None
+
+    def get_event_loop(self):
+        """Get the event loop for the current context.
+
+        Returns an event loop object for the thread default GLib.MainContext
+        or in case of the main thread for the default GLib.MainContext.
+
+        An exception will be thrown if there is no GLib.MainContext for the
+        current thread. In that case, using new_event_loop() will create a new
+        main context and main loop which can subsequently attached to the thread
+        by calling set_event_loop().
+
+        Returns a new EventLoop or raises an exception."""
+        # Get the thread default main context
+        ctx = GLib.MainContext.get_thread_default()
+        # If there is none, and we are on the main thread, then use the default context
+        if ctx is None and threading.current_thread() is threading.main_thread():
+            ctx = GLib.MainContext.default()
+
+        # We do not create a main context implicitly;
+        # we create a mainloop for an existing context though
+        if ctx is None:
+            raise RuntimeError('There is no main context set for thread %r.'
+                               % threading.current_thread().name)
+
+        # Note: We cannot attach it to ctx, as getting the default will always
+        #       return a new python wrapper. But, we can use hash() as that returns
+        #       the pointer to the C structure.
+        try:
+            return self._loops[ctx]
+        except KeyError:
+            pass
+
+        self._loops[ctx] = EventLoop(ctx)
+        return self._loops[ctx]
+
+    def set_event_loop(self, loop):
+        """Set the event loop for the current context (python thread) to loop.
+
+        This is not permitted for the main thread or if the thread already
+        has a main context associated with it.
+        """
+
+        # Only accept glib event loops, otherwise things will just mess up
+        assert loop is None or isinstance(loop, EventLoop)
+
+        # Refuse operating on the main thread. Technically we could allow it,
+        # but I can't think of a use-case and doing it is dangerous.
+        # Also, if someone *really* wants they can still push a new the thread
+        # default main context and call get_event_loop().
+        if threading.current_thread() is threading.main_thread():
+            raise RuntimeError('Changing the main loop/context of the main thread is not supported')
+
+        ctx = GLib.MainContext.get_thread_default()
+        if loop is None:
+            # We do permit unsetting the current loop/context
+            old = self._loops.pop(ctx, None)
+            if old:
+                GLib.MainContext.pop_thread_default(ctx)
+        else:
+            # Only allow attaching if the thread has no main context yet
+            if ctx:
+                raise RuntimeError('Thread %r already has a main context'
+                                   % threading.current_thread().name)
+
+            GLib.MainContext.push_thread_default(loop._context)
+            self._loops[loop._context] = loop
+
+    def new_event_loop(self):
+        """Create and return a new event loop that iterates a new
+        GLib.MainContext."""
+
+        return EventLoop(GLib.MainContext())
+
+    # Child processes handling (Unix only).
+    if sys.platform != "win32":
+        def get_child_watcher(self):
+            "Get the watcher for child processes."
+
+            if self._child_watcher is None:
+                self._child_watcher = ChildWatcher()
+
+            return self._child_watcher
+
+        def set_child_watcher(self, watcher):
+            """Set the watcher for child processes."""
+            raise NotImplementedError
+
+
+def _fileobj_to_fd(fileobj):
+    # Note: SelectorEventloop should only be passing FDs
+    if isinstance(fileobj, int):
+        return fileobj
+    else:
+        return fileobj.fileno()
+
+
+class _Source(GLib.Source):
+    def __init__(self, selector):
+        super().__init__()
+
+        # It is *not* safe to run the *python* part of the mainloop recursively.
+        # (This is the default, but make it explicit.)
+        self.set_can_recurse(False)
+        self.set_name('python asyncio integration')
+
+        self._selector = selector
+        self._loop = selector._loop
+        self._ready = []
+
+    def prepare(self):
+        timeout = self._loop._get_timeout_ms()
+
+        # NOTE: Always return False, FDs are queried in check and the timeout
+        #       needs to be rechecked anyway.
+        return False, timeout
+
+    def check(self):
+        ready = []
+
+        for key in self._selector._fd_to_key.values():
+            condition = self.query_unix_fd(key._tag)
+            events = 0
+            if condition & (GLib.IOCondition.IN | GLib.IOCondition.HUP):
+                events |= selectors.EVENT_READ
+            if condition & GLib.IOCondition.OUT:
+                events |= selectors.EVENT_WRITE
+            if events:
+                ready.append((key, events))
+        self._ready = ready
+
+        timeout = self._loop._get_timeout_ms()
+        if timeout == 0:
+            return True
+
+        return bool(ready)
+
+    def dispatch(self, callback, args):
+        # Now, wag the dog by its tail
+        self._dispatching = True
+        try:
+            self._loop._run_once()
+        finally:
+            self._dispatching = False
+
+        return GLib.SOURCE_CONTINUE
+
+    def _get_ready(self):
+        if not self._dispatching:
+            raise RuntimeError("gi.asyncio.Selector.select only works while it is dispatching!")
+
+        ready = self._ready
+        self._ready = []
+        return ready
+
+
+class _SelectorKey(selectors.SelectorKey):
+    # Subclass to attach _tag
+    pass
+
+
+class _Selector(selectors.BaseSelector):
+    """A Selector for gi.asyncio.EventLoop registering python IO with GLib."""
+
+    def __init__(self, context, loop):
+        super().__init__()
+
+        self._context = context
+        self._loop = loop
+        self._dispatching = False
+        self._fd_to_key = {}
+
+        self._source = _Source(self)
+        self._source.attach(loop._context)
+
+    def close(self):
+        self._source.destroy()
+        super().close()
+
+    def register(self, fileobj, events, data=None):
+        if (not events) or (events & ~(selectors.EVENT_READ | selectors.EVENT_WRITE)):
+            raise ValueError("Invalid events: {!r}".format(events))
+
+        fd = _fileobj_to_fd(fileobj)
+        assert fd not in self._fd_to_key
+
+        key = _SelectorKey(fileobj, fd, events, data)
+
+        condition = GLib.IOCondition(0)
+        if events & selectors.EVENT_READ:
+            condition |= GLib.IOCondition.IN | GLib.IOCondition.HUP
+        if events & selectors.EVENT_WRITE:
+            condition |= GLib.IOCondition.OUT
+        key._tag = self._source.add_unix_fd(fd, condition)
+
+        self._fd_to_key[fd] = key
+        return key
+
+    def unregister(self, fileobj):
+        fd = _fileobj_to_fd(fileobj)
+        key = self._fd_to_key[fd]
+
+        self._source.remove_unix_fd(key._tag)
+        del self._fd_to_key[fd]
+
+        return key
+
+    # We could override modify, but it is only slightly when the "events" change.
+
+    def get_key(self, fileobj):
+        fd = _fileobj_to_fd(fileobj)
+        return self._fd_to_key[fd]
+
+    def get_map(self):
+        """Return a mapping of file objects to selector keys."""
+        # Horribly inefficient
+        # It should never be called and exists just to prevent issues if e.g.
+        # python decides to use it for debug purposes.
+        return {k.fileobj: k for k in self._fd_to_key.values()}
+
+    def select(self, timeout=None):
+        return self._source._get_ready()
diff --git a/gi/meson.build b/gi/meson.build
index 7eb3311e..8fb90f67 100644
--- a/gi/meson.build
+++ b/gi/meson.build
@@ -45,6 +45,7 @@ headers = [
 install_headers(headers, subdir : 'pygobject-@0@'.format(platform_version))
 
 python_sources = [
+  'events.py',
   '_constants.py',
   'docstring.py',
   '_error.py',
diff --git a/tests/test_events.py b/tests/test_events.py
new file mode 100644
index 00000000..600128bc
--- /dev/null
+++ b/tests/test_events.py
@@ -0,0 +1,114 @@
+import unittest
+
+try:
+    from test.test_asyncio.test_events import UnixEventLoopTestsMixin
+    from test.test_asyncio.test_subprocess import SubprocessMixin
+except:
+    class UnixEventLoopTestsMixin():
+        def test_unix_event_loop_tests_missing(self):
+            import warnings
+            warnings.warn('UnixEventLoopTestsMixin is unavailable, not running tests!')
+            self.skipTest('UnixEventLoopTestsMixin is unavailable, not running tests!')
+
+    class SubprocessMixin():
+        def test_subprocess_mixin_tests_missing(self):
+            import warnings
+            warnings.warn('SubprocessMixin is unavailable, not running tests!')
+            self.skipTest('SubprocessMixin is unavailable, not running tests!')
+try:
+    from test.test_asyncio import utils as test_utils
+except ImportError:
+    from asyncio import test_utils
+
+import sys
+import gi
+import gi.events
+from gi.repository import GLib
+import threading
+
+
+if sys.platform != 'win32':
+
+    class GLibEventLoopTests(UnixEventLoopTestsMixin, test_utils.TestCase):
+
+        def create_event_loop(self):
+            return gi.events.EventLoop(main_context=GLib.MainContext.default())
+
+    class SubprocessWatcherTests(SubprocessMixin, test_utils.TestCase):
+
+        def setUp(self):
+            super().setUp()
+            policy = gi.events.EventLoopPolicy()
+            self.loop = policy.get_event_loop()
+            watcher = policy.get_child_watcher()
+
+            assert isinstance(watcher, gi.events.ChildWatcher)
+
+
+class GLibEventLoopPolicyTests(unittest.TestCase):
+
+    def create_policy(self):
+        return gi.events.EventLoopPolicy()
+
+    def test_get_event_loop(self):
+        policy = self.create_policy()
+        loop = policy.get_event_loop()
+        self.assertIsInstance(loop, gi.events.EventLoop)
+        self.assertIs(loop, policy.get_event_loop())
+        loop.close()
+
+    def test_new_event_loop(self):
+        policy = self.create_policy()
+        loop = policy.new_event_loop()
+        self.assertIsInstance(loop, gi.events.EventLoop)
+        loop.close()
+
+        # Attaching a loop to the main thread fails
+        with self.assertRaises(RuntimeError):
+            policy.set_event_loop(loop)
+
+    def test_thread_event_loop(self):
+        policy = self.create_policy()
+        loop = policy.new_event_loop()
+
+        res = []
+
+        def thread_func(res):
+            try:
+                # We cannot get an event loop for the current thread
+                with self.assertRaises(RuntimeError):
+                    policy.get_event_loop()
+
+                # We can attach our loop
+                policy.set_event_loop(loop)
+                # Now we can get it, and it is the same
+                self.assertIs(policy.get_event_loop(), loop)
+
+                # Simple call_soon test
+                results = []
+
+                def callback(arg1, arg2):
+                    results.append((arg1, arg2))
+                    loop.stop()
+
+                loop.call_soon(callback, 'hello', 'world')
+                loop.run_forever()
+                self.assertEqual(results, [('hello', 'world')])
+
+                # We can detach it again
+                policy.set_event_loop(None)
+
+                # Which means we have none and get a runtime error
+                with self.assertRaises(RuntimeError):
+                    policy.get_event_loop()
+            except:
+                res += sys.exc_info()
+
+        # Initially, the thread has no event loop
+        thread = threading.Thread(target=lambda: thread_func(res))
+        thread.start()
+        thread.join()
+
+        if res:
+            t, v, tb = res
+            raise t(v).with_traceback(tb)


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]