[geary] Reduce database pauses: Refs bug #725929



commit 28629ce6e2b149ea6b3ff94f27ab30237b7bcf2d
Author: Jim Nelson <jim yorba org>
Date:   Wed Jun 11 17:48:23 2014 -0700

    Reduce database pauses: Refs bug #725929
    
    Although the problem is not completely solved, this reduces database
    pauses (along with the prior commit improving parallelization of the
    database) by changing how ImapDB.Folder accesses the database.  In
    particular, one code path required an INNER JOIN that simply took too
    long (~8s) on large databases, locking the entire db in the process.
    Tests and hacking showed that, annoyingly, manually performing the
    INNER JOIN by intersecting the results of two SQL transactions was
    more efficient.
    
    In addition, creating/merging emails into the database is done in
    chunks with a small pause between each chunk to allow other tasks
    the opportunity to get to the database.  Create/merge is one of the
    most expensive operations on the database.

 src/engine/imap-db/imap-db-folder.vala             |  172 ++++++++++++--------
 .../imap-engine/imap-engine-email-prefetcher.vala  |   39 ++---
 src/engine/util/util-collection.vala               |   17 ++
 3 files changed, 138 insertions(+), 90 deletions(-)
---
diff --git a/src/engine/imap-db/imap-db-folder.vala b/src/engine/imap-db/imap-db-folder.vala
index 7fa2d04..4dbfbb2 100644
--- a/src/engine/imap-db/imap-db-folder.vala
+++ b/src/engine/imap-db/imap-db-folder.vala
@@ -21,6 +21,7 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
     private const int LIST_EMAIL_WITH_MESSAGE_CHUNK_COUNT = 10;
     private const int LIST_EMAIL_METADATA_COUNT = 100;
     private const int LIST_EMAIL_FIELDS_CHUNK_COUNT = 500;
