[geary/wip/789924-network-transition-redux: 9/11] Refine process for closing MinimalFolder.



commit 00c27126b9fb4896cd2c2ca4954c4329c1f6fe9a
Author: Michael James Gratton <mike vee net>
Date:   Mon Feb 5 18:33:54 2018 +1100

    Refine process for closing MinimalFolder.
    
    * src/engine/imap-engine/imap-engine-minimal-folder.vala (MinimalFolder):
      Remove user_close_async in favour of passing a closure to the UserClose
      replay op, allowing private code to be kept private. Rename
      close_internal_async and close_internal_locked_async to drop the
      "_async" suffix. Make close_remote_session notify tasks waiting for the
      remote session as appropriate that there isn't a remote around any
      more. Substantially rework close_internal_locked to greatly simplify
      it, ensure the open cancellable actually gets cancelled, don't try to
      flush replay ops if there isn't an open remote session and null out the
      replay queue after use rather than wasting CPU constructing a new one
      straight away. Don't pass open_cancellable to ReplayAppend() since we
      want it to keep going when flushing the replay queue.
    
    * src/engine/imap-engine/imap-engine-replay-queue.vala (ReplayQueue): Use
      a Cancellable to stop waiting for a remote session when closing the
      queue, rather than waiting for wait_for_remote_async itself, so it can
      be managed internally rather than relying on coordination with
      MinimalFolder::close_internal_locked.

 .../imap-engine/imap-engine-minimal-folder.vala    |  222 ++++++++++----------
 .../imap-engine/imap-engine-replay-queue.vala      |   25 ++-
 .../replay-ops/imap-engine-replay-append.vala      |    4 +-
 .../replay-ops/imap-engine-user-close.vala         |   43 ++--
 4 files changed, 150 insertions(+), 144 deletions(-)
---
diff --git a/src/engine/imap-engine/imap-engine-minimal-folder.vala b/src/engine/imap-engine/imap-engine-minimal-folder.vala
index 735f467..4c31e94 100644
--- a/src/engine/imap-engine/imap-engine-minimal-folder.vala
+++ b/src/engine/imap-engine/imap-engine-minimal-folder.vala
@@ -80,15 +80,15 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
         }
     }
 
-    internal ReplayQueue replay_queue { get; private set; }
+    internal ReplayQueue? replay_queue { get; private set; default = null; }
     internal EmailPrefetcher email_prefetcher { get; private set; }
 
     private weak GenericAccount _account;
     private Geary.AggregatedFolderProperties _properties =
         new Geary.AggregatedFolderProperties(false, false);
 
-    private Folder.OpenFlags open_flags = OpenFlags.NONE;
     private int open_count = 0;
+    private Folder.OpenFlags open_flags = OpenFlags.NONE;
     private Cancellable? open_cancellable = null;
     private Nonblocking.Mutex open_mutex = new Nonblocking.Mutex();
     private Nonblocking.Mutex close_mutex = new Nonblocking.Mutex();
@@ -139,7 +139,6 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
 
         this._special_folder_type = special_folder_type;
         this._properties.add(local_folder.get_properties());
