[meld] filediff: First pass at replacing process- with threaded-matching

commit 010eb6156e60db3ea5cd715c07a5f5b8a9cb7ea0
Author: Kai Willadsen <kai willadsen gmail com>
Date:   Sun Dec 21 11:57:33 2014 +1000

    filediff: First pass at replacing process- with threaded-matching

 meld/filediff.py |   64 +++++++++++++++++++++++++++++++++++++++++++----------
 1 files changed, 52 insertions(+), 12 deletions(-)
---
diff --git a/meld/filediff.py b/meld/filediff.py
index 2ec91df..300df2e 100644
--- a/meld/filediff.py
+++ b/meld/filediff.py
@@ -16,13 +16,13 @@
 
 import copy
 import functools
+import logging
 import math
 import os
+import queue
+import threading
 import time
 
-from multiprocessing.pool import ThreadPool
-
-
 from gi.repository import GLib
 from gi.repository import GObject
 from gi.repository import Gio
@@ -48,6 +48,31 @@ from meld.settings import bind_settings, meldsettings
 from meld.sourceview import LanguageManager, get_custom_encoding_candidates
 
 
+log = logging.getLogger(__name__)
+
+
+class MatcherWorker(threading.Thread):
+
+    def __init__(self, tasks, results):
+        super(MatcherWorker, self).__init__()
+        self.tasks = tasks
+        self.results = results
+        self.daemon = True
+        self.start()
+
+    def run(self):
+        while True:
+            task_id, (text1, textn) = self.tasks.get()
+            try:
+                opcodes = matchers.matcher_worker(text1, textn)
+                self.results.put((task_id, opcodes))
+            except Exception as e:
+                log.error("Exception while running diff: %s", e)
+            finally:
+                self.tasks.task_done()
+            time.sleep(0)
+
+
 def with_focused_pane(function):
     @functools.wraps(function)
     def wrap_function(*args, **kwargs):
@@ -67,21 +92,36 @@ class CachedSequenceMatcher(object):
     """
 
     def __init__(self):
-        self.process_pool = ThreadPool(None)
         self.cache = {}
+        self.tasks = queue.Queue()
+        self.results = queue.Queue()
+        self.thread = MatcherWorker(self.tasks, self.results)
+        self.task_id = 1
+        self.queued_matches = {}
 
     def match(self, text1, textn, cb):
+        texts = (text1, textn)
         try:
-            self.cache[(text1, textn)][1] = time.time()
-            opcodes = self.cache[(text1, textn)][0]
+            self.cache[texts][1] = time.time()
+            opcodes = self.cache[texts][0]
             GLib.idle_add(lambda: cb(opcodes))
         except KeyError:
-            def inline_cb(opcodes):
-                self.cache[(text1, textn)] = [opcodes, time.time()]
-                GLib.idle_add(lambda: cb(opcodes))
-            self.process_pool.apply_async(matchers.matcher_worker,
-                                          (text1, textn),
-                                          callback=inline_cb)
+            self.tasks.put((self.task_id, texts))
+            if not bool(self.queued_matches):
+                GLib.idle_add(self.check_results)
+            self.queued_matches[self.task_id] = (texts, cb)
+            self.task_id += 1
+
+    def check_results(self):
+        try:
+            task_id, opcodes = self.results.get_nowait()
+            texts, cb = self.queued_matches.pop(task_id)
+            self.cache[texts] = [opcodes, time.time()]
+            GLib.idle_add(lambda: cb(opcodes))
+        except queue.Empty:
+            pass
+
+        return bool(self.queued_matches)
 
     def clean(self, size_hint):
         """Clean the cache if necessary

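For readers following along, here is a minimal standalone sketch of the
producer/consumer pattern the patch introduces: a daemon thread drains a task
queue and puts opcodes onto a results queue, which the UI side then polls.
This is not part of the commit itself; difflib stands in for
matchers.matcher_worker, and a plain blocking read stands in for the
GLib.idle_add-driven check_results() poll.

    # Sketch only: difflib plays the role of matchers.matcher_worker.
    import difflib
    import queue
    import threading

    tasks = queue.Queue()
    results = queue.Queue()

    def worker():
        # Daemon thread: pull (task_id, texts) pairs, diff them, push opcodes.
        while True:
            task_id, (text1, text2) = tasks.get()
            try:
                matcher = difflib.SequenceMatcher(None, text1, text2)
                results.put((task_id, matcher.get_opcodes()))
            finally:
                tasks.task_done()

    threading.Thread(target=worker, daemon=True).start()

    tasks.put((1, ("one\ntwo\n", "one\nthree\n")))

    # In Meld this poll runs as a GLib idle callback; here we just block
    # until the single queued result arrives.
    task_id, opcodes = results.get()
    print(task_id, opcodes)

In the patch proper, check_results() returns bool(self.queued_matches), so the
idle callback keeps firing only while matches are still outstanding and
removes itself once every queued match has been handed back to its callback.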
