evolution-data-server r10162 - trunk/camel



Author: mbarnes
Date: Thu Mar 12 20:38:41 2009
New Revision: 10162
URL: http://svn.gnome.org/viewvc/evolution-data-server?rev=10162&view=rev

Log:
2009-03-12  Matthew Barnes  <mbarnes redhat com>

	* camel/camel-db.c:
	Rework the SQLite fsync() workaround to use a simpler thread
	design and fix a thread leak that caused Evolution to effectively
	stop working after several hours.



Modified:
   trunk/camel/ChangeLog
   trunk/camel/camel-db.c

Modified: trunk/camel/camel-db.c
==============================================================================
--- trunk/camel/camel-db.c	(original)
+++ trunk/camel/camel-db.c	Thu Mar 12 20:38:41 2009
@@ -36,239 +36,112 @@
 
 #include "camel-debug.h"
 
-/* how long to wait before invoking sync on the file; in miliseconds */
-#define SYNC_TIMEOUT 5000
+/* how long to wait before invoking sync on the file */
+#define SYNC_TIMEOUT_SECONDS 5
 
 static sqlite3_vfs *old_vfs = NULL;
 
-GStaticRecMutex only_once_lock = G_STATIC_REC_MUTEX_INIT;
-GStaticRecMutex sync_queue_lock = G_STATIC_REC_MUTEX_INIT;
-#define LockQueue()   g_static_rec_mutex_lock   (&sync_queue_lock)
-#define UnlockQueue() g_static_rec_mutex_unlock (&sync_queue_lock)
-
-/* 'sync_queue' is using keys sqlite3_file to sync_queue_data structures.
-   Access to this is guarded with LockQueue/UnlockQueue function. */
-static GHashTable *sync_queue = NULL;
-
-typedef struct _sync_queue_data {
-	guint timeout_source; /* id of the source */
-	GThread *running_thread;
-
-	int sync_flags; 
-} sync_queue_data;
-
-struct CamelSqlite3File
-{
+typedef struct {
 	sqlite3_file parent;
 	sqlite3_file *old_vfs_file; /* pointer to old_vfs' file */
-};
+	GAsyncQueue *queue;
+	GThread *thread;
+	guint timeout_id;
+	gint flags;
+} CamelSqlite3File;
 
 static int
-call_old_file_Sync (sqlite3_file *pFile, int flags)
+call_old_file_Sync (CamelSqlite3File *cFile, int flags)
 {
-	struct CamelSqlite3File *cFile;
-
 	g_return_val_if_fail (old_vfs != NULL, SQLITE_ERROR);
-	g_return_val_if_fail (pFile != NULL, SQLITE_ERROR);
+	g_return_val_if_fail (cFile != NULL, SQLITE_ERROR);
 
-	cFile = (struct CamelSqlite3File *)pFile;
 	g_return_val_if_fail (cFile->old_vfs_file->pMethods != NULL, SQLITE_ERROR);
 	return cFile->old_vfs_file->pMethods->xSync (cFile->old_vfs_file, flags);
 }
 
-static gboolean prepare_to_run_sync_in_thread (gpointer pFile);
+/* This special flag tells the sync request thread to exit.
+ * Just have to make sure it does not collide with SQLite's
+ * own synchronization flags (SQLITE_SYNC_xxx). */
+#define SYNC_THREAD_EXIT 0x100000
 
 static gpointer
