[libgee/asyncqueue] Add AsyncQueue implementation
- From: Maciej Marcin Piechotka <mpiechotka src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libgee/asyncqueue] Add AsyncQueue implementation
- Date: Mon, 4 Mar 2013 17:08:21 +0000 (UTC)
commit 26ef498b6d72592443ab6c97f560257dc9a49dc6
Author: Maciej Piechotka <uzytkownik2 gmail com>
Date: Mon Jan 23 13:52:02 2012 +0000
Add AsyncQueue implementation
gee/Makefile.am | 1 +
gee/asyncqueue.vala | 282 +++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 283 insertions(+), 0 deletions(-)
---
diff --git a/gee/Makefile.am b/gee/Makefile.am
index 1da2b39..033ed23 100644
--- a/gee/Makefile.am
+++ b/gee/Makefile.am
@@ -18,6 +18,7 @@ libgee_0_8_la_SOURCES = \
abstractsortedset.vala \
arraylist.vala \
arrayqueue.vala \
+ asyncqueue.vala \
bidiriterator.vala \
bidirlist.vala \
bidirlistiterator.vala \
diff --git a/gee/asyncqueue.vala b/gee/asyncqueue.vala
new file mode 100644
index 0000000..91cab07
--- /dev/null
+++ b/gee/asyncqueue.vala
@@ -0,0 +1,282 @@
+/* asyncqueue.vala
+ *
+ * Copyright (C) 2011 Evan Nemerson
+ * Copyright (C) 2012 Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
+ * Author:
+ * Evan Nemerson <evan coeus-group com>
+ * Maciej Piechotka <uzytkownik2 gmail com>
+ */
+
+/**
+ * Blocking, mutex-protected wrapper around another {@link Queue}.
+ *
+ * Every operation locks ``mutex`` before it touches the wrapped ``queue``.
+ * Producers ({@link offer}, {@link add_all}) wait on ``non_full`` while the
+ * wrapped queue has no room; consumers ({@link peek}, {@link poll},
+ * {@link drain}) wait on ``non_empty`` while there is nothing to hand out.
+ *
+ * All waits re-check their predicate in a loop, so spurious wakeups and
+ * wakeups "stolen" by another thread are harmless.
+ *
+ * NOTE(review): nothing in this class assigns ``queue``; presumably a
+ * subclass (the field is protected) or a constructor that is still to be
+ * added provides it -- confirm before use.
+ */
+public class Gee.AsyncQueue<G> : AbstractQueue<G> {
+
+    /**
+     * Calls //f// for the elements of the wrapped queue.
+     *
+     * //f// runs while the mutex is held, so it must not call back into
+     * this queue.
+     *
+     * @param f function applied by the wrapped queue's ``foreach``
+     */
+    public override void foreach (Gee.ForallFunc<G> f) {
+        mutex.lock ();
+        queue.foreach (f);
+        mutex.unlock ();
+    }
+
+    /**
+     * Returns an iterator whose individual operations are each performed
+     * under this queue's mutex.
+     */
+    public override Gee.Iterator<G> iterator () {
+        return new Iterator<G> (this);
+    }
+
+    /**
+     * Number of elements in the wrapped queue at the time of the call.
+     */
+    public override int size {
+        get {
+            mutex.lock ();
+            int current_size = queue.size;
+            mutex.unlock ();
+            return current_size;
+        }
+    }
+
+    /**
+     * Whether the wrapped queue was empty at the time of the call.
+     */
+    public override bool is_empty {
+        get {
+            mutex.lock ();
+            bool empty = queue.is_empty;
+            mutex.unlock ();
+            return empty;
+        }
+    }
+
+    /**
+     * Forwards ``read_only`` of the wrapped queue (read without locking).
+     */
+    public override bool read_only {
+        get {
+            return queue.read_only;
+        }
+    }
+
+    /**
+     * Checks, under the mutex, whether the wrapped queue contains //item//.
+     */
+    public override bool contains (G item) {
+        mutex.lock ();
+        bool found = queue.contains (item);
+        mutex.unlock ();
+        return found;
+    }
+
+    /**
+     * Removes //item// from the wrapped queue.
+     *
+     * @param item element to remove
+     * @return ``true`` if the wrapped queue reported a removal
+     */
+    public override bool remove (G item) {
+        mutex.lock ();
+        // The original called queue.add () here, which inserted the item
+        // instead of removing it.
+        bool removed = queue.remove (item);
+        if (removed) {
+            // One slot became free: wake one waiting producer.
+            non_full.signal ();
+        }
+        mutex.unlock ();
+        return removed;
+    }
+
+    /**
+     * Removes every element from the wrapped queue.
+     */
+    public override void clear () {
+        mutex.lock ();
+        queue.clear ();
+        // Potentially many slots became free, so wake every waiting producer.
+        non_full.broadcast ();
+        mutex.unlock ();
+    }
+
+    /**
+     * Returns the wrapped queue's ``to_array ()`` result, taken under the mutex.
+     */
+    public override G[] to_array () {
+        mutex.lock ();
+        G[] snapshot = queue.to_array ();
+        mutex.unlock ();
+        return (owned) snapshot;
+    }
+
+    /**
+     * Adds every element of //collection//, waiting for room whenever the
+     * wrapped queue reports a remaining capacity of 0.
+     *
+     * @param collection elements to add; iterated while the mutex is held
+     * @return ``true`` if at least one ``add`` on the wrapped queue succeeded
+     */
+    public override bool add_all (Collection<G> collection) {
+        bool changed = false;
+        if (!collection.is_empty) {
+            mutex.lock ();
+            foreach (G item in collection) {
+                while (queue.remaining_capacity == 0) {
+                    if (changed) {
+                        // Consumers must learn about what was added so far,
+                        // otherwise nobody would ever make room for the rest.
+                        non_empty.broadcast ();
+                    }
+                    non_full.wait (mutex);
+                }
+                changed |= queue.add (item);
+            }
+            if (changed) {
+                // Several elements may have arrived: wake all consumers.
+                non_empty.broadcast ();
+            }
+            mutex.unlock ();
+        }
+        return changed;
+    }
+
+    /**
+     * Checks, under the mutex, whether the wrapped queue contains all
+     * elements of //collection//.
+     */
+    public override bool contains_all (Collection<G> collection) {
+        mutex.lock ();
+        bool all_present = queue.contains_all (collection);
+        mutex.unlock ();
+        return all_present;
+    }
+
+    /**
+     * Removes every element that is NOT contained in //collection//.
+     *
+     * @param collection elements to keep
+     * @return ``true`` if at least one element was removed
+     */
+    public override bool retain_all (Collection<G> collection) {
+        bool changed = false;
+        mutex.lock ();
+        for (Gee.Iterator<G> iter = queue.iterator (); iter.next ();) {
+            G item = iter.get ();
+            // The original test was inverted and dropped exactly the
+            // elements that were supposed to be retained.
+            if (!collection.contains (item)) {
+                iter.remove ();
+                changed = true;
+            }
+        }
+        if (changed) {
+            non_full.broadcast ();
+        }
+        mutex.unlock ();
+        return changed;
+    }
+
+    /**
+     * Forwards ``capacity`` of the wrapped queue (read without locking).
+     */
+    public override int capacity {
+        get {
+            return queue.capacity;
+        }
+    }
+
+    /**
+     * Remaining capacity of the wrapped queue, or
+     * ``Queue.UNBOUNDED_CAPACITY`` when the wrapped queue is unbounded
+     * (in which case the mutex is not taken).
+     */
+    public override int remaining_capacity {
+        get {
+            if (queue.capacity == Queue.UNBOUNDED_CAPACITY) {
+                return Queue.UNBOUNDED_CAPACITY;
+            }
+            mutex.lock ();
+            int remaining = queue.remaining_capacity;
+            mutex.unlock ();
+            return remaining;
+        }
+    }
+
+    /**
+     * Whether the wrapped queue is full; always ``false`` when the wrapped
+     * queue is unbounded (in which case the mutex is not taken).
+     */
+    public override bool is_full {
+        get {
+            if (queue.capacity == Queue.UNBOUNDED_CAPACITY) {
+                return false;
+            }
+            mutex.lock ();
+            bool full = queue.is_full;
+            mutex.unlock ();
+            return full;
+        }
+    }
+
+    /**
+     * Offers //element// to the wrapped queue, waiting while it is full.
+     *
+     * @param element element to insert
+     * @return result of ``offer`` on the wrapped queue
+     */
+    public override bool offer (G element) {
+        mutex.lock ();
+        while (queue.is_full) {
+            non_full.wait (mutex);
+        }
+        // Must delegate to the wrapped queue: calling this.offer () here
+        // would re-enter with the mutex already held.
+        bool changed = queue.offer (element);
+        if (changed) {
+            // Broadcast rather than signal: a woken peek () does not consume
+            // the element, so a waiting poll () must get a chance as well.
+            non_empty.broadcast ();
+        }
+        mutex.unlock ();
+        return changed;
+    }
+
+    /**
+     * Returns the head of the wrapped queue without removing it, waiting
+     * while the queue is empty.
+     */
+    public override G? peek () {
+        mutex.lock ();
+        while (queue.is_empty) {
+            non_empty.wait (mutex);
+        }
+        G? element = queue.peek ();
+        mutex.unlock ();
+        return element;
+    }
+
+    /**
+     * Removes and returns the head of the wrapped queue, waiting while the
+     * queue is empty.
+     */
+    public override G? poll () {
+        mutex.lock ();
+        while (queue.is_empty) {
+            non_empty.wait (mutex);
+        }
+        G? element = queue.poll ();
+        // A slot was freed; without this, producers blocked in offer () or
+        // add_all () would never be woken by a consumer.
+        non_full.signal ();
+        mutex.unlock ();
+        return element;
+    }
+
+    /**
+     * Moves elements from the wrapped queue into //recipient//.
+     *
+     * With a positive //amount// the call waits until exactly that many
+     * elements were moved. With a negative //amount// it performs a single
+     * non-waiting ``drain`` on the wrapped queue.
+     *
+     * @param recipient collection receiving the elements
+     * @param amount number of elements to move, -1 by default
+     * @return number of elements moved
+     */
+    public override int drain (Collection<G> recipient, int amount = -1) {
+        if (amount == 0) {
+            return 0;
+        }
+        mutex.lock ();
+        int drained = queue.drain (recipient, amount);
+        if (drained > 0) {
+            non_full.broadcast ();
+        }
+        // For negative amounts this loop never runs (drained >= 0 > amount).
+        while (drained < amount) {
+            non_empty.wait (mutex);
+            int moved = queue.drain (recipient, amount - drained);
+            if (moved > 0) {
+                drained += moved;
+                // Let blocked producers refill the queue while we keep waiting.
+                non_full.broadcast ();
+            }
+        }
+        mutex.unlock ();
+        return drained;
+    }
+
+    // Wrapped queue that actually stores the elements.
+    protected Queue<G> queue;
+    // Guards every access to `queue`; GLib mutexes are not recursive.
+    protected Mutex mutex = new Mutex ();
+    // Signalled when room may have become available in `queue`.
+    protected Cond non_full = new Cond ();
+    // Signalled when elements may have become available in `queue`.
+    protected Cond non_empty = new Cond ();
+
+    /**
+     * Iterator over the wrapped queue that takes the owning queue's mutex
+     * around each individual operation.
+     *
+     * Only single calls are protected; the mutex is released between calls.
+     */
+    protected class Iterator<G> : Object, Traversable<G>, Gee.Iterator<G> {
+        /**
+         * Creates an iterator over the queue wrapped by //queue//.
+         */
+        public Iterator (AsyncQueue<G> queue) {
+            this.queue = queue;
+            queue.mutex.lock ();
+            // Iterate the wrapped queue. Asking the AsyncQueue itself for an
+            // iterator would construct another Iterator and recurse forever.
+            iterator = queue.queue.iterator ();
+            queue.mutex.unlock ();
+        }
+
+        /**
+         * Runs the wrapped iterator's ``foreach`` with the mutex held.
+         */
+        public void foreach (Gee.ForallFunc<G> f) {
+            queue.mutex.lock ();
+            iterator.foreach (f);
+            queue.mutex.unlock ();
+        }
+
+        public Gee.Iterator<A> stream<A> (owned StreamFunc<G, A> f) {
+            return Gee.Iterator.stream_impl<G, A> (this, (owned) f);
+        }
+
+        // filter () and chop () keep the element type, so they return
+        // Iterator<G>; the former extra type parameter <A> was never bound.
+        public Gee.Iterator<G> filter (owned Predicate<G> pred) {
+            return Gee.Traversable.filter_impl<G> (this, (owned) pred);
+        }
+
+        public Gee.Iterator<G> chop (int offset, int length) {
+            return Gee.Traversable.chop_impl<G> (this, offset, length);
+        }
+
+        public bool next () {
+            queue.mutex.lock ();
+            bool advanced = iterator.next ();
+            queue.mutex.unlock ();
+            return advanced;
+        }
+
+        public bool has_next () {
+            queue.mutex.lock ();
+            bool more = iterator.has_next ();
+            queue.mutex.unlock ();
+            return more;
+        }
+
+        public new G get () {
+            queue.mutex.lock ();
+            G value = iterator.get ();
+            queue.mutex.unlock ();
+            return (owned) value;
+        }
+
+        /**
+         * Removes the current element through the wrapped iterator.
+         */
+        public void remove () {
+            queue.mutex.lock ();
+            iterator.remove ();
+            // A slot was freed: wake one waiting producer.
+            non_full_signal ();
+            // Unlock exactly once; the original unlocked twice.
+            queue.mutex.unlock ();
+        }
+
+        // Wakes one producer waiting on the owning queue.
+        private void non_full_signal () {
+            queue.non_full.signal ();
+        }
+
+        public bool valid { get { return this.iterator.valid; } }
+
+        public bool read_only { get { return this.iterator.read_only; } }
+
+        // Owning queue; provides the mutex and condition variables.
+        protected AsyncQueue<G> queue;
+        // Iterator of the wrapped queue that does the actual work.
+        protected Gee.Iterator<G> iterator;
+    }
+}
+
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]