[libgda/LIBGDA_4.2] Added notification for GdaThreadWrapper using a GIOChannel



commit d6be07ff7dd2dc228f39c53d9dcf4b207adeaf42
Author: Vivien Malerba <malerba gnome-db org>
Date:   Mon Jun 6 20:59:47 2011 +0200

    Added notification for GdaThreadWrapper using a GIOChannel
    
    to easily integrate into a main loop

 doc/C/libgda-sections.txt                  |    4 +
 libgda/libgda.symbols                      |    2 +
 libgda/thread-wrapper/gda-thread-wrapper.c |  336 +++++++++++++++++++++++++++-
 libgda/thread-wrapper/gda-thread-wrapper.h |  138 ++++++++++++
 tools/browser/browser-connection-priv.h    |    4 +-
 tools/browser/browser-connection.c         |  145 +++++++++----
 6 files changed, 579 insertions(+), 50 deletions(-)
---
diff --git a/doc/C/libgda-sections.txt b/doc/C/libgda-sections.txt
index 130dd9f..960d722 100644
--- a/doc/C/libgda-sections.txt
+++ b/doc/C/libgda-sections.txt
@@ -1839,6 +1839,10 @@ gda_sql_builder_get_type
 <INCLUDE>libgda/thread-wrapper/gda-thread-wrapper.h</INCLUDE>
 GdaThreadWrapper
 gda_thread_wrapper_new
+GdaThreadNotification
+GdaThreadNotificationType
+gda_thread_wrapper_get_io_channel
+gda_thread_wrapper_unset_io_channel
 GdaThreadWrapperFunc
 gda_thread_wrapper_execute
 GdaThreadWrapperVoidFunc
diff --git a/libgda/libgda.symbols b/libgda/libgda.symbols
index 29f3ad6..d485e8e 100644
--- a/libgda/libgda.symbols
+++ b/libgda/libgda.symbols
@@ -832,11 +832,13 @@
 	gda_thread_wrapper_execute
 	gda_thread_wrapper_execute_void
 	gda_thread_wrapper_fetch_result
+	gda_thread_wrapper_get_io_channel
 	gda_thread_wrapper_get_type
 	gda_thread_wrapper_get_waiting_size
 	gda_thread_wrapper_iterate
 	gda_thread_wrapper_new
 	gda_thread_wrapper_steal_signal
+	gda_thread_wrapper_unset_io_channel
 	gda_time_copy
 	gda_time_free
 	gda_time_get_type
diff --git a/libgda/thread-wrapper/gda-thread-wrapper.c b/libgda/thread-wrapper/gda-thread-wrapper.c
index eb4ba48..f319187 100644
--- a/libgda/thread-wrapper/gda-thread-wrapper.c
+++ b/libgda/thread-wrapper/gda-thread-wrapper.c
@@ -20,6 +20,8 @@
  * Boston, MA 02111-1307, USA.
  */
 
+//#define DEBUG_NOTIFICATION
+
 #include <string.h>
 #include <glib/gi18n-lib.h>
 #include "gda-thread-wrapper.h"
@@ -27,6 +29,8 @@
 #include <gobject/gvaluecollector.h>
 #include <libgda/gda-debug-macros.h>
 #include <libgda/gda-value.h>
+#include <unistd.h>
+#include <sys/stat.h>
 
 /* this GPrivate holds a pointer to the GAsyncQueue used by the job being currently treated
  * by the worker thread. It is used to avoid creating signal data for threads for which
@@ -37,6 +41,7 @@ GStaticPrivate worker_thread_current_queue = G_STATIC_PRIVATE_INIT;
 typedef struct _ThreadData ThreadData;
 typedef struct _Job Job;
 typedef struct _SignalSpec SignalSpec;
+typedef struct _Pipe Pipe;
 
 struct _GdaThreadWrapperPrivate {
 	GdaMutex    *mutex;
@@ -45,8 +50,106 @@ struct _GdaThreadWrapperPrivate {
 	GAsyncQueue *to_worker_thread;
 
 	GHashTable  *threads_hash; /* key = a GThread, value = a #ThreadData pointer */
+	GHashTable  *pipes_hash; /* key = a GThread, value = a #Pipe pointer */
+};
+
+/*
+ * Threads synchronization with notifications
+ */
+struct _Pipe {
+	GThread     *thread;
+	int          fds[2]; /* [0] for reading and [1] for writing */
+	GIOChannel  *ioc;
+
+	GMutex      *mutex; /* locks @ref_count */
+	guint        ref_count;
 };
 
