[rhythmbox/rhythmdb-barrier: 4/5] rhythmdb: wait for changes to be processed before committing




commit ef7b3e771e7dff42b0024689ef9178d916abbcb5
Author: Jonathan Matthew <jonathan d14n org>
Date:   Fri Mar 26 11:56:09 2021 +1000

    rhythmdb: wait for changes to be processed before committing
    
    When committing from a worker thread, push a barrier event onto the
    event queue and wait for the main thread to process it before
    proceeding. This ensures that changes made on the worker thread have
    actually been processed by the main thread before the commit goes ahead.

 rhythmdb/rhythmdb-private.h |  7 ++++++-
 rhythmdb/rhythmdb.c         | 37 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 42 insertions(+), 2 deletions(-)
---
diff --git a/rhythmdb/rhythmdb-private.h b/rhythmdb/rhythmdb-private.h
index 579f785c2..585b63234 100644
--- a/rhythmdb/rhythmdb-private.h
+++ b/rhythmdb/rhythmdb-private.h
@@ -181,6 +181,10 @@ struct _RhythmDBPrivate
        GList *deleted_entries_to_emit;
        GHashTable *changed_entries_to_emit;
 
+       GList *barriers_done;
+       GMutex barrier_mutex;
+       GCond barrier_condition;
+
        gboolean can_save;
        gboolean saving;
        gboolean dirty;
@@ -206,7 +210,8 @@ typedef struct
                RHYTHMDB_EVENT_THREAD_EXITED,
                RHYTHMDB_EVENT_DB_SAVED,
                RHYTHMDB_EVENT_QUERY_COMPLETE,
-               RHYTHMDB_EVENT_ENTRY_SET
+               RHYTHMDB_EVENT_ENTRY_SET,
+               RHYTHMDB_EVENT_BARRIER
        } type;
        RBRefString *uri;
        RBRefString *real_uri; /* Target of a symlink, if any */
diff --git a/rhythmdb/rhythmdb.c b/rhythmdb/rhythmdb.c
index fa93ecc92..61832bc6c 100644
--- a/rhythmdb/rhythmdb.c
+++ b/rhythmdb/rhythmdb.c
@@ -1001,6 +1001,8 @@ rhythmdb_event_free (RhythmDB *db,
        case RHYTHMDB_EVENT_METADATA_CACHE:
                free_cached_metadata(&result->cached_metadata);
                break;
+       case RHYTHMDB_EVENT_BARRIER:
+               break;
        }
        if (result->error)
                g_error_free (result->error);
@@ -1547,6 +1549,28 @@ rhythmdb_commit_internal (RhythmDB *db,
                          gboolean sync_changes,
                          GThread *thread)
 {
+       /*
+        * during normal operation, if committing from a worker thread,
+        * wait for changes made on the thread to be processed by the main thread.
+        * this avoids races and ensures the signals emitted are correct.
+        */
+       if (db->priv->action_thread_running && !rb_is_main_thread ()) {
+               RhythmDBEvent *event;
+
+               event = g_slice_new0 (RhythmDBEvent);
+               event->db = db;
+               event->type = RHYTHMDB_EVENT_BARRIER;
+
+               g_mutex_lock (&db->priv->barrier_mutex);
+               rhythmdb_push_event (db, event);
+               while (g_list_find (db->priv->barriers_done, event) == NULL)
+                       g_cond_wait (&db->priv->barrier_condition, &db->priv->barrier_mutex);
+               db->priv->barriers_done = g_list_remove (db->priv->barriers_done, event);
+               g_mutex_unlock (&db->priv->barrier_mutex);
+
+               rhythmdb_event_free (db, event);
+       }
+
        g_mutex_lock (&db->priv->change_mutex);
 
        if (sync_changes) {
@@ -2626,7 +2650,8 @@ rhythmdb_process_one_event (RhythmDBEvent *event, RhythmDB *db)
            ((event->type == RHYTHMDB_EVENT_STAT)
             || (event->type == RHYTHMDB_EVENT_METADATA_LOAD)
             || (event->type == RHYTHMDB_EVENT_METADATA_CACHE)
-            || (event->type == RHYTHMDB_EVENT_ENTRY_SET))) {
+            || (event->type == RHYTHMDB_EVENT_ENTRY_SET)
+            || (event->type == RHYTHMDB_EVENT_BARRIER))) {
                rb_debug ("Database is read-only, delaying event processing");
                g_async_queue_push (db->priv->delayed_write_queue, event);
                return;
@@ -2674,6 +2699,16 @@ rhythmdb_process_one_event (RhythmDBEvent *event, RhythmDB *db)
                rb_debug ("processing RHYTHMDB_EVENT_QUERY_COMPLETE");
                rhythmdb_read_leave (db);
                break;
+       case RHYTHMDB_EVENT_BARRIER:
+               rb_debug ("processing RHYTHMDB_EVENT_BARRIER");
+               g_mutex_lock (&db->priv->barrier_mutex);
+               db->priv->barriers_done = g_list_prepend (db->priv->barriers_done, event);
+               g_cond_broadcast (&db->priv->barrier_condition);
+               g_mutex_unlock (&db->priv->barrier_mutex);
+
+               /* freed by the thread waiting on the barrier */
+               free = FALSE;
+               break;
        }
        if (free)
                rhythmdb_event_free (db, event);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]