[evolution-data-server] Add locks to input/output streams



commit 6b8c7302d50dbb71be2f678b62e6a38e7fadf552
Author: Chenthill Palanisamy <pchenthill novell com>
Date:   Tue Feb 23 06:26:31 2010 +0530

    Add locks to input/output streams

 camel/providers/imapx/camel-imapx-server.c |  187 ++++++++++++++++-----------
 camel/providers/imapx/camel-imapx-server.h |    2 +
 2 files changed, 113 insertions(+), 76 deletions(-)
---
diff --git a/camel/providers/imapx/camel-imapx-server.c b/camel/providers/imapx/camel-imapx-server.c
index ad50202..3eab8e1 100644
--- a/camel/providers/imapx/camel-imapx-server.c
+++ b/camel/providers/imapx/camel-imapx-server.c
@@ -738,12 +738,8 @@ imapx_command_start (CamelIMAPXServer *imap, CamelIMAPXCommand *ic)
 	gboolean ret = TRUE;
 
 	camel_imapx_command_close(ic);
-
-	/* FIXME: assert the selected folder == ic->selected */
-
 	cp = (CamelIMAPXCommandPart *)ic->parts.head;
 	g_assert(cp->next);
-
 	ic->current = cp;
 
 	/* TODO: If we support literal+ we should be able to write the whole command out
@@ -754,25 +750,22 @@ imapx_command_start (CamelIMAPXServer *imap, CamelIMAPXCommand *ic)
 
 	camel_dlist_addtail(&imap->active, (CamelDListNode *)ic);
 
+	g_static_rec_mutex_lock (&imap->ostream_lock);
+	
 	c(printf("Staring command (active=%d,%s) %c%05u %s\r\n", camel_dlist_length(&imap->active), imap->literal?" literal":"", imap->tagprefix, ic->tag, cp->data));
 	if (!imap->stream || camel_stream_printf((CamelStream *)imap->stream, "%c%05u %s\r\n", imap->tagprefix, ic->tag, cp->data) == -1) {
+		g_print ("Command start failed  \n");
 		camel_exception_set (ic->ex, 1, "Command start failed");
 		ret = FALSE;
-		camel_dlist_remove((CamelDListNode *)ic);
+		camel_dlist_remove ((CamelDListNode *)ic);
 	}
-
+	
+	g_static_rec_mutex_unlock (&imap->ostream_lock);
+	
 	return ret;
 }
 
-/* must have QUEUE lock */
-static void
-imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex)
-{
-	CamelIMAPXCommand *ic, *nc;
-	gint count = 0;
-	gint pri = -128;
-
-	/* See if we can start another task yet.
+/* See if we can start another task yet.
 
 	If we're waiting for a literal, we cannot proceed.
 
@@ -787,12 +780,17 @@ imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex)
 	If we dont, select the first folder required,
 	then queue all the outstanding jobs on it, that
 	are at least as high priority as the first.
+	
+	must have QUEUE lock */
 
-	This is very shitty code!
-	*/
+static void
+imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex)
+{
+	CamelIMAPXCommand *ic, *nc;
+	gint count = 0;
+	gint pri = -128;
 
 	c(printf("** Starting next command\n"));
-
 	if (is->literal != NULL || is->select_pending != NULL) {
 		c(printf("* no, waiting for literal/pending select '%s'\n", is->select_pending->full_name));
 		return;
@@ -1138,7 +1136,7 @@ imapx_untagged(CamelIMAPXServer *imap, CamelException *ex)
 		if (finfo->got & FETCH_FLAGS && !(finfo->got & FETCH_UID)) {
 			if (imap->select_folder) {
 				CamelFolder *folder;
-				CamelMessageInfo *mi;
+				CamelMessageInfo *mi = NULL;
 				gboolean changed = FALSE;
 				gchar *uid = NULL;
 
@@ -1366,12 +1364,16 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
 
 	printf("got continuation response\n");
 
+	CAMEL_SERVICE_REC_LOCK (imap->store, connect_lock);
 	/* The 'literal' pointer is like a write-lock, nothing else
 	   can write while we have it ... so we dont need any
 	   ohter lock here.  All other writes go through
 	   queue-lock */
 	if (imapx_idle_supported (imap) && imapx_in_idle (imap)) {
 		camel_imapx_stream_skip (imap->stream, ex);
+
+		CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
+		
 		printf("Got continuation response for IDLE \n");
 		imap->idle->started = TRUE;
 
@@ -1386,6 +1388,7 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
 	ic = imap->literal;
 	if (ic == NULL) {
 		camel_imapx_stream_skip(imap->stream, ex);
+		CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
 		printf("got continuation response with no outstanding continuation requests?\n");
 		return 1;
 	}
@@ -1441,6 +1444,7 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
 		/* should we just ignore? */
 		imap->literal = NULL;
 		camel_exception_set (ex, 1, "continuation response for non-continuation request");
+		CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
 		return -1;
 	}
 
@@ -1460,6 +1464,8 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
 		printf("%p: queueing continuation\n", ic);
 		camel_stream_printf((CamelStream *)imap->stream, "\r\n");
 	}
+	
+	CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
 
 	QUEUE_LOCK(imap);
 	imap->literal = newliteral;
@@ -1531,7 +1537,10 @@ imapx_completion(CamelIMAPXServer *imap, guchar *token, gint len, CamelException
 
 	camel_dlist_remove ((CamelDListNode *) ic);
 	QUEUE_UNLOCK(imap);
+	
+	CAMEL_SERVICE_REC_LOCK (imap->store, connect_lock);
 	ic->status = imapx_parse_status(imap->stream, ex);
+	CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
 
 	if (ic->complete)
 		ic->complete (imap, ic);
@@ -1551,7 +1560,9 @@ imapx_step(CamelIMAPXServer *is, CamelException *ex)
 	gint tok;
 
 	// poll ?  wait for other stuff? loop?
+	CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
 	tok = camel_imapx_stream_token (is->stream, &token, &len, ex);
+	CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
 	if (camel_exception_is_set (ex))
 		return;
 
@@ -1990,11 +2001,14 @@ imapx_connect (CamelIMAPXServer *is, gint ssl_mode, gint try_starttls, CamelExce
 			}
 			tcp_stream = camel_tcp_stream_ssl_new(is->session, is->url->host, SSL_PORT_FLAGS);
 		}
+		is->is_ssl_stream = TRUE;
 	} else {
 		tcp_stream = camel_tcp_stream_raw_new ();
+		is->is_ssl_stream = FALSE;
 	}
 #else
 	tcp_stream = camel_tcp_stream_raw_new ();
+	is->is_ssl_stream = FALSE;
 #endif /* HAVE_SSL */
 
 	hints.ai_socktype = SOCK_STREAM;
@@ -2428,7 +2442,6 @@ imapx_command_append_message_done (CamelIMAPXServer *is, CamelIMAPXCommand *ic)
 						changes);
 				camel_folder_change_info_free (changes);
 
-				camel_message_info_free(mi);
 				g_free(cur);
 			} else {
 				printf("but uidvalidity changed, uh ...\n");
@@ -3173,6 +3186,52 @@ cancel_all_jobs (CamelIMAPXServer *is, CamelException *ex)
 
 /* ********************************************************************** */
 
+static void
+parse_contents (CamelIMAPXServer *is, CamelException *ex)
+{
+	gint buffered = 0;
+	/* Call imapx_step() repeatedly until camel_imapx_stream_buffered() returns 0 or ex is set. */
+	do {
+		imapx_step(is, ex);
+		/* connect_lock is taken only around the buffered-data query, not around imapx_step() */
+		CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+		
+		buffered = camel_imapx_stream_buffered (is->stream);
+
+		CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
+
+	} while (buffered && !camel_exception_is_set (ex));
+}
+
+/*
+   The main processing (reading) loop.
+
+   Incoming requests are added as jobs and tasks from other threads,
+   we just read the results from the server continuously, and match
+   them up with the queued tasks as they come back.
+
+   Of course this loop can also initiate its own commands as well.
+
+   So, multiple threads can submit jobs, and write to the
+   stream (issue: locking stream for write?), but only this
+   thread can ever read from the stream.  This simplifies
+   locking, and greatly simplifies working out when new
+   work is ready.
+  
+   TODO:
+   This poll stuff won't work - we might block
+   waiting for results inside loops etc.
+
+   Requires a different approach:
+   +
+
+   New commands are queued in other threads as well
+   as this thread, and get pipelined over the socket.
+
+   Main area of locking required is command_queue
+   and command_start_next, the 'literal' command,
+   the jobs queue, the active queue, the queue
+   queue. */
 static gpointer
 imapx_parser_thread (gpointer d)
 {
@@ -3180,59 +3239,34 @@ imapx_parser_thread (gpointer d)
 	CamelException ex = CAMEL_EXCEPTION_INITIALISER;
 	CamelOperation *op;
 
-	/*
-	  The main processing (reading) loop.
-
-	  Incoming requests are added as jobs and tasks from other threads,
-	  we just read the results from the server continously, and match
-	  them up with the queued tasks as they come back.
-
-	  Of course this loop can also initiate its own commands as well.
-
-	  So, multiple threads can submit jobs, and write to the
-	  stream (issue: locking stream for write?), but only this
-	  thread can ever read from the stream.  This simplifies
-	  locking, and greatly simplifies working out when new
-	  work is ready.
-	*/
-
-	e(printf("imapx server loop started\n"));
-
 	op = camel_operation_new (NULL, NULL);
 	op = camel_operation_register (op);
-	while (TRUE) {
 
+	while (TRUE) {
+		
 		CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+		
+		g_static_rec_mutex_lock (&is->ostream_lock);
 		if (!is->stream)
 			imapx_reconnect(is, &ex);
-		CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
+		g_static_rec_mutex_unlock (&is->ostream_lock);
 
-		/* TODO:
-		   This poll stuff wont work - we might block
-		   waiting for results inside loops etc.
-
-		   Requires a different approach:
-		   +
-
-		   New commands are queued in other threads as well
-		   as this thread, and get pipelined over the socket.
-
-		   Main area of locking required is command_queue
-		   and command_start_next, the 'literal' command,
-		   the jobs queue, the active queue, the queue
-		   queue. */
+		CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
 
-		/* if ssl stream ... */
 #ifdef HAVE_SSL
-		if (CAMEL_IS_TCP_STREAM_SSL (is->stream->source))
+		if (is->is_ssl_stream)
 		{
 			PRPollDesc pollfds[2] = { };
 			gint res;
 
+			CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+
 			pollfds[0].fd = camel_tcp_stream_ssl_sockfd ((CamelTcpStreamSSL *)is->stream->source);
 			pollfds[0].in_flags = PR_POLL_READ;
 			pollfds[1].fd = camel_operation_cancel_prfd (op);
 			pollfds[1].in_flags = PR_POLL_READ;
+
+			CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
 #include <prio.h>
 
 			res = PR_Poll(pollfds, 2, PR_MillisecondsToInterval (30 * 1000));
@@ -3241,27 +3275,25 @@ imapx_parser_thread (gpointer d)
 			else if (res == 0) {
 				/* timed out */
 			} else if ((pollfds[0].out_flags & PR_POLL_READ)) {
-				do {
-					/* This is quite shitty, it will often block on each
-					   part of the decode, causing significant
-					   processing delays. */
-					imapx_step(is, &ex);
-				} while (camel_imapx_stream_buffered(is->stream) && !camel_exception_is_set (&ex));
-			} else if (pollfds[1].out_flags & PR_POLL_READ) {
+				parse_contents (is, &ex);
+			} else if (pollfds[1].out_flags & PR_POLL_READ)
 				errno = EINTR;
-			}
 		}
 #endif
 
-		if (!CAMEL_IS_TCP_STREAM_SSL (is->stream->source))
+		if (!is->is_ssl_stream)
 		{
 			struct pollfd fds[2] = { {0, 0, 0}, {0, 0, 0} };
 			gint res;
 
+			CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+			
 			fds[0].fd = ((CamelTcpStreamRaw *)is->stream->source)->sockfd;
 			fds[0].events = POLLIN;
 			fds[1].fd = camel_operation_cancel_fd (op);
 			fds[1].events = POLLIN;
+			
+			CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
 
 			res = poll(fds, 2, 1000*30);
 			if (res == -1)
@@ -3269,12 +3301,9 @@ imapx_parser_thread (gpointer d)
 			else if (res == 0)
 				/* timed out */;
 			else if (fds[0].revents & POLLIN) {
-				do {
-					imapx_step(is, &ex);
-				} while (camel_imapx_stream_buffered(is->stream) && !camel_exception_is_set (&ex));
-			} else if (fds[1].revents & POLLIN) {
+				parse_contents (is, &ex);
+			} else if (fds[1].revents & POLLIN)
 				errno = EINTR;
-			}
 		}
 
 		if (errno == EINTR)
@@ -3282,11 +3311,7 @@ imapx_parser_thread (gpointer d)
 
 		if (camel_exception_is_set (&ex)) {
 			if (errno == EINTR || !g_ascii_strcasecmp (ex.desc, "io error")) {
-
-				CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
 				imapx_disconnect (is);
-				CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
-
 				cancel_all_jobs (is, &ex);
 
 				if (imapx_idle_supported (is))
@@ -3294,13 +3319,13 @@ imapx_parser_thread (gpointer d)
 			}
 
 			if (errno == EINTR)
-				return NULL;
+				goto quit;
 
 			camel_exception_clear (&ex);
-			sleep(1);
 		}
 	}
 
+quit:
 	if (op)
 		camel_operation_unref (op);
 
@@ -3327,6 +3352,7 @@ imapx_server_init(CamelIMAPXServer *ie, CamelIMAPXServerClass *ieclass)
 	ie->job_timeout = 29 * 60 * 1000 * 1000;
 
 	ie->queue_lock = g_mutex_new();
+	g_static_rec_mutex_init (&ie->ostream_lock);
 
 	ie->tagprefix = ieclass->tagprefix;
 	ieclass->tagprefix++;
@@ -3344,6 +3370,7 @@ static void
 imapx_server_finalise(CamelIMAPXServer *ie, CamelIMAPXServerClass *ieclass)
 {
 	g_mutex_free(ie->queue_lock);
+	g_static_rec_mutex_free (&ie->ostream_lock);
 
 	camel_folder_change_info_free (ie->changes);
 }
@@ -3387,6 +3414,9 @@ imapx_disconnect (CamelIMAPXServer *is)
 {
 	gboolean ret = TRUE;
 
+	CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+	g_static_rec_mutex_lock (&is->ostream_lock);
+
 	if (is->stream) {
 		if (camel_stream_close (is->stream->source) == -1)
 			ret = FALSE;
@@ -3416,6 +3446,9 @@ imapx_disconnect (CamelIMAPXServer *is)
 		camel_imapx_command_free (is->literal);
 		is->literal = NULL;
 	}
+				
+	g_static_rec_mutex_unlock (&is->ostream_lock);
+	CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
 
 	return ret;
 }
@@ -3436,7 +3469,9 @@ camel_imapx_server_connect(CamelIMAPXServer *is, gint state)
 			goto exit;
 		}
 
+		g_static_rec_mutex_lock (&is->ostream_lock);
 		imapx_reconnect (is, &ex);
+		g_static_rec_mutex_unlock (&is->ostream_lock);
 		if (camel_exception_is_set (&ex)) {
 			ret = FALSE;
 			goto exit;
diff --git a/camel/providers/imapx/camel-imapx-server.h b/camel/providers/imapx/camel-imapx-server.h
index bd984e5..585018c 100644
--- a/camel/providers/imapx/camel-imapx-server.h
+++ b/camel/providers/imapx/camel-imapx-server.h
@@ -51,6 +51,7 @@ struct _CamelIMAPXServer {
 	struct _CamelURL *url;
 	struct _CamelIMAPXStream *stream;
 	struct _capability_info *cinfo;
+	gboolean is_ssl_stream;
 
 	CamelIMAPXNamespaceList *nsl;
 
@@ -89,6 +90,7 @@ struct _CamelIMAPXServer {
 	GSList *expunged;
 
 	pthread_t parser_thread_id;
+	GStaticRecMutex ostream_lock;
 
 	/* Idle */
 	struct _CamelIMAPXIdle *idle;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]