[geary/wip/778276-better-flag-updates: 14/25] Add an operations queue to GenericAccount for server operations.



commit b24c554f5196998b9a712a62dec019ac23ba908c
Author: Michael James Gratton <mike vee net>
Date:   Thu Nov 23 10:11:30 2017 +1100

    Add an operations queue to GenericAccount for server operations.
    
    This generalises the approach used to execute the flag watcher and
    background sync, provides a high-level means of managing local and remote
    operations, and provides a means of compartmentalising operation-specific
    code.
    
    * src/engine/imap-engine/imap-engine-account-operation.vala
      (AccountOperation): Abstract base class for classes that implement some
      account-specific operation.
    
    * src/engine/imap-engine/imap-engine-account-processor.vala
      (AccountProcessor): Class to manage the operation queue and execute
      operations.
    
    * src/engine/imap-engine/imap-engine-generic-account.vala
      (GenericAccount): Create and manage an instance of AccountProcessor,
      add queue_operation method to allow operations to be queued.

 po/POTFILES.in                                     |    2 +
 src/CMakeLists.txt                                 |    2 +
 .../imap-engine/imap-engine-account-operation.vala |   86 +++++++++++
 .../imap-engine/imap-engine-account-processor.vala |   88 +++++++++++
 .../imap-engine/imap-engine-generic-account.vala   |   19 +++
 test/CMakeLists.txt                                |    1 +
 .../engine/imap-engine/account-processor-test.vala |  156 ++++++++++++++++++++
 test/test-engine.vala                              |    1 +
 8 files changed, 355 insertions(+), 0 deletions(-)
---
diff --git a/po/POTFILES.in b/po/POTFILES.in
index 4f79688..b8a6290 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -230,6 +230,8 @@ src/engine/imap-engine/gmail/imap-engine-gmail-drafts-folder.vala
 src/engine/imap-engine/gmail/imap-engine-gmail-folder.vala
 src/engine/imap-engine/gmail/imap-engine-gmail-search-folder.vala
 src/engine/imap-engine/gmail/imap-engine-gmail-spam-trash-folder.vala
+src/engine/imap-engine/imap-engine-account-operation.vala
+src/engine/imap-engine/imap-engine-account-processor.vala
 src/engine/imap-engine/imap-engine-account-synchronizer.vala
 src/engine/imap-engine/imap-engine-batch-operations.vala
 src/engine/imap-engine/imap-engine-contact-store.vala
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 5e2ab1a..82372a4 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -190,6 +190,8 @@ engine/imap-db/outbox/smtp-outbox-folder-properties.vala
 engine/imap-db/outbox/smtp-outbox-folder-root.vala
 
 engine/imap-engine/imap-engine.vala
+engine/imap-engine/imap-engine-account-operation.vala
+engine/imap-engine/imap-engine-account-processor.vala
 engine/imap-engine/imap-engine-account-synchronizer.vala
 engine/imap-engine/imap-engine-batch-operations.vala
 engine/imap-engine/imap-engine-contact-store.vala
