[evolution-data-server] Add locks to input/output streams
- From: Chenthill Palanisamy <pchen src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [evolution-data-server] Add locks to input/output streams
- Date: Wed, 24 Feb 2010 13:53:57 +0000 (UTC)
commit 6b8c7302d50dbb71be2f678b62e6a38e7fadf552
Author: Chenthill Palanisamy <pchenthill novell com>
Date: Tue Feb 23 06:26:31 2010 +0530
Add locks to input/output streams
camel/providers/imapx/camel-imapx-server.c | 187 ++++++++++++++++-----------
camel/providers/imapx/camel-imapx-server.h | 2 +
2 files changed, 113 insertions(+), 76 deletions(-)
---
diff --git a/camel/providers/imapx/camel-imapx-server.c b/camel/providers/imapx/camel-imapx-server.c
index ad50202..3eab8e1 100644
--- a/camel/providers/imapx/camel-imapx-server.c
+++ b/camel/providers/imapx/camel-imapx-server.c
@@ -738,12 +738,8 @@ imapx_command_start (CamelIMAPXServer *imap, CamelIMAPXCommand *ic)
gboolean ret = TRUE;
camel_imapx_command_close(ic);
-
- /* FIXME: assert the selected folder == ic->selected */
-
cp = (CamelIMAPXCommandPart *)ic->parts.head;
g_assert(cp->next);
-
ic->current = cp;
/* TODO: If we support literal+ we should be able to write the whole command out
@@ -754,25 +750,22 @@ imapx_command_start (CamelIMAPXServer *imap, CamelIMAPXCommand *ic)
camel_dlist_addtail(&imap->active, (CamelDListNode *)ic);
+ g_static_rec_mutex_lock (&imap->ostream_lock);
+
c(printf("Staring command (active=%d,%s) %c%05u %s\r\n", camel_dlist_length(&imap->active), imap->literal?" literal":"", imap->tagprefix, ic->tag, cp->data));
if (!imap->stream || camel_stream_printf((CamelStream *)imap->stream, "%c%05u %s\r\n", imap->tagprefix, ic->tag, cp->data) == -1) {
+ g_print ("Command start failed \n");
camel_exception_set (ic->ex, 1, "Command start failed");
ret = FALSE;
- camel_dlist_remove((CamelDListNode *)ic);
+ camel_dlist_remove ((CamelDListNode *)ic);
}
-
+
+ g_static_rec_mutex_unlock (&imap->ostream_lock);
+
return ret;
}
-/* must have QUEUE lock */
-static void
-imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex)
-{
- CamelIMAPXCommand *ic, *nc;
- gint count = 0;
- gint pri = -128;
-
- /* See if we can start another task yet.
+/* See if we can start another task yet.
If we're waiting for a literal, we cannot proceed.
@@ -787,12 +780,17 @@ imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex)
If we dont, select the first folder required,
then queue all the outstanding jobs on it, that
are at least as high priority as the first.
+
+ must have QUEUE lock */
- This is very shitty code!
- */
+static void
+imapx_command_start_next(CamelIMAPXServer *is, CamelException *ex)
+{
+ CamelIMAPXCommand *ic, *nc;
+ gint count = 0;
+ gint pri = -128;
c(printf("** Starting next command\n"));
-
if (is->literal != NULL || is->select_pending != NULL) {
c(printf("* no, waiting for literal/pending select '%s'\n", is->select_pending->full_name));
return;
@@ -1138,7 +1136,7 @@ imapx_untagged(CamelIMAPXServer *imap, CamelException *ex)
if (finfo->got & FETCH_FLAGS && !(finfo->got & FETCH_UID)) {
if (imap->select_folder) {
CamelFolder *folder;
- CamelMessageInfo *mi;
+ CamelMessageInfo *mi = NULL;
gboolean changed = FALSE;
gchar *uid = NULL;
@@ -1366,12 +1364,16 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
printf("got continuation response\n");
+ CAMEL_SERVICE_REC_LOCK (imap->store, connect_lock);
/* The 'literal' pointer is like a write-lock, nothing else
can write while we have it ... so we dont need any
ohter lock here. All other writes go through
queue-lock */
if (imapx_idle_supported (imap) && imapx_in_idle (imap)) {
camel_imapx_stream_skip (imap->stream, ex);
+
+ CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
+
printf("Got continuation response for IDLE \n");
imap->idle->started = TRUE;
@@ -1386,6 +1388,7 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
ic = imap->literal;
if (ic == NULL) {
camel_imapx_stream_skip(imap->stream, ex);
+ CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
printf("got continuation response with no outstanding continuation requests?\n");
return 1;
}
@@ -1441,6 +1444,7 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
/* should we just ignore? */
imap->literal = NULL;
camel_exception_set (ex, 1, "continuation response for non-continuation request");
+ CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
return -1;
}
@@ -1460,6 +1464,8 @@ imapx_continuation(CamelIMAPXServer *imap, CamelException *ex)
printf("%p: queueing continuation\n", ic);
camel_stream_printf((CamelStream *)imap->stream, "\r\n");
}
+
+ CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
QUEUE_LOCK(imap);
imap->literal = newliteral;
@@ -1531,7 +1537,10 @@ imapx_completion(CamelIMAPXServer *imap, guchar *token, gint len, CamelException
camel_dlist_remove ((CamelDListNode *) ic);
QUEUE_UNLOCK(imap);
+
+ CAMEL_SERVICE_REC_LOCK (imap->store, connect_lock);
ic->status = imapx_parse_status(imap->stream, ex);
+ CAMEL_SERVICE_REC_UNLOCK (imap->store, connect_lock);
if (ic->complete)
ic->complete (imap, ic);
@@ -1551,7 +1560,9 @@ imapx_step(CamelIMAPXServer *is, CamelException *ex)
gint tok;
// poll ? wait for other stuff? loop?
+ CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
tok = camel_imapx_stream_token (is->stream, &token, &len, ex);
+ CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
if (camel_exception_is_set (ex))
return;
@@ -1990,11 +2001,14 @@ imapx_connect (CamelIMAPXServer *is, gint ssl_mode, gint try_starttls, CamelExce
}
tcp_stream = camel_tcp_stream_ssl_new(is->session, is->url->host, SSL_PORT_FLAGS);
}
+ is->is_ssl_stream = TRUE;
} else {
tcp_stream = camel_tcp_stream_raw_new ();
+ is->is_ssl_stream = FALSE;
}
#else
tcp_stream = camel_tcp_stream_raw_new ();
+ is->is_ssl_stream = FALSE;
#endif /* HAVE_SSL */
hints.ai_socktype = SOCK_STREAM;
@@ -2428,7 +2442,6 @@ imapx_command_append_message_done (CamelIMAPXServer *is, CamelIMAPXCommand *ic)
changes);
camel_folder_change_info_free (changes);
- camel_message_info_free(mi);
g_free(cur);
} else {
printf("but uidvalidity changed, uh ...\n");
@@ -3173,6 +3186,52 @@ cancel_all_jobs (CamelIMAPXServer *is, CamelException *ex)
/* ********************************************************************** */
+static void
+parse_contents (CamelIMAPXServer *is, CamelException *ex)
+{ /* Helper introduced by this patch: it replaces the two inline imapx_step() loops that imapx_parser_thread ran after poll reported readable data. */
+ gint buffered = 0; /* latest result of camel_imapx_stream_buffered(); non-zero keeps the loop running */
+ /* Step at least once, then keep stepping while the stream reports buffered data and no exception is set on ex. */
+ do {
+ imapx_step(is, ex);
+ /* connect_lock is held only around the buffered query; imapx_step() takes the same lock itself around its token read. */
+ CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+
+ buffered = camel_imapx_stream_buffered (is->stream);
+
+ CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
+ /* NOTE(review): is->stream is queried above even when imapx_step() has just set ex; confirm the stream is still valid on that path. */
+ } while (buffered && !camel_exception_is_set (ex));
+}
+
+/*
+ The main processing (reading) loop.
+
+ Incoming requests are added as jobs and tasks from other threads,
+ we just read the results from the server continously, and match
+ them up with the queued tasks as they come back.
+
+ Of course this loop can also initiate its own commands as well.
+
+ So, multiple threads can submit jobs, and write to the
+ stream (issue: locking stream for write?), but only this
+ thread can ever read from the stream. This simplifies
+ locking, and greatly simplifies working out when new
+ work is ready.
+
+ TODO:
+ This poll stuff wont work - we might block
+ waiting for results inside loops etc.
+
+ Requires a different approach:
+ +
+
+ New commands are queued in other threads as well
+ as this thread, and get pipelined over the socket.
+
+ Main area of locking required is command_queue
+ and command_start_next, the 'literal' command,
+ the jobs queue, the active queue, the queue
+ queue. */
static gpointer
imapx_parser_thread (gpointer d)
{
@@ -3180,59 +3239,34 @@ imapx_parser_thread (gpointer d)
CamelException ex = CAMEL_EXCEPTION_INITIALISER;
CamelOperation *op;
- /*
- The main processing (reading) loop.
-
- Incoming requests are added as jobs and tasks from other threads,
- we just read the results from the server continously, and match
- them up with the queued tasks as they come back.
-
- Of course this loop can also initiate its own commands as well.
-
- So, multiple threads can submit jobs, and write to the
- stream (issue: locking stream for write?), but only this
- thread can ever read from the stream. This simplifies
- locking, and greatly simplifies working out when new
- work is ready.
- */
-
- e(printf("imapx server loop started\n"));
-
op = camel_operation_new (NULL, NULL);
op = camel_operation_register (op);
- while (TRUE) {
+ while (TRUE) {
+
CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+
+ g_static_rec_mutex_lock (&is->ostream_lock);
if (!is->stream)
imapx_reconnect(is, &ex);
- CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
+ g_static_rec_mutex_unlock (&is->ostream_lock);
- /* TODO:
- This poll stuff wont work - we might block
- waiting for results inside loops etc.
-
- Requires a different approach:
- +
-
- New commands are queued in other threads as well
- as this thread, and get pipelined over the socket.
-
- Main area of locking required is command_queue
- and command_start_next, the 'literal' command,
- the jobs queue, the active queue, the queue
- queue. */
+ CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
- /* if ssl stream ... */
#ifdef HAVE_SSL
- if (CAMEL_IS_TCP_STREAM_SSL (is->stream->source))
+ if (is->is_ssl_stream)
{
PRPollDesc pollfds[2] = { };
gint res;
+ CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+
pollfds[0].fd = camel_tcp_stream_ssl_sockfd ((CamelTcpStreamSSL *)is->stream->source);
pollfds[0].in_flags = PR_POLL_READ;
pollfds[1].fd = camel_operation_cancel_prfd (op);
pollfds[1].in_flags = PR_POLL_READ;
+
+ CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
#include <prio.h>
res = PR_Poll(pollfds, 2, PR_MillisecondsToInterval (30 * 1000));
@@ -3241,27 +3275,25 @@ imapx_parser_thread (gpointer d)
else if (res == 0) {
/* timed out */
} else if ((pollfds[0].out_flags & PR_POLL_READ)) {
- do {
- /* This is quite shitty, it will often block on each
- part of the decode, causing significant
- processing delays. */
- imapx_step(is, &ex);
- } while (camel_imapx_stream_buffered(is->stream) && !camel_exception_is_set (&ex));
- } else if (pollfds[1].out_flags & PR_POLL_READ) {
+ parse_contents (is, &ex);
+ } else if (pollfds[1].out_flags & PR_POLL_READ)
errno = EINTR;
- }
}
#endif
- if (!CAMEL_IS_TCP_STREAM_SSL (is->stream->source))
+ if (!is->is_ssl_stream)
{
struct pollfd fds[2] = { {0, 0, 0}, {0, 0, 0} };
gint res;
+ CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+
fds[0].fd = ((CamelTcpStreamRaw *)is->stream->source)->sockfd;
fds[0].events = POLLIN;
fds[1].fd = camel_operation_cancel_fd (op);
fds[1].events = POLLIN;
+
+ CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
res = poll(fds, 2, 1000*30);
if (res == -1)
@@ -3269,12 +3301,9 @@ imapx_parser_thread (gpointer d)
else if (res == 0)
/* timed out */;
else if (fds[0].revents & POLLIN) {
- do {
- imapx_step(is, &ex);
- } while (camel_imapx_stream_buffered(is->stream) && !camel_exception_is_set (&ex));
- } else if (fds[1].revents & POLLIN) {
+ parse_contents (is, &ex);
+ } else if (fds[1].revents & POLLIN)
errno = EINTR;
- }
}
if (errno == EINTR)
@@ -3282,11 +3311,7 @@ imapx_parser_thread (gpointer d)
if (camel_exception_is_set (&ex)) {
if (errno == EINTR || !g_ascii_strcasecmp (ex.desc, "io error")) {
-
- CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
imapx_disconnect (is);
- CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
-
cancel_all_jobs (is, &ex);
if (imapx_idle_supported (is))
@@ -3294,13 +3319,13 @@ imapx_parser_thread (gpointer d)
}
if (errno == EINTR)
- return NULL;
+ goto quit;
camel_exception_clear (&ex);
- sleep(1);
}
}
+quit:
if (op)
camel_operation_unref (op);
@@ -3327,6 +3352,7 @@ imapx_server_init(CamelIMAPXServer *ie, CamelIMAPXServerClass *ieclass)
ie->job_timeout = 29 * 60 * 1000 * 1000;
ie->queue_lock = g_mutex_new();
+ g_static_rec_mutex_init (&ie->ostream_lock);
ie->tagprefix = ieclass->tagprefix;
ieclass->tagprefix++;
@@ -3344,6 +3370,7 @@ static void
imapx_server_finalise(CamelIMAPXServer *ie, CamelIMAPXServerClass *ieclass)
{
g_mutex_free(ie->queue_lock);
+ g_static_rec_mutex_free (&ie->ostream_lock);
camel_folder_change_info_free (ie->changes);
}
@@ -3387,6 +3414,9 @@ imapx_disconnect (CamelIMAPXServer *is)
{
gboolean ret = TRUE;
+ CAMEL_SERVICE_REC_LOCK (is->store, connect_lock);
+ g_static_rec_mutex_lock (&is->ostream_lock);
+
if (is->stream) {
if (camel_stream_close (is->stream->source) == -1)
ret = FALSE;
@@ -3416,6 +3446,9 @@ imapx_disconnect (CamelIMAPXServer *is)
camel_imapx_command_free (is->literal);
is->literal = NULL;
}
+
+ g_static_rec_mutex_unlock (&is->ostream_lock);
+ CAMEL_SERVICE_REC_UNLOCK (is->store, connect_lock);
return ret;
}
@@ -3436,7 +3469,9 @@ camel_imapx_server_connect(CamelIMAPXServer *is, gint state)
goto exit;
}
+ g_static_rec_mutex_lock (&is->ostream_lock);
imapx_reconnect (is, &ex);
+ g_static_rec_mutex_unlock (&is->ostream_lock);
if (camel_exception_is_set (&ex)) {
ret = FALSE;
goto exit;
diff --git a/camel/providers/imapx/camel-imapx-server.h b/camel/providers/imapx/camel-imapx-server.h
index bd984e5..585018c 100644
--- a/camel/providers/imapx/camel-imapx-server.h
+++ b/camel/providers/imapx/camel-imapx-server.h
@@ -51,6 +51,7 @@ struct _CamelIMAPXServer {
struct _CamelURL *url;
struct _CamelIMAPXStream *stream;
struct _capability_info *cinfo;
+ gboolean is_ssl_stream;
CamelIMAPXNamespaceList *nsl;
@@ -89,6 +90,7 @@ struct _CamelIMAPXServer {
GSList *expunged;
pthread_t parser_thread_id;
+ GStaticRecMutex ostream_lock;
/* Idle */
struct _CamelIMAPXIdle *idle;
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]