+    private const int CREATE_MERGE_EMAIL_CHUNK_COUNT = 25;
     
     [Flags]
     public enum ListFlags {
@@ -180,45 +181,57 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
     public async Gee.Map<Geary.Email, bool> create_or_merge_email_async(Gee.Collection<Geary.Email> emails,
         Cancellable? cancellable) throws Error {
         Gee.HashMap<Geary.Email, bool> results = new Gee.HashMap<Geary.Email, bool>();
-        Gee.ArrayList<Geary.EmailIdentifier> complete_ids = new Gee.ArrayList<Geary.EmailIdentifier>();
-        Gee.Collection<Contact> updated_contacts = new Gee.ArrayList<Contact>();
-        int total_unread_change = 0;
-        yield db.exec_transaction_async(Db.TransactionType.RW, (cx) => {
-            foreach (Geary.Email email in emails) {
-                Gee.Collection<Contact>? contacts_this_email = null;
-                Geary.Email.Field pre_fields;
-                Geary.Email.Field post_fields;
-                int unread_change = 0;
-                bool created = do_create_or_merge_email(cx, email, out pre_fields,
-                    out post_fields, out contacts_this_email, ref unread_change, cancellable);
-                
-                if (contacts_this_email != null)
-                    updated_contacts.add_all(contacts_this_email);
-                
-                results.set(email, created);
-                
-                // in essence, only fire the "email-completed" signal if the local version didn't
-                // have all the fields but after the create/merge now does
-                if (post_fields.is_all_set(Geary.Email.Field.ALL) && !pre_fields.is_all_set(Geary.Email.Field.ALL))
-                    complete_ids.add(email.id);
-                
-                // Update unread count in DB.
-                do_add_to_unread_count(cx, unread_change, cancellable);
+        
+        Gee.ArrayList<Geary.Email> list = traverse<Geary.Email>(emails).to_array_list();
+        int index = 0;
+        while (index < list.size) {
+            int stop = Numeric.int_ceiling(index + CREATE_MERGE_EMAIL_CHUNK_COUNT, list.size);
+            Gee.List<Geary.Email> slice = list.slice(index, stop);
+            
+            Gee.ArrayList<Geary.EmailIdentifier> complete_ids = new Gee.ArrayList<Geary.EmailIdentifier>();
+            Gee.Collection<Contact> updated_contacts = new Gee.ArrayList<Contact>();
+            int total_unread_change = 0;
+            yield db.exec_transaction_async(Db.TransactionType.RW, (cx) => {
+                foreach (Geary.Email email in slice) {
+                    Gee.Collection<Contact>? contacts_this_email = null;
+                    Geary.Email.Field pre_fields;
+                    Geary.Email.Field post_fields;
+                    int unread_change = 0;
+                    bool created = do_create_or_merge_email(cx, email, out pre_fields,
+                        out post_fields, out contacts_this_email, ref unread_change, cancellable);
+                    
+                    if (contacts_this_email != null)
+                        updated_contacts.add_all(contacts_this_email);
+                    
+                    results.set(email, created);
+                    
+                    // in essence, only fire the "email-completed" signal if the local version didn't
+                    // have all the fields but after the create/merge now does
+                    if (post_fields.is_all_set(Geary.Email.Field.ALL) && !pre_fields.is_all_set(Geary.Email.Field.ALL))
+                        complete_ids.add(email.id);
+                    
+                    // Update unread count in DB.
+                    do_add_to_unread_count(cx, unread_change, cancellable);
+                    
+                    total_unread_change += unread_change;
+                }
                 
-                total_unread_change += unread_change;
-            }
+                return Db.TransactionOutcome.COMMIT;
+            }, cancellable);
             
-            return Db.TransactionOutcome.COMMIT;
-        }, cancellable);
-        
-        if (updated_contacts.size > 0)
-            contact_store.update_contacts(updated_contacts);
-        
-        // Update the email_unread properties.
-        properties.set_status_unseen((properties.email_unread + total_unread_change).clamp(0, int.MAX));
-        
-        if (complete_ids.size > 0)
-            email_complete(complete_ids);
+            if (updated_contacts.size > 0)
+                contact_store.update_contacts(updated_contacts);
+            
+            // Update the email_unread properties.
+            properties.set_status_unseen((properties.email_unread + total_unread_change).clamp(0, int.MAX));
+            
+            if (complete_ids.size > 0)
+                email_complete(complete_ids);
+            
+            index = stop;
+            if (index < list.size)
+                yield Scheduler.sleep_ms_async(100);
+        }
         
         return results;
     }
@@ -269,12 +282,6 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
                 SELECT MessageLocationTable.message_id, ordering, remove_marker
                 FROM MessageLocationTable
             """);
-            if (only_incomplete) {
-                sql.append("""
-                    INNER JOIN MessageTable
-                    ON MessageTable.id = MessageLocationTable.message_id
-                """);
-            }
             
             sql.append("WHERE folder_id = ? ");
             
@@ -283,26 +290,32 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
             else
                 sql.append("AND ordering <= ? ");
             
-            if (only_incomplete)
-                sql.append_printf("AND fields != %d ", Geary.Email.Field.ALL);
-            
             if (oldest_to_newest)
                 sql.append("ORDER BY ordering ASC ");
             else
                 sql.append("ORDER BY ordering DESC ");
             
-            sql.append("LIMIT ?");
-            
             Db.Statement stmt = cx.prepare(sql.str);
             stmt.bind_rowid(0, folder_id);
             stmt.bind_int64(1, start_uid.value);
-            stmt.bind_int(2, count);
             
             locations = do_results_to_locations(stmt.exec(cancellable), flags, cancellable);
             
+            if (locations.size > count)
+                locations = locations.slice(0, count);
+            
             return Db.TransactionOutcome.SUCCESS;
         }, cancellable);
         
+        // remove complete locations (emails with all fields downloaded)
+        if (only_incomplete && locations.size > 0) {
+            yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => {
+                do_remove_complete_locations(cx, locations, cancellable);
+                
+                return Db.TransactionOutcome.SUCCESS;
+            }, cancellable);
+        }
+        
         // Next, read in email in chunks
         return yield list_email_in_chunks_async(locations, required_fields, flags, cancellable);
     }
@@ -389,16 +402,7 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
                 FROM MessageLocationTable
             """);
             