diff --git a/src/engine/imap-engine/imap-engine-account-operation.vala b/src/engine/imap-engine/imap-engine-account-operation.vala
new file mode 100644
index 0000000..3913021
--- /dev/null
+++ b/src/engine/imap-engine/imap-engine-account-operation.vala
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2017 Michael Gratton <mike vee net>
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later).  See the COPYING file in this distribution.
+ */
+
+/**
+ * A unit of work to be executed by {@link GenericAccount}.
+ *
+ * To queue an operation for execution, pass an instance to {@link
+ * GenericAccount.queue_operation} when the account is opened. It will
+ * be added to the account's queue and executed asynchronously when it
+ * reaches the front.
+ *
+ * Execution of the operation is managed by {@link
+ * AccountProcessor}. Since the processor will not enqueue duplicate
+ * operations, implementations may override the {@link equal_to}
+ * method to ensure that the same operation is not queued twice.
+ */
+public abstract class Geary.ImapEngine.AccountOperation : Geary.BaseObject {
+
+
+    /**
+     * Fired by the processor when the operation has completed.
+     *
+     * This is fired regardless of whether an error was thrown after {@link
+     * execute} is called. It is always fired after either {@link
+     * succeeded} or {@link failed} is fired.
+     *
+     * Implementations should not fire this themselves, the
+     * processor will do it for them.
+     */
+    public signal void completed();
+
+    /**
+     * Fired by the processor if the operation completes successfully.
+     *
+     * This is fired only after {@link execute} was called and did
+     * not raise an error.
+     *
+     * Implementations should not fire this themselves, the
+     * processor will do it for them.
+     */
+    public signal void succeeded();
+
+    /**
+     * Fired by the processor if the operation throws an error.
+     *
+     * This is fired only after {@link execute} was called and
+     * threw an error. The argument is the error that was thrown.
+     *
+     * Implementations should not fire this themselves, the
+     * processor will do it for them.
+     */
+    public signal void failed(Error err);
+
+
+    /**
+     * Called by the processor to execute this operation.
+     */
+    public abstract async void execute(Cancellable cancellable) throws Error;
+
+    /**
+     * Determines if this operation is equal to another.
+     *
+     * By default assumes that the same instance or two different
+     * instances of the exact same type are equal. Implementations
+     * should override it if they wish to prevent different
+     * instances of the same high-level operation from being executed
+     * twice.
+     */
+    public virtual bool equal_to(AccountOperation op) {
+        return (op != null && (this == op || this.get_type() == op.get_type()));
+    }
+
+    /**
+     * Provides a representation of this operation for debugging.
+     *
+     * By default simply returns the name of the class.
+     */
+    public virtual string to_string() {
+        return this.get_type().name();
+    }
+
+}
diff --git a/src/engine/imap-engine/imap-engine-account-processor.vala b/src/engine/imap-engine/imap-engine-account-processor.vala
new file mode 100644
index 0000000..0e6faea
--- /dev/null
+++ b/src/engine/imap-engine/imap-engine-account-processor.vala
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2017 Michael Gratton <mike vee net>
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later).  See the COPYING file in this distribution.
+ */
+
+/**
+ * Queues and asynchronously executes {@link AccountOperation} instances.
+ *
+ * Operations that are equal to any currently executing or currently
+ * in the queue will not be re-queued.
+ *
+ * Errors thrown by an operation are reported via this processor's
+ * {@link operation_error} signal.
+ */
+internal class Geary.ImapEngine.AccountProcessor : Geary.BaseObject {
+
+
+    private static bool op_equal(AccountOperation a, AccountOperation b) {
+        return a.equal_to(b);
+    }
+
+    /** Determines if an operation is currently being executed. */
+    public bool is_executing { get { return this.current_op != null; } }
+
+    /** Returns the number of operations currently waiting in the queue. */
+    public uint waiting { get { return this.queue.size; } }
+
+
+    /** Fired when an error occurs processing an operation. */
+    public signal void operation_error(AccountOperation op, Error error);
+
+
+    private string id;
+
+    private Nonblocking.Queue<AccountOperation> queue =
+        new Nonblocking.Queue<AccountOperation>.fifo(op_equal);
+
+    private AccountOperation? current_op = null;
+
+    private Cancellable cancellable = new Cancellable();
+
+
+    public AccountProcessor(string id) {
+        this.id = id;
+        this.queue.allow_duplicates = false;
+        this.run.begin();
+    }
+
+    public void enqueue(AccountOperation op) {
+        if (this.current_op == null || !op.equal_to(this.current_op)) {
+            this.queue.send(op);
+        }
+    }
+
+    public void stop() {
+        this.cancellable.cancel();
+        this.queue.clear();
+    }
+
+    private async void run() {
+        while (!this.cancellable.is_cancelled()) {
+            AccountOperation? op = null;
+            try {
+                op = yield this.queue.receive(this.cancellable);
+            } catch (Error err) {
+                // we've been cancelled, so bail out
+                return;
+            }
+
+            if (op != null) {
+                debug("%s: Executing operation: %s", id, op.to_string());
+                this.current_op = op;
+                try {
+                    yield op.execute(this.cancellable);
+                    op.succeeded();
+                } catch (Error err) {
+                    op.failed(err);
+                    operation_error(op, err);
+                }
+                op.completed();
+                this.current_op = null;
+            }
+        }
+    }
+
+}
diff --git a/src/engine/imap-engine/imap-engine-generic-account.vala b/src/engine/imap-engine/imap-engine-generic-account.vala
index f56faaf..653c356 100644
--- a/src/engine/imap-engine/imap-engine-generic-account.vala
+++ b/src/engine/imap-engine/imap-engine-generic-account.vala
@@ -28,6 +28,7 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
     private Gee.HashMap<FolderPath, uint> refresh_unseen_timeout_ids
         = new Gee.HashMap<FolderPath, uint>();
     private Gee.HashSet<Geary.Folder> in_refresh_unseen = new Gee.HashSet<Geary.Folder>();
