[rhythmbox/rhythmdb-barrier] rhythmdb: wait for changes to be processed before committing




commit 9d715866012bfb92f3829255640ce964a2d65de0
Author: Jonathan Matthew <jonathan d14n org>
Date:   Fri Mar 26 11:56:09 2021 +1000

    rhythmdb: wait for changes to be processed before committing
    
    When committing from a worker thread, push a barrier event onto the event
    queue and wait for it to be processed before proceeding, ensuring that
    changes made on that thread are processed by the main thread before the commit.

 rhythmdb/rhythmdb-private.h |  7 ++++++-
 rhythmdb/rhythmdb.c         | 37 ++++++++++++++++++++++++++++++++++++-
 2 files changed, 42 insertions(+), 2 deletions(-)
---
diff --git a/rhythmdb/rhythmdb-private.h b/rhythmdb/rhythmdb-private.h
index 579f785c2..d07eaf0ce 100644
--- a/rhythmdb/rhythmdb-private.h
+++ b/rhythmdb/rhythmdb-private.h
@@ -181,6 +181,10 @@ struct _RhythmDBPrivate
        GList *deleted_entries_to_emit;
        GHashTable *changed_entries_to_emit;
 
+       gpointer barrier_current;
+       GMutex barrier_mutex;
+       GCond barrier_condition;
+
        gboolean can_save;
        gboolean saving;
        gboolean dirty;
@@ -206,7 +210,8 @@ typedef struct
                RHYTHMDB_EVENT_THREAD_EXITED,
                RHYTHMDB_EVENT_DB_SAVED,
                RHYTHMDB_EVENT_QUERY_COMPLETE,
-               RHYTHMDB_EVENT_ENTRY_SET
+               RHYTHMDB_EVENT_ENTRY_SET,
+               RHYTHMDB_EVENT_BARRIER
        } type;
        RBRefString *uri;
        RBRefString *real_uri; /* Target of a symlink, if any */
diff --git a/rhythmdb/rhythmdb.c b/rhythmdb/rhythmdb.c
index fa93ecc92..9cb78ebcd 100644
--- a/rhythmdb/rhythmdb.c
+++ b/rhythmdb/rhythmdb.c
@@ -1001,6 +1001,8 @@ rhythmdb_event_free (RhythmDB *db,
        case RHYTHMDB_EVENT_METADATA_CACHE:
                free_cached_metadata(&result->cached_metadata);
                break;
+       case RHYTHMDB_EVENT_BARRIER:
+               break;
        }
        if (result->error)
                g_error_free (result->error);
@@ -1547,6 +1549,28 @@ rhythmdb_commit_internal (RhythmDB *db,
                          gboolean sync_changes,
                          GThread *thread)
 {
+       /*
+        * during normal operation, if committing from a worker thread,
+        * wait for changes made on the thread to be processed by the main thread.
+        * this avoids races and ensures the signals emitted are correct.
+        */
+       if (db->priv->action_thread_running && !rb_is_main_thread ()) {
+               RhythmDBEvent *event;
+
+               event = g_slice_new0 (RhythmDBEvent);
+               event->db = db;
+               event->type = RHYTHMDB_EVENT_BARRIER;
+
+               g_mutex_lock (&db->priv->barrier_mutex);
+               rhythmdb_push_event (db, event);
+               while (db->priv->barrier_current != event)
+                       g_cond_wait (&db->priv->barrier_condition, &db->priv->barrier_mutex);
+               db->priv->barrier_current = NULL;
+               g_mutex_unlock (&db->priv->barrier_mutex);
+
+               rhythmdb_event_free (db, event);
+       }
+
        g_mutex_lock (&db->priv->change_mutex);
 
        if (sync_changes) {
@@ -2626,7 +2650,8 @@ rhythmdb_process_one_event (RhythmDBEvent *event, RhythmDB *db)
            ((event->type == RHYTHMDB_EVENT_STAT)
             || (event->type == RHYTHMDB_EVENT_METADATA_LOAD)
             || (event->type == RHYTHMDB_EVENT_METADATA_CACHE)
-            || (event->type == RHYTHMDB_EVENT_ENTRY_SET))) {
+            || (event->type == RHYTHMDB_EVENT_ENTRY_SET)
+            || (event->type == RHYTHMDB_EVENT_BARRIER))) {
                rb_debug ("Database is read-only, delaying event processing");
                g_async_queue_push (db->priv->delayed_write_queue, event);
                return;
@@ -2674,6 +2699,16 @@ rhythmdb_process_one_event (RhythmDBEvent *event, RhythmDB *db)
                rb_debug ("processing RHYTHMDB_EVENT_QUERY_COMPLETE");
                rhythmdb_read_leave (db);
                break;
+       case RHYTHMDB_EVENT_BARRIER:
+               rb_debug ("processing RHYTHMDB_EVENT_BARRIER");
+               g_mutex_lock (&db->priv->barrier_mutex);
+               db->priv->barrier_current = event;
+               g_cond_broadcast (&db->priv->barrier_condition);
+               g_mutex_unlock (&db->priv->barrier_mutex);
+
+               /* freed by the thread waiting on the barrier */
+               free = FALSE;
+               break;
        }
        if (free)
                rhythmdb_event_free (db, event);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]