[libgee] Implement ConcurrentList



commit 4c62ff2c105fc64139cace0e8211bda172988e8b
Author: Maciej Piechotka <uzytkownik2 gmail com>
Date:   Sat Mar 26 05:24:24 2011 +0100

    Implement ConcurrentList
    
     - After porting to volatile the freeing of data does not work

 gee/Makefile.am               |    1 +
 gee/concurrentlist.vala       |  585 +++++++++++++++++++++++++++++++++++++++++
 tests/Makefile.am             |    1 +
 tests/testconcurrentlist.vala |   38 +++
 tests/testmain.vala           |    1 +
 5 files changed, 626 insertions(+), 0 deletions(-)
---
diff --git a/gee/Makefile.am b/gee/Makefile.am
index cf41f42..7800cf4 100644
--- a/gee/Makefile.am
+++ b/gee/Makefile.am
@@ -22,6 +22,7 @@ libgee_0_8_la_SOURCES = \
 	bidirmapiterator.vala \
 	collection.vala \
 	comparable.vala \
+	concurrentlist.vala \
 	deque.vala \
 	functions.vala \
 	hashable.vala \
diff --git a/gee/concurrentlist.vala b/gee/concurrentlist.vala
new file mode 100644
index 0000000..91bfa22
--- /dev/null
+++ b/gee/concurrentlist.vala
@@ -0,0 +1,585 @@
+/* concurrentlist.vala
+ *
+ * Copyright (C) 2011  Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
+ *
+ * Author:
+ * 	Maciej Piechotka <uzytkownik2 gmail com>
+ */
+
+/**
+ * A single-linked list. This implementation is based on
+ * [[www.cse.yorku.ca/~ruppert/papers/lfll.pdf|Mikhail Fomitchev and  Eric Ruppert paper ]].
+ *
+ * Many threads are allowed to operate on the same structure as well as modification
+ * of structure during iteration is allowed. However the change may not be immidiatly
+ * visible to other threads.
+ */
+public class Gee.ConcurrentList<G> : AbstractList<G> {
+	/**
+	 * The elements' equality testing function.
+	 */
+	public Gee.EqualDataFunc equal_func { private set; get; }
+
+	/**
+	 * Construct new, empty single linked list
+	 *
+	 * If not provided, the function parameter is requested to the
+	 * { link Functions} function factory methods.
+	 *
+	 * @param equal_func an optional element equality testing function
+	 */
+	public ConcurrentList (owned Gee.EqualDataFunc? equal_func = null) {
+		if (equal_func == null)
+			equal_func = Gee.Functions.get_equal_func_for (typeof (G));
+		this.equal_func = (owned)equal_func;
+		_head = new Node<G>.head ();
+		HazardPointer.set_pointer<Node<G>> (&_tail, _head);
+	}
+
+	~ConcurrentList () {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		_head = null;
+		HazardPointer.set_pointer<Node<G>> (&_tail, null);
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override bool read_only {
+		get {
+			return false;
+		}
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override int size {
+		get {
+			HazardPointer.Context ctx = new HazardPointer.Context ();
+			int result = 0;
+			for (var iter = iterator (); iter.next ();)
+				result++;
+			return result;
+		}
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override bool is_empty {
+		get {
+			return !iterator ().next ();
+		}
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override bool contains (G item) {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		for (var iter = iterator (); iter.next ();)
+			if (equal_func (item, iter.get ()))
+				return true;
+		return false;
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override bool add (G item) {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		Node<G> node = new Node<G> (item);
+		node.insert (get_tail (), null);
+		return true;
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override bool remove (G item) {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		Gee.Iterator<G> iter = iterator ();
+		while (iter.next ()) {
+			if (equal_func (item, iter.get ())) {
+				iter.remove ();
+				return true;
+			}
+		}
+		return false;
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override void clear () {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		var iter = iterator ();
+		while (iter.next ())
+			iter.remove ();
+		HazardPointer.set_pointer (&_tail, _head);
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override Gee.Iterator<G> iterator () {
+		return new Iterator<G> (_head);
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override ListIterator<G> list_iterator () {
+		return new Iterator<G> (_head);
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override G? get (int index) {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		assert (index >= 0);
+		for (var iterator = iterator (); iterator.next ();)
+			if (index-- == 0)
+				return iterator.get ();
+		assert_not_reached ();
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override void set (int index, G item) {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		assert (index >= 0);
+		for (var iterator = list_iterator (); iterator.next ();) {
+			if (index-- == 0) {
+				iterator.set (item);
+				return;
+			}
+		}
+		assert_not_reached ();
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override int index_of (G item) {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		int index = 0;
+		for (var iterator = list_iterator (); iterator.next (); index++)
+			if (equal_func (item, iterator.get ()))
+				return index;
+		return -1;
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override void insert (int index, G item) {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		assert (index >= 0);
+		if (index == 0) {
+			var prev = _head;
+			var next = _head.get_next ();
+			Node<G> new_node = new Node<G> (item);
+			new_node.insert (prev, next);
+		} else {
+			for (var iterator = list_iterator (); iterator.next ();) {
+				if (--index == 0) {
+					iterator.add (item);
+					return;
+				}
+			}
+			assert_not_reached ();
+		}
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override G remove_at (int index) {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		for (var iterator = list_iterator (); iterator.next ();) {
+			if (index-- == 0) {
+				G data = iterator.get ();
+				iterator.remove ();
+				return data;
+			}
+		}
+		assert_not_reached ();
+	}
+
+	/**
+	 * { inheritDoc}
+	 */
+	public override List<G>? slice (int start, int end) {
+		HazardPointer.Context ctx = new HazardPointer.Context ();
+		assert (0 <= start);
+		assert (start <= end);
+		var list = new ConcurrentList<G> (equal_func);
+		var iterator = iterator ();
+		int idx = 0;
+		for (; iterator.next (); idx++)
+			if (idx >= start && idx < end)
+				list.add (iterator.get ());
+			else if (idx >= end)
+				break;
+		assert (idx >= end);
+		return list;
+	}
+
+	private inline Node<G> update_tail () {
+		Node<G> tail = HazardPointer.get_pointer (&_tail);
+		Node.backtrace<G> (ref tail);
+		Node.search_for<G> (null, ref tail);
+		HazardPointer.set_pointer<Node<G>> (&_tail, tail);
+		return tail;
+	}
+
+	private inline Node<G> get_tail () {
+		return update_tail ();
+	}
+
+	private Node<G> _head;
+	private Node<G> *_tail;
+
+	private class Iterator<G> : Object, Gee.Traversable<G>, Gee.Iterator<G>, ListIterator<G> {
+		public Iterator (Node<G> head) {
+			_started = false;
+			_removed = false;
+			_index = -1;
+			_prev = null;
+			_curr = head;
+		}
+
+		public bool next () {
+			HazardPointer.Context ctx = new HazardPointer.Context ();
+			Node<G>? _old_prev = _removed ? _prev : null;
+			bool success = Node.proceed<G> (ref _prev, ref _curr);
+			if (success) {
+				if (_removed)
+					_prev = _old_prev;
+				_removed = false;
+				_started = true;
+				_index++;
+			}
+			return success;
+		}
+
+		public bool has_next () {
+			HazardPointer.Context ctx = new HazardPointer.Context ();
+			Node<G>? prev = _prev;
+			Node<G>? curr = _curr;
+			return Node.proceed<G> (ref prev, ref curr);
+		}
+
+		public new G get () {
+			HazardPointer.Context ctx = new HazardPointer.Context ();
+			assert (valid);
+			return HazardPointer.get_pointer<G> (&_curr._data);
+		}
+
+		public new void set (G item) {
+			HazardPointer.Context ctx = new HazardPointer.Context ();
+			assert (valid);
+#if DEBUG
+			G item_copy = item;
+			stderr.printf ("  Setting data %p to %p\n", _curr, item_copy);
+			HazardPointer.set_pointer<G> (&_curr._data, (owned)item_copy);
+#else
+			HazardPointer.set_pointer<G> (&_curr._data, item);
+#endif
+		}
+
+		public void remove () {
+			HazardPointer.Context ctx = new HazardPointer.Context ();
+			assert (valid);
+			_curr.remove (_prev);
+			_removed = true;
+			_index--;
+		}
+
+		public bool valid {
+			get { return _started && !_removed && _curr != null; }
+		}
+
+		public bool read_only { get { return false; } }
+
+		public int index() {
+			assert (valid);
+			return _index;
+		}
+
+		public void add (G item) {
+			HazardPointer.Context ctx = new HazardPointer.Context ();
+			assert (valid);
+			if (!Node.proceed<G> (ref _prev, ref _curr)) {
+				_prev = (owned)_curr;
+				_curr = null;
+			}
+			Node<G> new_node = new Node<G> (item);
+			new_node.insert (_prev, _curr);
+			_curr = (owned)new_node;
+			_index++;
+		}
+
+		public new void foreach (ForallFunc<G> f) {
+			HazardPointer.Context ctx = new HazardPointer.Context ();
+			if (_started && !_removed)
+				f (HazardPointer.get_pointer<G> (&_curr._data));
+			Node<G>? _old_prev = _removed ? _prev : null;
+			while (Node.proceed<G> (ref _prev, ref _curr)) {
+				if (_removed)
+					_prev = _old_prev;
+				_removed = false;
+				_started = true;
+				_index++;
+				f (HazardPointer.get_pointer<G> (&_curr._data));
+			}
+		}
+
+		public Gee.Iterator<G> stream<A> (owned StreamFunc<G, A> f) {
+			return Gee.Iterator.stream_impl<G, A> (this, (owned)f);
+		}
+
+		public Gee.Iterator<G> filter (owned Predicate<G> f) {
+			return Traversable.filter_impl<G> (this, (owned)f);
+		}
+
+		public Gee.Iterator<G> chop (int offset, int length = -1) {
+			return Traversable.chop_impl<G> (this, offset, length);
+		}
+
+		private bool _started;
+		private bool _removed;
+		private int _index;
+		private Node<G> _prev;
+		private Node<G>? _curr;
+	}
+
+	private class Node<G> {
+		public inline Node (G data) {
+			AtomicPointer.set (&_succ, null);
+			AtomicPointer.set (&_backlink, null);
+			G data_copy = data;
+			G *data_ptr = (owned)data_copy;
+#if DEBUG
+			stderr.printf ("  Creating node %p with data %p\n", this, data_ptr);
+#endif
+			AtomicPointer.set (&_data, (owned)data_ptr);
+		}
+
+		public inline Node.head () {
+			AtomicPointer.set (&_succ, null);
+			AtomicPointer.set (&_backlink, null);
+			AtomicPointer.set (&_data, null);
+#if DEBUG
+			stderr.printf ("  Creating head node %p\n", this);
+#endif
+		}
+
+		inline ~Node () {
+			HazardPointer.set_pointer<Node<G>> (&_succ, null, 3);
+			HazardPointer.set_pointer<Node<G>> (&_backlink, null);
+#if DEBUG
+			G? old_data = HazardPointer.exchange_pointer (&_data, null);
+			stderr.printf ("  Freeing node %p (with data %p)\n", this, old_data);
+#else
+			HazardPointer.set_pointer<G> (&_data, null);
+#endif
+		}
+
+		public static inline bool proceed<G> (ref Node<G> prev, ref Node<G> curr, bool force = false) {
+			Node<G> next = curr.get_next ();
+			while (next != null) {
+				State next_state = next.get_state ();
+				State curr_state;
+				Node<G> curr_next = curr.get_succ (out curr_state);
+				if (next_state != State.MARKED || (curr_state == State.MARKED && curr_next == next))
+					break;
+				if (curr_next == next)
+					next.help_marked (curr);
+				next = curr_next;
+			}
+			bool success = next != null;
+			if (success || force) {
+				prev = (owned)curr;
+				curr = (owned)next;
+#if DEBUG
+				stderr.printf ("  Procceed to %p (previous %p)\n", curr, prev);
+#endif
+			}
+			return success;
+		}
+
+		public static inline bool search_for<G> (Node<G>? goal, ref Node<G>? prev) {
+			Node<G>? curr = prev.get_next ();
+			while ((curr != goal || curr != null) && proceed<G> (ref prev, ref curr, true));
+			return curr == goal;
+		}
+
+		public inline bool remove (Node<G> prev_node) {
+#if DEBUG
+			stderr.printf ("  Removing %p (previous %p)\n", this, prev_node);
+#endif
+			Node<G>? prev = prev_node;
+			bool result = try_flag (ref prev);
+			if (prev != null)
+				help_flagged (prev);
+			return result;
+		}
+
+		public inline void insert (owned Node<G> prev, Node<G>? next) {
+#if DEBUG
+			stderr.printf ("  Inserting %p between %p and %p\n", this, prev, next);
+#endif
+			while (true) {
+				State prev_state;
+				Node<G>? prev_next = get_succ (out prev_state);
+				if (prev_state == State.FLAGGED) {
+					prev_next.help_flagged (prev);
+				} else {
+					set_succ (next, State.NONE);
+					bool result = prev.compare_and_exchange (next, State.NONE, this, State.NONE);
+					if (result)
+						return;
+					prev_next = get_succ (out prev_state);
+					if (prev_state == State.FLAGGED)
+						prev_next.help_flagged (prev);
+					backtrace<G> (ref prev);
+				}
+				search_for<G> (next, ref prev);
+			}
+			
+		}
+
+		public inline void help_flagged (Node<G> prev) {
+#if DEBUG
+			stderr.printf ("    Help flagging %p (previous %p)\n", this, prev);
+#endif
+			set_backlink (prev);
+			if (get_state () == State.MARKED)
+				try_mark ();
+			help_marked (prev);
+		}
+
+		public inline void try_mark () {
+#if DEBUG
+			stderr.printf ("    Try flagging %p\n", this);
+#endif
+			do {
+				Node<G>? next_node = get_next ();
+				bool result = compare_and_exchange (next_node, State.NONE, next_node, State.MARKED);
+				if (!result && get_state () == State.FLAGGED)
+					help_flagged (next_node);
+			} while (get_state () != State.MARKED);
+		}
+
+		public inline void help_marked (Node<G> prev_node) {
+#if DEBUG
+			stderr.printf ("    Help marking %p (previous %p)\n", this, prev_node);
+#endif
+			prev_node.compare_and_exchange (this, State.FLAGGED, get_next (), State.NONE);
+		}
+
+		public inline bool try_flag (ref Node<G>? prev_node) {
+#if DEBUG
+			stderr.printf ("    Try flagging %p (previous %p)\n", this, prev_node);
+#endif
+			while (true) {
+				if (prev_node.compare_succ (this, State.FLAGGED))
+					return false;
+				bool result = prev_node.compare_and_exchange (this, State.NONE, this, State.FLAGGED);
+				if (result)
+					return true;
+				State result_state;
+				Node<G>? result_node = prev_node.get_succ (out result_state);
+				if (result_node == this && result_state == State.FLAGGED)
+					return false;
+				backtrace<G> (ref prev_node);
+				if (!search_for<G> (this, ref prev_node)) {
+					prev_node = null;
+					return false;
+				}
+			}
+		}
+
+		public static inline void backtrace<G> (ref Node<G>? curr) {
+			while (curr.get_state () == State.MARKED)
+				curr = curr.get_backlink ();
+		}
+
+		public inline bool compare_and_exchange (Node<G>? old_node, State old_state, Node<G>? new_node, State new_state) {
+#if DEBUG
+			bool b = HazardPointer.compare_and_exchange_pointer (&_succ, old_node, new_node, 3, (size_t)old_state, (size_t)new_state);
+			stderr.printf ("      Setting %p.succ to (%p, %s) if %p.succ is (%p, %s): %s\n", this, new_node, new_state.to_string (), this, old_node, old_state.to_string (), b ? "success" : "failure");
+			return b;
+#else
+			return HazardPointer.compare_and_exchange_pointer (&_succ, old_node, new_node, 3, (size_t)old_state, (size_t)new_state);
+#endif
+		}
+
+		public inline bool compare_succ (Node<G>? next, State state) {
+			size_t cur = (size_t)AtomicPointer.get (&_succ);
+			return cur == ((size_t)next | (size_t)state);
+		}
+
+		public inline Node<G>? get_next () {
+			return get_succ (null);
+		}
+
+		public inline State get_state () {
+			return (State)((size_t)AtomicPointer.get (&_succ) & 3);
+		}
+
+		public inline Node<G>? get_succ (out State state) {
+			size_t rstate;
+			Node<G>? succ = HazardPointer.get_pointer<Node<G>> (&_succ, 3, out rstate);
+			state = (State)rstate;
+			return (owned)succ;
+		}
+
+		public inline void set_succ (Node<G>? next, State state) {
+#if DEBUG
+			stderr.printf ("      Setting %p.succ to (%p, %s)\n", this, next, state.to_string ());
+#endif
+			HazardPointer.set_pointer<Node<G>> (&_succ, next, 3, (size_t)state);
+		}
+
+		public inline Node<G>? get_backlink () {
+			return HazardPointer.get_pointer<Node<G>> (&_backlink);
+		}
+
+		public inline void set_backlink (Node<G>? backlink) {
+#if DEBUG
+			stderr.printf ("      Setting backlink from %p to %p\n", this, backlink);
+#endif
+			HazardPointer.compare_and_exchange_pointer<Node<G>> (&_backlink, null, backlink);
+		}
+
+		public Node<G> *_succ;
+		public Node<G> *_backlink;
+		public G *_data;
+	}
+
+	private enum State {
+		NONE = 0,
+		MARKED = 1,
+		FLAGGED = 2
+	}
+}
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 5cc570c..a3bcafe 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -9,6 +9,7 @@ tests_SOURCES = \
        testbidirlist.vala \
        testcase.vala \
        testcollection.vala \
+       testconcurrentlist.vala \
        testdeque.vala \
        testfunctions.vala \
        testhashmap.vala \
diff --git a/tests/testconcurrentlist.vala b/tests/testconcurrentlist.vala
new file mode 100644
index 0000000..3bc5039
--- /dev/null
+++ b/tests/testconcurrentlist.vala
@@ -0,0 +1,38 @@
+/* testconcurrentlist.vala
+ *
+ * Copyright (C) 2011  Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
+ *
+ * Author:
+ * 	Maciej Piechotka <uzytkownik2 gmail com>
+ */
+
+using Gee;
+
+public class ConcurrentListTests : ListTests {
+	public ConcurrentListTests () {
+		base ("ConcurrentList");
+	}
+
+	public override void set_up () {
+		test_collection = new Gee.ConcurrentList<string> ();
+	}
+
+	public override void tear_down () {
+		test_collection = null;
+	}
+}
+
diff --git a/tests/testmain.vala b/tests/testmain.vala
index 214a241..b605f70 100644
--- a/tests/testmain.vala
+++ b/tests/testmain.vala
@@ -25,6 +25,7 @@ void main (string[] args) {
 	Test.init (ref args);
 
 	TestSuite.get_root ().add_suite (new ArrayListTests ().get_suite ());
+	TestSuite.get_root ().add_suite (new ConcurrentListTests ().get_suite ());
 	TestSuite.get_root ().add_suite (new FunctionsTests ().get_suite ());
 	TestSuite.get_root ().add_suite (new HashMapTests ().get_suite ());
 	TestSuite.get_root ().add_suite (new HashMultiMapTests ().get_suite ());



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]