[glib-networking] gnutls: handle simultaneous ops, do handshaking in a thread



commit 54b77031a17aa531d6139f668aeb8932bdee94f9
Author: Dan Winship <danw gnome org>
Date:   Mon Jul 2 12:58:02 2012 -0400

    gnutls: handle simultaneous ops, do handshaking in a thread
    
    Keep separate internal read/write states so that you can do
    simultaneous reads/writes (either simultaneous async ops in the same
    thread, or simultaneous sync ops in different threads). Add
    locking/blocking so that this works correctly even in the presence of
    rehandshakes (and add test cases for this).
    
    As part of this, change handshaking so that the I/O part of it always
    happens in a separate thread, which has three advantages:
    
      1. It simplifies GTlsConnectionGnutlsSource by making it not have to
         worry about flipping back and forth between read and write.
    
      2. (Future) It will let the caller asynchronously handle
         certificate-related functionality that is implemented via
         synchronous callbacks in gnutls.
    
      3. (Future) We can use g_task_set_return_on_cancel() to allow
         cancellation even during uncancellable PKCS#11 operations.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=656343
    https://bugzilla.gnome.org/show_bug.cgi?id=660252

 tls/gnutls/gtlsconnection-gnutls.c |  927 ++++++++++++++++++++++--------------
 tls/tests/connection.c             |  317 ++++++++++++-
 2 files changed, 873 insertions(+), 371 deletions(-)
---
diff --git a/tls/gnutls/gtlsconnection-gnutls.c b/tls/gnutls/gtlsconnection-gnutls.c
index 46db1c7..33cc9ac 100644
--- a/tls/gnutls/gtlsconnection-gnutls.c
+++ b/tls/gnutls/gtlsconnection-gnutls.c
@@ -60,6 +60,14 @@ static P11KitPin*    on_pin_prompt_callback  (const char     *pinfile,
 
 static void g_tls_connection_gnutls_init_priorities (void);
 
+static gboolean do_implicit_handshake (GTlsConnectionGnutls  *gnutls,
+				       gboolean               blocking,
+				       GCancellable          *cancellable,
+				       GError               **error);
+static gboolean finish_handshake (GTlsConnectionGnutls  *gnutls,
+				  GSimpleAsyncResult    *thread_result,
+				  GError               **error);
+
 G_DEFINE_ABSTRACT_TYPE_WITH_CODE (GTlsConnectionGnutls, g_tls_connection_gnutls, G_TYPE_TLS_CONNECTION,
 				  G_IMPLEMENT_INTERFACE (G_TYPE_INITABLE,
 							 g_tls_connection_gnutls_initable_iface_init);
@@ -97,9 +105,34 @@ struct _GTlsConnectionGnutlsPrivate
   gboolean is_system_certdb;
   GTlsDatabase *database;
   gboolean database_is_unset;
-  gboolean need_handshake, handshaking, ever_handshaked;
+
+  /* need_handshake means the next claim_op() will get diverted into
+   * an implicit handshake (unless it's an OP_HANDSHAKE itself).
+   * need_finish_handshake means the next claim_op() will get
+   * diverted into finish_handshake().
+   *
+   * handshaking is TRUE as soon as a handshake thread is queued.
+   * Normally it becomes FALSE after finish_handshake() completes,
+   * but in the case of an async implicit handshake, it becomes
+   * FALSE at the end of handshake_thread() (and then the next
+   * read/write op will call finish_handshake()).
+   * This is because we don't want to call finish_handshake() (and
+   * possibly emit signals) if the caller is not actually in a TLS op
+   * at the time. (Eg, if they're waiting to try a nonblocking call
+   * again, we don't want to emit the signal until they do.)
+   *
+   * started_handshake indicates that the current handshake attempt
+   * got at least as far as calling gnutls_handshake() (and so any
+   * error should be copied to handshake_error and returned on all
+   * future operations). ever_handshaked indicates that TLS has
+   * been successfully negotiated at some point.
+   */
+  gboolean need_handshake, need_finish_handshake;
+  gboolean started_handshake, handshaking, ever_handshaked;
+  GSimpleAsyncResult *implicit_handshake;
   GError *handshake_error;
-  gboolean closing;
+
+  gboolean closing, closed;
 
   GInputStream *tls_istream;
   GOutputStream *tls_ostream;
@@ -107,13 +140,22 @@ struct _GTlsConnectionGnutlsPrivate
   GTlsInteraction *interaction;
   gchar *interaction_id;
 
-  GError *error;
-  GCancellable *cancellable;
-  gboolean blocking;
+  GMutex        op_mutex;
+  GCancellable *waiting_for_op;
+
+  gboolean      reading;
+  gboolean      read_blocking;
+  GError       *read_error;
+  GCancellable *read_cancellable;
+
+  gboolean      writing;
+  gboolean      write_blocking;
+  GError       *write_error;
+  GCancellable *write_cancellable;
+
 #ifndef GNUTLS_E_PREMATURE_TERMINATION
   gboolean eof;
 #endif
-  GIOCondition internal_direction;
 };
 
 static gint unique_interaction_id = 0;
@@ -141,6 +183,9 @@ g_tls_connection_gnutls_init (GTlsConnectionGnutls *gnutls)
   p11_kit_pin_register_callback (gnutls->priv->interaction_id,
                                  on_pin_prompt_callback, gnutls, NULL);
 #endif
+
+  gnutls->priv->waiting_for_op = g_cancellable_new ();
+  g_cancellable_cancel (gnutls->priv->waiting_for_op);
 }
 
 static gnutls_priority_t priorities[2][2];
@@ -248,8 +293,11 @@ g_tls_connection_gnutls_finalize (GObject *object)
   g_free (gnutls->priv->interaction_id);
   g_clear_object (&gnutls->priv->interaction);
 
-  g_clear_error (&gnutls->priv->error);
   g_clear_error (&gnutls->priv->handshake_error);
+  g_clear_error (&gnutls->priv->read_error);
+  g_clear_error (&gnutls->priv->write_error);
+
+  g_clear_object (&gnutls->priv->waiting_for_op);
 
   G_OBJECT_CLASS (g_tls_connection_gnutls_parent_class)->finalize (object);
 }
@@ -434,57 +482,226 @@ g_tls_connection_gnutls_get_certificate (GTlsConnectionGnutls *gnutls,
                                      gnutls->priv->interaction_id, st);
 }
 