-run_sync_in_thread (gpointer pFile)
+sync_request_thread_cb (CamelSqlite3File *cFile)
 {
-	int sync_flags = 0;
-	sync_queue_data *data;
+	gpointer data;
+	gint flags = 0;
 
-	g_return_val_if_fail (pFile != NULL, NULL);
-	g_return_val_if_fail (sync_queue != NULL, NULL);
+	g_async_queue_ref (cFile->queue);
 
-	LockQueue ();
-	data = g_hash_table_lookup (sync_queue, pFile);
-	if (data) {
-		/* sync_flags can change while we are running */
-		sync_flags = data->sync_flags;
-		data->sync_flags = 0;
-	}
-	UnlockQueue ();
-
-	/* this should not happen, once we are in a thread, the datas are ours */
-	g_return_val_if_fail (data != NULL, NULL);
-
-	/* do the sync itself, but do not block the sync_queue;
-	   any error here is silently ignored. */
-	call_old_file_Sync (/*sqlite3_file*/pFile, sync_flags);
-
-	LockQueue ();
-	if (data->timeout_source == -1) {
-		/* new sync request arrived meanwhile, indicate thread finished... */
-		data->running_thread = NULL;
-		/* ...and reschedule */
-		data->timeout_source = g_timeout_add (SYNC_TIMEOUT, prepare_to_run_sync_in_thread, pFile);
-	} else {
-		/* remove it from a sync_queue and free memory */
-		g_hash_table_remove (sync_queue, pFile);
-		g_free (data);
-	}
-	UnlockQueue ();
-
-	return NULL;
-}
+	while (TRUE) {
+		/* Block until a request arrives. */
+		data = g_async_queue_pop (cFile->queue);
 
-static gboolean
-prepare_to_run_sync_in_thread (gpointer pFile)
-{
-	sync_queue_data *data;
+		/* Make sure we can safely deference. */
+		if (data == NULL)
+			continue;
 
-	g_return_val_if_fail (pFile != NULL, FALSE);
-	g_return_val_if_fail (sync_queue != NULL, FALSE);
+		/* Extract flags and discard request. */
+		flags = *((gint *) data);
+		g_slice_free (gint, data);
 
-	LockQueue ();
+		/* Check for exit request. */
+		if (flags & SYNC_THREAD_EXIT)
+			break;
 
-	data = g_hash_table_lookup (sync_queue, pFile);
-	/* check if still tracking this file and if didn't get rescheduled */
-	if (data && data->timeout_source == g_source_get_id (g_main_current_source ())) {
-		/* run the thread */
-		data->running_thread = g_thread_create (run_sync_in_thread, pFile, TRUE, NULL);
-		data->timeout_source = 0;
+		/* Got a boneafide sync request.
+		 * Do it, but ignore errors. */
+		call_old_file_Sync (cFile, flags);
 	}
 
-	UnlockQueue ();
+	/* Clear the exit flag. */
+	flags &= ~SYNC_THREAD_EXIT;
 
-	return FALSE;
-}
-
-/*
-   Adds sync on this file to the queue. Flags are just bit-OR-ed,
-   which will not hopefully hurt. In case the file is waiting for
-   it's sync, we just postpone it once again.
-   In case file is syncing just in call of this, we schedule other
-   sync after that.
- */
-static void
-queue_sync (sqlite3_file *pFile, int flags)
-{
-	sync_queue_data *data;
-
-	g_return_if_fail (pFile != NULL);
-	g_return_if_fail (flags != 0);
-
-	LockQueue ();
-
-	if (!sync_queue)
-		sync_queue = g_hash_table_new (g_direct_hash, g_direct_equal);
-
-	data = g_hash_table_lookup (sync_queue, pFile);
-	if (data) {
-		/* There is a sync request for this file already. */
-		if (data->running_thread) {
-			/* -1 indicates to reschedule after the actual sync finishes */
-			data->timeout_source = -1;
-			/* start with new flags - thread set it to 0; next time just add others */
-			data->sync_flags = data->sync_flags | flags;
-		} else {
-			data->sync_flags = data->sync_flags | flags;
-
-			/* reschedule */
-			g_source_remove (data->timeout_source);
-			data->timeout_source = g_timeout_add (SYNC_TIMEOUT, prepare_to_run_sync_in_thread, pFile);
-		}
-	} else {
-		data = g_malloc0 (sizeof (sync_queue_data));
-		data->sync_flags = flags;
-		data->running_thread = NULL;
-		data->timeout_source = g_timeout_add (SYNC_TIMEOUT, prepare_to_run_sync_in_thread, pFile);
+	/* One more for the road? */
+	if (flags != 0 && getenv ("CAMEL_NO_SYNC_ON_CLOSE") == NULL)
+		call_old_file_Sync (cFile, flags);
 
-		g_hash_table_insert (sync_queue, pFile, data);
-	}
+	g_async_queue_unref (cFile->queue);
 
-	UnlockQueue ();
+	return NULL;
 }
 
-/*
-   If file is not in a queue, it does nothing, otherwise it removes
-   it from a queue, and calls sync immediately.
-   If file is syncing at the moment, it waits until the previous sync finishes.
- */
-static void
-dequeue_sync (sqlite3_file *pFile)
-{
-	sync_queue_data *data;
-
-	g_return_if_fail (pFile != NULL);
-
-	LockQueue ();
-
-	if (!sync_queue) {
-		/* closing file which wasn't requested to sync, and none
-		   before it too. */
-		UnlockQueue ();
-		return;
-	}
-
-	data = g_hash_table_lookup (sync_queue, pFile);
-	if (data) {
-		int sync_flags = data->sync_flags;
-
-		if (data->timeout_source) {
-			if (data->timeout_source != -1)
-				g_source_remove (data->timeout_source);
-			data->timeout_source = 0;
-		}
-
-		if (data->running_thread) {
-			GThread *thread = data->running_thread;
+static gboolean
+sync_push_request (CamelSqlite3File *cFile)
+{
+	gint *data;
 
-			/* do not do anything later */
-			data = NULL;
+	/* The queue itself does not need to be locked yet,
+	 * but we use its mutex to safely manipulate flags. */
+	g_async_queue_lock (cFile->queue);
 
-			/* unlock here, thus the thread can hold the lock again */
-			UnlockQueue ();
+	/* We can't just cast the flags to a pointer because
+	 * g_async_queue_push() won't take NULLs, and flags
+	 * may be zero.  So we have to allocate memory to
+	 * send an integer.  Bother. */
+	data = g_slice_new (gint);
+	*data = cFile->flags;
+	cFile->flags = 0;
 
-			/* it's running at the moment, wait for a finish.
-			   it'll remove structure from a sync_queue too. */
-			g_thread_join (thread);
-		} else {
-			g_hash_table_remove (sync_queue, pFile);
-		}
+	g_async_queue_push_unlocked (cFile->queue, data);
 
-		if (data) {
-			static gboolean no_sync_on_close = FALSE, iKnow = FALSE;
+	cFile->timeout_id = 0;
 
-			if (!iKnow) {
-				iKnow = TRUE;
-				no_sync_on_close = getenv ("CAMEL_NO_SYNC_ON_CLOSE") != NULL;
-			}
-
-			/* do not block queue on while syncing */
-			UnlockQueue ();
-
-			if (!no_sync_on_close) {
-				/* sync on close */
-				call_old_file_Sync (pFile, sync_flags);
-			}
+	g_async_queue_unlock (cFile->queue);
 
-			g_free (data);
-		}
-	} else {
-		UnlockQueue ();
-	}
+	return FALSE;
 }
 
 #define def_subclassed(_nm, _params, _call)			\
 static int							\
 camel_sqlite3_file_ ## _nm _params				\
 {								\
-	struct CamelSqlite3File *cFile;				\
+	CamelSqlite3File *cFile;				\
 								\
 	g_return_val_if_fail (old_vfs != NULL, SQLITE_ERROR);	\
 	g_return_val_if_fail (pFile != NULL, SQLITE_ERROR);	\
 								\
-	cFile = (struct CamelSqlite3File *) pFile;		\
+	cFile = (CamelSqlite3File *) pFile;		\
 	g_return_val_if_fail (cFile->old_vfs_file->pMethods != NULL, SQLITE_ERROR);	\
 	return cFile->old_vfs_file->pMethods->_nm _call;	\
 }
@@ -293,15 +166,41 @@
 static int
 camel_sqlite3_file_xClose (sqlite3_file *pFile)
 {
-	struct CamelSqlite3File *cFile;
+	CamelSqlite3File *cFile;
 	int res;
 
 	g_return_val_if_fail (old_vfs != NULL, SQLITE_ERROR);
 	g_return_val_if_fail (pFile != NULL, SQLITE_ERROR);
 
-	dequeue_sync (pFile);
+	cFile = (CamelSqlite3File *) pFile;
+
+	/* The queue itself does not need to be locked yet,
+	 * but we use its mutex to safely manipulate flags. */
+	g_async_queue_lock (cFile->queue);
+
+	/* Tell the sync request thread to exit.  It may do
+	 * one last sync before exiting, so preserve any sync
+	 * flags that have accumulated. */
+	cFile->flags |= SYNC_THREAD_EXIT;
+
+	/* Cancel any pending sync requests. */
+	if (cFile->timeout_id > 0)
+		g_source_remove (cFile->timeout_id);
+
+	/* Unlock the queue before pushing the exit request. */
+	g_async_queue_unlock (cFile->queue);
+
+	/* Push the exit request. */
+	sync_push_request (cFile);
+
+	/* Wait for the thread to exit. */
+	g_thread_join (cFile->thread);
+	cFile->thread = NULL;
+
+	/* Now we can safely destroy the queue. */
+	g_async_queue_unref (cFile->queue);
+	cFile->queue = NULL;
 
-	cFile = (struct CamelSqlite3File *) pFile;
 	if (cFile->old_vfs_file->pMethods)
 		res = cFile->old_vfs_file->pMethods->xClose (cFile->old_vfs_file);
 	else
@@ -316,10 +215,30 @@
 static int 
 camel_sqlite3_file_xSync (sqlite3_file *pFile, int flags)
 {
+	CamelSqlite3File *cFile;
+
 	g_return_val_if_fail (old_vfs != NULL, SQLITE_ERROR);
 	g_return_val_if_fail (pFile != NULL, SQLITE_ERROR);
 
-	queue_sync (pFile, flags);
+	cFile = (CamelSqlite3File *) pFile;
+
+	/* The queue itself does not need to be locked yet,
+	 * but we use its mutex to safely manipulate flags. */
+	g_async_queue_lock (cFile->queue);
+
+	/* If a sync request is already scheduled, accumulate flags. */
+	cFile->flags |= flags;
+
+	/* Cancel any pending sync requests. */
+	if (cFile->timeout_id > 0)
+		g_source_remove (cFile->timeout_id);
+
+	/* Wait SYNC_TIMEOUT_SECONDS before we actually sync. */
+	cFile->timeout_id = g_timeout_add_seconds (
+		SYNC_TIMEOUT_SECONDS, (GSourceFunc)
+		sync_push_request, cFile);
+
+	g_async_queue_unlock (cFile->queue);
 
 	return SQLITE_OK;
 }
@@ -327,15 +246,26 @@
 static int
 camel_sqlite3_vfs_xOpen (sqlite3_vfs *pVfs, const char *zPath, sqlite3_file *pFile, int flags, int *pOutFlags)
 {
+	static GStaticRecMutex only_once_lock = G_STATIC_REC_MUTEX_INIT;
 	static sqlite3_io_methods io_methods = {0};
-	struct CamelSqlite3File *cFile;
+	CamelSqlite3File *cFile;
+	GError *error = NULL;
 	int res;
 
 	g_return_val_if_fail (old_vfs != NULL, -1);
 	g_return_val_if_fail (pFile != NULL, -1);
 
-	cFile = (struct CamelSqlite3File *)pFile;
+	cFile = (CamelSqlite3File *)pFile;
 	cFile->old_vfs_file = g_malloc0 (old_vfs->szOsFile);
+	cFile->queue = g_async_queue_new ();
+
+	/* Spawn a joinable thread to listen for sync requests. */
+	cFile->thread = g_thread_create (
+		(GThreadFunc) sync_request_thread_cb, cFile, TRUE, &error);
+	if (error != NULL) {
+		g_warning ("%s", error->message);
+		g_error_free (error);
+	}
 
 	res = old_vfs->xOpen (old_vfs, zPath, cFile->old_vfs_file, flags, pOutFlags);
 
@@ -370,33 +300,23 @@
 	return res;
 }
 
-static void
+static gpointer
 init_sqlite_vfs (void)
 {
 	static sqlite3_vfs vfs = { 0 };
 
-	g_static_rec_mutex_lock (&only_once_lock);
-	if (old_vfs) {
-		g_static_rec_mutex_unlock (&only_once_lock);
-		return;
-	}
-
 	old_vfs = sqlite3_vfs_find (NULL);
-	if (!old_vfs) {
-		g_static_rec_mutex_unlock (&only_once_lock);
-		g_return_if_fail (old_vfs != NULL);
-		return;
-	}
+	g_return_val_if_fail (old_vfs != NULL, NULL);
 
 	memcpy (&vfs, old_vfs, sizeof (sqlite3_vfs));
 
-	vfs.szOsFile = sizeof (struct CamelSqlite3File);
+	vfs.szOsFile = sizeof (CamelSqlite3File);
 	vfs.zName = "camel_sqlite3_vfs";
 	vfs.xOpen = camel_sqlite3_vfs_xOpen;
 
 	sqlite3_vfs_register (&vfs, 1);
 
-	g_static_rec_mutex_unlock (&only_once_lock);
+	return NULL;
 }
 
 #define d(x) if (camel_debug("sqlite")) x
@@ -453,12 +373,13 @@
 CamelDB *
 camel_db_open (const char *path, CamelException *ex)
 {
+	static GOnce vfs_once = G_ONCE_INIT;
 	CamelDB *cdb;
 	sqlite3 *db;
 	char *cache;
 	int ret;
 
-	init_sqlite_vfs ();
+	g_once (&vfs_once, (GThreadFunc) init_sqlite_vfs, NULL);
 
 	CAMEL_DB_USE_SHARED_CACHE;
 	



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]