[evolution-data-server] Use less threads in CamelDB



commit 793e9cda8a969f38939ef0ba69bef8bea4df28ad
Author: Milan Crha <mcrha redhat com>
Date:   Wed Dec 1 12:03:24 2010 +0100

    Use less threads in CamelDB

 camel/camel-db.c |  167 +++++++++++++++++++++++++----------------------------
 1 files changed, 79 insertions(+), 88 deletions(-)
---
diff --git a/camel/camel-db.c b/camel/camel-db.c
index 2a001cd..84b6239 100644
--- a/camel/camel-db.c
+++ b/camel/camel-db.c
@@ -32,6 +32,7 @@
 #include <stdio.h>
 #include <stdlib.h>
 #include <string.h>
+#include "libedataserver/e-flag.h"
 
 #include <glib/gi18n-lib.h>
 
@@ -42,12 +43,12 @@
 #define SYNC_TIMEOUT_SECONDS 5
 
 static sqlite3_vfs *old_vfs = NULL;
+static GThreadPool *sync_pool = NULL;
 
 typedef struct {
 	sqlite3_file parent;
 	sqlite3_file *old_vfs_file; /* pointer to old_vfs' file */
-	GAsyncQueue *queue;
-	GThread *thread;
+	GStaticRecMutex sync_mutex;
 	guint timeout_id;
 	gint flags;
 } CamelSqlite3File;
@@ -62,74 +63,84 @@ call_old_file_Sync (CamelSqlite3File *cFile, gint flags)
 	return cFile->old_vfs_file->pMethods->xSync (cFile->old_vfs_file, flags);
 }
 
-/* This special flag tells the sync request thread to exit.
- * Just have to make sure it does not collide with SQLite's
- * own synchronization flags (SQLITE_SYNC_xxx). */
-#define SYNC_THREAD_EXIT 0x100000
+struct SyncRequestData
+{
+	CamelSqlite3File *cFile;
+	guint32 flags;
+	EFlag *sync_op; /* not NULL when waiting for a finish; will be freed by the caller */
+};
 
-static gpointer
-sync_request_thread_cb (CamelSqlite3File *cFile)
+static void
+sync_request_thread_cb (gpointer task_data, gpointer null_data)
 {
-	gpointer data;
-	gint flags = 0;
+	struct SyncRequestData *sync_data = task_data;
+	EFlag *sync_op;
 
-	g_async_queue_ref (cFile->queue);
+	g_return_if_fail (sync_data != NULL);
+	g_return_if_fail (sync_data->cFile != NULL);
 
-	while (TRUE) {
-		/* Block until a request arrives. */
-		data = g_async_queue_pop (cFile->queue);
+	call_old_file_Sync (sync_data->cFile, sync_data->flags);
 
-		/* Make sure we can safely deference. */
-		if (data == NULL)
-			continue;
+	sync_op = sync_data->sync_op;
+	g_free (sync_data);
 
-		/* Extract flags and discard request. */
-		flags = *((gint *) data);
-		g_slice_free (gint, data);
+	if (sync_op)
+		e_flag_set (sync_op);
+}
 
-		/* Check for exit request. */
-		if (flags & SYNC_THREAD_EXIT)
-			break;
+static void
+sync_push_request (CamelSqlite3File *cFile, gboolean wait_for_finish)
+{
+	struct SyncRequestData *data;
+	EFlag *sync_op = NULL;
+	GError *error = NULL;
 
-		/* Got a boneafide sync request.
-		 * Do it, but ignore errors. */
-		call_old_file_Sync (cFile, flags);
-	}
+	g_return_if_fail (cFile != NULL);
+	g_return_if_fail (sync_pool != NULL);
 
-	/* Clear the exit flag. */
-	flags &= ~SYNC_THREAD_EXIT;
+	g_static_rec_mutex_lock (&cFile->sync_mutex);
 
-	/* One more for the road? */
-	if (flags != 0 && getenv ("CAMEL_NO_SYNC_ON_CLOSE") == NULL)
-		call_old_file_Sync (cFile, flags);
+	if (wait_for_finish)
+		sync_op = e_flag_new ();
 
-	g_async_queue_unref (cFile->queue);
+	data = g_new0 (struct SyncRequestData, 1);
+	data->cFile = cFile;
+	data->flags = cFile->flags;
+	data->sync_op = sync_op;
 
-	return NULL;
-}
+	cFile->flags = 0;
 
-static gboolean
-sync_push_request (CamelSqlite3File *cFile)
-{
-	gint *data;
+	g_static_rec_mutex_unlock (&cFile->sync_mutex);
 
-	/* The queue itself does not need to be locked yet,
-	 * but we use its mutex to safely manipulate flags. */
-	g_async_queue_lock (cFile->queue);
+	g_thread_pool_push (sync_pool, data, &error);
 
-	/* We can't just cast the flags to a pointer because
-	 * g_async_queue_push() won't take NULLs, and flags
-	 * may be zero.  So we have to allocate memory to
-	 * send an integer.  Bother. */
-	data = g_slice_new (gint);
-	*data = cFile->flags;
-	cFile->flags = 0;
+	if (error) {
+		g_warning ("%s: Failed to push to thread pool: %s\n", G_STRFUNC, error->message);
+		g_error_free (error);
 
-	g_async_queue_push_unlocked (cFile->queue, data);
+		if (sync_op)
+			e_flag_free (sync_op);
 
-	cFile->timeout_id = 0;
+		return;
+	}
 
-	g_async_queue_unlock (cFile->queue);
+	if (sync_op) {
+		e_flag_wait (sync_op);
+		e_flag_free (sync_op);
+	}
+}
+
+static gboolean
+sync_push_request_timeout (CamelSqlite3File *cFile)
+{
+	g_static_rec_mutex_lock (&cFile->sync_mutex);
+
+	if (cFile->timeout_id != 0) {
+		sync_push_request (cFile, FALSE);
+		cFile->timeout_id = 0;
+	}
+
+	g_static_rec_mutex_unlock (&cFile->sync_mutex);
 
 	return FALSE;
 }
