[geary/wip/778276-better-flag-updates: 18/19] Pay attention when the server notifies us of altered flags.



commit 00cdb78435d84730a4f5a9f6c63ff6a333999210
Author: Michael James Gratton <mike vee net>
Date:   Tue Nov 28 10:43:18 2017 +1100

    Pay attention when the server notifies us of altered flags.
    
    Actually process unsolicited FETCH responses from the server rather than
    dropping them when in Selected state. This means we don't need to
    periodically sync flags when a folder is open.
    
    * src/engine/imap/api/imap-folder.vala (Folder): Add new updated signal,
      fire it when an unsolicited FETCH is received. Keep FETCH (and SEARCH)
      accumulators null when no operation is in progress so we know when a
      FETCH is unsolicited.
      (Folder::exec_commands_async): Make fetched and searched lists in-args
      and let caller manage lifecycle of these collections, using them as the
      accumulators.
    
    * src/engine/imap-engine/imap-engine-minimal-folder.vala (Move): Hook up
      to new Imap.Folder updated signal when opening, queue the replay op
      when it is fired.
    
    * src/engine/imap-engine/replay-ops/imap-engine-replay-update.vala
      (ReplayUpdate): New replay op for handling unsolicited FETCH responses.

 src/CMakeLists.txt                                 |    1 +
 .../imap-engine/imap-engine-minimal-folder.vala    |   11 +
 .../replay-ops/imap-engine-replay-update.vala      |   96 ++++++++
 src/engine/imap/api/imap-folder.vala               |  237 ++++++++++----------
 4 files changed, 230 insertions(+), 115 deletions(-)
---
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 82372a4..deb1910 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -231,6 +231,7 @@ engine/imap-engine/replay-ops/imap-engine-remove-email.vala
 engine/imap-engine/replay-ops/imap-engine-replay-append.vala
 engine/imap-engine/replay-ops/imap-engine-replay-disconnect.vala
 engine/imap-engine/replay-ops/imap-engine-replay-removal.vala
+engine/imap-engine/replay-ops/imap-engine-replay-update.vala
 engine/imap-engine/replay-ops/imap-engine-server-search-email.vala
 engine/imap-engine/replay-ops/imap-engine-user-close.vala
 engine/imap-engine/yahoo/imap-engine-yahoo-account.vala
diff --git a/src/engine/imap-engine/imap-engine-minimal-folder.vala 
b/src/engine/imap-engine/imap-engine-minimal-folder.vala
index ec2d319..d0bdccd 100644
--- a/src/engine/imap-engine/imap-engine-minimal-folder.vala
+++ b/src/engine/imap-engine/imap-engine-minimal-folder.vala
@@ -700,6 +700,7 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
                 
                 // signals
                 opening_folder.appended.connect(on_remote_appended);
+                opening_folder.updated.connect(on_remote_updated);
                 opening_folder.removed.connect(on_remote_removed);
                 opening_folder.disconnected.connect(on_remote_disconnected);
                 
@@ -991,6 +992,7 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
         if (remote_folder != null) {
             // disconnect signals before ripping out reference
             remote_folder.appended.disconnect(on_remote_appended);
+            remote_folder.updated.disconnect(on_remote_updated);
             remote_folder.removed.disconnect(on_remote_removed);
             remote_folder.disconnected.disconnect(on_remote_disconnected);
         }
@@ -1088,6 +1090,15 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
         }
     }
 
