[libgda] GdaThreadWrapper: fixed crasher



commit 5da4221d77bd99114054270a0f968fc407d88b3e
Author: Vivien Malerba <malerba gnome-db org>
Date:   Tue Oct 25 20:42:28 2011 +0200

    GdaThreadWrapper: fixed crasher

 libgda/thread-wrapper/gda-thread-wrapper.c |   47 ++++++++++++++++++++-------
 1 files changed, 35 insertions(+), 12 deletions(-)
---
diff --git a/libgda/thread-wrapper/gda-thread-wrapper.c b/libgda/thread-wrapper/gda-thread-wrapper.c
index 97740f8..9b5b170 100644
--- a/libgda/thread-wrapper/gda-thread-wrapper.c
+++ b/libgda/thread-wrapper/gda-thread-wrapper.c
@@ -98,9 +98,10 @@ pipe_unref (Pipe *p)
 #ifdef DEBUG_NOTIFICATION
 		g_print ("Pipe %p --: %u\n", p, p->ref_count);
 #endif
-		pipe_unlock (p);
 		if (p->ref_count == 0) {
 			/* destroy @p */
+			GMutex *m = p->mutex;
+
 			if (p->ioc)
 				g_io_channel_unref (p->ioc);
 #ifdef G_OS_WIN32
@@ -114,12 +115,18 @@ pipe_unref (Pipe *p)
 			if (p->fds[1] >= 0)
 				close (p->fds[1]);
 #endif
-			g_mutex_free (p->mutex);
+
 #ifdef DEBUG_NOTIFICATION
 			g_print ("Destroyed Pipe %p\n", p);
 #endif
 			g_free (p);
+
+			g_mutex_unlock (m);
+			g_mutex_free (m);
+
 		}
+		else
+			pipe_unlock (p);
 	}
 }
 
