[geary/wip/713150-conversations] Implemented conversation remove/trim, simplified API



commit 904d145ef5191bd903298c71bebbb22fa18fb685
Author: Jim Nelson <jim yorba org>
Date:   Fri Feb 20 16:44:38 2015 -0800

    Implemented conversation remove/trim, simplified API

 src/CMakeLists.txt                                 |    1 -
 src/client/composer/composer-widget.vala           |   11 +-
 src/engine/api/geary-account.vala                  |   29 ++-
 src/engine/app/app-conversation-monitor.vala       |  271 +++++++++-----------
 src/engine/app/app-conversation.vala               |   14 +
 .../app-local-search-operation.vala                |   34 ---
 src/engine/imap-db/imap-db-account.vala            |   78 ++----
 src/engine/imap-db/imap-db-conversation.vala       |   85 ++++--
 .../imap-engine/imap-engine-generic-account.vala   |   18 +-
 src/engine/util/util-collection.vala               |   10 +
 10 files changed, 266 insertions(+), 285 deletions(-)
---
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5ba1bc3..327923a 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -57,7 +57,6 @@ engine/app/conversation-monitor/app-conversation-set.vala
 engine/app/conversation-monitor/app-external-append-operation.vala
 engine/app/conversation-monitor/app-fill-window-operation.vala
 engine/app/conversation-monitor/app-local-load-operation.vala
-engine/app/conversation-monitor/app-local-search-operation.vala
 engine/app/conversation-monitor/app-remove-operation.vala
 engine/app/conversation-monitor/app-reseed-operation.vala
 engine/app/conversation-monitor/app-terminate-operation.vala
diff --git a/src/client/composer/composer-widget.vala b/src/client/composer/composer-widget.vala
index 4c4bff3..f8a43c5 100644
--- a/src/client/composer/composer-widget.vala
+++ b/src/client/composer/composer-widget.vala
@@ -634,15 +634,20 @@ public class ComposerWidget : Gtk.EventBox {
         }
     }
     
