[geary/wip/778276-better-flag-updates: 12/25] Rename Geary.Nonblocking.Mailbox to Queue, make constructed type explicit.



commit 9a83e95b89a0e4bbce453a4e40da6fce920be22d
Author: Michael James Gratton <mike vee net>
Date:   Wed Nov 22 12:00:37 2017 +1100

    Rename Geary.Nonblocking.Mailbox to Queue, make constructed type explicit.
    
    Mailbox would produce a FIFO or priority queue depending on if you pass
    in a comparator or not. This adds additional constructors to make that
    explicit, and also now allows the FIFO to have its own equality
    function. Renames the class to something less confusing for an email
    library.
    
    Also adds doc comments for all public members.
    
    * src/engine/nonblocking/nonblocking-queue.vala: Renamed from
      nonblocking-mailbox.vala, rename class from Mailbox to Queue, rename
      recv_async to just 'receive'. Provide full documentation
      comments. Update call sites.

 po/POTFILES.in                                     |    2 +-
 src/CMakeLists.txt                                 |    2 +-
 src/engine/app/app-draft-manager.vala              |   10 +-
 .../app-conversation-operation-queue.vala          |    6 +-
 src/engine/imap-db/outbox/smtp-outbox-folder.vala  |    4 +-
 .../imap-engine-account-synchronizer.vala          |   11 +-
 .../imap-engine/imap-engine-replay-queue.vala      |   17 ++-
 src/engine/nonblocking/nonblocking-mailbox.vala    |  109 -------------
 src/engine/nonblocking/nonblocking-queue.vala      |  171 ++++++++++++++++++++
 9 files changed, 203 insertions(+), 129 deletions(-)
---
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 002f320..4f79688 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -339,8 +339,8 @@ src/engine/nonblocking/nonblocking-batch.vala
 src/engine/nonblocking/nonblocking-concurrent.vala
 src/engine/nonblocking/nonblocking-counting-semaphore.vala
 src/engine/nonblocking/nonblocking-error.vala
-src/engine/nonblocking/nonblocking-mailbox.vala
 src/engine/nonblocking/nonblocking-mutex.vala
+src/engine/nonblocking/nonblocking-queue.vala
 src/engine/nonblocking/nonblocking-reporting-semaphore.vala
 src/engine/nonblocking/nonblocking-variants.vala
 src/engine/rfc822/rfc822-error.vala
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ecfbcb0..5f7fa99 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -258,8 +258,8 @@ engine/nonblocking/nonblocking-batch.vala
 engine/nonblocking/nonblocking-concurrent.vala
 engine/nonblocking/nonblocking-counting-semaphore.vala
 engine/nonblocking/nonblocking-error.vala
-engine/nonblocking/nonblocking-mailbox.vala
 engine/nonblocking/nonblocking-mutex.vala
+engine/nonblocking/nonblocking-queue.vala
 engine/nonblocking/nonblocking-reporting-semaphore.vala
 engine/nonblocking/nonblocking-variants.vala
 
