[pygobject/benzea/gio-asyncio: 38/41] events: Implement asyncio event loop based on glib
- From: Benjamin Berg <bberg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [pygobject/benzea/gio-asyncio: 38/41] events: Implement asyncio event loop based on glib
- Date: Wed, 24 Aug 2022 14:29:55 +0000 (UTC)
commit 682885550313384951ca659dbd85496155125cf6
Author: Benjamin Berg <bberg redhat com>
Date: Wed Oct 27 15:01:21 2021 +0200
events: Implement asyncio event loop based on glib
gi/events.py | 532 +++++++++++++++++++++++++++++++++++++++++++++++++++
gi/meson.build | 1 +
tests/test_events.py | 151 +++++++++++++++
3 files changed, 684 insertions(+)
---
diff --git a/gi/events.py b/gi/events.py
new file mode 100644
index 00000000..96156331
--- /dev/null
+++ b/gi/events.py
@@ -0,0 +1,532 @@
+# -*- Mode: Python; py-indent-offset: 4 -*-
+# pygobject - Python bindings for the GObject library
+# Copyright (C) 2021 Benjamin Berg <bberg redhat com
+# Copyright (C) 2019 James Henstridge <james jamesh id au>
+#
+# gi/asyncio.py: GObject asyncio integration
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, see <http://www.gnu.org/licenses/>.
+
+__all__ = ['GLibEventLoopPolicy', 'GLibEventLoop']
+
+import sys
+import asyncio
+from asyncio import coroutines
+import signal
+import threading
+import selectors
+import weakref
+import warnings
+from contextlib import contextmanager
+from . import _ossighelper
+
+from gi.repository import GLib
+
+try:
+ g_main_loop_run = super(GLib.MainLoop, GLib.MainLoop).run
+except AttributeError:
+ g_main_loop_run = GLib.MainLoop.run
+
+_GLIB_SIGNALS = {signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGUSR1, signal.SIGUSR2,
signal.SIGWINCH}
+
+
+class GLibEventLoop(asyncio.SelectorEventLoop):
+ """An asyncio event loop that runs the python mainloop inside GLib.
+
+ Based on the asyncio.SelectorEventLoop"""
+
+ # This is based on the selector event loop, but never actually runs select()
+ # in the strict sense.
+ # We use the selector to register all FDs with the main context using our
+ # own GSource. For python timeouts/idle equivalent, we directly query them
+ # from the context by providing the _get_timeout_ms function that the
+ # GSource uses. This in turn accesses _ready and _scheduled to calculate
+ # the timeout and whether python can dispatch anything non-FD based yet.
+ #
+ # To simplify matters, we call the normal _run_once method of the base
+ # class which will call select(). As we know that we are ready at the time
+ # that select() will return immediately with the FD information we have
+ # gathered already.
+ #
+ # With that, we just need to override and slightly modify the run_forever
+ # method so that it calls g_main_loop_run instead of looping _run_once.
+
+ def __init__(self, main_context):
+ # A mainloop in case we want to run our context
+ assert main_context is not None
+ self._context = main_context
+ self._main_loop = GLib.MainLoop.new(self._context, False)
+ self._quit_funcs = []
+
+ # _UnixSelectorEventLoop uses _signal_handlers, we could do the same,
+ # with the difference that close() would clean up the handlers for us.
+ self.__signal_handlers = {}
+
+ selector = _Selector(self._context, self)
+ super().__init__(selector)
+
+ # Used by run_once to not busy loop if the timeout is floor'ed to zero
+ self._clock_resolution = 1e-3
+
+ @contextmanager
+ def paused(self):
+ """This context manager ensures the EventLoop is *not* being iterated.
+
+ It purely exist to handle the case where python code iterates the main
+ context more gracefully."""
+ # Nothing to do if we are not running or dispatched by ourselves
+ if not self.is_running() or self._selector._source._dispatching:
+ yield
+ return
+
+ try:
+ self._selector.detach()
+ yield
+ finally:
+ self._selector.attach()
+
+ @contextmanager
+ def running(self, quit_func):
+ """This context manager ensures the EventLoop is marked as running
+ while other API is iterating its main context.
+ The passed quit function is used to stop all recursion levels when
+ stop() is called.
+ """
+ assert self._context.acquire()
+
+ self._quit_funcs.append(quit_func)
+ # Nested main context iteration (by using glib API)
+ if self.is_running():
+ try:
+ yield
+ finally:
+ self._context.release()
+ self._quit_funcs.pop()
+ # Stop recursively
+ if self._stopping:
+ self._quit_funcs[-1]()
+ return
+
+ # Outermost nesting
+ self._check_closed()
+ try:
+ # New in 3.7
+ self._set_coroutine_origin_tracking(self._debug)
+ except AttributeError:
+ pass
+ self._thread_id = threading.get_ident()
+
+ old_agen_hooks = sys.get_asyncgen_hooks()
+ sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
+ finalizer=self._asyncgen_finalizer_hook)
+ try:
+ asyncio._set_running_loop(self)
+ assert not self._selector._source._dispatching
+ self._selector.attach()
+ yield
+ finally:
+ self._selector.detach()
+ self._context.release()
+ self._thread_id = None
+ asyncio._set_running_loop(None)
+ try:
+ self._set_coroutine_origin_tracking(False)
+ except AttributeError:
+ pass
+ sys.set_asyncgen_hooks(*old_agen_hooks)
+
+ self._quit_funcs.pop()
+ assert len(self._quit_funcs) == 0
+ self._stopping = False
+
+ def run_forever(self):
+ # NOTE: self._check_running was only added in 3.8 (with a typo in 3.7)
+ if self.is_running():
+ raise RuntimeError('This event loop is already running')
+
+ with _ossighelper.register_sigint_fallback(self._main_loop.quit):
+ with self.running(self._main_loop.quit):
+ g_main_loop_run(self._main_loop)
+
+ def time(self):
+ return GLib.get_monotonic_time() / 1000000
+
+ def _get_timeout_ms(self):
+ if not self.is_running():
+ warnings.warn('GLibEventLoop is iterated without being marked as running. Missing override or
invalid use of existing API!', RuntimeWarning)
+ if self._stopping is True:
+ warnings.warn('GLibEventLoop is not stopping properly. Missing override or invalid use of
existing API!', RuntimeWarning)
+ if self._ready:
+ return 0
+
+ if self._scheduled:
+ # The time is floor'ed here.
+ # Python dispatches everything ready within the next _clock_resolution.
+ timeout = int((self._scheduled[0]._when - self.time()) * 1000)
+ return timeout if timeout >= 0 else 0
+
+ return -1
+
+ def stop(self):
+ # Simply quit the mainloop
+ self._stopping = True
+ if self._quit_funcs:
+ self._quit_funcs[-1]()
+
+ def close(self):
+ super().close()
+ for s in list(self.__signal_handlers):
+ self.remove_signal_handler(s)
+
+ if sys.platform != "win32":
+ def add_signal_handler(self, sig, callback, *args):
+ """Add a handler for UNIX signal"""
+
+ if (coroutines.iscoroutine(callback) or
+ coroutines.iscoroutinefunction(callback)):
+ raise TypeError("coroutines cannot be used "
+ "with add_signal_handler()")
+ self._check_closed()
+
+ # Can be useful while testing failures
+ # assert sig != signal.SIGALRM
+
+ if sig not in _GLIB_SIGNALS:
+ return super().add_signal_handler(sig, callback, *args)
+
+ # Pure python demands that there is only one signal handler
+ source, _, _ = self.__signal_handlers.get(sig, (None, None, None))
+ if source:
+ source.destroy()
+
+ # Setup a new source with a higher priority than our main one
+ source = GLib.unix_signal_source_new(sig)
+ source.set_name(f"asyncio signal watch for {sig}")
+ source.set_priority(GLib.PRIORITY_HIGH)
+ source.attach(self._context)
+ source.set_callback(self._signal_cb, sig)
+
+ self.__signal_handlers[sig] = (source, callback, args)
+ del source
+
+ def remove_signal_handler(self, sig):
+ if sig not in _GLIB_SIGNALS:
+ return super().remove_signal_handler(sig)
+
+ try:
+ source, _, _ = self.__signal_handlers[sig]
+ del self.__signal_handlers[sig]
+ # Really unref the underlying GSource so that GLib resets the signal handler
+ source.destroy()
+ source._clear_boxed()
+
+ # GLib does not restore the original signal handler.
+ # Try to restore the python handler for SIGINT, this makes
+ # Ctrl+C work after the mainloop has quit.
+ if sig == signal.SIGINT and _ossighelper.PyOS_getsig(signal.SIGINT) == 0:
+ if _ossighelper.startup_sigint_ptr > 0:
+ _ossighelper.PyOS_setsig(signal.SIGINT, _ossighelper.startup_sigint_ptr)
+
+ return True
+ except KeyError:
+ return False
+
+ def _signal_cb(self, sig):
+ source, cb, args = self.__signal_handlers.get(sig)
+
+ # Pass over to python mainloop
+ self.call_soon(cb, *args)
+
+ def __repr__(self):
+ return (
+ f'<{self.__class__.__name__} running={self.is_running()} '
+ f'closed={self.is_closed()} debug={self.get_debug()} '
+ f'ctx=0x{hash(self._context):X} loop=0x{hash(self._main_loop):X}>'
+ )
+
+
+class GLibEventLoopPolicy(asyncio.AbstractEventLoopPolicy):
+ """An asyncio event loop policy that runs the GLib main loop.
+
+ The policy allows creating a new EventLoop for threads other than the main
+ thread. For the main thread, you can use get_event_loop() to retrieve the
+ correct mainloop and run it.
+
+ Note that, unlike GLib, python does not support running the EventLoop
+ recursively. You should never iterate the GLib.MainContext from within
+ the python EventLoop as doing so prevents asyncio events from being
+ dispatched.
+
+ As such, do not use API such as GLib.MainLoop.run or Gtk.Dialog.run.
+ Instead use the proper asynchronous patterns to prevent entirely blocking
+ asyncio.
+ """
+
+ def __init__(self):
+ self._loops = {}
+ self._child_watcher = None
+
+ def get_event_loop(self):
+ """Get the event loop for the current context.
+
+ Returns an event loop object for the thread default GLib.MainContext
+ or in case of the main thread for the default GLib.MainContext.
+
+ An exception will be thrown if there is no GLib.MainContext for the
+ current thread. In that case, using new_event_loop() will create a new
+ main context and main loop which can subsequently attached to the thread
+ by calling set_event_loop().
+
+ Returns a new GLibEventLoop or raises an exception."""
+
+ # Get the thread default main context
+ ctx = GLib.MainContext.get_thread_default()
+ # If there is none, and we are on the main thread, then use the default context
+ if ctx is None and threading.current_thread() is threading.main_thread():
+ ctx = GLib.MainContext.default()
+
+ # We do not create a main context implicitly;
+ # we create a mainloop for an existing context though
+ if ctx is None:
+ raise RuntimeError('There is no main context set for thread %r.'
+ % threading.current_thread().name)
+
+ # Note: We cannot attach it to ctx, as getting the default will always
+ # return a new python wrapper. But, we can use hash() as that returns
+ # the pointer to the C structure.
+ try:
+ loop = self._loops[hash(ctx)]
+ if not loop.is_closed():
+ return loop
+ except KeyError:
+ pass
+
+ self._loops[hash(ctx)] = GLibEventLoop(ctx)
+ if self._child_watcher and ctx == GLib.MainContext.default():
+ self._child_watcher.attach_loop(self.get_event_loop())
+ return self._loops[hash(ctx)]
+
+ def set_event_loop(self, loop):
+ """Set the event loop for the current context (python thread) to loop.
+
+ This is only permitted if the thread has no thread default main context
+ with the main thread using the default main context.
+ """
+
+ # Only accept glib event loops, otherwise things will just mess up
+ assert loop is None or isinstance(loop, GLibEventLoop)
+
+ ctx = ctx_td = GLib.MainContext.get_thread_default()
+ if ctx is None and threading.current_thread() is threading.main_thread():
+ ctx = GLib.MainContext.default()
+
+ if loop is None:
+ # We do permit unsetting the current loop/context
+ old = self._loops.pop(hash(ctx), None)
+ if old:
+ if hash(old._context) != hash(ctx):
+ warnings.warn('GMainContext was changed unknowingly by asyncio integration!',
RuntimeWarning)
+ if ctx_td:
+ GLib.MainContext.pop_thread_default(ctx_td)
+ else:
+ # Only allow attaching if the thread has no main context yet
+ if ctx:
+ raise RuntimeError('Thread %r already has a main context, get_event_loop() will create a new
loop if needed'
+ % threading.current_thread().name)
+
+ GLib.MainContext.push_thread_default(loop._context)
+ self._loops[hash(loop._context)] = loop
+
+ def new_event_loop(self):
+ """Create and return a new event loop that iterates a new
+ GLib.MainContext."""
+
+ return GLibEventLoop(GLib.MainContext())
+
+ # NOTE: We do *not* provide a GLib based ChildWatcher implementation!
+ # This is *intentional* and *required*. The issue is that python provides
+ # API which uses wait4() internally. GLib at the same time uses a thread to
+ # handle SIGCHLD signals, which causes a race condition resulting in a
+ # critical warning.
+ # We just provide a reasonable sane child watcher and disallow the user
+ # from choosing one as e.g. MultiLoopChildWatcher is problematic.
+ #
+ # TODO: Use PidfdChildWatcher when available
+ if sys.platform != 'win32':
+ def get_child_watcher(self):
+ if self._child_watcher is None:
+ try:
+ self._child_watcher = asyncio.ThreadedChildWatcher()
+ except AttributeError:
+ # ThreadedChildWatcher is new in 3.7
+ self._child_watcher = asyncio.SafeChildWatcher()
+ if threading.current_thread() is threading.main_thread():
+ self._child_watcher.attach_loop(self.get_event_loop())
+
+ return self._child_watcher
+
+
+def _fileobj_to_fd(fileobj):
+ # Note: SelectorEventloop should only be passing FDs
+ if isinstance(fileobj, int):
+ return fileobj
+ else:
+ return fileobj.fileno()
+
+
+class _Source(GLib.Source):
+ def __init__(self, selector):
+ super().__init__()
+
+ self._dispatching = False
+
+ # It is *not* safe to run the *python* part of the mainloop recursively.
+ # This error must be caught further up in the chain, otherwise the
+ # mainloop will be blocking without an obvious reason.
+ self.set_can_recurse(False)
+ self.set_name('python asyncio integration')
+
+ self._selector = selector
+ # NOTE: Avoid loop -> selector -> source -> loop reference cycle,
+ # we need the source to be destroyed *after* the selector. Otherwise
+ # we need a flag to deal with FDs being unregistered after __del__ has
+ # been called on the source.
+ self._loop = weakref.ref(selector._loop)
+ self._ready = []
+
+ def prepare(self):
+ timeout = self._loop()._get_timeout_ms()
+
+ # NOTE: Always return False, FDs are queried in check and the timeout
+ # needs to be rechecked anyway.
+ return False, timeout
+
+ def check(self):
+ ready = []
+
+ for key in self._selector._fd_to_key.values():
+ condition = self.query_unix_fd(key._tag)
+ events = 0
+ if condition & (GLib.IOCondition.IN | GLib.IOCondition.HUP):
+ events |= selectors.EVENT_READ
+ if condition & GLib.IOCondition.OUT:
+ events |= selectors.EVENT_WRITE
+ if events:
+ ready.append((key, events))
+ self._ready = ready
+
+ timeout = self._loop()._get_timeout_ms()
+ if timeout == 0:
+ return True
+
+ return bool(ready)
+
+ def dispatch(self, callback, args):
+ # Now, wag the dog by its tail
+ self._dispatching = True
+ try:
+ self._loop()._run_once()
+ finally:
+ self._dispatching = False
+
+ return GLib.SOURCE_CONTINUE
+
+ def _get_ready(self):
+ if not self._dispatching:
+ raise RuntimeError("gi.asyncio.Selector.select only works while it is dispatching!")
+
+ ready = self._ready
+ self._ready = []
+ return ready
+
+
+class _SelectorKey(selectors.SelectorKey):
+ # Subclass to attach _tag
+ pass
+
+
+class _Selector(selectors.BaseSelector):
+ """A Selector for gi.events.GLibEventLoop registering python IO with GLib."""
+
+ def __init__(self, context, loop):
+ super().__init__()
+
+ self._context = context
+ self._loop = loop
+ self._fd_to_key = {}
+
+ self._source = _Source(self)
+
+ def close(self):
+ self._source.destroy()
+ super().close()
+
+ def attach(self):
+ self._source.attach(self._loop._context)
+
+ def detach(self):
+ self._source.destroy()
+ self._source = _Source(self)
+ # re-register the keys with the new source
+ for key in self._fd_to_key.values():
+ self._register_key(key)
+
+ def _register_key(self, key):
+ condition = GLib.IOCondition(0)
+ if key.events & selectors.EVENT_READ:
+ condition |= GLib.IOCondition.IN | GLib.IOCondition.HUP
+ if key.events & selectors.EVENT_WRITE:
+ condition |= GLib.IOCondition.OUT
+ key._tag = self._source.add_unix_fd(key.fd, condition)
+
+ def register(self, fileobj, events, data=None):
+ if (not events) or (events & ~(selectors.EVENT_READ | selectors.EVENT_WRITE)):
+ raise ValueError("Invalid events: {!r}".format(events))
+
+ fd = _fileobj_to_fd(fileobj)
+ assert fd not in self._fd_to_key
+
+ key = _SelectorKey(fileobj, fd, events, data)
+
+ self._register_key(key)
+
+ self._fd_to_key[fd] = key
+ return key
+
+ def unregister(self, fileobj):
+ # NOTE: may be called after __del__ has been called.
+ fd = _fileobj_to_fd(fileobj)
+ key = self._fd_to_key[fd]
+
+ if self._source:
+ self._source.remove_unix_fd(key._tag)
+ del self._fd_to_key[fd]
+
+ return key
+
+ # We could override modify, but it is only slightly when the "events" change.
+
+ def get_key(self, fileobj):
+ fd = _fileobj_to_fd(fileobj)
+ return self._fd_to_key[fd]
+
+ def get_map(self):
+ """Return a mapping of file objects to selector keys."""
+ # Horribly inefficient
+ # It should never be called and exists just to prevent issues if e.g.
+ # python decides to use it for debug purposes.
+ return {k.fileobj: k for k in self._fd_to_key.values()}
+
+ def select(self, timeout=None):
+ return self._source._get_ready()
diff --git a/gi/meson.build b/gi/meson.build
index 7eb3311e..8fb90f67 100644
--- a/gi/meson.build
+++ b/gi/meson.build
@@ -45,6 +45,7 @@ headers = [
install_headers(headers, subdir : 'pygobject-@0@'.format(platform_version))
python_sources = [
+ 'events.py',
'_constants.py',
'docstring.py',
'_error.py',
diff --git a/tests/test_events.py b/tests/test_events.py
new file mode 100644
index 00000000..fd067875
--- /dev/null
+++ b/tests/test_events.py
@@ -0,0 +1,151 @@
+import unittest
+
+try:
+ from test.test_asyncio.test_events import UnixEventLoopTestsMixin
+ from test.test_asyncio.test_subprocess import SubprocessMixin
+ from test.test_asyncio.utils import TestCase
+except:
+ class UnixEventLoopTestsMixin():
+ def test_unix_event_loop_tests_missing(self):
+ import warnings
+ warnings.warn('UnixEventLoopTestsMixin is unavailable, not running tests!')
+ self.skipTest('UnixEventLoopTestsMixin is unavailable, not running tests!')
+
+ class SubprocessMixin():
+ def test_subprocess_mixin_tests_missing(self):
+ import warnings
+ warnings.warn('SubprocessMixin is unavailable, not running tests!')
+ self.skipTest('SubprocessMixin is unavailable, not running tests!')
+
+ from unittest import TestCase
+
+import sys
+import gi
+import gi.events
+import asyncio
+import threading
+from gi.repository import GLib
+
+
+# None of this currently works on Windows
+if sys.platform != 'win32':
+
+ class GLibEventLoopTests(UnixEventLoopTestsMixin, TestCase):
+
+ def __init__(self, *args):
+ super().__init__(*args)
+ self.loop = None
+
+ def create_event_loop(self):
+ return gi.events.GLibEventLoop(GLib.MainContext())
+
+ class SubprocessWatcherTests(SubprocessMixin, TestCase):
+
+ def setUp(self):
+ super().setUp()
+ policy = gi.events.GLibEventLoopPolicy()
+ asyncio.set_event_loop_policy(policy)
+ self.loop = policy.get_event_loop()
+
+ def tearDown(self):
+ asyncio.set_event_loop_policy(None)
+ self.loop.close()
+ super().tearDown()
+
+ class GLibEventLoopPolicyTests(unittest.TestCase):
+
+ def create_policy(self):
+ return gi.events.GLibEventLoopPolicy()
+
+ def test_get_event_loop(self):
+ policy = self.create_policy()
+ loop = policy.get_event_loop()
+ self.assertIsInstance(loop, gi.events.GLibEventLoop)
+ self.assertIs(loop, policy.get_event_loop())
+ loop.close()
+
+ def test_new_event_loop(self):
+ policy = self.create_policy()
+ loop = policy.new_event_loop()
+ self.assertIsInstance(loop, gi.events.GLibEventLoop)
+ loop.close()
+
+ # Attaching a loop to the main thread fails
+ with self.assertRaises(RuntimeError):
+ policy.set_event_loop(loop)
+
+ def test_nested_context_iteration(self):
+ policy = self.create_policy()
+ loop = policy.new_event_loop()
+
+ called = False
+
+ def cb():
+ nonlocal called
+ called = True
+
+ async def run():
+ nonlocal loop, called
+
+ loop.call_soon(cb)
+ self.assertEqual(called, False)
+
+ # Iterating the main context does not cause cb to be called
+ while loop._context.iteration(False):
+ pass
+ self.assertEqual(called, False)
+
+ # Awaiting on anything *does* cause the cb to fire
+ await asyncio.sleep(0)
+ self.assertEqual(called, True)
+
+ loop.run_until_complete(run())
+ loop.close()
+
+ def test_thread_event_loop(self):
+ policy = self.create_policy()
+ loop = policy.new_event_loop()
+
+ res = []
+
+ def thread_func(res):
+ try:
+ # We cannot get an event loop for the current thread
+ with self.assertRaises(RuntimeError):
+ policy.get_event_loop()
+
+ # We can attach our loop
+ policy.set_event_loop(loop)
+ # Now we can get it, and it is the same
+ self.assertIs(policy.get_event_loop(), loop)
+
+ # Simple call_soon test
+ results = []
+
+ def callback(arg1, arg2):
+ results.append((arg1, arg2))
+ loop.stop()
+
+ loop.call_soon(callback, 'hello', 'world')
+ loop.run_forever()
+ self.assertEqual(results, [('hello', 'world')])
+
+ # We can detach it again
+ policy.set_event_loop(None)
+
+ # Which means we have none and get a runtime error
+ with self.assertRaises(RuntimeError):
+ policy.get_event_loop()
+ except:
+ res += sys.exc_info()
+
+ # Initially, the thread has no event loop
+ thread = threading.Thread(target=lambda: thread_func(res))
+ thread.start()
+ thread.join()
+
+ if res:
+ t, v, tb = res
+ raise t(v).with_traceback(tb)
+
+ loop.close()
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]