+#define pipe_lock(x) g_mutex_lock(((Pipe*)x)->mutex);
+#define pipe_unlock(x) g_mutex_unlock(((Pipe*)x)->mutex);
+
+static Pipe *
+pipe_ref (Pipe *p)
+{
+	if (p) {
+		pipe_lock (p);
+		p->ref_count++;
+#ifdef DEBUG_NOTIFICATION
+		g_print ("Pipe %p ++: %u\n", p, p->ref_count);
+#endif
+		pipe_unlock (p);
+	}
+	return p;
+}
+
+static void
+pipe_unref (Pipe *p)
+{
+	if (p) {
+		pipe_lock (p);
+		p->ref_count--;
+#ifdef DEBUG_NOTIFICATION
+		g_print ("Pipe %p --: %u\n", p, p->ref_count);
+#endif
+		pipe_unlock (p);
+		if (p->ref_count == 0) {
+			/* destroy @p */
+			if (p->ioc)
+				g_io_channel_unref (p->ioc);
+			if (p->fds[0] >= 0)
+				close (p->fds[0]);
+			if (p->fds[1] >= 0)
+				close (p->fds[1]);
+			g_mutex_free (p->mutex);
+#ifdef DEBUG_NOTIFICATION
+			g_print ("Destroyed Pipe %p\n", p);
+#endif
+			g_free (p);
+		}
+	}
+}
+
+/*
+ * May return %NULL
+ */
+static Pipe *
+pipe_new (void)
+{
+	Pipe *p = g_new0 (Pipe, 1);
+	p->mutex = g_mutex_new ();
+	p->ref_count = 1;
+	p->thread = g_thread_self ();
+	if (pipe (p->fds) != 0) {
+		pipe_unref (p);
+		return NULL;
+	}
+#ifdef G_OS_WIN32
+	p->ioc = g_io_channel_win32_new_fd (p->fds [0]);
+#else
+	p->ioc = g_io_channel_unix_new (p->fds [0]);
+#endif
+
+	/* we want raw data */
+	if (g_io_channel_set_encoding (p->ioc, NULL, NULL) != G_IO_STATUS_NORMAL) {
+		g_warning ("Can't set IO encoding to NULL\n");
+		pipe_unref (p);
+		return NULL;
+	}
+#ifdef DEBUG_NOTIFICATION
+	g_print ("Created Pipe %p\n", p);
+#endif
+	return p;
+}
+
+static Pipe *
+get_pipe (GdaThreadWrapper *wrapper, GThread *thread)
+{
+	if (wrapper->priv->pipes_hash)
+		return g_hash_table_lookup (wrapper->priv->pipes_hash, thread);
+	else
+		return NULL;
+}
+
 /* 
  * One instance for each job to execute (and its result) and
  * one instance for each emitted signal
@@ -57,9 +160,10 @@ struct _GdaThreadWrapperPrivate {
  * Passed to the sub job through obj->to_worker_thread
  */
 typedef enum {
-	JOB_TYPE_EXECUTE,
-	JOB_TYPE_DESTROY,
-	JOB_TYPE_SIGNAL
+	JOB_TYPE_EXECUTE, /* a "real" job for the GdaThreadWrapper */
+	JOB_TYPE_DESTROY, /* internal to signal the internal thread to shutdown */
+	JOB_TYPE_SIGNAL, /* a signal from an object in the internal thread */
+	JOB_TYPE_NOTIFICATION_ERROR /* internal to signal notification error and sutdown */
 } JobType;
 struct _Job {
 	JobType                  type;
@@ -71,6 +175,7 @@ struct _Job {
 	gpointer                 arg;
 	GDestroyNotify           arg_destroy_func;
 	GAsyncQueue             *reply_queue; /* holds a ref to it */
+	Pipe                    *notif; /* if not %NULL, notification when job finished */
 
 	/* result part */
 	union {
@@ -89,6 +194,7 @@ struct _Job {
 static void
 job_free (Job *job)
 {
+	pipe_unref (job->notif);
 	if (job->arg && job->arg_destroy_func)
 		job->arg_destroy_func (job->arg);
 	if (job->reply_queue)
@@ -110,6 +216,9 @@ job_free (Job *job)
 	else if (job->type == JOB_TYPE_DESTROY) {
 		/* nothing to do here */
 	}
+	else if (job->type == JOB_TYPE_NOTIFICATION_ERROR) {
+		/* nothing to do here */
+	}
 	else
 		g_assert_not_reached ();
 	g_free (job);
@@ -126,6 +235,7 @@ struct _SignalSpec {
 	gboolean      private;
 	GThread      *worker_thread;
 	GAsyncQueue  *reply_queue; /* a ref is held here */
+	Pipe         *notif; /* if not %NULL, notification */
 
         gpointer      instance;
         gulong        signal_id;
@@ -154,6 +264,7 @@ signal_spec_unref (SignalSpec *sigspec)
 			g_signal_handler_disconnect (sigspec->instance, sigspec->signal_id);
 		if (sigspec->reply_queue)
 			g_async_queue_unref (sigspec->reply_queue);
+		pipe_unref (sigspec->notif);
 		g_free (sigspec);
 	}
 	else
@@ -182,6 +293,8 @@ struct _ThreadData {
 
 	GSList      *jobs; /* list of Job pointers not yet handled, or being handled (ie not yet poped from @from_worker_thread) */
 	GSList      *results; /* list of Job pointers to completed jobs (ie. poped from @from_worker_thread) */
+
+	Pipe        *notif; /* if not %NULL, notification when any job has finished */
 };
 #define THREAD_DATA(x) ((ThreadData*)(x))
 
@@ -193,12 +306,15 @@ get_thread_data (GdaThreadWrapper *wrapper, GThread *thread)
 	gda_mutex_lock (wrapper->priv->mutex);
 	td = g_hash_table_lookup (wrapper->priv->threads_hash, thread);
 	if (!td) {
+		Pipe *p;
+		p = get_pipe (wrapper, thread);
+
 		td = g_new0 (ThreadData, 1);
 		td->owner = thread;
 		td->from_worker_thread = g_async_queue_new_full ((GDestroyNotify) job_free);
 		td->jobs = NULL;
 		td->results = NULL;
-
+		td->notif = pipe_ref (p);
 		g_hash_table_insert (wrapper->priv->threads_hash, thread, td);
 	}
 	gda_mutex_unlock (wrapper->priv->mutex);
@@ -208,6 +324,7 @@ get_thread_data (GdaThreadWrapper *wrapper, GThread *thread)
 static void
 thread_data_free (ThreadData *td)
 {
+	pipe_unref (td->notif);
 	g_async_queue_unref (td->from_worker_thread);
 	td->from_worker_thread = NULL;
 	g_assert (!td->jobs);
@@ -274,6 +391,79 @@ gda_thread_wrapper_class_init (GdaThreadWrapperClass *klass)
 	object_class->dispose = gda_thread_wrapper_dispose;
 }
 
+static void
+clean_notifications (GdaThreadWrapper *wrapper, ThreadData *td)
+{
+#ifdef DEBUG_NOTIFICATION
+	g_print ("%s(Pipe:%p)\n", __FUNCTION__, td->notif);
+#endif
+	GSList *list;
+	for (list = td->signals_list; list; list = list->next) {
+		SignalSpec *sigspec;
+		sigspec = (SignalSpec*) list->data;
+		signal_spec_lock (sigspec);
+		if (sigspec->notif == td->notif) {
+			pipe_unref (sigspec->notif);
+			sigspec->notif = NULL;
+		}
+		signal_spec_unlock (sigspec);
+	}
+
+	pipe_unref (td->notif);
+	td->notif = NULL;
+
+	g_hash_table_remove (wrapper->priv->pipes_hash, td->owner);
+}
+
+/*
+ * @wrapper: may be %NULL
+ * @td: may be %NULL
+ *
+ * Either @wrapper and @td are both NULL, or they are both NOT NULL
+ */
+static gboolean
+write_notification (GdaThreadWrapper *wrapper, ThreadData *td,
+		    Pipe *p, GdaThreadNotificationType type, guint job_id)
+{
+	g_assert ((wrapper && td) || (!wrapper && !td));
+
+	if (!p || (p->fds[1] < 0))
+		return TRUE;
+#ifdef DEBUG_NOTIFICATION_FORCE
+	static guint c = 0;
+	c++;
+	if (c == 4)
+		goto onerror;
+#endif
+
+	GdaThreadNotification notif;
+	ssize_t nw;
+	notif.type = type;
+	notif.job_id = job_id;
+
+	nw = write (p->fds[1], &notif, sizeof (notif));
+	if (nw != sizeof (notif)) {
+		/* Error */
+		goto onerror;
+	}
+#ifdef DEBUG_NOTIFICATION
+	g_print ("Wrote notification %d.%u to pipe %p\n", type, job_id, p);
+#endif
+	return TRUE;
+
+ onerror:
+#ifdef DEBUG_NOTIFICATION
+	g_print ("%s(): returned FALSE\n", __FUNCTION__);
+	g_print ("Closed FD %d\n", p->fds [1]);
+#endif
+	/* close the writing end of the pipe */
+	close (p->fds [1]);
+	p->fds [1] = -1;
+	if (td)
+		clean_notifications (wrapper, td);
+	return FALSE;
+}
+
 /*
  * Executed in the sub thread:
  * takes a Job in (from the wrapper->priv->to_worker_thread queue) and creates a new Result which 
@@ -320,6 +510,12 @@ worker_thread_entry_point (GAsyncQueue *to_worker_thread)
 				job->void_func (job->arg, &(job->u.exe.error));
 			}
 			g_async_queue_push (job->reply_queue, job);
+			if (! write_notification (NULL, NULL, job->notif, GDA_THREAD_NOTIFICATION_JOB,
+						 job->job_id)) {
+				Job *je = g_new0 (Job, 1);
+				je->type = JOB_TYPE_NOTIFICATION_ERROR;
+				g_async_queue_push (job->reply_queue, je);
+			}
 		}
 		else
 			g_assert_not_reached ();
@@ -345,6 +541,9 @@ gda_thread_wrapper_init (GdaThreadWrapper *wrapper, G_GNUC_UNUSED GdaThreadWrapp
 	wrapper->priv->worker_thread = g_thread_create ((GThreadFunc) worker_thread_entry_point,
 						     g_async_queue_ref (wrapper->priv->to_worker_thread), /* inc. ref for sub thread usage */
 						     FALSE, NULL);
+	
+	wrapper->priv->pipes_hash = NULL;
+
 #ifdef THREAD_WRAPPER_DEBUG
 	g_print ("... new wrapper %p, worker_thread=%p\n", wrapper, wrapper->priv->worker_thread);
 #endif
@@ -391,6 +590,7 @@ gda_thread_wrapper_dispose (GObject *object)
 	if (wrapper->priv) {
 		Job *job = g_new0 (Job, 1);
 		job->type = JOB_TYPE_DESTROY;
+		job->notif = NULL;
 		g_async_queue_push (wrapper->priv->to_worker_thread, job);
 #ifdef THREAD_WRAPPER_DEBUG
 		g_print ("... pushed JOB_TYPE_DESTROY for wrapper %p\n", wrapper);
@@ -408,6 +608,9 @@ gda_thread_wrapper_dispose (GObject *object)
 
 		gda_mutex_free (wrapper->priv->mutex);
 
+		if (wrapper->priv->pipes_hash)
+			g_hash_table_destroy (wrapper->priv->pipes_hash);
+
 		g_free (wrapper->priv);
 		wrapper->priv = NULL;
 	}
@@ -514,7 +717,93 @@ gda_thread_wrapper_new (void)
 }
 
 /**
- * gda_thread_wrapper_execute
+ * gda_thread_wrapper_get_io_channel:
+ * @wrapper: a #GdaThreadWrapper object
+ *
+ * Allow @wrapper to notify when an execution job is finished, by making its exec ID
+ * readable through a new #GIOChannel. This function is useful when the notification needs
+ * to be included into a main loop. This also notifies that signals (emitted by objects in
+ * @wrapper's internal thread) are available.
+ *
+ * The returned #GIOChannel will have something to read every time an execution job submitted
+ * from the calling thread has finished. The user should read #GdaThreadNotification
+ * structures from the channel and analyse its contents to call gda_thread_wrapper_iterate()
+ * or gda_thread_wrapper_fetch_result().
+ *
+ * Note1: the new communication channel will only be operational for jobs submitted after this
+ * function returns, and for signals which have been connected after this function returns. A safe
+ * practice is to call this function before the @wrapper object has been used.
+ *
+ * Note2: this function will return the same #GIOChannel every time it is called from the same thread.
+ *
+ * Note3: if the usage of the returned #GIOChannel reveals an error, then g_io_channel_shutdown() and
+ * g_io_channel_unref() should be called on the #GIOChannel to let @wrapper know it should not use
+ * that object anymore.
+ *
+ * Returns: (transfer none): the #GIOChannel associated with the calling thread, or %NULL if it could not be created
+ *
+ * Since: 4.2.9
+ */
+GIOChannel *
+gda_thread_wrapper_get_io_channel (GdaThreadWrapper *wrapper)
+{
+	Pipe *p;
+	GThread *th;
+	g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), NULL);
+	g_return_val_if_fail (wrapper->priv, NULL);
+
+	th = g_thread_self ();
+	gda_mutex_lock (wrapper->priv->mutex);
+	p = get_pipe (wrapper, th);
+	if (!p) {
+		p = pipe_new ();
+		if (p) {
+			if (! wrapper->priv->pipes_hash)
+				wrapper->priv->pipes_hash = g_hash_table_new_full (g_direct_hash,
+										   g_direct_equal,
+										   NULL,
+										   (GDestroyNotify) pipe_unref);
+			g_hash_table_insert (wrapper->priv->pipes_hash, th, p);
+		}
+	}
+	gda_mutex_unlock (wrapper->priv->mutex);
+	if (p)
+		return p->ioc;
+	else
+		return NULL;
+}
+
+/**
+ * gda_thread_wrapper_unset_io_channel:
+ * @wrapper: a #GdaThreadWrapper
+ *
+ * Does the opposite of gda_thread_wrapper_get_io_channel()
+ *
+ * Since: 4.2.9
+ */
+void
+gda_thread_wrapper_unset_io_channel (GdaThreadWrapper *wrapper)
+{
+	GThread *th;
+	ThreadData *td;
+
+	g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
+	g_return_if_fail (wrapper->priv);
+
+	gda_mutex_lock (wrapper->priv->mutex);
+	th = g_thread_self ();
+	td = g_hash_table_lookup (wrapper->priv->threads_hash, th);
+	if (td) {
+		Pipe *p;
+		p = get_pipe (wrapper, th);
+		if (p)
+			clean_notifications (wrapper, td);
+	}
+	gda_mutex_unlock (wrapper->priv->mutex);
+}
+
+/**
+ * gda_thread_wrapper_execute:
  * @wrapper: a #GdaThreadWrapper object
  * @func: the function to execute
  * @arg: argument to pass to @func
@@ -562,6 +851,7 @@ gda_thread_wrapper_execute (GdaThreadWrapper *wrapper, GdaThreadWrapperFunc func
 	job->arg = arg;
 	job->arg_destroy_func = arg_destroy_func;
 	job->reply_queue = g_async_queue_ref (td->from_worker_thread);
+	job->notif = pipe_ref (td->notif);
 
 	id = job->job_id;
 #ifdef THREAD_WRAPPER_DEBUG
@@ -582,6 +872,7 @@ gda_thread_wrapper_execute (GdaThreadWrapper *wrapper, GdaThreadWrapperFunc func
 		g_print ("... IMMEDIATELY done job %d => %p\n", job->job_id, job->u.exe.result);
 #endif
                 g_async_queue_push (job->reply_queue, job);
+		write_notification (wrapper, td, job->notif, GDA_THREAD_NOTIFICATION_JOB, job->job_id);
         }
         else
                 g_async_queue_push (wrapper->priv->to_worker_thread, job);
@@ -640,6 +931,7 @@ gda_thread_wrapper_execute_void (GdaThreadWrapper *wrapper, GdaThreadWrapperVoid
 	job->arg = arg;
 	job->arg_destroy_func = arg_destroy_func;
 	job->reply_queue = g_async_queue_ref (td->from_worker_thread);
+	job->notif = pipe_ref (td->notif);
 
 	id = job->job_id;
 #ifdef THREAD_WRAPPER_DEBUG
@@ -660,6 +952,7 @@ gda_thread_wrapper_execute_void (GdaThreadWrapper *wrapper, GdaThreadWrapperVoid
 		g_print ("... IMMEDIATELY done VOID job %d => %p\n", job->job_id, job->u.exe.result);
 #endif
                 g_async_queue_push (job->reply_queue, job);
+		write_notification (wrapper, td, job->notif, GDA_THREAD_NOTIFICATION_JOB, job->job_id);
         }
         else
 		g_async_queue_push (wrapper->priv->to_worker_thread, job);
@@ -732,9 +1025,15 @@ gda_thread_wrapper_cancel (GdaThreadWrapper *wrapper, guint id)
  * @may_block: whether the call may block
  *
  * This method gives @wrapper a chance to check if some functions to be executed have finished
- * <emphasis>for the calling thread</emphasis>. It handles one function's execution result, and
- * if @may_block is %TRUE, then it will block untill there is one (functions returning void are
- * ignored).
+ * <emphasis>for the calling thread</emphasis>. In this case it handles the execution result and
+ * makes it ready to be processed using gda_thread_wrapper_fetch_result().
+ *
+ * This method also allows @wrapper to handle signals which may have been emitted by objects
+ * while in the worker thread, and call the callback function specified when gda_thread_wrapper_connect_raw()
+ * was used.
+ *
+ * If @may_block is %TRUE, then it will block until there is one finished execution
+ * (functions returning void and signals are ignored regarding this argument).
  *
  * Since: 4.2
  */
@@ -794,6 +1093,10 @@ gda_thread_wrapper_iterate (GdaThreadWrapper *wrapper, gboolean may_block)
 			signal_spec_unref (spec);
 			do_again = TRUE;
 		}