+    private void on_remote_updated(Imap.SequenceNumber position, Imap.FetchedData data) {
+        debug("%s on_remote_updated: remote_count=%d position=%s", to_string(),
+              this.remote_count, position.to_string());
+
+        this.replay_queue.schedule_server_notification(
+            new ReplayUpdate(this, this.remote_count, position, data)
+        );
+    }
+
     private void on_remote_removed(Imap.SequenceNumber position, int reported_remote_count) {
         debug("%s on_remote_removed: remote_count=%d position=%s reported_remote_count=%d", to_string(),
             remote_count, position.to_string(), reported_remote_count);
diff --git a/src/engine/imap-engine/replay-ops/imap-engine-replay-update.vala 
b/src/engine/imap-engine/replay-ops/imap-engine-replay-update.vala
new file mode 100644
index 0000000..e8357f4
--- /dev/null
+++ b/src/engine/imap-engine/replay-ops/imap-engine-replay-update.vala
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
+ * Copyright 2017 Michael Gratton <mike vee net>
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later).  See the COPYING file in this distribution.
+ */
+
+/**
+ * Updates an existing message in the local store after un unsolicited FETCH
+ */
+private class Geary.ImapEngine.ReplayUpdate : Geary.ImapEngine.ReplayOperation {
+
+
+    private MinimalFolder owner;
+    private int remote_count;
+    private Imap.SequenceNumber position;
+    private Imap.FetchedData data;
+
+
+    public ReplayUpdate(MinimalFolder owner,
+                        int remote_count,
+                        Imap.SequenceNumber position,
+                        Imap.FetchedData data) {
+        base ("Update", Scope.LOCAL_ONLY, OnError.RETRY);
+
+        this.owner = owner;
+        this.remote_count = remote_count;
+        this.position = position;
+        this.data = data;
+    }
+
+    public override void notify_remote_removed_position(Imap.SequenceNumber removed) {
+    }
+
+    public override void notify_remote_removed_ids(Gee.Collection<ImapDB.EmailIdentifier> ids) {
+    }
+
+    public override void get_ids_to_be_remote_removed(Gee.Collection<ImapDB.EmailIdentifier> ids) {
+    }
+
+    public override async ReplayOperation.Status replay_local_async()
+        throws Error {
+        Imap.MessageFlags? message_flags =
+            this.data.data_map.get(Imap.FetchDataSpecifier.FLAGS) as Imap.MessageFlags;
+        if (message_flags != null) {
+            int local_count = -1;
+            int64 local_position = -1;
+
+            // need total count, including those marked for removal, to accurately calculate position
+            // from server's point of view, not client's
+            local_count = yield this.owner.local_folder.get_email_count_async(
+                ImapDB.Folder.ListFlags.INCLUDE_MARKED_FOR_REMOVE, null);
+            local_position = this.position.value - (this.remote_count - local_count);
+
+            ImapDB.EmailIdentifier? id = null;
+            if (local_position > 0) {
+                id = yield this.owner.local_folder.get_id_at_async(
+                      local_position, null
+                );
+            }
+
+            if (id != null) {
+                Gee.Map<Geary.ImapDB.EmailIdentifier, Geary.EmailFlags> changed_map = 
+                new Gee.HashMap<Geary.ImapDB.EmailIdentifier, Geary.EmailFlags>();
+                changed_map.set(id, new Imap.EmailFlags(message_flags));
+
+                yield this.owner.local_folder.set_email_flags_async(changed_map, null);
+
+                this.owner.replay_notify_email_flags_changed(changed_map);
+            } else {
+                debug("%s replay_local_async id is null!", to_string());
+            }
+        } else {
+            debug("%s Don't know what to do without any FLAGS: %s",
+                  to_string(), this.data.to_string());
+        }
+
+        return ReplayOperation.Status.COMPLETED;
+    }
+
+    public override async void backout_local_async() throws Error {
+    }
+
+    public override async ReplayOperation.Status replay_remote_async() {
+        return ReplayOperation.Status.CONTINUE;
+    }
+
+    public override string describe_state() {
+        Imap.MessageData fetch_flags =
+            this.data.data_map.get(Imap.FetchDataSpecifier.FLAGS);
+        return "position.value=%lld, flags=%s".printf(
+            this.position.value, fetch_flags.to_string()
+        );
+    }
+}
diff --git a/src/engine/imap/api/imap-folder.vala b/src/engine/imap/api/imap-folder.vala
index c0bd773..69a2bd8 100644
--- a/src/engine/imap/api/imap-folder.vala
+++ b/src/engine/imap/api/imap-folder.vala
@@ -40,41 +40,45 @@ private class Geary.Imap.Folder : BaseObject {
     private ClientSessionManager session_mgr;
     private ClientSession? session = null;
     private Nonblocking.Mutex cmd_mutex = new Nonblocking.Mutex();
-    private Gee.HashMap<SequenceNumber, FetchedData> fetch_accumulator = new Gee.HashMap<
-        SequenceNumber, FetchedData>();
-    private Gee.Set<Imap.UID> search_accumulator = new Gee.HashSet<Imap.UID>();
-    
+    private Gee.HashMap<SequenceNumber, FetchedData>? fetch_accumulator = null;
+    private Gee.Set<Imap.UID>? search_accumulator = null;
+
     /**
      * A (potentially unsolicited) response from the server.
      *
      * See [[http://tools.ietf.org/html/rfc3501#section-7.3.1]]
      */
     public signal void exists(int total);
-    
+
     /**
      * A (potentially unsolicited) response from the server.
      *
-     * See [[http://tools.ietf.org/html/rfc3501#section-7.4.1]]
+     * See [[http://tools.ietf.org/html/rfc3501#section-7.3.2]]
      */
-    public signal void expunge(SequenceNumber position);
-    
+    public signal void recent(int total);
+
     /**
      * A (potentially unsolicited) response from the server.
      *
-     * See [[http://tools.ietf.org/html/rfc3501#section-7.3.2]]
+     * See [[http://tools.ietf.org/html/rfc3501#section-7.4.1]]
      */
-    public signal void recent(int total);
-    
+    public signal void expunge(SequenceNumber position);
+
     /**
      * Fabricated from the IMAP signals and state obtained at open_async().
      */
     public signal void appended(int total);
-    
+
+    /**
+     * Fabricated from the IMAP signals and state obtained at open_async().
+     */
+    public signal void updated(SequenceNumber pos, FetchedData data);
+
     /**
      * Fabricated from the IMAP signals and state obtained at open_async().
      */
     public signal void removed(SequenceNumber pos, int total);
-    
+
     /**
      * Note that close_async() still needs to be called after this signal is fired.
      */
@@ -90,11 +94,9 @@ private class Geary.Imap.Folder : BaseObject {
     public async void open_async(Cancellable? cancellable) throws Error {
         if (is_open)
             throw new EngineError.ALREADY_OPEN("%s already open", to_string());
-        
-        fetch_accumulator.clear();
-        
+
         session = yield session_mgr.claim_authorized_session_async(cancellable);
-        
+
         // connect to interesting signals *before* selecting
         session.exists.connect(on_exists);
         session.expunge.connect(on_expunge);
@@ -147,33 +149,32 @@ private class Geary.Imap.Folder : BaseObject {
     public async void close_async(Cancellable? cancellable) throws Error {
         if (!is_open)
             return;
-        
+
         yield release_session_async(cancellable);
-        
-        fetch_accumulator.clear();
-        
-        readonly = Trillian.UNKNOWN;
-        accepts_user_flags = Trillian.UNKNOWN;
-        
-        is_open = false;
+
+        this.fetch_accumulator = null;
+        this.search_accumulator = null;
+
+        this.readonly = Trillian.UNKNOWN;
+        this.accepts_user_flags = Trillian.UNKNOWN;
+
+        this.is_open = false;
     }
-    
+
     private async void release_session_async(Cancellable? cancellable) {
-        if (session == null)
+        if (this.session == null)
             return;
-        
-        // set this.session to null before yielding to ClientSessionManager
-        ClientSession release_session = session;
-        session = null;
-        
-        release_session.exists.disconnect(on_exists);
-        release_session.expunge.disconnect(on_expunge);
-        release_session.fetch.disconnect(on_fetch);
-        release_session.recent.disconnect(on_recent);
-        release_session.search.disconnect(on_search);
-        release_session.status_response_received.disconnect(on_status_response);
-        release_session.disconnected.disconnect(on_disconnected);
-        
+
+        this.session.exists.disconnect(on_exists);
+        this.session.expunge.disconnect(on_expunge);
+        this.session.fetch.disconnect(on_fetch);
+        this.session.recent.disconnect(on_recent);
+        this.session.search.disconnect(on_search);
+        this.session.status_response_received.disconnect(on_status_response);
+        this.session.disconnected.disconnect(on_disconnected);
+
+        ClientSession release_session = this.session;
+        this.session = null;
         try {
             yield session_mgr.release_session_async(release_session, cancellable);
         } catch (Error err) {
@@ -208,14 +209,23 @@ private class Geary.Imap.Folder : BaseObject {
         expunge(pos);
         removed(pos, properties.select_examine_messages);
     }
-    
-    private void on_fetch(FetchedData fetched_data) {
+
+    private void on_fetch(FetchedData data) {
         // add if not found, merge if already received data for this email
-        FetchedData? already_present = fetch_accumulator.get(fetched_data.seq_num);
-        fetch_accumulator.set(fetched_data.seq_num,
-            (already_present != null) ? fetched_data.combine(already_present) : fetched_data);
+        debug("%s: FETCH (%s): %s:",
+              to_string(),
+              this.fetch_accumulator != null ? "accumulating" : "unsolicited",
+              data.to_string());
+        if (this.fetch_accumulator != null) {
+            FetchedData? existing = this.fetch_accumulator.get(data.seq_num);
+            this.fetch_accumulator.set(
+                data.seq_num, (existing != null) ? data.combine(existing) : data
+            );
+        } else {
+            updated(data.seq_num, data);
+        }
     }
-    
+
     private void on_recent(int total) {
         debug("%s RECENT %d", to_string(), total);
         
@@ -229,15 +239,19 @@ private class Geary.Imap.Folder : BaseObject {
     private void on_search(int64[] seq_or_uid) {
         // All SEARCH from this class are UID SEARCH, so can reliably convert and add to
         // accumulator
-        foreach (int64 uid in seq_or_uid) {
-            try {
-                search_accumulator.add(new UID.checked(uid));
-            } catch (ImapError imaperr) {
-                debug("%s Unable to process SEARCH UID result: %s", to_string(), imaperr.message);
+        if (this.search_accumulator != null) {
+            foreach (int64 uid in seq_or_uid) {
+                try {
+                    this.search_accumulator.add(new UID.checked(uid));
+                } catch (ImapError imaperr) {
+                    debug("%s Unable to process SEARCH UID result: %s", to_string(), imaperr.message);
+                }
             }
+        } else {
+            debug("%s Not handling unsolicited SEARCH response", to_string());
         }
     }
-    
+
     private void on_status_response(StatusResponse status_response) {
         // only interested in ResponseCodes here
         ResponseCode? response_code = status_response.response_code;
@@ -303,51 +317,39 @@ private class Geary.Imap.Folder : BaseObject {
     // FETCH commands can generate a FolderError.RETRY.  State will be updated to accomodate retry,
     // but all Commands must be regenerated to ensure new state is reflected in requests.
     private async Gee.Map<Command, StatusResponse>? exec_commands_async(Gee.Collection<Command> cmds,
-        out Gee.HashMap<SequenceNumber, FetchedData>? fetched, out Gee.Set<Imap.UID>? search_results,
-        Cancellable? cancellable) throws Error {
-        int token = yield cmd_mutex.claim_async(cancellable);
+                                                                        Gee.HashMap<SequenceNumber, 
FetchedData>? fetch_results,
+                                                                        Gee.Set<Imap.UID>? search_results,
+                                                                        Cancellable? cancellable)
+        throws Error {
         Gee.Map<Command, StatusResponse>? responses = null;
-        // execute commands with mutex locked
-        Error? err = null;
+        int token = yield cmd_mutex.claim_async(cancellable);
+        Error? thrown = null;
         try {
-            // check open after acquiring mutex, so that if an error is thrown it's caught and
-            // mutex can be closed
             check_open();
-            
+
+            this.fetch_accumulator = fetch_results;
+            this.search_accumulator = search_results;
             responses = yield session.send_multiple_commands_async(cmds, cancellable);
-        } catch (Error store_fetch_err) {
-            err = store_fetch_err;
-        }
-        
-        // swap out results and clear accumulators
-        if (fetch_accumulator.size > 0) {
-            fetched = fetch_accumulator;
-            fetch_accumulator = new Gee.HashMap<SequenceNumber, FetchedData>();
-        } else {
-            fetched = null;
-        }
-        
-        if (search_accumulator.size > 0) {
-            search_results = search_accumulator;
-            search_accumulator = new Gee.HashSet<Imap.UID>();
-        } else {
-            search_results = null;
+        } catch (Error err) {
+            thrown = err;
         }
-        
-        // unlock after clearing accumulators
+
+        this.fetch_accumulator = null;
+        this.search_accumulator = null;
+
         cmd_mutex.release(ref token);
-        
-        if (err != null)
-            throw err;
-        
-        // process response stati after unlocking and clearing accumulators
-        assert(responses != null);
-        foreach (Command cmd in responses.keys)
+
+        if (thrown != null) {
+            throw thrown;
+        }
+
+        foreach (Command cmd in responses.keys) {
             throw_on_failed_status(responses.get(cmd), cmd);
-        
+        }
+
         return responses;
     }
-    
+
     // HACK: See https://bugzilla.gnome.org/show_bug.cgi?id=714902
     //
     // Detect when a server has returned a BAD response to FETCH BODY[HEADER.FIELDS (HEADER-LIST)]
@@ -420,12 +422,16 @@ private class Geary.Imap.Folder : BaseObject {
         // which is all we're interested in here
         SearchCriteria criteria = new SearchCriteria(SearchCriterion.message_set(msg_set));
         SearchCommand cmd = new SearchCommand.uid(criteria);
-        
-        Gee.Set<Imap.UID>? search_results;
-        yield exec_commands_async(Geary.iterate<Command>(cmd).to_array_list(), null, out search_results,
-            cancellable);
-        
-        return (search_results != null && search_results.size > 0) ? search_results : null;
+
+        Gee.Set<Imap.UID> search_results = new Gee.HashSet<Imap.UID>();
+        yield exec_commands_async(
+            Geary.iterate<Command>(cmd).to_array_list(),
+            null,
+            search_results,
+            cancellable
+        );
+
+        return (search_results.size > 0) ? search_results : null;
     }
     
     private Gee.Collection<FetchCommand> assemble_list_commands(Imap.MessageSet msg_set,
@@ -519,8 +525,8 @@ private class Geary.Imap.Folder : BaseObject {
     public async Gee.List<Geary.Email>? list_email_async(MessageSet msg_set, Geary.Email.Field fields,
         Cancellable? cancellable) throws Error {
         check_open();
-        
-        Gee.HashMap<SequenceNumber, FetchedData>? fetched = null;
+        Gee.HashMap<SequenceNumber, FetchedData> fetched =
+            new Gee.HashMap<SequenceNumber, FetchedData>();
         FetchBodyDataSpecifier? header_specifier = null;
         FetchBodyDataSpecifier? body_specifier = null;
         FetchBodyDataSpecifier? preview_specifier = null;
@@ -536,7 +542,7 @@ private class Geary.Imap.Folder : BaseObject {
             
             // Commands prepped, do the fetch and accumulate all the responses
             try {
-                yield exec_commands_async(cmds, out fetched, null, cancellable);
+                yield exec_commands_async(cmds, fetched, null, cancellable);
             } catch (Error err) {
                 if (err is FolderError.RETRY) {
                     debug("Retryable server failure detected for %s: %s", to_string(), err.message);
@@ -549,10 +555,10 @@ private class Geary.Imap.Folder : BaseObject {
             
             break;
         }
-        
-        if (fetched == null || fetched.size == 0)
+
+        if (fetched.size == 0)
             return null;
-        
+
         // Convert fetched data into Geary.Email objects
         // because this could be for a lot of email, do in a background thread
         Gee.List<Geary.Email> email_list = new Gee.ArrayList<Geary.Email>();
@@ -608,11 +614,12 @@ private class Geary.Imap.Folder : BaseObject {
         
         Gee.List<Command> cmds = new Gee.ArrayList<Command>();
         cmds.add(new FetchCommand.data_type(msg_set, FetchDataSpecifier.UID));
-        
-        Gee.HashMap<SequenceNumber, FetchedData>? fetched;
-        yield exec_commands_async(cmds, out fetched, null, cancellable);
 
-        if (fetched == null || fetched.is_empty) {
+        Gee.HashMap<SequenceNumber, FetchedData> fetched =
+            new Gee.HashMap<SequenceNumber, FetchedData>();
+        yield exec_commands_async(cmds, fetched, null, cancellable);
+
+        if (fetched.is_empty) {
             throw new ImapError.INVALID("Server returned no sequence numbers");
         }
 
@@ -625,7 +632,7 @@ private class Geary.Imap.Folder : BaseObject {
         }
         return map;
     }
-    
+
     public async void remove_email_async(Gee.List<MessageSet> msg_sets, Cancellable? cancellable)
         throws Error {
         check_open();
@@ -740,18 +747,18 @@ private class Geary.Imap.Folder : BaseObject {
         // always perform a UID SEARCH
         Gee.Collection<Command> cmds = new Gee.ArrayList<Command>();
         cmds.add(new SearchCommand.uid(criteria));
-        
-        Gee.Set<Imap.UID>? search_results;
-        yield exec_commands_async(cmds, null, out search_results, cancellable);
-        if (search_results == null || search_results.size == 0)
-            return null;
-        
-        Gee.SortedSet<Imap.UID> tree = new Gee.TreeSet<Imap.UID>();
-        tree.add_all(search_results);
-        
+
+        Gee.Set<Imap.UID> search_results = new Gee.HashSet<Imap.UID>();
+        yield exec_commands_async(cmds, null, search_results, cancellable);
+
+        Gee.SortedSet<Imap.UID> tree = null;
+        if (search_results.size > 0) {
+            tree = new Gee.TreeSet<Imap.UID>();
+            tree.add_all(search_results);
+        }
         return tree;
     }
-    
+
     // NOTE: If fields are added or removed from this method, BASIC_FETCH_FIELDS *must* be updated
     // as well
     private void fields_to_fetch_data_types(Geary.Email.Field fields,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]