+    private AccountProcessor? processor;
     private AccountSynchronizer sync;
     private Cancellable? enumerate_folder_cancellable = null;
     private TimeoutManager refresh_folder_timer;
@@ -72,6 +73,19 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
         compile_special_search_names();
     }
 
+    /**
+     * Queues an operation for execution by this account.
+     *
+     * The operation will be added to the account's {@link
+     * AccountProcessor} and executed asynchronously when it reaches
+     * the front of the queue.
+     */
+    public void queue_operation(AccountOperation op)
+        throws EngineError {
+        check_open();
+        this.processor.enqueue(op);
+    }
+
     protected override void notify_folders_available_unavailable(Gee.List<Geary.Folder>? available,
         Gee.List<Geary.Folder>? unavailable) {
         base.notify_folders_available_unavailable(available, unavailable);
@@ -141,6 +155,8 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
     }
     
     private async void internal_open_async(Cancellable? cancellable) throws Error {
+        this.processor = new AccountProcessor(this.to_string());
+
         try {
             yield local.open_async(information.data_dir, Engine.instance.resource_dir.get_child("sql"),
                 cancellable);
@@ -195,6 +211,9 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
         if (!open)
             return;
 
+        // Halt internal tasks early so they stop using local and
+        // remote connections.
+        this.processor.stop();
         this.sync.stop();
 
         Cancellable folder_cancellable = this.enumerate_folder_cancellable;
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 7affa70..8c95e6c 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -11,6 +11,7 @@ set(TEST_ENGINE_SRC
   engine/imap/command/imap-create-command-test.vala
   engine/imap/response/imap-namespace-response-test.vala
   engine/imap/transport/imap-deserializer-test.vala
+  engine/imap-engine/account-processor-test.vala
   engine/mime-content-type-test.vala
   engine/rfc822-mailbox-address-test.vala
   engine/rfc822-message-test.vala
diff --git a/test/engine/imap-engine/account-processor-test.vala b/test/engine/imap-engine/account-processor-test.vala
new file mode 100644
index 0000000..7c14219
--- /dev/null
+++ b/test/engine/imap-engine/account-processor-test.vala
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2017 Michael Gratton <mike vee net>
+ *
+ * This software is licensed under the GNU Lesser General Public License
+ * (version 2.1 or later). See the COPYING file in this distribution.
+ */
+
+errordomain AccountProcessorTestError {
+    TEST;
+}
+
+public class Geary.ImapEngine.AccountProcessorTest : Gee.TestCase {
+
+
+    public class TestOperation : AccountOperation {
+
+        public bool throw_error = false;
+        public bool wait_for_cancel = false;
+        public bool execute_called = false;
+
+        private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock();
+
+        public override async void execute(Cancellable cancellable)
+            throws Error {
+            print("Test op/");
+            this.execute_called = true;
+            if (this.wait_for_cancel) {
+                yield this.spinlock.wait_async(cancellable);
+            }
+            if (this.throw_error) {
+                throw new AccountProcessorTestError.TEST("Failed");
+            }
+        }
+
+    }
+
+
+    public class OtherOperation : TestOperation {
+
+    }
+
+
+    private AccountProcessor processor;
+    private uint succeeded;
+    private uint failed;
+    private uint completed;
+
+
+    public AccountProcessorTest() {
+        base("Geary.ImapEngine.AccountProcessorTest");
+        add_test("test_success", test_success);
+        add_test("test_failure", test_failure);
+        add_test("test_duplicate", test_duplicate);
+        add_test("test_stop", test_stop);
+
+        this.processor = new AccountProcessor("processor");
+    }
+
+    public override void set_up() {
+        this.succeeded = 0;
+        this.failed = 0;
+        this.completed = 0;
+    }
+
+    public void test_success() {
+        TestOperation op = setup_operation(new TestOperation());
+
+        this.processor.enqueue(op);
+        assert(this.processor.waiting == 1);
+
+        execute_all();
+
+        assert(op.execute_called);
+        assert(this.succeeded == 1);
+        assert(this.failed == 0);
+        assert(this.completed == 1);
+    }
+
+    public void test_failure() {
+        TestOperation op = setup_operation(new TestOperation());
+        op.throw_error = true;
+
+        AccountOperation? error_op = null;
+        Error? error = null;
+        this.processor.operation_error.connect((proc, op, err) => {
+                error_op = op;
+                error = err;
+            });
+
+        this.processor.enqueue(op);
+        execute_all();
+
+        assert(this.succeeded == 0);
+        assert(this.failed == 1);
+        assert(this.completed == 1);
+        assert(error_op == op);
+        assert(error is AccountProcessorTestError.TEST);
+    }
+
+    public void test_duplicate() {
+        TestOperation op1 = setup_operation(new TestOperation());
+        TestOperation op2 = setup_operation(new TestOperation());
+        TestOperation op3 = setup_operation(new OtherOperation());
+
+        this.processor.enqueue(op1);
+        this.processor.enqueue(op2);
+        assert(this.processor.waiting == 1);
+
+        this.processor.enqueue(op3);
+        assert(this.processor.waiting == 2);
+    }
+
+    public void test_stop() {
+        TestOperation op1 = setup_operation(new TestOperation());
+        op1.wait_for_cancel = true;
+        TestOperation op2 = setup_operation(new OtherOperation());
+
+        this.processor.enqueue(op1);
+        this.processor.enqueue(op2);
+
+        while (!this.processor.is_executing) {
+            this.main_loop.iteration(true);
+        }
+
+        this.processor.stop();
+
+        while (this.main_loop.pending()) {
+            this.main_loop.iteration(true);
+        }
+
+        assert(!this.processor.is_executing);
+        assert(this.processor.waiting == 0);
+        assert(this.succeeded == 0);
+        assert(this.failed == 1);
+        assert(this.completed == 1);
+    }
+
+    private TestOperation setup_operation(TestOperation op) {
+        op.succeeded.connect(() => {
+                this.succeeded++;
+            });
+        op.failed.connect(() => {
+                this.failed++;
+            });
+        op.completed.connect(() => {
+                this.completed++;
+            });
+        return op;
+    }
+
+    private void execute_all() {
+        while (this.processor.is_executing || this.processor.waiting > 0) {
+            this.main_loop.iteration(true);
+        }
+    }
+}
diff --git a/test/test-engine.vala b/test/test-engine.vala
index b30c7a2..31c59db 100644
--- a/test/test-engine.vala
+++ b/test/test-engine.vala
@@ -29,6 +29,7 @@ int main(string[] args) {
     engine.add_suite(new Geary.Imap.DeserializerTest().get_suite());
     engine.add_suite(new Geary.Imap.CreateCommandTest().get_suite());
     engine.add_suite(new Geary.Imap.NamespaceResponseTest().get_suite());
+    engine.add_suite(new Geary.ImapEngine.AccountProcessorTest().get_suite());
     engine.add_suite(new Geary.Inet.Test().get_suite());
     engine.add_suite(new Geary.JS.Test().get_suite());
     engine.add_suite(new Geary.Mime.ContentTypeTest().get_suite());


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]