+		else if (job->type == JOB_TYPE_NOTIFICATION_ERROR) {
+			job_free (job);
+			clean_notifications (wrapper, td);
+		}
 		else
 			g_assert_not_reached ();
 
@@ -944,7 +1247,8 @@ worker_thread_closure_marshal (GClosure *closure,
 	if (g_thread_self () !=  sigspec->worker_thread)
 		return;
 
-	/* check that the worker thread is working on a job for which job->reply_queue == sigspec->reply_queue */	if (sigspec->private &&
+	/* check that the worker thread is working on a job for which job->reply_queue == sigspec->reply_queue */
+	if (sigspec->private &&
 	    g_static_private_get (&worker_thread_current_queue) != sigspec->reply_queue)
 		return;
 
@@ -973,6 +1277,11 @@ worker_thread_closure_marshal (GClosure *closure,
 	}
 
 	g_async_queue_push (sigspec->reply_queue, job);
+	if (! write_notification (NULL, NULL, sigspec->notif, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
+		Job *je = g_new0 (Job, 1);
+		je->type = JOB_TYPE_NOTIFICATION_ERROR;
+		g_async_queue_push (sigspec->reply_queue, je);
+	}
 	signal_spec_unlock (sigspec);
 }
 
@@ -1001,6 +1310,7 @@ worker_thread_closure_marshal_anythread (GClosure *closure,
 	job->u.signal.spec = signal_spec_ref (sigspec);
 	job->u.signal.n_param_values = n_param_values - 1;
 	job->u.signal.param_values = g_new0 (GValue, job->u.signal.n_param_values);
+	job->notif = NULL;
 	for (i = 1; i < n_param_values; i++) {
 		const GValue *src;
 		GValue *dest;
@@ -1015,6 +1325,11 @@ worker_thread_closure_marshal_anythread (GClosure *closure,
 	}
 
 	g_async_queue_push (sigspec->reply_queue, job);
+	if (! write_notification (NULL, NULL, sigspec->notif, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
+		Job *je = g_new0 (Job, 1);
+		je->type = JOB_TYPE_NOTIFICATION_ERROR;
+		g_async_queue_push (sigspec->reply_queue, je);
+	}
 	signal_spec_unlock (sigspec);
 }
 
@@ -1047,7 +1362,7 @@ worker_thread_closure_marshal_anythread (GClosure *closure,
  * </itemizedlist>
  *
  * Also note that signal handling is done asynchronously: when emitted in the worker thread, it
- * will be "queued" to be processed in the user thread when it has the chance (when gda_thread_wrapper()
+ * will be "queued" to be processed in the user thread when it has the chance (when gda_thread_wrapper_iterate()
  * is called directly or indirectly). The side effect is that the callback function is usually
  * called long after the object emitting the signal has finished emitting it.
  *
@@ -1095,6 +1410,7 @@ gda_thread_wrapper_connect_raw (GdaThreadWrapper *wrapper,
 	}
 	sigspec->worker_thread = wrapper->priv->worker_thread;
 	sigspec->reply_queue = g_async_queue_ref (td->from_worker_thread);
+	sigspec->notif = pipe_ref (td->notif);
         sigspec->instance = instance;
         sigspec->callback = callback;
         sigspec->data = data;
diff --git a/libgda/thread-wrapper/gda-thread-wrapper.h b/libgda/thread-wrapper/gda-thread-wrapper.h
index 7b98312..ba5a5af 100644
--- a/libgda/thread-wrapper/gda-thread-wrapper.h
+++ b/libgda/thread-wrapper/gda-thread-wrapper.h
@@ -62,14 +62,152 @@ struct _GdaThreadWrapperClass {
 	void (*_gda_reserved4) (void);
 };
 
+/**
+ * GdaThreadNotificationType:
+ * @GDA_THREAD_NOTIFICATION_SIGNAL: the notification regards a signal
+ * @GDA_THREAD_NOTIFICATION_JOB: the notification regards a job finished
+ *
+ * Defines the kind of notification which can be obtained when reading from the #GIOChannel
+ * returned by gda_thread_wrapper_get_io_channel().
+ */
+typedef enum {
+	GDA_THREAD_NOTIFICATION_JOB    = 0x01,
+	GDA_THREAD_NOTIFICATION_SIGNAL = 0x02
+} GdaThreadNotificationType;
+
+/**
+ * GdaThreadNotification:
+ * @type: the notification type
+ * @job_id: the job ID, if @type is a #GDA_THREAD_NOTIFICATION_JOB
+ *
+ * A notification to be read through the #GIOChannel which is returned by gda_thread_wrapper_get_io_channel(),
+ * for example:
+ * <programlisting><![CDATA[
+ * gboolean
+ * wrapper_ioc_cb (GIOChannel *source, GIOCondition condition, gpointer data)
+ * {
+ *     GIOStatus status;
+ *     gsize nread;
+ *     GdaThreadNotification notif;
+ *     if (condition & G_IO_IN) {
+ *	   status = g_io_channel_read_chars (source, (gchar*) &notif, sizeof (notif), &nread, NULL);
+ *         if ((status != G_IO_STATUS_NORMAL) || (nread != sizeof (notif)))
+ *             goto onerror;
+ *	   switch (notif.type) {
+ *	   case GDA_THREAD_NOTIFICATION_JOB:
+ *             check_for_wrapper_result (bcnc);
+ *             break;
+ *         case GDA_THREAD_NOTIFICATION_SIGNAL:
+ *             gda_thread_wrapper_iterate (bcnc->priv->wrapper, FALSE);
+ *             break;
+ *         default:
+ *             goto onerror;
+ *             break;
+ *	   }
+ *   }
+ *   if (condition & (G_IO_ERR | G_IO_HUP | G_IO_NVAL))
+ *             goto onerror;
+ *   return TRUE; // keep callback
+ *
+ * onerror:
+ *   g_io_channel_shutdown (bcnc->priv->ioc, FALSE, NULL);
+ *   return FALSE; // removed callback
+ * }
+ *
+ * {
+ * [...]
+ *     GIOChannel *ioc;
+ *     ioc = gda_thread_wrapper_get_io_channel (wrapper);
+ *     if (!ioc)
+ *         [handle error]
+ *     else {
+ *         guint watch_id;
+ *         watch_id = g_io_add_watch (ioc, G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
+ *                                    (GIOFunc) wrapper_ioc_cb, NULL);
+ *     }
+ * }
+ * ]]></programlisting>
+ */
+typedef struct {
+	GdaThreadNotificationType type;
+	guint                     job_id;
+} GdaThreadNotification;
+
+/**
+ * GdaThreadWrapperFunc:
+ * @arg: pointer to the data (which is the @arg argument passed to gda_thread_wrapper_execute())
+ * @error: a place to store errors
+ * @Returns: a pointer to some data which will be returned by gda_thread_wrapper_fetch_result()
+ *
+ * Specifies the type of function to be passed to gda_thread_wrapper_execute().
+ */
 typedef gpointer (*GdaThreadWrapperFunc) (gpointer arg, GError **error);
 typedef void (*GdaThreadWrapperVoidFunc) (gpointer arg, GError **error);
 typedef void (*GdaThreadWrapperCallback) (GdaThreadWrapper *wrapper, gpointer instance, const gchar *signame,
 					  gint n_param_values, const GValue *param_values, gpointer gda_reserved,
 					  gpointer data);
 
+/**
+ * SECTION:gda-thread-wrapper
+ * @short_description: Execute functions in a sub thread
+ * @title: GdaThreadWrapper
+ * @stability: Stable
+ * @see_also:
+ *
+ * The purpose of the #GdaThreadWrapper object is to execute functions in an isolated sub thread. As the
+ * #GdaThreadWrapper is thread safe, one is able to isolate some code's execution in a <emphasis>private</emphasis>
+ * <emphasis>worker</emphasis> thread, and make a non thread safe code thread safe.
+ *
+ * The downside of this is that the actual execution of the code will be slower as it requires
+ * threads to be synchronized.
+ *
+ * The #GdaThreadWrapper implements its own locking mechanism and can safely be used from multiple
+ * threads at once without needing further locking.
+ *
+ * Each thread using a #GdaThreadWrapper object can use it as if it was the only user: the #GdaThreadWrapper will
+ * simply dispatch all the execution requests to its private <emphasis>worker</emphasis> thread and report the
+ * execution's status only to the thread which made the request.
+ *
+ * The user can also specify a callback function to be called when an object emits a signal while being
+ * used by the worker thread, see the gda_thread_wrapper_connect_raw() method.
+ *
+ * The following diagram illustrates the conceptual working of the #GdaThreadWrapper object: here two user threads
+ * are represented (assigned a red and green colors), both using a single #GdaThreadWrapper, so in this diagram, 3 threads
+ * are present. The communication between the threads are handled by some #GAsyncQueue objects (in a transparent way for
+ * the user, presented here only for illustration purposes). The queue represented in yellow is where jobs are
+ * pushed by each user thread (step 1), and popped by the worker thread (step 2). Once the worker thread has finished
+ * with a job, it stores the result along with the job and pushes it to the queue dedicated to the user thread
+ * (step 3) in this example the red queue (because the job was issued from the thread represented in red). The last
+ * step is when the user fetches the result (in its user thread), step 4.
+ *
+ * If, when the worker thread is busy with a job, a signal is emitted, and if the user has set up a signal handler
+ * using gda_thread_wrapper_connect_raw(),
+ * then a "job as signal" is created by the worker thread and pushed to the user thread as illustrated
+ * at the bottom of the diagram.
+ * <mediaobject>
+ *   <imageobject role="html">
+ *     <imagedata fileref="thread-wrapper.png" format="PNG" contentwidth="170mm"/>
+ *   </imageobject>
+ *   <textobject>
+ *     <phrase>GdaThreadWrapper's conceptual working</phrase>
+ *   </textobject>
+ * </mediaobject>
+ *
+ * It's the user's responsibility to regularly check if a submitted job has completed
+ * (using gda_thread_wrapper_fetch_result()) or if a signal
+ * has been emitted by an object while in the worker thread and needs to be handled by the #GdaThreadWrapper
+ * to call the associated callback (using gda_thread_wrapper_iterate() or automatically done when
+ * calling gda_thread_wrapper_fetch_result()).
+ *
+ * However, when a main loop is available, the #GdaThreadWrapper can emit notifications through a
+ * #GIOChannel which can be integrated in the main loop. The #GIOChannel can be created
+ * using gda_thread_wrapper_get_io_channel().
+ */
+
 GType                  gda_thread_wrapper_get_type          (void) G_GNUC_CONST;
 GdaThreadWrapper      *gda_thread_wrapper_new               (void);
+GIOChannel            *gda_thread_wrapper_get_io_channel    (GdaThreadWrapper *wrapper);
+void                   gda_thread_wrapper_unset_io_channel  (GdaThreadWrapper *wrapper);
 
 guint                  gda_thread_wrapper_execute           (GdaThreadWrapper *wrapper, GdaThreadWrapperFunc func,
 							     gpointer arg, GDestroyNotify arg_destroy_func, GError **error);
diff --git a/tools/browser/browser-connection-priv.h b/tools/browser/browser-connection-priv.h
index ce1e38f..84e1d8f 100644
--- a/tools/browser/browser-connection-priv.h
+++ b/tools/browser/browser-connection-priv.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2009 - 2010 Vivien Malerba
+ * Copyright (C) 2009 - 2011 Vivien Malerba
  *
  * This Library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Library General Public License as
@@ -24,6 +24,8 @@
 
 struct _BrowserConnectionPrivate {
 	GdaThreadWrapper *wrapper;
+	GIOChannel       *ioc;
+	guint             ioc_watch_id;
 	GSList           *wrapper_jobs;
 	guint             wrapper_results_timer;
 	gboolean          long_timer;
diff --git a/tools/browser/browser-connection.c b/tools/browser/browser-connection.c
index 4a3fd6f..df4c999 100644
--- a/tools/browser/browser-connection.c
+++ b/tools/browser/browser-connection.c
@@ -92,28 +92,45 @@ wrapper_job_free (WrapperJob *wj)
 }
 
 /*
+ * Returns: %TRUE if current timer should be removed
+ */
+static gboolean
+setup_results_timer (BrowserConnection *bcnc)
+{
+	gboolean short_timer = TRUE;
+
+	if (bcnc->priv->ioc_watch_id != 0)
+		return FALSE; /* nothing to do, we use notifications */
+
+	bcnc->priv->nb_no_job_waits ++;
+	if (bcnc->priv->nb_no_job_waits > 100)
+		short_timer = FALSE;
+
+	if ((bcnc->priv->wrapper_results_timer > 0) &&
+	    (bcnc->priv->long_timer != short_timer))
+		return FALSE; /* nothing to do, timer already correctlyset up */
+
+	/* switch to a short/long timer to check for results */
+	if (bcnc->priv->long_timer == short_timer)
+		g_source_remove (bcnc->priv->wrapper_results_timer);
+
+	bcnc->priv->long_timer = !short_timer;
+	bcnc->priv->wrapper_results_timer = g_timeout_add (short_timer ? CHECK_RESULTS_SHORT_TIMER : CHECK_RESULTS_LONG_TIMER,
+							   (GSourceFunc) check_for_wrapper_result,
+							   bcnc);
+	bcnc->priv->nb_no_job_waits = 0;
+	return TRUE;
+}
+
+/*
  * Pushes a job which has been asked to be exected in a sub thread using gda_thread_wrapper_execute()
  */
 static void
 push_wrapper_job (BrowserConnection *bcnc, guint job_id, JobType job_type, const gchar *reason,
 		  BrowserConnectionJobCallback callback, gpointer cb_data)
 {
-	/* setup timer */
-	if (bcnc->priv->wrapper_results_timer == 0) {
-		bcnc->priv->long_timer = FALSE;
-		bcnc->priv->wrapper_results_timer = g_timeout_add (CHECK_RESULTS_SHORT_TIMER,
-								   (GSourceFunc) check_for_wrapper_result,
-								   bcnc);
-	}
-	else if (bcnc->priv->long_timer) {
-		/* switch to a short timer to check for results */
-		g_source_remove (bcnc->priv->wrapper_results_timer);
-		bcnc->priv->wrapper_results_timer = g_timeout_add (CHECK_RESULTS_SHORT_TIMER,
-								   (GSourceFunc) check_for_wrapper_result,
-								   bcnc);
-		bcnc->priv->long_timer = FALSE;
-		bcnc->priv->nb_no_job_waits = 0;
-	}
+	/* handle timers if necessary */
+	setup_results_timer (bcnc);
 
 	/* add WrapperJob structure */
 	WrapperJob *wj;
@@ -257,12 +274,73 @@ browser_connection_class_init (BrowserConnectionClass *klass)
 	object_class->dispose = browser_connection_dispose;
 }
 
+static gboolean
+wrapper_ioc_cb (GIOChannel *source, GIOCondition condition, BrowserConnection *bcnc)
+{
+	GIOStatus status;
+	gsize nread;
+	GdaThreadNotification notif;
+
+	g_assert (source == bcnc->priv->ioc);
+//#define DEBUG_POLLING_SWITCH
+#ifdef DEBUG_POLLING_SWITCH
+	static guint c = 0;
+	c++;
+	if (c == 4)
+		goto onerror;
+#endif
+	if (condition & G_IO_IN) {
+		status = g_io_channel_read_chars (bcnc->priv->ioc, (gchar*) &notif, sizeof (notif),
+						  &nread, NULL);
+		if ((status != G_IO_STATUS_NORMAL) || (nread != sizeof (notif)))
+			goto onerror;
+
+		switch (notif.type) {
+		case GDA_THREAD_NOTIFICATION_JOB:
+			check_for_wrapper_result (bcnc);
+			break;
+		case GDA_THREAD_NOTIFICATION_SIGNAL:
+			gda_thread_wrapper_iterate (bcnc->priv->wrapper, FALSE);
+			break;
+		default:
+			/* an error occurred somewhere */
+			goto onerror;
+		}
+	}
+	if (condition & (G_IO_ERR | G_IO_HUP | G_IO_NVAL))
+		goto onerror;
+
+	return TRUE; /* keep callback */
+
+ onerror:
+#ifdef GDA_DEBUG
+	g_print ("Switching to polling instead of notifications...\n");
+#endif
+	g_source_remove (bcnc->priv->ioc_watch_id);
+	bcnc->priv->ioc_watch_id = 0;
+	g_io_channel_shutdown (bcnc->priv->ioc, FALSE, NULL);
+	g_io_channel_unref (bcnc->priv->ioc);
+	bcnc->priv->ioc = NULL;
+
+	setup_results_timer (bcnc);
+	return FALSE; /* remove callback */
+}
+
 static void
 browser_connection_init (BrowserConnection *bcnc)
 {
 	static guint index = 1;
 	bcnc->priv = g_new0 (BrowserConnectionPrivate, 1);
 	bcnc->priv->wrapper = gda_thread_wrapper_new ();
+	bcnc->priv->ioc = gda_thread_wrapper_get_io_channel (bcnc->priv->wrapper);
+	if (bcnc->priv->ioc) {
+		g_io_channel_ref (bcnc->priv->ioc);
+		bcnc->priv->ioc_watch_id = g_io_add_watch (bcnc->priv->ioc,
+							   G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
+							   (GIOFunc) wrapper_ioc_cb, bcnc);
+	}
+	else
+		bcnc->priv->ioc_watch_id = 0;
 	bcnc->priv->wrapper_jobs = NULL;
 	bcnc->priv->wrapper_results_timer = 0;
 	bcnc->priv->long_timer = FALSE;
@@ -615,6 +693,16 @@ browser_connection_dispose (GObject *object)
 		}
 		browser_connection_set_busy_state (bcnc, FALSE, NULL);
 
+		if (bcnc->priv->ioc_watch_id > 0) {
+			g_source_remove (bcnc->priv->ioc_watch_id);
+			bcnc->priv->ioc_watch_id = 0;
+		}
+
+		if (bcnc->priv->ioc) {
+			g_io_channel_unref (bcnc->priv->ioc);
+			bcnc->priv->ioc = NULL;
+		}
+
 		g_free (bcnc->priv);
 		bcnc->priv = NULL;
 		/*g_print ("Disposed BrowserConnection %p\n", bcnc);*/
@@ -632,31 +720,10 @@ check_for_wrapper_result (BrowserConnection *bcnc)
 	WrapperJob *wj;
 	gboolean retval = TRUE; /* return FALSE to interrupt current timer */
 
+	retval = !setup_results_timer (bcnc);
 	if (!bcnc->priv->wrapper_jobs) {
 		gda_thread_wrapper_iterate (bcnc->priv->wrapper, FALSE);
-		if (! bcnc->priv->long_timer) {
-			if (bcnc->priv->nb_no_job_waits > 100) {
-				/* switch to a long timer to check for results */
-				bcnc->priv->wrapper_results_timer = g_timeout_add_seconds (CHECK_RESULTS_LONG_TIMER,
-											   (GSourceFunc) check_for_wrapper_result,
-											   bcnc);
-				bcnc->priv->nb_no_job_waits = 0;
-				bcnc->priv->long_timer = TRUE;
-				return FALSE;
-			}
-			else
-				bcnc->priv->nb_no_job_waits ++;
-		}
-		return TRUE;
-	}
-	else if (bcnc->priv->long_timer) {
-		/* switch to a short timer to check for results */
-		bcnc->priv->wrapper_results_timer = g_timeout_add (CHECK_RESULTS_SHORT_TIMER,
-								   (GSourceFunc) check_for_wrapper_result,
-								   bcnc);
-		retval = FALSE;
-		bcnc->priv->long_timer = FALSE;
-		bcnc->priv->nb_no_job_waits = 0;
+		return retval;
 	}
 
 	wj = (WrapperJob*) bcnc->priv->wrapper_jobs->data;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]