+typedef enum {
+  G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE,
+  G_TLS_CONNECTION_GNUTLS_OP_READ,
+  G_TLS_CONNECTION_GNUTLS_OP_WRITE,
+  G_TLS_CONNECTION_GNUTLS_OP_CLOSE,
+} GTlsConnectionGnutlsOp;
+
+/* Reserve the connection for @op. Non-handshake ops first run any pending
+ * implicit handshake or finish_handshake(). If a conflicting op is active,
+ * fail with G_IO_ERROR_WOULD_BLOCK when !@blocking, else wait and retry.
+ * Returns TRUE once claimed (release with yield_op()), FALSE on error.
+ */
+static gboolean
+claim_op (GTlsConnectionGnutls    *gnutls,
+	  GTlsConnectionGnutlsOp   op,
+	  gboolean                 blocking,
+	  GCancellable            *cancellable,
+	  GError                 **error)
+{
+ try_again:
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return FALSE;
+
+  g_mutex_lock (&gnutls->priv->op_mutex);
+  if (gnutls->priv->closing || gnutls->priv->closed)
+    {
+      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+			   _("Connection is closed"));
+      g_mutex_unlock (&gnutls->priv->op_mutex);
+      return FALSE;
+    }
+
+  if (gnutls->priv->handshake_error && op != G_TLS_CONNECTION_GNUTLS_OP_CLOSE)
+    {
+      if (error)
+	*error = g_error_copy (gnutls->priv->handshake_error);
+      g_mutex_unlock (&gnutls->priv->op_mutex);
+      return FALSE;
+    }
+
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE)
+    {
+      if (gnutls->priv->need_handshake)
+	{
+	  gnutls->priv->need_handshake = FALSE;
+	  gnutls->priv->handshaking = TRUE;
+	  if (!do_implicit_handshake (gnutls, blocking, cancellable, error))
+	    {
+	      g_mutex_unlock (&gnutls->priv->op_mutex);
+	      return FALSE;
+	    }
+	}
+
+      if (gnutls->priv->need_finish_handshake)
+	{
+	  gboolean success;
+
+	  gnutls->priv->need_finish_handshake = FALSE;
+	  g_mutex_unlock (&gnutls->priv->op_mutex);
+	  success = finish_handshake (gnutls, gnutls->priv->implicit_handshake, error);
+	  g_clear_object (&gnutls->priv->implicit_handshake);
+	  g_mutex_lock (&gnutls->priv->op_mutex);
+	  gnutls->priv->handshaking = FALSE;
+	  if (!success || g_cancellable_set_error_if_cancelled (cancellable, error))
+	    {
+	      g_mutex_unlock (&gnutls->priv->op_mutex);
+	      return FALSE;
+	    }
+	}
+    }
+
+  if ((op != G_TLS_CONNECTION_GNUTLS_OP_WRITE && gnutls->priv->reading) ||
+      (op != G_TLS_CONNECTION_GNUTLS_OP_READ && gnutls->priv->writing) ||
+      (op != G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE && gnutls->priv->handshaking))
+    {
+      GPollFD fds[2];
+      int nfds;
+
+      g_cancellable_reset (gnutls->priv->waiting_for_op);
+      g_mutex_unlock (&gnutls->priv->op_mutex);
+
+      if (!blocking)
+	{
+	  g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+			       _("Operation would block"));
+	  return FALSE;
+	}
+      /* fds[0]: waiting_for_op; fds[1]: @cancellable, if it provides an fd */
+      g_cancellable_make_pollfd (gnutls->priv->waiting_for_op, &fds[0]);
+      if (g_cancellable_make_pollfd (cancellable, &fds[1]))
+	nfds = 2;
+      else
+	nfds = 1;
+      g_poll (fds, nfds, -1);
+      g_cancellable_release_fd (cancellable);
+      goto try_again;
+    }
+
+  if (op == G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE)
+    {
+      gnutls->priv->handshaking = TRUE;
+      gnutls->priv->need_handshake = FALSE;
+    }
+  if (op == G_TLS_CONNECTION_GNUTLS_OP_CLOSE)
+    gnutls->priv->closing = TRUE;
+
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_WRITE)
+    gnutls->priv->reading = TRUE;
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_READ)
+    gnutls->priv->writing = TRUE;
+
+  g_mutex_unlock (&gnutls->priv->op_mutex);
+  return TRUE;
+}
+/* Release the claim that claim_op() took for @op and wake anything waiting on it */
+static void
+yield_op (GTlsConnectionGnutls   *gnutls,
+	  GTlsConnectionGnutlsOp  op)
+{
+  g_mutex_lock (&gnutls->priv->op_mutex);
+
+  if (op == G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE)
+    gnutls->priv->handshaking = FALSE;
+  if (op == G_TLS_CONNECTION_GNUTLS_OP_CLOSE)
+    gnutls->priv->closing = FALSE;
+
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_WRITE)
+    gnutls->priv->reading = FALSE;
+  if (op != G_TLS_CONNECTION_GNUTLS_OP_READ)
+    gnutls->priv->writing = FALSE;
+  /* claim_op() waiters poll on waiting_for_op, so cancelling it wakes them */
+  g_cancellable_cancel (gnutls->priv->waiting_for_op);
+  g_mutex_unlock (&gnutls->priv->op_mutex);
+}
+
 static void
 begin_gnutls_io (GTlsConnectionGnutls  *gnutls,
+		 GIOCondition           direction,
 		 gboolean               blocking,
 		 GCancellable          *cancellable)
 {
-  gnutls->priv->blocking = blocking;
-  gnutls->priv->cancellable = cancellable;
-  gnutls->priv->internal_direction = 0;
-  if (cancellable)
-    g_cancellable_push_current (cancellable);
-  g_clear_error (&gnutls->priv->error);
+  g_assert (direction & (G_IO_IN | G_IO_OUT));
+
+  if (direction & G_IO_IN)
+    {
+      gnutls->priv->read_blocking = blocking;
+      gnutls->priv->read_cancellable = cancellable;
+      g_clear_error (&gnutls->priv->read_error);
+    }
+
+  if (direction & G_IO_OUT)
+    {
+      gnutls->priv->write_blocking = blocking;
+      gnutls->priv->write_cancellable = cancellable;
+      g_clear_error (&gnutls->priv->write_error);
+    }
 }
 
 static int
 end_gnutls_io (GTlsConnectionGnutls  *gnutls,
+	       GIOCondition           direction,
 	       int                    status,
+	       const char            *errmsg,
 	       GError               **error)
 {
-  if (gnutls->priv->cancellable)
-    g_cancellable_pop_current (gnutls->priv->cancellable);
-  gnutls->priv->cancellable = NULL;
+  GError *my_error = NULL;
 
-  if (status >= 0)
+  g_assert (direction & (G_IO_IN | G_IO_OUT));
+  g_assert (!error || !*error);
+
+  if (status == GNUTLS_E_AGAIN ||
+      status == GNUTLS_E_WARNING_ALERT_RECEIVED)
+    return GNUTLS_E_AGAIN;
+
+  if (direction & G_IO_IN)
     {
-      g_clear_error (&gnutls->priv->error);
-      return status;
+      gnutls->priv->read_cancellable = NULL;
+      if (status < 0)
+	{
+	  my_error = gnutls->priv->read_error;
+	  gnutls->priv->read_error = NULL;
+	}
+      else
+	g_clear_error (&gnutls->priv->read_error);
+    }
+  if (direction & G_IO_OUT)
+    {
+      gnutls->priv->write_cancellable = NULL;
+      if (status < 0 && !my_error)
+	{
+	  my_error = gnutls->priv->write_error;
+	  gnutls->priv->write_error = NULL;
+	}
+      else
+	g_clear_error (&gnutls->priv->write_error);
     }
 
+  if (status >= 0)
+    return status;
+
   if (gnutls->priv->handshaking && !gnutls->priv->ever_handshaked)
     {
-      if (g_error_matches (gnutls->priv->error, G_IO_ERROR, G_IO_ERROR_FAILED) ||
+      if (g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_FAILED) ||
 	  status == GNUTLS_E_UNEXPECTED_PACKET_LENGTH ||
 	  status == GNUTLS_E_FATAL_ALERT_RECEIVED ||
 	  status == GNUTLS_E_DECRYPTION_FAILED ||
 	  status == GNUTLS_E_UNSUPPORTED_VERSION_PACKET)
 	{
-	  g_clear_error (&gnutls->priv->error);
+	  g_clear_error (&my_error);
 	  g_set_error_literal (error, G_TLS_ERROR, G_TLS_ERROR_NOT_TLS,
 			       _("Peer failed to perform TLS handshake"));
 	  return GNUTLS_E_PULL_ERROR;
 	}
     }
 
-  if (gnutls->priv->error)
+  if (my_error)
     {
-      if (g_error_matches (gnutls->priv->error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-	status = GNUTLS_E_AGAIN;
-      else
+      if (!g_error_matches (my_error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
 	G_TLS_CONNECTION_GNUTLS_GET_CLASS (gnutls)->failed (gnutls);
-      g_propagate_error (error, gnutls->priv->error);
-      gnutls->priv->error = NULL;
+      g_propagate_error (error, my_error);
       return status;
     }
   else if (status == GNUTLS_E_REHANDSHAKE)
@@ -496,9 +713,17 @@ end_gnutls_io (GTlsConnectionGnutls  *gnutls,
 	  return GNUTLS_E_PULL_ERROR;
 	}
 
-      gnutls->priv->need_handshake = TRUE;
+      g_mutex_lock (&gnutls->priv->op_mutex);
+      if (!gnutls->priv->handshaking)
+	gnutls->priv->need_handshake = TRUE;
+      g_mutex_unlock (&gnutls->priv->op_mutex);
       return status;
     }
+  else if (status == GNUTLS_E_GOT_APPLICATION_DATA)
+    {
+      if (gnutls->priv->handshaking && G_IS_TLS_SERVER_CONNECTION (gnutls))
+	return GNUTLS_E_AGAIN;
+    }
   else if (
 #ifdef GNUTLS_E_PREMATURE_TERMINATION
 	   status == GNUTLS_E_PREMATURE_TERMINATION
@@ -518,34 +743,34 @@ end_gnutls_io (GTlsConnectionGnutls  *gnutls,
 	return 0;
     }
 
+  if (error)
+    {
+      g_set_error (error, G_TLS_ERROR, G_TLS_ERROR_MISC,
+                   errmsg, gnutls_strerror (status));
+    }
   return status;
 }
 
-#define BEGIN_GNUTLS_IO(gnutls, blocking, cancellable)	\
-  begin_gnutls_io (gnutls, blocking, cancellable);	\
+#define BEGIN_GNUTLS_IO(gnutls, direction, blocking, cancellable)	\
+  begin_gnutls_io (gnutls, direction, blocking, cancellable);		\
   do {
 
-#define END_GNUTLS_IO(gnutls, ret, errmsg, err)	\
-  } while ((ret == GNUTLS_E_AGAIN ||			\
-            ret == GNUTLS_E_WARNING_ALERT_RECEIVED) &&	\
-           !gnutls->priv->error);			\
-  ret = end_gnutls_io (gnutls, ret, err);		\
-  if (ret < 0 && ret != GNUTLS_E_REHANDSHAKE && err && !*err) \
-    {							\
-      g_set_error (err, G_TLS_ERROR, G_TLS_ERROR_MISC,\
-                   errmsg, gnutls_strerror (ret));	\
-    }							\
-  ;
+#define END_GNUTLS_IO(gnutls, direction, ret, errmsg, err)		\
+  } while ((ret = end_gnutls_io (gnutls, direction, ret, errmsg, err)) == GNUTLS_E_AGAIN);
 
 gboolean
 g_tls_connection_gnutls_check (GTlsConnectionGnutls  *gnutls,
 			       GIOCondition           condition)
 {
-  if (!gnutls->priv->internal_direction)
+  /* Racy, but worst case is that we just get WOULD_BLOCK back */
+  if (gnutls->priv->need_finish_handshake)
     return TRUE;
 
+  /* If a handshake or close is in progress, then tls_istream and
+   * tls_ostream are blocked, regardless of the base stream status.
+   */
   if (gnutls->priv->handshaking || gnutls->priv->closing)
-    condition = gnutls->priv->internal_direction;
+    return FALSE;
 
   if (condition & G_IO_IN)
     return g_pollable_input_stream_is_readable (gnutls->priv->base_istream);
@@ -554,13 +779,16 @@ g_tls_connection_gnutls_check (GTlsConnectionGnutls  *gnutls,
 }
 
 typedef struct {
-  GSource source;
+  GSource               source;
 
   GTlsConnectionGnutls *gnutls;
   GObject              *stream;
 
   GSource              *child_source;
-  GIOCondition          current_direction;
+  GIOCondition          condition;
+
+  gboolean              io_waiting;
+  gboolean              op_waiting;
 } GTlsConnectionGnutlsSource;
 
 static gboolean
@@ -577,40 +805,51 @@ gnutls_source_check (GSource *source)
   return FALSE;
 }
 
-static gboolean
-gnutls_source_sync_child_source (GTlsConnectionGnutlsSource *gnutls_source)
+static void
+gnutls_source_sync (GTlsConnectionGnutlsSource *gnutls_source)
 {
   GTlsConnectionGnutls *gnutls = gnutls_source->gnutls;
-  GSource *source = (GSource *)gnutls_source;
-  GIOCondition direction;
+  gboolean io_waiting, op_waiting;
 
-  if (gnutls->priv->handshaking || gnutls->priv->closing)
-    direction = gnutls->priv->internal_direction;
-  else if (!gnutls_source->stream)
-    return FALSE;
-  else if (G_IS_TLS_INPUT_STREAM_GNUTLS (gnutls_source->stream))
-    direction = G_IO_IN;
+  g_mutex_lock (&gnutls->priv->op_mutex);
+  if (((gnutls_source->condition & G_IO_IN) && gnutls->priv->reading) ||
+      ((gnutls_source->condition & G_IO_OUT) && gnutls->priv->writing) ||
+      (gnutls->priv->handshaking && !gnutls->priv->need_finish_handshake))
+    op_waiting = TRUE;
   else
-    direction = G_IO_OUT;
+    op_waiting = FALSE;
 
-  if (direction == gnutls_source->current_direction)
-    return TRUE;
+  if (!op_waiting && !gnutls->priv->need_handshake &&
+      !gnutls->priv->need_finish_handshake)
+    io_waiting = TRUE;
+  else
+    io_waiting = FALSE;
+  g_mutex_unlock (&gnutls->priv->op_mutex);
+
+  if (op_waiting == gnutls_source->op_waiting &&
+      io_waiting == gnutls_source->io_waiting)
+    return;
+  gnutls_source->op_waiting = op_waiting;
+  gnutls_source->io_waiting = io_waiting;
 
   if (gnutls_source->child_source)
     {
-      g_source_remove_child_source (source, gnutls_source->child_source);
+      g_source_remove_child_source ((GSource *)gnutls_source,
+				    gnutls_source->child_source);
       g_source_unref (gnutls_source->child_source);
     }
 
-  if (direction & G_IO_IN)
+  if (op_waiting)
+    gnutls_source->child_source = g_cancellable_source_new (gnutls->priv->waiting_for_op);
+  else if (io_waiting && G_IS_POLLABLE_INPUT_STREAM (gnutls_source->stream))
     gnutls_source->child_source = g_pollable_input_stream_create_source (gnutls->priv->base_istream, NULL);
-  else
+  else if (io_waiting && G_IS_POLLABLE_OUTPUT_STREAM (gnutls_source->stream))
     gnutls_source->child_source = g_pollable_output_stream_create_source (gnutls->priv->base_ostream, NULL);
+  else
+    gnutls_source->child_source = g_timeout_source_new (0);
 
   g_source_set_dummy_callback (gnutls_source->child_source);
-  g_source_add_child_source (source, gnutls_source->child_source);
-  gnutls_source->current_direction = direction;
-  return TRUE;
+  g_source_add_child_source ((GSource *)gnutls_source, gnutls_source->child_source);
 }
 
 static gboolean
@@ -624,7 +863,7 @@ gnutls_source_dispatch (GSource     *source,
 
   ret = (*func) (gnutls_source->stream, user_data);
   if (ret)
-    ret = gnutls_source_sync_child_source (gnutls_source);
+    gnutls_source_sync (gnutls_source);
 
   return ret;
 }
@@ -635,9 +874,7 @@ gnutls_source_finalize (GSource *source)
   GTlsConnectionGnutlsSource *gnutls_source = (GTlsConnectionGnutlsSource *)source;
 
   g_object_unref (gnutls_source->gnutls);
-
-  if (gnutls_source->child_source)
-    g_source_unref (gnutls_source->child_source);
+  g_source_unref (gnutls_source->child_source);
 }
 
 static gboolean
@@ -686,11 +923,15 @@ g_tls_connection_gnutls_create_source (GTlsConnectionGnutls  *gnutls,
   g_source_set_name (source, "GTlsConnectionGnutlsSource");
   gnutls_source = (GTlsConnectionGnutlsSource *)source;
   gnutls_source->gnutls = g_object_ref (gnutls);
+  gnutls_source->condition = condition;
   if (condition & G_IO_IN)
     gnutls_source->stream = G_OBJECT (gnutls->priv->tls_istream);
   else if (condition & G_IO_OUT)
     gnutls_source->stream = G_OBJECT (gnutls->priv->tls_ostream);
-  gnutls_source_sync_child_source (gnutls_source);
+
+  gnutls_source->op_waiting = (gboolean) -1;
+  gnutls_source->io_waiting = (gboolean) -1;
+  gnutls_source_sync (gnutls_source);
 
   if (cancellable)
     {
@@ -704,15 +945,20 @@ g_tls_connection_gnutls_create_source (GTlsConnectionGnutls  *gnutls,
 }
 
 static void
-set_gnutls_error (GTlsConnectionGnutls *gnutls, GIOCondition direction)
+set_gnutls_error (GTlsConnectionGnutls *gnutls,
+		  GError               *error)
 {
-  if (g_error_matches (gnutls->priv->error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+  /* We set EINTR rather than EAGAIN for G_IO_ERROR_WOULD_BLOCK so
+   * that GNUTLS_E_AGAIN only gets returned for gnutls-internal
+   * reasons, not for actual socket EAGAINs (and we have access
+   * to @error at the higher levels, so we can distinguish them
+   * that way later).
+   */
+
+  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    gnutls_transport_set_errno (gnutls->priv->session, EINTR);
+  else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
     gnutls_transport_set_errno (gnutls->priv->session, EINTR);
-  else if (g_error_matches (gnutls->priv->error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      gnutls_transport_set_errno (gnutls->priv->session, EAGAIN);
-      gnutls->priv->internal_direction = direction;
-    }
   else
     gnutls_transport_set_errno (gnutls->priv->session, EIO);
 }
@@ -725,21 +971,22 @@ g_tls_connection_gnutls_pull_func (gnutls_transport_ptr_t  transport_data,
   GTlsConnectionGnutls *gnutls = transport_data;
   ssize_t ret;
 
-  /* If gnutls->priv->error is non-%NULL when we're called, it means
+  /* If gnutls->priv->read_error is non-%NULL when we're called, it means
    * that an error previously occurred, but gnutls decided not to
    * propagate it. So it's correct for us to just clear it. (Usually
    * this means it ignored an EAGAIN after a short read, and now
    * we'll return EAGAIN again, which it will obey this time.)
    */
-  g_clear_error (&gnutls->priv->error);
+  g_clear_error (&gnutls->priv->read_error);
 
   ret = g_pollable_stream_read (G_INPUT_STREAM (gnutls->priv->base_istream),
-				buf, buflen, gnutls->priv->blocking,
-				gnutls->priv->cancellable,
-				&gnutls->priv->error);
+				buf, buflen,
+				gnutls->priv->read_blocking,
+				gnutls->priv->read_cancellable,
+				&gnutls->priv->read_error);
 
   if (ret < 0)
-    set_gnutls_error (gnutls, G_IO_IN);
+    set_gnutls_error (gnutls, gnutls->priv->read_error);
 #ifndef GNUTLS_E_PREMATURE_TERMINATION
   else if (ret == 0)
     gnutls->priv->eof = TRUE;
@@ -757,18 +1004,76 @@ g_tls_connection_gnutls_push_func (gnutls_transport_ptr_t  transport_data,
   ssize_t ret;
 
   /* See comment in pull_func. */
-  g_clear_error (&gnutls->priv->error);
+  g_clear_error (&gnutls->priv->write_error);
 
   ret = g_pollable_stream_write (G_OUTPUT_STREAM (gnutls->priv->base_ostream),
-				 buf, buflen, gnutls->priv->blocking,
-				 gnutls->priv->cancellable,
-				 &gnutls->priv->error);
+				 buf, buflen,
+				 gnutls->priv->write_blocking,
+				 gnutls->priv->write_cancellable,
+				 &gnutls->priv->write_error);
   if (ret < 0)
-    set_gnutls_error (gnutls, G_IO_OUT);
+    set_gnutls_error (gnutls, gnutls->priv->write_error);
 
   return ret;
 }
 
+static void
+handshake_thread (GSimpleAsyncResult *result,
+		  GObject            *object,
+		  GCancellable       *cancellable)
+{
+  GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (object);
+  gboolean is_client;
+  GError *error = NULL;
+  int ret;
+  /* Claim the handshake op, run the handshake, and store the outcome in @result. */
+  gnutls->priv->started_handshake = FALSE;
+
+  if (!claim_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE,
+		 TRUE, cancellable, &error))
+    {
+      g_simple_async_result_take_error (result, error);
+      return;
+    }
+
+  g_clear_error (&gnutls->priv->handshake_error);
+
+  is_client = G_IS_TLS_CLIENT_CONNECTION (gnutls);
+  /* Non-client, already handshaked, implicit_handshake unset: gnutls_rehandshake() first */
+  if (!is_client && gnutls->priv->ever_handshaked &&
+      !gnutls->priv->implicit_handshake)
+    {
+      BEGIN_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, TRUE, cancellable);
+      ret = gnutls_rehandshake (gnutls->priv->session);
+      END_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, ret,
+		     _("Error performing TLS handshake: %s"), &error);
+
+      if (error)
+	{
+	  g_simple_async_result_take_error (result, error);
+	  return;
+	}
+    }
+  /* finish_handshake() only saves the error in handshake_error once this is set */
+  gnutls->priv->started_handshake = TRUE;
+
+  g_clear_object (&gnutls->priv->peer_certificate);
+  gnutls->priv->peer_certificate_errors = 0;
+
+  g_tls_connection_gnutls_set_handshake_priority (gnutls);
+
+  BEGIN_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, TRUE, cancellable);
+  ret = gnutls_handshake (gnutls->priv->session);
+  END_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, ret,
+		 _("Error performing TLS handshake: %s"), &error);
+
+  gnutls->priv->ever_handshaked = TRUE;
+
+  if (error)
+    g_simple_async_result_take_error (result, error);
+  else
+    g_simple_async_result_set_op_res_gboolean (result, TRUE);
+}
 
 static GTlsCertificate *
 get_peer_certificate_from_session (GTlsConnectionGnutls *gnutls)
@@ -874,57 +1179,16 @@ accept_peer_certificate (GTlsConnectionGnutls *gnutls,
 }
 
 static gboolean
-handshake_internal (GTlsConnectionGnutls  *gnutls,
-		    gboolean               blocking,
-		    GCancellable          *cancellable,
-		    GError               **error)
+finish_handshake (GTlsConnectionGnutls  *gnutls,
+		  GSimpleAsyncResult    *result,
+		  GError               **error)
 {
-  GTlsCertificate *peer_certificate = NULL;
-  GTlsCertificateFlags peer_certificate_errors = 0;
-  int ret;
-
-  g_clear_error (&gnutls->priv->handshake_error);
-
-  if (G_IS_TLS_SERVER_CONNECTION_GNUTLS (gnutls) &&
-      gnutls->priv->ever_handshaked && !gnutls->priv->handshaking &&
-      !gnutls->priv->need_handshake)
-    {
-      BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
-      ret = gnutls_rehandshake (gnutls->priv->session);
-      END_GNUTLS_IO (gnutls, ret, _("Error performing TLS handshake: %s"), error);
-
-      if (ret != 0)
-	return FALSE;
-    }
-
-  if (!gnutls->priv->handshaking)
-    {
-      gnutls->priv->handshaking = TRUE;
-
-      g_clear_object (&gnutls->priv->peer_certificate);
-      gnutls->priv->peer_certificate_errors = 0;
-
-      g_tls_connection_gnutls_set_handshake_priority (gnutls);
-      G_TLS_CONNECTION_GNUTLS_GET_CLASS (gnutls)->begin_handshake (gnutls);
-    }
-
-  BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
-  ret = gnutls_handshake (gnutls->priv->session);
-  END_GNUTLS_IO (gnutls, ret, _("Error performing TLS handshake: %s"),
-		 &gnutls->priv->handshake_error);
-
-  if (ret == GNUTLS_E_AGAIN)
-    {
-      g_propagate_error (error, gnutls->priv->handshake_error);
-      gnutls->priv->handshake_error = NULL;
-      return FALSE;
-    }
+  GTlsCertificate *peer_certificate;
+  GTlsCertificateFlags peer_certificate_errors;
 
-  gnutls->priv->handshaking = FALSE;
-  gnutls->priv->need_handshake = FALSE;
-  gnutls->priv->ever_handshaked = TRUE;
+  g_assert (error != NULL);
 
-  if (ret == 0 &&
+  if (!g_simple_async_result_propagate_error (result, error) &&
       gnutls_certificate_type_get (gnutls->priv->session) == GNUTLS_CRT_X509)
     peer_certificate = get_peer_certificate_from_session (gnutls);
   else
@@ -936,8 +1200,7 @@ handshake_internal (GTlsConnectionGnutls  *gnutls,
       if (!accept_peer_certificate (gnutls, peer_certificate,
 				    peer_certificate_errors))
 	{
-	  g_set_error_literal (&gnutls->priv->handshake_error,
-			       G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE,
+	  g_set_error_literal (error, G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE,
 			       _("Unacceptable TLS certificate"));
 	}
 
@@ -946,43 +1209,18 @@ handshake_internal (GTlsConnectionGnutls  *gnutls,
       g_object_notify (G_OBJECT (gnutls), "peer-certificate");
       g_object_notify (G_OBJECT (gnutls), "peer-certificate-errors");
     }
-  else if (G_IS_TLS_CLIENT_CONNECTION (gnutls))
+  else if (error && !*error && G_IS_TLS_CLIENT_CONNECTION (gnutls))
     {
-      g_set_error_literal (&gnutls->priv->handshake_error,
-			   G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE,
+      g_set_error_literal (error, G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE,
 			   _("Server did not return a valid TLS certificate"));
     }
 
-  G_TLS_CONNECTION_GNUTLS_GET_CLASS (gnutls)->
-    finish_handshake (gnutls, &gnutls->priv->handshake_error);
+  G_TLS_CONNECTION_GNUTLS_GET_CLASS (gnutls)->finish_handshake (gnutls, error);
 
-  if (gnutls->priv->handshake_error)
-    {
-      if (error)
-	*error = g_error_copy (gnutls->priv->handshake_error);
-      return FALSE;
-    }
-  else
-    return TRUE;
-}
+  if (*error && gnutls->priv->started_handshake)
+    gnutls->priv->handshake_error = g_error_copy (*error);
 
-static gboolean
-handshake_in_progress_or_failed (GTlsConnectionGnutls  *gnutls,
-				 gboolean               blocking,
-				 GCancellable          *cancellable,
-				 GError               **error)
-{
-  if (gnutls->priv->handshake_error)
-    {
-      if (error)
-	*error = g_error_copy (gnutls->priv->handshake_error);
-      return TRUE;
-    }
-
-  if (!(gnutls->priv->need_handshake || gnutls->priv->handshaking))
-    return FALSE;
-
-  return !handshake_internal (gnutls, blocking, cancellable, error);
+  return (*error == NULL);
 }
 
 static gboolean
@@ -991,40 +1229,52 @@ g_tls_connection_gnutls_handshake (GTlsConnection   *conn,
 				   GError          **error)
 {
   GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (conn);
+  GSimpleAsyncResult *result;
+  gboolean success;
+  GError *my_error = NULL;
+
+  result = g_simple_async_result_new (G_OBJECT (conn), NULL, NULL,
+				      g_tls_connection_gnutls_handshake);
+  handshake_thread (result, G_OBJECT (conn), cancellable);
+
+  success = finish_handshake (gnutls, result, &my_error);
+  g_object_unref (result);
 
-  return handshake_internal (gnutls, TRUE, cancellable, error);
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE);
+
+  if (my_error)
+    g_propagate_error (error, my_error);
+  return success;
 }
 
-static gboolean
-g_tls_connection_gnutls_handshake_ready (GObject  *pollable_stream,
-					 gpointer  user_data)
+/* In the async version we use two GSimpleAsyncResults; one to run
+ * handshake_thread() and then call handshake_thread_completed(), and
+ * a second to call the caller's original callback after we call
+ * finish_handshake().
+ */
+
+static void
+handshake_thread_completed (GObject      *object,
+			    GAsyncResult *result,
+			    gpointer      user_data)
 {
   GTlsConnectionGnutls *gnutls;
-  GSimpleAsyncResult *simple = user_data;
-  gboolean success;
+  GSimpleAsyncResult *caller_result = user_data;
   GError *error = NULL;
+  gboolean success;
 
-  gnutls = G_TLS_CONNECTION_GNUTLS (g_async_result_get_source_object (G_ASYNC_RESULT (simple)));
+  gnutls = G_TLS_CONNECTION_GNUTLS (g_async_result_get_source_object (G_ASYNC_RESULT (caller_result)));
   g_object_unref (gnutls);
 
-  success = handshake_internal (gnutls, FALSE, NULL, &error);
-  if (!success && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      g_error_free (error);
-      return TRUE;
-    }
+  success = finish_handshake (gnutls, G_SIMPLE_ASYNC_RESULT (result), &error);
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE);
 
-  if (error)
-    {
-      g_simple_async_result_set_from_error (simple, error);
-      g_error_free (error);
-    }
+  if (success)
+    g_simple_async_result_set_op_res_gboolean (caller_result, TRUE);
   else
-    g_simple_async_result_set_op_res_gboolean (simple, success);
-  g_simple_async_result_complete (simple);
-  g_object_unref (simple);
-
-  return FALSE;
+    g_simple_async_result_take_error (caller_result, error);
+  g_simple_async_result_complete (caller_result);
+  g_object_unref (caller_result);
 }
 
 static void
@@ -1034,40 +1284,17 @@ g_tls_connection_gnutls_handshake_async (GTlsConnection       *conn,
 					 GAsyncReadyCallback   callback,
 					 gpointer              user_data)
 {
-  GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (conn);
-  GSimpleAsyncResult *simple;
-  gboolean success;
-  GError *error = NULL;
-  GSource *source;
+  GSimpleAsyncResult *thread_result, *caller_result;
 
-  simple = g_simple_async_result_new (G_OBJECT (conn), callback, user_data,
-				      g_tls_connection_gnutls_handshake_async);
-  success = handshake_internal (gnutls, FALSE, cancellable, &error);
-  if (success)
-    {
-      g_simple_async_result_set_op_res_gboolean (simple, TRUE);
-      g_simple_async_result_complete_in_idle (simple);
-      g_object_unref (simple);
-      return;
-    }
-  else if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      g_simple_async_result_set_from_error (simple, error);
-      g_error_free (error);
-      g_simple_async_result_complete_in_idle (simple);
-      g_object_unref (simple);
-      return;
-    }
-  else if (error)
-    g_error_free (error);
-
-  source = g_tls_connection_gnutls_create_source (gnutls, 0, cancellable);
-  g_source_set_callback (source,
-			 (GSourceFunc) g_tls_connection_gnutls_handshake_ready,
-			 simple, NULL);
-  g_source_set_priority (source, io_priority);
-  g_source_attach (source, g_main_context_get_thread_default ());
-  g_source_unref (source);
+  caller_result = g_simple_async_result_new (G_OBJECT (conn), callback, user_data,
+					     g_tls_connection_gnutls_handshake_async);
+
+  thread_result = g_simple_async_result_new (G_OBJECT (conn),
+					     handshake_thread_completed, caller_result,
+					     g_tls_connection_gnutls_handshake_async);
+  g_simple_async_result_run_in_thread (thread_result, handshake_thread,
+				       io_priority, cancellable);
+  g_object_unref (thread_result);
 }
 
 static gboolean
@@ -1087,6 +1314,66 @@ g_tls_connection_gnutls_handshake_finish (GTlsConnection       *conn,
   return g_simple_async_result_get_op_res_gboolean (simple);
 }
 
+static void
+implicit_handshake_completed (GObject      *object,
+			      GAsyncResult *result,
+			      gpointer      user_data)
+{
+  GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (object);
+
+  g_mutex_lock (&gnutls->priv->op_mutex);
+  gnutls->priv->need_finish_handshake = TRUE;
+  g_mutex_unlock (&gnutls->priv->op_mutex);
+
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE);
+}
+
+static gboolean
+do_implicit_handshake (GTlsConnectionGnutls  *gnutls,
+		       gboolean               blocking,
+		       GCancellable          *cancellable,
+		       GError               **error)
+{
+  /* We have op_mutex */
+
+  gnutls->priv->implicit_handshake =
+    g_simple_async_result_new (G_OBJECT (gnutls),
+			       implicit_handshake_completed, NULL,
+			       do_implicit_handshake);
+
+  if (blocking)
+    {
+      GError *my_error = NULL;
+      gboolean success;
+
+      g_mutex_unlock (&gnutls->priv->op_mutex);
+      handshake_thread (gnutls->priv->implicit_handshake,
+			G_OBJECT (gnutls),
+			cancellable);
+      success = finish_handshake (gnutls,
+				  gnutls->priv->implicit_handshake,
+				  &my_error);
+      g_clear_object (&gnutls->priv->implicit_handshake);
+      yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_HANDSHAKE);
+      g_mutex_lock (&gnutls->priv->op_mutex);
+
+      if (my_error)
+	g_propagate_error (error, my_error);
+      return success;
+    }
+  else
+    {
+      g_simple_async_result_run_in_thread (gnutls->priv->implicit_handshake,
+					   handshake_thread,
+					   G_PRIORITY_DEFAULT, cancellable);
+
+      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+			   _("Operation would block"));
+
+      return FALSE;
+    }
+}
+
 gssize
 g_tls_connection_gnutls_read (GTlsConnectionGnutls  *gnutls,
 			      void                  *buffer,
@@ -1098,12 +1385,15 @@ g_tls_connection_gnutls_read (GTlsConnectionGnutls  *gnutls,
   gssize ret;
 
  again:
-  if (handshake_in_progress_or_failed (gnutls, blocking, cancellable, error))
+  if (!claim_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_READ,
+		 blocking, cancellable, error))
     return -1;
 
-  BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
+  BEGIN_GNUTLS_IO (gnutls, G_IO_IN, blocking, cancellable);
   ret = gnutls_record_recv (gnutls->priv->session, buffer, count);
-  END_GNUTLS_IO (gnutls, ret, _("Error reading data from TLS socket: %s"), error);
+  END_GNUTLS_IO (gnutls, G_IO_IN, ret, _("Error reading data from TLS socket: %s"), error);
+
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_READ);
 
   if (ret >= 0)
     return ret;
