[geary/wip/713150-conversations] Fixes issue with nonexistent Message-IDs being associated properly



commit 9f21002f1025272a69fe929f91b6a7de3ac037ea
Author: Jim Nelson <jim yorba org>
Date:   Wed Feb 25 17:19:42 2015 -0800

    Fixes issue with nonexistent Message-IDs being associated properly
    
    Issue with loading more conversations in large folders, i.e. All Mail

 sql/version-025.sql                          |   11 +++-
 src/engine/app/app-conversation-monitor.vala |   66 ++++++++++++------
 src/engine/imap-db/imap-db-account.vala      |   20 ++++--
 src/engine/imap-db/imap-db-conversation.vala |   93 +++++++++++++++++--------
 4 files changed, 129 insertions(+), 61 deletions(-)
---
diff --git a/sql/version-025.sql b/sql/version-025.sql
index 5b3fc78..ad679cd 100644
--- a/sql/version-025.sql
+++ b/sql/version-025.sql
@@ -7,7 +7,14 @@ CREATE TABLE ConversationTable (
     flags TEXT DEFAULT NULL
 );
 
-ALTER TABLE MessageTable ADD COLUMN conversation_id INTEGER REFERENCES ConversationTable DEFAULT NULL;
+CREATE TABLE MessageConversationTable (
+    id INTEGER PRIMARY KEY,
+    conversation_id INTEGER REFERENCES ConversationTable DEFAULT NULL,
+    message_id INTEGER REFERENCES MessageTable DEFAULT NULL,
+    rfc822_message_id TEXT UNIQUE NOT NULL
+);
 
-CREATE INDEX MessageTableConversationIDIndex ON MessageTable(conversation_id);
+CREATE INDEX MessageConversationTableConversationIDIndex ON MessageConversationTable(conversation_id);
+CREATE INDEX MessageConversationTableMessageIDIndex ON MessageConversationTable(message_id);
+CREATE INDEX MessageConversationTableRFC822MessageIDIndex ON MessageConversationTable(rfc822_message_id);
 
