[ostree] pull: Fix a race condition causing an early exit



commit 9bd4d35c2b34a321c9da2ad115b113076366a146
Author: Colin Walters <walters verbum org>
Date:   Sun Dec 23 17:18:08 2012 -0500

    pull: Fix a race condition causing an early exit
    
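    Replace the worker queue's hold/release counting and idle callback
    with a plain is_idle flag that the pull code checks alongside its
    outstanding-request counters.  This is a little less magical than
    what we had before.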

 src/libotutil/ot-worker-queue.c |   74 +++++++--------------------------------
 src/libotutil/ot-worker-queue.h |    9 +----
 src/ostree/ostree-pull.c        |   55 ++++++++++++++---------------
 3 files changed, 41 insertions(+), 97 deletions(-)
---
diff --git a/src/libotutil/ot-worker-queue.c b/src/libotutil/ot-worker-queue.c
index 79d48b7..4865662 100644
--- a/src/libotutil/ot-worker-queue.c
+++ b/src/libotutil/ot-worker-queue.c
@@ -31,21 +31,16 @@ struct OtWorkerQueue {
   GCond cond;
   GQueue queue;
 
-  volatile gint holds;
-
   char *thread_name;
   
   gboolean complete;
+  gboolean is_idle;
   gboolean destroyed;
 
   GThread *worker;
 
   OtWorkerQueueFunc work_func;
   OtWorkerQueueFunc work_data;
-  
-  GMainContext *idle_context;
-  OtWorkerQueueIdleFunc idle_callback;
-  gpointer idle_data;
 };
 
 static gpointer
@@ -61,6 +56,8 @@ ot_worker_queue_new (const char          *thread_name,
   g_cond_init (&queue->cond);
   g_queue_init (&queue->queue);
 
+  queue->is_idle = TRUE;
+
   queue->thread_name = g_strdup (thread_name);
   queue->work_func = func;
   queue->work_data = data;
@@ -72,40 +69,6 @@ void
 ot_worker_queue_start (OtWorkerQueue  *queue)
 {
   queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue);
-  ot_worker_queue_push (queue, queue); /* Self marks end of (initial) queue */
-}
-
-void
-ot_worker_queue_hold (OtWorkerQueue  *queue)
-{
-  g_atomic_int_inc (&queue->holds);
-}
-
-static gboolean
-invoke_idle_callback (gpointer user_data)
-{
-  OtWorkerQueue *queue = user_data;
-  queue->idle_callback (queue->idle_data);
-  return FALSE;
-}
-
-void
-ot_worker_queue_release (OtWorkerQueue  *queue)
-{
-  if (!g_atomic_int_dec_and_test (&queue->holds))
-    return;
-
-  g_mutex_lock (&queue->mutex);
-
-  if (!g_queue_peek_tail_link (&queue->queue))
-    {
-      if (queue->idle_callback)
-        g_main_context_invoke (queue->idle_context,
-                               invoke_idle_callback,
-                               queue);
-    }
-
-  g_mutex_unlock (&queue->mutex);
 }
 
 void
@@ -114,6 +77,7 @@ ot_worker_queue_push (OtWorkerQueue *queue,
 {
   g_mutex_lock (&queue->mutex);
   g_queue_push_head (&queue->queue, data);
+  queue->is_idle = FALSE;
   g_cond_signal (&queue->cond);
   g_mutex_unlock (&queue->mutex);
 }
@@ -131,11 +95,7 @@ ot_worker_queue_thread_main (gpointer user_data)
 
       while (!g_queue_peek_tail_link (&queue->queue))
         {
-          if (queue->idle_callback && queue->complete &&
-              g_atomic_int_get (&queue->holds) == 0)
-            g_main_context_invoke (queue->idle_context,
-                                   invoke_idle_callback,
-                                   queue);
+          queue->is_idle = TRUE;
           g_cond_wait (&queue->cond, &queue->mutex);
         }
 
@@ -146,27 +106,20 @@ ot_worker_queue_thread_main (gpointer user_data)
       if (!item)
         break;
 
-      if (item == queue)
-        queue->complete = TRUE;
-      else
-        queue->work_func (item, queue->work_data);
+      queue->work_func (item, queue->work_data);
     }
 
   return NULL;
 }
 
-void
-ot_worker_queue_set_idle_callback (OtWorkerQueue *queue,
-                                   GMainContext  *context,
-                                   OtWorkerQueueIdleFunc idle_callback,
-                                   gpointer       data)
+gboolean
+ot_worker_queue_is_idle (OtWorkerQueue *queue)
 {
-  g_assert (!queue->worker);
-  if (!context)
-    context = g_main_context_default ();
-  queue->idle_context = g_main_context_ref (context);
-  queue->idle_callback = idle_callback;
-  queue->idle_data = data;
+  gboolean ret;
+  g_mutex_lock (&queue->mutex);
+  ret = queue->is_idle;
+  g_mutex_unlock (&queue->mutex);
+  return ret;
 }
 
 void
@@ -180,7 +133,6 @@ ot_worker_queue_unref (OtWorkerQueue *queue)
 
   g_free (queue->thread_name);
 
-  g_main_context_unref (queue->idle_context);
   g_mutex_clear (&queue->mutex);
   g_cond_clear (&queue->cond);
   g_queue_clear (&queue->queue);
diff --git a/src/libotutil/ot-worker-queue.h b/src/libotutil/ot-worker-queue.h
index 590480e..cfd7a92 100644
--- a/src/libotutil/ot-worker-queue.h
+++ b/src/libotutil/ot-worker-queue.h
@@ -31,7 +31,6 @@ typedef struct OtWorkerQueue OtWorkerQueue;
 
 typedef void (*OtWorkerQueueFunc) (gpointer data,
                                    gpointer user_data);
-typedef void (*OtWorkerQueueIdleFunc) (gpointer user_data);
 
 OtWorkerQueue *ot_worker_queue_new (const char         *thread_name,
                                     OtWorkerQueueFunc   func,
@@ -39,13 +38,7 @@ OtWorkerQueue *ot_worker_queue_new (const char         *thread_name,
 
 void ot_worker_queue_start (OtWorkerQueue  *queue);
 
-void ot_worker_queue_hold (OtWorkerQueue  *queue);
-void ot_worker_queue_release (OtWorkerQueue  *queue);
-
-void ot_worker_queue_set_idle_callback (OtWorkerQueue          *queue,
-                                        GMainContext           *context,
-                                        OtWorkerQueueIdleFunc   idle_callback,
-                                        gpointer                data);
+gboolean ot_worker_queue_is_idle (OtWorkerQueue          *queue);
 
 void ot_worker_queue_push (OtWorkerQueue      *queue,
                            gpointer            data);
diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c
index 4ce781d..f7fbf37 100644
--- a/src/ostree/ostree-pull.c
+++ b/src/ostree/ostree-pull.c
@@ -101,10 +101,10 @@ typedef struct {
   guint         outstanding_uri_requests;
 
   GQueue        queued_filemeta;
-  GThread      *metadata_scan_thread;
   OtWorkerQueue  *metadata_objects_to_scan;
   GHashTable   *scanned_metadata; /* Maps object name to itself */
   GHashTable   *requested_content; /* Maps object name to itself */
+  guint         n_outstanding_metadata_fetches;
   
   guint         n_fetched_content;
   guint         outstanding_filemeta_requests;
@@ -268,16 +268,24 @@ static void
 check_outstanding_requests_handle_error (OtPullData          *pull_data,
                                          GError              *error)
 {
-  if (!pull_data->metadata_scan_active &&
+  if ((!pull_data->metadata_objects_to_scan || ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) &&
       pull_data->outstanding_uri_requests == 0 &&
       pull_data->outstanding_filemeta_requests == 0 &&
       pull_data->outstanding_filecontent_requests == 0 &&
+      pull_data->n_outstanding_metadata_fetches == 0 &&
       pull_data->outstanding_content_stage_requests == 0)
     g_main_loop_quit (pull_data->loop);
   throw_async_error (pull_data, error);
 }
 
 static gboolean
+idle_check_outstanding_requests (gpointer user_data)
+{
+  check_outstanding_requests_handle_error (user_data, NULL);
+  return FALSE;
+}
+
+static gboolean
 run_mainloop_monitor_fetcher (OtPullData   *pull_data)
 {
   GSource *update_timeout = NULL;
@@ -456,7 +464,6 @@ scan_dirtree_object (OtPullData   *pull_data,
           g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum);
 
           g_atomic_int_inc (&pull_data->n_requested_content);
-          ot_worker_queue_hold (pull_data->metadata_objects_to_scan);
           g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data);
         }
     }
@@ -736,8 +743,6 @@ idle_queue_content_request (gpointer user_data)
       process_one_file_request (data);
     }
       
-  ot_worker_queue_release (pull_data->metadata_objects_to_scan);
-  
   return FALSE;
 }
 
@@ -756,10 +761,10 @@ on_metadata_staged (GObject           *object,
   OtPullData *pull_data = fetch_data->pull_data;
 
   pull_data->n_fetched_metadata++;
+  pull_data->n_outstanding_metadata_fetches--;
 
   ot_worker_queue_push (pull_data->metadata_objects_to_scan,
                         g_variant_ref (fetch_data->object));
-  ot_worker_queue_release (pull_data->metadata_objects_to_scan);
 
   (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
   g_object_unref (fetch_data->temp_path);
@@ -821,6 +826,7 @@ idle_fetch_metadata_object (gpointer data)
   objpath = ostree_get_relative_object_path (checksum, objtype, compressed);
   obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
 
+  pull_data->n_outstanding_metadata_fetches++;
   ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
                                     meta_fetch_on_complete, fetch_data);
   soup_uri_free (obj_uri);
@@ -840,7 +846,6 @@ queue_metadata_object_fetch (OtPullData  *pull_data,
   IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1);
   fetch_data->pull_data = pull_data;
   fetch_data->object = g_variant_ref (object);
-  ot_worker_queue_hold (fetch_data->pull_data->metadata_objects_to_scan);
   g_idle_add (idle_fetch_metadata_object, fetch_data);
 }
 
@@ -958,8 +963,9 @@ scan_one_metadata_object (OtPullData         *pull_data,
         }
       g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object);
       g_atomic_int_inc (&pull_data->n_scanned_metadata);
-    }
 
+      g_idle_add (idle_check_outstanding_requests, pull_data);
+    }
 
   ret = TRUE;
  out:
@@ -1032,15 +1038,6 @@ scan_one_metadata_object_dispatch (gpointer item,
     }
 }
 
-static void
-on_metadata_worker_idle (gpointer user_data)
-{
-  OtPullData *pull_data = user_data;
-
-  pull_data->metadata_scan_active = FALSE;
-  
-  check_outstanding_requests_handle_error (pull_data, NULL);
-}
 
 static gboolean
 idle_start_worker (gpointer user_data)
@@ -1362,8 +1359,6 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
   pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan",
                                                              scan_one_metadata_object_dispatch,
                                                              pull_data);
-  ot_worker_queue_set_idle_callback (pull_data->metadata_objects_to_scan,
-                                     NULL, on_metadata_worker_idle, pull_data);
 
   g_hash_table_iter_init (&hash_iter, commits_to_fetch);
   while (g_hash_table_iter_next (&hash_iter, &key, &value))
@@ -1400,15 +1395,19 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
           g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256));
         }
     }
-
-  g_idle_add (idle_start_worker, pull_data);
-
+  
   /* Start metadata thread, which kicks off further metadata requests
    * as well as content fetches.
    */
-  if (!run_mainloop_monitor_fetcher (pull_data))
-    goto out;
+  if (!ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan))
+    {
+      g_idle_add (idle_start_worker, pull_data);
 
+      /* Now await work completion */
+      if (!run_mainloop_monitor_fetcher (pull_data))
+        goto out;
+    }
+  
   if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error))
     goto out;
 
@@ -1418,15 +1417,15 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
       const char *ref = key;
       const char *checksum = value;
       ot_lfree char *remote_ref = NULL;
-
+          
       remote_ref = g_strdup_printf ("%s/%s", pull_data->remote_name, ref);
-
+          
       if (!ostree_repo_write_ref (pull_data->repo, pull_data->remote_name, ref, checksum, error))
         goto out;
-      
+          
       g_print ("remote %s is now %s\n", remote_ref, checksum);
     }
-
+      
   end_time = g_get_monotonic_time ();
 
   bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]