[geary/wip/713150-conversations] Fixes issue with nonexistant Message-IDs being associated properly
- From: Jim Nelson <jnelson src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [geary/wip/713150-conversations] Fixes issue with nonexistant Message-IDs being associated properly
- Date: Thu, 26 Feb 2015 02:08:13 +0000 (UTC)
commit 9f21002f1025272a69fe929f91b6a7de3ac037ea
Author: Jim Nelson <jim yorba org>
Date: Wed Feb 25 17:19:42 2015 -0800
Fixes issue with nonexistant Message-IDs being associated properly
Issue with loading more conversations in large folders, i.e. All Mail
sql/version-025.sql | 11 +++-
src/engine/app/app-conversation-monitor.vala | 66 ++++++++++++------
src/engine/imap-db/imap-db-account.vala | 20 ++++--
src/engine/imap-db/imap-db-conversation.vala | 93 +++++++++++++++++--------
4 files changed, 129 insertions(+), 61 deletions(-)
---
diff --git a/sql/version-025.sql b/sql/version-025.sql
index 5b3fc78..ad679cd 100644
--- a/sql/version-025.sql
+++ b/sql/version-025.sql
@@ -7,7 +7,14 @@ CREATE TABLE ConversationTable (
flags TEXT DEFAULT NULL
);
-ALTER TABLE MessageTable ADD COLUMN conversation_id INTEGER REFERENCES ConversationTable DEFAULT NULL;
+CREATE TABLE MessageConversationTable (
+ id INTEGER PRIMARY KEY,
+ conversation_id INTEGER REFERENCES ConversationTable DEFAULT NULL,
+ message_id INTEGER REFERENCES MessageTable DEFAULT NULL,
+ rfc822_message_id TEXT UNIQUE NOT NULL
+);
-CREATE INDEX MessageTableConversationIDIndex ON MessageTable(conversation_id);
+CREATE INDEX MessageConversationTableConversationIDIndex ON MessageConversationTable(conversation_id);
+CREATE INDEX MessageConversationTableMessageIDIndex ON MessageConversationTable(message_id);
+CREATE INDEX MessageConversationTableRFC822MessageIDIndex ON MessageConversationTable(rfc822_message_id);
diff --git a/src/engine/app/app-conversation-monitor.vala b/src/engine/app/app-conversation-monitor.vala
index d67a191..5b939dc 100644
--- a/src/engine/app/app-conversation-monitor.vala
+++ b/src/engine/app/app-conversation-monitor.vala
@@ -43,8 +43,14 @@ public class Geary.App.ConversationMonitor : BaseObject {
// All generated Conversations
private Gee.HashSet<Conversation> conversations = new Gee.HashSet<Conversation>();
- // A map of EmailIdentifiers to Conversations
- private Gee.HashMap<EmailIdentifier, Conversation> email_id_to_conversation =
+ // A map of all EmailIdentifiers to Conversations ... since emails from deep in the folder can
+ // be added to a conversation, use primary_email_id_to_conversation for finding boundaries
+ private Gee.HashMap<EmailIdentifier, Conversation> all_email_id_to_conversation =
+ new Gee.HashMap<EmailIdentifier, Conversation>();
+
+ // A map of primary EmailIdentifiers to Conversations ... primary ids come from ranged listing
+ // and don't include ids found "deep" in the folder or from other folders
+ private Gee.HashMap<EmailIdentifier, Conversation> primary_email_id_to_conversation =
new Gee.HashMap<EmailIdentifier, Conversation>();
/**
@@ -258,7 +264,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
}
public int get_email_count() {
- return email_id_to_conversation.size;
+ return all_email_id_to_conversation.size;
}
public Gee.Collection<Conversation> get_conversations() {
@@ -266,7 +272,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
}
public Geary.App.Conversation? get_conversation_for_email(Geary.EmailIdentifier email_id) {
- return email_id_to_conversation[email_id];
+ return all_email_id_to_conversation[email_id];
}
public async bool start_monitoring_async(Cancellable? cancellable = null)
@@ -378,8 +384,8 @@ public class Geary.App.ConversationMonitor : BaseObject {
notify_scan_started();
try {
// list by required_flags to ensure all are present in local store
- yield process_email_async(yield folder.list_email_by_id_async(initial_id, count,
- required_fields, flags, cancellable));
+ yield process_email_async(folder.path,
+ yield folder.list_email_by_id_async(initial_id, count, required_fields, flags, cancellable));
} catch (Error err) {
notify_scan_error(err);
} finally {
@@ -392,8 +398,8 @@ public class Geary.App.ConversationMonitor : BaseObject {
notify_scan_started();
try {
// list by required_flags to ensure all are present in local store
- yield process_email_async(yield folder.list_email_by_sparse_id_async(ids, required_fields,
- flags, cancellable));
+ yield process_email_async(folder.path,
+ yield folder.list_email_by_sparse_id_async(ids, required_fields, flags, cancellable));
} catch (Error err) {
notify_scan_error(err);
} finally {
@@ -426,7 +432,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
return true;
}
- private async void process_email_async(Gee.Collection<Geary.Email>? emails) {
+ private async void process_email_async(FolderPath path, Gee.Collection<Geary.Email>? emails) {
if (emails == null || emails.size == 0)
return;
@@ -434,16 +440,29 @@ public class Geary.App.ConversationMonitor : BaseObject {
.map<EmailIdentifier>(email => email.id)
.to_hash_set();
- yield process_email_ids_async(ids);
+ yield process_email_ids_async(path, ids);
}
- private async void process_email_ids_async(Gee.Collection<Geary.EmailIdentifier>? email_ids) {
+ private async void process_email_ids_async(FolderPath path, Gee.Collection<Geary.EmailIdentifier>?
email_ids) {
if (email_ids == null || email_ids.size == 0)
return;
Logging.debug(Logging.Flag.CONVERSATIONS, "[%s] ConversationMonitor::process_email: %d emails",
folder.to_string(), email_ids.size);
+ // don't re-process existing emails
+ /*
+ Gee.Collection<EmailIdentifier> trimmed_ids = traverse<EmailIdentifier>(email_ids)
+ .filter(id => !all_email_id_to_conversation.has_key(id))
+ .to_hash_set();
+
+ debug("Filtering existing identifiers: %d => %d", email_ids.size, trimmed_ids.size);
+
+ email_ids = trimmed_ids;
+ if (email_ids.size == 0)
+ return false;
+ */
+
Gee.Collection<AssociatedEmails>? associations = null;
try {
associations = yield folder.account.local_search_associated_emails_async(
@@ -470,38 +489,37 @@ public class Geary.App.ConversationMonitor : BaseObject {
// started and then coalesce as new emails come in)
Gee.HashSet<Conversation> existing = new Gee.HashSet<Conversation>();
foreach (EmailIdentifier associated_id in associated_ids) {
- Conversation? conversation = email_id_to_conversation[associated_id];
+ Conversation? conversation = all_email_id_to_conversation[associated_id];
if (conversation != null)
existing.add(conversation);
}
// Create or pick conversation for these emails
Conversation conversation;
- unowned string ctype;
switch (existing.size) {
case 0:
- ctype = "NEW";
conversation = new Conversation(this);
break;
case 1:
- ctype = "FOUND";
conversation = traverse<Conversation>(existing).first();
break;
default:
// TODO
- ctype = "MERGED";
conversation = merge_conversations(existing);
break;
}
// add all emails and each known path(s) to the Conversation and EmailIdentifier mapping
foreach (Email email in association.emails) {
- if (email.subject.value.contains("714922"))
- debug("ADDING \"%s TO %s", email.subject.to_string(), ctype);
conversation.add(email, association.known_paths[email]);
- email_id_to_conversation[email.id] = conversation;
+ all_email_id_to_conversation[email.id] = conversation;
+
+ // only add to primary map if identifier is part of the original set of arguments
+ // and they're from this folder and not another one
+ if (email_ids.contains(email.id) && path.equal_to(folder.path))
+ primary_email_id_to_conversation[email.id] = conversation;
}
// if new, added, otherwise appended (if not already added)
@@ -570,8 +588,10 @@ public class Geary.App.ConversationMonitor : BaseObject {
// remove the emails from internal state, noting which conversations are trimmed or flat-out
// removed (evaporated)
foreach (EmailIdentifier removed_id in removed_ids) {
+ primary_email_id_to_conversation.unset(removed_id);
+
Conversation conversation;
- if (!email_id_to_conversation.unset(removed_id, out conversation))
+ if (!all_email_id_to_conversation.unset(removed_id, out conversation))
continue;
Geary.Email? removed_email = conversation.get_email_by_id(removed_id);
@@ -610,13 +630,13 @@ public class Geary.App.ConversationMonitor : BaseObject {
debug("%d out of folder message(s) appended to %s, fetching to add to conversations...",
appended_ids.size,
folder.to_string());
- yield process_email_ids_async(appended_ids);
+ yield process_email_ids_async(folder.path, appended_ids);
}
private void on_account_email_flags_changed(Geary.Folder folder,
Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> map) {
foreach (Geary.EmailIdentifier id in map.keys) {
- Conversation? conversation = email_id_to_conversation[id];
+ Conversation? conversation = all_email_id_to_conversation[id];
if (conversation == null)
continue;
@@ -632,7 +652,7 @@ public class Geary.App.ConversationMonitor : BaseObject {
private async Geary.EmailIdentifier? get_lowest_email_id_async(Cancellable? cancellable) {
Geary.EmailIdentifier? earliest_id = null;
try {
- yield folder.find_boundaries_async(email_id_to_conversation.keys, out earliest_id, null,
+ yield folder.find_boundaries_async(primary_email_id_to_conversation.keys, out earliest_id, null,
cancellable);
} catch (Error e) {
debug("Error finding earliest email identifier: %s", e.message);
diff --git a/src/engine/imap-db/imap-db-account.vala b/src/engine/imap-db/imap-db-account.vala
index 230dcbe..ee2516a 100644
--- a/src/engine/imap-db/imap-db-account.vala
+++ b/src/engine/imap-db/imap-db-account.vala
@@ -652,6 +652,7 @@ private class Geary.ImapDB.Account : BaseObject {
requested_fields = requested_fields | Geary.Email.Field.FLAGS;
yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => {
+ // BUG: in-reply-to can be an address list, not simply an address
Db.Statement stmt = cx.prepare("SELECT id FROM MessageTable WHERE message_id = ? OR in_reply_to
= ?");
stmt.bind_string(0, message_id.value);
stmt.bind_string(1, message_id.value);
@@ -702,13 +703,14 @@ private class Geary.ImapDB.Account : BaseObject {
yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => {
foreach (ImapDB.EmailIdentifier db_id in db_ids) {
+ // if message found in previous conversation search, don't re-fetch
if (found_ids.contains(db_id))
continue;
Db.Statement stmt = cx.prepare("""
SELECT conversation_id
- FROM MessageTable
- WHERE id = ?
+ FROM MessageConversationTable
+ WHERE message_id = ?
""");
stmt.bind_rowid(0, db_id.message_id);
@@ -719,16 +721,22 @@ private class Geary.ImapDB.Account : BaseObject {
int64 conversation_id = result.rowid_at(0);
stmt = cx.prepare("""
- SELECT id
- FROM MessageTable
+ SELECT message_id
+ FROM MessageConversationTable
WHERE conversation_id = ?
""");
stmt.bind_rowid(0, conversation_id);
- AssociatedEmails association = new AssociatedEmails();
-
result = stmt.exec(cancellable);
+
+ AssociatedEmails association = new AssociatedEmails();
while (!result.finished) {
+ if (result.is_null_at(0)) {
+ result.next(cancellable);
+
+ continue;
+ }
+
Email? email;
Gee.Collection<FolderPath?>? known_paths;
do_fetch_message(cx, result.rowid_at(0), requested_fields, search_predicate,
diff --git a/src/engine/imap-db/imap-db-conversation.vala b/src/engine/imap-db/imap-db-conversation.vala
index 26d9bb3..215a2b6 100644
--- a/src/engine/imap-db/imap-db-conversation.vala
+++ b/src/engine/imap-db/imap-db-conversation.vala
@@ -20,7 +20,7 @@ internal const Geary.Email.Field REQUIRED_FIELDS = Geary.Account.ASSOCIATED_REQU
internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id, Cancellable? cancellable)
throws Error {
Db.Statement references_stmt = cx.prepare("""
- SELECT message_id, in_reply_to, reference_ids, conversation_id
+ SELECT message_id, in_reply_to, reference_ids
FROM MessageTable
WHERE id = ?
""");
@@ -33,25 +33,22 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
return;
}
- // Check if part of an existing conversation; not an error to call this multiple times on same
- // message, but not necessary either (since REFERENCES should be static)
- int64 existing_conversation_id = Db.INVALID_ROWID;
- if (!references_result.is_null_at(3))
- existing_conversation_id = references_result.rowid_at(3);
-
// Create a common set of ancestors from In-Reply-To and References
Gee.HashSet<RFC822.MessageID> ancestors = new Gee.HashSet<RFC822.MessageID>();
add_ancestors(references_result.string_at(1), ancestors);
add_ancestors(references_result.string_at(2), ancestors);
- // Add this message's Message-ID
+ // Add this message's Message-ID to the ancestors
unowned string? rfc822_message_id_text = references_result.string_at(0);
- if (!String.is_empty(rfc822_message_id_text))
- ancestors.add(new RFC822.MessageID(rfc822_message_id_text));
+ RFC822.MessageID? this_rfc822_message_id = null;
+ if (!String.is_empty(rfc822_message_id_text)) {
+ this_rfc822_message_id = new RFC822.MessageID(rfc822_message_id_text);
+ ancestors.add(this_rfc822_message_id);
+ }
// in reality, no ancestors indicates that REFERENCES was not complete, so log that and exit
if (ancestors.size == 0) {
- message("Unable to add message %s to conversation table: no ancestors", message_id.to_string());
+ message("Unable to add message %s to conversation table: no references", message_id.to_string());
return;
}
@@ -61,8 +58,8 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
// conversation each time
StringBuilder sql = new StringBuilder("""
SELECT conversation_id
- FROM MessageTable
- WHERE message_id IN (
+ FROM MessageConversationTable
+ WHERE rfc822_message_id IN
""");
for (int ctr = 0; ctr < ancestors.size; ctr++)
sql.append(ctr == 0 ? "?" : ",?");
@@ -78,7 +75,7 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
Db.Result search_result = search_stmt.exec(cancellable);
while (!search_result.finished) {
- // watch for NULL, which is the default value when a row is added to the MessageTable
+ // watch for NULL, which is the default value when a row is added to the MessageConversationTable
// without a conversation id (which is almost always for new rows)
if (!search_result.is_null_at(0))
conversation_ids.add(search_result.rowid_at(0));
@@ -120,20 +117,56 @@ internal void do_add_message_to_conversation(Db.Connection cx, int64 message_id,
break;
}
- // if already assigned this conversation, avoid a write
- if (conversation_id == existing_conversation_id)
- return;
-
- // Assign the message to this conversation
- Db.Statement insert = cx.prepare("""
- UPDATE MessageTable
- SET conversation_id = ?
- WHERE id = ?
- """);
- insert.bind_rowid(0, conversation_id);
- insert.bind_rowid(1, message_id);
-
- insert.exec(cancellable);
+ // If each Message-ID present in table, update its conversation_id, otherwise add Message-ID and
+ // index to the conversation
+ foreach (RFC822.MessageID ancestor in ancestors) {
+ bool ancestor_is_added_message =
+ this_rfc822_message_id != null && this_rfc822_message_id.equal_to(ancestor);
+
+ Db.Statement select = cx.prepare("""
+ SELECT id, conversation_id, message_id
+ FROM MessageConversationTable
+ WHERE rfc822_message_id = ?
+ """);
+ select.bind_string(0, ancestor.value);
+
+ Db.Result result = select.exec(cancellable);
+ if (result.finished) {
+ // not present, add new
+ Db.Statement insert = cx.prepare("""
+ INSERT INTO MessageConversationTable
+ (conversation_id, message_id, rfc822_message_id)
+ VALUES (?, ?, ?)
+ """);
+ insert.bind_rowid(0, conversation_id);
+ // if ancestor is the added message's Message-ID, connect them now
+ if (ancestor_is_added_message)
+ insert.bind_rowid(1, message_id);
+ else
+ insert.bind_null(1);
+ insert.bind_string(2, ancestor.value);
+
+ insert.exec(cancellable);
+ } else if (ancestor_is_added_message || result.is_null_at(1) || result.rowid_at(1) !=
conversation_id) {
+ // already present but with different conversation id, or this message is the Message-ID,
+ // so connect them now
+ Db.Statement update = cx.prepare("""
+ UPDATE MessageConversationTable
+ SET conversation_id = ?, message_id = ?
+ WHERE id = ?
+ """);
+ update.bind_rowid(0, conversation_id);
+ if (ancestor_is_added_message)
+ update.bind_rowid(1, message_id);
+ else if (!result.is_null_at(2))
+ update.bind_rowid(1, result.rowid_at(2));
+ else
+ update.bind_null(1);
+ update.bind_rowid(2, result.rowid_at(0));
+
+ update.exec(cancellable);
+ }
+ }
}
private void add_ancestors(string? text, Gee.Collection<RFC822.MessageID> ancestors) {
@@ -152,7 +185,7 @@ private int64 do_merge_conversations(Db.Connection cx, Gee.Set<int64?> conversat
// although multithreaded transactions aren't problem per se with database locking, it is
// possible for multiple threads to be processing mail on the same conversation at the same
// time and will therefore be reading the same list and choosing which to merge; by being
- // preditable here, ensure that the same conversation is selected in both cases
+ // predictable here, ensure that the same conversation is selected in both cases
int64 conversation_id = traverse<int64?>(conversation_ids)
.to_tree_set(Collection.int64_compare_func)
.first();
@@ -178,7 +211,7 @@ private int64 do_merge_conversations(Db.Connection cx, Gee.Set<int64?> conversat
// set other messages in the other conversations to the chosen one
StringBuilder merge_sql = new StringBuilder("""
- UPDATE MessageTable
+ UPDATE MessageConversationTable
SET conversation_id = ?
WHERE conversation_id IN
""");
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]