[libgee] Rework the Futures to a new API



commit fa8bff33b8fadd0f167965cb56809068c5ce7d93
Author: Maciej Piechotka <uzytkownik2 gmail com>
Date:   Sat Jul 27 13:23:18 2013 +0200

    Rework the Futures to a new API
    
    The new API adds error handling and removes redundant functions
    (such as when_done), which can be replaced by async calls.
    
    Future.map, Future.zip and Future.flat_map have also been implemented in
    terms of Promise.

 gee/Makefile.am         |    5 +-
 gee/flatmapfuture.vala  |  164 ------------------------------------------
 gee/future.vala         |  112 +++++++++++++++++------------
 gee/lightmapfuture.vala |   17 +++--
 gee/mapfuture.vala      |  167 -------------------------------------------
 gee/promise.vala        |  154 +++++++++++++++++++++++++++++++---------
 gee/task.vala           |    1 -
 gee/zipfuture.vala      |  180 -----------------------------------------------
 utils/async.h           |   31 ++++++++
 utils/geeutils.vapi     |    8 ++
 10 files changed, 235 insertions(+), 604 deletions(-)
---
diff --git a/gee/Makefile.am b/gee/Makefile.am
index 7094e57..dca4162 100644
--- a/gee/Makefile.am
+++ b/gee/Makefile.am
@@ -33,7 +33,6 @@ libgee_0_8_la_SOURCES = \
        deque.vala \
        functions.vala \
        future.vala \
-       flatmapfuture.vala \
        hashable.vala \
        hashmap.vala \
        hashmultimap.vala \
@@ -48,7 +47,6 @@ libgee_0_8_la_SOURCES = \
        list.vala \
        listiterator.vala \
        map.vala \
-       mapfuture.vala \
        mapiterator.vala \
        multimap.vala \
        multiset.vala \
@@ -79,7 +77,6 @@ libgee_0_8_la_SOURCES = \
        treemultiset.vala \
        treeset.vala \
        unfolditerator.vala \
-       zipfuture.vala \
        $(NULL)
 
 libgee_0_8_la_VALAFLAGS = \
@@ -88,6 +85,7 @@ libgee_0_8_la_VALAFLAGS = \
        --internal-vapi gee-internals-0.8.vapi \
        --library gee-0.8 --gir Gee-0.8.gir \
        --pkg gio-2.0 \
+       --vapidir $(top_srcdir)/utils --pkg geeutils \
        $(COVERAGE_VALAFLAGS) \
        $(VALAFLAGS) \
        $(NULL)
@@ -95,6 +93,7 @@ libgee_0_8_la_VALAFLAGS = \
 libgee_0_8_la_CPPFLAGS = \
        $(GLIB_CFLAGS) \
        $(COVERAGE_CFLAGS) \
+       -I$(top_srcdir)/utils \
        $(NULL)
 
 libgee_0_8_la_LIBADD = \
