[pygobject/benzea/gio-asyncio] test




commit 9f2b2b5cc5c1b882b03fa6b6cbf17583e8b7f7ea
Author: Benjamin Berg <bberg redhat com>
Date:   Wed Nov 17 15:48:18 2021 +0100

    test

 gi/events.py         | 18 ++++++++++++++++++
 tests/test_events.py |  8 ++++++++
 2 files changed, 26 insertions(+)
---
diff --git a/gi/events.py b/gi/events.py
index 189a687b..2df6203a 100644
--- a/gi/events.py
+++ b/gi/events.py
@@ -29,6 +29,7 @@ import threading
 import selectors
 import weakref
 from . import _ossighelper
+import warnings
 
 from gi.repository import GLib
 
@@ -122,9 +123,24 @@ class GLibEventLoop(asyncio.SelectorEventLoop):
         self._main_loop.quit()
 
     def close(self):
+        print("closing", self)  # trace: entering close(), before the base class closes the loop
         super().close()
         for s in list(self.__signal_handlers):
             self.remove_signal_handler(s)
+        print("closed", self)  # trace: base close() done and every registered signal handler removed
+
+    def __del__(self, _warn=warnings.warn):  # finaliser hook: trace the deletion, then defer to the base class
+        print('deleting', self)
+        if not self.is_closed():  # loop is being collected without close() having been called
+            import traceback
+            traceback.print_stack()  # must be called; the bare attribute reference printed nothing
+        super().__del__(_warn)
+
+    def __repr__(self):  # repr with running/closed/debug flags; hash(self) is appended after the closing '>'
+        return (
+            f'<{self.__class__.__name__} running={self.is_running()} '
+            f'closed={self.is_closed()} debug={self.get_debug()}>' + str(hash(self))
+        )
 
     if sys.platform != "win32":
         def add_signal_handler(self, sig, callback, *args):
@@ -393,8 +409,10 @@ class _Selector(selectors.BaseSelector):
         self._source.attach(loop._context)
 
     def close(self):
+        print("closing", self)  # trace: entering close(), before the GLib source is destroyed
         self._source.destroy()
         super().close()
+        print("closed")  # trace: source destroyed and base selector closed
 
     def register(self, fileobj, events, data=None):
         if (not events) or (events & ~(selectors.EVENT_READ | selectors.EVENT_WRITE)):
diff --git a/tests/test_events.py b/tests/test_events.py
index 8fc4de4d..470f1ebb 100644
--- a/tests/test_events.py
+++ b/tests/test_events.py
@@ -63,6 +63,14 @@ if sys.platform != 'win32':
 
                     self.assertEqual(warns, [])
 
+        def test_popen_error_with_stdin_pipe(self):
+            # Issue #35721: check that newly created socket pair is closed when
+            # Popen fails
+            import subprocess
+            print(self.loop)  # trace: show the loop this test runs on (uses the loop's __repr__)
+            self.loop.run_until_complete(
+                self._test_popen_error(stdin=subprocess.PIPE))
+
         def setUp(self):
             super().setUp()
             policy = gi.events.GLibEventLoopPolicy()


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]