+    // TODO: Folder blacklist
+    private bool local_search_predicate(Geary.EmailIdentifier email_id, bool only_partial,
+        Gee.Collection<Geary.FolderPath?> known_paths, Geary.EmailFlags flags) {
+        return !flags.contains(Geary.EmailFlags.DRAFT);
+    }
+    
     public async void restore_draft_state_async(Geary.Account account) {
         bool first_email = true;
         
         foreach (Geary.RFC822.MessageID mid in in_reply_to) {
             Gee.MultiMap<Geary.Email, Geary.FolderPath?>? email_map;
             try {
-                email_map =
-                    yield account.local_search_message_id_async(mid, Geary.Email.Field.ENVELOPE,
-                    true, null, new Geary.EmailFlags.with(Geary.EmailFlags.DRAFT)); // TODO: Folder blacklist
+                email_map = yield account.local_search_message_id_async(mid, Geary.Email.Field.ENVELOPE,
+                    local_search_predicate);
             } catch (Error error) {
                 continue;
             }
diff --git a/src/engine/api/geary-account.vala b/src/engine/api/geary-account.vala
index 997b579..64659af 100644
--- a/src/engine/api/geary-account.vala
+++ b/src/engine/api/geary-account.vala
@@ -31,6 +31,20 @@ public abstract class Geary.Account : BaseObject {
         SAVE_SENT_MAIL_FAILED,
     }
     
+    /**
+     * @see local_search_associated_emails_async
+     */
+    public const Geary.Email.Field ASSOCIATED_REQUIRED_FIELDS = Email.Field.REFERENCES;
+    
+    /**
+     * Predicate used for filtering results from various local search methods.
+     *
+     * It's possible (and likely) this will be called from the context of a background thread,
+     * so use appropriate locking.
+     */
+    public delegate bool EmailSearchPredicate(Geary.EmailIdentifier email_id, bool only_partial,
+        Gee.Collection<Geary.FolderPath?> known_paths, Geary.EmailFlags flags);
+    
     public Geary.AccountInformation information { get; protected set; }
     
     public Geary.ProgressMonitor search_upgrade_monitor { get; protected set; }
@@ -327,9 +341,8 @@ public abstract class Geary.Account : BaseObject {
      * it's in, which can be null if it's in no folders.
      */
     public abstract async Gee.MultiMap<Geary.Email, Geary.FolderPath?>? local_search_message_id_async(
-        Geary.RFC822.MessageID message_id, Geary.Email.Field requested_fields, bool partial_ok,
-        Gee.Collection<Geary.FolderPath?>? folder_blacklist, Geary.EmailFlags? flag_blacklist,
-        Cancellable? cancellable = null) throws Error;
+        Geary.RFC822.MessageID message_id, Geary.Email.Field requested_fields,
+        EmailSearchPredicate? search_predicate, Cancellable? cancellable = null) throws Error;
     
     /**
      * Fetch all local messages associated with supplied {@link EmailIdentifier}.
@@ -343,11 +356,15 @@ public abstract class Geary.Account : BaseObject {
      *
      * The particulars of the folder_blacklist and flag_blacklist parameters are the same as in
      * local_search_message_id_async.
+     *
+     * The Emails can only be searched for if they are stored locally with
+     * {@link ASSOCIATED_REQUIRED_FIELDS} {@link Email.Field}s.  Thus, when listing for
+     * EmailIdentifiers, add that field to the required fields to ensure they're available
+     * locally.
      */
     public abstract async Gee.Collection<Geary.AssociatedEmails>? local_search_associated_emails_async(
-        Gee.Set<Geary.EmailIdentifier> email_ids, Geary.Email.Field requested_fields, bool partial_ok,
-        Gee.Collection<Geary.FolderPath?> folder_blacklist, Geary.EmailFlags? flag_blacklist,
-        Cancellable? cancellable = null) throws Error;
+        Gee.Set<Geary.EmailIdentifier> email_ids, Geary.Email.Field requested_fields,
+        EmailSearchPredicate? search_predicate, Cancellable? cancellable = null) throws Error;
     
     /**
      * Return a single email fulfilling the required fields.  The email to pull
diff --git a/src/engine/app/app-conversation-monitor.vala b/src/engine/app/app-conversation-monitor.vala
index efbbae6..1fb0bca 100644
--- a/src/engine/app/app-conversation-monitor.vala
+++ b/src/engine/app/app-conversation-monitor.vala
@@ -15,6 +15,12 @@ public class Geary.App.ConversationMonitor : BaseObject {
     // # of messages to load at a time as we attempt to fill the min window.
     private const int WINDOW_FILL_MESSAGE_COUNT = 5;
     
+    private const Geary.SpecialFolderType[] BLACKLISTED_FOLDER_TYPES = {
+        Geary.SpecialFolderType.SPAM,
+        Geary.SpecialFolderType.TRASH,
+        Geary.SpecialFolderType.DRAFTS,
+    };
+    
     public Geary.Folder folder { get; private set; }
     public bool is_monitoring { get; private set; default = false; }
     public int min_window_count { get { return _min_window_count; }
@@ -28,6 +34,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
     
     private Geary.Email.Field required_fields;
     private Geary.Folder.OpenFlags open_flags;
+    private Gee.Collection<FolderPath?> search_path_blacklist;
     private Cancellable? cancellable_monitor = null;
     private bool reseed_notified = false;
     private int _min_window_count = 0;
@@ -38,6 +45,8 @@ public class Geary.App.ConversationMonitor : BaseObject {
     
     // A logical map of Message-ID to Conversation ... these Message-IDs are merely referenced by
     // emails and the email itself may not be present in the conversation
+    //
+    // TODO: Is this necessary any longer?
     private Gee.HashMap<RFC822.MessageID, Conversation> message_id_to_conversation =
         new Gee.HashMap<RFC822.MessageID, Conversation>();
     
@@ -177,13 +186,33 @@ public class Geary.App.ConversationMonitor : BaseObject {
         Geary.Email.Field required_fields, int min_window_count) {
         this.folder = folder;
         this.open_flags = open_flags;
-        this.required_fields = required_fields | REQUIRED_FIELDS;
+        this.required_fields = required_fields | REQUIRED_FIELDS | Account.ASSOCIATED_REQUIRED_FIELDS;
         _min_window_count = min_window_count;
+        
+        // generate FolderPaths for the blacklisted special folder types
+        // TODO: Update when Account notifies of change to special folders
+        search_path_blacklist = new Gee.HashSet<Geary.FolderPath?>();
+        foreach (Geary.SpecialFolderType type in BLACKLISTED_FOLDER_TYPES) {
+            try {
+                Geary.Folder? blacklist_folder = folder.account.get_special_folder(type);
+                if (blacklist_folder != null)
+                    search_path_blacklist.add(blacklist_folder.path);
+            } catch (Error e) {
+                debug("Error finding special folder %s on account %s: %s",
+                    type.to_string(), folder.account.to_string(), e.message);
+            }
+        }
+        
+        // Add "no folders" so we omit results that have been deleted permanently from the server.
+        search_path_blacklist.add(null);
     }
     
     ~ConversationMonitor() {
         if (is_monitoring)
             debug("Warning: Conversations object destroyed without stopping monitoring");
+        
+        foreach (Conversation conversation in conversations)
+            conversation.clear_owner();
     }
     
     protected virtual void notify_monitoring_started() {
@@ -360,8 +389,8 @@ public class Geary.App.ConversationMonitor : BaseObject {
         Geary.Folder.ListFlags flags, Cancellable? cancellable) throws Error {
         notify_scan_started();
         try {
-            yield process_email_async(yield folder.list_email_by_id_async(initial_id,
-                count, Email.Field.NONE, flags, cancellable));
+            yield process_email_async(yield folder.list_email_by_id_async(initial_id, count,
+                required_fields, flags, cancellable));
         } catch (Error err) {
             notify_scan_error(err);
             
@@ -375,8 +404,8 @@ public class Geary.App.ConversationMonitor : BaseObject {
         Geary.Folder.ListFlags flags, Cancellable? cancellable) {
         notify_scan_started();
         try {
-            yield process_email_async(yield folder.list_email_by_sparse_id_async(ids,
-                Email.Field.NONE, flags, cancellable));
+            yield process_email_async(yield folder.list_email_by_sparse_id_async(ids, required_fields,
+                flags, cancellable));
         } catch (Error err) {
             notify_scan_error(err);
         } finally {
@@ -445,8 +474,27 @@ public class Geary.App.ConversationMonitor : BaseObject {
         }
     }
     
-    // Emails for this call only require Email.Field.NONE, as the EmailIdentifier is then used to
-    // load the associations and the required_fields
+    
+    // NOTE: This is called from a background thread.
+    private bool search_associated_predicate(EmailIdentifier email_id, bool only_partial,
+        Gee.Collection<FolderPath?> known_paths, EmailFlags flags) {
+        if (only_partial)
+            return false;
+        
+        if (known_paths.contains(folder.path))
+            return true;
+        
+        foreach (FolderPath? blacklist_path in search_path_blacklist) {
+            if (known_paths.contains(blacklist_path))
+                return false;
+        }
+        
+        if (flags.contains(EmailFlags.DRAFT))
+            return false;
+        
+        return true;
+    }
+    
     private async void process_email_async(Gee.Collection<Geary.Email>? emails) {
         if (emails == null || emails.size == 0)
             return;
@@ -461,8 +509,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
         Gee.Collection<AssociatedEmails> associated;
         try {
             associated = yield folder.account.local_search_associated_emails_async(
-                email_ids, required_fields, false, get_search_blacklist(), get_search_flag_blacklist(),
-                null);
+                email_ids, required_fields, search_associated_predicate, null);
         } catch (Error err) {
             debug("Unable to search for associated emails: %s", err.message);
             
@@ -487,7 +534,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
                     existing.add(conversation);
             }
             
-            // Create or pick conversation and reporting collection for these emails
+            // Create or pick conversation for these emails
             Conversation conversation;
             switch (existing.size) {
                 case 0:
@@ -499,6 +546,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
                 break;
                 
                 default:
+                    // TODO
                     conversation = merge_conversations(existing);
                 break;
             }
@@ -515,9 +563,9 @@ public class Geary.App.ConversationMonitor : BaseObject {
             
             // if new, added, otherwise appended
             if (!conversations.contains(conversation)) {
-                added.add(conversation);
                 conversations.add(conversation);
-            } else {
+                added.add(conversation);
+            } else if (!added.contains(conversation)) {
                 foreach (Email email in association.emails)
                     appended.set(conversation, email);
             }
@@ -541,123 +589,6 @@ public class Geary.App.ConversationMonitor : BaseObject {
         return new Conversation(this);
     }
     
-    private Gee.Collection<Geary.FolderPath> get_search_blacklist() {
-        Geary.SpecialFolderType[] blacklisted_folder_types = {
-            Geary.SpecialFolderType.SPAM,
-            Geary.SpecialFolderType.TRASH,
-            Geary.SpecialFolderType.DRAFTS,
-        };
-        
-        Gee.ArrayList<Geary.FolderPath?> blacklist = new Gee.ArrayList<Geary.FolderPath?>();
-        foreach (Geary.SpecialFolderType type in blacklisted_folder_types) {
-            try {
-                Geary.Folder? blacklist_folder = folder.account.get_special_folder(type);
-                if (blacklist_folder != null)
-                    blacklist.add(blacklist_folder.path);
-            } catch (Error e) {
-                debug("Error finding special folder %s on account %s: %s",
-                    type.to_string(), folder.account.to_string(), e.message);
-            }
-        }
-        
-        // Add "no folders" so we omit results that have been deleted permanently from the server.
-        blacklist.add(null);
-        
-        return blacklist;
-    }
-    
-    private Geary.EmailFlags get_search_flag_blacklist() {
-        Geary.EmailFlags flags = new Geary.EmailFlags();
-        flags.add(Geary.EmailFlags.DRAFT);
-        
-        return flags;
-    }
-    
-    /*
-    private async void expand_conversations_async(Gee.Set<RFC822.MessageID> needed_message_ids,
-        ProcessJobContext job) {
-        if (needed_message_ids.size == 0) {
-            yield process_email_complete_async(job);
-            return;
-        }
-        
-        Logging.debug(Logging.Flag.CONVERSATIONS,
-            "[%s] ConversationMonitor::expand_conversations: %d email ids",
-            folder.to_string(), needed_message_ids.size);
-        
-        Gee.Collection<Geary.FolderPath> folder_blacklist = get_search_blacklist();
-        Geary.EmailFlags flag_blacklist = get_search_flag_blacklist();
-        
-        // execute all the local search operations at once
-        Nonblocking.Batch batch = new Nonblocking.Batch();
-        foreach (RFC822.MessageID message_id in needed_message_ids) {
-            batch.add(new LocalSearchOperation(folder.account, message_id, required_fields,
-                folder_blacklist, flag_blacklist));
-        }
-        
-        try {
-            yield batch.execute_all_async();
-        } catch (Error err) {
-            debug("Unable to search local mail for conversations: %s", err.message);
-            
-            yield process_email_complete_async(job);
-            return;
-        }
-        
-        // collect their results into a single collection of addt'l emails
-        Gee.HashMap<Geary.EmailIdentifier, Geary.Email> needed_messages = new Gee.HashMap<
-            Geary.EmailIdentifier, Geary.Email>();
-        foreach (int id in batch.get_ids()) {
-            LocalSearchOperation op = (LocalSearchOperation) batch.get_operation(id);
-            if (op.emails != null) {
-                Geary.traverse<Geary.Email>(op.emails.get_keys())
-                    .filter(e => !needed_messages.has_key(e.id))
-                    .add_all_to_map<Geary.EmailIdentifier>(needed_messages, e => e.id);
-            }
-        }
-        
-        // process them as through they're been loaded from the folder; this, in turn, may
-        // require more local searching of email
-        yield process_email_async(needed_messages.values, job);
-        
-        Logging.debug(Logging.Flag.CONVERSATIONS,
-            "[%s] ConversationMonitor::expand_conversations completed: %d email ids (%d found)",
-            folder.to_string(), needed_message_ids.size, needed_messages.size);
-    }
-    */
-    
-    /*
-    private async void process_email_complete_async(ProcessJobContext job) {
-        Gee.Collection<Geary.App.Conversation>? added = null;
-        Gee.MultiMap<Geary.App.Conversation, Geary.Email>? appended = null;
-        Gee.Collection<Conversation>? removed_due_to_merge = null;
-        try {
-            yield conversations.add_all_emails_async(job.emails.values, this, folder.path, out added, out appended,
-                out removed_due_to_merge, null);
-        } catch (Error err) {
-            debug("Unable to add emails to conversation: %s", err.message);
-            
-            // fall-through
-        }
-        
-        if (removed_due_to_merge != null) {
-            foreach (Conversation conversation in removed_due_to_merge)
-                notify_conversation_removed(conversation);
-        }
-        
-        if (added != null && added.size > 0)
-            notify_conversations_added(added);
-        
-        if (appended != null) {
-            foreach (Geary.App.Conversation conversation in appended.get_keys())
-                notify_conversation_appended(conversation, appended.get(conversation));
-        }
-        
-        if (job.inside_scan)
-            notify_scan_completed();
-    }
-    */
-    
     private void on_folder_email_appended(Gee.Collection<Geary.EmailIdentifier> appended_ids) {
         operation_queue.add(new AppendOperation(this, appended_ids));
     }
@@ -687,35 +618,73 @@ public class Geary.App.ConversationMonitor : BaseObject {
         debug("%d messages(s) removed from %s, trimming/removing conversations...", removed_ids.size,
             folder.to_string());
         
-        /*
-        Gee.Collection<Geary.App.Conversation> removed;
-        Gee.MultiMap<Geary.App.Conversation, Geary.Email> trimmed;
-        yield conversations.remove_emails_and_check_in_folder_async(removed_ids, folder.account,
-            folder.path, out removed, out trimmed, null);
+        Gee.HashSet<Conversation> removed_conversations = new Gee.HashSet<Conversation>();
+        Gee.HashMultiMap<Conversation, Email> trimmed_conversations = new Gee.HashMultiMap<
+            Conversation, Email>();
         
-        foreach (Conversation conversation in trimmed.get_keys())
-            notify_conversation_trimmed(conversation, trimmed.get(conversation));
+        // remove the emails from internal state, noting which conversations are trimmed or flat-out
+        // removed (evaporated)
+        foreach (EmailIdentifier removed_id in removed_ids) {
+            Conversation conversation;
+            if (!email_id_to_conversation.unset(removed_id, out conversation))
+                continue;
+            
+            Geary.Email? removed_email = conversation.get_email_by_id(removed_id);
+            if (removed_email == null)
+                continue;
+            
+            Gee.Set<RFC822.MessageID>? removed_message_ids = conversation.remove(removed_email);
+            if (removed_message_ids != null) {
+                foreach (RFC822.MessageID removed_message_id in removed_message_ids)
+                    message_id_to_conversation.unset(removed_message_id);
+            }
+            
+            if (conversation.get_count() == 0) {
+                conversations.remove(conversation);
+                removed_conversations.add(conversation);
+            } else {
+                trimmed_conversations.set(conversation, removed_email);
+            }
+        }
         
-        foreach (Conversation conversation in removed)
-            notify_conversation_removed(conversation);
-        */
+        // Look for trimmed conversations no longer holding messages in this folder;
+        // those are then evaporated themselves
+        int evaporated_count = 0;
+        foreach (Conversation conversation in trimmed_conversations.get_keys().to_array()) {
+            if (conversation.any_in_folder_path(folder.path))
+                continue;
+            
+            trimmed_conversations.remove_all(conversation);
+            
+            conversations.remove(conversation);
+            removed_conversations.add(conversation);
+            
+            evaporated_count++;
+        }
+        
+        if (evaporated_count > 0) {
+            debug("Evaporated %d conversations from %s due to no in-folder messages",
+                evaporated_count, folder.to_string());
+        }
+        
+        if (trimmed_conversations.size > 0) {
+            debug("Trimmed %d conversations of %d emails from %s", trimmed_conversations.get_keys().size,
+                trimmed_conversations.get_values().size, folder.to_string());
+        }
         
-        /*
-        // For any still-existing conversations that we've trimmed messages
-        // from, do a search for any messages that should still be there due to
-        // full conversations.  This way, some removed messages are instead
-        // "demoted" to out-of-folder emails.  This is kind of inefficient, but
-        // it doesn't seem like there's a way around it.
-        Gee.HashSet<RFC822.MessageID> search_message_ids = new Gee.HashSet<RFC822.MessageID>();
-        foreach (Conversation conversation in trimmed.get_keys())
-            search_message_ids.add_all(conversation.get_message_ids());
-        yield expand_conversations_async(search_message_ids, new ProcessJobContext(false));
-        */
+        foreach (Conversation conversation in trimmed_conversations.get_keys())
+            notify_conversation_trimmed(conversation, trimmed_conversations.get(conversation));
+        
+        if (removed_conversations.size > 0)
+            debug("Removed %d conversations from %s", removed_conversations.size, folder.to_string());
+        
+        foreach (Conversation conversation in removed_conversations)
+            notify_conversation_removed(conversation);
     }
     
     internal async void external_append_emails_async(Geary.Folder folder,
         Gee.Collection<Geary.EmailIdentifier> appended_ids) {
-        if (get_search_blacklist().contains(folder.path))
+        if (search_path_blacklist.contains(folder.path))
             return;
         
         if (conversations.is_empty)
diff --git a/src/engine/app/app-conversation.vala b/src/engine/app/app-conversation.vala
index 60a3905..e549a36 100644
--- a/src/engine/app/app-conversation.vala
+++ b/src/engine/app/app-conversation.vala
@@ -108,6 +108,8 @@ public class Geary.App.Conversation : BaseObject {
     
     /**
      * Returns the number of emails in the conversation in a particular folder.
+     *
+     * TODO: Remove?
      */
     public async int get_count_in_folder_async(Geary.Account account, Geary.FolderPath path,
         Cancellable? cancellable) throws Error {
@@ -126,6 +128,18 @@ public class Geary.App.Conversation : BaseObject {
     }
     
     /**
+     * Returns true if any {@link Email} in the conversation is in the specified {@link FolderPath}.
+     */
+    public bool any_in_folder_path(Geary.FolderPath path) {
+        foreach (EmailIdentifier email_id in path_map.get_keys()) {
+            if (path_map.get(email_id).contains(path))
+                return true;
+        }
+        
+        return false;
+    }
+    
+    /**
      * Returns all the email in the conversation sorted and filtered according to the specifiers.
      *
      * {@link Location.IN_FOLDER} and {@link Location.OUT_OF_FOLDER} are the
diff --git a/src/engine/imap-db/imap-db-account.vala b/src/engine/imap-db/imap-db-account.vala
index 0c55aba..ac5512d 100644
--- a/src/engine/imap-db/imap-db-account.vala
+++ b/src/engine/imap-db/imap-db-account.vala
@@ -641,15 +641,14 @@ private class Geary.ImapDB.Account : BaseObject {
     }
     
     public async Gee.MultiMap<Geary.Email, Geary.FolderPath?>? search_message_id_async(
-        Geary.RFC822.MessageID message_id, Geary.Email.Field requested_fields, bool partial_ok,
-        Gee.Collection<Geary.FolderPath?>? folder_blacklist, Geary.EmailFlags? flag_blacklist,
-        Cancellable? cancellable = null) throws Error {
+        Geary.RFC822.MessageID message_id, Geary.Email.Field requested_fields,
+        Geary.Account.EmailSearchPredicate? search_predicate, Cancellable? cancellable) throws Error {
         check_open();
         
         Gee.HashMultiMap<Geary.Email, Geary.FolderPath?> messages
             = new Gee.HashMultiMap<Geary.Email, Geary.FolderPath?>();
         
-        if (flag_blacklist != null)
+        if (search_predicate != null)
             requested_fields = requested_fields | Geary.Email.Field.FLAGS;
         
         yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => {
@@ -661,8 +660,8 @@ private class Geary.ImapDB.Account : BaseObject {
             while (!result.finished) {
                 Email? email;
                 Gee.Collection<FolderPath?>? known_paths;
-                do_fetch_message(cx, result.int64_at(0), requested_fields, partial_ok, folder_blacklist,
-                    flag_blacklist, out email, out known_paths, cancellable);
+                do_fetch_message(cx, result.int64_at(0), requested_fields, search_predicate,
+                    out email, out known_paths, cancellable);
                 if (email != null) {
                     assert(known_paths != null);
                     
@@ -680,13 +679,11 @@ private class Geary.ImapDB.Account : BaseObject {
     }
     
     public async Gee.Collection<Geary.AssociatedEmails>? search_associated_emails_async(
-        Gee.Set<Geary.EmailIdentifier> email_ids, Email.Field requested_fields, bool partial_ok,
-        Gee.Collection<Geary.FolderPath?>? folder_blacklist, Geary.EmailFlags? flag_blacklist,
-        Cancellable? cancellable) throws Error {
+        Gee.Set<Geary.EmailIdentifier> email_ids, Email.Field requested_fields,
+        Geary.Account.EmailSearchPredicate? search_predicate, Cancellable? cancellable) throws Error {
         check_open();
         
-        // Store in a casted HashSet that can be modified internally, as associated identifiers
-        // will be weeded out as loaded with other identifiers
+        // Cast all at once and report error if invalid found
         Gee.HashSet<ImapDB.EmailIdentifier> db_ids = new Gee.HashSet<ImapDB.EmailIdentifier>();
         foreach (Geary.EmailIdentifier email_id in email_ids) {
             ImapDB.EmailIdentifier? db_id = email_id as ImapDB.EmailIdentifier;
@@ -699,10 +696,10 @@ private class Geary.ImapDB.Account : BaseObject {
         Gee.Collection<AssociatedEmails> associations = new Gee.ArrayList<AssociatedEmails>();
         Gee.HashSet<ImapDB.EmailIdentifier> found_ids = new Gee.HashSet<ImapDB.EmailIdentifier>();
         
-        if (flag_blacklist != null)
+        // Need flags for search predicate
+        if (search_predicate != null)
             requested_fields = requested_fields | Geary.Email.Field.FLAGS;
         
-        debug("Searching %d ids for associations...", email_ids.size);
         yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => {
             foreach (ImapDB.EmailIdentifier db_id in db_ids) {
                 if (found_ids.contains(db_id))
@@ -734,8 +731,8 @@ private class Geary.ImapDB.Account : BaseObject {
                 while (!result.finished) {
                     Email? email;
                     Gee.Collection<FolderPath?>? known_paths;
-                    do_fetch_message(cx, result.int64_at(0), requested_fields, partial_ok, folder_blacklist,
-                        flag_blacklist, out email, out known_paths, cancellable);
+                    do_fetch_message(cx, result.rowid_at(0), requested_fields, search_predicate,
+                        out email, out known_paths, cancellable);
                     if (email != null) {
                         association.add(email, known_paths);
                         found_ids.add((ImapDB.EmailIdentifier) email.id);
@@ -744,60 +741,39 @@ private class Geary.ImapDB.Account : BaseObject {
                     result.next(cancellable);
                 }
                 
-                associations.add(association);
+                if (association.emails.size > 0)
+                    associations.add(association);
             }
             
             return Db.TransactionOutcome.DONE;
         }, cancellable);
-        debug("Found %d associations from %d ids", associations.size, email_ids.size);
         
         return associations.size > 0 ? associations : null;
     }
     
     private void do_fetch_message(Db.Connection cx, int64 message_id, Email.Field required_fields,
-        bool partial_ok, Gee.Collection<Geary.FolderPath?>? folder_blacklist, Geary.EmailFlags? flag_blacklist,
-        out Email? email, out Gee.Collection<FolderPath?>? known_paths, Cancellable? cancellable) throws Error {
+        Geary.Account.EmailSearchPredicate? search_predicate, out Email? email,
+        out Gee.Collection<FolderPath?>? known_paths, Cancellable? cancellable) throws Error {
         Email.Field actual_fields;
         MessageRow row = ImapDB.Folder.do_fetch_message_row(cx, message_id, required_fields,
             out actual_fields, cancellable);
         
-        // prepare for the worst
-        email = null;
-        known_paths = null;
-        
-        // Ignore any messages that don't have the required fields unless partial is ok
-        if (!partial_ok && !row.fields.fulfills(required_fields))
-            return;
-        
         email = row.to_email(new Geary.ImapDB.EmailIdentifier(message_id, null));
         ImapDB.Folder.do_add_attachments(cx, email, message_id, cancellable);
         
-        // Check for blacklisted flags.
-        if (flag_blacklist != null && email.email_flags != null && email.email_flags.contains_any(flag_blacklist)) {
-            email = null;
-            
-            return;
-        }
+        Gee.Set<Geary.FolderPath>? folders = do_find_email_folders(cx, message_id, true, cancellable);
         
         known_paths = new Gee.HashSet<FolderPath?>();
-        
-        // Add folders email is found in, respecting blacklist
-        Gee.Set<Geary.FolderPath>? folders = do_find_email_folders(cx, message_id, true, cancellable);
-        if (folders == null) {
-            if (folder_blacklist == null || !folder_blacklist.contains(null))
-                known_paths.add(null);
-        } else {
-            foreach (Geary.FolderPath path in folders) {
-                // If it's in a blacklisted folder, we don't report it at all.
-                if (folder_blacklist != null && folder_blacklist.contains(path)) {
-                    email = null;
-                    known_paths = null;
-                    
-                    break;
-                } else {
-                    known_paths.add(path);
-                }
-            }
+        if (folders == null)
+            known_paths.add(null);
+        else
+            known_paths.add_all(folders);
+        
+        // Allow caller to filter results in callback
+        if (search_predicate != null
+            && !search_predicate(email.id, !row.fields.fulfills(required_fields), known_paths, email.email_flags)) {
+            email = null;
+            known_paths = null;
         }
     }
     
diff --git a/src/engine/imap-db/imap-db-conversation.vala b/src/engine/imap-db/imap-db-conversation.vala
index 84d52de..26d9bb3 100644
--- a/src/engine/imap-db/imap-db-conversation.vala
+++ b/src/engine/imap-db/imap-db-conversation.vala
@@ -12,7 +12,7 @@
  
 namespace Geary.ImapDB.Conversation {
 
-internal const Geary.Email.Field REQUIRED_FIELDS = Email.Field.REFERENCES;
+internal const Geary.Email.Field REQUIRED_FIELDS = Geary.Account.ASSOCIATED_REQUIRED_FIELDS;
 
 /**
  * Should only be called when an email message's {@link REQUIRED_FIELDS} are initially fulfilled.
@@ -20,13 +20,24 @@ internal const Geary.Email.Field REQUIRED_FIELDS = Email.Field.REFERENCES;
 internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id, Cancellable? cancellable)
     throws Error {
     Db.Statement references_stmt = cx.prepare("""
-        SELECT message_id, in_reply_to, reference_ids
+        SELECT message_id, in_reply_to, reference_ids, conversation_id
         FROM MessageTable
         WHERE id = ?
     """);
     references_stmt.bind_rowid(0, message_id);
     
     Db.Result references_result = references_stmt.exec(cancellable);
+    if (references_result.finished) {
+        message("Unable to add message %s to conversation table: not found", message_id.to_string());
+        
+        return;
+    }
+    
+    // Check if part of an existing conversation; not an error to call this multiple times on same
+    // message, but not necessary either (since REFERENCES should be static)
+    int64 existing_conversation_id = Db.INVALID_ROWID;
+    if (!references_result.is_null_at(3))
+        existing_conversation_id = references_result.rowid_at(3);
     
     // Create a common set of ancestors from In-Reply-To and References
     Gee.HashSet<RFC822.MessageID> ancestors = new Gee.HashSet<RFC822.MessageID>();
@@ -45,7 +56,9 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
         return;
     }
     
-    // search for existing conversation(s) for any of these Message-IDs that's not this message
+    // search for existing conversation(s) for any of these Message-IDs ... include this message
+    // to avoid a single-message conversation being processed multiple times and creating a new
+    // conversation each time
     StringBuilder sql = new StringBuilder("""
         SELECT conversation_id
         FROM MessageTable
@@ -53,13 +66,12 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
     """);
     for (int ctr = 0; ctr < ancestors.size; ctr++)
         sql.append(ctr == 0 ? "?" : ",?");
-    sql.append(") AND id <> ?");
+    sql.append(")");
 
     Db.Statement search_stmt = cx.prepare(sql.str);
     int col = 0;
     foreach (RFC822.MessageID ancestor in ancestors)
         search_stmt.bind_string(col++, ancestor.value);
-    search_stmt.bind_rowid(col++, message_id);
     
     Gee.HashSet<int64?> conversation_ids = new Gee.HashSet<int64?>(Collection.int64_hash_func,
         Collection.int64_equal_func);
@@ -76,32 +88,42 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
     
     // Select the message's conversation_id from the following three scenarios:
     int64 conversation_id;
-    if (conversation_ids.size > 1) {
-        // this indicates that two (or more) conversations were created due to emails arriving
-        // out of order and the complete(r) tree is only being available now; merge the
-        // conversations into one
-        conversation_id = do_merge_conversations(cx, conversation_ids, cancellable);
+    switch (conversation_ids.size) {
+        case 0:
+            // No conversation for this Message-ID, so generate a new one
+            cx.exec("""
+                INSERT INTO ConversationTable
+                DEFAULT VALUES
+            """);
+            conversation_id = cx.last_insert_rowid;
+            
+            debug("Created new conversation %s for message %s: %s", conversation_id.to_string(),
+                message_id.to_string(), rfc822_message_id_text);
+        break;
         
-        debug("Merged %d conversations to conversation %s", conversation_ids.size - 1,
-            conversation_id.to_string());
-    } else if (conversation_ids.size == 0) {
-        // No conversation for this Message-ID, so generate a new one
-        cx.exec("""
-            INSERT INTO ConversationTable
-            DEFAULT VALUES
-        """);
-        conversation_id = cx.last_insert_rowid;
+        case 1:
+            // one conversation found, so use that one
+            conversation_id = traverse<int64?>(conversation_ids).first();
+            
+            debug("Expanding existing conversation %s with message %s: %s", conversation_id.to_string(),
+                message_id.to_string(), rfc822_message_id_text);
+        break;
         
-        debug("Created new conversation %s for message %s: %s", conversation_id.to_string(),
-            message_id.to_string(), rfc822_message_id_text);
-    } else {
-        // one conversation found, so use that one
-        conversation_id = traverse<int64?>(conversation_ids).first();
-        
-        debug("Expanding existing conversation %s with message %s: %s", conversation_id.to_string(),
-            message_id.to_string(), rfc822_message_id_text);
+        default:
+            // this indicates that two (or more) conversations were created due to emails arriving
+            // out of order and the complete(r) tree is only being available now; merge the
+            // conversations into one
+            conversation_id = do_merge_conversations(cx, conversation_ids, cancellable);
+            
+            debug("Merged %d conversations to conversation %s", conversation_ids.size - 1,
+                conversation_id.to_string());
+        break;
     }
     
+    // if already assigned this conversation, avoid a write
+    if (conversation_id == existing_conversation_id)
+        return;
+    
     // Assign the message to this conversation
     Db.Statement insert = cx.prepare("""
         UPDATE MessageTable
@@ -127,8 +149,13 @@ private int64 do_merge_conversations(Db.Connection cx, Gee.Set<int64?> conversat
     // must be at least two in order to merge
     assert(conversation_ids.size > 1);
     
-    // doesn't really matter which; use the first one
-    int64 conversation_id = traverse<int64?>(conversation_ids).first();
+    // although multithreaded transactions aren't a problem per se with database locking, it is
+    // possible for multiple threads to be processing mail on the same conversation at the same
+    // time and will therefore be reading the same list and choosing which to merge; by being
+    // predictable here, ensure that the same conversation is selected in both cases
+    int64 conversation_id = traverse<int64?>(conversation_ids)
+        .to_tree_set(Collection.int64_compare_func)
+        .first();
     
     //
     // TODO: Merge flags together
diff --git a/src/engine/imap-engine/imap-engine-generic-account.vala b/src/engine/imap-engine/imap-engine-generic-account.vala
index 8a3d3ea..d48ddda 100644
--- a/src/engine/imap-engine/imap-engine-generic-account.vala
+++ b/src/engine/imap-engine/imap-engine-generic-account.vala
@@ -852,19 +852,17 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
     }
     
     public override async Gee.MultiMap<Geary.Email, Geary.FolderPath?>? local_search_message_id_async(
-        Geary.RFC822.MessageID message_id, Geary.Email.Field requested_fields, bool partial_ok,
-        Gee.Collection<Geary.FolderPath?>? folder_blacklist, Geary.EmailFlags? flag_blacklist,
-        Cancellable? cancellable = null) throws Error {
-        return yield local.search_message_id_async(
-            message_id, requested_fields, partial_ok, folder_blacklist, flag_blacklist, cancellable);
+        Geary.RFC822.MessageID message_id, Geary.Email.Field requested_fields,
+        Account.EmailSearchPredicate? search_predicate, Cancellable? cancellable = null) throws Error {
+        return yield local.search_message_id_async(message_id, requested_fields, search_predicate,
+            cancellable);
     }
     
     public override async Gee.Collection<Geary.AssociatedEmails>? local_search_associated_emails_async(
-        Gee.Set<Geary.EmailIdentifier> email_ids, Geary.Email.Field requested_fields, bool partial_ok,
-        Gee.Collection<Geary.FolderPath?> folder_blacklist, Geary.EmailFlags? flag_blacklist,
-        Cancellable? cancellable = null) throws Error {
-        return yield local.search_associated_emails_async(email_ids, requested_fields, partial_ok,
-            folder_blacklist, flag_blacklist, cancellable);
+        Gee.Set<Geary.EmailIdentifier> email_ids, Geary.Email.Field requested_fields,
+        Account.EmailSearchPredicate? search_predicate, Cancellable? cancellable = null) throws Error {
+        return yield local.search_associated_emails_async(email_ids, requested_fields, search_predicate,
+            cancellable);
     }
     
     public override async Geary.Email local_fetch_email_async(Geary.EmailIdentifier email_id,
diff --git a/src/engine/util/util-collection.vala b/src/engine/util/util-collection.vala
index 6b81870..653d08b 100644
--- a/src/engine/util/util-collection.vala
+++ b/src/engine/util/util-collection.vala
@@ -149,6 +149,16 @@ public bool int64_equal_func(int64? a, int64? b) {
 }
 
 /**
+ * Orders two boxed int64 values, returning -1, 0 or 1; to be used as CompareDataFunc for Gee collections.
+ */
+public int int64_compare_func(int64? a, int64? b) {
+    int64 *bia = (int64 *) a;
+    int64 *bib = (int64 *) b;
+    // compare directly rather than subtract: the difference of two int64 values can overflow
+    return (*bia < *bib) ? -1 : ((*bia > *bib) ? 1 : 0);
+}
+
+/**
  * A rotating-XOR hash that can be used to hash memory buffers of any size.
  */
 public uint hash_memory(void *ptr, size_t bytes) {



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]