diff --git a/src/engine/app/app-conversation-monitor.vala b/src/engine/app/app-conversation-monitor.vala
index d67a191..5b939dc 100644
--- a/src/engine/app/app-conversation-monitor.vala
+++ b/src/engine/app/app-conversation-monitor.vala
@@ -43,8 +43,14 @@ public class Geary.App.ConversationMonitor : BaseObject {
     // All generated Conversations
     private Gee.HashSet<Conversation> conversations = new Gee.HashSet<Conversation>();
     
-    // A map of EmailIdentifiers to Conversations
-    private Gee.HashMap<EmailIdentifier, Conversation> email_id_to_conversation =
+    // A map of all EmailIdentifiers to Conversations ... since emails from deep in the folder can
+    // be added to a conversation, use primary_email_id_to_conversation for finding boundaries
+    private Gee.HashMap<EmailIdentifier, Conversation> all_email_id_to_conversation =
+        new Gee.HashMap<EmailIdentifier, Conversation>();
+    
+    // A map of primary EmailIdentifiers to Conversations ... primary ids come from ranged listing
+    // and don't include ids found "deep" in the folder or from other folders
+    private Gee.HashMap<EmailIdentifier, Conversation> primary_email_id_to_conversation =
         new Gee.HashMap<EmailIdentifier, Conversation>();
     
     /**
@@ -258,7 +264,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
     }
     
     public int get_email_count() {
-        return email_id_to_conversation.size;
+        return all_email_id_to_conversation.size;
     }
     
     public Gee.Collection<Conversation> get_conversations() {
@@ -266,7 +272,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
     }
     
     public Geary.App.Conversation? get_conversation_for_email(Geary.EmailIdentifier email_id) {
-        return email_id_to_conversation[email_id];
+        return all_email_id_to_conversation[email_id];
     }
     
     public async bool start_monitoring_async(Cancellable? cancellable = null)
@@ -378,8 +384,8 @@ public class Geary.App.ConversationMonitor : BaseObject {
         notify_scan_started();
         try {
             // list by required_flags to ensure all are present in local store
-            yield process_email_async(yield folder.list_email_by_id_async(initial_id, count,
-                required_fields, flags, cancellable));
+            yield process_email_async(folder.path,
+                yield folder.list_email_by_id_async(initial_id, count, required_fields, flags, cancellable));
         } catch (Error err) {
             notify_scan_error(err);
         } finally {
@@ -392,8 +398,8 @@ public class Geary.App.ConversationMonitor : BaseObject {
         notify_scan_started();
         try {
             // list by required_flags to ensure all are present in local store
-            yield process_email_async(yield folder.list_email_by_sparse_id_async(ids, required_fields,
-                flags, cancellable));
+            yield process_email_async(folder.path,
+                yield folder.list_email_by_sparse_id_async(ids, required_fields, flags, cancellable));
         } catch (Error err) {
             notify_scan_error(err);
         } finally {
@@ -426,7 +432,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
         return true;
     }
     
-    private async void process_email_async(Gee.Collection<Geary.Email>? emails) {
+    private async void process_email_async(FolderPath path, Gee.Collection<Geary.Email>? emails) {
         if (emails == null || emails.size == 0)
             return;
         
@@ -434,16 +440,29 @@ public class Geary.App.ConversationMonitor : BaseObject {
             .map<EmailIdentifier>(email => email.id)
             .to_hash_set();
         
-        yield process_email_ids_async(ids);
+        yield process_email_ids_async(path, ids);
     }
     
-    private async void process_email_ids_async(Gee.Collection<Geary.EmailIdentifier>? email_ids) {
+    private async void process_email_ids_async(FolderPath path, Gee.Collection<Geary.EmailIdentifier>? email_ids) {
         if (email_ids == null || email_ids.size == 0)
             return;
         
         Logging.debug(Logging.Flag.CONVERSATIONS, "[%s] ConversationMonitor::process_email: %d emails",
             folder.to_string(), email_ids.size);
         
+        // don't re-process existing emails
+        /*
+        Gee.Collection<EmailIdentifier> trimmed_ids = traverse<EmailIdentifier>(email_ids)
+            .filter(id => !all_email_id_to_conversation.has_key(id))
+            .to_hash_set();
+        
+        debug("Filtering existing identifiers: %d => %d", email_ids.size, trimmed_ids.size);
+        
+        email_ids = trimmed_ids;
+        if (email_ids.size == 0)
+            return false;
+        */
+        
         Gee.Collection<AssociatedEmails>? associations = null;
         try {
             associations = yield folder.account.local_search_associated_emails_async(
@@ -470,38 +489,37 @@ public class Geary.App.ConversationMonitor : BaseObject {
             // started and then coalesce as new emails come in)
             Gee.HashSet<Conversation> existing = new Gee.HashSet<Conversation>();
             foreach (EmailIdentifier associated_id in associated_ids) {
-                Conversation? conversation = email_id_to_conversation[associated_id];
+                Conversation? conversation = all_email_id_to_conversation[associated_id];
                 if (conversation != null)
                     existing.add(conversation);
             }
             
             // Create or pick conversation for these emails
             Conversation conversation;
-            unowned string ctype;
             switch (existing.size) {
                 case 0:
-                    ctype = "NEW";
                     conversation = new Conversation(this);
                 break;
                 
                 case 1:
-                    ctype = "FOUND";
                     conversation = traverse<Conversation>(existing).first();
                 break;
                 
                 default:
                     // TODO
-                    ctype = "MERGED";
                     conversation = merge_conversations(existing);
                 break;
             }
             
             // add all emails and each known path(s) to the Conversation and EmailIdentifier mapping
             foreach (Email email in association.emails) {
-                if (email.subject.value.contains("714922"))
-                    debug("ADDING \"%s TO %s", email.subject.to_string(), ctype);
                 conversation.add(email, association.known_paths[email]);
-                email_id_to_conversation[email.id] = conversation;
+                all_email_id_to_conversation[email.id] = conversation;
+                
+                // only add to primary map if identifier is part of the original set of arguments
+                // and they're from this folder and not another one
+                if (email_ids.contains(email.id) && path.equal_to(folder.path))
+                    primary_email_id_to_conversation[email.id] = conversation;
             }
             
             // if new, added, otherwise appended (if not already added)
@@ -570,8 +588,10 @@ public class Geary.App.ConversationMonitor : BaseObject {
         // remove the emails from internal state, noting which conversations are trimmed or flat-out
         // removed (evaporated)
         foreach (EmailIdentifier removed_id in removed_ids) {
+            primary_email_id_to_conversation.unset(removed_id);
+            
             Conversation conversation;
-            if (!email_id_to_conversation.unset(removed_id, out conversation))
+            if (!all_email_id_to_conversation.unset(removed_id, out conversation))
                 continue;
             
             Geary.Email? removed_email = conversation.get_email_by_id(removed_id);
@@ -610,13 +630,13 @@ public class Geary.App.ConversationMonitor : BaseObject {
         debug("%d out of folder message(s) appended to %s, fetching to add to conversations...", appended_ids.size,
             folder.to_string());
         
-        yield process_email_ids_async(appended_ids);
+        yield process_email_ids_async(folder.path, appended_ids);
     }
     
     private void on_account_email_flags_changed(Geary.Folder folder,
         Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> map) {
         foreach (Geary.EmailIdentifier id in map.keys) {
-            Conversation? conversation = email_id_to_conversation[id];
+            Conversation? conversation = all_email_id_to_conversation[id];
             if (conversation == null)
                 continue;
             
@@ -632,7 +652,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
     private async Geary.EmailIdentifier? get_lowest_email_id_async(Cancellable? cancellable) {
         Geary.EmailIdentifier? earliest_id = null;
         try {
-            yield folder.find_boundaries_async(email_id_to_conversation.keys, out earliest_id, null,
+            yield folder.find_boundaries_async(primary_email_id_to_conversation.keys, out earliest_id, null,
                 cancellable);
         } catch (Error e) {
             debug("Error finding earliest email identifier: %s", e.message);
diff --git a/src/engine/imap-db/imap-db-account.vala b/src/engine/imap-db/imap-db-account.vala
index 230dcbe..ee2516a 100644
--- a/src/engine/imap-db/imap-db-account.vala
+++ b/src/engine/imap-db/imap-db-account.vala
@@ -652,6 +652,7 @@ private class Geary.ImapDB.Account : BaseObject {
             requested_fields = requested_fields | Geary.Email.Field.FLAGS;
         
         yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => {
+            // BUG: in-reply-to can be an address list, not simply an address
             Db.Statement stmt = cx.prepare("SELECT id FROM MessageTable WHERE message_id = ? OR in_reply_to = ?");
             stmt.bind_string(0, message_id.value);
             stmt.bind_string(1, message_id.value);
@@ -702,13 +703,14 @@ private class Geary.ImapDB.Account : BaseObject {
         
         yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => {
             foreach (ImapDB.EmailIdentifier db_id in db_ids) {
+                // if message found in previous conversation search, don't re-fetch
                 if (found_ids.contains(db_id))
                     continue;
                 
                 Db.Statement stmt = cx.prepare("""
                     SELECT conversation_id
-                    FROM MessageTable
-                    WHERE id = ?
+                    FROM MessageConversationTable
+                    WHERE message_id = ?
                 """);
                 stmt.bind_rowid(0, db_id.message_id);
                 
@@ -719,16 +721,22 @@ private class Geary.ImapDB.Account : BaseObject {
                 int64 conversation_id = result.rowid_at(0);
                 
                 stmt = cx.prepare("""
-                    SELECT id
-                    FROM MessageTable
+                    SELECT message_id
+                    FROM MessageConversationTable
                     WHERE conversation_id = ?
                 """);
                 stmt.bind_rowid(0, conversation_id);
                 
-                AssociatedEmails association = new AssociatedEmails();
-                
                 result = stmt.exec(cancellable);
+                
+                AssociatedEmails association = new AssociatedEmails();
                 while (!result.finished) {
+                    if (result.is_null_at(0)) {
+                        result.next(cancellable);
+                        
+                        continue;
+                    }
+                    
                     Email? email;
                     Gee.Collection<FolderPath?>? known_paths;
                     do_fetch_message(cx, result.rowid_at(0), requested_fields, search_predicate,
diff --git a/src/engine/imap-db/imap-db-conversation.vala b/src/engine/imap-db/imap-db-conversation.vala
index 26d9bb3..215a2b6 100644
--- a/src/engine/imap-db/imap-db-conversation.vala
+++ b/src/engine/imap-db/imap-db-conversation.vala
@@ -20,7 +20,7 @@ internal const Geary.Email.Field REQUIRED_FIELDS = Geary.Account.ASSOCIATED_REQU
 internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id, Cancellable? cancellable)
     throws Error {
     Db.Statement references_stmt = cx.prepare("""
-        SELECT message_id, in_reply_to, reference_ids, conversation_id
+        SELECT message_id, in_reply_to, reference_ids
         FROM MessageTable
         WHERE id = ?
     """);
@@ -33,25 +33,22 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
         return;
     }
     
-    // Check if part of an existing conversation; not an error to call this multiple times on same
-    // message, but not necessary either (since REFERENCES should be static)
-    int64 existing_conversation_id = Db.INVALID_ROWID;
-    if (!references_result.is_null_at(3))
-        existing_conversation_id = references_result.rowid_at(3);
-    
     // Create a common set of ancestors from In-Reply-To and References
     Gee.HashSet<RFC822.MessageID> ancestors = new Gee.HashSet<RFC822.MessageID>();
     add_ancestors(references_result.string_at(1), ancestors);
     add_ancestors(references_result.string_at(2), ancestors);
     
-    // Add this message's Message-ID
+    // Add this message's Message-ID to the ancestors
     unowned string? rfc822_message_id_text = references_result.string_at(0);
-    if (!String.is_empty(rfc822_message_id_text))
-        ancestors.add(new RFC822.MessageID(rfc822_message_id_text));
+    RFC822.MessageID? this_rfc822_message_id = null;
+    if (!String.is_empty(rfc822_message_id_text)) {
+        this_rfc822_message_id = new RFC822.MessageID(rfc822_message_id_text);
+        ancestors.add(this_rfc822_message_id);
+    }
     
     // in reality, no ancestors indicates that REFERENCES was not complete, so log that and exit
     if (ancestors.size == 0) {
-        message("Unable to add message %s to conversation table: no ancestors", message_id.to_string());
+        message("Unable to add message %s to conversation table: no references", message_id.to_string());
         
         return;
     }
@@ -61,8 +58,8 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
     // conversation each time
     StringBuilder sql = new StringBuilder("""
         SELECT conversation_id
-        FROM MessageTable
-        WHERE message_id IN (
+        FROM MessageConversationTable
+        WHERE rfc822_message_id IN
     """);
     for (int ctr = 0; ctr < ancestors.size; ctr++)
         sql.append(ctr == 0 ? "?" : ",?");
@@ -78,7 +75,7 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
     
     Db.Result search_result = search_stmt.exec(cancellable);
     while (!search_result.finished) {
-        // watch for NULL, which is the default value when a row is added to the MessageTable
+        // watch for NULL, which is the default value when a row is added to the MessageConversationTable
         // without a conversation id (which is almost always for new rows)
         if (!search_result.is_null_at(0))
             conversation_ids.add(search_result.rowid_at(0));
@@ -120,20 +117,56 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
         break;
     }
     
-    // if already assigned this conversation, avoid a write
-    if (conversation_id == existing_conversation_id)
-        return;
-    
-    // Assign the message to this conversation
-    Db.Statement insert = cx.prepare("""
-        UPDATE MessageTable
-        SET conversation_id = ?
-        WHERE id = ?
-    """);
-    insert.bind_rowid(0, conversation_id);
-    insert.bind_rowid(1, message_id);
-    
-    insert.exec(cancellable);
+    // If each Message-ID present in table, update its conversation_id, otherwise add Message-ID and
+    // index to the conversation
+    foreach (RFC822.MessageID ancestor in ancestors) {
+        bool ancestor_is_added_message =
+            this_rfc822_message_id != null && this_rfc822_message_id.equal_to(ancestor);
+        
+        Db.Statement select = cx.prepare("""
+            SELECT id, conversation_id, message_id
+            FROM MessageConversationTable
+            WHERE rfc822_message_id = ?
+        """);
+        select.bind_string(0, ancestor.value);
+        
+        Db.Result result = select.exec(cancellable);
+        if (result.finished) {
+            // not present, add new
+            Db.Statement insert = cx.prepare("""
+                INSERT INTO MessageConversationTable
+                (conversation_id, message_id, rfc822_message_id)
+                VALUES (?, ?, ?)
+            """);
+            insert.bind_rowid(0, conversation_id);
+            // if ancestor is the added message's Message-ID, connect them now
+            if (ancestor_is_added_message)
+                insert.bind_rowid(1, message_id);
+            else
+                insert.bind_null(1);
+            insert.bind_string(2, ancestor.value);
+            
+            insert.exec(cancellable);
+        } else if (ancestor_is_added_message || result.is_null_at(1) || result.rowid_at(1) != conversation_id) {
+            // already present but with different conversation id, or this message is the Message-ID,
+            // so connect them now
+            Db.Statement update = cx.prepare("""
+                UPDATE MessageConversationTable
+                SET conversation_id = ?, message_id = ?
+                WHERE id = ?
+            """);
+            update.bind_rowid(0, conversation_id);
+            if (ancestor_is_added_message)
+                update.bind_rowid(1, message_id);
+            else if (!result.is_null_at(2))
+                update.bind_rowid(1, result.rowid_at(2));
+            else
+                update.bind_null(1);
+            update.bind_rowid(2, result.rowid_at(0));
+            
+            update.exec(cancellable);
+        }
+    }
 }
 
 private void add_ancestors(string? text, Gee.Collection<RFC822.MessageID> ancestors) {
@@ -152,7 +185,7 @@ private int64 do_merge_conversations(Db.Connection cx, Gee.Set<int64?> conversat
     // although multithreaded transactions aren't problem per se with database locking, it is
     // possible for multiple threads to be processing mail on the same conversation at the same
     // time and will therefore be reading the same list and choosing which to merge; by being
-    // preditable here, ensure that the same conversation is selected in both cases
+    // predictable here, ensure that the same conversation is selected in both cases
     int64 conversation_id = traverse<int64?>(conversation_ids)
         .to_tree_set(Collection.int64_compare_func)
         .first();
@@ -178,7 +211,7 @@ private int64 do_merge_conversations(Db.Connection cx, Gee.Set<int64?> conversat
     
     // set other messages in the other conversations to the chosen one
     StringBuilder merge_sql = new StringBuilder("""
-        UPDATE MessageTable
+        UPDATE MessageConversationTable
         SET conversation_id = ?
         WHERE conversation_id IN
     """);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]