[libgda] Added notification for GdaThreadWrapper using a GIOChannel
- From: Vivien Malerba <vivien src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libgda] Added notification for GdaThreadWrapper using a GIOChannel
- Date: Mon, 6 Jun 2011 18:46:25 +0000 (UTC)
commit dca910affc47faeb4c9e4e0b381b931ff0a21ced
Author: Vivien Malerba <malerba gnome-db org>
Date: Mon Jun 6 20:39:56 2011 +0200
Added notification for GdaThreadWrapper using a GIOChannel
to easily integrate into a main loop
doc/C/libgda-sections.txt | 4 +
libgda/libgda.symbols | 2 +
libgda/thread-wrapper/gda-thread-wrapper.c | 336 +++++++++++++++++++++++++++-
libgda/thread-wrapper/gda-thread-wrapper.h | 85 +++++++-
tools/browser/browser-connection-priv.h | 4 +-
tools/browser/browser-connection.c | 145 +++++++++----
6 files changed, 525 insertions(+), 51 deletions(-)
---
diff --git a/doc/C/libgda-sections.txt b/doc/C/libgda-sections.txt
index 8be9a5b..179f523 100644
--- a/doc/C/libgda-sections.txt
+++ b/doc/C/libgda-sections.txt
@@ -1815,6 +1815,10 @@ gda_sql_builder_get_type
<INCLUDE>libgda/thread-wrapper/gda-thread-wrapper.h</INCLUDE>
GdaThreadWrapper
gda_thread_wrapper_new
+GdaThreadNotification
+GdaThreadNotificationType
+gda_thread_wrapper_get_io_channel
+gda_thread_wrapper_unset_io_channel
GdaThreadWrapperFunc
gda_thread_wrapper_execute
GdaThreadWrapperVoidFunc
diff --git a/libgda/libgda.symbols b/libgda/libgda.symbols
index 9eeead1..97ef362 100644
--- a/libgda/libgda.symbols
+++ b/libgda/libgda.symbols
@@ -807,11 +807,13 @@
gda_thread_wrapper_execute
gda_thread_wrapper_execute_void
gda_thread_wrapper_fetch_result
+ gda_thread_wrapper_get_io_channel
gda_thread_wrapper_get_type
gda_thread_wrapper_get_waiting_size
gda_thread_wrapper_iterate
gda_thread_wrapper_new
gda_thread_wrapper_steal_signal
+ gda_thread_wrapper_unset_io_channel
gda_time_copy
gda_time_free
gda_time_get_type
diff --git a/libgda/thread-wrapper/gda-thread-wrapper.c b/libgda/thread-wrapper/gda-thread-wrapper.c
index eb4ba48..f319187 100644
--- a/libgda/thread-wrapper/gda-thread-wrapper.c
+++ b/libgda/thread-wrapper/gda-thread-wrapper.c
@@ -20,6 +20,8 @@
* Boston, MA 02111-1307, USA.
*/
+//#define DEBUG_NOTIFICATION
+
#include <string.h>
#include <glib/gi18n-lib.h>
#include "gda-thread-wrapper.h"
@@ -27,6 +29,8 @@
#include <gobject/gvaluecollector.h>
#include <libgda/gda-debug-macros.h>
#include <libgda/gda-value.h>
+#include <unistd.h>
+#include <sys/stat.h>
/* this GPrivate holds a pointer to the GAsyncQueue used by the job being currently treated
* by the worker thread. It is used to avoid creating signal data for threads for which
@@ -37,6 +41,7 @@ GStaticPrivate worker_thread_current_queue = G_STATIC_PRIVATE_INIT;
typedef struct _ThreadData ThreadData;
typedef struct _Job Job;
typedef struct _SignalSpec SignalSpec;
+typedef struct _Pipe Pipe;
struct _GdaThreadWrapperPrivate {
GdaMutex *mutex;
@@ -45,8 +50,106 @@ struct _GdaThreadWrapperPrivate {
GAsyncQueue *to_worker_thread;
GHashTable *threads_hash; /* key = a GThread, value = a #ThreadData pointer */
+ GHashTable *pipes_hash; /* key = a GThread, value = a #Pipe pointer */
+};
+
+/*
+ * Threads synchronization with notifications
+ */
+struct _Pipe {
+ GThread *thread;
+ int fds[2]; /* [0] for reading and [1] for writing */
+ GIOChannel *ioc;
+
+ GMutex *mutex; /* locks @ref_count */
+ guint ref_count;
};
+#define pipe_lock(x) g_mutex_lock(((Pipe*)x)->mutex);
+#define pipe_unlock(x) g_mutex_unlock(((Pipe*)x)->mutex);
+
+static Pipe *
+pipe_ref (Pipe *p)
+{
+ if (p) {
+ pipe_lock (p);
+ p->ref_count++;
+#ifdef DEBUG_NOTIFICATION
+ g_print ("Pipe %p ++: %u\n", p, p->ref_count);
+#endif
+ pipe_unlock (p);
+ }
+ return p;
+}
+
+static void
+pipe_unref (Pipe *p)
+{
+ if (p) {
+ pipe_lock (p);
+ p->ref_count--;
+#ifdef DEBUG_NOTIFICATION
+ g_print ("Pipe %p --: %u\n", p, p->ref_count);
+#endif
+ pipe_unlock (p);
+ if (p->ref_count == 0) {
+ /* destroy @p */
+ if (p->ioc)
+ g_io_channel_unref (p->ioc);
+ if (p->fds[0] >= 0)
+ close (p->fds[0]);
+ if (p->fds[1] >= 0)
+ close (p->fds[1]);
+ g_mutex_free (p->mutex);
+#ifdef DEBUG_NOTIFICATION
+ g_print ("Destroyed Pipe %p\n", p);
+#endif
+ g_free (p);
+ }
+ }
+}
+
+/*
+ * May return %NULL
+ */
+static Pipe *
+pipe_new (void)
+{
+ Pipe *p = g_new0 (Pipe, 1);
+ p->mutex = g_mutex_new ();
+ p->ref_count = 1;
+ p->thread = g_thread_self ();
+ if (pipe (p->fds) != 0) {
+ pipe_unref (p);
+ return NULL;
+ }
+#ifdef G_OS_WIN32
+ p->ioc = g_io_channel_win32_new_fd (p->fds [0]);
+#else
+ p->ioc = g_io_channel_unix_new (p->fds [0]);
+#endif
+
+ /* we want raw data */
+ if (g_io_channel_set_encoding (p->ioc, NULL, NULL) != G_IO_STATUS_NORMAL) {
+ g_warning ("Can't set IO encoding to NULL\n");
+ pipe_unref (p);
+ return NULL;
+ }
+#ifdef DEBUG_NOTIFICATION
+ g_print ("Created Pipe %p\n", p);
+#endif
+ return p;
+}
+
+static Pipe *
+get_pipe (GdaThreadWrapper *wrapper, GThread *thread)
+{
+ if (wrapper->priv->pipes_hash)
+ return g_hash_table_lookup (wrapper->priv->pipes_hash, thread);
+ else
+ return NULL;
+}
+
/*
* One instance for each job to execute (and its result) and
* one instance for each emitted signal
@@ -57,9 +160,10 @@ struct _GdaThreadWrapperPrivate {
* Passed to the sub job through obj->to_worker_thread
*/
typedef enum {
- JOB_TYPE_EXECUTE,
- JOB_TYPE_DESTROY,
- JOB_TYPE_SIGNAL
+ JOB_TYPE_EXECUTE, /* a "real" job for the GdaThreadWrapper */
+ JOB_TYPE_DESTROY, /* internal to signal the internal thread to shutdown */
+ JOB_TYPE_SIGNAL, /* a signal from an object in the internal thread */
+ JOB_TYPE_NOTIFICATION_ERROR /* internal to signal notification error and shutdown */
} JobType;
struct _Job {
JobType type;
@@ -71,6 +175,7 @@ struct _Job {
gpointer arg;
GDestroyNotify arg_destroy_func;
GAsyncQueue *reply_queue; /* holds a ref to it */
+ Pipe *notif; /* if not %NULL, notification when job finished */
/* result part */
union {
@@ -89,6 +194,7 @@ struct _Job {
static void
job_free (Job *job)
{
+ pipe_unref (job->notif);
if (job->arg && job->arg_destroy_func)
job->arg_destroy_func (job->arg);
if (job->reply_queue)
@@ -110,6 +216,9 @@ job_free (Job *job)
else if (job->type == JOB_TYPE_DESTROY) {
/* nothing to do here */
}
+ else if (job->type == JOB_TYPE_NOTIFICATION_ERROR) {
+ /* nothing to do here */
+ }
else
g_assert_not_reached ();
g_free (job);
@@ -126,6 +235,7 @@ struct _SignalSpec {
gboolean private;
GThread *worker_thread;
GAsyncQueue *reply_queue; /* a ref is held here */
+ Pipe *notif; /* if not %NULL, notification */
gpointer instance;
gulong signal_id;
@@ -154,6 +264,7 @@ signal_spec_unref (SignalSpec *sigspec)
g_signal_handler_disconnect (sigspec->instance, sigspec->signal_id);
if (sigspec->reply_queue)
g_async_queue_unref (sigspec->reply_queue);
+ pipe_unref (sigspec->notif);
g_free (sigspec);
}
else
@@ -182,6 +293,8 @@ struct _ThreadData {
GSList *jobs; /* list of Job pointers not yet handled, or being handled (ie not yet poped from @from_worker_thread) */
GSList *results; /* list of Job pointers to completed jobs (ie. poped from @from_worker_thread) */
+
+ Pipe *notif; /* if not %NULL, notification when any job has finished */
};
#define THREAD_DATA(x) ((ThreadData*)(x))
@@ -193,12 +306,15 @@ get_thread_data (GdaThreadWrapper *wrapper, GThread *thread)
gda_mutex_lock (wrapper->priv->mutex);
td = g_hash_table_lookup (wrapper->priv->threads_hash, thread);
if (!td) {
+ Pipe *p;
+ p = get_pipe (wrapper, thread);
+
td = g_new0 (ThreadData, 1);
td->owner = thread;
td->from_worker_thread = g_async_queue_new_full ((GDestroyNotify) job_free);
td->jobs = NULL;
td->results = NULL;
-
+ td->notif = pipe_ref (p);
g_hash_table_insert (wrapper->priv->threads_hash, thread, td);
}
gda_mutex_unlock (wrapper->priv->mutex);
@@ -208,6 +324,7 @@ get_thread_data (GdaThreadWrapper *wrapper, GThread *thread)
static void
thread_data_free (ThreadData *td)
{
+ pipe_unref (td->notif);
g_async_queue_unref (td->from_worker_thread);
td->from_worker_thread = NULL;
g_assert (!td->jobs);
@@ -274,6 +391,79 @@ gda_thread_wrapper_class_init (GdaThreadWrapperClass *klass)
object_class->dispose = gda_thread_wrapper_dispose;
}
+static void
+clean_notifications (GdaThreadWrapper *wrapper, ThreadData *td)
+{
+#ifdef DEBUG_NOTIFICATION
+ g_print ("%s(Pipe:%p)\n", __FUNCTION__, td->notif);
+#endif
+ GSList *list;
+ for (list = td->signals_list; list; list = list->next) {
+ SignalSpec *sigspec;
+ sigspec = (SignalSpec*) list->data;
+ signal_spec_lock (sigspec);
+ if (sigspec->notif == td->notif) {
+ pipe_unref (sigspec->notif);
+ sigspec->notif = NULL;
+ }
+ signal_spec_unlock (sigspec);
+ }
+
+ pipe_unref (td->notif);
+ td->notif = NULL;
+
+ g_hash_table_remove (wrapper->priv->pipes_hash, td->owner);
+}
+
+/*
+ * @wrapper: may be %NULL
+ * @td: may be %NULL
+ *
+ * Either @wrapper and @td are both NULL, or they are both NOT NULL
+ */
+static gboolean
+write_notification (GdaThreadWrapper *wrapper, ThreadData *td,
+ Pipe *p, GdaThreadNotificationType type, guint job_id)
+{
+ g_assert ((wrapper && td) || (!wrapper && !td));
+
+ if (!p || (p->fds[1] < 0))
+ return TRUE;
+#ifdef DEBUG_NOTIFICATION_FORCE
+ static guint c = 0;
+ c++;
+ if (c == 4)
+ goto onerror;
+#endif
+
+ GdaThreadNotification notif;
+ ssize_t nw;
+ notif.type = type;
+ notif.job_id = job_id;
+
+ nw = write (p->fds[1], &notif, sizeof (notif));
+ if (nw != sizeof (notif)) {
+ /* Error */
+ goto onerror;
+ }
+#ifdef DEBUG_NOTIFICATION
+ g_print ("Wrote notification %d.%u to pipe %p\n", type, job_id, p);
+#endif
+ return TRUE;
+
+ onerror:
+#ifdef DEBUG_NOTIFICATION
+ g_print ("%s(): returned FALSE\n", __FUNCTION__);
+ g_print ("Closed FD %d\n", p->fds [1]);
+#endif
+ /* close the writing end of the pipe */
+ close (p->fds [1]);
+ p->fds [1] = -1;
+ if (td)
+ clean_notifications (wrapper, td);
+ return FALSE;
+}
+
/*
* Executed in the sub thread:
* takes a Job in (from the wrapper->priv->to_worker_thread queue) and creates a new Result which
@@ -320,6 +510,12 @@ worker_thread_entry_point (GAsyncQueue *to_worker_thread)
job->void_func (job->arg, &(job->u.exe.error));
}
g_async_queue_push (job->reply_queue, job);
+ if (! write_notification (NULL, NULL, job->notif, GDA_THREAD_NOTIFICATION_JOB,
+ job->job_id)) {
+ Job *je = g_new0 (Job, 1);
+ je->type = JOB_TYPE_NOTIFICATION_ERROR;
+ g_async_queue_push (job->reply_queue, je);
+ }
}
else
g_assert_not_reached ();
@@ -345,6 +541,9 @@ gda_thread_wrapper_init (GdaThreadWrapper *wrapper, G_GNUC_UNUSED GdaThreadWrapp
wrapper->priv->worker_thread = g_thread_create ((GThreadFunc) worker_thread_entry_point,
g_async_queue_ref (wrapper->priv->to_worker_thread), /* inc. ref for sub thread usage */
FALSE, NULL);
+
+ wrapper->priv->pipes_hash = NULL;
+
#ifdef THREAD_WRAPPER_DEBUG
g_print ("... new wrapper %p, worker_thread=%p\n", wrapper, wrapper->priv->worker_thread);
#endif
@@ -391,6 +590,7 @@ gda_thread_wrapper_dispose (GObject *object)
if (wrapper->priv) {
Job *job = g_new0 (Job, 1);
job->type = JOB_TYPE_DESTROY;
+ job->notif = NULL;
g_async_queue_push (wrapper->priv->to_worker_thread, job);
#ifdef THREAD_WRAPPER_DEBUG
g_print ("... pushed JOB_TYPE_DESTROY for wrapper %p\n", wrapper);
@@ -408,6 +608,9 @@ gda_thread_wrapper_dispose (GObject *object)
gda_mutex_free (wrapper->priv->mutex);
+ if (wrapper->priv->pipes_hash)
+ g_hash_table_destroy (wrapper->priv->pipes_hash);
+
g_free (wrapper->priv);
wrapper->priv = NULL;
}
@@ -514,7 +717,93 @@ gda_thread_wrapper_new (void)
}
/**
- * gda_thread_wrapper_execute
+ * gda_thread_wrapper_get_io_channel:
+ * @wrapper: a #GdaThreadWrapper object
+ *
+ * Allow @wrapper to notify when an execution job is finished, by making its exec ID
+ * readable through a new #GIOChannel. This function is useful when the notification needs
+ * to be included into a main loop. This also notifies that signals (emitted by objects in
+ * @wrapper's internal thread) are available.
+ *
+ * The returned #GIOChannel will have something to read every time an execution job is finished
+ * for an execution job submitted from the calling thread. The user should read #GdaThreadNotification
+ * structures from the channel and analyse its contents to call gda_thread_wrapper_iterate()
+ * or gda_thread_wrapper_fetch_result().
+ *
+ * Note1: the new communication channel will only be operational for jobs submitted after this
+ * function returns, and for signals which have been connected after this function returns. A safe
+ * practice is to call this function before the @wrapper object has been used.
+ *
+ * Note2: this function will return the same #GIOChannel every time it's called from the same thread.
+ *
+ * Note3: if the usage of the returned #GIOChannel reveals an error, then g_io_channel_shutdown() and
+ * g_io_channel_unref() should be called on the #GIOChannel to let @wrapper know it should not use
+ * that object anymore.
+ *
+ * Returns: (transfer none): a new #GIOChannel, or %NULL if it could not be created
+ *
+ * Since: 4.2.9
+ */
+GIOChannel *
+gda_thread_wrapper_get_io_channel (GdaThreadWrapper *wrapper)
+{
+ Pipe *p;
+ GThread *th;
+ g_return_val_if_fail (GDA_IS_THREAD_WRAPPER (wrapper), NULL);
+ g_return_val_if_fail (wrapper->priv, NULL);
+
+ th = g_thread_self ();
+ gda_mutex_lock (wrapper->priv->mutex);
+ p = get_pipe (wrapper, th);
+ if (!p) {
+ p = pipe_new ();
+ if (p) {
+ if (! wrapper->priv->pipes_hash)
+ wrapper->priv->pipes_hash = g_hash_table_new_full (g_direct_hash,
+ g_direct_equal,
+ NULL,
+ (GDestroyNotify) pipe_unref);
+ g_hash_table_insert (wrapper->priv->pipes_hash, th, p);
+ }
+ }
+ gda_mutex_unlock (wrapper->priv->mutex);
+ if (p)
+ return p->ioc;
+ else
+ return NULL;
+}
+
+/**
+ * gda_thread_wrapper_unset_io_channel:
+ * @wrapper: a #GdaThreadWrapper
+ *
+ * Does the opposite of gda_thread_wrapper_get_io_channel()
+ *
+ * Since: 4.2.9
+ */
+void
+gda_thread_wrapper_unset_io_channel (GdaThreadWrapper *wrapper)
+{
+ GThread *th;
+ ThreadData *td;
+
+ g_return_if_fail (GDA_IS_THREAD_WRAPPER (wrapper));
+ g_return_if_fail (wrapper->priv);
+
+ gda_mutex_lock (wrapper->priv->mutex);
+ th = g_thread_self ();
+ td = g_hash_table_lookup (wrapper->priv->threads_hash, th);
+ if (td) {
+ Pipe *p;
+ p = get_pipe (wrapper, th);
+ if (p)
+ clean_notifications (wrapper, td);
+ }
+ gda_mutex_unlock (wrapper->priv->mutex);
+}
+
+/**
+ * gda_thread_wrapper_execute:
* @wrapper: a #GdaThreadWrapper object
* @func: the function to execute
* @arg: argument to pass to @func
@@ -562,6 +851,7 @@ gda_thread_wrapper_execute (GdaThreadWrapper *wrapper, GdaThreadWrapperFunc func
job->arg = arg;
job->arg_destroy_func = arg_destroy_func;
job->reply_queue = g_async_queue_ref (td->from_worker_thread);
+ job->notif = pipe_ref (td->notif);
id = job->job_id;
#ifdef THREAD_WRAPPER_DEBUG
@@ -582,6 +872,7 @@ gda_thread_wrapper_execute (GdaThreadWrapper *wrapper, GdaThreadWrapperFunc func
g_print ("... IMMEDIATELY done job %d => %p\n", job->job_id, job->u.exe.result);
#endif
g_async_queue_push (job->reply_queue, job);
+ write_notification (wrapper, td, job->notif, GDA_THREAD_NOTIFICATION_JOB, job->job_id);
}
else
g_async_queue_push (wrapper->priv->to_worker_thread, job);
@@ -640,6 +931,7 @@ gda_thread_wrapper_execute_void (GdaThreadWrapper *wrapper, GdaThreadWrapperVoid
job->arg = arg;
job->arg_destroy_func = arg_destroy_func;
job->reply_queue = g_async_queue_ref (td->from_worker_thread);
+ job->notif = pipe_ref (td->notif);
id = job->job_id;
#ifdef THREAD_WRAPPER_DEBUG
@@ -660,6 +952,7 @@ gda_thread_wrapper_execute_void (GdaThreadWrapper *wrapper, GdaThreadWrapperVoid
g_print ("... IMMEDIATELY done VOID job %d => %p\n", job->job_id, job->u.exe.result);
#endif
g_async_queue_push (job->reply_queue, job);
+ write_notification (wrapper, td, job->notif, GDA_THREAD_NOTIFICATION_JOB, job->job_id);
}
else
g_async_queue_push (wrapper->priv->to_worker_thread, job);
@@ -732,9 +1025,15 @@ gda_thread_wrapper_cancel (GdaThreadWrapper *wrapper, guint id)
* @may_block: whether the call may block
*
* This method gives @wrapper a chance to check if some functions to be executed have finished
- * <emphasis>for the calling thread</emphasis>. It handles one function's execution result, and
- * if @may_block is %TRUE, then it will block untill there is one (functions returning void are
- * ignored).
+ * <emphasis>for the calling thread</emphasis>. In this case it handles the execution result and
+ * makes it ready to be processed using gda_thread_wrapper_fetch_result().
+ *
+ * This method also allows @wrapper to handle signals which may have been emitted by objects
+ * while in the worker thread, and call the callback function specified when gda_thread_wrapper_connect_raw()
+ * was used.
+ *
+ * If @may_block is %TRUE, then it will block until there is one finished execution
+ * (functions returning void and signals are ignored regarding this argument).
*
* Since: 4.2
*/
@@ -794,6 +1093,10 @@ gda_thread_wrapper_iterate (GdaThreadWrapper *wrapper, gboolean may_block)
signal_spec_unref (spec);
do_again = TRUE;
}
+ else if (job->type == JOB_TYPE_NOTIFICATION_ERROR) {
+ job_free (job);
+ clean_notifications (wrapper, td);
+ }
else
g_assert_not_reached ();
@@ -944,7 +1247,8 @@ worker_thread_closure_marshal (GClosure *closure,
if (g_thread_self () != sigspec->worker_thread)
return;
- /* check that the worker thread is working on a job for which job->reply_queue == sigspec->reply_queue */ if (sigspec->private &&
+ /* check that the worker thread is working on a job for which job->reply_queue == sigspec->reply_queue */
+ if (sigspec->private &&
g_static_private_get (&worker_thread_current_queue) != sigspec->reply_queue)
return;
@@ -973,6 +1277,11 @@ worker_thread_closure_marshal (GClosure *closure,
}
g_async_queue_push (sigspec->reply_queue, job);
+ if (! write_notification (NULL, NULL, sigspec->notif, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
+ Job *je = g_new0 (Job, 1);
+ je->type = JOB_TYPE_NOTIFICATION_ERROR;
+ g_async_queue_push (sigspec->reply_queue, je);
+ }
signal_spec_unlock (sigspec);
}
@@ -1001,6 +1310,7 @@ worker_thread_closure_marshal_anythread (GClosure *closure,
job->u.signal.spec = signal_spec_ref (sigspec);
job->u.signal.n_param_values = n_param_values - 1;
job->u.signal.param_values = g_new0 (GValue, job->u.signal.n_param_values);
+ job->notif = NULL;
for (i = 1; i < n_param_values; i++) {
const GValue *src;
GValue *dest;
@@ -1015,6 +1325,11 @@ worker_thread_closure_marshal_anythread (GClosure *closure,
}
g_async_queue_push (sigspec->reply_queue, job);
+ if (! write_notification (NULL, NULL, sigspec->notif, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
+ Job *je = g_new0 (Job, 1);
+ je->type = JOB_TYPE_NOTIFICATION_ERROR;
+ g_async_queue_push (sigspec->reply_queue, je);
+ }
signal_spec_unlock (sigspec);
}
@@ -1047,7 +1362,7 @@ worker_thread_closure_marshal_anythread (GClosure *closure,
* </itemizedlist>
*
* Also note that signal handling is done asynchronously: when emitted in the worker thread, it
- * will be "queued" to be processed in the user thread when it has the chance (when gda_thread_wrapper()
+ * will be "queued" to be processed in the user thread when it has the chance (when gda_thread_wrapper_iterate()
* is called directly or indirectly). The side effect is that the callback function is usually
* called long after the object emitting the signal has finished emitting it.
*
@@ -1095,6 +1410,7 @@ gda_thread_wrapper_connect_raw (GdaThreadWrapper *wrapper,
}
sigspec->worker_thread = wrapper->priv->worker_thread;
sigspec->reply_queue = g_async_queue_ref (td->from_worker_thread);
+ sigspec->notif = pipe_ref (td->notif);
sigspec->instance = instance;
sigspec->callback = callback;
sigspec->data = data;
diff --git a/libgda/thread-wrapper/gda-thread-wrapper.h b/libgda/thread-wrapper/gda-thread-wrapper.h
index 1c02026..a9660e5 100644
--- a/libgda/thread-wrapper/gda-thread-wrapper.h
+++ b/libgda/thread-wrapper/gda-thread-wrapper.h
@@ -63,6 +63,77 @@ struct _GdaThreadWrapperClass {
};
/**
+ * GdaThreadNotificationType:
+ * @GDA_THREAD_NOTIFICATION_SIGNAL: the notification regards a signal
+ * @GDA_THREAD_NOTIFICATION_JOB: the notification regards a job finished
+ *
+ * Defines the kind of notification which can be obtained when reading from the #GIOChannel
+ * returned by gda_thread_wrapper_get_io_channel().
+ */
+typedef enum {
+ GDA_THREAD_NOTIFICATION_JOB = 0x01,
+ GDA_THREAD_NOTIFICATION_SIGNAL = 0x02
+} GdaThreadNotificationType;
+
+/**
+ * GdaThreadNotification:
+ * @type: the notification type
+ * @job_id: the job ID, if @type is a #GDA_THREAD_NOTIFICATION_JOB
+ *
+ * A notification to be read through the #GIOChannel which is returned by gda_thread_wrapper_get_io_channel(),
+ * for example:
+ * <programlisting><![CDATA[
+ * gboolean
+ * wrapper_ioc_cb (GIOChannel *source, GIOCondition condition, gpointer data)
+ * {
+ * GIOStatus status;
+ * gsize nread;
+ * GdaThreadNotification notif;
+ * if (condition & G_IO_IN) {
+ * status = g_io_channel_read_chars (source, (gchar*) &notif, sizeof (notif), &nread, NULL);
+ * if ((status != G_IO_STATUS_NORMAL) || (nread != sizeof (notif)))
+ * goto onerror;
+ * switch (notif.type) {
+ * case GDA_THREAD_NOTIFICATION_JOB:
+ * check_for_wrapper_result (bcnc);
+ * break;
+ * case GDA_THREAD_NOTIFICATION_SIGNAL:
+ * gda_thread_wrapper_iterate (bcnc->priv->wrapper, FALSE);
+ * break;
+ * default:
+ * goto onerror;
+ * break;
+ * }
+ * }
+ * if (condition & (G_IO_ERR | G_IO_HUP | G_IO_NVAL))
+ * goto onerror;
+ * return TRUE; // keep callback
+ *
+ * onerror:
+ * g_io_channel_shutdown (bcnc->priv->ioc, FALSE, NULL);
+ * return FALSE; // removed callback
+ * }
+ *
+ * {
+ * [...]
+ * GIOChannel *ioc;
+ * ioc = gda_thread_wrapper_get_io_channel (wrapper);
+ * if (!ioc)
+ * [handle error]
+ * else {
+ * guint watch_id;
+ * watch_id = g_io_add_watch (ioc, G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
+ * (GIOFunc) wrapper_ioc_cb, NULL);
+ * }
+ * }
+ * ]]></programlisting>
+ */
+typedef struct {
+ GdaThreadNotificationType type;
+ guint job_id;
+} GdaThreadNotification;
+
+/**
* GdaThreadWrapperFunc:
* @arg: pointer to the data (which is the @arg argument passed to gda_thread_wrapper_execute_void())
* @error: a place to store errors
@@ -125,7 +196,7 @@ typedef void (*GdaThreadWrapperCallback) (GdaThreadWrapper *wrapper, gpointer in
* are represented (assigned a red and green colors), both using a single #GdaThreadWrapper, so in this diagram, 3 threads
* are present. The communication between the threads are handled by some #GAsyncQueue objects (in a transparent way for
* the user, presented here only for illustration purposes). The queue represented in yellow is where jobs are
- * pushed by each user thread (step 1), and popped by the worker thread (step 2). Once the user thread has finished
+ * pushed by each user thread (step 1), and popped by the worker thread (step 2). Once the worker thread has finished
* with a job, it stores the result along with the job and pushes it to the queue dedicated to the user thread
* (step 3) in this example the red queue (because the job was issued from the thread represented in red). The last
* step is when the user fetches the result (in its user thread), step 4.
@@ -142,10 +213,22 @@ typedef void (*GdaThreadWrapperCallback) (GdaThreadWrapper *wrapper, gpointer in
* <phrase>GdaThreadWrapper's conceptual working</phrase>
* </textobject>
* </mediaobject>
+ *
+ * It's the user's responsibility to regularly check if a submitted job has completed
+ * (using gda_thread_wrapper_fetch_result()) or if a signal
+ * has been emitted by an object while in the worker thread and needs to be handled by the #GdaThreadWrapper
+ * to call the associated callback (using gda_thread_wrapper_iterate() or automatically done when
+ * calling gda_thread_wrapper_fetch_result()).
+ *
+ * However, when a main loop is available, the #GdaThreadWrapper can emit notifications through a
+ * #GIOChannel which can be integrated in the main loop. The #GIOChannel can be created
+ * using gda_thread_wrapper_get_io_channel().
*/
GType gda_thread_wrapper_get_type (void) G_GNUC_CONST;
GdaThreadWrapper *gda_thread_wrapper_new (void);
+GIOChannel *gda_thread_wrapper_get_io_channel (GdaThreadWrapper *wrapper);
+void gda_thread_wrapper_unset_io_channel (GdaThreadWrapper *wrapper);
guint gda_thread_wrapper_execute (GdaThreadWrapper *wrapper, GdaThreadWrapperFunc func,
gpointer arg, GDestroyNotify arg_destroy_func, GError **error);
diff --git a/tools/browser/browser-connection-priv.h b/tools/browser/browser-connection-priv.h
index 6380f5b..b1d1c62 100644
--- a/tools/browser/browser-connection-priv.h
+++ b/tools/browser/browser-connection-priv.h
@@ -1,5 +1,5 @@
/*
- * Copyright (C) 2009 - 2010 Vivien Malerba
+ * Copyright (C) 2009 - 2011 Vivien Malerba
*
* This Program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License as
@@ -24,6 +24,8 @@
struct _BrowserConnectionPrivate {
GdaThreadWrapper *wrapper;
+ GIOChannel *ioc;
+ guint ioc_watch_id;
GSList *wrapper_jobs;
guint wrapper_results_timer;
gboolean long_timer;
diff --git a/tools/browser/browser-connection.c b/tools/browser/browser-connection.c
index d693138..a93d353 100644
--- a/tools/browser/browser-connection.c
+++ b/tools/browser/browser-connection.c
@@ -92,28 +92,45 @@ wrapper_job_free (WrapperJob *wj)
}
/*
+ * Returns: %TRUE if current timer should be removed
+ */
+static gboolean
+setup_results_timer (BrowserConnection *bcnc)
+{
+ gboolean short_timer = TRUE;
+
+ if (bcnc->priv->ioc_watch_id != 0)
+ return FALSE; /* nothing to do, we use notifications */
+
+ bcnc->priv->nb_no_job_waits ++;
+ if (bcnc->priv->nb_no_job_waits > 100)
+ short_timer = FALSE;
+
+ if ((bcnc->priv->wrapper_results_timer > 0) &&
+ (bcnc->priv->long_timer != short_timer))
+ return FALSE; /* nothing to do, timer already correctly set up */
+
+ /* switch to a short/long timer to check for results */
+ if (bcnc->priv->long_timer == short_timer)
+ g_source_remove (bcnc->priv->wrapper_results_timer);
+
+ bcnc->priv->long_timer = !short_timer;
+ bcnc->priv->wrapper_results_timer = g_timeout_add (short_timer ? CHECK_RESULTS_SHORT_TIMER : CHECK_RESULTS_LONG_TIMER,
+ (GSourceFunc) check_for_wrapper_result,
+ bcnc);
+ bcnc->priv->nb_no_job_waits = 0;
+ return TRUE;
+}
+
+/*
* Pushes a job which has been asked to be exected in a sub thread using gda_thread_wrapper_execute()
*/
static void
push_wrapper_job (BrowserConnection *bcnc, guint job_id, JobType job_type, const gchar *reason,
BrowserConnectionJobCallback callback, gpointer cb_data)
{
- /* setup timer */
- if (bcnc->priv->wrapper_results_timer == 0) {
- bcnc->priv->long_timer = FALSE;
- bcnc->priv->wrapper_results_timer = g_timeout_add (CHECK_RESULTS_SHORT_TIMER,
- (GSourceFunc) check_for_wrapper_result,
- bcnc);
- }
- else if (bcnc->priv->long_timer) {
- /* switch to a short timer to check for results */
- g_source_remove (bcnc->priv->wrapper_results_timer);
- bcnc->priv->wrapper_results_timer = g_timeout_add (CHECK_RESULTS_SHORT_TIMER,
- (GSourceFunc) check_for_wrapper_result,
- bcnc);
- bcnc->priv->long_timer = FALSE;
- bcnc->priv->nb_no_job_waits = 0;
- }
+ /* handle timers if necessary */
+ setup_results_timer (bcnc);
/* add WrapperJob structure */
WrapperJob *wj;
@@ -257,12 +274,73 @@ browser_connection_class_init (BrowserConnectionClass *klass)
object_class->dispose = browser_connection_dispose;
}
+static gboolean
+wrapper_ioc_cb (GIOChannel *source, GIOCondition condition, BrowserConnection *bcnc)
+{
+ GIOStatus status;
+ gsize nread;
+ GdaThreadNotification notif;
+
+ g_assert (source == bcnc->priv->ioc);
+//#define DEBUG_POLLING_SWITCH
+#ifdef DEBUG_POLLING_SWITCH
+ static guint c = 0;
+ c++;
+ if (c == 4)
+ goto onerror;
+#endif
+ if (condition & G_IO_IN) {
+ status = g_io_channel_read_chars (bcnc->priv->ioc, (gchar*) &notif, sizeof (notif),
+ &nread, NULL);
+ if ((status != G_IO_STATUS_NORMAL) || (nread != sizeof (notif)))
+ goto onerror;
+
+ switch (notif.type) {
+ case GDA_THREAD_NOTIFICATION_JOB:
+ check_for_wrapper_result (bcnc);
+ break;
+ case GDA_THREAD_NOTIFICATION_SIGNAL:
+ gda_thread_wrapper_iterate (bcnc->priv->wrapper, FALSE);
+ break;
+ default:
+ /* an error occurred somewhere */
+ goto onerror;
+ }
+ }
+ if (condition & (G_IO_ERR | G_IO_HUP | G_IO_NVAL))
+ goto onerror;
+
+ return TRUE; /* keep callback */
+
+ onerror:
+#ifdef GDA_DEBUG
+ g_print ("Switching to polling instead of notifications...\n");
+#endif
+ g_source_remove (bcnc->priv->ioc_watch_id);
+ bcnc->priv->ioc_watch_id = 0;
+ g_io_channel_shutdown (bcnc->priv->ioc, FALSE, NULL);
+ g_io_channel_unref (bcnc->priv->ioc);
+ bcnc->priv->ioc = NULL;
+
+ setup_results_timer (bcnc);
+ return FALSE; /* remove callback */
+}
+
static void
browser_connection_init (BrowserConnection *bcnc)
{
static guint index = 1;
bcnc->priv = g_new0 (BrowserConnectionPrivate, 1);
bcnc->priv->wrapper = gda_thread_wrapper_new ();
+ bcnc->priv->ioc = gda_thread_wrapper_get_io_channel (bcnc->priv->wrapper);
+ if (bcnc->priv->ioc) {
+ g_io_channel_ref (bcnc->priv->ioc);
+ bcnc->priv->ioc_watch_id = g_io_add_watch (bcnc->priv->ioc,
+ G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
+ (GIOFunc) wrapper_ioc_cb, bcnc);
+ }
+ else
+ bcnc->priv->ioc_watch_id = 0;
bcnc->priv->wrapper_jobs = NULL;
bcnc->priv->wrapper_results_timer = 0;
bcnc->priv->long_timer = FALSE;
@@ -615,6 +693,16 @@ browser_connection_dispose (GObject *object)
}
browser_connection_set_busy_state (bcnc, FALSE, NULL);
+ if (bcnc->priv->ioc_watch_id > 0) {
+ g_source_remove (bcnc->priv->ioc_watch_id);
+ bcnc->priv->ioc_watch_id = 0;
+ }
+
+ if (bcnc->priv->ioc) {
+ g_io_channel_unref (bcnc->priv->ioc);
+ bcnc->priv->ioc = NULL;
+ }
+
g_free (bcnc->priv);
bcnc->priv = NULL;
/*g_print ("Disposed BrowserConnection %p\n", bcnc);*/
@@ -632,31 +720,10 @@ check_for_wrapper_result (BrowserConnection *bcnc)
WrapperJob *wj;
gboolean retval = TRUE; /* return FALSE to interrupt current timer */
+ retval = !setup_results_timer (bcnc);
if (!bcnc->priv->wrapper_jobs) {
gda_thread_wrapper_iterate (bcnc->priv->wrapper, FALSE);
- if (! bcnc->priv->long_timer) {
- if (bcnc->priv->nb_no_job_waits > 100) {
- /* switch to a long timer to check for results */
- bcnc->priv->wrapper_results_timer = g_timeout_add_seconds (CHECK_RESULTS_LONG_TIMER,
- (GSourceFunc) check_for_wrapper_result,
- bcnc);
- bcnc->priv->nb_no_job_waits = 0;
- bcnc->priv->long_timer = TRUE;
- return FALSE;
- }
- else
- bcnc->priv->nb_no_job_waits ++;
- }
- return TRUE;
- }
- else if (bcnc->priv->long_timer) {
- /* switch to a short timer to check for results */
- bcnc->priv->wrapper_results_timer = g_timeout_add (CHECK_RESULTS_SHORT_TIMER,
- (GSourceFunc) check_for_wrapper_result,
- bcnc);
- retval = FALSE;
- bcnc->priv->long_timer = FALSE;
- bcnc->priv->nb_no_job_waits = 0;
+ return retval;
}
wj = (WrapperJob*) bcnc->priv->wrapper_jobs->data;
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]