@@ -167,10 +174,12 @@ pipe_new (void)
 static Pipe *
 get_pipe (GdaThreadWrapper *wrapper, GThread *thread)
 {
+	Pipe *p = NULL;
+	gda_mutex_lock (wrapper->priv->mutex);
 	if (wrapper->priv->pipes_hash)
-		return g_hash_table_lookup (wrapper->priv->pipes_hash, thread);
-	else
-		return NULL;
+		p = g_hash_table_lookup (wrapper->priv->pipes_hash, thread);
+	gda_mutex_unlock (wrapper->priv->mutex);
+	return p;
 }
 
 /* 
@@ -443,6 +452,8 @@ clean_notifications (GdaThreadWrapper *wrapper, ThreadData *td)
  * @td: (allow-none): may be %NULL
  *
  * Either @wrapper and @td are both NULL, or they are both NOT NULL
+ *
+ * The caller must hold a reference on @p (taken with pipe_ref()); this function releases that reference with pipe_unref() before returning
  */
 static gboolean
 write_notification (GdaThreadWrapper *wrapper, ThreadData *td,
@@ -450,8 +461,10 @@ write_notification (GdaThreadWrapper *wrapper, ThreadData *td,
 {
 	g_assert ((wrapper && td) || (!wrapper && !td));
 
-	if (!p || (p->fds[1] < 0))
+	if (!p || (p->fds[1] < 0)) {
+		pipe_unref (p);
 		return TRUE;
+	}
 #ifdef DEBUG_NOTIFICATION_FORCE
 	static guint c = 0;
 	c++;
@@ -475,6 +488,7 @@ write_notification (GdaThreadWrapper *wrapper, ThreadData *td,
 #ifdef DEBUG_NOTIFICATION
 	g_print ("Wrote notification %d.%u to pipe %p\n", type, job_id, p);
 #endif
+	pipe_unref (p);
 	return TRUE;
 
  onerror:
@@ -489,6 +503,7 @@ write_notification (GdaThreadWrapper *wrapper, ThreadData *td,
 	close (p->fds [1]);
 #endif
 	p->fds [1] = -1;
+	pipe_unref (p);
 	if (td)
 		clean_notifications (wrapper, td);
 	return FALSE;
@@ -539,9 +554,11 @@ worker_thread_entry_point (GAsyncQueue *to_worker_thread)
 				job->u.exe.result = NULL;
 				job->void_func (job->arg, &(job->u.exe.error));
 			}
+
+			guint jid = job->job_id;
+			Pipe *jpipe = pipe_ref (job->notif);
 			g_async_queue_push (job->reply_queue, job);
-			if (! write_notification (NULL, NULL, job->notif, GDA_THREAD_NOTIFICATION_JOB,
-						 job->job_id)) {
+			if (! write_notification (NULL, NULL, jpipe, GDA_THREAD_NOTIFICATION_JOB, jid)) {
 				Job *je = g_new0 (Job, 1);
 				je->type = JOB_TYPE_NOTIFICATION_ERROR;
 				g_async_queue_push (job->reply_queue, je);
@@ -901,8 +918,10 @@ gda_thread_wrapper_execute (GdaThreadWrapper *wrapper, GdaThreadWrapperFunc func
 #ifdef THREAD_WRAPPER_DEBUG
 		g_print ("... IMMEDIATELY done job %d => %p\n", job->job_id, job->u.exe.result);
 #endif
+		guint jid = job->job_id;
+		Pipe *jpipe = pipe_ref (job->notif);
                 g_async_queue_push (job->reply_queue, job);
-		write_notification (wrapper, td, job->notif, GDA_THREAD_NOTIFICATION_JOB, job->job_id);
+		write_notification (wrapper, td, jpipe, GDA_THREAD_NOTIFICATION_JOB, jid);
         }
         else
                 g_async_queue_push (wrapper->priv->to_worker_thread, job);
@@ -981,8 +1000,10 @@ gda_thread_wrapper_execute_void (GdaThreadWrapper *wrapper, GdaThreadWrapperVoid
 #ifdef THREAD_WRAPPER_DEBUG
 		g_print ("... IMMEDIATELY done VOID job %d => %p\n", job->job_id, job->u.exe.result);
 #endif
+		guint jid = job->job_id;
+		Pipe *jpipe = pipe_ref (job->notif);
                 g_async_queue_push (job->reply_queue, job);
-		write_notification (wrapper, td, job->notif, GDA_THREAD_NOTIFICATION_JOB, job->job_id);
+		write_notification (wrapper, td, jpipe, GDA_THREAD_NOTIFICATION_JOB, jid);
         }
         else
 		g_async_queue_push (wrapper->priv->to_worker_thread, job);
@@ -1304,8 +1325,9 @@ worker_thread_closure_marshal (GClosure *closure,
 		g_value_copy (src, dest);
 	}
 
+	Pipe *jpipe = pipe_ref (sigspec->notif);
 	g_async_queue_push (sigspec->reply_queue, job);
-	if (! write_notification (NULL, NULL, sigspec->notif, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
+	if (! write_notification (NULL, NULL, jpipe, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
 		Job *je = g_new0 (Job, 1);
 		je->type = JOB_TYPE_NOTIFICATION_ERROR;
 		g_async_queue_push (sigspec->reply_queue, je);
@@ -1350,8 +1372,9 @@ worker_thread_closure_marshal_anythread (GClosure *closure,
 		g_value_copy (src, dest);
 	}
 
+	Pipe *jpipe = pipe_ref (sigspec->notif);
 	g_async_queue_push (sigspec->reply_queue, job);
-	if (! write_notification (NULL, NULL, sigspec->notif, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
+	if (! write_notification (NULL, NULL, jpipe, GDA_THREAD_NOTIFICATION_SIGNAL, 0)) {
 		Job *je = g_new0 (Job, 1);
 		je->type = JOB_TYPE_NOTIFICATION_ERROR;
 		g_async_queue_push (sigspec->reply_queue, je);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]