@@ -1124,12 +1414,15 @@ g_tls_connection_gnutls_write (GTlsConnectionGnutls  *gnutls,
   gssize ret;
 
  again:
-  if (handshake_in_progress_or_failed (gnutls, blocking, cancellable, error))
+  if (!claim_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_WRITE,
+		 blocking, cancellable, error))
     return -1;
 
-  BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
+  BEGIN_GNUTLS_IO (gnutls, G_IO_OUT, blocking, cancellable);
   ret = gnutls_record_send (gnutls->priv->session, buffer, count);
-  END_GNUTLS_IO (gnutls, ret, _("Error writing data to TLS socket: %s"), error);
+  END_GNUTLS_IO (gnutls, G_IO_OUT, ret, _("Error writing data to TLS socket: %s"), error);
+
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_WRITE);
 
   if (ret >= 0)
     return ret;
@@ -1156,114 +1449,64 @@ g_tls_connection_gnutls_get_output_stream (GIOStream *stream)
 }
 
 static gboolean
-close_internal (GTlsConnectionGnutls  *gnutls,
-		gboolean               blocking,
-		GCancellable          *cancellable,
-		GError               **error)
-{
-  int ret;
-
-  /* If we haven't finished the initial handshake yet, there's no
-   * reason to finish it just so we can close.
-   */
-  if (!gnutls->priv->ever_handshaked)
-    return TRUE;
-
-  if (handshake_in_progress_or_failed (gnutls, blocking, cancellable, error))
-    return FALSE;
-
-  gnutls->priv->closing = TRUE;
-  BEGIN_GNUTLS_IO (gnutls, blocking, cancellable);
-  ret = gnutls_bye (gnutls->priv->session, GNUTLS_SHUT_WR);
-  END_GNUTLS_IO (gnutls, ret, _("Error performing TLS close: %s"), error);
-  if (ret == 0 || !error || !g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    gnutls->priv->closing = FALSE;
-
-  return ret == 0;
-}
-
-static gboolean
 g_tls_connection_gnutls_close (GIOStream     *stream,
 			       GCancellable  *cancellable,
 			       GError       **error)
 {
   GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (stream);
+  gboolean success;
+  int ret = 0; /* must be defined for the "ret != 0" check when gnutls_bye() is skipped */
 
-  if (!close_internal (gnutls, TRUE, cancellable, error))
+  if (!claim_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_CLOSE,
+		 TRUE, cancellable, error))
     return FALSE;
-  return g_io_stream_close (gnutls->priv->base_io_stream,
-			    cancellable, error);
-}
 
