[evolution-data-server] CamelTcpStreamRaw: Use PR_Interrupt() to cancel blocked I/O.
- From: Matthew Barnes <mbarnes src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [evolution-data-server] CamelTcpStreamRaw: Use PR_Interrupt() to cancel blocked I/O.
- Date: Mon, 7 Nov 2011 01:10:49 +0000 (UTC)
commit 46c6be4af5da9d10884b92f4bb61960fa7fd23d0
Author: Matthew Barnes <mbarnes redhat com>
Date: Fri Nov 4 17:47:53 2011 -0400
CamelTcpStreamRaw: Use PR_Interrupt() to cancel blocked I/O.
Call PR_Interrupt() from a GCancellable::cancelled signal handler to
cancel blocking PR_Connect(), PR_Read() and PR_Write() calls.
This is much simpler and more responsive than polling the cancellation
descriptor returned by camel_operation_cancel_prfd().
camel/camel-tcp-stream-raw.c | 317 +++++++++++++-----------------------------
1 files changed, 95 insertions(+), 222 deletions(-)
---
diff --git a/camel/camel-tcp-stream-raw.c b/camel/camel-tcp-stream-raw.c
index 4fd0161..8630070 100644
--- a/camel/camel-tcp-stream-raw.c
+++ b/camel/camel-tcp-stream-raw.c
@@ -36,6 +36,7 @@
#include <prio.h>
#include <prerror.h>
#include <prerr.h>
+#include <prthread.h>
#include <glib/gi18n-lib.h>
@@ -167,6 +168,13 @@ flaky_tcp_read (gint fd,
#endif /* SIMULATE_FLAKY_NETWORK */
static void
+tcp_stream_cancelled (GCancellable *cancellable,
+ PRThread *thread)
+{
+ PR_Interrupt (thread);
+}
+
+static void
tcp_stream_raw_finalize (GObject *object)
{
CamelTcpStreamRaw *stream = CAMEL_TCP_STREAM_RAW (object);
@@ -265,6 +273,28 @@ _set_g_error_from_errno (GError **error,
"%s", g_strerror (errn));
}
+static void
+tcp_stream_set_error_from_pr_error (GError **error)
+{
+ gchar *error_message;
+ PRInt32 length;
+
+ length = PR_GetErrorTextLength ();
+ g_return_if_fail (length > 0);
+
+ error_message = g_malloc0 (length + 1);
+ PR_GetErrorText (error_message);
+
+ _set_errno_from_pr_error (PR_GetError ());
+
+ g_set_error_literal (
+ error, G_IO_ERROR,
+ g_io_error_from_errno (errno),
+ error_message);
+
+ g_free (error_message);
+}
+
static gssize
read_from_prfd (PRFileDesc *fd,
gchar *buffer,
@@ -272,90 +302,30 @@ read_from_prfd (PRFileDesc *fd,
GCancellable *cancellable,
GError **error)
{
- PRFileDesc *cancel_fd = NULL;
- gssize nread;
-
- if (g_cancellable_is_cancelled (cancellable)) {
- errno = EINTR;
- _set_g_error_from_errno (error, TRUE);
- return -1;
- }
+ gssize bytes_read;
+ gulong cancel_id = 0;
- if (CAMEL_IS_OPERATION (cancellable))
- cancel_fd = camel_operation_cancel_prfd (
- CAMEL_OPERATION (cancellable));
+ if (G_IS_CANCELLABLE (cancellable))
+ cancel_id = g_cancellable_connect (
+ cancellable, G_CALLBACK (tcp_stream_cancelled),
+ PR_GetCurrentThread (), (GDestroyNotify) NULL);
- if (cancel_fd == NULL) {
- do {
- nread = PR_Read (fd, buffer, n);
- if (nread == -1)
- _set_errno_from_pr_error (PR_GetError ());
- } while (nread == -1 && (PR_GetError () == PR_PENDING_INTERRUPT_ERROR ||
- PR_GetError () == PR_IO_PENDING_ERROR ||
- PR_GetError () == PR_WOULD_BLOCK_ERROR));
- } else {
- PRSocketOptionData sockopts;
- PRPollDesc pollfds[2];
- gboolean nonblock;
- gint saved_errno;
-
- /* get O_NONBLOCK options */
- sockopts.option = PR_SockOpt_Nonblocking;
- PR_GetSocketOption (fd, &sockopts);
- sockopts.option = PR_SockOpt_Nonblocking;
- nonblock = sockopts.value.non_blocking;
- sockopts.value.non_blocking = TRUE;
- PR_SetSocketOption (fd, &sockopts);
-
- pollfds[0].fd = fd;
- pollfds[0].in_flags = PR_POLL_READ;
- pollfds[1].fd = cancel_fd;
- pollfds[1].in_flags = PR_POLL_READ;
+ do {
+ bytes_read = PR_Read (fd, buffer, n);
+ } while (bytes_read == -1 && PR_GetError () == PR_IO_PENDING_ERROR);
- do {
- PRInt32 res;
+ if (cancel_id > 0)
+ g_cancellable_disconnect (cancellable, cancel_id);
- pollfds[0].out_flags = 0;
- pollfds[1].out_flags = 0;
- nread = -1;
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return -1;
- res = PR_Poll (pollfds, 2, IO_TIMEOUT);
- if (res == -1)
- _set_errno_from_pr_error (PR_GetError ());
- else if (res == 0) {
-#ifdef ETIMEDOUT
- errno = ETIMEDOUT;
-#else
- errno = EIO;
-#endif
- goto failed;
- } else if (pollfds[1].out_flags == PR_POLL_READ) {
- errno = EINTR;
- goto failed;
- } else {
- do {
- nread = PR_Read (fd, buffer, n);
- if (nread == -1)
- _set_errno_from_pr_error (PR_GetError ());
- } while (nread == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
- }
- } while (nread == -1 && (PR_GetError () == PR_PENDING_INTERRUPT_ERROR ||
- PR_GetError () == PR_IO_PENDING_ERROR ||
- PR_GetError () == PR_WOULD_BLOCK_ERROR));
-
- /* restore O_NONBLOCK options */
- failed:
- saved_errno = errno;
- sockopts.option = PR_SockOpt_Nonblocking;
- sockopts.value.non_blocking = nonblock;
- PR_SetSocketOption (fd, &sockopts);
- errno = saved_errno;
+ if (bytes_read == -1) {
+ tcp_stream_set_error_from_pr_error (error);
+ return -1;
}
- if (nread == -1)
- _set_g_error_from_errno (error, TRUE);
-
- return nread;
+ return bytes_read;
}
static gssize
@@ -374,105 +344,42 @@ tcp_stream_raw_read (CamelStream *stream,
static gssize
write_to_prfd (PRFileDesc *fd,
const gchar *buffer,
- gsize n,
+ gsize size,
GCancellable *cancellable,
GError **error)
{
- PRFileDesc *cancel_fd = NULL;
- gssize w, written = 0;
+ gssize bytes_written;
+ gssize total_written = 0;
+ gulong cancel_id = 0;
- if (g_cancellable_is_cancelled (cancellable)) {
- errno = EINTR;
- _set_g_error_from_errno (error, TRUE);
- return -1;
- }
-
- if (CAMEL_IS_OPERATION (cancellable))
- cancel_fd = camel_operation_cancel_prfd (
- CAMEL_OPERATION (cancellable));
+ if (G_IS_CANCELLABLE (cancellable))
+ cancel_id = g_cancellable_connect (
+ cancellable, G_CALLBACK (tcp_stream_cancelled),
+ PR_GetCurrentThread (), (GDestroyNotify) NULL);
- if (cancel_fd == NULL) {
+ do {
do {
- do {
- w = PR_Write (fd, buffer + written, n - written);
- if (w == -1)
- _set_errno_from_pr_error (PR_GetError ());
- } while (w == -1 && (PR_GetError () == PR_PENDING_INTERRUPT_ERROR ||
- PR_GetError () == PR_IO_PENDING_ERROR ||
- PR_GetError () == PR_WOULD_BLOCK_ERROR));
-
- if (w > 0)
- written += w;
- } while (w != -1 && written < n);
- } else {
- PRSocketOptionData sockopts;
- PRPollDesc pollfds[2];
- gboolean nonblock;
- gint saved_errno;
-
- /* get O_NONBLOCK options */
- sockopts.option = PR_SockOpt_Nonblocking;
- PR_GetSocketOption (fd, &sockopts);
- sockopts.option = PR_SockOpt_Nonblocking;
- nonblock = sockopts.value.non_blocking;
- sockopts.value.non_blocking = TRUE;
- PR_SetSocketOption (fd, &sockopts);
-
- pollfds[0].fd = fd;
- pollfds[0].in_flags = PR_POLL_WRITE;
- pollfds[1].fd = cancel_fd;
- pollfds[1].in_flags = PR_POLL_READ;
+ bytes_written = PR_Write (
+ fd, buffer + total_written,
+ size - total_written);
+ } while (bytes_written == -1 && PR_GetError () == PR_IO_PENDING_ERROR);
- do {
- PRInt32 res;
-
- pollfds[0].out_flags = 0;
- pollfds[1].out_flags = 0;
- w = -1;
-
- res = PR_Poll (pollfds, 2, IO_TIMEOUT);
- if (res == -1) {
- _set_errno_from_pr_error (PR_GetError ());
- if (PR_GetError () == PR_PENDING_INTERRUPT_ERROR)
- w = 0;
- } else if (res == 0) {
-#ifdef ETIMEDOUT
- errno = ETIMEDOUT;
-#else
- errno = EIO;
-#endif
- } else if (pollfds[1].out_flags == PR_POLL_READ) {
- errno = EINTR;
- } else {
- do {
- w = PR_Write (fd, buffer + written, n - written);
- if (w == -1)
- _set_errno_from_pr_error (PR_GetError ());
- } while (w == -1 && PR_GetError () == PR_PENDING_INTERRUPT_ERROR);
-
- if (w == -1) {
- if (PR_GetError () == PR_IO_PENDING_ERROR ||
- PR_GetError () == PR_WOULD_BLOCK_ERROR)
- w = 0;
- } else
- written += w;
- }
- } while (w != -1 && written < n);
-
- /* restore O_NONBLOCK options */
- saved_errno = errno;
- sockopts.option = PR_SockOpt_Nonblocking;
- sockopts.value.non_blocking = nonblock;
- PR_SetSocketOption (fd, &sockopts);
- errno = saved_errno;
- }
+ if (bytes_written > 0)
+ total_written += bytes_written;
+ } while (bytes_written != -1 && total_written < size);
+
+ if (cancel_id > 0)
+ g_cancellable_disconnect (cancellable, cancel_id);
- if (w == -1) {
- _set_g_error_from_errno (error, TRUE);
- written = -1;
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return -1;
+
+ if (bytes_written == -1) {
+ tcp_stream_set_error_from_pr_error (error);
+ return -1;
}
- return written;
+ return total_written;
}
static gssize
@@ -578,7 +485,9 @@ socket_connect (struct addrinfo *host,
GError **error)
{
PRNetAddr netaddr;
- PRFileDesc *fd, *cancel_fd = NULL;
+ PRFileDesc *fd;
+ PRStatus status;
+ gulong cancel_id = 0;
if (sockaddr_to_praddr (host->ai_addr, host->ai_addrlen, &netaddr) != 0) {
errno = EINVAL;
@@ -593,67 +502,31 @@ socket_connect (struct addrinfo *host,
return NULL;
}
- if (CAMEL_IS_OPERATION (cancellable))
- cancel_fd = camel_operation_cancel_prfd (
- CAMEL_OPERATION (cancellable));
+ if (G_IS_CANCELLABLE (cancellable))
+ cancel_id = g_cancellable_connect (
+ cancellable, G_CALLBACK (tcp_stream_cancelled),
+ PR_GetCurrentThread (), (GDestroyNotify) NULL);
- if (PR_Connect (fd, &netaddr, cancel_fd ? 0 : CONNECT_TIMEOUT) == PR_FAILURE) {
- gint errnosave;
+ status = PR_Connect (fd, &netaddr, PR_INTERVAL_NO_TIMEOUT);
- _set_errno_from_pr_error (PR_GetError ());
- if (PR_GetError () == PR_IN_PROGRESS_ERROR ||
- (cancel_fd && (PR_GetError () == PR_CONNECT_TIMEOUT_ERROR ||
- PR_GetError () == PR_IO_TIMEOUT_ERROR))) {
- gboolean connected = FALSE;
- PRPollDesc poll[2];
-
- poll[0].fd = fd;
- poll[0].in_flags = PR_POLL_WRITE | PR_POLL_EXCEPT;
- poll[1].fd = cancel_fd;
- poll[1].in_flags = PR_POLL_READ;
-
- do {
- poll[0].out_flags = 0;
- poll[1].out_flags = 0;
-
- if (PR_Poll (poll, cancel_fd ? 2 : 1, CONNECT_TIMEOUT) == PR_FAILURE) {
- _set_errno_from_pr_error (PR_GetError ());
- goto exception;
- }
-
- if (poll[1].out_flags == PR_POLL_READ) {
- errno = EINTR;
- goto exception;
- }
-
- if (PR_ConnectContinue (fd, poll[0].out_flags) == PR_FAILURE) {
- _set_errno_from_pr_error (PR_GetError ());
- if (PR_GetError () != PR_IN_PROGRESS_ERROR)
- goto exception;
- } else {
- connected = TRUE;
- }
- } while (!connected);
- } else {
- exception:
- errnosave = errno;
- PR_Shutdown (fd, PR_SHUTDOWN_BOTH);
- PR_Close (fd);
- errno = errnosave;
- fd = NULL;
+ if (cancel_id > 0)
+ g_cancellable_disconnect (cancellable, cancel_id);
- goto out;
- }
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ goto fail;
- errno = 0;
+ if (status == PR_FAILURE) {
+ tcp_stream_set_error_from_pr_error (error);
+ goto fail;
}
-out:
+ return fd;
- if (!fd)
- _set_g_error_from_errno (error, TRUE);
+fail:
+ PR_Shutdown (fd, PR_SHUTDOWN_BOTH);
+ PR_Close (fd);
- return fd;
+ return NULL;
}
/* Just opens a TCP socket to a (presumed) SOCKS proxy. Does not actually
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]