[glib-networking/mcatanzaro/tls-thread: 11/14] progress



commit 281149b661bd14af9f8b6084707cdf08310f4bee
Author: Michael Catanzaro <mcatanzaro gnome org>
Date:   Wed Dec 4 21:53:05 2019 -0600

    progress

 tls/base/gtlsconnection-base.h       |   2 +-
 tls/base/gtlsoperationsthread-base.c | 215 ++++++++++++++++++++++++-----------
 2 files changed, 150 insertions(+), 67 deletions(-)
---
diff --git a/tls/base/gtlsconnection-base.h b/tls/base/gtlsconnection-base.h
index 8be17f1..33a89ec 100644
--- a/tls/base/gtlsconnection-base.h
+++ b/tls/base/gtlsconnection-base.h
@@ -107,7 +107,7 @@ gboolean                  g_tls_connection_base_handshake_thread_verify_certific
 
 void                      g_tls_connection_base_push_io                 (GTlsConnectionBase *tls,
                                                                          GIOCondition        direction,
-                                                                         gint64              timeout,
+                                                                         gint64              timeout, /* FIXME: remove timeout */
                                                                          GCancellable       *cancellable);
 GTlsConnectionBaseStatus  g_tls_connection_base_pop_io                  (GTlsConnectionBase  *tls,
                                                                          GIOCondition         direction,
diff --git a/tls/base/gtlsoperationsthread-base.c b/tls/base/gtlsoperationsthread-base.c
index 6d1d642..8db66a3 100644
--- a/tls/base/gtlsoperationsthread-base.c
+++ b/tls/base/gtlsoperationsthread-base.c
@@ -74,9 +74,6 @@ typedef struct {
   GThread *op_thread;
   GMainContext *op_thread_context;
 
-  /* Important: when pushing ops onto the queue, call g_main_context_wakeup()
-   * on op_thread_context to ensure the op is noticed.
-   */
   GAsyncQueue *queue;
 } GTlsOperationsThreadBasePrivate;
 
@@ -96,8 +93,15 @@ typedef struct {
   GTlsConnectionBase *connection; /* FIXME: threadsafety nightmare, not OK */
 
   /* Input */
-  void *data; /* unowned */
-  gsize size;
+  union {
+    void *data;
+    GInputVector *input_vectors;
+    GOutputVector *output_vectors;
+  } /* unowned */;
+  union {
+    gsize size; /* for non-vectored data buffer */
+    guint num_vectors;
+  };
   gint64 timeout;
   gint64 start_time;
   GCancellable *cancellable;
@@ -154,23 +158,76 @@ g_tls_thread_operation_new (GTlsThreadOperationType   type,
   switch (type)
     {
     case G_TLS_THREAD_OP_READ:
-      /* fallthrough */
-    case G_TLS_THREAD_OP_READ_MESSAGE:
       op->io_condition = G_IO_IN;
       break;
     case G_TLS_THREAD_OP_WRITE:
-      /* fallthough */
-    case G_TLS_THREAD_OP_WRITE_MESSAGE:
       op->io_condition = G_IO_OUT;
       break;
-    /* FIXME: more pls */
-    case G_TLS_THREAD_OP_SHUTDOWN:
-      break;
+    default:
+      g_assert_not_reached ();
     }
 
   return op;
 }
 
+static GTlsThreadOperation *
+g_tls_thread_operation_new_with_input_vectors (GTlsThreadOperationType   type,
+                                               GTlsOperationsThreadBase *thread,
+                                               GTlsConnectionBase       *connection,
+                                               GInputVector             *vectors,
+                                               guint                     num_vectors,
+                                               gint64                    timeout,
+                                               GCancellable             *cancellable)
+{
+  GTlsThreadOperation *op;
+
+  g_assert (type == G_TLS_THREAD_OP_READ_MESSAGE);
+
+  op = g_new0 (GTlsThreadOperation, 1);
+  op->type = type;
+  op->io_condition = G_IO_IN;
+  op->thread = thread; /* FIXME: use a weak ref? */
+  op->connection = g_object_ref (connection);
+  op->input_vectors = vectors;
+  op->num_vectors = num_vectors;
+  op->timeout = timeout;
+  op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+  g_mutex_init (&op->finished_mutex);
+  g_cond_init (&op->finished_condition);
+
+  return op;
+}
+
+static GTlsThreadOperation *
+g_tls_thread_operation_new_with_output_vectors (GTlsThreadOperationType   type,
+                                                GTlsOperationsThreadBase *thread,
+                                                GTlsConnectionBase       *connection,
+                                                GOutputVector            *vectors,
+                                                guint                     num_vectors,
+                                                gint64                    timeout,
+                                                GCancellable             *cancellable)
+{
+  GTlsThreadOperation *op;
+
+  g_assert (type == G_TLS_THREAD_OP_WRITE_MESSAGE);
+
+  op = g_new0 (GTlsThreadOperation, 1);
+  op->type = type;
+  op->io_condition = G_IO_OUT;
+  op->thread = thread; /* FIXME: use a weak ref? */
+  op->connection = g_object_ref (connection);
+  op->output_vectors = vectors;
+  op->num_vectors = num_vectors;
+  op->timeout = timeout;
+  op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+  g_mutex_init (&op->finished_mutex);
+  g_cond_init (&op->finished_condition);
+
+  return op;
+}
+
 static GTlsThreadOperation *
 g_tls_thread_shutdown_operation_new (void)
 {
@@ -214,44 +271,54 @@ g_tls_operations_thread_base_get_connection (GTlsOperationsThreadBase *self)
   return priv->connection;
 }
 
-GTlsConnectionBaseStatus
-g_tls_operations_thread_base_read (GTlsOperationsThreadBase  *self,
-                                   void                      *buffer,
-                                   gsize                      size,
-                                   gint64                     timeout,
-                                   gssize                    *nread,
-                                   GCancellable              *cancellable,
-                                   GError                   **error)
+static GTlsConnectionBaseStatus
+execute_op (GTlsOperationsThreadBase *self,
+            GTlsThreadOperation      *op /* owned */,
+            gssize                   *count,
+            GError                  **error)
 {
   GTlsOperationsThreadBasePrivate *priv = g_tls_operations_thread_base_get_instance_private (self);
-  GTlsThreadOperation *op;
   GTlsConnectionBaseStatus result;
 
-  op = g_tls_thread_operation_new (G_TLS_THREAD_OP_READ,
-                                   self,
-                                   priv->connection,
-                                   buffer, size, timeout,
-                                   cancellable);
   g_async_queue_push (priv->queue, op);
   g_main_context_wakeup (priv->op_thread_context);
 
   wait_for_op_completion (op);
 
-  *nread = op->count;
-
+  *count = op->count;
+  result = op->result;
   if (op->error)
     {
       g_propagate_error (error, op->error);
       op->error = NULL;
     }
 
-  result = op->result;
-
   g_tls_thread_operation_free (op);
 
   return result;
 }
 
+GTlsConnectionBaseStatus
+g_tls_operations_thread_base_read (GTlsOperationsThreadBase  *self,
+                                   void                      *buffer,
+                                   gsize                      size,
+                                   gint64                     timeout,
+                                   gssize                    *nread,
+                                   GCancellable              *cancellable,
+                                   GError                   **error)
+{
+  GTlsOperationsThreadBasePrivate *priv = g_tls_operations_thread_base_get_instance_private (self);
+  GTlsThreadOperation *op;
+
+  op = g_tls_thread_operation_new (G_TLS_THREAD_OP_READ,
+                                   self,
+                                   priv->connection,
+                                   buffer, size, timeout,
+                                   cancellable);
+
+  return execute_op (self, g_steal_pointer (&op), nread, error);
+}
+
 GTlsConnectionBaseStatus
 g_tls_operations_thread_base_read_message (GTlsOperationsThreadBase  *self,
                                            GInputVector              *vectors,
@@ -261,8 +328,16 @@ g_tls_operations_thread_base_read_message (GTlsOperationsThreadBase  *self,
                                            GCancellable              *cancellable,
                                            GError                   **error)
 {
-  /* FIXME: not enough room in GTlsThreadOperation to store the parameters */
-  g_assert_not_reached ();
+  GTlsOperationsThreadBasePrivate *priv = g_tls_operations_thread_base_get_instance_private (self);
+  GTlsThreadOperation *op;
+
+  op = g_tls_thread_operation_new_with_input_vectors (G_TLS_THREAD_OP_READ_MESSAGE,
+                                                      self,
+                                                      priv->connection,
+                                                      vectors, num_vectors, timeout,
+                                                      cancellable);
+
+  return execute_op (self, g_steal_pointer (&op), nread, error);
 }
 
 GTlsConnectionBaseStatus
@@ -276,31 +351,14 @@ g_tls_operations_thread_base_write (GTlsOperationsThreadBase  *self,
 {
   GTlsOperationsThreadBasePrivate *priv = g_tls_operations_thread_base_get_instance_private (self);
   GTlsThreadOperation *op;
-  GTlsConnectionBaseStatus result;
 
   op = g_tls_thread_operation_new (G_TLS_THREAD_OP_WRITE,
                                    self,
                                    priv->connection,
                                    (void *)buffer, size, timeout,
                                    cancellable);
-  g_async_queue_push (priv->queue, op);
-  g_main_context_wakeup (priv->op_thread_context);
-
-  wait_for_op_completion (op);
-
-  *nwrote = op->count;
-
-  if (op->error)
-    {
-      g_propagate_error (error, op->error);
-      op->error = NULL;
-    }
 
-  result = op->result;
-
-  g_tls_thread_operation_free (op);
-
-  return result;
+  return execute_op (self, g_steal_pointer (&op), nwrote, error);
 }
 
 GTlsConnectionBaseStatus
@@ -312,8 +370,16 @@ g_tls_operations_thread_base_write_message (GTlsOperationsThreadBase  *self,
                                             GCancellable              *cancellable,
                                             GError                   **error)
 {
-  /* FIXME: not enough room in GTlsThreadOperation to store the parameters */
-  g_assert_not_reached ();
+  GTlsOperationsThreadBasePrivate *priv = g_tls_operations_thread_base_get_instance_private (self);
+  GTlsThreadOperation *op;
+
+  op = g_tls_thread_operation_new_with_output_vectors (G_TLS_THREAD_OP_WRITE_MESSAGE,
+                                                       self,
+                                                       priv->connection,
+                                                       vectors, num_vectors, timeout,
+                                                       cancellable);
+
+  return execute_op (self, g_steal_pointer (&op), nwrote, error);
 }
 
 typedef struct {
@@ -567,6 +633,7 @@ process_op (GAsyncQueue         *queue,
             GMainLoop           *main_loop)
 {
   GTlsThreadOperation *op;
+  GTlsOperationsThreadBaseClass *base_class;
 
   if (delayed_op)
     {
@@ -620,30 +687,46 @@ process_op (GAsyncQueue         *queue,
       adjust_op_timeout (op);
     }
 
+  if (op->type != G_TLS_THREAD_OP_SHUTDOWN)
+    {
+      g_assert (op->thread);
+      base_class = G_TLS_OPERATIONS_THREAD_BASE_GET_CLASS (op->thread);
+    }
+
   switch (op->type)
     {
     /* FIXME: handle all op types, including handshake and directional closes */
     case G_TLS_THREAD_OP_READ:
-      op->result = G_TLS_OPERATIONS_THREAD_BASE_GET_CLASS (op->thread)->read_fn (op->thread,
-                                                                                 op->data, op->size,
-                                                                                 &op->count,
-                                                                                 op->cancellable,
-                                                                                 &op->error);
+      g_assert (base_class->read_fn);
+      op->result = base_class->read_fn (op->thread,
+                                        op->data, op->size,
+                                        &op->count,
+                                        op->cancellable,
+                                        &op->error);
       break;
     case G_TLS_THREAD_OP_READ_MESSAGE:
-      g_assert (G_TLS_OPERATIONS_THREAD_BASE_GET_CLASS (op->thread)->read_message_fn);
-      g_assert_not_reached (); /* FIXME */
+      g_assert (base_class->read_message_fn);
+      op->result = base_class->read_message_fn (op->thread,
+                                                op->input_vectors, op->num_vectors,
+                                                &op->count,
+                                                op->cancellable,
+                                                &op->error);
       break;
     case G_TLS_THREAD_OP_WRITE:
-      op->result = G_TLS_OPERATIONS_THREAD_BASE_GET_CLASS (op->thread)->write_fn (op->thread,
-                                                                                  op->data, op->size,
-                                                                                  &op->count,
-                                                                                  op->cancellable,
-                                                                                  &op->error);
+      g_assert (base_class->write_fn);
+      op->result = base_class->write_fn (op->thread,
+                                         op->data, op->size,
+                                         &op->count,
+                                         op->cancellable,
+                                         &op->error);
       break;
     case G_TLS_THREAD_OP_WRITE_MESSAGE:
-      g_assert (G_TLS_OPERATIONS_THREAD_BASE_GET_CLASS (op->thread)->write_message_fn);
-      g_assert_not_reached (); /* FIXME */
+      g_assert (base_class->write_message_fn);
+      op->result = base_class->write_message_fn (op->thread,
+                                                 op->output_vectors, op->num_vectors,
+                                                 &op->count,
+                                                 op->cancellable,
+                                                 &op->error);
       break;
     case G_TLS_THREAD_OP_SHUTDOWN:
       g_assert_not_reached ();


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]