-        this.replay_queue = new ReplayQueue(this);
         this.email_prefetcher = new EmailPrefetcher(this);
 
         this.remote_open_timer = new TimeoutManager.seconds(
@@ -228,9 +227,6 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
         // first open gets to name the flags, but see note above
         this.open_flags = open_flags;
 
-        // reset to force waiting in wait_for_remote_async()
-        this.remote_wait_semaphore.reset();
-
         // reset to force waiting in wait_for_close_async()
         this.closed_semaphore.reset();
 
@@ -238,7 +234,9 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
         // the remote opens
         this.refresh_unseen_timer.reset();
 
+        // Construct objects needed when open
         this.open_cancellable = new Cancellable();
+        this.replay_queue = new ReplayQueue(this);
 
         // Notify the email prefetcher
         this.email_prefetcher.open();
@@ -273,8 +271,9 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
     public override async void wait_for_remote_async(Cancellable? cancellable = null) throws Error {
         check_open("wait_for_remote_async");
 
-        // if remote has not yet been opened, do it now ...
-        if (this.remote_session == null) {
+        // If remote has not yet been opened and we are not in the
+        // process of closing the folder, open a session right away.
+        if (this.remote_session == null && !this.open_cancellable.is_cancelled()) {
             this.open_remote_session.begin();
         }
 
@@ -304,16 +303,38 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
     }
 
     /** {@inheritDoc} */
-    public override async bool close_async(Cancellable? cancellable = null) throws Error {
-        // Check open_count but only decrement inside of replay queue
-        if (open_count <= 0)
-            return false;
-
-        UserClose user_close = new UserClose(this, cancellable);
-        this.replay_queue.schedule(user_close);
-
-        yield user_close.wait_for_ready_async(cancellable);
-        return user_close.closing;
+    public override async bool close_async(Cancellable? cancellable = null)
+        throws Error {
+        bool is_closing = false;
+        if (open_count > 0) {
+            UserClose user_close = new UserClose(
+                () => {
+                    // Decrement the open count only if we are not
+                    // going to be fully closed here, since if so we
+                    // want close_internal_locked to be able to manage
+                    // when it is actually set to zero so it can clean
+                    // up beforehand as needed.
+                    if (this.open_count == 1) {
+                        is_closing = true;
+                        // Call close_internal in the background since
+                        // it recursively causes replay operations to
+                        // be scheduled, which since this is being
+                        // called from the replay queue would
+                        // otherwise deadlock.
+                        this.close_internal.begin(
+                            CloseReason.LOCAL_CLOSE,
+                            CloseReason.REMOTE_CLOSE,
+                            cancellable
+                        );
+                    } else if (this.open_count >= 1) {
+                        this.open_count -= 1;
+                    }
+                    return is_closing;
+                });
+            this.replay_queue.schedule(user_close);
+            yield user_close.wait_for_ready_async(cancellable);
+        }
+        return is_closing;
     }
 
     /** {@inheritDoc} */
@@ -697,9 +718,22 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
      * Unhooks the IMAP folder session and returns it to the account.
      */
     internal void close_remote_session(Folder.CloseReason remote_reason) {
-        // Block anyone calling wait_for_remote_async(), as the session
-        // will no longer available.
-        this.remote_wait_semaphore.reset();
+        // Since the remote session is going or has gone away, we need to
+        // let waiters know. In the case of the folder being closed,
+        // notify that no more remotes will ever come back, otherwise
+        // reset the semaphore to keep them waiting, in case it does.
+        //
+        // We use open_cancellable to determine if the folder is open
+        // since that is cancelled before the replay queue is flushed,
+        // and open_count is set to zero only afterwards. This is
+        // important since we need to let replay queue ops that are
+        // being flushed know if the session goes away so they wake
+        // up.
+        if (this.open_cancellable.is_cancelled()) {
+            notify_remote_waiters(false);
+        } else {
+            this.remote_wait_semaphore.reset();
+        }
 
         Imap.FolderSession session = this.remote_session;
         this.remote_session = null;
@@ -717,44 +751,15 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
     }
 
     /**
-     * Starts closing the folder, called from {@link UserClose}.
+     * Closes the folder and the remote session.
      */
-    internal async bool user_close_async(Cancellable? cancellable) {
-        // decrement open_count and, if zero, continue closing Folder
-        if (open_count == 0 || --open_count > 0)
-            return false;
-
-        // Close the prefetcher early so it stops using the remote ASAP
-        this.email_prefetcher.close();
-
-        // block anyone from wait_for_remote_async(), as this is no longer open
-        this.remote_wait_semaphore.reset();
-
-        // don't yield here, close_internal_async() needs to be called outside of the replay queue
-        // the open_count protects against this path scheduling it more than once
-        this.close_internal_async.begin(
-            CloseReason.LOCAL_CLOSE,
-            CloseReason.REMOTE_CLOSE,
-            true,
-            cancellable
-        );
-
-        return true;
-    }
-
-    /**
-     * Forces closes the folder.
-     *
-     * NOTE: This bypasses open_count and forces the Folder closed.
-     */
-    internal async void close_internal_async(Folder.CloseReason local_reason,
-                                             Folder.CloseReason remote_reason,
-                                             bool flush_pending,
-                                             Cancellable? cancellable) {
+    private async void close_internal(Folder.CloseReason local_reason,
+                                      Folder.CloseReason remote_reason,
+                                      Cancellable? cancellable) {
         try {
             int token = yield this.close_mutex.claim_async(cancellable);
-            yield close_internal_locked_async(
-                local_reason, remote_reason, flush_pending, cancellable
+            yield close_internal_locked(
+                local_reason, remote_reason, cancellable
             );
             this.close_mutex.release(ref token);
         } catch (Error err) {
@@ -762,76 +767,73 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
         }
     }
 
-    // Should only be called when close_mutex is locked, i.e. use close_internal_async()
-    private async void close_internal_locked_async(Folder.CloseReason local_reason,
-                                                   Folder.CloseReason remote_reason,
-                                                   bool flush_pending,
-                                                   Cancellable? cancellable) {
+    // Should only be called when close_mutex is locked, i.e. use close_internal()
+    private async void close_internal_locked(Folder.CloseReason local_reason,
+                                             Folder.CloseReason remote_reason,
+                                             Cancellable? cancellable) {
         // Ensure we don't attempt to start opening a remote while
         // closing
         this._account.session_pool.ready.disconnect(on_remote_ready);
         this.remote_open_timer.reset();
 
-        // only flushing pending ReplayOperations if this is a "clean" close, not forced due to
-        // error and if specified by caller (could be a non-error close on the server, i.e. "BYE",
-        // but the connection is dropping, so don't flush pending)
-        flush_pending = (
-            flush_pending &&
+        // Stop any internal tasks that are running
+        this.open_cancellable.cancel();
+        this.email_prefetcher.close();
+
+        // Once we get to this point, either there will be a remote
+        // session open already, or none will ever get opened - no
+        // more attempts to open a session will be made. We can't
+        // block access to any remote session here however since
+        // pending replay operations may need it still. Instead, rely
+        // on the replay queue itself to reject any new operations
+        // being queued once we close it.
+
+        // Only flush pending operations if the remote is open (if
+        // closed they will deadlock waiting for one to open), and if
+        // this is a "clean" close, that is not forced due to error.
+        bool flush_pending = (
+            this.remote_session != null &&
             !local_reason.is_error() &&
             !remote_reason.is_error()
         );
 
         if (flush_pending) {
-            // We are flushing the queue, so gather operations from
-            // Revokables to give them a chance to schedule their
+            // Since we are flushing the queue, gather operations
+            // from Revokables to give them a chance to schedule their
             // commit operations before going down
             Gee.List<ReplayOperation> final_ops = new Gee.ArrayList<ReplayOperation>();
             notify_closing(final_ops);
             foreach (ReplayOperation op in final_ops)
                 replay_queue.schedule(op);
-        } else {
-            // Not flushing the queue, so notify all operations
-            // waiting for the remote that it's not coming available
-            // ... this wakes up any ReplayOperation blocking on
-            // wait_for_remote_async(), necessary in order to finish
-            // ReplayQueue.close_async (i.e. to prevent deadlock);
-            // this is necessary because it's possible for this method
-            // to be called before a session has even had a chance to
-            // open.
-            //
-            // We don't want to do this for a clean close yet, because
-            // some pending operations may still need to use the
-            // session.
-            notify_remote_waiters(false);
         }
 
-        // swap out the ReplayQueue while closing so, if re-opened,
-        // future commands can be queued on the new queue
-        ReplayQueue closing_replay_queue = this.replay_queue;
-        this.replay_queue = new ReplayQueue(this);
-
-        // Close the replay queues; if a "clean" close, flush pending operations so everything
-        // gets a chance to run; if forced close, drop everything outstanding
+        // Close the replay queues; if a "clean" close, flush pending
+        // operations so everything gets a chance to run; if forced
+        // close, drop everything outstanding
+        debug("Closing replay queue for %s (flush_pending=%s): %s",
+              to_string(), flush_pending.to_string(), this.replay_queue.to_string());
         try {
-            debug("Closing replay queue for %s (flush_pending=%s): %s", to_string(),
-                  flush_pending.to_string(), closing_replay_queue.to_string());
-            yield closing_replay_queue.close_async(flush_pending);
-            debug("Closed replay queue for %s: %s", to_string(), closing_replay_queue.to_string());
-        } catch (Error replay_queue_err) {
-            debug("Error closing %s replay queue: %s", to_string(), replay_queue_err.message);
+            yield this.replay_queue.close_async(flush_pending);
+            debug("Closed replay queue for %s: %s",
+                  to_string(), this.replay_queue.to_string());
+        } catch (Error err) {
+            debug("Error closing %s replay queue: %s",
+                  to_string(), err.message);
         }
 
-        // If flushing, now notify waiters that the queue has bee flushed
-        if (flush_pending) {
-            notify_remote_waiters(false);
-        }
+        // Actually close the remote folder
+        close_remote_session(remote_reason);
 
-        // forced closed one way or another, so reset state
-        this.open_count = 0;
+        // Since both the remote session and replay queue have shut
+        // down, we can reset the folder's internal state.
+        this.remote_wait_semaphore.reset();
+        this.replay_queue = null;
+        this.open_cancellable = null;
         this.open_flags = OpenFlags.NONE;
 
-        // Actually close the remote folder
-        close_remote_session(remote_reason);
+        // Officially marks the folder as closed. Beyond this point it
+        // may start to re-open again if open_async is called.
+        this.open_count = 0;
 
         // need to call these every time, even if remote was not fully
         // opened, as some callers rely on order of signals
@@ -841,7 +843,7 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
         // Notify waiting tasks
         this.closed_semaphore.blind_notify();
 
-        debug("Folder %s closed", to_string());
+        debug("%s: Folder closed", to_string());
     }
 
     /**
@@ -934,10 +936,9 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
                     remote_reason = CloseReason.REMOTE_ERROR;
                 }
 
-                this.close_internal_async.begin(
+                yield close_internal(
                     local_reason,
                     remote_reason,
-                    false,
                     null // Don't pass cancellable, close must complete
                 );
             }
@@ -959,10 +960,9 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
             this._account.release_folder_session(session);
             if (!(err is IOError.CANCELLED)) {
                 notify_open_failed(Folder.OpenFailed.LOCAL_ERROR, err);
-                this.close_internal_async.begin(
+                yield close_internal(
                     CloseReason.LOCAL_ERROR,
                     CloseReason.REMOTE_CLOSE,
-                    false,
                     null // Don't pass cancellable, close must complete
                 );
             }
@@ -1041,9 +1041,9 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
             positions.add(new Imap.SequenceNumber(pos));
 
         if (positions.size > 0) {
-            ReplayAppend op = new ReplayAppend(
-                this, remote_count, positions, this.open_cancellable
-            );
+            // We don't pass in open_cancellable here since we want
+            // the op to still run when closing and flushing the queue
+            ReplayAppend op = new ReplayAppend(this, remote_count, positions, null);
             op.email_appended.connect(notify_email_appended);
             op.email_locally_appended.connect(notify_email_locally_appended);
             op.email_count_changed.connect(notify_email_count_changed);
diff --git a/src/engine/imap-engine/imap-engine-replay-queue.vala b/src/engine/imap-engine/imap-engine-replay-queue.vala
index eb88851..dd8d81e 100644
--- a/src/engine/imap-engine/imap-engine-replay-queue.vala
+++ b/src/engine/imap-engine/imap-engine-replay-queue.vala
@@ -73,6 +73,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
     private Scheduler.Scheduled? notification_timer = null;
     private int64 next_submission_number = 0;
     private State state = State.OPEN;
+    private Cancellable remote_wait_cancellable = new Cancellable();
 
     public virtual signal void scheduled(ReplayOperation op) {
         Logging.debug(
@@ -207,9 +208,8 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
      */
     public bool schedule_server_notification(ReplayOperation op) {
         if (state != State.OPEN) {
-            debug("Unable to schedule notification operation %s on %s: replay queue closed", op.to_string(),
-                to_string());
-            
+            debug("Unable to schedule notification operation %s on %s: replay queue closed",
+                  op.to_string(), to_string());
             return false;
         }
         
@@ -326,12 +326,15 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
         // consideration in schedule()
         state = State.CLOSING;
         closing();
-        
-        // if not flushing pending, clear out all waiting operations, backing out any that need to
-        // be backed out
-        if (!flush_pending)
+
+        // if not flushing pending, stop waiting for a remote session
+        // and clear out all waiting operations, backing out any that
+        // need to be backed out
+        if (!flush_pending) {
+            this.remote_wait_cancellable.cancel();
             yield clear_pending_async(cancellable);
-        
+        }
+
         // flush a ReplayClose operation down the pipe so all working operations complete
         CloseReplayQueue close_op = new CloseReplayQueue();
         bool is_scheduled = schedule(close_op);
@@ -491,11 +494,11 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
             // wait until the remote folder is opened (or throws an exception, in which case closed)
             try {
                 if (!is_close_op && folder_opened && state == State.OPEN)
-                    yield owner.wait_for_remote_async();
+                    yield owner.wait_for_remote_async(this.remote_wait_cancellable);
             } catch (Error remote_err) {
                 debug("Folder %s closed or failed to open, remote replay queue closing: %s",
-                    to_string(), remote_err.message);
-                
+                      to_string(), remote_err.message);
+
                 // not open
                 folder_opened = false;
                 
diff --git a/src/engine/imap-engine/replay-ops/imap-engine-replay-append.vala b/src/engine/imap-engine/replay-ops/imap-engine-replay-append.vala
index 02055b6..b376637 100644
--- a/src/engine/imap-engine/replay-ops/imap-engine-replay-append.vala
+++ b/src/engine/imap-engine/replay-ops/imap-engine-replay-append.vala
@@ -9,7 +9,7 @@ private class Geary.ImapEngine.ReplayAppend : Geary.ImapEngine.ReplayOperation {
     private MinimalFolder owner;
     private int remote_count;
     private Gee.List<Imap.SequenceNumber> positions;
-    private Cancellable cancellable;
+    private Cancellable? cancellable;
 
     public signal void email_appended(Gee.Collection<Geary.EmailIdentifier> ids);
     public signal void email_locally_appended(Gee.Collection<Geary.EmailIdentifier> ids);
@@ -19,7 +19,7 @@ private class Geary.ImapEngine.ReplayAppend : Geary.ImapEngine.ReplayOperation {
     public ReplayAppend(MinimalFolder owner,
                         int remote_count,
                         Gee.List<Imap.SequenceNumber> positions,
-                        Cancellable cancellable) {
+                        Cancellable? cancellable) {
         // IGNORE remote errors because the reconnect will re-normalize the folder, making this
         // append moot
         base ("Append", Scope.REMOTE_ONLY, OnError.IGNORE);
diff --git a/src/engine/imap-engine/replay-ops/imap-engine-user-close.vala b/src/engine/imap-engine/replay-ops/imap-engine-user-close.vala
index 52b381a..d023c53 100644
--- a/src/engine/imap-engine/replay-ops/imap-engine-user-close.vala
+++ b/src/engine/imap-engine/replay-ops/imap-engine-user-close.vala
@@ -5,43 +5,46 @@
  */
 
 private class Geary.ImapEngine.UserClose : Geary.ImapEngine.ReplayOperation {
-    public bool closing = false;
-    
-    private MinimalFolder owner;
-    private Cancellable? cancellable;
-    
-    public UserClose(MinimalFolder owner, Cancellable? cancellable) {
+
+    /** A function that this operation can call to close the folder. */
+    public delegate bool CloseFolder();
+
+    /** Determines the state of the close operation. */
+    public Trillian is_closing = Trillian.UNKNOWN;
+
+    private CloseFolder close;
+
+
+    public UserClose(owned CloseFolder close) {
         base ("UserClose", Scope.LOCAL_ONLY);
-        
-        this.owner = owner;
-        this.cancellable = cancellable;
+        this.close = (owned) close;
     }
-    
+
     public override void notify_remote_removed_position(Imap.SequenceNumber removed) {
     }
-    
+
     public override void notify_remote_removed_ids(Gee.Collection<ImapDB.EmailIdentifier> ids) {
     }
-    
+
     public override void get_ids_to_be_remote_removed(Gee.Collection<ImapDB.EmailIdentifier> ids) {
     }
-    
+
     public override async ReplayOperation.Status replay_local_async() throws Error {
-        closing = yield owner.user_close_async(cancellable);
-        
+        bool closing = this.close();
+        this.is_closing = Trillian.from_boolean(closing);
         return ReplayOperation.Status.COMPLETED;
     }
-    
+
     public override async void backout_local_async() throws Error {
     }
-    
+
     public override async ReplayOperation.Status replay_remote_async() throws Error {
         // should not be called
         return ReplayOperation.Status.COMPLETED;
     }
-    
+
     public override string describe_state() {
-        return "";
+        return "is_closing: %s".printf(this.is_closing.to_string());
     }
-}
 
+}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]