[dconf/wip/reorg: 465/523] Implement change signals



commit cc3221d143889375683a63ed888d984fe8aabe9a
Author: Ryan Lortie <desrt desrt ca>
Date:   Mon Jul 2 23:34:43 2012 -0400

    Implement change signals
    
    Support receiving and properly exposing change notifications.
    
    This required some changes to improve the thread-safety of destroying a
    DConfEngine.
    
    It is possible for a signal to arrive (in the worker thread) at the
    exact instant that a DConfEngine is being destroyed (from the finalize
    of the DConfClient or DConfSettingsBackend).  This could lead to the
    object being accessed after it was finalized.
    
    We can avoid this by using weak references and by being more careful
    about when the DConfEngine is freed (by taking a ref to it in the signal
    handler and releasing it when done).

 client/dconf-client.c            |   26 ++++++-
 engine/dconf-engine.c            |  153 ++++++++++++++++++++++++++++++++++----
 engine/dconf-engine.h            |    5 +-
 gsettings/dconfsettingsbackend.c |   37 +++++++++-
 4 files changed, 199 insertions(+), 22 deletions(-)
---
diff --git a/client/dconf-client.c b/client/dconf-client.c
index ffc179e..1e3efc0 100644
--- a/client/dconf-client.c
+++ b/client/dconf-client.c
@@ -47,7 +47,7 @@ dconf_client_finalize (GObject *object)
 {
   DConfClient *client = DCONF_CLIENT (object);
 
-  dconf_engine_free (client->engine);
+  dconf_engine_unref (client->engine);
   g_main_context_unref (client->context);
 
   G_OBJECT_CLASS (dconf_client_parent_class)
@@ -103,13 +103,19 @@ dconf_engine_change_notify (DConfEngine         *engine,
                             const gchar *        tag,
                             gpointer             user_data)
 {
-  DConfClient *client = user_data;
+  GWeakRef *weak_ref = user_data;
   DConfClientChange *change;
+  DConfClient *client;
+
+  client = g_weak_ref_get (weak_ref);
+
+  if (client == NULL)
+    return;
 
   g_return_if_fail (DCONF_IS_CLIENT (client));
 
   change = g_slice_new (DConfClientChange);
-  change->client = g_object_ref (client);
+  change->client = client;
   change->prefix = g_strdup (prefix);
   change->changes = g_strdupv ((gchar **) changes);
   change->tag = g_strdup (tag);
@@ -117,13 +123,25 @@ dconf_engine_change_notify (DConfEngine         *engine,
   g_main_context_invoke (client->context, dconf_client_dispatch_change_signal, change);
 }
 
+static void
+dconf_client_free_weak_ref (gpointer data)
+{
+  GWeakRef *weak_ref = data;
+
+  g_weak_ref_clear (weak_ref);
+  g_slice_free (GWeakRef, weak_ref);
+}
+
 DConfClient *
 dconf_client_new (void)
 {
   DConfClient *client;
+  GWeakRef *weak_ref;
 
   client = g_object_new (DCONF_TYPE_CLIENT, NULL);
-  client->engine = dconf_engine_new (client);
+  weak_ref = g_slice_new (GWeakRef);
+  g_weak_ref_init (weak_ref, client);
+  client->engine = dconf_engine_new (weak_ref, dconf_client_free_weak_ref);
   client->context = g_main_context_ref_thread_default ();
 
   return client;
diff --git a/engine/dconf-engine.c b/engine/dconf-engine.c
index 230d2ab..75d39e0 100644
--- a/engine/dconf-engine.c
+++ b/engine/dconf-engine.c
@@ -147,18 +147,24 @@
 
 #define MAX_IN_FLIGHT 2
 
+static GSList *dconf_engine_global_list;
+static GMutex  dconf_engine_global_lock;
+
 struct _DConfEngine
 {
   gpointer            user_data;    /* Set at construct time */
+  GDestroyNotify      free_func;
+  gint                ref_count;
 
   GMutex              sources_lock; /* This lock is for the sources (ie: refreshing) and state. */
   guint64             state;        /* Counter that changes every time a source is refreshed. */
   DConfEngineSource **sources;      /* Array never changes, but each source changes internally. */
   gint                n_sources;
 
-  GMutex              queue_lock;   /* This lock is for pending, in_flight, last_handled */
+  GMutex              queue_lock;   /* This lock is for pending, in_flight */
   GQueue              pending;      /* DConfChangeset */
   GQueue              in_flight;    /* DConfChangeset */
+
   gchar              *last_handled; /* reply tag from last item in in_flight */
 };
 
@@ -214,13 +220,16 @@ dconf_engine_unlock_queues (DConfEngine *engine)
 }
 
 DConfEngine *
-dconf_engine_new (gpointer user_data)
+dconf_engine_new (gpointer       user_data,
+                  GDestroyNotify free_func)
 {
   DConfEngine *engine;
   gint i;
 
   engine = g_slice_new0 (DConfEngine);
   engine->user_data = user_data;
+  engine->free_func = free_func;
+  engine->ref_count = 1;
 
   g_mutex_init (&engine->sources_lock);
   g_mutex_init (&engine->queue_lock);
@@ -231,23 +240,83 @@ dconf_engine_new (gpointer user_data)
     if (!dconf_engine_source_init (engine->sources[i]))
       g_assert_not_reached ();
 
+  g_mutex_lock (&dconf_engine_global_lock);
+  dconf_engine_global_list = g_slist_prepend (dconf_engine_global_list, engine);
+  g_mutex_unlock (&dconf_engine_global_lock);
+
   return engine;
 }
 
 void
-dconf_engine_free (DConfEngine *engine)
+dconf_engine_unref (DConfEngine *engine)
 {
-  gint i;
+  if (g_atomic_int_dec_and_test (&engine->ref_count))
+    {
+      gint i;
 
-  g_mutex_clear (&engine->sources_lock);
-  g_mutex_clear (&engine->queue_lock);
+      /* We just dropped our refcount to zero, but we're still in the
+       * dconf_engine_global_list.
+       *
+       * If a signal arrives at this exact instant and the signal
+       * handler beats us to the lock then the refcount will be
+       * increased again.
+       *
+       * Acquire the lock and then double-check that the refcount is
+       * still zero before actually doing the remove.  If it's non-zero
+       * then the signal handler grabbed a ref and will call unref()
+       * later.
+       */
+      g_mutex_lock (&dconf_engine_global_lock);
+      if (g_atomic_int_get (&engine->ref_count) != 0)
+        {
+          g_mutex_unlock (&dconf_engine_global_lock);
+          return;
+        }
 
-  for (i = 0; i < engine->n_sources; i++)
-    dconf_engine_source_free (engine->sources[i]);
+      /* It's possible that another thread grabbed a reference at the
+       * last minute and dropped it back to zero again (thus causing the
+       * above check to pass).  In that case, however, the other thread
+       * will have also dropped the refcount from 1 to 0 and be inside
+       * of the dec-and-test above.
+       *
+       * We can only have one of the two threads doing the freeing of
+       * the data, so we have a simple rule: the thread that removes the
+       * engine from the global list is the one that does the free.
+       * Since this operation is performed under mutex we can be sure
+       * that only one thread will win.
+       */
+      if (!g_slist_find (dconf_engine_global_list, engine))
+        {
+          g_mutex_unlock (&dconf_engine_global_lock);
+          return;
+        }
+
+      dconf_engine_global_list = g_slist_remove (dconf_engine_global_list, engine);
+      g_mutex_unlock (&dconf_engine_global_lock);
+
+      g_mutex_clear (&engine->sources_lock);
+      g_mutex_clear (&engine->queue_lock);
+
+      g_free (engine->last_handled);
+
+      for (i = 0; i < engine->n_sources; i++)
+        dconf_engine_source_free (engine->sources[i]);
 
-  g_free (engine->sources);
+      g_free (engine->sources);
+
+      if (engine->free_func)
+        engine->free_func (engine->user_data);
+
+      g_slice_free (DConfEngine, engine);
+    }
+}
+
+static DConfEngine *
+dconf_engine_ref (DConfEngine *engine)
+{
+  g_atomic_int_inc (&engine->ref_count);
 
-  g_slice_free (DConfEngine, engine);
+  return engine;
 }
 
 guint64
@@ -586,7 +655,7 @@ dconf_engine_call_handle_new (DConfEngine                   *engine,
   g_assert (size >= sizeof (DConfEngineCallHandle));
 
   handle = g_malloc0 (size);
-  handle->engine = engine;
+  handle->engine = dconf_engine_ref (engine);
   handle->callback = callback;
   handle->expected_reply = expected_reply;
 
@@ -607,6 +676,7 @@ dconf_engine_call_handle_reply (DConfEngineCallHandle *handle,
 static void
 dconf_engine_call_handle_free (DConfEngineCallHandle *handle)
 {
+  dconf_engine_unref (handle->engine);
   g_free (handle);
 }
 
@@ -825,16 +895,28 @@ dconf_engine_change_completed (DConfEngine  *engine,
    * making room for another to be added.  Check that.
    */
   dconf_engine_manage_queue (engine);
+  dconf_engine_unlock_queues (engine);
 
   /* Deal with the reply we got. */
   if (reply)
     {
+      /* The write worked.
+       *
+       * We already sent a change notification for this item when we
+       * added it to the pending queue and we don't want to send another
+       * one again.  At the same time, it's very likely that we're just
+       * about to receive a change signal from the service.
+       *
+       * The tag sent as part of the reply to the Change call will be
+       * the same tag as on the change notification signal.  Record that
+       * tag so that we can ignore the signal when it comes.
+       *
+       * last_handled is only ever touched from the worker thread
+       */
       g_free (engine->last_handled);
       g_variant_get (reply, "(s)", &engine->last_handled);
     }
 
-  dconf_engine_unlock_queues (engine);
-
   if (error)
     {
       /* Some kind of unexpected failure occured while attempting to
@@ -1005,5 +1087,48 @@ dconf_engine_handle_dbus_signal (GBusType     type,
                                  const gchar *member,
                                  GVariant    *body)
 {
-  g_print ("saw a sig");
+  if (g_str_equal (member, "Notify"))
+    {
+      const gchar *prefix;
+      const gchar **changes;
+      const gchar *tag;
+      GSList *engines;
+
+      if (!g_variant_is_of_type (body, G_VARIANT_TYPE ("(sass)")))
+        return;
+
+      g_variant_get (body, "(&s^a&s&s)", &prefix, &changes, &tag);
+
+      g_mutex_lock (&dconf_engine_global_lock);
+      engines = g_slist_copy_deep (dconf_engine_global_list, (GCopyFunc) dconf_engine_ref, NULL);
+      g_mutex_unlock (&dconf_engine_global_lock);
+
+      while (engines)
+        {
+          DConfEngine *engine = engines->data;
+
+          /* It's possible that this incoming change notify is for a
+           * change that we already announced to the client when we
+           * placed it in the pending queue.
+           *
+           * Check last_handled to determine if we should ignore it.
+           */
+          if (!engine->last_handled || !g_str_equal (engine->last_handled, tag))
+            dconf_engine_change_notify (engine, prefix, changes, tag, engine->user_data);
+
+          engines = g_slist_delete_link (engines, engines);
+
+          dconf_engine_unref (engine);
+        }
+
+      g_free (changes);
+    }
+
+  else if (g_str_equal (member, "WritabilityNotify"))
+    {
+      if (!g_variant_is_of_type (body, G_VARIANT_TYPE ("(s)")))
+        return;
+
+      g_warning ("Need to handle writability changes"); /* XXX */
+    }
 }
diff --git a/engine/dconf-engine.h b/engine/dconf-engine.h
index 19b7097..bad8cf1 100644
--- a/engine/dconf-engine.h
+++ b/engine/dconf-engine.h
@@ -100,10 +100,11 @@ void                    dconf_engine_handle_dbus_signal                 (GBusTyp
                                                                          GVariant                *parameters);
 
 G_GNUC_INTERNAL
-DConfEngine *           dconf_engine_new                                (gpointer                 user_data);
+DConfEngine *           dconf_engine_new                                (gpointer                 user_data,
+                                                                         GDestroyNotify           free_func);
 
 G_GNUC_INTERNAL
-void                    dconf_engine_free                               (DConfEngine             *engine);
+void                    dconf_engine_unref                              (DConfEngine             *engine);
 
 /* Read API: always handled immediately */
 G_GNUC_INTERNAL
diff --git a/gsettings/dconfsettingsbackend.c b/gsettings/dconfsettingsbackend.c
index 85aef4e..53a982c 100644
--- a/gsettings/dconfsettingsbackend.c
+++ b/gsettings/dconfsettingsbackend.c
@@ -141,9 +141,22 @@ dconf_settings_backend_sync (GSettingsBackend *backend)
 }
 
 static void
+dconf_settings_backend_free_weak_ref (gpointer data)
+{
+  GWeakRef *weak_ref = data;
+
+  g_weak_ref_clear (weak_ref);
+  g_slice_free (GWeakRef, weak_ref);
+}
+
+static void
 dconf_settings_backend_init (DConfSettingsBackend *dcsb)
 {
-  dcsb->engine = dconf_engine_new (dcsb);
+  GWeakRef *weak_ref;
+
+  weak_ref = g_slice_new (GWeakRef);
+  g_weak_ref_init (weak_ref, dcsb);
+  dcsb->engine = dconf_engine_new (weak_ref, dconf_settings_backend_free_weak_ref);
 }
 
 static void
@@ -151,7 +164,7 @@ dconf_settings_backend_finalize (GObject *object)
 {
   DConfSettingsBackend *dcsb = (DConfSettingsBackend *) object;
 
-  dconf_engine_free (dcsb->engine);
+  dconf_engine_unref (dcsb->engine);
 
   G_OBJECT_CLASS (dconf_settings_backend_parent_class)
     ->finalize (object);
@@ -202,4 +215,24 @@ dconf_engine_change_notify (DConfEngine         *engine,
                             const gchar         *tag,
                             gpointer             user_data)
 {
+  GWeakRef *weak_ref = user_data;
+  DConfSettingsBackend *dcsb;
+
+  dcsb = g_weak_ref_get (weak_ref);
+
+  if (dcsb == NULL)
+    return;
+
+  if (changes[0] == NULL)
+    return;
+
+  if (changes[1] == NULL)
+    {
+      if (g_str_has_suffix (prefix, "/"))
+        g_settings_backend_path_changed (G_SETTINGS_BACKEND (dcsb), prefix, NULL);
+      else
+        g_settings_backend_changed (G_SETTINGS_BACKEND (dcsb), prefix, NULL);
+    }
+  else
+    g_settings_backend_keys_changed (G_SETTINGS_BACKEND (dcsb), prefix, changes, NULL);
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]