-            if (only_incomplete) {
-                sql.append("""
-                    INNER JOIN MessageTable
-                    ON MessageTable.id = MessageLocationTable.message_id
-                """);
-            }
-            
             sql.append("WHERE folder_id = ? AND ordering >= ? AND ordering <= ? ");
-            if (only_incomplete)
-                sql.append_printf(" AND fields != %d ", Geary.Email.Field.ALL);
             
             Db.Statement stmt = cx.prepare(sql.str);
             stmt.bind_rowid(0, folder_id);
@@ -407,6 +411,9 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
             
             locations = do_results_to_locations(stmt.exec(cancellable), flags, cancellable);
             
+            if (only_incomplete)
+                do_remove_complete_locations(cx, locations, cancellable);
+            
             return Db.TransactionOutcome.SUCCESS;
         }, cancellable);
         
@@ -436,13 +443,6 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
                 FROM MessageLocationTable
             """);
             
-            if (only_incomplete) {
-                sql.append("""
-                    INNER JOIN MessageTable
-                    ON MessageTable.id = MessageLocationTable.message_id
-                """);
-            }
-            
             if (locs.size != 1) {
                 sql.append("WHERE ordering IN (");
                 bool first = true;
@@ -458,9 +458,6 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
                 sql.append_printf("WHERE ordering = '%s' ", locs[0].uid.to_string());
             }
             
-            if (only_incomplete)
-                sql.append_printf(" AND fields != %d ", Geary.Email.Field.ALL);
-            
             sql.append("AND folder_id = ? ");
             
             Db.Statement stmt = cx.prepare(sql.str);
@@ -468,6 +465,9 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
             
             locations = do_results_to_locations(stmt.exec(cancellable), flags, cancellable);
             
+            if (only_incomplete)
+                do_remove_complete_locations(cx, locations, cancellable);
+            
             return Db.TransactionOutcome.SUCCESS;
         }, cancellable);
         
@@ -2042,6 +2042,42 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
         return locations;
     }
     
+    // Use a separate step to strip out complete emails because original implementation (using an
+    // INNER JOIN) was horribly slow under load
+    private void do_remove_complete_locations(Db.Connection cx, Gee.List<LocationIdentifier>? locations,
+        Cancellable? cancellable) throws Error {
+        if (locations == null || locations.size == 0)
+            return;
+        
+        StringBuilder sql = new StringBuilder("""
+            SELECT id FROM MessageTable WHERE fields <> ?
+        """);
+        
+        Db.Statement stmt = cx.prepare(sql.str);
+        stmt.bind_int(0, Geary.Email.Field.ALL);
+        
+        Db.Result results = stmt.exec(cancellable);
+        
+        Gee.HashSet<int64?> incomplete_locations = new Gee.HashSet<int64?>(Collection.int64_hash_func,
+            Collection.int64_equal_func);
+        while (!results.finished) {
+            incomplete_locations.add(results.int64_at(0));
+            results.next(cancellable);
+        }
+        
+        if (incomplete_locations.size == 0) {
+            locations.clear();
+            
+            return;
+        }
+        
+        Gee.Iterator<LocationIdentifier> iter = locations.iterator();
+        while (iter.next()) {
+            if (!incomplete_locations.contains(iter.get().message_id))
+                iter.remove();
+        }
+    }
+    
     private LocationIdentifier? do_get_location_for_id(Db.Connection cx, ImapDB.EmailIdentifier id,
         ListFlags flags, Cancellable? cancellable) throws Error {
         Db.Statement stmt = cx.prepare("""
