[geary] Tidy up and document lock classes in Geary.Nonblocking.
- From: Michael Gratton <mjog src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [geary] Tidy up and document lock classes in Geary.Nonblocking.
- Date: Thu, 25 Jan 2018 23:27:27 +0000 (UTC)
commit c0501bda19ee78139e0767539cba5d97fb061324
Author: Michael James Gratton <mike vee net>
Date: Wed Jan 17 14:47:57 2018 +1100
Tidy up and document lock classes in Geary.Nonblocking.
* src/engine/nonblocking/nonblocking-abstract-semaphore.vala: Renamed to
nonblocking-lock.vala, rename class from AbstractSemaphore to Lock and
update subclasses, since it is used as a basis for a number of
different lock implementations. Make two getter methods into read-only
properties. Fill out doc comments to be much more comprehensive.
* src/engine/nonblocking/nonblocking-mutex.vala (Mutex): Provide a
high-level API and update documentation comments.
* src/engine/nonblocking/nonblocking-queue.vala (Queue): Add to doc
comments.
* src/engine/nonblocking/nonblocking-variants.vala: Actually document
how each of the variants behave.
src/CMakeLists.txt | 2 +-
.../imap-engine/imap-engine-replay-operation.vala | 4 +-
src/engine/meson.build | 2 +-
src/engine/nonblocking/nonblocking-batch.vala | 2 +-
.../nonblocking-counting-semaphore.vala | 17 ++-
...stract-semaphore.vala => nonblocking-lock.vala} | 171 +++++++++++++-------
src/engine/nonblocking/nonblocking-mutex.vala | 90 ++++++++---
src/engine/nonblocking/nonblocking-queue.vala | 5 +-
src/engine/nonblocking/nonblocking-variants.vala | 47 ++++--
9 files changed, 235 insertions(+), 105 deletions(-)
---
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index b5e12c9..ab2d27c 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -255,11 +255,11 @@ engine/mime/mime-disposition-type.vala
engine/mime/mime-error.vala
engine/mime/mime-multipart-subtype.vala
-engine/nonblocking/nonblocking-abstract-semaphore.vala
engine/nonblocking/nonblocking-batch.vala
engine/nonblocking/nonblocking-concurrent.vala
engine/nonblocking/nonblocking-counting-semaphore.vala
engine/nonblocking/nonblocking-error.vala
+engine/nonblocking/nonblocking-lock.vala
engine/nonblocking/nonblocking-mutex.vala
engine/nonblocking/nonblocking-queue.vala
engine/nonblocking/nonblocking-reporting-semaphore.vala
diff --git a/src/engine/imap-engine/imap-engine-replay-operation.vala
b/src/engine/imap-engine/imap-engine-replay-operation.vala
index 36ecb99..9cecd2b 100644
--- a/src/engine/imap-engine/imap-engine-replay-operation.vala
+++ b/src/engine/imap-engine/imap-engine-replay-operation.vala
@@ -51,7 +51,7 @@ private abstract class Geary.ImapEngine.ReplayOperation : Geary.BaseObject, Gee.
public OnError on_remote_error { get; protected set; }
public int remote_retry_count { get; set; default = 0; }
public Error? err { get; private set; default = null; }
- public bool notified { get { return semaphore.is_passed(); } }
+ public bool notified { get { return semaphore.can_pass; } }
private Nonblocking.Semaphore semaphore = new Nonblocking.Semaphore();
@@ -152,7 +152,7 @@ private abstract class Geary.ImapEngine.ReplayOperation : Geary.BaseObject, Gee.
// Can only be called once
internal void notify_ready(Error? err) {
- assert(!semaphore.is_passed());
+ assert(!semaphore.can_pass);
this.err = err;
diff --git a/src/engine/meson.build b/src/engine/meson.build
index c8307cf..5ce7094 100644
--- a/src/engine/meson.build
+++ b/src/engine/meson.build
@@ -252,11 +252,11 @@ geary_engine_vala_sources = files(
'mime/mime-error.vala',
'mime/mime-multipart-subtype.vala',
- 'nonblocking/nonblocking-abstract-semaphore.vala',
'nonblocking/nonblocking-batch.vala',
'nonblocking/nonblocking-concurrent.vala',
'nonblocking/nonblocking-counting-semaphore.vala',
'nonblocking/nonblocking-error.vala',
+ 'nonblocking/nonblocking-lock.vala',
'nonblocking/nonblocking-mutex.vala',
'nonblocking/nonblocking-queue.vala',
'nonblocking/nonblocking-reporting-semaphore.vala',
diff --git a/src/engine/nonblocking/nonblocking-batch.vala b/src/engine/nonblocking/nonblocking-batch.vala
index 2eb4453..39770c7 100644
--- a/src/engine/nonblocking/nonblocking-batch.vala
+++ b/src/engine/nonblocking/nonblocking-batch.vala
@@ -170,7 +170,7 @@ public class Geary.Nonblocking.Batch : BaseObject {
*
* If the batch is executing or already executed, IOError.PENDING will be thrown. If the
* Cancellable is already cancelled, IOError.CANCELLED is thrown. Other errors may be thrown
- * as well; see {@link AbstractSemaphore.wait_async}.
+ * as well; see {@link Lock.wait_async}.
*
* Batch will launch each BatchOperation in the order added. Depending on the BatchOperation,
* this does not guarantee that they'll complete in any particular order.
diff --git a/src/engine/nonblocking/nonblocking-counting-semaphore.vala
b/src/engine/nonblocking/nonblocking-counting-semaphore.vala
index 5380384..0f64ce3 100644
--- a/src/engine/nonblocking/nonblocking-counting-semaphore.vala
+++ b/src/engine/nonblocking/nonblocking-counting-semaphore.vala
@@ -1,17 +1,22 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
- * A nonblocking semaphore which allows for any number of tasks to run, but only signalling
- * completion when all have finished.
+ * A counting, asynchronous semaphore.
*
- * Unlike the other {@link AbstractSemaphore} variants, a task must {@link acquire} before it
- * can {@link notify}. The number of acquired tasks is kept in the {@link count} property.
+ * Unlike the other {@link Lock} variants, a task must {@link acquire}
+ * before it can {@link notify}. The number of acquired tasks is kept
+ * in the {@link count} property. Waiting tasks are released only when
+ * the count returns to zero.
+ *
+ * This class is ''not'' thread safe and should only be used by
+ * asynchronous tasks.
*/
-public class Geary.Nonblocking.CountingSemaphore : Geary.Nonblocking.AbstractSemaphore {
+public class Geary.Nonblocking.CountingSemaphore : Geary.Nonblocking.Lock {
/**
* The number of tasks which have {@link acquire} the semaphore.
*/
diff --git a/src/engine/nonblocking/nonblocking-abstract-semaphore.vala
b/src/engine/nonblocking/nonblocking-lock.vala
similarity index 54%
rename from src/engine/nonblocking/nonblocking-abstract-semaphore.vala
rename to src/engine/nonblocking/nonblocking-lock.vala
index e595d0f..5ded219 100644
--- a/src/engine/nonblocking/nonblocking-abstract-semaphore.vala
+++ b/src/engine/nonblocking/nonblocking-lock.vala
@@ -1,170 +1,227 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
+ * Copyright 2018 Michael Gratton <mike vee net>.
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
-public abstract class Geary.Nonblocking.AbstractSemaphore : BaseObject {
+/**
+ * A generic asynchronous lock data type.
+ *
+ * This class provides an asynchronous, queue-based lock
+ * implementation to allow implementing safe access to resources that
+ * are shared by asynchronous tasks. An asynchronous task may call
+ * {@link wait_async} to wait for the lock to be marked as safe to
+ * pass. Another asynchronous task may call {@link notify} to mark the
+ * lock as being safe, notifying waiting tasks. Once marked as being
+ * safe to pass, a lock may be reset to being unsafe by calling {@link
+ * reset}.
+ *
+ * See the specialised sub-classes for concrete implementations,
+ * which vary based on two features:
+ *
+ * //Broadcasting//: Whether all waiting tasks are notified when the
+ * lock may be passed, or just the next earliest waiting task.
+ *
+ * //Autoreset//: Whether the lock is automatically reset after
+ * notifying all waiting tasks, or if it must be manually reset by
+ * calling {@link reset}.
+ *
+ * This class is ''not'' thread safe and should only be used by
+ * asynchronous tasks.
+ */
+public abstract class Geary.Nonblocking.Lock : BaseObject {
+
private class Pending : BaseObject {
public unowned SourceFunc cb;
public Cancellable? cancellable;
public bool passed = false;
public bool scheduled = false;
-
+
public signal void cancelled();
-
+
public Pending(SourceFunc cb, Cancellable? cancellable) {
this.cb = cb;
this.cancellable = cancellable;
-
+
if (cancellable != null)
cancellable.cancelled.connect(on_cancelled);
}
-
+
~Pending() {
if (cancellable != null)
cancellable.cancelled.disconnect(on_cancelled);
}
-
+
private void on_cancelled() {
cancelled();
}
-
+
public void schedule(bool passed) {
assert(!scheduled);
-
+
this.passed = passed;
-
+
Scheduler.on_idle(cb);
scheduled = true;
}
}
-
+
+ /** Determines if this lock is marked as safe to pass. */
+ public bool can_pass { get { return this.passed; } }
+
+ /** Determines if this lock has been cancelled. */
+ public bool is_cancelled {
+ get {
+ return this.cancellable != null && this.cancellable.is_cancelled();
+ }
+ }
+
private bool broadcast;
private bool autoreset;
private Cancellable? cancellable;
private bool passed = false;
private Gee.List<Pending> pending_queue = new Gee.LinkedList<Pending>();
-
- protected AbstractSemaphore(bool broadcast, bool autoreset, Cancellable? cancellable = null) {
+
+ protected Lock(bool broadcast, bool autoreset, Cancellable? cancellable = null) {
this.broadcast = broadcast;
this.autoreset = autoreset;
this.cancellable = cancellable;
-
+
if (cancellable != null)
cancellable.cancelled.connect(on_cancelled);
}
-
- ~AbstractSemaphore() {
+
+ ~Lock() {
if (pending_queue.size > 0) {
- warning("Nonblocking semaphore destroyed with %d pending callers", pending_queue.size);
-
+ warning("Nonblocking lock destroyed with %d pending callers", pending_queue.size);
+
foreach (Pending pending in pending_queue)
pending.cancelled.disconnect(on_pending_cancelled);
}
-
+
if (cancellable != null)
cancellable.cancelled.disconnect(on_cancelled);
}
-
+
private void trigger(bool all) {
if (pending_queue.size == 0)
return;
-
- // in both cases, mark the Pending object(s) as passed in case this is an auto-reset
- // semaphore
+
+ // in both cases, mark the Pending object(s) as passed in case
+ // this is an auto-reset lock
if (all) {
foreach (Pending pending in pending_queue)
pending.schedule(passed);
-
+
pending_queue.clear();
} else {
Pending pending = pending_queue.remove_at(0);
pending.schedule(passed);
}
}
-
+
+ /**
+ * Marks the lock as being safe to pass.
+ *
+ * Asynchronous tasks waiting on this lock via a call to {@link
+ * wait_async} are resumed when this method is called. If this
+ * lock is broadcasting then all pending tasks are released,
+ * otherwise only the first in the queue is released.
+ *
+ * @throws GLib.IOError.CANCELLED if either the lock is cancelled
+ * or the caller's `cancellable` argument is cancelled.
+ */
public virtual new void notify() throws Error {
check_cancelled();
-
+
passed = true;
-
+
trigger(broadcast);
-
+
if (autoreset)
reset();
}
-
+
/**
- * Calls notify() without throwing an Exception, which is merely logged if encountered.
+ * Calls {@link notify} without throwing an exception.
+ *
+ * If an error is thrown, it is logged but otherwise ignored.
*/
public void blind_notify() {
try {
notify();
} catch (Error err) {
- message("Error notifying semaphore: %s", err.message);
+ message("Error notifying lock: %s", err.message);
}
}
-
+
+ /**
+ * Waits for the lock to be marked as being safe to pass.
+ *
+ * If the lock is already marked as being safe to pass, then this
+ * method will return immediately. If not, the call to this method
+ * will yield and not resume until the lock as been marked as safe
+ * by a call to {@link notify}.
+ *
+ * @throws GLib.IOError.CANCELLED if either the lock is cancelled or
+ * the caller's `cancellable` argument is cancelled.
+ */
public virtual async void wait_async(Cancellable? cancellable = null) throws Error {
for (;;) {
check_user_cancelled(cancellable);
check_cancelled();
-
+
if (passed)
return;
-
+
Pending pending = new Pending(wait_async.callback, cancellable);
pending.cancelled.connect(on_pending_cancelled);
-
+
pending_queue.add(pending);
yield;
-
+
pending.cancelled.disconnect(on_pending_cancelled);
-
+
if (pending.passed) {
check_user_cancelled(cancellable);
-
+
return;
}
}
}
-
+
+ /**
+ * Marks this lock as being unsafe to pass.
+ */
public virtual void reset() {
passed = false;
}
-
- public bool is_passed() {
- return passed;
- }
-
- public bool is_cancelled() {
- return (cancellable != null) ? cancellable.is_cancelled() : false;
- }
-
+
private void check_cancelled() throws Error {
- if (is_cancelled())
- throw new IOError.CANCELLED("Semaphore cancelled");
+ if (this.is_cancelled)
+ throw new IOError.CANCELLED("Lock was cancelled");
}
-
+
private static void check_user_cancelled(Cancellable? cancellable) throws Error {
if (cancellable != null && cancellable.is_cancelled())
- throw new IOError.CANCELLED("User cancelled operation");
+ throw new IOError.CANCELLED("User cancelled lock operation");
}
-
+
private void on_pending_cancelled(Pending pending) {
// if already scheduled, the cancellation will be dealt with when they wake up
if (pending.scheduled)
return;
-
+
bool removed = pending_queue.remove(pending);
assert(removed);
-
+
Scheduler.on_idle(pending.cb);
}
-
+
private void on_cancelled() {
trigger(true);
}
-}
+}
diff --git a/src/engine/nonblocking/nonblocking-mutex.vala b/src/engine/nonblocking/nonblocking-mutex.vala
index 1af8626..0030138 100644
--- a/src/engine/nonblocking/nonblocking-mutex.vala
+++ b/src/engine/nonblocking/nonblocking-mutex.vala
@@ -1,41 +1,79 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
+ * Copyright 2018 Michael Gratton <mike vee net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
- * A task primitive for creating critical sections inside of asynchronous code.
+ * A primitive for creating critical sections inside of asynchronous tasks.
*
- * Like other primitives in {@link Nonblocking}, Mutex is ''not'' designed for a threaded
- * environment.
+ * Two methods can be used for executing code protected by this
+ * mutex. The easiest is to create a {@link CriticalSection} delegate
+ * and pass it to {@link execute_locked}. This will manage acquiring
+ * the lock as needed. The lower-level method is to call {@link
+ * claim_async}, execute the critical section, then ensure {@link
+ * release} is always called afterwards.
+ *
+ * This class is ''not'' thread safe and should only be used by
+ * asynchronous tasks.
*/
-
public class Geary.Nonblocking.Mutex : BaseObject {
+
public const int INVALID_TOKEN = -1;
-
+
+ /** A delegate that can be executed by this lock. */
+ public delegate void CriticalSection() throws GLib.Error;
+
private Spinlock spinlock = new Spinlock();
private bool locked = false;
private int next_token = INVALID_TOKEN + 1;
private int locked_token = INVALID_TOKEN;
-
- public Mutex() {
- }
-
+
+
/**
* Returns true if the {@link Mutex} has been claimed by a task.
*/
public bool is_locked() {
return locked;
}
-
+
+ /**
+ * Executes a critical section while protected by this mutex.
+ *
+ * This high-level method takes care of claiming, executing, then
+ * releasing the mutex, without requiring the caller to manage any
+ * this.
+ *
+ * @throws GLib.IOError.CANCELLED thrown if the caller's
+ * cancellable is cancelled before execution is completed
+ * @throws GLib.Error if an error occurred during execution of
+ * //target//.
+ */
+ public async void execute_locked(Mutex.CriticalSection target,
+ Cancellable? cancellable = null)
+ throws Error {
+ int token = yield claim_async(cancellable);
+ try {
+ target();
+ } finally {
+ try {
+ release(ref token);
+ } catch (Error err) {
+ debug("Mutex error releasing token: %s", err.message);
+ }
+ }
+ }
+
/**
- * Claim (i.e. lock) the {@link Mutex} and begin execution inside a critical section.
+ * Locks the mutex for execution inside a critical section.
*
- * claim_async will block asynchronously waiting for the Mutex to be released, if it's already
- * claimed.
+ * If already claimed, this call will block asynchronously waiting
+ * for the mutex to be released.
*
- * @return A token which must be used to {@link release} the Mutex.
+ * @return A token which must be passed to {@link release} when
+ * the critical section has completed executing.
*/
public async int claim_async(Cancellable? cancellable = null) throws Error {
for (;;) {
@@ -44,31 +82,33 @@ public class Geary.Nonblocking.Mutex : BaseObject {
do {
locked_token = next_token++;
} while (locked_token == INVALID_TOKEN);
-
+
return locked_token;
}
-
+
yield spinlock.wait_async(cancellable);
}
}
-
+
/**
- * Release (i.e. unlock) the {@link Mutex} and end execution inside a critical section.
+ * Releases the lock at the end of executing a critical section.
*
- * The token returned by {@link claim_async} must be supplied as a parameter. It will be
- * modified by this call so it can't be reused.
+ * The token returned by {@link claim_async} must be supplied as a
+ * parameter. It will be modified by this call so it can't be
+ * reused.
*
- * Throws IOError.INVALID_ARGUMENT if the token was not the one returned by claim_async.
+ * Throws IOError.INVALID_ARGUMENT if the token was not the one
+ * returned by claim_async.
*/
public void release(ref int token) throws Error {
if (token != locked_token || token == INVALID_TOKEN)
throw new IOError.INVALID_ARGUMENT("Token %d is not the lock token", token);
-
+
locked = false;
token = INVALID_TOKEN;
locked_token = INVALID_TOKEN;
-
+
spinlock.notify();
}
-}
+}
diff --git a/src/engine/nonblocking/nonblocking-queue.vala b/src/engine/nonblocking/nonblocking-queue.vala
index bda0450..fc166c2 100644
--- a/src/engine/nonblocking/nonblocking-queue.vala
+++ b/src/engine/nonblocking/nonblocking-queue.vala
@@ -11,7 +11,10 @@
*
* This class can be used to asynchronously wait for items to be added
* to the queue, the asynchronous call blocking until an item is
- * ready.
+ * ready. Multiple asynchronous tasks can queue objects via {@link
+ * send}, and tasks can wait for items via {@link receive}. If there
+ * are multiple tasks waiting for items, the first to wait will
+ * receive the next item.
*/
public class Geary.Nonblocking.Queue<G> : BaseObject {
diff --git a/src/engine/nonblocking/nonblocking-variants.vala
b/src/engine/nonblocking/nonblocking-variants.vala
index b774a6f..59e70d0 100644
--- a/src/engine/nonblocking/nonblocking-variants.vala
+++ b/src/engine/nonblocking/nonblocking-variants.vala
@@ -1,36 +1,61 @@
-/* Copyright 2016 Software Freedom Conservancy Inc.
+/*
+ * Copyright 2016 Software Freedom Conservancy Inc.
+ * Copyright 2018 Michael Gratton <mike vee net>.
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
- * A Semaphore is a broadcasting, manually-resetting {@link AbstractSemaphore}.
+ * A broadcasting, manually-resetting asynchronous lock.
+ *
+ * This lock type will notify all waiting asynchronous tasks when
+ * marked as being safe to pass, and requires a call to {@link
+ * Lock.reset} to be marked as unsafe again.
+ *
+ * This class is ''not'' thread safe and should only be used by
+ * asynchronous tasks.
+ *
+ * @see Lock
*/
-
-public class Geary.Nonblocking.Semaphore : Geary.Nonblocking.AbstractSemaphore {
+public class Geary.Nonblocking.Semaphore : Geary.Nonblocking.Lock {
public Semaphore(Cancellable? cancellable = null) {
base (true, false, cancellable);
}
}
/**
- * An Event is a broadcasting, auto-resetting {@link AbstractSemaphore}.
+ * A broadcasting, automatically-resetting asynchronous lock.
+ *
+ * This lock type will notify all waiting asynchronous tasks when
+ * marked as being safe to pass, and will automatically reset as being
+ * unsafe to pass after doing so.
+ *
+ * This class is ''not'' thread safe and should only be used by
+ * asynchronous tasks.
+ *
+ * @see Lock
*/
-
-public class Geary.Nonblocking.Event : Geary.Nonblocking.AbstractSemaphore {
+public class Geary.Nonblocking.Event : Geary.Nonblocking.Lock {
public Event(Cancellable? cancellable = null) {
base (true, true, cancellable);
}
}
/**
- * A Spinlock is a single-notifying, auto-resetting {@link AbstractSemaphore}.
+ * A single-task-notifying, automatically-resetting asynchronous lock.
+ *
+ * This lock type will the first asynchronous task waiting when marked
+ * as being safe to pass, and will automatically reset as being unsafe
+ * to pass after doing so.
+ *
+ * This class is ''not'' thread safe and should only be used by
+ * asynchronous tasks.
+ *
+ * @see Lock
*/
-
-public class Geary.Nonblocking.Spinlock : Geary.Nonblocking.AbstractSemaphore {
+public class Geary.Nonblocking.Spinlock : Geary.Nonblocking.Lock {
public Spinlock(Cancellable? cancellable = null) {
base (false, true, cancellable);
}
}
-
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]