-typedef struct {
-  GSimpleAsyncResult *simple;
-  GCancellable *cancellable;
-  int io_priority;
-} AsyncCloseData;
+  if (gnutls->priv->closed)
+    {
+      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+			   _("Connection is already closed"));
+      yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_CLOSE);
+      return FALSE;
+    }
 
-static void
-close_base_stream_cb (GObject      *base_stream,
-		      GAsyncResult *result,
-		      gpointer      user_data)
-{
-  gboolean success;
-  GError *error = NULL;
-  AsyncCloseData *acd = user_data;
+  if (gnutls->priv->ever_handshaked)
+    {
+      BEGIN_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, TRUE, cancellable);
+      ret = gnutls_bye (gnutls->priv->session, GNUTLS_SHUT_WR);
+      END_GNUTLS_IO (gnutls, G_IO_IN | G_IO_OUT, ret,
+		     _("Error performing TLS close: %s"), error);
+    }
 
-  success = g_io_stream_close_finish (G_IO_STREAM (base_stream),
-				      result, &error);
-  if (success)
-    g_simple_async_result_set_op_res_gboolean (acd->simple, TRUE);
-  else
+  gnutls->priv->closed = TRUE;
+
+  if (ret != 0)
     {
-      g_simple_async_result_set_from_error (acd->simple, error);
-      g_error_free (error);
+      yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_CLOSE);
+      return FALSE;
     }
 
