[pygobject/benzea/gio-asyncio] test
- From: Benjamin Berg <bberg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [pygobject/benzea/gio-asyncio] test
- Date: Wed, 17 Nov 2021 16:12:18 +0000 (UTC)
commit 9f2b2b5cc5c1b882b03fa6b6cbf17583e8b7f7ea
Author: Benjamin Berg <bberg redhat com>
Date: Wed Nov 17 15:48:18 2021 +0100
test
gi/events.py | 18 ++++++++++++++++++
tests/test_events.py | 8 ++++++++
2 files changed, 26 insertions(+)
---
diff --git a/gi/events.py b/gi/events.py
index 189a687b..2df6203a 100644
--- a/gi/events.py
+++ b/gi/events.py
@@ -29,6 +29,7 @@ import threading
import selectors
import weakref
from . import _ossighelper
+import warnings
from gi.repository import GLib
@@ -122,9 +123,24 @@ class GLibEventLoop(asyncio.SelectorEventLoop):
self._main_loop.quit()
def close(self):
+ print("closing", self)
super().close()
for s in list(self.__signal_handlers):
self.remove_signal_handler(s)
+ print("closed", self)
+
+ def __del__(self, _warn=warnings.warn):  # debug aid: trace loop finalization
+ print('deleting', self)
+ if not self.is_closed():
+ import traceback
+ traceback.print_stack()  # must be called; a bare attribute reference prints nothing
+ super().__del__(_warn)
+
+ def __repr__(self):
+ return (
+ f'<{self.__class__.__name__} running={self.is_running()} '
+ f'closed={self.is_closed()} debug={self.get_debug()}>' + str(hash(self))
+ )
if sys.platform != "win32":
def add_signal_handler(self, sig, callback, *args):
@@ -393,8 +409,10 @@ class _Selector(selectors.BaseSelector):
self._source.attach(loop._context)
def close(self):
+ print("closing", self)
self._source.destroy()
super().close()
+ print("closed")
def register(self, fileobj, events, data=None):
if (not events) or (events & ~(selectors.EVENT_READ | selectors.EVENT_WRITE)):
diff --git a/tests/test_events.py b/tests/test_events.py
index 8fc4de4d..470f1ebb 100644
--- a/tests/test_events.py
+++ b/tests/test_events.py
@@ -63,6 +63,14 @@ if sys.platform != 'win32':
self.assertEqual(warns, [])
+ def test_popen_error_with_stdin_pipe(self):
+ # Issue #35721: check that newly created socket pair is closed when
+ # Popen fails
+ import subprocess
+ print(self.loop)
+ self.loop.run_until_complete(
+ self._test_popen_error(stdin=subprocess.PIPE))
+
def setUp(self):
super().setUp()
policy = gi.events.GLibEventLoopPolicy()
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]