diff --git a/gee/future.vala b/gee/future.vala
index 7b8cc5a..a927ced 100644
--- a/gee/future.vala
+++ b/gee/future.vala
@@ -48,9 +48,13 @@ public interface Gee.Future<G> : Object {
         * Returned value is always the same and it is alive at least as long
         * as the future.
         */
-       public virtual new G value {
+       public virtual new G? value {
                get {
-                       return wait ();
+                       try {
+                               return wait ();
+                       } catch (FutureError ex) {
+                               return null;
+                       }
                }
        }
 
@@ -61,6 +65,16 @@ public interface Gee.Future<G> : Object {
        public abstract bool ready {get;}
 
        /**
+        * The exception that has been set, if any. I.e. if the computation
+        * has thrown an exception it should be set here and {@link wait},
+        * {@link wait_until} and {@link wait_async} should throw
+        * {@link FutureError.EXCEPTION}.
+        *
+        * @since 0.11.5
+        */
+       public abstract GLib.Error? exception {get;}
+
+       /**
         * Waits until the value is ready.
         *
         * @returns The { link value} associated with future
@@ -68,7 +82,7 @@ public interface Gee.Future<G> : Object {
         * @see wait_until
         * @see wait_async
         */
-       public abstract unowned G wait ();
+       public abstract unowned G wait () throws Gee.FutureError;
 
        /**
         * Waits until the value is ready or deadline have passed.
@@ -80,7 +94,7 @@ public interface Gee.Future<G> : Object {
         * @see wait
         * @see wait_async
         */
-       public abstract bool wait_until (int64 end_time, out unowned G? value = null);
+       public abstract bool wait_until (int64 end_time, out unowned G? value = null) throws Gee.FutureError;
 
        /**
         * Reschedules the callback until the { link value} is available.
@@ -90,40 +104,7 @@ public interface Gee.Future<G> : Object {
         * @see wait
         * @see wait_until
         */
-       public virtual async unowned G wait_async () {
-               unowned G? result = null;
-               bool looped = true;
-               RecMutex mutex = RecMutex();
-               mutex.lock ();
-               when_done ((value) => {
-                       mutex.lock ();
-                       bool looped_copy = looped;
-                       mutex.unlock ();
-                       result = value;
-                       if (looped_copy) {
-                               Idle.add (wait_async.callback);
-                       } else {
-                               wait_async.callback ();
-                       }
-               });
-               looped = false;
-               mutex.unlock ();
-               yield;
-               return result;
-       }
-
-       [CCode (scope = "async")]
-       public delegate void WhenDoneFunc<G>(G value);
-
-       /**
-        * Registers a callback which is called once the future is { link ready}.
-        *
-        * Note: As usually the callbacks are called from thread finishing the
-        *   future it is recommended to not include lengthly computation.
-        *   If one is needed please use { link task}.
-        */
-       public abstract void when_done (owned WhenDoneFunc<G> func);
-
+       public abstract async unowned G wait_async () throws Gee.FutureError;
        public delegate A MapFunc<A, G> (G value);
 
        /**
@@ -142,7 +123,15 @@ public interface Gee.Future<G> : Object {
         *   { link task} and { link flat_map} for longer computation.
         */
        public virtual Future<A> map<A> (owned MapFunc<A, G> func) {
-               return new MapFuture<A, G> (this, (owned)func);
+               Promise<A> promise = new Promise<A> ();
+               wait_async.begin ((obj, res) => {
+                       try {
+                               promise.set_value (func (wait_async.end (res)));
+                       } catch (Error ex) {
+                               promise.set_exception ((owned)ex);
+                       }
+               });
+               return promise.future;
        }
 
        public delegate unowned A LightMapFunc<A, G> (G value);
@@ -190,10 +179,21 @@ public interface Gee.Future<G> : Object {
         *   future from { link task} and use { link flat_map} for longer computation.
         */
        public virtual Future<B> zip<A, B> (owned ZipFunc<G, A, B> zip_func, Future<A> second) {
-               return new ZipFuture<G, A, B> ((owned)zip_func, this, second);
+               Promise<B> promise = new Promise<B> ();
+               do_zip.begin<G, A, B> ((owned) zip_func, this, second, promise, (obj, res) => {do_zip.end<G, 
A, B> (res);});
+               return promise.future;
+       }
+
+       private static async void do_zip<A, B, C> (owned ZipFunc<A, B, C> zip_func, Future<A> first, 
Future<B> second, Promise<C> result) {
+               try {
+                       A left = yield first.wait_async ();
+                       B right = yield second.wait_async ();
+                       result.set_value (zip_func (left, right));
+               } catch (Error ex) {
+                       result.set_exception ((owned)ex);
+               }
        }
 
-       [CCode (scope = "async")]
        public delegate Gee.Future<A> FlatMapFunc<A, G>(G value);
 
        /**
@@ -211,14 +211,36 @@ public interface Gee.Future<G> : Object {
         *   { link task}
         */
        public virtual Gee.Future<A> flat_map<A>(owned FlatMapFunc<A, G> func) {
-               return new FlatMapFuture<A, G> (this, (owned)func);
+               Promise<A> promise = new Promise<A> ();
+               do_flat_map.begin<G, A> ((owned)func, this, promise, (obj, res) => {do_flat_map.end<G, A> 
(res);});
+               return promise.future;
+       }
+
+       private static async void do_flat_map<A, B> (owned FlatMapFunc<B, A> func, Future<A> future, 
Promise<B> promise) {
+               try {
+                       A input = yield future.wait_async ();
+                       B output = yield func (input).wait_async ();
+                       promise.set_value ((owned)output);
+               } catch (Error ex) {
+                       promise.set_exception ((owned)ex);
+               }
        }
 
-       internal struct WhenDoneArrayElement<G> {
-               public WhenDoneArrayElement (owned WhenDoneFunc<G> func) {
+       internal struct SourceFuncArrayElement {
+               public SourceFuncArrayElement (owned SourceFunc func) {
                        this.func = (owned)func;
                }
-               public WhenDoneFunc<G> func;
+               public SourceFunc func;
        }
 }
 
+public errordomain Gee.FutureError {
+       /**
+        * The promise has been abandoned - this indicates an error in the program.
+        */
+       ABANDON_PROMISE,
+       /**
+        * Exception field has been set.
+        */
+       EXCEPTION
+}
diff --git a/gee/lightmapfuture.vala b/gee/lightmapfuture.vala
index bc8e995..a5adaed 100644
--- a/gee/lightmapfuture.vala
+++ b/gee/lightmapfuture.vala
@@ -19,7 +19,6 @@
  * Author:
  *     Maciej Piechotka <uzytkownik2 gmail com>
  */
-
 internal class Gee.LightMapFuture<A, G> : Object, Future<A> {
        public LightMapFuture (Future<G> base_future, Future.LightMapFunc<A, G> func) {
                _base = base_future;
@@ -32,11 +31,17 @@ internal class Gee.LightMapFuture<A, G> : Object, Future<A> {
                }
        }
 
-       public unowned A wait () {
+       public GLib.Error exception {
+               get {
+                       return _base.exception;
+               }
+       }
+
+       public unowned A wait () throws Gee.FutureError {
                return _func (_base.wait ());
        }
 
-       public bool wait_until (int64 end_time, out unowned G? value = null) {
+       public bool wait_until (int64 end_time, out unowned G? value = null) throws Gee.FutureError {
                unowned A arg;
                bool result;
                if ((result = _base.wait_until (end_time, out arg))) {
@@ -45,15 +50,11 @@ internal class Gee.LightMapFuture<A, G> : Object, Future<A> {
                return result;
        }
 
-       public async unowned G wait_async () {
+       public async unowned G wait_async () throws Gee.FutureError {
                unowned A arg = yield _base.wait_async ();
                return _func (arg);
        }
 
-       public void when_done (owned Future.WhenDoneFunc<G> func) {
-               _base.when_done ((a) => {_func (a);});
-       }
-
        private Future<G> _base;
        private Future.LightMapFunc<A, G> _func;
 }
diff --git a/gee/promise.vala b/gee/promise.vala
index 45f46ce..cb92196 100644
--- a/gee/promise.vala
+++ b/gee/promise.vala
@@ -34,6 +34,10 @@ using GLib;
  * @since 0.11.0
  */
 public class Gee.Promise<G> {
+       ~Promise () {
+               _future.abandon ();
+       }
+
        /**
         * { link Future} value of this promise
         */
@@ -49,79 +53,157 @@ public class Gee.Promise<G> {
         * @params value Value of future
         */
        public void set_value (owned G value) {
-               _future.set_value (value);
+               _future.set_value ((owned)value);
+       }
+
+       /**
+        * Sets the exception.
+        *
+        * @param exception Exception thrown
+        */
+       public void set_exception (owned GLib.Error exception) {
+               _future.set_exception ((owned)exception);
        }
 
        private class Future<G> : Object, Gee.Future<G> {
                public bool ready {
                        get {
-                               _mutex.lock();
-                               bool result = _ready;
-                               _mutex.unlock();
+                               _mutex.lock ();
+                               bool result = _state != State.INIT;
+                               _mutex.unlock ();
                                return result;
                        }
                }
 
-               public unowned G wait () {
-                       _mutex.lock();
-                       if (!_ready) {
+               public GLib.Error? exception {
+                       get {
+                               return _exception;
+                       }
+               }
+
+               public unowned G wait () throws FutureError {
+                       _mutex.lock ();
+                       State state = _state;
+                       if (_state == State.INIT) {
                                _set.wait (_mutex);
+                               state = _state;
+                       }
+                       assert (state != State.INIT);
+                       _mutex.unlock ();
+                       switch (state) {
+                       case State.ABANDON:
+                               throw new FutureError.ABANDON_PROMISE ("Promise has been abandon");
+                       case State.EXCEPTION:
+                               throw new FutureError.EXCEPTION ("Exception has been thrown");
+                       case State.READY:
+                               return _value;
+                       default:
+                               assert_not_reached ();
                        }
-                       _mutex.unlock();
-                       return _value;
                }
 
-               public bool wait_until (int64 end_time, out unowned G? value = null) {
-                       bool result = true;
-                       _mutex.lock();
-                       if (!_ready) {
-                               if (!_set.wait_until (_mutex, end_time)) {
-                                       result = false;
-                               }
+               public bool wait_until (int64 end_time, out unowned G? value = null) throws FutureError {
+                       _mutex.lock ();
+                       State state = _state;
+                       if (state == State.INIT) {
+                               _set.wait_until (_mutex, end_time);
+                               state = _state;
                        }
-                       _mutex.unlock();
-                       if (result) {
+                       _mutex.unlock ();
+                       switch (state) {
+                       case State.INIT:
+                               return false;
+                       case State.ABANDON:
+                               throw new FutureError.ABANDON_PROMISE ("Promise has been abandon");
+                       case State.EXCEPTION:
+                               throw new FutureError.EXCEPTION ("Exception has been thrown");
+                       case State.READY:
                                value = _value;
-                       } else {
-                               value = null;
+                               return true;
+                       default:
+                               assert_not_reached ();
                        }
-                       return result;
                }
 
-               public void when_done (owned Gee.Future.WhenDoneFunc<G> func) {
+               public async unowned G wait_async () throws Gee.FutureError {
                        _mutex.lock ();
-                       if (_ready) {
-                               _mutex.unlock ();
-                               func (_value);
+                       State state = _state;
+                       if (state == State.INIT) {
+                               _when_done += SourceFuncArrayElement(wait_async.callback);
+                               yield Gee.Utils.Async.yield_and_unlock (_mutex);
+                               state = _state;
                        } else {
-                               _when_done += Gee.Future.WhenDoneArrayElement<G>(func);
                                _mutex.unlock ();
                        }
+                       assert (state != State.INIT);
+                       switch (state) {
+                       case State.ABANDON:
+                               throw new FutureError.ABANDON_PROMISE ("Promise has been abandon");
+                       case State.EXCEPTION:
+                               throw new FutureError.EXCEPTION ("Exception has been thrown");
+                       case State.READY:
+                               return _value;
+                       default:
+                               assert_not_reached ();
+                       }
                }
 
                internal void set_value (owned G value) {
-                       unowned G value_copy = value;
-
                        _mutex.lock ();
-                       assert (!_ready);
+                       assert (_state == State.INIT);
+                       _state = State.READY;
                        _value = (owned)value;
-                       _ready = true;
                        _set.broadcast ();
                        _mutex.unlock ();
+                       Gee.Future.SourceFuncArrayElement<G>[] when_done = (owned)_when_done;
+                       for (int i = 0; i < when_done.length; i++) {
+                               when_done[i].func ();
+                       }
+               }
 
-                       Gee.Future.WhenDoneArrayElement<G>[] when_done = (owned)_when_done;
+               internal void set_exception (owned GLib.Error? exception) {
+                       _mutex.lock ();
+                       assert (_state == State.INIT);
+                       _state = State.EXCEPTION;
+                       _exception = (owned)exception;
+                       _set.broadcast ();
+                       _mutex.unlock ();
+                       Gee.Future.SourceFuncArrayElement<G>[] when_done = (owned)_when_done;
                        for (int i = 0; i < when_done.length; i++) {
-                               when_done[i].func (value_copy);
+                               when_done[i].func ();
+                       }
+               }
+
+               internal void abandon () {
+                       _mutex.lock ();
+                       if (_state != State.INIT) {
+                               _mutex.unlock ();
+                               return;
+                       }
+                       assert (_state == State.INIT);
+                       _state = State.ABANDON;
+                       _set.broadcast ();
+                       _mutex.unlock ();
+                       Gee.Future.SourceFuncArrayElement<G>[] when_done = (owned)_when_done;
+                       for (int i = 0; i < when_done.length; i++) {
+                               when_done[i].func ();
                        }
                }
 
                private Mutex _mutex = Mutex ();
                private Cond _set = Cond ();
-               private G _value;
-               private bool _ready;
-               private Gee.Future.WhenDoneArrayElement<G>[]? _when_done = new 
Gee.Future.WhenDoneArrayElement<G>[0];
-       }
+               private State _state;
+               private G? _value;
+               private GLib.Error? _exception;
+               private Gee.Future.SourceFuncArrayElement<G>[]? _when_done = new 
Gee.Future.SourceFuncArrayElement<G>[0];
 
+               private enum State {
+                       INIT,
+                       ABANDON,
+                       EXCEPTION,
+                       READY
+               }
+       }
        private Future<G> _future = new Future<G>();
 }
 
diff --git a/gee/task.vala b/gee/task.vala
index 7aac703..9c914a5 100644
--- a/gee/task.vala
+++ b/gee/task.vala
@@ -19,7 +19,6 @@
  * Author:
  *     Maciej Piechotka <uzytkownik2 gmail com>
  */
-
 namespace Gee {
        public delegate G Task<G>();
 
diff --git a/utils/async.h b/utils/async.h
new file mode 100644
index 0000000..63270e7
--- /dev/null
+++ b/utils/async.h
@@ -0,0 +1,31 @@
+/* async.h
+ *
+ * Copyright (C) 2013  Maciej Piechotka
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
+ *
+ * Author:
+ *     Maciej Piechotka <uzytkownik2 gmail com>
+ */
+#ifndef GEE_UTILS_ASYNC
+#define GEE_UTILS_ASYNC
+
+#include <glib.h>
+
+#define gee_utils_async_yield_and_unlock(mutex, callback, user_data) g_mutex_unlock (mutex)
+#define gee_utils_async_yield_and_unlock_finish(arg) do {} while (0)
+
+#endif
+
diff --git a/utils/geeutils.vapi b/utils/geeutils.vapi
new file mode 100644
index 0000000..451d1e1
--- /dev/null
+++ b/utils/geeutils.vapi
@@ -0,0 +1,8 @@
+namespace Gee {
+       namespace Utils {
+               namespace Async {
+                       [CCode (cheader_filename = "async.h")]
+                       public async void yield_and_unlock (GLib.Mutex mutex);
+               }
+       }
+}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]