[geary/mjog/invert-folder-class-hierarchy: 66/72] engine: Update EmailPrefetcher to use replay queue downloading email




commit 3a611f6f8251c9270bd0841f0fcef19ac9a7cef1
Author: Michael Gratton <mike vee net>
Date:   Mon Mar 1 09:20:23 2021 +1100

    engine: Update EmailPrefetcher to use replay queue downloading email
    
    Use the new FetchEmail replay op to do the actual fetching. Ensure
    that email which was not successfully fetched stays in the queue, so
    that it is reattempted next time around.

 .../imap-engine/imap-engine-email-prefetcher.vala  | 80 ++++++++--------------
 1 file changed, 30 insertions(+), 50 deletions(-)
---
diff --git a/src/engine/imap-engine/imap-engine-email-prefetcher.vala b/src/engine/imap-engine/imap-engine-email-prefetcher.vala
index 30ed4037b..fb43b0780 100644
--- a/src/engine/imap-engine/imap-engine-email-prefetcher.vala
+++ b/src/engine/imap-engine/imap-engine-email-prefetcher.vala
@@ -1,6 +1,6 @@
 /*
- * Copyright 2016 Software Freedom Conservancy Inc.
- * Copyright 2019 Michael Gratton <mike vee net>
+ * Copyright © 2016 Software Freedom Conservancy Inc.
+ * Copyright © 2019-2021 Michael Gratton <mike vee net>
  *
  * This software is licensed under the GNU Lesser General Public License
  * (version 2.1 or later). See the COPYING file in this distribution.
@@ -40,7 +40,8 @@ private class Geary.ImapEngine.EmailPrefetcher : BaseObject,
     private weak ImapEngine.MinimalFolder folder;
     private Nonblocking.Mutex mutex = new Nonblocking.Mutex();
     private Gee.TreeSet<Geary.Email> prefetch_emails = new Gee.TreeSet<Geary.Email>(
-        Email.compare_recv_date_descending);
+        Email.compare_recv_date_descending
+    );
     private TimeoutManager prefetch_timer;
     private Cancellable running = new GLib.Cancellable();
 
@@ -172,14 +173,11 @@ private class Geary.ImapEngine.EmailPrefetcher : BaseObject,
     }
 
     private async void do_prefetch_batch_async() throws Error {
-        // snarf up all requested Emails for this round
-        Gee.TreeSet<Geary.Email> emails = prefetch_emails;
-        prefetch_emails = new Gee.TreeSet<Geary.Email>(Email.compare_recv_date_descending);
-
-        if (emails.size == 0)
-            return;
-
-        debug("Processing batch, size: %d", emails.size);
+        var to_prefetch = new Gee.TreeSet<Geary.Email>(
+            Email.compare_recv_date_descending
+        );
+        to_prefetch.add_all(this.prefetch_emails);
+        debug("Processing batch, size: %d", to_prefetch.size);
 
         // Big TODO: The engine needs to be able to synthesize
         // ENVELOPE (and any of the fields constituting it) from
@@ -190,28 +188,28 @@ private class Geary.ImapEngine.EmailPrefetcher : BaseObject,
         // requests so a large email doesn't monopolize the pipe and
         // prevent other requests from going through
 
-        Gee.HashSet<EmailIdentifier> chunk = new Gee.HashSet<EmailIdentifier>();
-        Gee.HashSet<EmailIdentifier> blanks = new Gee.HashSet<EmailIdentifier>();
+        var chunk = new Gee.HashSet<ImapDB.EmailIdentifier>();
+        var blanks = new Gee.HashSet<ImapDB.EmailIdentifier>();
         int64 chunk_bytes = 0;
         int count = 0;
 
-        while (emails.size > 0) {
+        while (to_prefetch.size > 0) {
             // dequeue emails by date received, newest to oldest
-            Geary.Email email = emails.first();
+            Geary.Email email = to_prefetch.first();
 
             if (email.properties == null) {
                 // There's no properties, so there's no idea how large
                 // the message is. Do these one at a time at the end.
-                emails.remove(email);
-                blanks.add(email.id);
+                to_prefetch.remove(email);
+                blanks.add((ImapDB.EmailIdentifier) email.id);
             } else if (email.properties.total_bytes < PREFETCH_CHUNK_BYTES ||
                        chunk.size == 0) {
                 // Add email that is smaller than one chunk or there's
                 // nothing in this chunk so far ... this means an
                 // oversized email will be pulled all by itself in the
                 // next round if there's stuff already ahead of it
-                emails.remove(email);
-                chunk.add(email.id);
+                to_prefetch.remove(email);
+                chunk.add((ImapDB.EmailIdentifier) email.id);
                 chunk_bytes += email.properties.total_bytes;
                 count++;
 
@@ -220,9 +218,7 @@ private class Geary.ImapEngine.EmailPrefetcher : BaseObject,
                 }
             }
 
-            bool keep_going = yield do_prefetch_email_async(
-                chunk, chunk_bytes
-            );
+            bool keep_going = yield do_prefetch_email_async(chunk, chunk_bytes);
 
             // clear out for next chunk ... this also prevents the
             // final prefetch_async() from trying to pull twice if
@@ -241,7 +237,7 @@ private class Geary.ImapEngine.EmailPrefetcher : BaseObject,
         if (chunk.size > 0) {
             yield do_prefetch_email_async(chunk, chunk_bytes);
         }
-        foreach (EmailIdentifier id in blanks) {
+        foreach (var id in blanks) {
             yield do_prefetch_email_async(Collection.single(id), -1);
         }
 
@@ -249,37 +245,22 @@ private class Geary.ImapEngine.EmailPrefetcher : BaseObject,
     }
 
     // Return true to continue, false to stop prefetching (cancelled or not open)
-    private async bool do_prefetch_email_async(Gee.Collection<EmailIdentifier> ids,
+    private async bool do_prefetch_email_async(Gee.Collection<ImapDB.EmailIdentifier> ids,
                                                int64 chunk_bytes) {
         debug("Prefetching %d emails (%sb)", ids.size, chunk_bytes.to_string());
-        var success = true;
+        var success = false;
         var cancellable = this.running;
 
+        var op = new FetchEmail(
+            this.folder, ids, PREFETCH_FIELDS, cancellable
+        );
+        this.folder.replay_queue.schedule(op);
         try {
-            Imap.FolderSession remote = yield this.folder.claim_remote_session(
-                cancellable
-            );
-
-            Gee.Collection<Imap.UID> uids = ImapDB.EmailIdentifier.to_uids(
-                (Gee.Collection<ImapDB.EmailIdentifier>) ids
-            );
-            Gee.Collection<Imap.MessageSet> message_sets =
-                Imap.MessageSet.uid_sparse(uids);
-            foreach (var message_set in message_sets) {
-                Gee.List<Email>? email = yield remote.list_email_async(
-                    message_set,
-                    PREFETCH_FIELDS,
-                    cancellable
-                );
-                if (email != null && !email.is_empty) {
-                    yield this.folder.local_folder.create_or_merge_email_async(
-                        email,
-                        true,
-                        this.folder.harvester,
-                        cancellable
-                    );
-                }
-            }
+            yield op.wait_for_ready_async(cancellable);
+            // remove email from the queue only once it has been
+            // successfully downloaded
+            this.prefetch_emails.remove_all(op.fetched_email);
+            success = true;
         } catch (GLib.IOError.CANCELLED err) {
             // fine
         } catch (EngineError.SERVER_UNAVAILABLE err) {
@@ -287,7 +268,6 @@ private class Geary.ImapEngine.EmailPrefetcher : BaseObject,
             debug("Error prefetching %d emails: %s", ids.size, err.message);
         } catch (GLib.Error err) {
             // not fine
-            success = false;
             warning("Error prefetching %d emails: %s", ids.size, err.message);
         }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]