diff --git a/src/engine/imap-engine/imap-engine-email-prefetcher.vala b/src/engine/imap-engine/imap-engine-email-prefetcher.vala
index 81bf92a..871963f 100644
--- a/src/engine/imap-engine/imap-engine-email-prefetcher.vala
+++ b/src/engine/imap-engine/imap-engine-email-prefetcher.vala
@@ -15,7 +15,6 @@ private class Geary.ImapEngine.EmailPrefetcher : Object {
     public const int PREFETCH_DELAY_SEC = 1;
     
     private const Geary.Email.Field PREFETCH_FIELDS = Geary.Email.Field.ALL;
-    private const int PREFETCH_IDS_CHUNKS = 500;
     private const int PREFETCH_CHUNK_BYTES = 32 * 1024;
     
     public Nonblocking.CountingSemaphore active_sem { get; private set;
@@ -88,7 +87,10 @@ private class Geary.ImapEngine.EmailPrefetcher : Object {
     }
     
     // emails should include PROPERTIES
-    private void schedule_prefetch(Gee.Collection<Geary.Email> emails) {
+    private void schedule_prefetch(Gee.Collection<Geary.Email>? emails) {
+        if (emails == null || emails.size == 0)
+            return;
+        
         debug("%s: scheduling %d emails for prefetching", folder.to_string(), emails.size);
         
         prefetch_emails.add_all(emails);
@@ -108,41 +110,34 @@ private class Geary.ImapEngine.EmailPrefetcher : Object {
     }
     
     private async void do_prepare_all_local_async() {
-        Geary.EmailIdentifier? lowest = null;
-        for (;;) {
-            Gee.List<Geary.Email>? list = null;
-            try {
-                list = yield folder.local_folder.list_email_by_id_async((ImapDB.EmailIdentifier) lowest,
-                    PREFETCH_IDS_CHUNKS, Geary.Email.Field.PROPERTIES, ImapDB.Folder.ListFlags.ONLY_INCOMPLETE,
-                    cancellable);
-            } catch (Error err) {
-                debug("Error while list local emails for %s: %s", folder.to_string(), err.message);
-            }
-            
-            if (list == null || list.size == 0)
-                break;
-            
-            // find lowest for next iteration
-            lowest = Geary.EmailIdentifier.sort_emails(list).first().id;
-            
-            schedule_prefetch(list);
+        Gee.List<Geary.Email>? list = null;
+        try {
+            debug("Listing all emails needing prefetching in %s...", folder.to_string());
+            list = yield folder.local_folder.list_email_by_id_async(null, int.MAX,
+                Geary.Email.Field.PROPERTIES, ImapDB.Folder.ListFlags.ONLY_INCOMPLETE, cancellable);
+            debug("Listed all emails needing prefetching in %s", folder.to_string());
+        } catch (Error err) {
+            debug("Error while list local emails for %s: %s", folder.to_string(), err.message);
         }
         
+        schedule_prefetch(list);
+        
         active_sem.blind_notify();
     }
     
     private async void do_prepare_new_async(Gee.Collection<Geary.EmailIdentifier> ids) {
         Gee.List<Geary.Email>? list = null;
         try {
+            debug("Listing new %d emails needing prefetching in %s...", ids.size, folder.to_string());
             list = yield folder.local_folder.list_email_by_sparse_id_async(
                 (Gee.Collection<ImapDB.EmailIdentifier>) ids,
                 Geary.Email.Field.PROPERTIES, ImapDB.Folder.ListFlags.ONLY_INCOMPLETE, cancellable);
+            debug("Listed new emails needing prefetching in %s", folder.to_string());
         } catch (Error err) {
             debug("Error while list local emails for %s: %s", folder.to_string(), err.message);
         }
         
-        if (list != null && list.size > 0)
-            schedule_prefetch(list);
+        schedule_prefetch(list);
         
         active_sem.blind_notify();
     }
diff --git a/src/engine/util/util-collection.vala b/src/engine/util/util-collection.vala
index 7c86c2e..7ec7a65 100644
--- a/src/engine/util/util-collection.vala
+++ b/src/engine/util/util-collection.vala
@@ -126,6 +126,23 @@ public inline static uint int64_hash(int64 value) {
 }
 
 /**
+ * To be used as hash_func for Gee collections.
+ */
+public uint int64_hash_func(int64? n) {
+    return hash_memory((uint8 *) n, sizeof(int64));
+}
+
+/**
+ * To be used as equal_func for Gee collections.
+ */
+public bool int64_equal_func(int64? a, int64? b) {
+    int64 *bia = (int64 *) a;
+    int64 *bib = (int64 *) b;
+    
+    return (*bia) == (*bib);
+}
+
+/**
  * A rotating-XOR hash that can be used to hash memory buffers of any size.
  */
 public static uint hash_memory(void *ptr, size_t bytes) {


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]