[evolution-data-server] CamelTcpStreamRaw: Use PR_Interrupt() to cancel blocked I/O.



commit 46c6be4af5da9d10884b92f4bb61960fa7fd23d0
Author: Matthew Barnes <mbarnes redhat com>
Date:   Fri Nov 4 17:47:53 2011 -0400

    CamelTcpStreamRaw: Use PR_Interrupt() to cancel blocked I/O.
    
    Call PR_Interrupt() from a GCancellable::cancelled signal handler to
    cancel blocking PR_Connect(), PR_Read() and PR_Write() calls.
    
    Way easier and more responsive than using camel_operation_cancel_prfd().

 camel/camel-tcp-stream-raw.c |  317 +++++++++++++-----------------------------
 1 files changed, 95 insertions(+), 222 deletions(-)
---
diff --git a/camel/camel-tcp-stream-raw.c b/camel/camel-tcp-stream-raw.c
index 4fd0161..8630070 100644
--- a/camel/camel-tcp-stream-raw.c
+++ b/camel/camel-tcp-stream-raw.c
@@ -36,6 +36,7 @@
 #include <prio.h>
 #include <prerror.h>
 #include <prerr.h>
+#include <prthread.h>
 
 #include <glib/gi18n-lib.h>
 
@@ -167,6 +168,13 @@ flaky_tcp_read (gint fd,
 #endif /* SIMULATE_FLAKY_NETWORK */
 
 static void
+tcp_stream_cancelled (GCancellable *cancellable,
+                      PRThread *thread)
+{
+	PR_Interrupt (thread);
+}
+
+static void
 tcp_stream_raw_finalize (GObject *object)
 {
 	CamelTcpStreamRaw *stream = CAMEL_TCP_STREAM_RAW (object);
@@ -265,6 +273,28 @@ _set_g_error_from_errno (GError **error,
 			"%s", g_strerror (errn));
 }
 
+static void
+tcp_stream_set_error_from_pr_error (GError **error)
+{
+	gchar *error_message;
+	PRInt32 length;
+
+	length = PR_GetErrorTextLength ();
+	g_return_if_fail (length > 0);
+
+	error_message = g_malloc0 (length + 1);
+	PR_GetErrorText (error_message);
+
+	_set_errno_from_pr_error (PR_GetError ());
+
+	g_set_error_literal (
+		error, G_IO_ERROR,
+		g_io_error_from_errno (errno),
+		error_message);
+
+	g_free (error_message);
+}
+
 static gssize
 read_from_prfd (PRFileDesc *fd,
                 gchar *buffer,
@@ -272,90 +302,30 @@ read_from_prfd (PRFileDesc *fd,
                 GCancellable *cancellable,
                 GError **error)
 {
-	PRFileDesc *cancel_fd = NULL;
-	gssize nread;
-
-	if (g_cancellable_is_cancelled (cancellable)) {
-		errno = EINTR;
-		_set_g_error_from_errno (error, TRUE);
-		return -1;
-	}
+	gssize bytes_read;
+	gulong cancel_id = 0;
 
-	if (CAMEL_IS_OPERATION (cancellable))
-		cancel_fd = camel_operation_cancel_prfd (
-			CAMEL_OPERATION (cancellable));
+	if (G_IS_CANCELLABLE (cancellable))
+		cancel_id = g_cancellable_connect (
+			cancellable, G_CALLBACK (tcp_stream_cancelled),
+			PR_GetCurrentThread (), (GDestroyNotify) NULL);
 
-	if (cancel_fd == NULL) {
-		do {
-			nread = PR_Read (fd, buffer, n);
-			if (nread == -1)
-				_set_errno_from_pr_error (PR_GetError ());
-		} while (nread == -1 && (PR_GetError () == PR_PENDING_INTERRUPT_ERROR ||
-					 PR_GetError () == PR_IO_PENDING_ERROR ||
-					 PR_GetError () == PR_WOULD_BLOCK_ERROR));
-	} else {
-		PRSocketOptionData sockopts;
-		PRPollDesc pollfds[2];
-		gboolean nonblock;
-		gint saved_errno;
-
-		/* get O_NONBLOCK options */
-		sockopts.option = PR_SockOpt_Nonblocking;
-		PR_GetSocketOption (fd, &sockopts);
-		sockopts.option = PR_SockOpt_Nonblocking;
-		nonblock = sockopts.value.non_blocking;
-		sockopts.value.non_blocking = TRUE;
-		PR_SetSocketOption (fd, &sockopts);
-
-		pollfds[0].fd = fd;
-		pollfds[0].in_flags = PR_POLL_READ;
-		pollfds[1].fd = cancel_fd;
-		pollfds[1].in_flags = PR_POLL_READ;
+	do {
+		bytes_read = PR_Read (fd, buffer, n);
+	} while (bytes_read == -1 && PR_GetError () == PR_IO_PENDING_ERROR);
 
-		do {
-			PRInt32 res;
+	if (cancel_id > 0)
+		g_cancellable_disconnect (cancellable, cancel_id);
 
-			pollfds[0].out_flags = 0;
-			pollfds[1].out_flags = 0;
-			nread = -1;
+	if (g_cancellable_set_error_if_cancelled (cancellable, error))
+		return -1;
 
-			res = PR_Poll (pollfds, 2, IO_TIMEOUT);
-			if (res == -1)
-				_set_errno_from_pr_error (PR_GetError ());
-			else if (res == 0) {
-#ifdef ETIMEDOUT
-				errno = ETIMEDOUT;
-#else
-				errno = EIO;
-#endif
-				goto failed;
-			} else if (pollfds[1].out_flags == PR_POLL_READ) {
-				errno = EINTR;
-				goto failed;
-			} else {
-				do {
-					nread = PR_Read (fd, buffer, n);
-					if (nread == -1)
-						_set_errno_from_pr_error (PR_GetError ());
-				} while (nread == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
-			}
-		} while (nread == -1 && (PR_GetError () == PR_PENDING_INTERRUPT_ERROR ||
-					 PR_GetError () == PR_IO_PENDING_ERROR ||
-					 PR_GetError () == PR_WOULD_BLOCK_ERROR));
-
-		/* restore O_NONBLOCK options */
-	failed:
-		saved_errno = errno;
-		sockopts.option = PR_SockOpt_Nonblocking;
-		sockopts.value.non_blocking = nonblock;
-		PR_SetSocketOption (fd, &sockopts);
-		errno = saved_errno;
+	if (bytes_read == -1) {
+		tcp_stream_set_error_from_pr_error (error);
+		return -1;
 	}
 
-	if (nread == -1)
-		_set_g_error_from_errno (error, TRUE);
-
-	return nread;
+	return bytes_read;
 }
 
 static gssize
@@ -374,105 +344,42 @@ tcp_stream_raw_read (CamelStream *stream,
 static gssize
 write_to_prfd (PRFileDesc *fd,
                const gchar *buffer,
-               gsize n,
+               gsize size,
                GCancellable *cancellable,
                GError **error)
 {
-	PRFileDesc *cancel_fd = NULL;
-	gssize w, written = 0;
+	gssize bytes_written;
+	gssize total_written = 0;
+	gulong cancel_id = 0;
 
-	if (g_cancellable_is_cancelled (cancellable)) {
-		errno = EINTR;
-		_set_g_error_from_errno (error, TRUE);
-		return -1;
-	}
-
-	if (CAMEL_IS_OPERATION (cancellable))
-		cancel_fd = camel_operation_cancel_prfd (
-			CAMEL_OPERATION (cancellable));
+	if (G_IS_CANCELLABLE (cancellable))
+		cancel_id = g_cancellable_connect (
+			cancellable, G_CALLBACK (tcp_stream_cancelled),
+			PR_GetCurrentThread (), (GDestroyNotify) NULL);
 
-	if (cancel_fd == NULL) {
+	do {
 		do {
-			do {
-				w = PR_Write (fd, buffer + written, n - written);
-				if (w == -1)
-					_set_errno_from_pr_error (PR_GetError ());
-			} while (w == -1 && (PR_GetError () == PR_PENDING_INTERRUPT_ERROR ||
-					     PR_GetError () == PR_IO_PENDING_ERROR ||
-					     PR_GetError () == PR_WOULD_BLOCK_ERROR));
-
-			if (w > 0)
-				written += w;
-		} while (w != -1 && written < n);
-	} else {
-		PRSocketOptionData sockopts;
-		PRPollDesc pollfds[2];
-		gboolean nonblock;
-		gint saved_errno;
-
-		/* get O_NONBLOCK options */
-		sockopts.option = PR_SockOpt_Nonblocking;
-		PR_GetSocketOption (fd, &sockopts);
-		sockopts.option = PR_SockOpt_Nonblocking;
-		nonblock = sockopts.value.non_blocking;
-		sockopts.value.non_blocking = TRUE;
-		PR_SetSocketOption (fd, &sockopts);
-
-		pollfds[0].fd = fd;
-		pollfds[0].in_flags = PR_POLL_WRITE;
-		pollfds[1].fd = cancel_fd;
-		pollfds[1].in_flags = PR_POLL_READ;
+			bytes_written = PR_Write (
+				fd, buffer + total_written,
+				size - total_written);
+		} while (bytes_written == -1 && PR_GetError () == PR_IO_PENDING_ERROR);
 
-		do {
-			PRInt32 res;
-
-			pollfds[0].out_flags = 0;
-			pollfds[1].out_flags = 0;
-			w = -1;
-
-			res = PR_Poll (pollfds, 2, IO_TIMEOUT);
-			if (res == -1) {
-				_set_errno_from_pr_error (PR_GetError ());
-				if (PR_GetError () == PR_PENDING_INTERRUPT_ERROR)
-					w = 0;
-			} else if (res == 0) {
-#ifdef ETIMEDOUT
-				errno = ETIMEDOUT;
-#else
-				errno = EIO;
-#endif
-			} else if (pollfds[1].out_flags == PR_POLL_READ) {
-				errno = EINTR;
-			} else {
-				do {
-					w = PR_Write (fd, buffer + written, n - written);
-					if (w == -1)
-						_set_errno_from_pr_error (PR_GetError ());
-				} while (w == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
-
-				if (w == -1) {
-					if (PR_GetError () == PR_IO_PENDING_ERROR ||
-					    PR_GetError () == PR_WOULD_BLOCK_ERROR)
-						w = 0;
-				} else
-					written += w;
-			}
-		} while (w != -1 && written < n);
-
-		/* restore O_NONBLOCK options */
-		saved_errno = errno;
-		sockopts.option = PR_SockOpt_Nonblocking;
-		sockopts.value.non_blocking = nonblock;
-		PR_SetSocketOption (fd, &sockopts);
-		errno = saved_errno;
-	}
+		if (bytes_written > 0)
+			total_written += bytes_written;
+	} while (bytes_written != -1 && total_written < size);
+
+	if (cancel_id > 0)
+		g_cancellable_disconnect (cancellable, cancel_id);
 
-	if (w == -1) {
-		_set_g_error_from_errno (error, TRUE);
-		written = -1;
+	if (g_cancellable_set_error_if_cancelled (cancellable, error))
+		return -1;
+
+	if (bytes_written == -1) {
+		tcp_stream_set_error_from_pr_error (error);
+		return -1;
 	}
 
-	return written;
+	return total_written;
 }
 
 static gssize
@@ -578,7 +485,9 @@ socket_connect (struct addrinfo *host,
                 GError **error)
 {
 	PRNetAddr netaddr;
-	PRFileDesc *fd, *cancel_fd = NULL;
+	PRFileDesc *fd;
+	PRStatus status;
+	gulong cancel_id = 0;
 
 	if (sockaddr_to_praddr (host->ai_addr, host->ai_addrlen, &netaddr) != 0) {
 		errno = EINVAL;
@@ -593,67 +502,31 @@ socket_connect (struct addrinfo *host,
 		return NULL;
 	}
 
-	if (CAMEL_IS_OPERATION (cancellable))
-		cancel_fd = camel_operation_cancel_prfd (
-			CAMEL_OPERATION (cancellable));
+	if (G_IS_CANCELLABLE (cancellable))
+		cancel_id = g_cancellable_connect (
+			cancellable, G_CALLBACK (tcp_stream_cancelled),
+			PR_GetCurrentThread (), (GDestroyNotify) NULL);
 
-	if (PR_Connect (fd, &netaddr, cancel_fd ? 0 : CONNECT_TIMEOUT) == PR_FAILURE) {
-		gint errnosave;
+	status = PR_Connect (fd, &netaddr, PR_INTERVAL_NO_TIMEOUT);
 
-		_set_errno_from_pr_error (PR_GetError ());
-		if (PR_GetError () == PR_IN_PROGRESS_ERROR ||
-		    (cancel_fd && (PR_GetError () == PR_CONNECT_TIMEOUT_ERROR ||
-				   PR_GetError () == PR_IO_TIMEOUT_ERROR))) {
-			gboolean connected = FALSE;
-			PRPollDesc poll[2];
-
-			poll[0].fd = fd;
-			poll[0].in_flags = PR_POLL_WRITE | PR_POLL_EXCEPT;
-			poll[1].fd = cancel_fd;
-			poll[1].in_flags = PR_POLL_READ;
-
-			do {
-				poll[0].out_flags = 0;
-				poll[1].out_flags = 0;
-
-				if (PR_Poll (poll, cancel_fd ? 2 : 1, CONNECT_TIMEOUT) == PR_FAILURE) {
-					_set_errno_from_pr_error (PR_GetError ());
-					goto exception;
-				}
-
-				if (poll[1].out_flags == PR_POLL_READ) {
-					errno = EINTR;
-					goto exception;
-				}
-
-				if (PR_ConnectContinue (fd, poll[0].out_flags) == PR_FAILURE) {
-					_set_errno_from_pr_error (PR_GetError ());
-					if (PR_GetError () != PR_IN_PROGRESS_ERROR)
-						goto exception;
-				} else {
-					connected = TRUE;
-				}
-			} while (!connected);
-		} else {
-		exception:
-			errnosave = errno;
-			PR_Shutdown (fd, PR_SHUTDOWN_BOTH);
-			PR_Close (fd);
-			errno = errnosave;
-			fd = NULL;
+	if (cancel_id > 0)
+		g_cancellable_disconnect (cancellable, cancel_id);
 
-			goto out;
-		}
+	if (g_cancellable_set_error_if_cancelled (cancellable, error))
+		goto fail;
 
-		errno = 0;
+	if (status == PR_FAILURE) {
+		tcp_stream_set_error_from_pr_error (error);
+		goto fail;
 	}
 
-out:
+	return fd;
 
-	if (!fd)
-		_set_g_error_from_errno (error, TRUE);
+fail:
+	PR_Shutdown (fd, PR_SHUTDOWN_BOTH);
+	PR_Close (fd);
 
-	return fd;
+	return NULL;
 }
 
 /* Just opens a TCP socket to a (presumed) SOCKS proxy.  Does not actually



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]