@@ -189,32 +200,18 @@ camel_sqlite3_file_xClose (sqlite3_file *pFile)
 
 	cFile = (CamelSqlite3File *) pFile;
 
-	/* The queue itself does not need to be locked yet,
-	 * but we use its mutex to safely manipulate flags. */
-	g_async_queue_lock (cFile->queue);
-
-	/* Tell the sync request thread to exit.  It may do
-	 * one last sync before exiting, so preserve any sync
-	 * flags that have accumulated. */
-	cFile->flags |= SYNC_THREAD_EXIT;
+	g_static_rec_mutex_lock (&cFile->sync_mutex);
 
 	/* Cancel any pending sync requests. */
-	if (cFile->timeout_id > 0)
+	if (cFile->timeout_id > 0) {
 		g_source_remove (cFile->timeout_id);
+		cFile->timeout_id = 0;
+	}
 
-	/* Unlock the queue before pushing the exit request. */
-	g_async_queue_unlock (cFile->queue);
-
-	/* Push the exit request. */
-	sync_push_request (cFile);
-
-	/* Wait for the thread to exit. */
-	g_thread_join (cFile->thread);
-	cFile->thread = NULL;
+	g_static_rec_mutex_unlock (&cFile->sync_mutex);
 
-	/* Now we can safely destroy the queue. */
-	g_async_queue_unref (cFile->queue);
-	cFile->queue = NULL;
+	/* Make the last sync. */
+	sync_push_request (cFile, TRUE);
 
 	if (cFile->old_vfs_file->pMethods)
 		res = cFile->old_vfs_file->pMethods->xClose (cFile->old_vfs_file);
@@ -224,6 +221,8 @@ camel_sqlite3_file_xClose (sqlite3_file *pFile)
 	g_free (cFile->old_vfs_file);
 	cFile->old_vfs_file = NULL;
 
+	g_static_rec_mutex_free (&cFile->sync_mutex);
+
 	return res;
 }
 
@@ -237,9 +236,7 @@ camel_sqlite3_file_xSync (sqlite3_file *pFile, gint flags)
 
 	cFile = (CamelSqlite3File *) pFile;
 
-	/* The queue itself does not need to be locked yet,
-	 * but we use its mutex to safely manipulate flags. */
-	g_async_queue_lock (cFile->queue);
+	g_static_rec_mutex_lock (&cFile->sync_mutex);
 
 	/* If a sync request is already scheduled, accumulate flags. */
 	cFile->flags |= flags;
@@ -251,9 +248,9 @@ camel_sqlite3_file_xSync (sqlite3_file *pFile, gint flags)
 	/* Wait SYNC_TIMEOUT_SECONDS before we actually sync. */
 	cFile->timeout_id = g_timeout_add_seconds (
 		SYNC_TIMEOUT_SECONDS, (GSourceFunc)
-		sync_push_request, cFile);
+		sync_push_request_timeout, cFile);
 
-	g_async_queue_unlock (cFile->queue);
+	g_static_rec_mutex_unlock (&cFile->sync_mutex);
 
 	return SQLITE_OK;
 }
@@ -264,7 +261,6 @@ camel_sqlite3_vfs_xOpen (sqlite3_vfs *pVfs, const gchar *zPath, sqlite3_file *pF
 	static GStaticRecMutex only_once_lock = G_STATIC_REC_MUTEX_INIT;
 	static sqlite3_io_methods io_methods = {0};
 	CamelSqlite3File *cFile;
-	GError *error = NULL;
 	gint res;
 
 	g_return_val_if_fail (old_vfs != NULL, -1);
@@ -279,18 +275,13 @@ camel_sqlite3_vfs_xOpen (sqlite3_vfs *pVfs, const gchar *zPath, sqlite3_file *pF
 		return res;
 	}
 
-	cFile->queue = g_async_queue_new ();
-
-	/* Spawn a joinable thread to listen for sync requests. */
-	cFile->thread = g_thread_create (
-		(GThreadFunc) sync_request_thread_cb, cFile, TRUE, &error);
-	if (error != NULL) {
-		g_warning ("%s", error->message);
-		g_error_free (error);
-	}
+	g_static_rec_mutex_init (&cFile->sync_mutex);
 
 	g_static_rec_mutex_lock (&only_once_lock);
 
+	if (!sync_pool)
+		sync_pool = g_thread_pool_new (sync_request_thread_cb, NULL, 2, FALSE, NULL);
+
 	/* cFile->old_vfs_file->pMethods is NULL when open failed for some reason,
 	   thus do not initialize our structure when do not know the version */
 	if (io_methods.xClose == NULL && cFile->old_vfs_file->pMethods) {



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]