-  g_simple_async_result_complete (acd->simple);
-  g_object_unref (acd->simple);
-  if (acd->cancellable)
-    g_object_unref (acd->cancellable);
-  g_slice_free (AsyncCloseData, acd);
+  success = g_io_stream_close (gnutls->priv->base_io_stream,
+			       cancellable, error);
+  yield_op (gnutls, G_TLS_CONNECTION_GNUTLS_OP_CLOSE);
+  return success;
 }
 
-static gboolean
-g_tls_connection_gnutls_close_ready (GObject  *pollable_stream,
-				     gpointer  user_data)
+/* We do async close as synchronous-in-a-thread so we don't need to
+ * implement G_IO_IN/G_IO_OUT flip-flopping just for this one case
+ * (since handshakes are also done synchronously now).
+ */
+static void
+close_thread (GSimpleAsyncResult *result,
+	      GObject            *object,
+	      GCancellable       *cancellable)
 {
-  GTlsConnectionGnutls *gnutls;
-  AsyncCloseData *acd = user_data;
-  gboolean success;
+  GIOStream *stream = G_IO_STREAM (object);
   GError *error = NULL;
 
-  gnutls = G_TLS_CONNECTION_GNUTLS (g_async_result_get_source_object (G_ASYNC_RESULT (acd->simple)));
-  g_object_unref (gnutls);
-
-  success = close_internal (gnutls, FALSE, NULL, &error);
-  if (!success && g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      g_error_free (error);
-      return TRUE;
-    }
-
-  if (error)
-    {
-      g_simple_async_result_set_from_error (acd->simple, error);
-      g_simple_async_result_complete (acd->simple);
-      g_error_free (error);
-      g_object_unref (acd->simple);
-      if (acd->cancellable)
-	g_object_unref (acd->cancellable);
-      g_slice_free (AsyncCloseData, acd);
-    }
+  if (!g_tls_connection_gnutls_close (stream, cancellable, &error))
+    g_simple_async_result_take_error (result, error);
   else
-    {
-      g_io_stream_close_async (gnutls->priv->base_io_stream,
-			       acd->io_priority, acd->cancellable,
-			       close_base_stream_cb, acd);
-    }
-
-  return FALSE;
+    g_simple_async_result_set_op_res_gboolean (result, TRUE);
 }
 
 static void
