[libgee/asyncqueue] Add AsyncQueue implementation



commit 26ef498b6d72592443ab6c97f560257dc9a49dc6
Author: Maciej Piechotka <uzytkownik2 gmail com>
Date:   Mon Jan 23 13:52:02 2012 +0000

    Add AsyncQueue implementation

 gee/Makefile.am     |    1 +
 gee/asyncqueue.vala |  282 +++++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 283 insertions(+), 0 deletions(-)
---
diff --git a/gee/Makefile.am b/gee/Makefile.am
index 1da2b39..033ed23 100644
--- a/gee/Makefile.am
+++ b/gee/Makefile.am
@@ -18,6 +18,7 @@ libgee_0_8_la_SOURCES = \
        abstractsortedset.vala \
        arraylist.vala \
        arrayqueue.vala \
+       asyncqueue.vala \
        bidiriterator.vala \
        bidirlist.vala \
        bidirlistiterator.vala \
diff --git a/gee/asyncqueue.vala b/gee/asyncqueue.vala
new file mode 100644
index 0000000..91cab07
--- /dev/null
+++ b/gee/asyncqueue.vala
@@ -0,0 +1,282 @@
+/* arrayqueue.vala
+ *
+ * Copyright (C) 2011  Evan Nemerson
+ * Copyright (C) 2012  Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
+ *
+ * Author:
+ *     Evan Nemerson <evan coeus-group com>
+ *     Maciej Piechotka <uzytkownik2 gmail com>
+ */
+
+public class Gee.AsyncQueue<G> : AbstractQueue<G> {
+
+       public override void foreach (Gee.ForallFunc<G> f) {
+               mutex.lock ();
+               queue.foreach (f);
+               mutex.unlock ();
+       }
+
+       public override Gee.Iterator<G> iterator () {
+               return new Iterator<G> (this);
+       }
+
+       public override int size {
+               get {
+                       mutex.lock ();
+                       int size = queue.size;
+                       mutex.unlock ();
+                       return size;
+               }
+       }
+
+       public override bool is_empty {
+               get {
+                       mutex.lock ();
+                       bool is_empty = queue.is_empty;
+                       mutex.unlock ();
+                       return is_empty;
+               }
+       }
+
+       public override bool read_only {
+               get {
+                       return queue.read_only;
+               }
+       }
+
+       public override bool contains (G item) {
+               mutex.lock ();
+               bool contains = queue.contains (item);
+               mutex.unlock ();
+               return contains;
+       }
+
+       public override bool remove (G item) {
+               mutex.lock ();
+               bool removed = queue.add (item);
+               if (removed) {
+                       non_full.signal ();
+               }
+               mutex.unlock ();
+               return removed;
+       }
+
+       public override void clear () {
+               mutex.lock ();
+               queue.clear ();
+               non_full.signal ();
+               mutex.unlock ();
+       }
+
+       public override G[] to_array () {
+               mutex.lock ();
+               G[] array = queue.to_array ();
+               mutex.unlock ();
+               return (owned)array;
+       }
+
+       public override bool add_all (Collection<G> collection) {
+               bool changed = false;
+               if (!collection.is_empty) {
+                       mutex.lock ();
+                       foreach (G item in collection) {
+                               if (queue.remaining_capacity == 0) {
+                                       non_full.wait (mutex);
+                               }
+                               changed |= queue.add (item);
+                       }
+                       if (changed) {
+                               non_empty.signal ();
+                       }
+                       mutex.unlock ();
+               }
+               return changed;
+       }
+
+       public override bool contains_all (Collection<G> collection) {
+               mutex.lock ();
+               bool contains_all = queue.contains_all (collection);
+               mutex.unlock ();
+               return contains_all;
+       }
+
+       public override bool retain_all (Collection<G> collection) {
+               bool changed = false;
+               mutex.lock ();
+               for (Gee.Iterator<G> iter = queue.iterator (); iter.next ();) {
+                       G item = iter.get ();
+                       if (collection.contains (item)) {
+                               iter.remove ();
+                               changed = true;
+                       }
+               }
+               if (changed) {
+                       non_full.signal ();
+               }
+               mutex.unlock ();
+               return changed;
+       }
+
+       public override int capacity {
+               get {
+                       return queue.capacity;
+               }
+       }
+
+       public override int remaining_capacity {
+               get {
+                       if (queue.capacity != Queue.UNBOUNDED_CAPACITY) {
+                               mutex.lock ();
+                               int remaning_capacity = queue.remaining_capacity;
+                               mutex.unlock ();
+                               return remaning_capacity;
+                       } else {
+                               return Queue.UNBOUNDED_CAPACITY;
+                       }
+               }
+       }
+
+       public override bool is_full {
+               get {
+                       if (queue.capacity != Queue.UNBOUNDED_CAPACITY) {
+                               mutex.lock ();
+                               bool is_full = queue.is_full;
+                               mutex.unlock ();
+                               return is_full;
+                       } else {
+                               return false;
+                       }
+               }
+       }
+
+       public override bool offer (G element) {
+               mutex.lock ();
+               if (queue.is_full) {
+                       non_full.wait (mutex);
+               }
+               bool changed = offer (element);
+               if (changed) {
+                       non_empty.signal ();
+               }
+               mutex.unlock ();
+               return changed;
+       }
+
+       public override G? peek () {
+               mutex.lock ();
+               if (queue.is_empty) {
+                       non_empty.wait (mutex);
+               }
+               G? element = queue.peek ();
+               mutex.unlock ();
+               return element;
+       }
+
+       public override G? poll () {
+               mutex.lock ();
+               if (queue.is_empty) {
+                       non_empty.wait (mutex);
+               }
+               G? element = queue.poll ();
+               mutex.unlock ();
+               return element;
+       }
+
+       public override int drain (Collection<G> recipient, int amount = -1) {
+               if (amount == 0) {
+                       return 0;
+               }
+               mutex.lock ();
+               int drained = queue.drain (recipient, amount);
+               if (amount != -1 && drained != amount) {
+                       do {
+                               non_empty.wait (mutex);
+                               drained += queue.drain (recipient, amount - drained);
+                       } while (drained < amount);
+               }
+               mutex.unlock ();
+               return drained;
+       }
+
+       protected Queue<G> queue;
+       protected Mutex mutex = new Mutex ();
+       protected Cond non_full = new Cond ();
+       protected Cond non_empty = new Cond ();
+
+       protected class Iterator<G> : Object, Traversable<G>, Gee.Iterator<G> {
+               public Iterator (AsyncQueue<G> queue) {
+                       this.queue = queue;
+                       queue.mutex.lock ();
+                       iterator = this.queue.iterator ();
+                       queue.mutex.unlock ();
+               }
+
+               public void foreach (Gee.ForallFunc<G> f) {
+                       queue.mutex.lock ();
+                       iterator.foreach (f);
+                       queue.mutex.unlock ();
+               }
+
+               public Gee.Iterator<A> stream<A> (owned StreamFunc<G, A> f) {
+                       return Gee.Iterator.stream_impl<G, A> (this, (owned)f);
+               }
+
+               public Gee.Iterator<A> filter<A> (owned Predicate<G> pred) {
+                       return Gee.Traversable.filter_impl<G> (this, (owned)pred);
+               }
+
+               public Gee.Iterator<A> chop<A> (int offset, int length) {
+                       return Gee.Traversable.chop_impl<G> (this, offset, length);
+               }
+
+               public bool next () {
+                       queue.mutex.lock ();
+                       bool next = iterator.next ();
+                       queue.mutex.unlock ();
+                       return next;
+               }
+
+               public bool has_next () {
+                       queue.mutex.lock ();
+                       bool has_next = iterator.has_next ();
+                       queue.mutex.unlock ();
+                       return has_next;
+               }
+
+               public new G get () {
+                       queue.mutex.lock ();
+                       G value = iterator.get ();
+                       queue.mutex.unlock ();
+                       return (owned)value;
+               }
+
+               public void remove () {
+                       queue.mutex.lock ();
+                       iterator.remove ();
+                       queue.mutex.unlock ();
+                       queue.mutex.unlock ();
+               }
+
+               public bool valid { get { return this.iterator.valid; } }
+
+               public bool read_only { get { return this.iterator.read_only; } }
+
+               protected AsyncQueue<G> queue;
+               protected Gee.Iterator<G> iterator;
+       }
+}
+


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]