[geary/wip/789924-network-transition-redux: 1/2] Make ConversationMonitor more robust with no/changing connectivity.



commit cf86d07b54b947dd81bd2c37d35d1c1640ee2c2f
Author: Michael James Gratton <mike vee net>
Date:   Sat Mar 3 10:56:29 2018 +1100

    Make ConversationMonitor more robust with no/changing connectivity.
    
    Conversation monitor was built around older assumptions of how a folder's
    remote connections work - that once a folder opens it will likely also
    eventually establish a remote connection, that once the connection is up
    it will hang around, and so on.
    
    This patch removes any public notion of (re)seeding, since it can't be
    relied upon to actually happen over the course of the session, ensures
    that all folder operations are local-only when the folder does not have
    a working remote connection so they don't block, and takes the
    opportunity to reorganise and clean up the monitor API and documentation
    comments.
    
    * src/engine/app/app-conversation-monitor.vala (ConversationMonitor):
      Remove seed signals, and don't bother running an initial reseed if the
      folder is already open, since the fill operation will cause any locally
      incomplete messages to be filled out from the remote. Manage and use an
      internal Cancellable for cancelling internal operations when shutting
      down. Construct a queue only when starting to monitor conversations,
      delete it when stopping. Move as much operation-specific code into the
      operations themselves as reasonably possible, making some methods
      internal so they can be accessed from the ops. Ensure all folder listing
      operations specify LOCAL_ONLY when the remote is not open. Remove
      LocalLoadOperation since it is now redundant. Update the API for
      accessing conversations to match Gee conventions and update call
      sites. Update documentation comments. Hook back up to locally-complete
      signals so we don't miss emails being filled out by the prefetcher, for
      now.
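
The LOCAL_ONLY rule described above can be pictured with a minimal sketch. The enum and function names below are hypothetical stand-ins, not Geary's actual types; the only assumption is that a listing takes a flag restricting it to locally stored data.

```vala
// Hypothetical stand-in for a folder listing flag set; not Geary's API.
[Flags]
enum SketchListFlags {
    NONE = 0,
    LOCAL_ONLY = 1 << 0
}

// Pick listing flags so that a listing never waits on an absent remote.
SketchListFlags listing_flags_for(bool remote_is_open) {
    return remote_is_open ? SketchListFlags.NONE : SketchListFlags.LOCAL_ONLY;
}

void main() {
    // Without a remote connection the listing is restricted to local data.
    assert(listing_flags_for(false) == SketchListFlags.LOCAL_ONLY);
    // With a remote connection no restriction is applied.
    assert(listing_flags_for(true) == SketchListFlags.NONE);
}
```

Deciding the flag at the point where each listing is issued, rather than once at start-up, is what lets the behaviour follow connectivity changes during a session.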
    
    * src/engine/app/conversation-monitor/app-conversation-set.vala
      (ConversationSet): Rename conversations property to match Gee
      conventions, update call sites.
    
    * src/engine/app/conversation-monitor/app-conversation-operation.vala
      (ConversationOperation): Allow operations to specify if they should
      allow duplicates, and allow the execution method to throw errors, so
      they can be handled in a uniform way.
    
    * src/engine/app/conversation-monitor/app-conversation-operation-queue.vala
      (ConversationOperationQueue): Accept progress monitor property as a
      ctor arg rather than constructing one itself, so it is tied to the
      life-cycle of the ConversationMonitor rather than the queue. Add a
      signal for notifying of errors thrown when running operations, and use
      the new operation-independent support for determining if duplicates
      should be queued.
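
As a rough illustration of per-operation duplicate handling, the sketch below lets each operation declare whether a second pending copy is acceptable and has the queue honour that. SketchOp and SketchQueue are hypothetical, and matching operations by name is an assumption made for brevity; the patch's own matching rule is not shown in this message.

```vala
// Hypothetical operation type; not Geary's ConversationOperation.
class SketchOp {
    public string name;
    public bool allow_duplicates;

    public SketchOp(string name, bool allow_duplicates) {
        this.name = name;
        this.allow_duplicates = allow_duplicates;
    }
}

// Hypothetical queue that consults each operation before enqueueing it.
class SketchQueue {
    private GenericArray<SketchOp> pending = new GenericArray<SketchOp>();

    public int length { get { return this.pending.length; } }

    // Returns false when the op refuses duplicates and a match is pending.
    public bool add(SketchOp op) {
        if (!op.allow_duplicates) {
            for (int i = 0; i < this.pending.length; i++) {
                if (this.pending[i].name == op.name) {
                    return false;
                }
            }
        }
        this.pending.add(op);
        return true;
    }
}

void main() {
    var queue = new SketchQueue();
    bool first = queue.add(new SketchOp("fill", false));
    bool second = queue.add(new SketchOp("fill", false));
    bool third = queue.add(new SketchOp("append", true));
    bool fourth = queue.add(new SketchOp("append", true));
    assert(first && !second && third && fourth);
    assert(queue.length == 3);
}
```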
    
    * src/engine/app/conversation-monitor/app-fill-window-operation.vala
      (FillWindowOperation): Enforce a maximum window size as well as minimum
      to keep loading large windows semi-responsive. Remove code to handle
      inserts now that they are handled by their own op.
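
Bounding the window from both sides amounts to clamping the size of each fill pass. The sketch below assumes hypothetical bounds of 5 and 20 messages; the constants actually used by the patch are not visible in this excerpt.

```vala
// Hypothetical bounds; the patch's real constants are not shown here.
const int SKETCH_MIN_FILL = 5;
const int SKETCH_MAX_FILL = 20;

// Clamp the number of messages requested by a single fill pass.
int fill_count(int missing) {
    return missing.clamp(SKETCH_MIN_FILL, SKETCH_MAX_FILL);
}

void main() {
    assert(fill_count(1) == 5);     // never fewer than the minimum
    assert(fill_count(12) == 12);   // in-range requests pass through
    assert(fill_count(500) == 20);  // large windows load in bounded steps
}
```

With an upper bound, a very large window is filled over several passes, so other queued operations get a turn in between.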
    
    * src/engine/app/conversation-monitor/app-insert-operation.vala
      (InsertOperation): New operation to manage inserts, handle them by
      simply adding them to the conversation if they are newer than the
      oldest message, rather than relisting all loaded messages.
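
The insert rule can be sketched as a simple filter. Here messages are reduced to int64 timestamps, which is an assumption for illustration only; how Geary actually orders messages is not shown in this message, and select_in_window is a hypothetical helper.

```vala
// Keep only inserted messages that fall inside the loaded window, i.e.
// those newer than the oldest message that is already loaded.
int64[] select_in_window(int64[] inserted, int64 oldest_loaded) {
    int64[] kept = {};
    foreach (int64 date in inserted) {
        if (date > oldest_loaded) {
            kept += date;
        }
    }
    return kept;
}

void main() {
    int64[] inserted = { 100, 250, 300 };
    int64[] kept = select_in_window(inserted, 200);
    // Only the two messages newer than the oldest loaded one are kept.
    assert(kept.length == 2);
    assert(kept[0] == 250);
    assert(kept[1] == 300);
}
```

Messages older than the window are left alone in this sketch; they would be picked up if the window later grows to cover them.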

 po/POTFILES.in                                     |    2 +-
 src/CMakeLists.txt                                 |    2 +-
 src/client/application/geary-controller.vala       |   54 +-
 .../conversation-list/conversation-list-store.vala |    8 +-
 .../conversation-list/conversation-list-view.vala  |   22 +-
 src/engine/app/app-conversation-monitor.vala       | 1006 +++++++++-----------
 .../conversation-monitor/app-append-operation.vala |   21 +-
 .../app-conversation-operation-queue.vala          |   73 +-
 .../app-conversation-operation.vala                |   31 +-
 .../conversation-monitor/app-conversation-set.vala |    3 +-
 .../app-external-append-operation.vala             |   27 +-
 .../app-fill-window-operation.vala                 |   58 +-
 .../conversation-monitor/app-insert-operation.vala |   46 +
 .../app-local-load-operation.vala                  |   15 -
 .../conversation-monitor/app-remove-operation.vala |   37 +-
 .../conversation-monitor/app-reseed-operation.vala |   44 +-
 src/engine/meson.build                             |    2 +-
 17 files changed, 776 insertions(+), 675 deletions(-)
---
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 4109e7a..feeb76e 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -147,7 +147,7 @@ src/engine/app/conversation-monitor/app-conversation-operation.vala
 src/engine/app/conversation-monitor/app-conversation-set.vala
 src/engine/app/conversation-monitor/app-external-append-operation.vala
 src/engine/app/conversation-monitor/app-fill-window-operation.vala
-src/engine/app/conversation-monitor/app-local-load-operation.vala
+src/engine/app/conversation-monitor/app-insert-operation.vala
 src/engine/app/conversation-monitor/app-local-search-operation.vala
 src/engine/app/conversation-monitor/app-remove-operation.vala
 src/engine/app/conversation-monitor/app-reseed-operation.vala
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 7206924..91c3d7d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -57,7 +57,7 @@ engine/app/conversation-monitor/app-conversation-operation.vala
 engine/app/conversation-monitor/app-conversation-set.vala
 engine/app/conversation-monitor/app-external-append-operation.vala
 engine/app/conversation-monitor/app-fill-window-operation.vala
-engine/app/conversation-monitor/app-local-load-operation.vala
+engine/app/conversation-monitor/app-insert-operation.vala
 engine/app/conversation-monitor/app-local-search-operation.vala
 engine/app/conversation-monitor/app-remove-operation.vala
 engine/app/conversation-monitor/app-reseed-operation.vala
diff --git a/src/client/application/geary-controller.vala b/src/client/application/geary-controller.vala
index 3fe7fdc..db89246 100644
--- a/src/client/application/geary-controller.vala
+++ b/src/client/application/geary-controller.vala
@@ -1366,16 +1366,17 @@ public class GearyController : Geary.BaseObject {
             current_folder,
             Geary.Folder.OpenFlags.NO_DELAY,
             ConversationListStore.REQUIRED_FIELDS,
-            MIN_CONVERSATION_COUNT);
+            MIN_CONVERSATION_COUNT
+        );
 
         if (inboxes.values.contains(current_folder)) {
             // Inbox selected, clear new messages if visible
             clear_new_messages("do_select_folder (inbox)", null);
         }
-        
+
+        current_conversations.scan_completed.connect(on_scan_completed);
         current_conversations.scan_error.connect(on_scan_error);
-        current_conversations.seed_completed.connect(on_seed_completed);
-        current_conversations.seed_completed.connect(on_conversation_count_changed);
+
         current_conversations.scan_completed.connect(on_conversation_count_changed);
         current_conversations.conversations_added.connect(on_conversation_count_changed);
         current_conversations.conversations_removed.connect(on_conversation_count_changed);
@@ -1388,23 +1389,11 @@ public class GearyController : Geary.BaseObject {
         debug("Switched to %s", folder.to_string());
     }
 
-    private void on_scan_error(Error err) {
-        debug("Scan error: %s", err.message);
-    }
-    
-    private void on_seed_completed() {
-        // Done scanning.  Check if we have enough messages to fill the conversation list; if not,
-        // trigger a load_more();
-        if (!main_window.conversation_list_has_scrollbar()) {
-            debug("Not enough messages, loading more for folder %s", current_folder.to_string());
-            on_load_more();
-        }
-    }
-
     private void on_conversation_count_changed() {
         if (this.current_conversations != null) {
+            ConversationListView list = this.main_window.conversation_list_view;
             ConversationViewer viewer = this.main_window.conversation_viewer;
-            int count = this.current_conversations.get_conversation_count();
+            int count = this.current_conversations.size;
             if (count == 0) {
                 // Let the user know if there's no available conversations
                 if (this.current_folder is Geary.SearchFolder) {
@@ -1415,9 +1404,10 @@ public class GearyController : Geary.BaseObject {
                 enable_message_buttons(false);
             } else {
                 // When not doing autoselect, we never get
-                // conversations_selected firing from the convo list,
-                // so we need to stop the loading spinner here
-                if (!this.application.config.autoselect) {
+                // conversations_selected firing from the convo list, so
+                // we need to stop the loading spinner here
+                if (!this.application.config.autoselect &&
+                    list.get_selection().count_selected_rows() == 0) {
                     viewer.show_none_selected();
                     enable_message_buttons(false);
                 }
@@ -1433,7 +1423,7 @@ public class GearyController : Geary.BaseObject {
             return;
         
         main_window.folder_list.select_folder(folder);
-        Geary.App.Conversation? conversation = current_conversations.get_conversation_for_email(email.id);
+        Geary.App.Conversation? conversation = current_conversations.get_by_email_identifier(email.id);
         if (conversation != null)
             main_window.conversation_list_view.select_conversation(conversation);
     }
@@ -2851,6 +2841,26 @@ public class GearyController : Geary.BaseObject {
         }
     }
 
+    private void on_scan_completed() {
+        // Done scanning.  Check if we have enough messages to fill
+        // the conversation list; if not, trigger a load_more();
+        if (!main_window.conversation_list_has_scrollbar()) {
+            debug("Not enough messages, loading more for folder %s", current_folder.to_string());
+            on_load_more();
+        }
+    }
+
+    private void on_scan_error(Geary.App.ConversationMonitor monitor, Error err) {
+        // XXX determine the problem better here
+        report_problem(
+            new Geary.AccountProblemReport(
+                Geary.ProblemType.GENERIC_ERROR,
+                monitor.base_folder.account.information,
+                err
+            )
+        );
+    }
+
     private void on_save_attachments(Gee.Collection<Geary.Attachment> attachments) {
         if (attachments.size == 1) {
             this.save_attachment_to_file.begin(attachments.to_array()[0], null);
diff --git a/src/client/conversation-list/conversation-list-store.vala b/src/client/conversation-list/conversation-list-store.vala
index 695e1ed..99821ea 100644
--- a/src/client/conversation-list/conversation-list-store.vala
+++ b/src/client/conversation-list/conversation-list-store.vala
@@ -119,7 +119,7 @@ public class ConversationListStore : Gtk.ListStore {
         conversations.email_flags_changed.connect(on_email_flags_changed);
 
         // add all existing conversations
-        on_conversations_added(conversations.get_conversations());
+        on_conversations_added(conversations.read_only_view);
     }
 
     public void destroy() {
@@ -149,7 +149,7 @@ public class ConversationListStore : Gtk.ListStore {
         // same set
         int token;
         try {
-            token = yield refresh_mutex.claim_async();
+            token = yield refresh_mutex.claim_async(this.cancellable);
         } catch (Error err) {
             debug("Unable to claim refresh mutex: %s", err.message);
             
@@ -184,7 +184,7 @@ public class ConversationListStore : Gtk.ListStore {
         
         debug("Displaying %d previews for %s...", emails.size, conversation_monitor.base_folder.to_string());
         foreach (Geary.Email email in emails) {
-            Geary.App.Conversation? conversation = conversation_monitor.get_conversation_for_email(email.id);
+            Geary.App.Conversation? conversation = conversation_monitor.get_by_email_identifier(email.id);
             if (conversation != null)
                 set_preview_for_conversation(conversation, email);
             else
@@ -219,7 +219,7 @@ public class ConversationListStore : Gtk.ListStore {
         // the user experience
         Gee.TreeSet<Geary.App.Conversation> sorted_conversations = new Gee.TreeSet<Geary.App.Conversation>(
             compare_conversation_descending);
-        sorted_conversations.add_all(this.conversations.get_conversations());
+        sorted_conversations.add_all(this.conversations.read_only_view);
         foreach (Geary.App.Conversation conversation in sorted_conversations) {
             // find oldest unread message for the preview
             Geary.Email? need_preview = null;
diff --git a/src/client/conversation-list/conversation-list-view.vala b/src/client/conversation-list/conversation-list-view.vala
index 3c7d6ec..fb86bb0 100644
--- a/src/client/conversation-list/conversation-list-view.vala
+++ b/src/client/conversation-list/conversation-list-view.vala
@@ -138,30 +138,25 @@ public class ConversationListView : Gtk.TreeView {
         if (conversation_monitor != null) {
             conversation_monitor.scan_started.disconnect(on_scan_started);
             conversation_monitor.scan_completed.disconnect(on_scan_completed);
-            conversation_monitor.seed_completed.disconnect(on_seed_completed);
         }
-        
+
         conversation_monitor = GearyApplication.instance.controller.current_conversations;
-        
+
         if (conversation_monitor != null) {
             conversation_monitor.scan_started.connect(on_scan_started);
             conversation_monitor.scan_completed.connect(on_scan_completed);
-            conversation_monitor.seed_completed.connect(on_seed_completed);
         }
     }
-    
+
     private void on_scan_started() {
         enable_load_more = false;
     }
-    
+
     private void on_scan_completed() {
         enable_load_more = true;
 
         // Select the first conversation, if autoselect is enabled,
-        // nothing has been selected yet and we're not composing. Do
-        // this here instead of in on_seed_completed since we want to
-        // to select the first row on folder change as soon as
-        // possible.
+        // nothing has been selected yet and we're not composing.
         if (GearyApplication.instance.config.autoselect &&
             get_selection().count_selected_rows() == 0 &&
             !GearyApplication.instance.controller.any_inline_composers()) {
@@ -169,13 +164,6 @@ public class ConversationListView : Gtk.TreeView {
         }
     }
 
-    private void on_seed_completed() {
-        if (!GearyApplication.instance.config.autoselect) {
-            // Notify that no conversations will be selected
-            conversations_selected(this.selected.read_only_view);
-        }
-    }
-
     private void on_conversations_added(bool start) {
         Gtk.Adjustment? adjustment = get_adjustment();
         if (start) {
diff --git a/src/engine/app/app-conversation-monitor.vala b/src/engine/app/app-conversation-monitor.vala
index 3f111f1..40817f5 100644
--- a/src/engine/app/app-conversation-monitor.vala
+++ b/src/engine/app/app-conversation-monitor.vala
@@ -1,19 +1,54 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
+ * Copyright 2018 Michael Gratton <mike vee net>
  *
  * This software is licensed under the GNU Lesser General Public License
  * (version 2.1 or later).  See the COPYING file in this distribution.
  */
 
+/**
+ * Loads conversation in a folder and monitors changes to them.
+ *
+ * The standard IMAP model does not provide a means of easily
+ * aggregating messages in a single conversation across mailboxes,
+ * hence this class provides applications with a means of loading
+ * complete conversations, one for each email message found in a
+ * specified base folder. This is an expensive operation since it may
+ * require opening several other folders to find all messages in the
+ * conversation, and so the monitor is lazy and will only load enough
+ * conversations to fill a minimal window size. Additional
+ * conversations can be loaded afterwards as needed.
+ *
+ * When monitoring starts via a call to {@link
+ * start_monitoring_async}, the folder will perform an initial
+ * //scan// of messages in the base folder and load conversations
+ * based on that. Increasing {@link min_window_count} will cause
+ * additional scan operations to be executed as needed to fill the new
+ * window size.
+ *
+ * If the folder is backed by a remote mailbox, scans will be
+ * local-only if the remote is not open so as to not block. However
+ * this means any messages (and their conversations) that are not
+ * sufficiently complete to satisfy both the monitor's and the owner's
+ * email field requirements will not be found. If or when the folder
+ * does open a remote connection, the folder will be re-scanned to
+ * ensure any missing messages are picked up.
+ *
+ * The monitor will also keep track of messages being appended or
+ * removed account-wide, so that known conversations can be updated as
+ * needed.
+ */
 public class Geary.App.ConversationMonitor : BaseObject {
+
     /**
-     * These are the fields Conversations require to thread emails together.  These fields will
-     * be retrieved regardless of the Field parameter passed to the constructor.
+     * The fields Conversations require to thread emails together.
+     *
+     * These fields will be retrieved regardless of the Field
+     * parameter passed to the constructor.
      */
     public const Geary.Email.Field REQUIRED_FIELDS = Geary.Email.Field.REFERENCES |
         Geary.Email.Field.FLAGS | Geary.Email.Field.DATE;
 
-    // # of messages to load at a time as we attempt to fill the min window.
-    private const int WINDOW_FILL_MESSAGE_COUNT = 5;
 
     private class ProcessJobContext : BaseObject {
         public Gee.HashMap<Geary.EmailIdentifier, Geary.Email> emails
@@ -27,6 +62,23 @@ public class Geary.App.ConversationMonitor : BaseObject {
     }
 
 
+    /**
+     * A read-only view of loaded conversations.
+     *
+     * Note that since background tasks may asynchronously update the
+     * set at any time, any asynchronous tasks carried out while
+     * holding an iterator obtained from this view may allow the
+     * iterator to become invalid.
+     */
+    public Gee.Set<Conversation> read_only_view {
+        owned get { return this.conversations.read_only_view; }
+    }
+
+    /**
+     * Number of conversations currently loaded by the monitor.
+     */
+    public int size { get { return this.conversations.size; } }
+
     /** Folder from which the conversation is originating. */
     public Folder base_folder { get; private set; }
 
@@ -34,53 +86,40 @@ public class Geary.App.ConversationMonitor : BaseObject {
     public bool is_monitoring { get; private set; default = false; }
 
     /** Minimum number of emails large conversations should contain. */
-    public int min_window_count { get { return _min_window_count; }
+    public int min_window_count {
+        get { return _min_window_count; }
         set {
             _min_window_count = value;
-            operation_queue.add(new FillWindowOperation(this, false));
+            check_window_count();
         }
     }
+    private int _min_window_count = 0;
+
+    /** Indicates progress while conversations are being loaded. */
+    public ProgressMonitor progress_monitor {
+        get; private set; default = new SimpleProgressMonitor(ProgressType.ACTIVITY);
+    }
 
-    /** Indicates process loading conversations. */
-    public Geary.ProgressMonitor progress_monitor {
-        get { return operation_queue.progress_monitor; }
+    /** The set of all conversations loaded by the monitor. */
+    internal ConversationSet conversations {
+        get; private set; default = new ConversationSet();
     }
 
-    private ConversationSet conversations = new ConversationSet();
     private Geary.Email.Field required_fields;
     private Geary.Folder.OpenFlags open_flags;
-    private Cancellable? cancellable_monitor = null;
-    private bool reseed_notified = false;
-    private int _min_window_count = 0;
-    private ConversationOperationQueue operation_queue = new ConversationOperationQueue();
+    private ConversationOperationQueue queue = null;
+    private Cancellable? operation_cancellable = null;
 
 
     /**
-     * "monitoring-started" is fired when the Conversations folder has been opened for monitoring.
-     */
-    public virtual signal void monitoring_started() {
-        Logging.debug(Logging.Flag.CONVERSATIONS,
-                      "[%s] ConversationMonitor::monitoring_started",
-                      this.base_folder.to_string());
-    }
-
-    /**
-     * "monitoring-stopped" is fired when the Geary.Folder object has closed (either due to error
-     * or user) and the Conversations object is therefore unable to continue monitoring.
-     */
-    public virtual signal void monitoring_stopped() {
-        Logging.debug(Logging.Flag.CONVERSATIONS,
-                      "[%s] ConversationMonitor::monitoring_stopped",
-                      this.base_folder.to_string());
-    }
-
-    /**
-     * "scan-started" is fired whenever beginning to load messages into the Conversations object.
+     * Fired when a message load has started.
      *
-     * Note that more than one load can be initiated, due to Conversations being completely
-     * asynchronous.  "scan-started", "scan-error", and "scan-completed" will be fired (as
-     * appropriate) for each individual load request; that is, there is no internal counter to ensure
-     * only a single "scan-completed" is fired to indicate multiple loads have finished.
+     * Note that more than one load can be initiated, due to
+     * Conversations being completely asynchronous. Both this, and
+     * {@link scan_completed} will be fired for each individual load
+     * request; that is, there is no internal counter to ensure only a
+     * single completed signal is fired to indicate multiple loads
+     * have finished.
      */
     public virtual signal void scan_started() {
         Logging.debug(Logging.Flag.CONVERSATIONS,
@@ -89,17 +128,9 @@ public class Geary.App.ConversationMonitor : BaseObject {
     }
 
     /**
-     * "scan-error" is fired when an Error is encounted while loading messages.  It will be followed
-     * by a "scan-completed" signal.
-     */
-    public virtual signal void scan_error(Error err) {
-        Logging.debug(Logging.Flag.CONVERSATIONS,
-                      "[%s] ConversationMonitor::scan_error %s",
-                      this.base_folder.to_string(), err.message);
-    }
-
-    /**
-     * "scan-completed" is fired when the scan of the email has finished.
+     * Fired when all extant message loads have completed.
+     *
+     * @see scan_started
      */
     public virtual signal void scan_completed() {
         Logging.debug(Logging.Flag.CONVERSATIONS,
@@ -108,17 +139,19 @@ public class Geary.App.ConversationMonitor : BaseObject {
     }
 
     /**
-     * "seed-completed" is fired when the folder has opened and email has been populated.
+     * Fired when an error was encountered while loading messages.
      */
-    public virtual signal void seed_completed() {
+    public virtual signal void scan_error(Error err) {
         Logging.debug(Logging.Flag.CONVERSATIONS,
-                      "[%s] ConversationMonitor::seed_completed",
-                      this.base_folder.to_string());
+                      "[%s] ConversationMonitor::scan_error %s",
+                      this.base_folder.to_string(), err.message);
     }
 
     /**
-     * "conversations-added" indicates that one or more new Conversations have been detected while
-     * processing email, either due to a user-initiated load request or due to monitoring.
+     * Fired when one or more new conversations have been detected.
+     *
+     * This may be due to either a user-initiated load request or due
+     * to background monitoring.
      */
     public virtual signal void conversations_added(Gee.Collection<Conversation> conversations) {
         Logging.debug(Logging.Flag.CONVERSATIONS,
@@ -127,13 +160,15 @@ public class Geary.App.ConversationMonitor : BaseObject {
     }
 
     /**
-     * "conversations-removed" is fired when all the email in a Conversation has been removed.
-     * It's possible this will be called without a signal alerting that it's emails have been
-     * removed, i.e. a "conversations-removed" signal may fire with no accompanying
+     * Fired when all email in a conversation has been removed.
+     *
+     * It's possible this will be called without a signal alerting
+     * that its emails have been removed, i.e. a
+     * "conversations-removed" signal may fire with no accompanying
      * "conversation-trimmed".
      *
-     * Note that this can only occur when monitoring is enabled.  There is (currently) no
-     * user call to manually remove email from Conversations.
+     * This may be due to either a user-initiated load request or due
+     * to background monitoring.
      */
     public virtual signal void conversations_removed(Gee.Collection<Conversation> conversations) {
         Logging.debug(Logging.Flag.CONVERSATIONS,
@@ -142,9 +177,10 @@ public class Geary.App.ConversationMonitor : BaseObject {
     }
 
     /**
-     * "conversation-appended" is fired when one or more Email objects have been added to the
-     * specified Conversation.  This can happen due to a user-initiated load or while monitoring
-     * the Folder.
+     * Fired when one or more email have been added to a conversation.
+     *
+     * This may be due to either a user-initiated load request or due
+     * to background monitoring.
      */
     public virtual signal void conversation_appended(Conversation conversation,
                                                      Gee.Collection<Email> email) {
@@ -154,14 +190,17 @@ public class Geary.App.ConversationMonitor : BaseObject {
     }
 
     /**
-     * "conversation-trimmed" is fired when one or more Emails have been removed from the Folder,
-     * and therefore from the specified Conversation.  If the trimmed Email is the last usable
-     * Email in the Conversation, this signal will be followed by "conversation-removed".  However,
-     * it's possible for "conversation-removed" to fire without "conversation-trimmed" preceding
-     * it, in the case of all emails being removed from a Conversation at once.
+     * Fired when one or more email have been removed from a conversation.
      *
-     * There is (currently) no user-specified call to manually remove Email from Conversations.
-     * This is only called when monitoring is enabled.
+     * If the trimmed email is the last usable email in the
+     * Conversation, this signal will be followed by
+     * "conversation-removed".  However, it's possible for
+     * "conversation-removed" to fire without "conversation-trimmed"
+     * preceding it, in the case of all emails being removed from a
+     * conversation at once.
+     *
+     * This may be due to either a user-initiated load request or due
+     * to background monitoring.
      */
     public virtual signal void conversation_trimmed(Conversation conversation,
                                                     Gee.Collection<Email> email) {
@@ -171,12 +210,14 @@ public class Geary.App.ConversationMonitor : BaseObject {
     }
 
     /**
-     * "email-flags-changed" is fired when the flags of an email in a conversation have changed,
-     * as reported by the monitored folder.  The local copy of the Email is updated and this
+     * Fired when a conversation's email's flags have changed.
+     *
+     * The local copy of the email is first updated and then this
      * signal is fired.
      *
-     * Note that if the flags of an email not captured by the Conversations object change, no signal
-     * is fired.  To know of all changes to all flags, subscribe to the Geary.Folder's
+     * Note that if the flags of an email not captured by the
+     * Conversations object change, no signal is fired.  To know of
+     * all changes to all flags, subscribe to the base folder's
      * "email-flags-changed" signal.
      */
     public virtual signal void email_flags_changed(Conversation conversation,
@@ -189,7 +230,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
     /**
      * Creates a conversation monitor for the given folder.
      *
-     * @param base_folder the Folder to monitor
+     * @param base_folder a Folder to monitor for conversations
      * @param open_flags See {@link Geary.Folder}
      * @param required_fields See {@link Geary.Folder}
      * @param min_window_count Minimum number of conversations that will be loaded
@@ -204,303 +245,318 @@ public class Geary.App.ConversationMonitor : BaseObject {
         this._min_window_count = min_window_count;
     }
 
-    public int get_conversation_count() {
-        return conversations.size;
-    }
-
-    public Gee.Collection<Conversation> get_conversations() {
-        return conversations.conversations;
-    }
-
-    public Geary.App.Conversation? get_conversation_for_email(Geary.EmailIdentifier email_id) {
-        return conversations.get_by_email_identifier(email_id);
-    }
-
+    /**
+     * Opens the base folder, scans it, and starts monitoring conversations.
+     *
+     * This method will open the base folder, start a scan to load
+     * conversations from it, and start monitoring the folder and
+     * account for messages being added or removed.
+     *
+     * The //cancellable// parameter will be used when opening the
+     * folder, but not subsequently when scanning for new messages. To
+     * cancel any such operations, simply close the monitor via {@link
+     * stop_monitoring_async}.
+     */
     public async bool start_monitoring_async(Cancellable? cancellable = null)
         throws Error {
-        if (is_monitoring)
+        if (this.is_monitoring)
             return false;
 
-        // set before yield to guard against reentrancy
-        is_monitoring = true;
-
-        cancellable_monitor = cancellable;
-
-        // Double check that the last run of the queue got stopped and that
-        // it's empty.
-        if (operation_queue.is_processing)
-            yield operation_queue.stop_processing_async(cancellable_monitor);
-        operation_queue.clear();
-
-        bool reseed_now = (
-            this.base_folder.get_open_state() != Geary.Folder.OpenState.CLOSED
-        );
-
-        // Add the necessary initial operations ahead of anything the folder
-        // might add as it opens.
-        operation_queue.add(new LocalLoadOperation(this));
-        // if already opened, go ahead and do a full load now from remote and local; otherwise,
-        // the reseed has to wait until the folder's remote is opened (handled in on_folder_opened)
-        if (reseed_now)
-            operation_queue.add(new ReseedOperation(this, "already opened"));
-        operation_queue.add(new FillWindowOperation(this, false));
+        // Set before yield to guard against reentrancy
+        this.is_monitoring = true;
+        this.operation_cancellable = new Cancellable();
 
         this.base_folder.email_appended.connect(on_folder_email_appended);
         this.base_folder.email_inserted.connect(on_folder_email_inserted);
+        this.base_folder.email_locally_complete.connect(on_folder_email_complete);
         this.base_folder.email_removed.connect(on_folder_email_removed);
         this.base_folder.opened.connect(on_folder_opened);
         this.base_folder.account.email_appended.connect(on_account_email_appended);
         this.base_folder.account.email_inserted.connect(on_account_email_inserted);
+        this.base_folder.account.email_locally_complete.connect(on_account_email_complete);
         this.base_folder.account.email_removed.connect(on_account_email_removed);
         this.base_folder.account.email_flags_changed.connect(on_account_email_flags_changed);
 
+        this.progress_monitor.start.connect(() => { debug("Monitor started"); });
+        this.progress_monitor.update.connect(() => { debug("Monitor progress"); });
+        this.progress_monitor.finish.connect(() => { debug("Monitor stopped"); });
+
+        this.queue = new ConversationOperationQueue(this.progress_monitor);
+        this.queue.operation_error.connect(on_operation_error);
+        this.queue.add(new FillWindowOperation(this));
+
         try {
             yield this.base_folder.open_async(open_flags, cancellable);
         } catch (Error err) {
-            is_monitoring = false;
-
-            this.base_folder.email_appended.disconnect(on_folder_email_appended);
-            this.base_folder.email_inserted.disconnect(on_folder_email_inserted);
-            this.base_folder.email_removed.disconnect(on_folder_email_removed);
-            this.base_folder.opened.disconnect(on_folder_opened);
-            this.base_folder.account.email_appended.disconnect(on_account_email_appended);
-            this.base_folder.account.email_inserted.disconnect(on_account_email_inserted);
-            this.base_folder.account.email_removed.disconnect(on_account_email_removed);
-            this.base_folder.account.email_flags_changed.disconnect(on_account_email_flags_changed);
-
+            try {
+                yield stop_monitoring_internal(false, null);
+            } catch (Error stop_error) {
+                debug("Error cleaning up after folder open error: %s", stop_error.message);
+            }
             throw err;
         }
 
-        notify_monitoring_started();
-        reseed_notified = false;
-
-        // Process operations in the background.
-        operation_queue.run_process_async.begin();
+        // Now the folder is open, start the queue running
+        this.queue.run_process_async.begin();
 
         return true;
     }
 
     /**
-     * Halt monitoring of the Folder and, if specified, close it.  Note that the Cancellable
-     * supplied to start_monitoring_async() is used during monitoring but *not* for this method.
-     * If null is supplied as the Cancellable, no cancellable is used; pass the original Cancellable
-     * here to use that.
+     * Stops monitoring for new messages and closes the base folder.
+     *
+     * Returns a result code that is semantically identical to the
+     * result of {@link Geary.Folder.close_async}.
      *
-     * Returns a result code that is semantically identical to the result of
-     * {@link Geary.Folder.close_async}.
+     * The //cancellable// parameter will be used when waiting for
+     * internal monitor operations to complete, but will not prevent
+     * attempts to close the base folder.
      */
     public async bool stop_monitoring_async(Cancellable? cancellable) throws Error {
         if (!is_monitoring)
             return false;
 
-        yield operation_queue.stop_processing_async(cancellable);
-
-        // set now to prevent reentrancy during yield or signal
-        is_monitoring = false;
+        return yield stop_monitoring_internal(true, cancellable);
+    }
 
-        this.base_folder.email_appended.disconnect(on_folder_email_appended);
-        this.base_folder.email_inserted.disconnect(on_folder_email_inserted);
-        this.base_folder.email_removed.disconnect(on_folder_email_removed);
-        this.base_folder.opened.disconnect(on_folder_opened);
-        this.base_folder.account.email_appended.disconnect(on_account_email_appended);
-        this.base_folder.account.email_inserted.disconnect(on_account_email_inserted);
-        this.base_folder.account.email_removed.disconnect(on_account_email_removed);
-        this.base_folder.account.email_flags_changed.disconnect(on_account_email_flags_changed);
+    /**
+     * Returns the conversation containing the given email, if any.
+     */
+    public Conversation? get_by_email_identifier(Geary.EmailIdentifier email_id) {
+        return this.conversations.get_by_email_identifier(email_id);
+    }
 
-        bool closing = false;
-        Error? close_err = null;
-        try {
-            closing = yield this.base_folder.close_async(cancellable);
-        } catch (Error err) {
-            // throw, but only after cleaning up (which is to say, if close_async() fails,
-            // then the Folder is still treated as closed, which is the best that can be
-            // expected; it definitely shouldn't still be considered open).
-            debug("Unable to close monitored folder %s: %s",
-                  this.base_folder.to_string(), err.message);
-            close_err = err;
+    /** Ensures enough conversations are present, otherwise loads more. */
+    internal void check_window_count() {
+        if (this.is_monitoring && this.conversations.size < this.min_window_count) {
+            this.queue.add(new FillWindowOperation(this));
         }
-
-        notify_monitoring_stopped();
-
-        if (close_err != null)
-            throw close_err;
-
-        return closing;
     }
 
-    internal async void local_load_async() {
-        debug("ConversationMonitor seeding with local email for %s",
-              this.base_folder.to_string());
-        try {
-            yield load_by_id_async(null, min_window_count, Folder.ListFlags.LOCAL_ONLY, cancellable_monitor);
-        } catch (Error e) {
-            debug("Error loading local messages: %s", e.message);
+    /**
+     * Returns the list of folders that disqualify emails from conversations.
+     */
+    internal Gee.Collection<Geary.FolderPath> get_search_folder_blacklist() {
+        Geary.SpecialFolderType[] blacklisted_folder_types = {
+            Geary.SpecialFolderType.SPAM,
+            Geary.SpecialFolderType.TRASH,
+            Geary.SpecialFolderType.DRAFTS,
+        };
+
+        Gee.ArrayList<Geary.FolderPath?> blacklist = new Gee.ArrayList<Geary.FolderPath?>();
+        foreach (Geary.SpecialFolderType type in blacklisted_folder_types) {
+            try {
+                Geary.Folder? blacklist_folder = this.base_folder.account.get_special_folder(type);
+                if (blacklist_folder != null)
+                    blacklist.add(blacklist_folder.path);
+            } catch (Error e) {
+                debug("Error finding special folder %s on account %s: %s",
+                    type.to_string(), this.base_folder.account.to_string(), e.message);
+            }
         }
-        debug("ConversationMonitor seeded for %s",
-              this.base_folder.to_string());
-    }
 
-    internal async void append_emails_async(Gee.Collection<Geary.EmailIdentifier> appended_ids) {
-        debug("%d message(s) appended to %s, fetching to add to conversations...",
-              appended_ids.size, this.base_folder.to_string());
+        // Add "no folders" so we omit results that have been deleted permanently from the server.
+        blacklist.add(null);
 
-        yield load_by_sparse_id(appended_ids, Geary.Folder.ListFlags.NONE, null);
+        return blacklist;
     }
 
-    internal async void remove_emails_async(Folder source_folder,
-                                            Gee.Collection<EmailIdentifier> removed_ids) {
-        debug("%d messages(s) removed from %s, trimming/removing conversations...",
-            removed_ids.size, source_folder.to_string()
-        );
-
-        Gee.Collection<Conversation> removed;
-        Gee.MultiMap<Conversation, Email> trimmed;
-        conversations.remove_all_emails_by_identifier(
-            source_folder.path,
-            removed_ids,
-            out removed,
-            out trimmed
-        );
+    /**
+     * Returns the list of flags that disqualify emails from conversations.
+     */
+    internal Geary.EmailFlags get_search_flag_blacklist() {
+        Geary.EmailFlags flags = new Geary.EmailFlags();
+        flags.add(Geary.EmailFlags.DRAFT);
+        return flags;
+    }
 
-        // Check for conversations that have been evaporated as a
-        // result, update removed and trimmed collections to reflect
-        // any that evaporated
-        try {
-            Gee.Collection<Conversation> evaporated = yield check_conversations_in_base_folder(
-                trimmed.get_keys(), null
+    /** Returns the lowest id of any seen email in the base folder. */
+    internal async Geary.EmailIdentifier? get_lowest_email_id_async()
+        throws Error {
+        Geary.EmailIdentifier? earliest_id = null;
+        if (!this.conversations.is_empty) {
+            // XXX this is the only caller of
+            // Folder.find_boundaries_async in the whole code base,
+            // and the amount of DB work it's doing in the case of
+            // MinimalFolder's implementation is
+            // staggering. ConversationSet should be simply
+            // maintaining an ordered list of message ids that exist
+            // in the base folder and use that instead, which will be
+            // faster and remove a whole lot of pointless code
+            // overhead in Folder implementations.
+            yield this.base_folder.find_boundaries_async(
+                conversations.get_email_identifiers(),
+                out earliest_id,
+                null,
+                this.operation_cancellable
             );
-            removed.add_all(evaporated);
-            foreach (Conversation target in evaporated) {
-                trimmed.remove_all(target);
-            }
-        } catch (Error err) {
-            debug("Error checking conversation for messages in %s: %s",
-                  this.base_folder.path.to_string(), err.message);
         }
+        return earliest_id;
+    }
 
-        // Fire signals, clean up
-
-        foreach (Conversation conversation in trimmed.get_keys())
-            notify_conversation_trimmed(conversation, trimmed.get(conversation));
-
-        if (removed.size > 0)
-            notify_conversations_removed(removed);
+    /** Loads messages from the base folder by identifier range. */
+    internal async int load_by_id_async(EmailIdentifier? initial_id,
+                                        int count,
+                                        Folder.ListFlags flags = Folder.ListFlags.NONE)
+        throws Error {
+        notify_scan_started();
 
-        if (source_folder == this.base_folder) {
-            // For any still-existing conversations that we've trimmed messages
-            // from, do a search for any messages that should still be there due to
-            // full conversations.  This way, some removed messages are instead
-            // "demoted" to out-of-folder emails.  This is kind of inefficient, but
-            // it doesn't seem like there's a way around it.
-            Gee.HashSet<RFC822.MessageID> search_message_ids = new Gee.HashSet<RFC822.MessageID>();
-            foreach (Conversation conversation in trimmed.get_keys()) {
-                search_message_ids.add_all(conversation.get_message_ids());
-            }
-            yield expand_conversations_async(search_message_ids, new ProcessJobContext(false));
+        if (this.base_folder.get_open_state() != Folder.OpenState.REMOTE) {
+            flags |= Folder.ListFlags.LOCAL_ONLY;
         }
-    }
 
-    internal async void external_append_emails_async(Folder folder,
-                                                     Gee.Collection<EmailIdentifier> appended_ids) {
-        if (get_search_blacklist().contains(folder.path))
-            return;
+        int load_count = 0;
+        try {
+            Gee.Collection<Geary.Email>? email =
+                yield this.base_folder.list_email_by_id_async(
+                    initial_id, count, required_fields, flags,
+                    this.operation_cancellable
+                );
 
-        if (conversations.is_empty)
-            return;
+            if (email != null) {
+                load_count = email.size;
+            }
 
-        debug("%d out of folder message(s) appended to %s, fetching to add to conversations...", appended_ids.size,
-            folder.to_string());
+            yield process_email_async(email, new ProcessJobContext(true));
+        } catch (Error err) {
+            notify_scan_completed();
+            throw err;
+        }
 
-        yield external_load_by_sparse_id(folder, appended_ids, Geary.Folder.ListFlags.NONE, null);
+        return load_count;
     }
 
-    internal async void reseed_async(string why) {
-        Geary.EmailIdentifier? earliest_id = yield get_lowest_email_id_async(null);
-        try {
-            if (earliest_id != null) {
-                debug("ConversationMonitor (%s) reseeding starting from Email ID %s on opened %s", why,
-                    earliest_id.to_string(), this.base_folder.to_string());
-                yield load_by_id_async(earliest_id, int.MAX,
-                    Geary.Folder.ListFlags.OLDEST_TO_NEWEST | Geary.Folder.ListFlags.INCLUDING_ID,
-                    cancellable_monitor);
-            } else {
-                debug("ConversationMonitor (%s) reseeding latest %d emails on opened %s", why,
-                    min_window_count, this.base_folder.to_string());
-                yield load_by_id_async(null, min_window_count, Geary.Folder.ListFlags.NONE, cancellable_monitor);
-            }
-        } catch (Error e) {
-            debug("Reseed error: %s", e.message);
+    /** Loads messages from the base folder by identifier. */
+    internal async void load_by_sparse_id(Gee.Collection<EmailIdentifier> ids,
+                                          Folder.ListFlags flags = Folder.ListFlags.NONE)
+        throws Error {
+        notify_scan_started();
+
+        if (this.base_folder.get_open_state() != Folder.OpenState.REMOTE) {
+            flags |= Folder.ListFlags.LOCAL_ONLY;
         }
 
-        if (!reseed_notified) {
-            reseed_notified = true;
-            notify_seed_completed();
+        try {
+            yield process_email_async(
+                yield this.base_folder.list_email_by_sparse_id_async(
+                    ids, required_fields, flags, this.operation_cancellable
+                ),
+                new ProcessJobContext(true)
+            );
+        } catch (Error err) {
+            notify_scan_completed();
+            throw err;
         }
     }
 
     /**
-     * Attempts to load enough conversations to fill min_window_count.
+     * Loads messages from outside the monitor's base folder.
+     *
+     * Since this requires opening and closing the other folder, it is
+     * handled separately.
      */
-    internal async void fill_window_async(bool is_insert) {
-        if (!is_monitoring)
-            return;
+    internal async void external_load_by_sparse_id(Folder folder,
+                                                   Gee.Collection<EmailIdentifier> ids,
+                                                   Folder.ListFlags flags)
+        throws Error {
+        bool opened = false;
 
-        if (!is_insert && min_window_count <= conversations.size)
-            return;
+        Gee.List<Geary.Email>? emails = null;
+        try {
+            yield folder.open_async(
+                Geary.Folder.OpenFlags.NONE, this.operation_cancellable
+            );
+            opened = true;
 
-        int initial_message_count = conversations.get_email_count();
+            if (folder.get_open_state() != Folder.OpenState.REMOTE) {
+                flags |= Folder.ListFlags.LOCAL_ONLY;
+            }
 
-        // only do local-load if the Folder isn't completely opened, otherwise this operation
-        // will block other (more important) operations while it waits for the folder to
-        // remote-open
-        Folder.ListFlags flags;
-        switch (this.base_folder.get_open_state()) {
-            case Folder.OpenState.CLOSED:
-            case Folder.OpenState.LOCAL:
-                flags = Folder.ListFlags.LOCAL_ONLY;
-            break;
+            // First just get the bare minimum we need to determine if we even
+            // care about the messages.
+            emails = yield folder.list_email_by_sparse_id_async(
+                ids, Geary.Email.Field.REFERENCES, flags, this.operation_cancellable
+            );
+            if (emails != null) {
+                Gee.HashSet<Geary.EmailIdentifier> relevant_ids =
+                    new Gee.HashSet<Geary.EmailIdentifier>();
+                foreach (Geary.Email email in emails) {
+                    Gee.Set<RFC822.MessageID>? ancestors = email.get_ancestors();
+                    if (ancestors != null &&
+                        Geary.traverse<RFC822.MessageID>(ancestors).any(id => conversations.has_message_id(id)))
+                        relevant_ids.add(email.id);
+                }
 
-            case Folder.OpenState.REMOTE:
-                flags = Folder.ListFlags.NONE;
-            break;
+                // List the relevant messages again with the full set of fields, to
+                // make sure when we load them from the database we have all the
+                // data we need.
+                if (!relevant_ids.is_empty) {
+                    emails = yield folder.list_email_by_sparse_id_async(
+                        relevant_ids, required_fields, flags, this.operation_cancellable
+                    );
+                } else {
+                    emails = null;
+                }
+            }
 
-            default:
-                assert_not_reached();
+            yield folder.close_async(null);
+            opened = false;
+        } catch (Error err) {
+            debug("Error loading external emails: %s", err.message);
+            if (opened) {
+                // Always try to close the opened folder
+                try {
+                    yield folder.close_async(null);
+                } catch (Error close_err) {
+                    debug("Error closing folder %s: %s",
+                          folder.to_string(), close_err.message);
+                }
+            }
+            throw err;
         }
 
-        Geary.EmailIdentifier? low_id = yield get_lowest_email_id_async(null);
-        if (low_id != null && !is_insert) {
-            // Load at least as many messages as remianing conversations.
-            int num_to_load = min_window_count - conversations.size;
-            if (num_to_load < WINDOW_FILL_MESSAGE_COUNT)
-                num_to_load = WINDOW_FILL_MESSAGE_COUNT;
+        if (emails != null && !emails.is_empty) {
+            debug("Fetched %d relevant emails locally", emails.size);
+            yield process_email_async(emails, new ProcessJobContext(false));
+        }
+    }
 
-            try {
-                yield load_by_id_async(low_id, num_to_load, flags, cancellable_monitor);
-            } catch(Error e) {
-                debug("Error filling conversation window: %s", e.message);
-            }
-        } else {
-            // No existing messages or an insert invalidated our existing list,
-            // need to start from scratch.
-            try {
-                yield load_by_id_async(null, min_window_count, flags, cancellable_monitor);
-            } catch(Error e) {
-                debug("Error filling conversation window: %s", e.message);
+    /**
+     * Check conversations to see if they still exist in the base folder.
+     *
+     * Returns the set of conversations that were removed due to
+     * having no emails in the base folder.
+     */
+    internal async Gee.Collection<Conversation>
+        check_conversations_in_base_folder(Gee.Collection<Conversation> conversations)
+        throws Error {
+        Gee.ArrayList<Conversation> evaporated = new Gee.ArrayList<Conversation>();
+        foreach (Conversation conversation in conversations) {
+            int count = yield conversation.get_count_in_folder_async(
+                this.base_folder.account,
+                this.base_folder.path,
+                this.operation_cancellable
+            );
+            if (count == 0) {
+                debug("Evaporating conversation %s because it has no emails in %s",
+                      conversation.to_string(), this.base_folder.to_string());
+                this.conversations.remove_conversation(conversation);
+                evaporated.add(conversation);
             }
         }
 
-        // Run again to make sure we're full unless we ran out of messages.
-        if (conversations.get_email_count() != initial_message_count)
-            operation_queue.add(new FillWindowOperation(this, is_insert));
+        return evaporated;
     }
 
-    protected virtual void notify_monitoring_started() {
-        monitoring_started();
-    }
+    internal void notify_emails_removed(Gee.Collection<Conversation> removed,
+                                        Gee.MultiMap<Conversation, Email> trimmed) {
+        foreach (Conversation conversation in trimmed.get_keys()) {
+            notify_conversation_trimmed(conversation, trimmed.get(conversation));
+        }
 
-    protected virtual void notify_monitoring_stopped() {
-        monitoring_stopped();
+        if (removed.size > 0) {
+            notify_conversations_removed(removed);
+        }
     }
 
     protected virtual void notify_scan_started() {
@@ -515,10 +571,6 @@ public class Geary.App.ConversationMonitor : BaseObject {
         scan_completed();
     }
 
-    protected virtual void notify_seed_completed() {
-        seed_completed();
-    }
-
     protected virtual void notify_conversations_added(Gee.Collection<Conversation> conversations) {
         conversations_added(conversations);
     }
@@ -542,119 +594,59 @@ public class Geary.App.ConversationMonitor : BaseObject {
         email_flags_changed(conversation, email);
     }
 
-    /**
-     * See Geary.Folder.list_email_by_id_async() for details of how these parameters operate.  Instead
-     * of returning emails, this method will load the Conversations object with them sorted into
-     * Conversation objects.
-     */
-    private async void load_by_id_async(Geary.EmailIdentifier? initial_id, int count,
-        Geary.Folder.ListFlags flags, Cancellable? cancellable) throws Error {
-        notify_scan_started();
-        try {
-            yield process_email_async(
-                yield this.base_folder.list_email_by_id_async(
-                    initial_id, count, required_fields, flags, cancellable
-                ),
-                new ProcessJobContext(true)
-            );
-        } catch (Error err) {
-            list_error(err);
-            throw err;
-        }
-    }
+    private async bool stop_monitoring_internal(bool close_folder,
+                                                Cancellable? cancellable)
+        throws Error {
+        // set now to prevent reentrancy during yield or signal
+        is_monitoring = false;
 
-    private async void load_by_sparse_id(Gee.Collection<Geary.EmailIdentifier> ids,
-        Geary.Folder.ListFlags flags, Cancellable? cancellable) {
-        notify_scan_started();
+        this.base_folder.email_appended.disconnect(on_folder_email_appended);
+        this.base_folder.email_inserted.disconnect(on_folder_email_inserted);
+        this.base_folder.email_locally_complete.disconnect(on_folder_email_complete);
+        this.base_folder.email_removed.disconnect(on_folder_email_removed);
+        this.base_folder.opened.disconnect(on_folder_opened);
+        this.base_folder.account.email_appended.disconnect(on_account_email_appended);
+        this.base_folder.account.email_inserted.disconnect(on_account_email_inserted);
+        this.base_folder.account.email_locally_complete.disconnect(on_account_email_complete);
+        this.base_folder.account.email_removed.disconnect(on_account_email_removed);
+        this.base_folder.account.email_flags_changed.disconnect(on_account_email_flags_changed);
+
+        // Cancel outstanding ops so they don't block the queue closing
+        this.operation_cancellable.cancel();
 
+        // Keep track of errors stopping the queue but continue, since
+        // the cleanup below needs to occur.
+        Error? close_err = null;
         try {
-            yield process_email_async(
-                yield this.base_folder.list_email_by_sparse_id_async(
-                    ids, required_fields, flags, cancellable
-                ),
-                new ProcessJobContext(true)
-            );
+            yield this.queue.stop_processing_async(cancellable);
         } catch (Error err) {
-            list_error(err);
+            close_err = err;
         }
-    }
-
-    /**
-     * Loads messages from outside the monitor's base folder.
-     *
-     * Since this requires opening and closing the other folder, it is
-     * handled separately.
-     */
-    private async void external_load_by_sparse_id(Folder folder,
-                                                  Gee.Collection<EmailIdentifier> ids,
-                                                  Folder.ListFlags flags,
-                                                  Cancellable? cancellable) {
-        bool opened = false;
-        try {
-            yield folder.open_async(Geary.Folder.OpenFlags.NONE, cancellable);
-            opened = true;
-
-            debug("Listing %d external emails", ids.size);
+        this.queue = null;
 
-            // First just get the bare minimum we need to determine if we even
-            // care about the messages.
-            Gee.List<Geary.Email>? emails = yield folder.list_email_by_sparse_id_async(ids,
-                Geary.Email.Field.REFERENCES, flags, cancellable);
-
-            debug("List found %d emails", (emails == null ? 0 : emails.size));
-
-            Gee.HashSet<Geary.EmailIdentifier> relevant_ids = new Gee.HashSet<Geary.EmailIdentifier>();
-            foreach (Geary.Email email in emails) {
-                Gee.Set<RFC822.MessageID>? ancestors = email.get_ancestors();
-                if (ancestors != null &&
-                    Geary.traverse<RFC822.MessageID>(ancestors).any(id => conversations.has_message_id(id)))
-                    relevant_ids.add(email.id);
-            }
-
-            debug("%d external emails are relevant to current conversations", relevant_ids.size);
-
-            // List the relevant messages again with the full set of fields, to
-            // make sure when we load them from the database we have all the
-            // data we need.
-            yield folder.list_email_by_sparse_id_async(relevant_ids, required_fields, flags, cancellable);
-            yield folder.close_async(cancellable);
-            opened = false;
-
-            Gee.ArrayList<Geary.Email> search_emails = new Gee.ArrayList<Geary.Email>();
-            foreach (Geary.EmailIdentifier id in relevant_ids) {
-                // TODO: parallelize this.
-                try {
-                    Geary.Email email = yield folder.account.local_fetch_email_async(id,
-                        required_fields, cancellable);
-                    search_emails.add(email);
-                } catch (Error e) {
-                    debug("Error fetching out of folder message: %s", e.message);
-                }
-            }
-
-            debug("Fetched %d relevant emails locally", search_emails.size);
+        this.operation_cancellable = null;
 
-            yield process_email_async(search_emails, new ProcessJobContext(false));
-        } catch (Error e) {
-            debug("Error loading external emails: %s", e.message);
-            if (opened) {
-                try {
-                    yield folder.close_async(cancellable);
-                } catch (Error e) {
-                    debug("Error closing folder %s: %s", folder.to_string(), e.message);
-                }
+        bool closing = false;
+        if (close_folder) {
+            try {
+                // Always close the folder to prevent open leaks
+                closing = yield this.base_folder.close_async(null);
+            } catch (Error err) {
+                debug("Unable to close monitored folder %s: %s",
+                      this.base_folder.to_string(), err.message);
+                close_err = err;
             }
         }
-    }
 
-    private void list_error(Error err) {
-        debug("Error while assembling conversations in %s: %s",
-              this.base_folder.to_string(), err.message);
-        notify_scan_error(err);
-        notify_scan_completed();
+        if (close_err != null)
+            throw close_err;
+
+        return closing;
     }
 
-    private async void process_email_async(Gee.Collection<Geary.Email>? emails, ProcessJobContext job) {
+    private async void process_email_async(Gee.Collection<Geary.Email>? emails,
+                                           ProcessJobContext job)
+        throws Error {
         if (emails == null || emails.size == 0) {
             yield process_email_complete_async(job);
             return;
@@ -687,40 +679,49 @@ public class Geary.App.ConversationMonitor : BaseObject {
                       this.base_folder.to_string(), emails.size);
     }
 
-    private Gee.Collection<Geary.FolderPath> get_search_blacklist() {
-        Geary.SpecialFolderType[] blacklisted_folder_types = {
-            Geary.SpecialFolderType.SPAM,
-            Geary.SpecialFolderType.TRASH,
-            Geary.SpecialFolderType.DRAFTS,
-        };
+    private async void process_email_complete_async(ProcessJobContext job) {
+        Gee.Collection<Conversation>? added = null;
+        Gee.MultiMap<Conversation, Geary.Email>? appended = null;
+        Gee.Collection<Conversation>? removed_due_to_merge = null;
+        try {
+            // Get known paths for all emails
+            Gee.MultiMap<Geary.EmailIdentifier, Geary.FolderPath>? email_paths =
+                yield this.base_folder.account.get_containing_folders_async(
+                    job.emails.keys, null
+                );
 
-        Gee.ArrayList<Geary.FolderPath?> blacklist = new Gee.ArrayList<Geary.FolderPath?>();
-        foreach (Geary.SpecialFolderType type in blacklisted_folder_types) {
-            try {
-                Geary.Folder? blacklist_folder = this.base_folder.account.get_special_folder(type);
-                if (blacklist_folder != null)
-                    blacklist.add(blacklist_folder.path);
-            } catch (Error e) {
-                debug("Error finding special folder %s on account %s: %s",
-                    type.to_string(), this.base_folder.account.to_string(), e.message);
+            // Add them to the conversation set
+            if (email_paths != null) {
+                this.conversations.add_all_emails(
+                    job.emails.values, email_paths, this.base_folder,
+                    out added, out appended, out removed_due_to_merge
+                );
             }
+        } catch (Error err) {
+            debug("Unable to add emails to conversation: %s", err.message);
+
+            // fall-through
         }
 
-        // Add "no folders" so we omit results that have been deleted permanently from the server.
-        blacklist.add(null);
+        if (removed_due_to_merge != null && removed_due_to_merge.size > 0) {
+            notify_conversations_removed(removed_due_to_merge);
+        }
 
-        return blacklist;
-    }
+        if (added != null && added.size > 0)
+            notify_conversations_added(added);
 
-    private Geary.EmailFlags get_search_flag_blacklist() {
-        Geary.EmailFlags flags = new Geary.EmailFlags();
-        flags.add(Geary.EmailFlags.DRAFT);
+        if (appended != null) {
+            foreach (Conversation conversation in appended.get_keys())
+                notify_conversation_appended(conversation, appended.get(conversation));
+        }
 
-        return flags;
+        if (job.inside_scan)
+            notify_scan_completed();
     }
 
     private async void expand_conversations_async(Gee.Set<RFC822.MessageID> needed_message_ids,
-        ProcessJobContext job) {
+                                                   ProcessJobContext job)
+        throws Error {
         if (needed_message_ids.size == 0) {
             yield process_email_complete_async(job);
             return;
@@ -730,7 +731,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
                       "[%s] ConversationMonitor::expand_conversations: %d email ids",
                       this.base_folder.to_string(), needed_message_ids.size);
 
-        Gee.Collection<Geary.FolderPath> folder_blacklist = get_search_blacklist();
+        Gee.Collection<Geary.FolderPath> folder_blacklist = get_search_folder_blacklist();
         Geary.EmailFlags flag_blacklist = get_search_flag_blacklist();
 
         // execute all the local search operations at once
@@ -740,14 +741,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
                 folder_blacklist, flag_blacklist));
         }
 
-        try {
-            yield batch.execute_all_async();
-        } catch (Error err) {
-            debug("Unable to search local mail for conversations: %s", err.message);
-
-            yield process_email_complete_async(job);
-            return;
-        }
+        yield batch.execute_all_async();
 
         // collect their results into a single collection of addt'l emails
         Gee.HashMap<Geary.EmailIdentifier, Geary.Email> needed_messages = new Gee.HashMap<
@@ -770,123 +764,64 @@ public class Geary.App.ConversationMonitor : BaseObject {
                       this.base_folder.to_string(), needed_message_ids.size, needed_messages.size);
     }
 
-    private async void process_email_complete_async(ProcessJobContext job) {
-        Gee.Collection<Geary.App.Conversation>? added = null;
-        Gee.MultiMap<Geary.App.Conversation, Geary.Email>? appended = null;
-        Gee.Collection<Conversation>? removed_due_to_merge = null;
-        try {
-            // Get known paths for all emails
-            Gee.MultiMap<Geary.EmailIdentifier, Geary.FolderPath>? email_paths =
-                yield this.base_folder.account.get_containing_folders_async(
-                    job.emails.keys, null
-                );
-
-            // Add them to the conversation set
-            if (email_paths != null) {
-                this.conversations.add_all_emails(
-                    job.emails.values, email_paths, this.base_folder,
-                    out added, out appended, out removed_due_to_merge
-                );
-            }
-        } catch (Error err) {
-            debug("Unable to add emails to conversation: %s", err.message);
-
-            // fall-through
-        }
-
-        if (removed_due_to_merge != null && removed_due_to_merge.size > 0) {
-            notify_conversations_removed(removed_due_to_merge);
-        }
-
-        if (added != null && added.size > 0)
-            notify_conversations_added(added);
-
-        if (appended != null) {
-            foreach (Geary.App.Conversation conversation in appended.get_keys())
-                notify_conversation_appended(conversation, appended.get(conversation));
-        }
-
-        if (job.inside_scan)
-            notify_scan_completed();
-    }
-
-    private async Geary.EmailIdentifier? get_lowest_email_id_async(Cancellable? cancellable) {
-        Geary.EmailIdentifier? earliest_id = null;
-        try {
-            yield this.base_folder.find_boundaries_async(conversations.get_email_identifiers(),
-                out earliest_id, null, cancellable);
-        } catch (Error e) {
-            debug("Error finding earliest email identifier: %s", e.message);
-        }
-
-        return earliest_id;
-    }
-
-    /**
-     * Check conversations to see if they still exist in the base folder.
-     *
-     * Returns the set of emails that were removed due to not being in
-     * the base folder.
-     */
-    private async Gee.Collection<Conversation>
-        check_conversations_in_base_folder(Gee.Collection<Conversation> conversations,
-                                           Cancellable? cancellable)
-        throws Error {
-        Gee.ArrayList<Conversation> evaporated = new Gee.ArrayList<Conversation>();
-        foreach (Geary.App.Conversation conversation in conversations) {
-            int count = yield conversation.get_count_in_folder_async(
-                this.base_folder.account, this.base_folder.path, cancellable
-            );
-            if (count == 0) {
-                debug("Evaporating conversation %s because it has no emails in %s",
-                      conversation.to_string(), this.base_folder.to_string());
-                this.conversations.remove_conversation(conversation);
-                evaporated.add(conversation);
-            }
-        }
-
-        return evaporated;
-    }
-
     private void on_folder_opened(Geary.Folder.OpenState state, int count) {
-        // once remote is open, reseed with messages from the earliest ID to the latest
         if (state == Geary.Folder.OpenState.REMOTE)
-            operation_queue.add(new ReseedOperation(this, state.to_string()));
+            this.queue.add(new ReseedOperation(this));
     }
 
-    private void on_folder_email_appended(Gee.Collection<EmailIdentifier> appended_ids) {
-        operation_queue.add(new AppendOperation(this, appended_ids));
+    private void on_folder_email_appended(Gee.Collection<EmailIdentifier> appended) {
+        this.queue.add(new AppendOperation(this, appended));
     }
 
-    private void on_folder_email_inserted(Gee.Collection<EmailIdentifier> inserted_ids) {
-        operation_queue.add(new FillWindowOperation(this, true));
+    private void on_folder_email_complete(Gee.Collection<EmailIdentifier> completed) {
+        // InsertOperation will add the emails only if they are after
+        // the earliest, which is what we want here.
+        this.queue.add(new InsertOperation(this, completed));
     }
 
-    private void on_folder_email_removed(Gee.Collection<EmailIdentifier> removed_ids) {
-        operation_queue.add(new RemoveOperation(this, this.base_folder, removed_ids));
-        operation_queue.add(new FillWindowOperation(this, false));
+    private void on_folder_email_inserted(Gee.Collection<EmailIdentifier> inserted) {
+        this.queue.add(new InsertOperation(this, inserted));
+    }
+
+    private void on_folder_email_removed(Gee.Collection<EmailIdentifier> removed) {
+        this.queue.add(new RemoveOperation(this, this.base_folder, removed));
     }
 
     private void on_account_email_appended(Folder folder,
                                            Gee.Collection<EmailIdentifier> added) {
-        if (folder != this.base_folder) {
-            operation_queue.add(
-                new ExternalAppendOperation(this, folder, added)
-            );
+        if (folder != this.base_folder &&
+            !get_search_folder_blacklist().contains(folder.path)) {
+            this.queue.add(new ExternalAppendOperation(this, folder, added));
+        }
+    }
+
+    private void on_account_email_complete(Folder folder,
+                                           Gee.Collection<EmailIdentifier> inserted) {
+        // ExternalAppendOperation will check to determine if the
+        // email is relevant for some existing conversation before
+        // adding it, which is what we want here.
+        if (folder != this.base_folder &&
+            !get_search_folder_blacklist().contains(folder.path)) {
+            this.queue.add(new ExternalAppendOperation(this, folder, inserted));
         }
     }
 
     private void on_account_email_inserted(Folder folder,
-                                           Gee.Collection<EmailIdentifier> added) {
-        if (folder != this.base_folder) {
-            operation_queue.add(new FillWindowOperation(this, false));
+                                           Gee.Collection<EmailIdentifier> inserted) {
+        // ExternalAppendOperation will check to determine if the
+        // email is relevant for some existing conversation before
+        // adding it, which is what we want here.
+        if (folder != this.base_folder &&
+            !get_search_folder_blacklist().contains(folder.path)) {
+            this.queue.add(new ExternalAppendOperation(this, folder, inserted));
         }
     }
 
     private void on_account_email_removed(Folder folder,
                                           Gee.Collection<EmailIdentifier> removed) {
-        if (folder != this.base_folder) {
-            operation_queue.add(new RemoveOperation(this, folder, removed));
+        if (folder != this.base_folder &&
+            !get_search_folder_blacklist().contains(folder.path)) {
+            this.queue.add(new RemoveOperation(this, folder, removed));
         }
     }
 
@@ -906,4 +841,9 @@ public class Geary.App.ConversationMonitor : BaseObject {
         }
     }
 
+    private void on_operation_error(ConversationOperation op, Error err) {
+        debug("Error executing %s: %s", op.get_type().name(), err.message);
+        notify_scan_error(err);
+    }
+
 }
diff --git a/src/engine/app/conversation-monitor/app-append-operation.vala b/src/engine/app/conversation-monitor/app-append-operation.vala
index 7c97adf..8507da2 100644
--- a/src/engine/app/conversation-monitor/app-append-operation.vala
+++ b/src/engine/app/conversation-monitor/app-append-operation.vala
@@ -1,18 +1,27 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
  *
  * This software is licensed under the GNU Lesser General Public License
  * (version 2.1 or later).  See the COPYING file in this distribution.
  */
 
 private class Geary.App.AppendOperation : ConversationOperation {
+
+
     private Gee.Collection<Geary.EmailIdentifier> appended_ids;
-    
-    public AppendOperation(ConversationMonitor monitor, Gee.Collection<Geary.EmailIdentifier> appended_ids) {
+
+
+    public AppendOperation(ConversationMonitor monitor,
+                           Gee.Collection<Geary.EmailIdentifier> appended_ids) {
         base(monitor);
         this.appended_ids = appended_ids;
     }
-    
-    public override async void execute_async() {
-        yield monitor.append_emails_async(appended_ids);
+
+    public override async void execute_async() throws Error {
+        debug("%d message(s) appended to %s, fetching to add to conversations...",
+              this.appended_ids.size, this.monitor.base_folder.to_string());
+
+        yield this.monitor.load_by_sparse_id(this.appended_ids);
     }
+
 }
diff --git a/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala b/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala
index 76065cf..b250131 100644
--- a/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala
+++ b/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala
@@ -1,4 +1,5 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
  *
  * This software is licensed under the GNU Lesser General Public License
  * (version 2.1 or later).  See the COPYING file in this distribution.
@@ -6,55 +7,43 @@
 
 private class Geary.App.ConversationOperationQueue : BaseObject {
     public bool is_processing { get; private set; default = false; }
-    public Geary.SimpleProgressMonitor progress_monitor { get; private set; default = 
-        new Geary.SimpleProgressMonitor(Geary.ProgressType.ACTIVITY); }
-    
+
+    /** Tracks the progress of operations running in this queue. */
+    public Geary.ProgressMonitor progress_monitor { get; private set; }
+
     private Geary.Nonblocking.Queue<ConversationOperation> mailbox
         = new Geary.Nonblocking.Queue<ConversationOperation>.fifo();
     private Geary.Nonblocking.Spinlock processing_done_spinlock
         = new Geary.Nonblocking.Spinlock();
-    
+
+    /** Fired when an error occurs executing an operation. */
+    public signal void operation_error(ConversationOperation op, Error err);
+
+    public ConversationOperationQueue(ProgressMonitor progress) {
+        this.progress_monitor = progress;
+    }
+
     public void clear() {
         mailbox.clear();
     }
-    
+
     public void add(ConversationOperation op) {
-        // There should only ever be one FillWindowOperation at a time.
-        FillWindowOperation? fill_op = op as FillWindowOperation;
-        if (fill_op != null) {
-            Gee.Collection<ConversationOperation> removed
-                = mailbox.revoke_matching(o => o is FillWindowOperation);
-            
-            // If there were any "insert" fill window ops, preserve that flag,
-            // as otherwise we might miss some data.
-            if (!fill_op.is_insert) {
-                foreach (ConversationOperation removed_op in removed) {
-                    FillWindowOperation? removed_fill = removed_op as FillWindowOperation;
-                    assert(removed_fill != null);
-                    
-                    if (removed_fill.is_insert) {
-                        fill_op.is_insert = true;
-                        break;
-                    }
-                }
-            }
+        Type op_type = op.get_type();
+        if (op.allow_duplicates ||
+            !this.mailbox.get_all().any_match(other => other.get_type() == op_type)) {
+            mailbox.send(op);
         }
-        
-        mailbox.send(op);
     }
-    
-    public async void stop_processing_async(Cancellable? cancellable) {
-        clear();
-        add(new TerminateOperation());
-        
-        try {
+
+    public async void stop_processing_async(Cancellable? cancellable)
+        throws Error {
+        if (this.is_processing) {
+            clear();
+            add(new TerminateOperation());
             yield processing_done_spinlock.wait_async(cancellable);
-        } catch (Error e) {
-            debug("Error waiting for conversation operation queue to finish processing: %s",
-                e.message);
         }
     }
-    
+
     public async void run_process_async() {
         is_processing = true;
         
@@ -71,9 +60,13 @@ private class Geary.App.ConversationOperationQueue : BaseObject {
             
             if (!progress_monitor.is_in_progress)
                 progress_monitor.notify_start();
-            
-            yield op.execute_async();
-            
+
+            try {
+                yield op.execute_async();
+            } catch (Error err) {
+                operation_error(op, err);
+            }
+
             if (mailbox.size == 0)
                 progress_monitor.notify_finish();
         }
diff --git a/src/engine/app/conversation-monitor/app-conversation-operation.vala b/src/engine/app/conversation-monitor/app-conversation-operation.vala
index 29f1c1d..fd57b5e 100644
--- a/src/engine/app/conversation-monitor/app-conversation-operation.vala
+++ b/src/engine/app/conversation-monitor/app-conversation-operation.vala
@@ -1,15 +1,34 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
+ * Copyright 2018 Michael Gratton <mike vee net>
  *
  * This software is licensed under the GNU Lesser General Public License
  * (version 2.1 or later).  See the COPYING file in this distribution.
  */
 
-private abstract class Geary.App.ConversationOperation : BaseObject {
+/**
+ * An internal operation used to keep conversations up to date.
+ *
+ * Classes extending this class are used by {@link
+ * ConversationMonitor} to asynchronously keep conversations up to
+ * date as messages are added to, updated, and removed from folders.
+ */
+internal abstract class Geary.App.ConversationOperation : BaseObject {
+
+
+    /** Determines if multiple instances of this operation can be queued. */
+    public bool allow_duplicates { get; private set; }
+
+    /** The monitor this operation will be applied to. */
     protected weak ConversationMonitor? monitor = null;
-    
-    public ConversationOperation(ConversationMonitor? monitor) {
+
+
+    public ConversationOperation(ConversationMonitor? monitor,
+                                 bool allow_duplicates = false) {
         this.monitor = monitor;
+        this.allow_duplicates = allow_duplicates;
     }
-    
-    public abstract async void execute_async();
+
+    public abstract async void execute_async() throws Error;
+
 }
diff --git a/src/engine/app/conversation-monitor/app-conversation-set.vala b/src/engine/app/conversation-monitor/app-conversation-set.vala
index b6bd9ef..d23a8bd 100644
--- a/src/engine/app/conversation-monitor/app-conversation-set.vala
+++ b/src/engine/app/conversation-monitor/app-conversation-set.vala
@@ -18,10 +18,11 @@ private class Geary.App.ConversationSet : BaseObject {
     public bool is_empty { get { return _conversations.is_empty; } }
 
     /** Returns a read-only view of conversations in the set.  */
-    public Gee.Collection<Conversation> conversations {
+    public Gee.Set<Conversation> read_only_view {
         owned get { return _conversations.read_only_view; }
     }
 
+
     private Gee.Set<Conversation> _conversations = new Gee.HashSet<Conversation>();
 
     // Maps email ids to conversations.
diff --git a/src/engine/app/conversation-monitor/app-external-append-operation.vala b/src/engine/app/conversation-monitor/app-external-append-operation.vala
index 77d4d69..ee9acaa 100644
--- a/src/engine/app/conversation-monitor/app-external-append-operation.vala
+++ b/src/engine/app/conversation-monitor/app-external-append-operation.vala
@@ -1,21 +1,34 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
  *
  * This software is licensed under the GNU Lesser General Public License
  * (version 2.1 or later).  See the COPYING file in this distribution.
  */
 
 private class Geary.App.ExternalAppendOperation : ConversationOperation {
+
     private Geary.Folder folder;
     private Gee.Collection<Geary.EmailIdentifier> appended_ids;
-    
-    public ExternalAppendOperation(ConversationMonitor monitor, Geary.Folder folder,
-        Gee.Collection<Geary.EmailIdentifier> appended_ids) {
+
+    public ExternalAppendOperation(ConversationMonitor monitor,
+                                   Geary.Folder folder,
+                                   Gee.Collection<Geary.EmailIdentifier> appended_ids) {
         base(monitor);
         this.folder = folder;
         this.appended_ids = appended_ids;
     }
-    
-    public override async void execute_async() {
-        yield monitor.external_append_emails_async(folder, appended_ids);
+
+    public override async void execute_async() throws Error {
+        if (!this.monitor.get_search_folder_blacklist().contains(folder.path) &&
+            !this.monitor.conversations.is_empty) {
+            debug("%d out of folder message(s) appended to %s, fetching to add to conversations...",
+                  this.appended_ids.size,
+                  this.folder.to_string());
+
+            yield this.monitor.external_load_by_sparse_id(
+                this.folder, this.appended_ids, Geary.Folder.ListFlags.NONE
+            );
+        }
     }
+
 }
diff --git a/src/engine/app/conversation-monitor/app-fill-window-operation.vala b/src/engine/app/conversation-monitor/app-fill-window-operation.vala
index 9e099c2..6927c4e 100644
--- a/src/engine/app/conversation-monitor/app-fill-window-operation.vala
+++ b/src/engine/app/conversation-monitor/app-fill-window-operation.vala
@@ -1,18 +1,58 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
+ * Copyright 2018 Michael Gratton <mike vee net>
  *
  * This software is licensed under the GNU Lesser General Public License
  * (version 2.1 or later).  See the COPYING file in this distribution.
  */
 
 private class Geary.App.FillWindowOperation : ConversationOperation {
-    public bool is_insert { get; internal set; }
-    
-    public FillWindowOperation(ConversationMonitor monitor, bool is_insert) {
-        base(monitor);
-        this.is_insert = is_insert;
+
+
+    // Maximum and minimum number of messages to load in one fill
+    // operation. The maximum exists to retain some degree of
+    // responsiveness when loading conversations, given we must load
+    // the conversation closure for each message loaded here: Loading
+    // a single email might cause a conversation with tens of messages
+    // to also have to be pulled in, so the max provides some kind of
+    // upper bound to mitigate huge loads and start delivering
+    // conversations sooner rather than later. The minimum ensures
+    // that enough new messages are found in one operation to justify
+    // the expense.
+    private const int MAX_FILL_COUNT = 20;
+    private const int MIN_FILL_COUNT = 5;
+
+
+    public FillWindowOperation(ConversationMonitor monitor) {
+        base(monitor, false);
     }
-    
-    public override async void execute_async() {
-        yield monitor.fill_window_async(is_insert);
+
+    public override async void execute_async() throws Error {
+        int num_to_load = (int) (
+            (this.monitor.min_window_count - this.monitor.conversations.size)
+        );
+        if (num_to_load < MIN_FILL_COUNT) {
+            num_to_load = MIN_FILL_COUNT;
+        } else if (num_to_load > MAX_FILL_COUNT) {
+            num_to_load = MAX_FILL_COUNT;
+        }
+
+        debug(
+            "Filling %d messages in %s...",
+            num_to_load, this.monitor.base_folder.to_string()
+        );
+
+        EmailIdentifier? earliest_id =
+            yield this.monitor.get_lowest_email_id_async();
+        int loaded = yield this.monitor.load_by_id_async(
+            earliest_id, num_to_load
+        );
+
+        // Check to see if we need any more, but only if we actually
+        // loaded some, so we don't keep looping once everything in
+        // the folder has already been loaded.
+        if (loaded > 0) {
+            this.monitor.check_window_count();
+        }
     }
 }
diff --git a/src/engine/app/conversation-monitor/app-insert-operation.vala b/src/engine/app/conversation-monitor/app-insert-operation.vala
new file mode 100644
index 0000000..9531de5
--- /dev/null
+++ b/src/engine/app/conversation-monitor/app-insert-operation.vala
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later).  See the COPYING file in this distribution.
+ */
+
+/**
+ * Handles an insertion of messages from a monitor's base folder.
+ */
+private class Geary.App.InsertOperation : ConversationOperation {
+
+
+    private Gee.Collection<EmailIdentifier> inserted_ids;
+
+    public InsertOperation(ConversationMonitor monitor,
+                           Gee.Collection<EmailIdentifier> inserted_ids) {
+        base(monitor, false);
+        this.inserted_ids = inserted_ids;
+    }
+
+    public override async void execute_async() throws Error {
+        Geary.EmailIdentifier? lowest = yield this.monitor.get_lowest_email_id_async();
+        Gee.Collection<EmailIdentifier>? to_insert = null;
+
+        if (lowest != null) {
+            to_insert = new Gee.LinkedList<EmailIdentifier>();
+            foreach (EmailIdentifier inserted in this.inserted_ids) {
+                if (lowest.natural_sort_comparator(inserted) < 0) {
+                    to_insert.add(inserted);
+                }
+            }
+        } else {
+            to_insert = this.inserted_ids;
+        }
+
+        debug("Inserting %d messages in %s after %d inserted...",
+              to_insert.size,
+              this.monitor.base_folder.to_string(),
+              this.inserted_ids.size);
+        yield this.monitor.load_by_sparse_id(to_insert);
+
+        // Check to see if we need any more
+        this.monitor.check_window_count();
+    }
+}
diff --git a/src/engine/app/conversation-monitor/app-remove-operation.vala b/src/engine/app/conversation-monitor/app-remove-operation.vala
index fe53beb..eb8311b 100644
--- a/src/engine/app/conversation-monitor/app-remove-operation.vala
+++ b/src/engine/app/conversation-monitor/app-remove-operation.vala
@@ -1,4 +1,5 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
  *
  * This software is licensed under the GNU Lesser General Public License
  * (version 2.1 or later).  See the COPYING file in this distribution.
@@ -17,8 +18,38 @@ private class Geary.App.RemoveOperation : ConversationOperation {
         this.removed_ids = removed_ids;
     }
 
-    public override async void execute_async() {
-        yield monitor.remove_emails_async(this.source_folder, this.removed_ids);
+    public override async void execute_async() throws Error {
+        debug("%d message(s) removed from %s, trimming/removing conversations...",
+              this.removed_ids.size, this.source_folder.to_string()
+        );
+
+        Gee.Collection<Conversation> removed;
+        Gee.MultiMap<Conversation,Email> trimmed;
+        this.monitor.conversations.remove_all_emails_by_identifier(
+            source_folder.path,
+            removed_ids,
+            out removed,
+            out trimmed
+        );
+
+        // Check for conversations that have been evaporated as a
+        // result, update removed and trimmed collections to reflect
+        // any that evaporated
+        Gee.Collection<Conversation> evaporated =
+            yield this.monitor.check_conversations_in_base_folder(trimmed.get_keys());
+        removed.add_all(evaporated);
+        foreach (Conversation target in evaporated) {
+            trimmed.remove_all(target);
+        }
+
+        // Fire signals, clean up
+        this.monitor.notify_emails_removed(removed, trimmed);
+
+        // Check we still have enough conversations if any were
+        // removed
+        if (!removed.is_empty) {
+            this.monitor.check_window_count();
+        }
     }
 
 }
diff --git a/src/engine/app/conversation-monitor/app-reseed-operation.vala b/src/engine/app/conversation-monitor/app-reseed-operation.vala
index eb16e8c..9beaf1e 100644
--- a/src/engine/app/conversation-monitor/app-reseed-operation.vala
+++ b/src/engine/app/conversation-monitor/app-reseed-operation.vala
@@ -1,18 +1,44 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
  *
  * This software is licensed under the GNU Lesser General Public License
  * (version 2.1 or later).  See the COPYING file in this distribution.
  */
 
+/**
+ * Re-scans the base folder for messages after its remote has opened.
+ *
+ * The reseed in effect checks for any existing message that did not
+ * satisfy the email field requirements for the conversation monitor
+ * or the required fields passed to its constructor, causing these
+ * fields to be downloaded from the remote.
+ */
 private class Geary.App.ReseedOperation : ConversationOperation {
-    private string why;
-    
-    public ReseedOperation(ConversationMonitor monitor, string why) {
-        base(monitor);
-        this.why = why;
+
+
+    public ReseedOperation(ConversationMonitor monitor) {
+        base(monitor, false);
     }
-    
-    public override async void execute_async() {
-         yield monitor.reseed_async(why);
+
+    public override async void execute_async() throws Error {
+        EmailIdentifier? earliest_id =
+            yield this.monitor.get_lowest_email_id_async();
+        if (earliest_id != null) {
+            debug("Reseeding starting from Email ID %s on opened %s",
+                  earliest_id.to_string(), this.monitor.base_folder.to_string());
+            // Some conversations have already been loaded, so check
+            // from the earliest known right through to the end of the
+            // vector for updated messages
+            yield this.monitor.load_by_id_async(
+                earliest_id,
+                int.MAX,
+                Folder.ListFlags.OLDEST_TO_NEWEST | Folder.ListFlags.INCLUDING_ID
+            );
+        } else {
+            // No conversations are present, so do a check to get the
+            // side effect of queuing a fill operation.
+            this.monitor.check_window_count();
+        }
     }
+
 }
diff --git a/src/engine/meson.build b/src/engine/meson.build
index 1505805..fa6cbba 100644
--- a/src/engine/meson.build
+++ b/src/engine/meson.build
@@ -54,7 +54,7 @@ geary_engine_vala_sources = files(
   'app/conversation-monitor/app-conversation-set.vala',
   'app/conversation-monitor/app-external-append-operation.vala',
   'app/conversation-monitor/app-fill-window-operation.vala',
-  'app/conversation-monitor/app-local-load-operation.vala',
+  'app/conversation-monitor/app-insert-operation.vala',
   'app/conversation-monitor/app-local-search-operation.vala',
   'app/conversation-monitor/app-remove-operation.vala',
   'app/conversation-monitor/app-reseed-operation.vala',
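
A note on the fill sizing in FillWindowOperation.execute_async(): the number of messages requested is the shortfall between the monitor's minimum window count and the conversations already loaded, clamped to the range MIN_FILL_COUNT..MAX_FILL_COUNT. The standalone sketch below works through a few cases. The two constants are copied from the patch; fill_count() and its parameter names are hypothetical and exist only for illustration, and the sketch uses GLib's int.clamp() where the patch uses an if/else chain.

```vala
// Constants copied from FillWindowOperation in the patch above.
const int MAX_FILL_COUNT = 20;
const int MIN_FILL_COUNT = 5;

// Hypothetical helper, not part of the patch: returns how many
// messages a single fill would request, given the monitor's minimum
// window size and the number of conversations already loaded.
int fill_count(int min_window_count, int conversation_count) {
    int shortfall = min_window_count - conversation_count;
    // Same effect as the if/else chain in execute_async().
    return shortfall.clamp(MIN_FILL_COUNT, MAX_FILL_COUNT);
}

void main() {
    // Empty monitor: the shortfall of 50 is capped at the maximum.
    assert(fill_count(50, 0) == MAX_FILL_COUNT);
    // A shortfall inside the range is used as-is.
    assert(fill_count(50, 40) == 10);
    // A small shortfall is raised to the minimum.
    assert(fill_count(50, 48) == MIN_FILL_COUNT);
    // Even a negative shortfall still requests the minimum.
    assert(fill_count(50, 60) == MIN_FILL_COUNT);
}
```

The last case shows that once a fill operation runs it always requests at least MIN_FILL_COUNT messages, even if the window is already full by the time it executes.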


