[gnome-music/wip/mschraal/prioritypool-v3: 5/5] asyncqueue: Add a slow running task pool




commit fc26f85051fa8591d93dd0f63f9f35e671265dce
Author: Marinus Schraal <mschraal gnome org>
Date:   Tue Feb 1 21:49:18 2022 +0100

    asyncqueue: Add a slow running task pool

 gnomemusic/asyncqueue.py | 30 ++++++++++++++++++++++++++----
 1 file changed, 26 insertions(+), 4 deletions(-)
---
diff --git a/gnomemusic/asyncqueue.py b/gnomemusic/asyncqueue.py
index 9b39ab6e5..813f95285 100644
--- a/gnomemusic/asyncqueue.py
+++ b/gnomemusic/asyncqueue.py
@@ -22,7 +22,8 @@
 # code, but you are not obligated to do so.  If you do not wish to do so,
 # delete this exception statement from your version.
 
-from typing import Any, Dict, List, Optional, Tuple
+from collections import deque
+from typing import Any, Deque, Dict, List, Optional, Tuple
 import time
 
 from gi.repository import GObject, GLib
@@ -58,8 +59,10 @@ class AsyncQueue(GObject.GObject):
         self._async_pool: Dict[int, Tuple] = {}
         self._async_pool_coreobject_hash: Dict = {}
         self._async_active_pool: Dict[int, Tuple] = {}
+        self._async_active_slow_pool: Dict[int, Tuple] = {}
         self._async_data: Dict[object, Tuple[int, float]] = {}
         self._log = MusicLogger()
+        self._task_tracker: Deque[Any] = deque([], 5)
         self._max_async = 4
         self._priority_pool = PriorityPool()
         self._queue_name = queue_name if queue_name else self
@@ -76,7 +79,8 @@ class AsyncQueue(GObject.GObject):
         async_obj_id = id(args[0])
 
         if (async_obj_id not in self._async_pool
-                and async_obj_id not in self._async_active_pool):
+                and async_obj_id not in self._async_active_pool
+                and async_obj_id not in self._async_active_slow_pool):
             self._async_pool[async_obj_id] = (args)
             self._async_pool_coreobject_hash[async_obj_id] = id(args[1])
 
@@ -84,8 +88,9 @@ class AsyncQueue(GObject.GObject):
             self._timeout_id = GLib.timeout_add(100, self._dispatch)
 
     def _dispatch(self) -> bool:
-        common_ids = self._common_ids()
+        self._track_long_running_tasks()
 
+        common_ids = self._common_ids()
         while len(self._async_active_pool) < self._max_async:
             if len(self._async_pool) == 0:
                 self._timeout_id = 0
@@ -127,8 +132,25 @@ class AsyncQueue(GObject.GObject):
             f"{a} active task(s) of {len(self._async_pool) + a}")
 
         obj.disconnect(handler_id)
-        self._async_active_pool.pop(id(obj))
+        if id(obj) in self._async_active_pool.keys():
+            self._async_active_pool.pop(id(obj))
+        else:
+            self._async_active_slow_pool.pop(id(obj))
 
     def _common_ids(self) -> List[int]:
         return list(set(self._priority_pool.props.pool).intersection(
             self._async_pool_coreobject_hash.values()))
+
+    def _track_long_running_tasks(self) -> None:
+        """Move tasks seen in 5 successive dispatches to the slow pool."""
+        # Snapshot the ids: a live keys() view would track later changes.
+        self._task_tracker.append(frozenset(self._async_active_pool))
+        if len(self._task_tracker) < 5:
+            return
+
+        # Only ids present in every tracked snapshot count as slow.
+        slow_task_ids = frozenset.intersection(*self._task_tracker)
+        for task_id in slow_task_ids:
+            # Freeing the slot in the active pool lets _dispatch refill it.
+            self._async_active_slow_pool[
+                task_id] = self._async_active_pool.pop(task_id)


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]