[ostree] pull: Fix a race condition causing an early exit
- From: Colin Walters <walters src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [ostree] pull: Fix a race condition causing an early exit
- Date: Sun, 23 Dec 2012 22:22:22 +0000 (UTC)
commit 9bd4d35c2b34a321c9da2ad115b113076366a146
Author: Colin Walters <walters verbum org>
Date: Sun Dec 23 17:18:08 2012 -0500
pull: Fix a race condition causing an early exit
This is a little less magical than what we had before.
src/libotutil/ot-worker-queue.c | 74 +++++++--------------------------------
src/libotutil/ot-worker-queue.h | 9 +----
src/ostree/ostree-pull.c | 55 ++++++++++++++---------------
3 files changed, 41 insertions(+), 97 deletions(-)
---
diff --git a/src/libotutil/ot-worker-queue.c b/src/libotutil/ot-worker-queue.c
index 79d48b7..4865662 100644
--- a/src/libotutil/ot-worker-queue.c
+++ b/src/libotutil/ot-worker-queue.c
@@ -31,21 +31,16 @@ struct OtWorkerQueue {
GCond cond;
GQueue queue;
- volatile gint holds;
-
char *thread_name;
gboolean complete;
+ gboolean is_idle;
gboolean destroyed;
GThread *worker;
OtWorkerQueueFunc work_func;
OtWorkerQueueFunc work_data;
-
- GMainContext *idle_context;
- OtWorkerQueueIdleFunc idle_callback;
- gpointer idle_data;
};
static gpointer
@@ -61,6 +56,8 @@ ot_worker_queue_new (const char *thread_name,
g_cond_init (&queue->cond);
g_queue_init (&queue->queue);
+ queue->is_idle = TRUE;
+
queue->thread_name = g_strdup (thread_name);
queue->work_func = func;
queue->work_data = data;
@@ -72,40 +69,6 @@ void
ot_worker_queue_start (OtWorkerQueue *queue)
{
queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue);
- ot_worker_queue_push (queue, queue); /* Self marks end of (initial) queue */
-}
-
-void
-ot_worker_queue_hold (OtWorkerQueue *queue)
-{
- g_atomic_int_inc (&queue->holds);
-}
-
-static gboolean
-invoke_idle_callback (gpointer user_data)
-{
- OtWorkerQueue *queue = user_data;
- queue->idle_callback (queue->idle_data);
- return FALSE;
-}
-
-void
-ot_worker_queue_release (OtWorkerQueue *queue)
-{
- if (!g_atomic_int_dec_and_test (&queue->holds))
- return;
-
- g_mutex_lock (&queue->mutex);
-
- if (!g_queue_peek_tail_link (&queue->queue))
- {
- if (queue->idle_callback)
- g_main_context_invoke (queue->idle_context,
- invoke_idle_callback,
- queue);
- }
-
- g_mutex_unlock (&queue->mutex);
}
void
@@ -114,6 +77,7 @@ ot_worker_queue_push (OtWorkerQueue *queue,
{
g_mutex_lock (&queue->mutex);
g_queue_push_head (&queue->queue, data);
+ queue->is_idle = FALSE;
g_cond_signal (&queue->cond);
g_mutex_unlock (&queue->mutex);
}
@@ -131,11 +95,7 @@ ot_worker_queue_thread_main (gpointer user_data)
while (!g_queue_peek_tail_link (&queue->queue))
{
- if (queue->idle_callback && queue->complete &&
- g_atomic_int_get (&queue->holds) == 0)
- g_main_context_invoke (queue->idle_context,
- invoke_idle_callback,
- queue);
+ queue->is_idle = TRUE;
g_cond_wait (&queue->cond, &queue->mutex);
}
@@ -146,27 +106,20 @@ ot_worker_queue_thread_main (gpointer user_data)
if (!item)
break;
- if (item == queue)
- queue->complete = TRUE;
- else
- queue->work_func (item, queue->work_data);
+ queue->work_func (item, queue->work_data);
}
return NULL;
}
-void
-ot_worker_queue_set_idle_callback (OtWorkerQueue *queue,
- GMainContext *context,
- OtWorkerQueueIdleFunc idle_callback,
- gpointer data)
+gboolean
+ot_worker_queue_is_idle (OtWorkerQueue *queue)
{
- g_assert (!queue->worker);
- if (!context)
- context = g_main_context_default ();
- queue->idle_context = g_main_context_ref (context);
- queue->idle_callback = idle_callback;
- queue->idle_data = data;
+ gboolean ret;
+ g_mutex_lock (&queue->mutex);
+ ret = queue->is_idle;
+ g_mutex_unlock (&queue->mutex);
+ return ret;
}
void
@@ -180,7 +133,6 @@ ot_worker_queue_unref (OtWorkerQueue *queue)
g_free (queue->thread_name);
- g_main_context_unref (queue->idle_context);
g_mutex_clear (&queue->mutex);
g_cond_clear (&queue->cond);
g_queue_clear (&queue->queue);
diff --git a/src/libotutil/ot-worker-queue.h b/src/libotutil/ot-worker-queue.h
index 590480e..cfd7a92 100644
--- a/src/libotutil/ot-worker-queue.h
+++ b/src/libotutil/ot-worker-queue.h
@@ -31,7 +31,6 @@ typedef struct OtWorkerQueue OtWorkerQueue;
typedef void (*OtWorkerQueueFunc) (gpointer data,
gpointer user_data);
-typedef void (*OtWorkerQueueIdleFunc) (gpointer user_data);
OtWorkerQueue *ot_worker_queue_new (const char *thread_name,
OtWorkerQueueFunc func,
@@ -39,13 +38,7 @@ OtWorkerQueue *ot_worker_queue_new (const char *thread_name,
void ot_worker_queue_start (OtWorkerQueue *queue);
-void ot_worker_queue_hold (OtWorkerQueue *queue);
-void ot_worker_queue_release (OtWorkerQueue *queue);
-
-void ot_worker_queue_set_idle_callback (OtWorkerQueue *queue,
- GMainContext *context,
- OtWorkerQueueIdleFunc idle_callback,
- gpointer data);
+gboolean ot_worker_queue_is_idle (OtWorkerQueue *queue);
void ot_worker_queue_push (OtWorkerQueue *queue,
gpointer data);
diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c
index 4ce781d..f7fbf37 100644
--- a/src/ostree/ostree-pull.c
+++ b/src/ostree/ostree-pull.c
@@ -101,10 +101,10 @@ typedef struct {
guint outstanding_uri_requests;
GQueue queued_filemeta;
- GThread *metadata_scan_thread;
OtWorkerQueue *metadata_objects_to_scan;
GHashTable *scanned_metadata; /* Maps object name to itself */
GHashTable *requested_content; /* Maps object name to itself */
+ guint n_outstanding_metadata_fetches;
guint n_fetched_content;
guint outstanding_filemeta_requests;
@@ -268,16 +268,24 @@ static void
check_outstanding_requests_handle_error (OtPullData *pull_data,
GError *error)
{
- if (!pull_data->metadata_scan_active &&
+ if ((!pull_data->metadata_objects_to_scan || ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) &&
pull_data->outstanding_uri_requests == 0 &&
pull_data->outstanding_filemeta_requests == 0 &&
pull_data->outstanding_filecontent_requests == 0 &&
+ pull_data->n_outstanding_metadata_fetches == 0 &&
pull_data->outstanding_content_stage_requests == 0)
g_main_loop_quit (pull_data->loop);
throw_async_error (pull_data, error);
}
static gboolean
+idle_check_outstanding_requests (gpointer user_data)
+{
+ check_outstanding_requests_handle_error (user_data, NULL);
+ return FALSE;
+}
+
+static gboolean
run_mainloop_monitor_fetcher (OtPullData *pull_data)
{
GSource *update_timeout = NULL;
@@ -456,7 +464,6 @@ scan_dirtree_object (OtPullData *pull_data,
g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum);
g_atomic_int_inc (&pull_data->n_requested_content);
- ot_worker_queue_hold (pull_data->metadata_objects_to_scan);
g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data);
}
}
@@ -736,8 +743,6 @@ idle_queue_content_request (gpointer user_data)
process_one_file_request (data);
}
- ot_worker_queue_release (pull_data->metadata_objects_to_scan);
-
return FALSE;
}
@@ -756,10 +761,10 @@ on_metadata_staged (GObject *object,
OtPullData *pull_data = fetch_data->pull_data;
pull_data->n_fetched_metadata++;
+ pull_data->n_outstanding_metadata_fetches--;
ot_worker_queue_push (pull_data->metadata_objects_to_scan,
g_variant_ref (fetch_data->object));
- ot_worker_queue_release (pull_data->metadata_objects_to_scan);
(void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
g_object_unref (fetch_data->temp_path);
@@ -821,6 +826,7 @@ idle_fetch_metadata_object (gpointer data)
objpath = ostree_get_relative_object_path (checksum, objtype, compressed);
obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
+ pull_data->n_outstanding_metadata_fetches++;
ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
meta_fetch_on_complete, fetch_data);
soup_uri_free (obj_uri);
@@ -840,7 +846,6 @@ queue_metadata_object_fetch (OtPullData *pull_data,
IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1);
fetch_data->pull_data = pull_data;
fetch_data->object = g_variant_ref (object);
- ot_worker_queue_hold (fetch_data->pull_data->metadata_objects_to_scan);
g_idle_add (idle_fetch_metadata_object, fetch_data);
}
@@ -958,8 +963,9 @@ scan_one_metadata_object (OtPullData *pull_data,
}
g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object);
g_atomic_int_inc (&pull_data->n_scanned_metadata);
- }
+ g_idle_add (idle_check_outstanding_requests, pull_data);
+ }
ret = TRUE;
out:
@@ -1032,15 +1038,6 @@ scan_one_metadata_object_dispatch (gpointer item,
}
}
-static void
-on_metadata_worker_idle (gpointer user_data)
-{
- OtPullData *pull_data = user_data;
-
- pull_data->metadata_scan_active = FALSE;
-
- check_outstanding_requests_handle_error (pull_data, NULL);
-}
static gboolean
idle_start_worker (gpointer user_data)
@@ -1362,8 +1359,6 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan",
scan_one_metadata_object_dispatch,
pull_data);
- ot_worker_queue_set_idle_callback (pull_data->metadata_objects_to_scan,
- NULL, on_metadata_worker_idle, pull_data);
g_hash_table_iter_init (&hash_iter, commits_to_fetch);
while (g_hash_table_iter_next (&hash_iter, &key, &value))
@@ -1400,15 +1395,19 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256));
}
}
-
- g_idle_add (idle_start_worker, pull_data);
-
+
/* Start metadata thread, which kicks off further metadata requests
* as well as content fetches.
*/
- if (!run_mainloop_monitor_fetcher (pull_data))
- goto out;
+ if (!ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan))
+ {
+ g_idle_add (idle_start_worker, pull_data);
+ /* Now await work completion */
+ if (!run_mainloop_monitor_fetcher (pull_data))
+ goto out;
+ }
+
if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error))
goto out;
@@ -1418,15 +1417,15 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
const char *ref = key;
const char *checksum = value;
ot_lfree char *remote_ref = NULL;
-
+
remote_ref = g_strdup_printf ("%s/%s", pull_data->remote_name, ref);
-
+
if (!ostree_repo_write_ref (pull_data->repo, pull_data->remote_name, ref, checksum, error))
goto out;
-
+
g_print ("remote %s is now %s\n", remote_ref, checksum);
}
-
+
end_time = g_get_monotonic_time ();
bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]