[gnome-online-accounts/wip/rishi/gtask: 3/4] kerberos-identity-manager: Port to GThreadPool



commit 8d54812227930f530930b777a9ec5e64d5ce5dc5
Author: Debarshi Ray <debarshir gnome org>
Date:   Thu Jul 25 19:21:48 2019 +0200

    kerberos-identity-manager: Port to GThreadPool
    
    The destruction sequence of a GoaKerberosIdentityManager instance is
    simpler now because the GThreadPool makes some things easier, as
    compared to having to directly handle a GAsyncQueue.
    
    As before, there is an object-wide GCancellable and separate
    GCancellables for each GAsyncResult-based cancellable action; and there
    are two kinds of operations that get inserted into the thread pool -
    those that are based on the GAsyncResult idiom, and those that are
    internal fire-and-forget style operations. Note that due to the
    semantics of the GAsyncResult idiom, a GoaKerberosIdentityManager
    instance will not be destroyed while such a call is in flight because
    it holds a reference to the instance.
    
    When a GoaKerberosIdentityManager instance gets destroyed, the first
    step is to cancel the object-wide GCancellable, and wait for any
    ongoing operation to complete. Then the GThreadPool gets cleared and
    destroyed.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=764157

 src/goaidentity/goakerberosidentitymanager.c | 265 ++++++++++++---------------
 1 file changed, 117 insertions(+), 148 deletions(-)
---
diff --git a/src/goaidentity/goakerberosidentitymanager.c b/src/goaidentity/goakerberosidentitymanager.c
index 721bce89..ea589924 100644
--- a/src/goaidentity/goakerberosidentitymanager.c
+++ b/src/goaidentity/goakerberosidentitymanager.c
@@ -40,8 +40,8 @@ struct _GoaKerberosIdentityManagerPrivate
   GHashTable *identities;
   GHashTable *expired_identities;
   GHashTable *identities_by_realm;
-  GAsyncQueue *pending_operations;
   GCancellable *scheduler_cancellable;
+  GThreadPool *thread_pool;
 
   krb5_context kerberos_context;
   GFileMonitor *credentials_cache_monitor;
@@ -64,13 +64,13 @@ typedef enum
   OPERATION_TYPE_LIST,
   OPERATION_TYPE_RENEW,
   OPERATION_TYPE_SIGN_IN,