@@ -1273,48 +1516,14 @@ g_tls_connection_gnutls_close_async (GIOStream           *stream,
 				     GAsyncReadyCallback  callback,
 				     gpointer             user_data)
 {
-  GTlsConnectionGnutls *gnutls = G_TLS_CONNECTION_GNUTLS (stream);
-  GSimpleAsyncResult *simple;
-  gboolean success;
-  GError *error = NULL;
-  AsyncCloseData *acd;
-  GSource *source;
+  GSimpleAsyncResult *result;
 
-  simple = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
+  result = g_simple_async_result_new (G_OBJECT (stream),
+				      callback, user_data,
 				      g_tls_connection_gnutls_close_async);
-
-  success = close_internal (gnutls, FALSE, cancellable, &error);
-  if (error && !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    {
-      g_simple_async_result_set_from_error (simple, error);
-      g_error_free (error);
-      g_simple_async_result_complete_in_idle (simple);
-      g_object_unref (simple);
-    }
-
-  if (error)
-    g_error_free (error);
-
-  acd = g_slice_new (AsyncCloseData);
-  acd->simple = simple;
-  acd->cancellable = cancellable ? g_object_ref (cancellable) : cancellable;
-  acd->io_priority = io_priority;
-
-  if (success)
-    {
-      g_io_stream_close_async (gnutls->priv->base_io_stream,
-			       io_priority, cancellable,
-			       close_base_stream_cb, acd);
-      return;
-    }
-
-  source = g_tls_connection_gnutls_create_source (gnutls, 0, acd->cancellable);
-  g_source_set_callback (source,
-			 (GSourceFunc) g_tls_connection_gnutls_close_ready,
-			 acd, NULL);
-  g_source_set_priority (source, acd->io_priority);
-  g_source_attach (source, g_main_context_get_thread_default ());
-  g_source_unref (source);
+  g_simple_async_result_run_in_thread (result, close_thread,
+				       io_priority, cancellable);
+  g_object_unref (result);
 }
 
 static gboolean
diff --git a/tls/tests/connection.c b/tls/tests/connection.c
index 30fdb4e..751490d 100644
--- a/tls/tests/connection.c
+++ b/tls/tests/connection.c
@@ -41,6 +41,9 @@ typedef struct {
   gboolean rehandshake;
   GTlsCertificateFlags accept_flags;
   GError *read_error;
+
+  char buf[128];
+  gssize nread, nwrote;
 } TestConnection;
 
 static void
@@ -75,7 +78,7 @@ teardown_connection (TestConnection *test, gconstpointer data)
       g_object_add_weak_pointer (G_OBJECT (test->service), (gpointer *)&test->service);
       g_object_unref (test->service);
       while (test->service)
-	g_main_context_iteration (NULL, TRUE);
+	g_main_context_iteration (NULL, FALSE);
     }
 
   if (test->server_connection)