diff --git a/src/engine/app/app-draft-manager.vala b/src/engine/app/app-draft-manager.vala
index 068ee90..798e7f0 100644
--- a/src/engine/app/app-draft-manager.vala
+++ b/src/engine/app/app-draft-manager.vala
@@ -125,7 +125,8 @@ public class Geary.App.DraftManager : BaseObject {
     private Folder? drafts_folder = null;
     private FolderSupport.Create? create_support = null;
     private FolderSupport.Remove? remove_support = null;
-    private Nonblocking.Mailbox<Operation?> mailbox = new Nonblocking.Mailbox<Operation?>();
+    private Nonblocking.Queue<Operation?> mailbox =
+        new Nonblocking.Queue<Operation?>.fifo();
     private bool was_opened = false;
     private Error? fatal_err = null;
     
@@ -380,16 +381,15 @@ public class Geary.App.DraftManager : BaseObject {
             // reporting it again
             if (fatal_err != null)
                 break;
-            
+
             Operation op;
             try {
-                op = yield mailbox.recv_async(null);
+                op = yield mailbox.receive(null);
             } catch (Error err) {
                 fatal(err);
-                
                 break;
             }
-            
+
             bool continue_loop = yield operation_loop_iteration_async(op);
             
             // fire semaphore, if present
diff --git a/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala 
b/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala
index caa98a7..76065cf 100644
--- a/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala
+++ b/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala
@@ -9,8 +9,8 @@ private class Geary.App.ConversationOperationQueue : BaseObject {
     public Geary.SimpleProgressMonitor progress_monitor { get; private set; default = 
         new Geary.SimpleProgressMonitor(Geary.ProgressType.ACTIVITY); }
     
-    private Geary.Nonblocking.Mailbox<ConversationOperation> mailbox
-        = new Geary.Nonblocking.Mailbox<ConversationOperation>();
+    private Geary.Nonblocking.Queue<ConversationOperation> mailbox
+        = new Geary.Nonblocking.Queue<ConversationOperation>.fifo();
     private Geary.Nonblocking.Spinlock processing_done_spinlock
         = new Geary.Nonblocking.Spinlock();
     
@@ -61,7 +61,7 @@ private class Geary.App.ConversationOperationQueue : BaseObject {
         for (;;) {
             ConversationOperation op;
             try {
-                op = yield mailbox.recv_async();
+                op = yield mailbox.receive();
             } catch (Error e) {
                 debug("Error processing in conversation operation mailbox: %s", e.message);
                 break;
diff --git a/src/engine/imap-db/outbox/smtp-outbox-folder.vala 
b/src/engine/imap-db/outbox/smtp-outbox-folder.vala
index 4211392..4e08bb6 100644
--- a/src/engine/imap-db/outbox/smtp-outbox-folder.vala
+++ b/src/engine/imap-db/outbox/smtp-outbox-folder.vala
@@ -81,7 +81,7 @@ private class Geary.SmtpOutboxFolder :
     private ImapDB.Database db;
 
     private Cancellable? queue_cancellable = null;
-    private Nonblocking.Mailbox<OutboxRow> outbox_queue = new Nonblocking.Mailbox<OutboxRow>();
+    private Nonblocking.Queue<OutboxRow> outbox_queue = new Nonblocking.Queue<OutboxRow>.fifo();
     private Geary.ProgressMonitor sending_monitor;
     private SmtpOutboxFolderProperties _properties = new SmtpOutboxFolderProperties(0, 0);
     private int64 next_ordering = 0;
@@ -129,7 +129,7 @@ private class Geary.SmtpOutboxFolder :
             OutboxRow? row = null;
             bool row_handled = false;
             try {
-                row = yield this.outbox_queue.recv_async(cancellable);
+                row = yield this.outbox_queue.receive(cancellable);
                 row_handled = yield postman_send(row, cancellable);
             } catch (SmtpError err) {
                 ProblemType problem = ProblemType.GENERIC_ERROR;
diff --git a/src/engine/imap-engine/imap-engine-account-synchronizer.vala 
b/src/engine/imap-engine/imap-engine-account-synchronizer.vala
index ca0101e..b8dd07b 100644
--- a/src/engine/imap-engine/imap-engine-account-synchronizer.vala
+++ b/src/engine/imap-engine/imap-engine-account-synchronizer.vala
@@ -12,9 +12,12 @@ private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject {
     private weak GenericAccount account { get; private set; }
     private weak Imap.Account remote { get; private set; }
 
-    private Nonblocking.Mailbox<MinimalFolder> bg_queue = new 
Nonblocking.Mailbox<MinimalFolder>(bg_queue_comparator);
-    private Gee.HashSet<MinimalFolder> made_available = new Gee.HashSet<MinimalFolder>();
-    private Gee.HashSet<FolderPath> unavailable_paths = new Gee.HashSet<FolderPath>();
+    private Nonblocking.Queue<MinimalFolder> bg_queue =
+        new Nonblocking.Queue<MinimalFolder>.priority(bg_queue_comparator);
+    private Gee.HashSet<MinimalFolder> made_available =
+        new Gee.HashSet<MinimalFolder>();
+    private Gee.HashSet<FolderPath> unavailable_paths =
+        new Gee.HashSet<FolderPath>();
     private MinimalFolder? current_folder = null;
     private Cancellable? bg_cancellable = null;
     private DateTime max_epoch = new DateTime(new TimeZone.local(), 2000, 1, 1, 0, 0, 0.0);
@@ -219,7 +222,7 @@ private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject {
         while (!cancellable.is_cancelled()) {
             MinimalFolder folder;
             try {
-                folder = yield bg_queue.recv_async(bg_cancellable);
+                folder = yield bg_queue.receive(bg_cancellable);
             } catch (Error err) {
                 if (!(err is IOError.CANCELLED))
                     debug("Failed to receive next folder for background sync: %s", err.message);
diff --git a/src/engine/imap-engine/imap-engine-replay-queue.vala 
b/src/engine/imap-engine/imap-engine-replay-queue.vala
index be4056f..00301df 100644
--- a/src/engine/imap-engine/imap-engine-replay-queue.vala
+++ b/src/engine/imap-engine/imap-engine-replay-queue.vala
@@ -4,6 +4,13 @@
  * (version 2.1 or later).  See the COPYING file in this distribution.
  */
 
+/**
+ * Interleaves IMAP operations to maintain consistent sequence numbering.
+ *
+ * The replay queue manages and executes operations originating both
+ * locally and from the server for a specific IMAP mailbox so as to
+ * ensure the execution of the operations maintains consistent.
+ */
 private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
     // this value is high because delays between back-to-back unsolicited notifications have been
     // see as high as 250ms
@@ -56,8 +63,10 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
     } }
     
     private weak MinimalFolder owner;
-    private Nonblocking.Mailbox<ReplayOperation> local_queue = new Nonblocking.Mailbox<ReplayOperation>();
-    private Nonblocking.Mailbox<ReplayOperation> remote_queue = new Nonblocking.Mailbox<ReplayOperation>();
+    private Nonblocking.Queue<ReplayOperation> local_queue =
+        new Nonblocking.Queue<ReplayOperation>.fifo();
+    private Nonblocking.Queue<ReplayOperation> remote_queue =
+        new Nonblocking.Queue<ReplayOperation>.fifo();
     private ReplayOperation? local_op_active = null;
     private ReplayOperation? remote_op_active = null;
     private Gee.ArrayList<ReplayOperation> notification_queue = new Gee.ArrayList<ReplayOperation>();
@@ -359,7 +368,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
         while (queue_running) {
             ReplayOperation op;
             try {
-                op = yield local_queue.recv_async();
+                op = yield local_queue.receive();
             } catch (Error recv_err) {
                 debug("Unable to receive next replay operation on local queue %s: %s", to_string(),
                     recv_err.message);
@@ -459,7 +468,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
             // wait for the next operation ... do this *before* waiting for remote
             ReplayOperation op;
             try {
-                op = yield remote_queue.recv_async();
+                op = yield remote_queue.receive();
             } catch (Error recv_err) {
                 debug("Unable to receive next replay operation on remote queue %s: %s", to_string(),
                     recv_err.message);
diff --git a/src/engine/nonblocking/nonblocking-queue.vala b/src/engine/nonblocking/nonblocking-queue.vala
new file mode 100644
index 0000000..984a6f2
--- /dev/null
+++ b/src/engine/nonblocking/nonblocking-queue.vala
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
+ * Copyright 2017 Michael Gratton <mike vee net>
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later).  See the COPYING file in this distribution.
+ */
+
+/**
+ * An asynchronous queue, first-in first-out (FIFO) or priority.
+ *
+ * This class can be used to asynchronously wait for items to be added
+ * to the queue, the asynchronous call blocking until an item is
+ * ready.
+ */
+public class Geary.Nonblocking.Queue<G> : BaseObject {
+
+    /** Returns the number of items currently in the queue. */
+    public int size { get { return queue.size; } }
+
+    /**
+     * Determines if duplicate items can be added to the queue.
+     *
+     * If a priory queue, this applies to items of the same priority,
+     * otherwise uses the item's natural identity.
+     */
+    public bool allow_duplicates { get; set; default = true; }
+
+    /**
+     * Determines if duplicate items will be added to the queue.
+     *
+     * If {@link allow_duplicates} is `true` and an item is already in
+     * the queue, this determines if it will be added again.
+     */
+    public bool requeue_duplicate { get; set; default = false; }
+
+    /**
+     * Determines if the queue is currently running.
+     */
+    public bool is_paused {
+        get { return _is_paused; }
+
+        set {
+            // if no longer paused, wake up any waiting recipients
+            if (_is_paused && !value)
+                spinlock.blind_notify();
+
+            _is_paused = value;
+        }
+    }
+    private bool _is_paused = false;
+
+    private Gee.Queue<G> queue;
+    private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock();
+
+
+    /**
+     * Constructs a new first-in first-out (FIFO) queue.
+     *
+     * If `equalator` is not null it will be used to determine the
+     * identity of objects in the queue, else the items' natural
+     * identity will be used.
+     */
+    public Queue.fifo(owned Gee.EqualDataFunc<G>? equalator = null) {
+        this(new Gee.LinkedList<G>(equalator));
+    }
+
+    /**
+     * Constructs a new priority queue.
+     *
+     * If `comparator` is not null it will be used to determine the
+     * ordering of objects in the queue, else the items' natural
+     * ordering will be used.
+     */
+    public Queue.priority(owned CompareDataFunc<G>? comparator = null) {
+        this(new Gee.PriorityQueue<G>(comparator));
+    }
+
+    /**
+     * Constructs a new queue.
+     */
+    protected Queue(Gee.Queue<G> queue) {
+        this.queue = queue;
+    }
+
+    /**
+     * Adds an item to the queue.
+     *
+     * If the queue is a priority queue, it is added according to its
+     * relative priority, else it is added to the end.
+     *
+     * Returns `true` if the item was added to the queue.
+     */
+    public bool send(G msg) {
+        if (!allow_duplicates && queue.contains(msg)) {
+            if (requeue_duplicate)
+                queue.remove(msg);
+            else
+                return false;
+        }
+
+        if (!queue.offer(msg))
+            return false;
+
+        if (!is_paused)
+            spinlock.blind_notify();
+
+        return true;
+    }
+
+    /**
+     * Retrieves the next item from the queue, blocking until available.
+     *
+     * If the queue is paused, this will continue to wait until
+     * unpaused and an item is ready. If `cancellable` is non-null,
+     * when used will cancel this call.
+     */
+    public async G receive(Cancellable? cancellable = null) throws Error {
+        for (;;) {
+            if (queue.size > 0 && !is_paused)
+                return queue.poll();
+
+            yield spinlock.wait_async(cancellable);
+        }
+    }
+
+    /**
+     * Removes all items in queue, returning the number of removed items.
+     */
+    public int clear() {
+        int count = queue.size;
+        if (count != 0)
+            queue.clear();
+
+        return count;
+    }
+
+    /**
+     * Removes an item from the queue, returning `true` if removed.
+     */
+    public bool revoke(G msg) {
+        return queue.remove(msg);
+    }
+
+    /**
+     * Remove items matching the given predicate, returning those removed.
+     */
+    public Gee.Collection<G> revoke_matching(owned Gee.Predicate<G> predicate) {
+        Gee.ArrayList<G> removed = new Gee.ArrayList<G>();
+        // Iterate over a copy so we can modify the original.
+        foreach (G msg in queue.to_array()) {
+            if (predicate(msg)) {
+                queue.remove(msg);
+                removed.add(msg);
+            }
+        }
+
+        return removed;
+    }
+
+    /**
+     * Returns a read-only version of the queue queue.
+     *
+     * Since the queue could potentially alter when the main loop
+     * runs, it's important to only examine the queue when not
+     * allowing other operations to process.
+     */
+    public Gee.Collection<G> get_all() {
+        return queue.read_only_view;
+    }
+}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]