[pygobject/benzea/gio-asyncio] events: Implement asyncio event loop based on glib
- From: Benjamin Berg <bberg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [pygobject/benzea/gio-asyncio] events: Implement asyncio event loop based on glib
- Date: Thu, 28 Oct 2021 10:43:28 +0000 (UTC)
commit 7acd0b0c1b7fbf9a1a02880a052f2f87e4c820b9
Author: Benjamin Berg <bberg redhat com>
Date: Wed Oct 27 15:01:21 2021 +0200
events: Implement asyncio event loop based on glib
gi/events.py | 499 +++++++++++++++++++++++++++++++++++++++++++++++++++
gi/meson.build | 1 +
tests/test_events.py | 114 ++++++++++++
3 files changed, 614 insertions(+)
---
diff --git a/gi/events.py b/gi/events.py
new file mode 100644
index 00000000..82cf5e10
--- /dev/null
+++ b/gi/events.py
@@ -0,0 +1,499 @@
+# -*- Mode: Python; py-indent-offset: 4 -*-
+# pygobject - Python bindings for the GObject library
+# Copyright (C) 2021 Benjamin Berg <bberg redhat com>
+# Copyright (C) 2019 James Henstridge <james jamesh id au>
+#
+# gi/events.py: GObject asyncio integration
+#
+# This library is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This library is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this library; if not, see <http://www.gnu.org/licenses/>.
+
+__all__ = ['EventLoopPolicy', 'EventLoop', 'ChildWatcher']
+
+import os
+import sys
+import asyncio
+from asyncio import events
+from asyncio import coroutines
+import signal
+import threading
+import selectors
+from . import _ossighelper
+
+from gi.repository import GLib
+
+try:
+ g_main_loop_run = super(GLib.MainLoop, GLib.MainLoop).run
+except AttributeError:
+ g_main_loop_run = GLib.MainLoop.run
+
+
+class EventLoop(asyncio.SelectorEventLoop):
+ """An asyncio event loop that runs the python mainloop inside GLib.
+
+ Based on the asyncio.SelectorEventLoop"""
+
+ # This is based on the selector event loop, but never actually runs select()
+ # in the strict sense.
+ # We use the selector to register all FDs with the main context using our
+ # own GSource. For python timeouts/idle equivalent, we directly query them
+ # from the context by providing the _get_timeout_ms function that the
+ # GSource uses. This in turn accesses _ready and _scheduled to calculate
+ # the timeout and whether python can dispatch anything non-FD based yet.
+ #
+ # To simplify matters, we call the normal _run_once method of the base
+ # class which will call select(). As we know that we are ready at the time
+ # that select() will return immediately with the FD information we have
+ # gathered already.
+ #
+ # With that, we just need to override and slightly modify the run_forever
+ # method so that it calls g_main_loop_run instead of looping _run_once.
+
+ def __init__(self, main_context):
+ # A mainloop in case we want to run our context
+ assert main_context is not None
+ self._context = main_context
+ self._main_loop = GLib.MainLoop.new(self._context, False)
+
+ # _UnixSelectorEventLoop uses _signal_handlers, we could do the same,
+ # with the difference that close() would clean up the handlers for us.
+ self.__signal_handlers = {}
+
+ selector = _Selector(self._context, self)
+ super().__init__(selector)
+
+ # Used by run_once to not busy loop if the timeout is floor'ed to zero
+ self._clock_resolution = 1e-3
+
+ def run_forever(self):
+ assert self._context.acquire()
+
+ self._check_closed()
+ try:
+ # New in 3.7
+ self._set_coroutine_origin_tracking(self._debug)
+ # New in 3.8
+ self. _check_running()
+ except AttributeError:
+ pass
+ self._thread_id = threading.get_ident()
+
+ old_agen_hooks = sys.get_asyncgen_hooks()
+ sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook,
+ finalizer=self._asyncgen_finalizer_hook)
+ try:
+ events._set_running_loop(self)
+ g_main_loop_run(self._main_loop)
+ finally:
+ self._context.release()
+ self._thread_id = None
+ events._set_running_loop(None)
+ try:
+ self._set_coroutine_origin_tracking(False)
+ except AttributeError:
+ pass
+ sys.set_asyncgen_hooks(*old_agen_hooks)
+
+ def time(self):
+ return GLib.get_monotonic_time() / 1000000
+
+ def _get_timeout_ms(self):
+ if self._ready:
+ return 0
+
+ if self._scheduled:
+ timeout = (self._scheduled[0]._when - self.time()) * 1000
+ return timeout if timeout >= 0 else 0
+
+ return -1
+
+ def stop(self):
+ # Simply quit the mainloop
+ self._main_loop.quit()
+
+ def close(self):
+ super().close()
+ for s in list(self.__signal_handlers):
+ self.remove_signal_handler(s)
+
+ if sys.platform != "win32":
+ def add_signal_handler(self, sig, callback, *args):
+ """Add a handler for UNIX signal"""
+
+ if (coroutines.iscoroutine(callback) or
+ coroutines.iscoroutinefunction(callback)):
+ raise TypeError("coroutines cannot be used "
+ "with add_signal_handler()")
+ self._check_closed()
+
+ # Can be useful while testing failures
+ # assert sig != signal.SIGALRM
+
+ if sig not in {signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGUSR1, signal.SIGUSR2,
signal.SIGWINCH}:
+ return super().add_signal_handler(sig, callback, *args)
+
+ # Pure python demands that there is only one signal handler
+ source, _, _ = self.__signal_handlers.get(sig, (None, None, None))
+ if source:
+ source.destroy()
+
+ source = GLib.unix_signal_source_new(sig)
+
+ source.set_name(f"asyncio signal watch for {sig}")
+ source.attach(GLib.MainContext.get_thread_default())
+ source.set_callback(self._signal_cb, sig)
+
+ self.__signal_handlers[sig] = (source, callback, args)
+ del source
+
+ def remove_signal_handler(self, sig):
+ if sig not in {signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGUSR1, signal.SIGUSR2,
signal.SIGWINCH}:
+ return super().remove_signal_handler(sig)
+
+ try:
+ source, _, _ = self.__signal_handlers[sig]
+ del self.__signal_handlers[sig]
+ # Really unref the underlying GSource so that GLib resets the signal handler
+ source.destroy()
+ source._clear_boxed()
+
+ # GLib does not restore the original signal handler.
+ # Try to restore the python handler for SIGINT, this makes
+ # Ctrl+C work after the mainloop has quit.
+ if sig == signal.SIGINT and _ossighelper.PyOS_getsig(signal.SIGINT) == 0:
+ if _ossighelper.startup_sigint_ptr > 0:
+ _ossighelper.PyOS_setsig(signal.SIGINT, _ossighelper.startup_sigint_ptr)
+
+ return True
+ except KeyError:
+ return False
+
+ def _signal_cb(self, sig):
+ source, cb, args = self.__signal_handlers.get(sig)
+
+ # Pass over to python mainloop
+ self.call_soon(cb, *args)
+
+
+if sys.platform != "win32":
+ # Note: We could support windows
+ class ChildWatcher(asyncio.AbstractChildWatcher):
+ """An asyncio child watcher based on GLib.
+
+ This is similar to asyncio.MultiLoopChildWatcher using the GLib child
+ watching functionality.
+ """
+ def __init__(self):
+ super().__init__()
+ self._lock = threading.Lock()
+ self._sources = {}
+
+ def add_child_handler(self, pid, callback, *args):
+ # Python requires that calling this multiple times replaces any
+ # previous handler. As such, we need to keep track of what we are
+ # watching.
+ with self._lock:
+ source, _, _ = self._sources.get(pid, (None, None, None))
+ if source:
+ source.destroy()
+ source = GLib.child_watch_source_new(pid)
+ source.set_name(f"asyncio child watch for pid {pid}")
+ source.attach(GLib.MainContext.get_thread_default())
+
+ source.set_callback(self._child_watch_func)
+
+ self._sources[pid] = (source, callback, args)
+
+ def remove_child_handler(self, pid):
+ with self.lock:
+ try:
+ source = self._sources.pop(pid)
+ source.destroy()
+ return True
+ except KeyError:
+ return False
+
+ def attach_loop(self, loop):
+ # Nothing needs to be done to attach a loop
+ pass
+
+ def close(self):
+ for s, _, _ in self._sources.values():
+ s.destroy()
+ self._sources.clear()
+
+ def is_active(self):
+ # Assume that GLib owns SIGCHLD.
+ return True
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, a, b, c):
+ return
+
+ def _child_watch_func(self, pid, status):
+ with self._lock:
+ source, cb, args = self._sources[pid]
+
+ returncode = os.waitstatus_to_exitcode(status)
+
+ # Pass over to python mainloop
+ loop = asyncio.get_event_loop()
+ loop.call_soon(cb, pid, returncode, *args)
+
+ source.destroy()
+ del self._sources[pid]
+
+
+class EventLoopPolicy(events.AbstractEventLoopPolicy):
+ """An asyncio event loop policy that runs the GLib main loop.
+
+ The policy allows creating a new EventLoop for threads other than the main
+ thread. For the main thread, you can use get_event_loop() to retrieve the
+ correct mainloop and run it.
+
+ Note that, unlike GLib, python does not support running the EventLoop
+ recursively. You should never iterate the GLib.MainContext from within
+ the python EventLoop as doing so prevents asyncio events from being
+ dispatched.
+
+ As such, do not use API such as GLib.MainLoop.run or Gtk.Dialog.run.
+ Instead use the proper asynchronous patterns to prevent entirely blocking
+ asyncio.
+ """
+
+ _loops = {}
+ _child_watcher = None
+
+ def get_event_loop(self):
+ """Get the event loop for the current context.
+
+ Returns an event loop object for the thread default GLib.MainContext
+ or in case of the main thread for the default GLib.MainContext.
+
+ An exception will be thrown if there is no GLib.MainContext for the
+ current thread. In that case, using new_event_loop() will create a new
+ main context and main loop which can subsequently attached to the thread
+ by calling set_event_loop().
+
+ Returns a new EventLoop or raises an exception."""
+ # Get the thread default main context
+ ctx = GLib.MainContext.get_thread_default()
+ # If there is none, and we are on the main thread, then use the default context
+ if ctx is None and threading.current_thread() is threading.main_thread():
+ ctx = GLib.MainContext.default()
+
+ # We do not create a main context implicitly;
+ # we create a mainloop for an existing context though
+ if ctx is None:
+ raise RuntimeError('There is no main context set for thread %r.'
+ % threading.current_thread().name)
+
+ # Note: We cannot attach it to ctx, as getting the default will always
+ # return a new python wrapper. But, we can use hash() as that returns
+ # the pointer to the C structure.
+ try:
+ return self._loops[ctx]
+ except KeyError:
+ pass
+
+ self._loops[ctx] = EventLoop(ctx)
+ return self._loops[ctx]
+
+ def set_event_loop(self, loop):
+ """Set the event loop for the current context (python thread) to loop.
+
+ This is not permitted for the main thread or if the thread already
+ has a main context associated with it.
+ """
+
+ # Only accept glib event loops, otherwise things will just mess up
+ assert loop is None or isinstance(loop, EventLoop)
+
+ # Refuse operating on the main thread. Technically we could allow it,
+ # but I can't think of a use-case and doing it is dangerous.
+ # Also, if someone *really* wants they can still push a new the thread
+ # default main context and call get_event_loop().
+ if threading.current_thread() is threading.main_thread():
+ raise RuntimeError('Changing the main loop/context of the main thread is not supported')
+
+ ctx = GLib.MainContext.get_thread_default()
+ if loop is None:
+ # We do permit unsetting the current loop/context
+ old = self._loops.pop(ctx, None)
+ if old:
+ GLib.MainContext.pop_thread_default(ctx)
+ else:
+ # Only allow attaching if the thread has no main context yet
+ if ctx:
+ raise RuntimeError('Thread %r already has a main context'
+ % threading.current_thread().name)
+
+ GLib.MainContext.push_thread_default(loop._context)
+ self._loops[loop._context] = loop
+
+ def new_event_loop(self):
+ """Create and return a new event loop that iterates a new
+ GLib.MainContext."""
+
+ return EventLoop(GLib.MainContext())
+
+ # Child processes handling (Unix only).
+ if sys.platform != "win32":
+ def get_child_watcher(self):
+ "Get the watcher for child processes."
+
+ if self._child_watcher is None:
+ self._child_watcher = ChildWatcher()
+
+ return self._child_watcher
+
+ def set_child_watcher(self, watcher):
+ """Set the watcher for child processes."""
+ raise NotImplementedError
+
+
+def _fileobj_to_fd(fileobj):
+ # Note: SelectorEventloop should only be passing FDs
+ if isinstance(fileobj, int):
+ return fileobj
+ else:
+ return fileobj.fileno()
+
+
+class _Source(GLib.Source):
+ def __init__(self, selector):
+ super().__init__()
+
+ # It is *not* safe to run the *python* part of the mainloop recursively.
+ # (This is the default, but make it explicit.)
+ self.set_can_recurse(False)
+ self.set_name('python asyncio integration')
+
+ self._selector = selector
+ self._loop = selector._loop
+ self._ready = []
+
+ def prepare(self):
+ timeout = self._loop._get_timeout_ms()
+
+ # NOTE: Always return False, FDs are queried in check and the timeout
+ # needs to be rechecked anyway.
+ return False, timeout
+
+ def check(self):
+ ready = []
+
+ for key in self._selector._fd_to_key.values():
+ condition = self.query_unix_fd(key._tag)
+ events = 0
+ if condition & (GLib.IOCondition.IN | GLib.IOCondition.HUP):
+ events |= selectors.EVENT_READ
+ if condition & GLib.IOCondition.OUT:
+ events |= selectors.EVENT_WRITE
+ if events:
+ ready.append((key, events))
+ self._ready = ready
+
+ timeout = self._loop._get_timeout_ms()
+ if timeout == 0:
+ return True
+
+ return bool(ready)
+
+ def dispatch(self, callback, args):
+ # Now, wag the dog by its tail
+ self._dispatching = True
+ try:
+ self._loop._run_once()
+ finally:
+ self._dispatching = False
+
+ return GLib.SOURCE_CONTINUE
+
+ def _get_ready(self):
+ if not self._dispatching:
+ raise RuntimeError("gi.asyncio.Selector.select only works while it is dispatching!")
+
+ ready = self._ready
+ self._ready = []
+ return ready
+
+
+class _SelectorKey(selectors.SelectorKey):
+ # Subclass to attach _tag
+ pass
+
+
+class _Selector(selectors.BaseSelector):
+ """A Selector for gi.asyncio.EventLoop registering python IO with GLib."""
+
+ def __init__(self, context, loop):
+ super().__init__()
+
+ self._context = context
+ self._loop = loop
+ self._dispatching = False
+ self._fd_to_key = {}
+
+ self._source = _Source(self)
+ self._source.attach(loop._context)
+
+ def close(self):
+ self._source.destroy()
+ super().close()
+
+ def register(self, fileobj, events, data=None):
+ if (not events) or (events & ~(selectors.EVENT_READ | selectors.EVENT_WRITE)):
+ raise ValueError("Invalid events: {!r}".format(events))
+
+ fd = _fileobj_to_fd(fileobj)
+ assert fd not in self._fd_to_key
+
+ key = _SelectorKey(fileobj, fd, events, data)
+
+ condition = GLib.IOCondition(0)
+ if events & selectors.EVENT_READ:
+ condition |= GLib.IOCondition.IN | GLib.IOCondition.HUP
+ if events & selectors.EVENT_WRITE:
+ condition |= GLib.IOCondition.OUT
+ key._tag = self._source.add_unix_fd(fd, condition)
+
+ self._fd_to_key[fd] = key
+ return key
+
+ def unregister(self, fileobj):
+ fd = _fileobj_to_fd(fileobj)
+ key = self._fd_to_key[fd]
+
+ self._source.remove_unix_fd(key._tag)
+ del self._fd_to_key[fd]
+
+ return key
+
+ # We could override modify, but it is only slightly when the "events" change.
+
+ def get_key(self, fileobj):
+ fd = _fileobj_to_fd(fileobj)
+ return self._fd_to_key[fd]
+
+ def get_map(self):
+ """Return a mapping of file objects to selector keys."""
+ # Horribly inefficient
+ # It should never be called and exists just to prevent issues if e.g.
+ # python decides to use it for debug purposes.
+ return {k.fileobj: k for k in self._fd_to_key.values()}
+
+ def select(self, timeout=None):
+ return self._source._get_ready()
diff --git a/gi/meson.build b/gi/meson.build
index 7eb3311e..8fb90f67 100644
--- a/gi/meson.build
+++ b/gi/meson.build
@@ -45,6 +45,7 @@ headers = [
install_headers(headers, subdir : 'pygobject-@0@'.format(platform_version))
python_sources = [
+ 'events.py',
'_constants.py',
'docstring.py',
'_error.py',
diff --git a/tests/test_events.py b/tests/test_events.py
new file mode 100644
index 00000000..600128bc
--- /dev/null
+++ b/tests/test_events.py
@@ -0,0 +1,114 @@
+import unittest
+
+try:
+ from test.test_asyncio.test_events import UnixEventLoopTestsMixin
+ from test.test_asyncio.test_subprocess import SubprocessMixin
+except:
+ class UnixEventLoopTestsMixin():
+ def test_unix_event_loop_tests_missing(self):
+ import warnings
+ warnings.warn('UnixEventLoopTestsMixin is unavailable, not running tests!')
+ self.skipTest('UnixEventLoopTestsMixin is unavailable, not running tests!')
+
+ class SubprocessMixin():
+ def test_subprocess_mixin_tests_missing(self):
+ import warnings
+ warnings.warn('SubprocessMixin is unavailable, not running tests!')
+ self.skipTest('SubprocessMixin is unavailable, not running tests!')
+try:
+ from test.test_asyncio import utils as test_utils
+except ImportError:
+ from asyncio import test_utils
+
+import sys
+import gi
+import gi.events
+from gi.repository import GLib
+import threading
+
+
+if sys.platform != 'win32':
+
+ class GLibEventLoopTests(UnixEventLoopTestsMixin, test_utils.TestCase):
+
+ def create_event_loop(self):
+ return gi.events.EventLoop(main_context=GLib.MainContext.default())
+
+ class SubprocessWatcherTests(SubprocessMixin, test_utils.TestCase):
+
+ def setUp(self):
+ super().setUp()
+ policy = gi.events.EventLoopPolicy()
+ self.loop = policy.get_event_loop()
+ watcher = policy.get_child_watcher()
+
+ assert isinstance(watcher, gi.events.ChildWatcher)
+
+
+class GLibEventLoopPolicyTests(unittest.TestCase):
+
+ def create_policy(self):
+ return gi.events.EventLoopPolicy()
+
+ def test_get_event_loop(self):
+ policy = self.create_policy()
+ loop = policy.get_event_loop()
+ self.assertIsInstance(loop, gi.events.EventLoop)
+ self.assertIs(loop, policy.get_event_loop())
+ loop.close()
+
+ def test_new_event_loop(self):
+ policy = self.create_policy()
+ loop = policy.new_event_loop()
+ self.assertIsInstance(loop, gi.events.EventLoop)
+ loop.close()
+
+ # Attaching a loop to the main thread fails
+ with self.assertRaises(RuntimeError):
+ policy.set_event_loop(loop)
+
+ def test_thread_event_loop(self):
+ policy = self.create_policy()
+ loop = policy.new_event_loop()
+
+ res = []
+
+ def thread_func(res):
+ try:
+ # We cannot get an event loop for the current thread
+ with self.assertRaises(RuntimeError):
+ policy.get_event_loop()
+
+ # We can attach our loop
+ policy.set_event_loop(loop)
+ # Now we can get it, and it is the same
+ self.assertIs(policy.get_event_loop(), loop)
+
+ # Simple call_soon test
+ results = []
+
+ def callback(arg1, arg2):
+ results.append((arg1, arg2))
+ loop.stop()
+
+ loop.call_soon(callback, 'hello', 'world')
+ loop.run_forever()
+ self.assertEqual(results, [('hello', 'world')])
+
+ # We can detach it again
+ policy.set_event_loop(None)
+
+ # Which means we have none and get a runtime error
+ with self.assertRaises(RuntimeError):
+ policy.get_event_loop()
+ except:
+ res += sys.exc_info()
+
+ # Initially, the thread has no event loop
+ thread = threading.Thread(target=lambda: thread_func(res))
+ thread.start()
+ thread.join()
+
+ if res:
+ t, v, tb = res
+ raise t(v).with_traceback(tb)
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]