@@ -212,7 +215,7 @@ on_incoming_connection (GSocketService     *service,
 }
 
 static void
-start_server_service (TestConnection *test, GTlsAuthenticationMode auth_mode)
+start_async_server_service (TestConnection *test, GTlsAuthenticationMode auth_mode)
 {
   GError *error = NULL;
 
@@ -227,14 +230,101 @@ start_server_service (TestConnection *test, GTlsAuthenticationMode auth_mode)
   g_signal_connect (test->service, "incoming", G_CALLBACK (on_incoming_connection), test);
 }
 
-static GIOStream*
-start_server_and_connect_to_it (TestConnection *test, GTlsAuthenticationMode auth_mode)
+static GIOStream *
+start_async_server_and_connect_to_it (TestConnection *test, GTlsAuthenticationMode auth_mode)
+{
+  GSocketClient *client;
+  GError *error = NULL;
+  GSocketConnection *connection;
+
+  start_async_server_service (test, auth_mode);
+
+  client = g_socket_client_new ();
+  connection = g_socket_client_connect (client, G_SOCKET_CONNECTABLE (test->address),
+                                        NULL, &error);
+  g_assert_no_error (error);
+  g_object_unref (client);
+
+  return G_IO_STREAM (connection);
+}
+
+static void
+run_echo_server (GThreadedSocketService *service,
+		 GSocketConnection      *connection,
+		 GObject                *source_object,
+		 gpointer                user_data)
+{
+  TestConnection *test = user_data;
+  GTlsConnection *tlsconn;
+  GTlsCertificate *cert;
+  GError *error = NULL;
+  GInputStream *istream;
+  GOutputStream *ostream;
+  gssize nread, nwrote, total;
+  gchar buf[128];
+
+  cert = g_tls_certificate_new_from_file (TEST_FILE ("server-and-key.pem"), &error);
+  g_assert_no_error (error);
+
+  test->server_connection = g_tls_server_connection_new (G_IO_STREAM (connection),
+                                                         cert, &error);
+  g_assert_no_error (error);
+  g_object_unref (cert);
+
+  tlsconn = G_TLS_CONNECTION (test->server_connection);
+  g_tls_connection_handshake (tlsconn, NULL, &error);
+  g_assert_no_error (error);
+
+  istream = g_io_stream_get_input_stream (test->server_connection);
+  ostream = g_io_stream_get_output_stream (test->server_connection);
+
+  while (TRUE)
+    {
+      nread = g_input_stream_read (istream, buf, sizeof (buf), NULL, &error);
+      g_assert_no_error (error);
+      g_assert_cmpint (nread, >=, 0);
+
+      if (nread == 0)
+	break;
+
+      for (total = 0; total < nread; total += nwrote)
+	{
+	  nwrote = g_output_stream_write (ostream, buf + total, nread - total, NULL, &error);
+	  g_assert_no_error (error);
+	}
+
+      if (test->rehandshake)
+	{
+	  test->rehandshake = FALSE;
+	  g_tls_connection_handshake (tlsconn, NULL, &error);
+	  g_assert_no_error (error);
+	}
+    }
+}
+
+static void
+start_echo_server_service (TestConnection *test)
+{
+  GError *error = NULL;
+
+  test->service = g_threaded_socket_service_new (5);
+  g_socket_listener_add_address (G_SOCKET_LISTENER (test->service),
+                                 G_SOCKET_ADDRESS (test->address),
+                                 G_SOCKET_TYPE_STREAM, G_SOCKET_PROTOCOL_TCP,
+                                 NULL, NULL, &error);
+  g_assert_no_error (error);
+
+  g_signal_connect (test->service, "run", G_CALLBACK (run_echo_server), test);
+}
+
+static GIOStream *
+start_echo_server_and_connect_to_it (TestConnection *test)
 {
   GSocketClient *client;
   GError *error = NULL;
   GSocketConnection *connection;
 
-  start_server_service (test, auth_mode);
+  start_echo_server_service (test);
 
   client = g_socket_client_new ();
   connection = g_socket_client_connect (client, G_SOCKET_CONNECTABLE (test->address),
@@ -289,7 +379,7 @@ test_basic_connection (TestConnection *test,
   GIOStream *connection;
   GError *error = NULL;
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
   test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
   g_assert_no_error (error);
   g_object_unref (connection);
@@ -314,7 +404,7 @@ test_verified_connection (TestConnection *test,
   g_assert_no_error (error);
   g_assert (test->database);
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
   test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
   g_assert_no_error (error);
   g_assert (test->client_connection);
@@ -343,7 +433,7 @@ test_client_auth_connection (TestConnection *test,
   g_assert_no_error (error);
   g_assert (test->database);
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_REQUIRED);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_REQUIRED);
   test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
   g_assert_no_error (error);
   g_assert (test->client_connection);
@@ -381,7 +471,7 @@ test_connection_no_database (TestConnection *test,
   GIOStream *connection;
   GError *error = NULL;
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
   test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
   g_assert_no_error (error);
   g_assert (test->client_connection);
@@ -427,7 +517,7 @@ test_failed_connection (TestConnection *test,
   GError *error = NULL;
   GSocketConnectable *bad_addr;
 
-  connection = start_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
+  connection = start_async_server_and_connect_to_it (test, G_TLS_AUTHENTICATION_NONE);
 
   bad_addr = g_network_address_new ("wrong.example.com", 80);
   test->client_connection = g_tls_client_connection_new (connection, bad_addr, &error);
@@ -475,7 +565,7 @@ test_connection_socket_client (TestConnection *test,
   GIOStream *base;
   GError *error = NULL;
 
-  start_server_service (test, G_TLS_AUTHENTICATION_NONE);
+  start_async_server_service (test, G_TLS_AUTHENTICATION_NONE);
   client = g_socket_client_new ();
   g_socket_client_set_tls (client, TRUE);
   flags = G_TLS_CERTIFICATE_VALIDATE_ALL & ~G_TLS_CERTIFICATE_UNKNOWN_CA;
@@ -523,7 +613,7 @@ test_connection_socket_client_failed (TestConnection *test,
 {
   GSocketClient *client;
 
-  start_server_service (test, G_TLS_AUTHENTICATION_NONE);
+  start_async_server_service (test, G_TLS_AUTHENTICATION_NONE);
   client = g_socket_client_new ();
   g_socket_client_set_tls (client, TRUE);
   /* this time we don't adjust the validation flags */
@@ -535,6 +625,201 @@ test_connection_socket_client_failed (TestConnection *test,
   g_object_unref (client);
 }
 
+static void
+simul_async_read_complete (GObject      *object,
+			   GAsyncResult *result,
+			   gpointer      user_data)
+{
+  TestConnection *test = user_data;
+  gssize nread;
+  GError *error = NULL;
+
+  nread = g_input_stream_read_finish (G_INPUT_STREAM (object),
+				      result, &error);
+  g_assert_no_error (error);
+
+  test->nread += nread;
+  g_assert_cmpint (test->nread, <=, TEST_DATA_LENGTH);
+
+  if (test->nread == TEST_DATA_LENGTH)
+    {
+      g_io_stream_close (test->client_connection, NULL, &error);
+      g_assert_no_error (error);
+      g_main_loop_quit (test->loop);
+    }
+  else
+    {
+      g_input_stream_read_async (G_INPUT_STREAM (object),
+				 test->buf + test->nread,
+				 TEST_DATA_LENGTH / 2,
+				 G_PRIORITY_DEFAULT, NULL,
+				 simul_async_read_complete, test);
+    }
+}
+
+static void
+simul_async_write_complete (GObject      *object,
+			    GAsyncResult *result,
+			    gpointer      user_data)
+{
+  TestConnection *test = user_data;
+  gssize nwrote;
+  GError *error = NULL;
+
+  nwrote = g_output_stream_write_finish (G_OUTPUT_STREAM (object),
+					 result, &error);
+  g_assert_no_error (error);
+
+  test->nwrote += nwrote;
+  if (test->nwrote < TEST_DATA_LENGTH)
+    {
+      g_output_stream_write_async (G_OUTPUT_STREAM (object),
+				   TEST_DATA + test->nwrote,
+				   TEST_DATA_LENGTH - test->nwrote,
+				   G_PRIORITY_DEFAULT, NULL,
+				   simul_async_write_complete, test);
+    }
+}
+
+static void
+test_simultaneous_async (TestConnection *test,
+			 gconstpointer   data)
+{
+  GIOStream *connection;
+  GTlsCertificateFlags flags;
+  GError *error = NULL;
+
+  connection = start_echo_server_and_connect_to_it (test);
+  test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
+  g_assert_no_error (error);
+  g_object_unref (connection);
+
+  flags = G_TLS_CERTIFICATE_VALIDATE_ALL &
+    ~(G_TLS_CERTIFICATE_UNKNOWN_CA | G_TLS_CERTIFICATE_BAD_IDENTITY);
+  g_tls_client_connection_set_validation_flags (G_TLS_CLIENT_CONNECTION (test->client_connection),
+                                                flags);
+
+  memset (test->buf, 0, sizeof (test->buf));
+  test->nread = test->nwrote = 0;
+
+  g_input_stream_read_async (g_io_stream_get_input_stream (test->client_connection),
+			     test->buf, TEST_DATA_LENGTH / 2,
+			     G_PRIORITY_DEFAULT, NULL,
+			     simul_async_read_complete, test);
+  g_output_stream_write_async (g_io_stream_get_output_stream (test->client_connection),
+			       TEST_DATA, TEST_DATA_LENGTH / 2,
+			       G_PRIORITY_DEFAULT, NULL,
+			       simul_async_write_complete, test);
+
+  g_main_loop_run (test->loop);
+
+  g_assert_cmpint (test->nread, ==, TEST_DATA_LENGTH);
+  g_assert_cmpint (test->nwrote, ==, TEST_DATA_LENGTH);
+  g_assert_cmpstr (test->buf, ==, TEST_DATA);
+}
+
+static void
+test_simultaneous_async_rehandshake (TestConnection *test,
+				     gconstpointer   data)
+{
+  test->rehandshake = TRUE;
+  test_simultaneous_async (test, data);
+}
+
+static gpointer
+simul_read_thread (gpointer user_data)
+{
+  TestConnection *test = user_data;
+  GInputStream *istream = g_io_stream_get_input_stream (test->client_connection);
+  GError *error = NULL;
+  gssize nread;
+
+  while (test->nread < TEST_DATA_LENGTH)
+    {
+      nread = g_input_stream_read (istream,
+				   test->buf + test->nread,
+				   MIN (TEST_DATA_LENGTH / 2, TEST_DATA_LENGTH - test->nread),
+				   NULL, &error);
+      g_assert_no_error (error);
+
+      test->nread += nread;
+    }
+
+  return NULL;
+}
+
+static gpointer
+simul_write_thread (gpointer user_data)
+{
+  TestConnection *test = user_data;
+  GOutputStream *ostream = g_io_stream_get_output_stream (test->client_connection);
+  GError *error = NULL;
+  gssize nwrote;
+
+  while (test->nwrote < TEST_DATA_LENGTH)
+    {
+      nwrote = g_output_stream_write (ostream,
+				      TEST_DATA + test->nwrote,
+				      MIN (TEST_DATA_LENGTH / 2, TEST_DATA_LENGTH - test->nwrote),
+				      NULL, &error);
+      g_assert_no_error (error);
+
+      test->nwrote += nwrote;
+    }
+
+  return NULL;
+}
+
+static void
+test_simultaneous_sync (TestConnection *test,
+			gconstpointer   data)
+{
+  GIOStream *connection;
+  GTlsCertificateFlags flags;
+  GError *error = NULL;
+  GThread *read_thread, *write_thread;
+
+  connection = start_echo_server_and_connect_to_it (test);
+  test->client_connection = g_tls_client_connection_new (connection, test->identity, &error);
+  g_assert_no_error (error);
+  g_object_unref (connection);
+
+  flags = G_TLS_CERTIFICATE_VALIDATE_ALL &
+    ~(G_TLS_CERTIFICATE_UNKNOWN_CA | G_TLS_CERTIFICATE_BAD_IDENTITY);
+  g_tls_client_connection_set_validation_flags (G_TLS_CLIENT_CONNECTION (test->client_connection),
+                                                flags);
+
+  memset (test->buf, 0, sizeof (test->buf));
+  test->nread = test->nwrote = 0;
+
+  read_thread = g_thread_new ("reader", simul_read_thread, test);
+  write_thread = g_thread_new ("writer", simul_write_thread, test);
+
+  /* We need to run the main loop to get the GThreadedSocketService to
+   * receive the connection and spawn the server thread.
+   */
+  while (!test->server_connection)
+    g_main_context_iteration (NULL, FALSE);
+
+  g_thread_join (write_thread);
+  g_thread_join (read_thread);
+
+  g_assert_cmpint (test->nread, ==, TEST_DATA_LENGTH);
+  g_assert_cmpint (test->nwrote, ==, TEST_DATA_LENGTH);
+  g_assert_cmpstr (test->buf, ==, TEST_DATA);
+
+  g_io_stream_close (test->client_connection, NULL, &error);
+  g_assert_no_error (error);
+}
+
+static void
+test_simultaneous_sync_rehandshake (TestConnection *test,
+				    gconstpointer   data)
+{
+  test->rehandshake = TRUE;
+  test_simultaneous_sync (test, data);
+}
+
 int
 main (int   argc,
       char *argv[])
@@ -564,6 +849,14 @@ main (int   argc,
               setup_connection, test_connection_socket_client, teardown_connection);
   g_test_add ("/tls/connection/socket-client-failed", TestConnection, NULL,
               setup_connection, test_connection_socket_client_failed, teardown_connection);
+  g_test_add ("/tls/connection/simultaneous-async", TestConnection, NULL,
+              setup_connection, test_simultaneous_async, teardown_connection);
+  g_test_add ("/tls/connection/simultaneous-sync", TestConnection, NULL,
+	      setup_connection, test_simultaneous_sync, teardown_connection);
+  g_test_add ("/tls/connection/simultaneous-async-rehandshake", TestConnection, NULL,
+              setup_connection, test_simultaneous_async_rehandshake, teardown_connection);
+  g_test_add ("/tls/connection/simultaneous-sync-rehandshake", TestConnection, NULL,
+	      setup_connection, test_simultaneous_sync_rehandshake, teardown_connection);
 
   ret = g_test_run();
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]