-  OPERATION_TYPE_SIGN_OUT,
-  OPERATION_TYPE_STOP_JOB
+  OPERATION_TYPE_SIGN_OUT
 } OperationType;
 
 typedef struct
 {
   GCancellable *cancellable;
+  GCancellable *scheduler_cancellable;
   GoaKerberosIdentityManager *manager;
   OperationType type;
   GMainContext *context;
@@ -131,6 +131,8 @@ operation_new (GoaKerberosIdentityManager *self,
   operation = g_slice_new0 (Operation);
 
   operation->manager = self;
+  operation->scheduler_cancellable = g_object_ref (self->priv->scheduler_cancellable);
+
   operation->type = type;
 
   if (cancellable == NULL)
@@ -154,6 +156,7 @@ static void
 operation_free (Operation *operation)
 {
   g_clear_object (&operation->cancellable);
+  g_clear_object (&operation->scheduler_cancellable);
   g_clear_pointer (&operation->context, g_main_context_unref);
 
   if (operation->type != OPERATION_TYPE_SIGN_IN &&
@@ -255,7 +258,7 @@ schedule_refresh (GoaKerberosIdentityManager *self)
   g_atomic_int_inc (&self->priv->pending_refresh_count);
 
   operation = operation_new (self, NULL, OPERATION_TYPE_REFRESH, NULL);
-  g_async_queue_push (self->priv->pending_operations, operation);
+  g_thread_pool_push (self->priv->thread_pool, operation, NULL);
 }
 
 static IdentitySignalWork *
@@ -1037,116 +1040,96 @@ wait_for_scheduler_job_to_become_unblocked (GoaKerberosIdentityManager *self)
 }
 
 static void
-on_job_cancelled (GCancellable               *cancellable,
-                  GoaKerberosIdentityManager *self)
+on_scheduler_cancellable_cancelled (GCancellable               *cancellable,
+                                    GoaKerberosIdentityManager *self)
 {
-  Operation *operation;
-  operation = operation_new (self, cancellable, OPERATION_TYPE_STOP_JOB, NULL);
-  g_async_queue_push (self->priv->pending_operations, operation);
-
   stop_blocking_scheduler_job (self);
 }
 
-static gboolean
-on_job_scheduled (GIOSchedulerJob            *job,
-                  GCancellable               *cancellable,
-                  GoaKerberosIdentityManager *self)
+static void
+goa_kerberos_identity_manager_thread_pool_func (gpointer data, gpointer user_data)
 {
-  GAsyncQueue *pending_operations;
-
-  g_assert (cancellable != NULL);
-
-  g_cancellable_connect (cancellable, G_CALLBACK (on_job_cancelled), self, NULL);
+  Operation *operation = (Operation *) data;
+  GoaKerberosIdentityManager *self = GOA_KERBEROS_IDENTITY_MANAGER (operation->manager);
+  GError *error;
+  gboolean processed_operation = FALSE;
 
-  /* Take ownership of queue, since we may out live the identity manager */
-  pending_operations = g_async_queue_ref (self->priv->pending_operations);
-  while (!g_cancellable_is_cancelled (cancellable))
+  error = NULL;
+  if (operation->result != NULL && g_cancellable_set_error_if_cancelled (operation->cancellable, &error))
     {
-      Operation *operation;
-      gboolean processed_operation = FALSE;
-      GError *error = NULL;
-
-      operation = g_async_queue_pop (pending_operations);
-
-      if (operation->result != NULL &&
-          g_cancellable_set_error_if_cancelled (operation->cancellable,
-                                                &error))
-        {
-          g_simple_async_result_take_error (operation->result, error);
-          g_simple_async_result_complete_in_idle (operation->result);
-          g_clear_object (&operation->result);
-          continue;
-        }
-
-      switch (operation->type)
-        {
-        case OPERATION_TYPE_STOP_JOB:
-          /* do nothing, loop will exit next iteration since cancellable
-           * is cancelled
-           */
-          g_assert (g_cancellable_is_cancelled (cancellable));
-          operation_free (operation);
-          continue;
-        case OPERATION_TYPE_REFRESH:
-          processed_operation = refresh_identities (operation->manager, operation);
-          break;
-        case OPERATION_TYPE_GET_IDENTITY:
-          get_identity (operation->manager, operation);
-          processed_operation = TRUE;
-          break;
-        case OPERATION_TYPE_LIST:
-          list_identities (operation->manager, operation);
-          processed_operation = TRUE;
-
-          /* We want to block refreshes (and their associated "added"
-           * and "removed" signals) until the caller has had
-           * a chance to look at the batch of
-           * results we already processed
-           */
-          g_assert (operation->result != NULL);
-
-          g_debug
-            ("GoaKerberosIdentityManager:         Blocking until identities list processed");
-          block_scheduler_job (self);
-          g_object_weak_ref (G_OBJECT (operation->result),
-                             (GWeakNotify) stop_blocking_scheduler_job, self);
-          g_debug ("GoaKerberosIdentityManager:         Continuing");
-          break;
-        case OPERATION_TYPE_SIGN_IN:
-          sign_in_identity (operation->manager, operation);
-          processed_operation = TRUE;
-          break;
-        case OPERATION_TYPE_SIGN_OUT:
-          sign_out_identity (operation->manager, operation);
-          processed_operation = TRUE;
-          break;
-        case OPERATION_TYPE_RENEW:
-          renew_identity (operation->manager, operation);
-          processed_operation = TRUE;
-          break;
-        default:
-          break;
-        }
-
-      if (operation->result != NULL)
-        {
-          g_simple_async_result_complete_in_idle (operation->result);
-          g_clear_object (&operation->result);
-        }
-      operation_free (operation);
+      g_simple_async_result_take_error (operation->result, error);
+      g_simple_async_result_complete_in_idle (operation->result);
+      g_clear_object (&operation->result);
+      goto out;
+    }
 
-      wait_for_scheduler_job_to_become_unblocked (self);
+  if (g_cancellable_is_cancelled (operation->scheduler_cancellable))
+    goto out;
 
-      /* Don't bother saying "Waiting for next operation" if this operation
-       * was a no-op, since the debug spew probably already says the message
+  switch (operation->type)
+    {
+    case OPERATION_TYPE_REFRESH:
+      processed_operation = refresh_identities (operation->manager, operation);
+      break;
+
+    case OPERATION_TYPE_GET_IDENTITY:
+      get_identity (operation->manager, operation);
+      processed_operation = TRUE;
+      break;
+
+    case OPERATION_TYPE_LIST:
+      list_identities (operation->manager, operation);
+      processed_operation = TRUE;
+
+      /* We want to block refreshes (and their associated "added"
+       * and "removed" signals) until the caller has had
+       * a chance to look at the batch of
+       * results we already processed
        */
-      if (processed_operation)
-        g_debug ("GoaKerberosIdentityManager: Waiting for next operation");
+      g_assert (operation->result != NULL);
+
+      g_debug ("GoaKerberosIdentityManager: Blocking until identities list processed");
+      block_scheduler_job (self);
+      g_object_weak_ref (G_OBJECT (operation->result), (GWeakNotify) stop_blocking_scheduler_job, self);
+      g_debug ("GoaKerberosIdentityManager: Continuing");
+      break;
+
+    case OPERATION_TYPE_SIGN_IN:
+      sign_in_identity (operation->manager, operation);
+      processed_operation = TRUE;
+      break;
+
+    case OPERATION_TYPE_SIGN_OUT:
+      sign_out_identity (operation->manager, operation);
+      processed_operation = TRUE;
+      break;
+
+    case OPERATION_TYPE_RENEW:
+      renew_identity (operation->manager, operation);
+      processed_operation = TRUE;
+      break;
+
+    default:
+      break;
     }
 
-  g_async_queue_unref (pending_operations);
+  if (operation->result != NULL)
+    {
+      g_simple_async_result_complete_in_idle (operation->result);
+      g_clear_object (&operation->result);
+    }
 
-  return FALSE;
+  wait_for_scheduler_job_to_become_unblocked (self);
+
+  /* Don't bother saying "Waiting for next operation" if this operation
+   * was a no-op, since the debug spew probably already says the message
+   */
+  if (processed_operation)
+    g_debug ("GoaKerberosIdentityManager: Waiting for next operation");
+
+ out:
+  operation_free (operation);
+  return;
 }
 
 static void
@@ -1169,7 +1152,7 @@ goa_kerberos_identity_manager_get_identity (GoaIdentityManager   *manager,
 
   operation->identifier = g_strdup (identifier);
 
-  g_async_queue_push (self->priv->pending_operations, operation);
+  g_thread_pool_push (self->priv->thread_pool, operation, NULL);
 }
 
 static GoaIdentity *
@@ -1207,7 +1190,7 @@ goa_kerberos_identity_manager_list_identities (GoaIdentityManager  *manager,
   operation = operation_new (self, cancellable, OPERATION_TYPE_LIST, result);
   g_object_unref (result);
 
-  g_async_queue_push (self->priv->pending_operations, operation);
+  g_thread_pool_push (self->priv->thread_pool, operation, NULL);
 }
 
 static GList *
@@ -1248,7 +1231,7 @@ goa_kerberos_identity_manager_renew_identity (GoaIdentityManager  *manager,
 
   operation->identity = g_object_ref (identity);
 
-  g_async_queue_push (self->priv->pending_operations, operation);
+  g_thread_pool_push (self->priv->thread_pool, operation, NULL);
 }
 
 static void
@@ -1297,7 +1280,7 @@ goa_kerberos_identity_manager_sign_identity_in (GoaIdentityManager     *manager,
   g_cond_init (&operation->inquiry_finished_condition);
   operation->is_inquiring = FALSE;
 
-  g_async_queue_push (self->priv->pending_operations, operation);
+  g_thread_pool_push (self->priv->thread_pool, operation, NULL);
 }
 
 static GoaIdentity *
@@ -1336,7 +1319,7 @@ goa_kerberos_identity_manager_sign_identity_out (GoaIdentityManager  *manager,
 
   operation->identity = g_object_ref (identity);
 
-  g_async_queue_push (self->priv->pending_operations, operation);
+  g_thread_pool_push (self->priv->thread_pool, operation, NULL);
 }
 
 static void
@@ -1620,45 +1603,53 @@ initable_interface_init (GInitableIface *interface)
 static void
 goa_kerberos_identity_manager_init (GoaKerberosIdentityManager *self)
 {
+  GError *error;
+
   self->priv = G_TYPE_INSTANCE_GET_PRIVATE (self,
                                             GOA_TYPE_KERBEROS_IDENTITY_MANAGER,
                                             GoaKerberosIdentityManagerPrivate);
   self->priv->identities = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
   self->priv->expired_identities = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
   self->priv->identities_by_realm = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
-  self->priv->pending_operations = g_async_queue_new ();
+
+  error = NULL;
+  self->priv->thread_pool = g_thread_pool_new (goa_kerberos_identity_manager_thread_pool_func,
+                                               NULL,
+                                               1,
+                                               FALSE,
+                                               &error);
+  g_assert_no_error (error);
 
   g_mutex_init (&self->priv->scheduler_job_lock);
   g_cond_init (&self->priv->scheduler_job_unblocked);
 
   self->priv->scheduler_cancellable = g_cancellable_new ();
-  g_io_scheduler_push_job ((GIOSchedulerJobFunc)
-                           on_job_scheduled,
-                           self,
-                           NULL,
-                           G_PRIORITY_DEFAULT,
-                           self->priv->scheduler_cancellable);
-
+  g_cancellable_connect (self->priv->scheduler_cancellable,
+                         G_CALLBACK (on_scheduler_cancellable_cancelled),
+                         self,
+                         NULL);
 }
 
 static void
-cancel_pending_operations (GoaKerberosIdentityManager *self)
+goa_kerberos_identity_manager_dispose (GObject *object)
 {
-  Operation *operation;
+  GoaKerberosIdentityManager *self = GOA_KERBEROS_IDENTITY_MANAGER (object);
 
-  operation = g_async_queue_try_pop (self->priv->pending_operations);
-  while (operation != NULL)
+  if (self->priv->scheduler_cancellable != NULL)
     {
-      g_cancellable_cancel (operation->cancellable);
-      operation_free (operation);
-      operation = g_async_queue_try_pop (self->priv->pending_operations);
+      if (!g_cancellable_is_cancelled (self->priv->scheduler_cancellable))
+        {
+          g_cancellable_cancel (self->priv->scheduler_cancellable);
+        }
+
+      g_clear_object (&self->priv->scheduler_cancellable);
     }
-}
 
-static void
-goa_kerberos_identity_manager_dispose (GObject *object)
-{
-  GoaKerberosIdentityManager *self = GOA_KERBEROS_IDENTITY_MANAGER (object);
+  if (self->priv->thread_pool != NULL)
+    {
+      g_thread_pool_free (self->priv->thread_pool, FALSE, TRUE);
+      self->priv->thread_pool = NULL;
+    }
 
   if (self->priv->identities_by_realm != NULL)
     {
@@ -1680,28 +1671,6 @@ goa_kerberos_identity_manager_dispose (GObject *object)
 
   stop_watching_credentials_cache (self);
 
-  if (self->priv->pending_operations != NULL)
-    cancel_pending_operations (self);
-
-  if (self->priv->scheduler_cancellable != NULL)
-    {
-      if (!g_cancellable_is_cancelled (self->priv->scheduler_cancellable))
-        {
-          g_cancellable_cancel (self->priv->scheduler_cancellable);
-        }
-
-      g_clear_object (&self->priv->scheduler_cancellable);
-    }
-
-  /* Note, other thread may still be holding a local reference to queue
-   * while it shuts down from cancelled scheduler_cancellable above
-   */
-  if (self->priv->pending_operations != NULL)
-    {
-      g_async_queue_unref (self->priv->pending_operations);
-      self->priv->pending_operations = NULL;
-    }
-
   G_OBJECT_CLASS (goa_kerberos_identity_manager_parent_class)->dispose (object);
 }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]