[ostree] pull: Rework threading communication model



commit bac4d7a0d2834a1350bc0640a1c4a02432e79a6e
Author: Colin Walters <walters verbum org>
Date:   Fri Mar 29 17:16:03 2013 -0400

    pull: Rework threading communication model
    
    Previously, I've observed bugs where we either:
    1) Exit too early, leaving undownloaded objects
    2) Hang while downloading
    
    This rewrite hopefully fixes both.

 Makefile-otutil.am                                 |    4 +-
 src/libotutil/ot-waitable-queue.c                  |  120 ++++
 .../{ot-worker-queue.h => ot-waitable-queue.h}     |   25 +-
 src/libotutil/ot-worker-queue.c                    |  140 -----
 src/libotutil/otutil.h                             |    2 +-
 src/ostree/ostree-pull.c                           |  588 ++++++++++++--------
 6 files changed, 497 insertions(+), 382 deletions(-)
---
diff --git a/Makefile-otutil.am b/Makefile-otutil.am
index 58ea8f8..8afab95 100644
--- a/Makefile-otutil.am
+++ b/Makefile-otutil.am
@@ -35,8 +35,8 @@ libotutil_la_SOURCES = \
        src/libotutil/ot-spawn-utils.h \
        src/libotutil/ot-variant-utils.c \
        src/libotutil/ot-variant-utils.h \
-       src/libotutil/ot-worker-queue.c \
-       src/libotutil/ot-worker-queue.h \
+       src/libotutil/ot-waitable-queue.c \
+       src/libotutil/ot-waitable-queue.h \
        src/libotutil/ot-gio-utils.c \
        src/libotutil/ot-gio-utils.h \
        src/libotutil/otutil.c \
diff --git a/src/libotutil/ot-waitable-queue.c b/src/libotutil/ot-waitable-queue.c
new file mode 100644
index 0000000..874499d
--- /dev/null
+++ b/src/libotutil/ot-waitable-queue.c
@@ -0,0 +1,120 @@
+/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
+ *
+ * Copyright (C) 2012 Colin Walters <walters verbum org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: Colin Walters <walters verbum org>
+ */
+
+#include "config.h"
+
+#include "otutil.h"
+
+#include <glib-unix.h>
+#include <string.h>
+#include <sys/eventfd.h>
+#include <errno.h>
+#include <unistd.h>
+
+struct OtWaitableQueue {
+  volatile gint refcount; /* updated with g_atomic_int_* in ref/unref */
+  GMutex mutex; /* held by push/pop while touching queue, fd and read_empty */
+  int fd; /* eventfd created in ot_waitable_queue_new; watched by the source from create_source */
+  gboolean read_empty; /* TRUE after pop read the eventfd on an empty queue; push resets it */
+  GQueue queue; /* pushed at head, popped from tail, i.e. FIFO */
+};
+
+OtWaitableQueue *
+ot_waitable_queue_new (void)
+{
+  OtWaitableQueue *queue = g_new0 (OtWaitableQueue, 1); /* zeroed, so read_empty starts FALSE */
+  queue->refcount = 1; /* initial reference; dropped with ot_waitable_queue_unref */
+  g_mutex_init (&queue->mutex);
+  g_queue_init (&queue->queue);
+
+  queue->fd = eventfd (0, EFD_CLOEXEC | EFD_NONBLOCK); /* counter starts at 0; non-blocking reads and writes */
+  g_assert (queue->fd >= 0); /* eventfd failure is treated as fatal */
+  
+  return queue;
+}
+
+void
+ot_waitable_queue_push (OtWaitableQueue *queue,
+                        gpointer         data)
+{
+  const guint64 val = 1; /* 8-byte value added to the eventfd counter */
+  int rval;
+
+  g_mutex_lock (&queue->mutex);
+  g_queue_push_head (&queue->queue, data);
+  do /* signal readers; retried only on EINTR, other write errors are ignored */
+    rval = write (queue->fd, &val, sizeof (val));
+  while (G_UNLIKELY (rval == -1 && errno == EINTR));
+  queue->read_empty = FALSE; /* eventfd was written; pop must read it again once the queue is empty */
+  g_mutex_unlock (&queue->mutex);
+}
+
+gboolean
+ot_waitable_queue_pop (OtWaitableQueue *queue,
+                       gpointer        *out_data)
+{
+  gpointer ret = NULL;
+  gboolean empty = TRUE;
+  int rval;
+  guint64 val;
+
+  g_mutex_lock (&queue->mutex);
+  if (g_queue_peek_tail_link (&queue->queue) != NULL) /* tests the link, so a queued NULL item still counts */
+    {
+      ret = g_queue_pop_tail (&queue->queue); /* oldest item: push adds at the head */
+      empty = FALSE;
+    }
+  else if (!queue->read_empty) /* empty queue whose eventfd has not been read since the last push */
+    {
+      do /* read the eventfd (resets its counter, see eventfd(2)); retry on EINTR */
+        rval = read (queue->fd, &val, sizeof (val));
+      while (G_UNLIKELY (rval == -1 && errno == EINTR));
+      queue->read_empty = TRUE;
+    }
+  g_mutex_unlock (&queue->mutex);
+
+  *out_data = ret; /* NULL when nothing was popped */
+  return !empty; /* TRUE iff an item was removed */
+}
+
+void
+ot_waitable_queue_ref (OtWaitableQueue *queue)
+{
+  g_atomic_int_inc (&queue->refcount); /* atomic increment; pairs with ot_waitable_queue_unref */
+}
+
+void
+ot_waitable_queue_unref (OtWaitableQueue *queue)
+{
+  if (!g_atomic_int_dec_and_test (&queue->refcount))
+    return; /* other references remain */
+  g_mutex_clear (&queue->mutex);
+  g_queue_clear (&queue->queue); /* removes the elements only; queued data pointers are not freed here */
+  (void) close (queue->fd);
+  g_free (queue);
+}
+
+GSource *
+ot_waitable_queue_create_source (OtWaitableQueue   *queue)
+{
+  return g_unix_fd_source_new (queue->fd, G_IO_IN); /* new, unattached source watching the eventfd for readability; takes no queue ref */
+}
diff --git a/src/libotutil/ot-worker-queue.h b/src/libotutil/ot-waitable-queue.h
similarity index 58%
rename from src/libotutil/ot-worker-queue.h
rename to src/libotutil/ot-waitable-queue.h
index cfd7a92..0fcd7de 100644
--- a/src/libotutil/ot-worker-queue.h
+++ b/src/libotutil/ot-waitable-queue.h
@@ -20,30 +20,27 @@
  * Author: Colin Walters <walters verbum org>
  */
 
-#ifndef __OSTREE_WORKER_QUEUE_H__
-#define __OSTREE_WORKER_QUEUE_H__
+#ifndef __OSTREE_WAITABLE_QUEUE_H__
+#define __OSTREE_WAITABLE_QUEUE_H__
 
 #include <gio/gio.h>
 
 G_BEGIN_DECLS
 
-typedef struct OtWorkerQueue OtWorkerQueue;
+typedef struct OtWaitableQueue OtWaitableQueue;
 
-typedef void (*OtWorkerQueueFunc) (gpointer data,
-                                   gpointer user_data);
+OtWaitableQueue *ot_waitable_queue_new (void);
 
-OtWorkerQueue *ot_worker_queue_new (const char         *thread_name,
-                                    OtWorkerQueueFunc   func,
-                                    gpointer            data);
+void ot_waitable_queue_push (OtWaitableQueue      *queue,
+                             gpointer              data);
 
-void ot_worker_queue_start (OtWorkerQueue  *queue);
+GSource *ot_waitable_queue_create_source (OtWaitableQueue   *queue);
 
-gboolean ot_worker_queue_is_idle (OtWorkerQueue          *queue);
+gboolean ot_waitable_queue_pop (OtWaitableQueue *queue,
+                                gpointer        *out_val);
 
-void ot_worker_queue_push (OtWorkerQueue      *queue,
-                           gpointer            data);
-
-void ot_worker_queue_unref (OtWorkerQueue *queue);
+void ot_waitable_queue_ref (OtWaitableQueue *queue);
+void ot_waitable_queue_unref (OtWaitableQueue *queue);
 
 G_END_DECLS
 
diff --git a/src/libotutil/otutil.h b/src/libotutil/otutil.h
index cb3346b..53071bc 100644
--- a/src/libotutil/otutil.h
+++ b/src/libotutil/otutil.h
@@ -37,7 +37,7 @@
     }                                                      \
   } G_STMT_END;
 
-#include <ot-worker-queue.h>
+#include <ot-waitable-queue.h>
 #include <ot-local-alloc.h>
 #include <ot-keyfile-utils.h>
 #include <ot-gio-utils.h>
diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c
index 0953cc2..bdf8035 100644
--- a/src/ostree/ostree-pull.c
+++ b/src/ostree/ostree-pull.c
@@ -84,6 +84,20 @@ static GOptionEntry options[] = {
 };
 
 typedef struct {
+  enum {
+    PULL_MSG_SCAN_IDLE, /* scan callback -> metadata_objects_to_fetch after it has drained its queue */
+    PULL_MSG_MAIN_IDLE, /* carries idle_serial; the scan callback forwards the last one back on metadata_objects_to_fetch */
+    PULL_MSG_FETCH, /* carries item: object name to download, pushed on metadata_objects_to_fetch */
+    PULL_MSG_SCAN, /* carries item: object name to scan, pushed on metadata_objects_to_scan */
+    PULL_MSG_QUIT /* no payload; makes the scan callback quit metadata_thread_loop */
+  } t;
+  union {
+    guint     idle_serial; /* valid for PULL_MSG_SCAN_IDLE and PULL_MSG_MAIN_IDLE */
+    GVariant *item; /* valid for PULL_MSG_SCAN and PULL_MSG_FETCH */
+  } d;
+} PullWorkerMessage;
+
+typedef struct {
   OstreeRepo   *repo;
   char         *remote_name;
   OstreeRepoMode remote_mode;
@@ -93,21 +107,27 @@ typedef struct {
   GMainLoop    *loop;
   GCancellable *cancellable;
 
-  gboolean      metadata_scan_active;
   volatile gint n_scanned_metadata;
-  volatile gint n_requested_metadata;
-  volatile gint n_requested_content;
-  guint         n_fetched_metadata;
   guint         outstanding_uri_requests;
 
-  OtWorkerQueue  *metadata_objects_to_scan;
-  GHashTable   *scanned_metadata; /* Maps object name to itself */
-  GHashTable   *requested_content; /* Maps object name to itself */
-  guint         n_outstanding_metadata_fetches;
-  
-  guint         n_fetched_content;
-  guint         outstanding_filecontent_requests;
-  guint         outstanding_content_stage_requests;
+  GThread          *metadata_thread;
+  GMainContext     *metadata_thread_context;
+  GMainLoop        *metadata_thread_loop;
+  OtWaitableQueue  *metadata_objects_to_scan;
+  OtWaitableQueue  *metadata_objects_to_fetch;
+  GHashTable       *scanned_metadata; /* Maps object name to itself */
+  GHashTable       *requested_metadata; /* Maps object name to itself */
+  GHashTable       *requested_content; /* Maps object name to itself */
+  guint             metadata_scan_idle : 1; /* TRUE if we passed through an idle message */
+  guint             idle_serial; /* Incremented when we get a SCAN_IDLE message */
+  guint             n_outstanding_metadata_fetches;
+  guint             n_outstanding_metadata_stage_requests;
+  guint             n_outstanding_content_fetches;
+  guint             n_outstanding_content_stage_requests;
+  gint              n_requested_metadata;
+  gint              n_requested_content;
+  guint             n_fetched_metadata;
+  guint             n_fetched_content;
 
   gboolean      have_previous_bytes;
   guint64       previous_bytes_sec;
@@ -118,15 +138,10 @@ typedef struct {
 } OtPullData;
 
 typedef struct {
-  OtPullData *pull_data;
-
-  gboolean fetching_content;
-
-  GFile *meta_path;
-  GFile *content_path;
-
-  char *checksum;
-} OtFetchOneContentItemData;
+  OtPullData  *pull_data; /* pull state the async callbacks update */
+  GVariant    *object; /* serialized object name; read with ostree_object_name_deserialize, unref'd when staging completes */
+  GFile       *temp_path; /* downloaded file from ostree_fetcher_request_uri_finish; unlinked and unref'd after staging */
+} FetchObjectData;
 
 static SoupURI *
 suburi_new (SoupURI   *base,
@@ -182,46 +197,56 @@ static gboolean
 uri_fetch_update_status (gpointer user_data)
 {
   OtPullData *pull_data = user_data;
-  ot_lfree char *fetcher_status;
+  ot_lfree char *fetcher_status = NULL;
   GString *status;
   guint64 current_bytes_transferred;
   guint64 current_delta_bytes_transferred;
   guint64 delta_bytes_transferred;
+  guint outstanding_stages;
+  guint outstanding_fetches;
  
   status = g_string_new ("");
 
-  if (pull_data->metadata_scan_active)
+  if (!pull_data->metadata_scan_idle)
     g_string_append_printf (status, "scan: %u metadata; ",
                             g_atomic_int_get (&pull_data->n_scanned_metadata));
 
-  g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ",
-                          g_atomic_int_get (&pull_data->n_fetched_metadata),
-                          g_atomic_int_get (&pull_data->n_requested_metadata),
-                          pull_data->n_fetched_content,
-                          g_atomic_int_get (&pull_data->n_requested_content));
-
-  current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
-  current_delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded;
+  outstanding_stages = pull_data->n_outstanding_content_stage_requests + pull_data->n_outstanding_metadata_stage_requests;
+  if (outstanding_stages > 0)
+    g_string_append_printf (status, "writing: %u objects; ", outstanding_stages);
 
-  if (pull_data->have_previous_bytes)
-    delta_bytes_transferred = (guint64)(0.5 * current_delta_bytes_transferred + 0.5 * pull_data->previous_bytes_sec);
-  else
+  outstanding_fetches = pull_data->n_outstanding_content_fetches + pull_data->n_outstanding_metadata_fetches;
+  if (outstanding_fetches)
     {
-      pull_data->have_previous_bytes = TRUE;
-      delta_bytes_transferred = current_delta_bytes_transferred;
-    }
-  pull_data->previous_bytes_sec = delta_bytes_transferred;
-  pull_data->previous_total_downloaded = current_bytes_transferred;
+      g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ",
+                              pull_data->n_fetched_metadata,
+                              pull_data->n_requested_metadata,
+                              pull_data->n_fetched_content,
+                              pull_data->n_requested_content);
 
-  if (delta_bytes_transferred < 1024)
-    g_string_append_printf (status, "%u B/s; ", 
-                            (guint)delta_bytes_transferred);
-  else
-    g_string_append_printf (status, "%.1f KiB/s; ", 
-                            (double)delta_bytes_transferred / 1024);
+      current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
+      current_delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded;
 
-  fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
-  g_string_append (status, fetcher_status);
+      if (pull_data->have_previous_bytes)
+        delta_bytes_transferred = (guint64)(0.5 * current_delta_bytes_transferred + 0.5 * pull_data->previous_bytes_sec);
+      else
+        {
+          pull_data->have_previous_bytes = TRUE;
+          delta_bytes_transferred = current_delta_bytes_transferred;
+        }
+      pull_data->previous_bytes_sec = delta_bytes_transferred;
+      pull_data->previous_total_downloaded = current_bytes_transferred;
+      
+      if (delta_bytes_transferred < 1024)
+        g_string_append_printf (status, "%u B/s; ", 
+                                (guint)delta_bytes_transferred);
+      else
+        g_string_append_printf (status, "%.1f KiB/s; ", 
+                                (double)delta_bytes_transferred / 1024);
+
+      fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
+      g_string_append (status, fetcher_status);
+    }
 
   gs_console_begin_status_line (gs_console_get (), status->str, NULL, NULL);
 
@@ -230,6 +255,27 @@ uri_fetch_update_status (gpointer user_data)
   return TRUE;
 }
 
+static PullWorkerMessage *
+pull_worker_message_new (int msgtype, gpointer data)
+{
+  PullWorkerMessage *msg = g_new (PullWorkerMessage, 1); /* g_new does not zero: only the fields set below are valid */
+  msg->t = msgtype;
+  switch (msgtype)
+    {
+    case PULL_MSG_SCAN_IDLE:
+    case PULL_MSG_MAIN_IDLE:
+      msg->d.idle_serial = GPOINTER_TO_UINT (data); /* data is a serial packed with GUINT_TO_POINTER */
+      break;
+    case PULL_MSG_SCAN:
+    case PULL_MSG_FETCH:
+      msg->d.item = data; /* pointer stored as-is; no extra GVariant ref is taken here */
+      break;
+    case PULL_MSG_QUIT:
+      break; /* no payload */
+    }
+  return msg; /* the scan callback g_free()s messages it consumes */
+}
+
 static void
 throw_async_error (OtPullData          *pull_data,
                    GError              *error)
@@ -253,12 +299,26 @@ static void
 check_outstanding_requests_handle_error (OtPullData          *pull_data,
                                          GError              *error)
 {
-  if ((!pull_data->metadata_objects_to_scan || ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan)) &&
-      pull_data->outstanding_uri_requests == 0 &&
-      pull_data->outstanding_filecontent_requests == 0 &&
-      pull_data->n_outstanding_metadata_fetches == 0 &&
-      pull_data->outstanding_content_stage_requests == 0)
-    g_main_loop_quit (pull_data->loop);
+  gboolean current_fetch_idle = (pull_data->n_outstanding_metadata_fetches == 0 &&
+                                 pull_data->n_outstanding_content_fetches == 0);
+  gboolean current_stage_idle = (pull_data->n_outstanding_metadata_stage_requests == 0 &&
+                                 pull_data->n_outstanding_content_stage_requests == 0);
+
+  g_debug ("pull: scan: %u fetching: %u staging: %u",
+           !pull_data->metadata_scan_idle, !current_fetch_idle, !current_stage_idle);
+
+  /* This is true in the phase when we're fetching refs */
+  if (pull_data->metadata_objects_to_scan == NULL)
+    {
+      if (pull_data->outstanding_uri_requests == 0)
+        g_main_loop_quit (pull_data->loop);
+      return;
+    }
+  else if (pull_data->metadata_scan_idle && current_fetch_idle && current_stage_idle)
+    {
+      g_main_loop_quit (pull_data->loop);
+    }
+
   throw_async_error (pull_data, error);
 }
 
@@ -287,6 +347,7 @@ run_mainloop_monitor_fetcher (OtPullData   *pull_data)
       g_source_unref (update_timeout);
     }
   
+  g_idle_add (idle_check_outstanding_requests, pull_data);
   g_main_loop_run (pull_data->loop);
 
   if (console)
@@ -337,7 +398,6 @@ fetch_uri (OtPullData  *pull_data,
   fetch_data.pull_data = pull_data;
 
   uri_string = soup_uri_to_string (uri, FALSE);
-  g_print ("Fetching %s\n", uri_string);
 
   pull_data->outstanding_uri_requests++;
   ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable,
@@ -386,9 +446,6 @@ fetch_uri_contents_utf8 (OtPullData  *pull_data,
 }
 
 static gboolean
-idle_queue_content_request (gpointer user_data);
-
-static gboolean
 scan_dirtree_object (OtPullData   *pull_data,
                      const char   *checksum,
                      int           recursion_depth,
@@ -422,7 +479,6 @@ scan_dirtree_object (OtPullData   *pull_data,
     {
       const char *filename;
       gboolean file_is_stored;
-      OtFetchOneContentItemData *idle_fetch_data;
       ot_lvariant GVariant *csum = NULL;
       ot_lfree char *file_checksum;
 
@@ -436,21 +492,15 @@ scan_dirtree_object (OtPullData   *pull_data,
       if (!ostree_repo_has_object (pull_data->repo, OSTREE_OBJECT_TYPE_FILE, file_checksum,
                                    &file_is_stored, cancellable, error))
         goto out;
-
+      
       if (!file_is_stored && !g_hash_table_lookup (pull_data->requested_content, file_checksum))
         {
-          char *duped_checksum;
-
-          idle_fetch_data = g_new0 (OtFetchOneContentItemData, 1);
-          idle_fetch_data->pull_data = pull_data;
-          idle_fetch_data->checksum = file_checksum;
-          file_checksum = NULL; /* Transfer ownership */
-
-          duped_checksum = g_strdup (idle_fetch_data->checksum);
-          g_hash_table_insert (pull_data->requested_content, duped_checksum, duped_checksum);
-
-          g_atomic_int_inc (&pull_data->n_requested_content);
-          g_main_context_invoke (NULL, idle_queue_content_request, idle_fetch_data);
+          g_hash_table_insert (pull_data->requested_content, file_checksum, file_checksum);
+      
+          ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
+                                  pull_worker_message_new (PULL_MSG_FETCH,
+                                                           ostree_object_name_serialize (file_checksum, OSTREE_OBJECT_TYPE_FILE)));
+          file_checksum = NULL; /* Transfer ownership to hash */
         }
     }
       
@@ -514,26 +564,16 @@ fetch_ref_contents (OtPullData    *pull_data,
 }
 
 static void
-destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data)
-{
-  if (data->meta_path)
-    (void) gs_file_unlink (data->meta_path, NULL, NULL);
-  g_clear_object (&data->meta_path);
-  if (data->content_path)
-    (void) gs_file_unlink (data->content_path, NULL, NULL);
-  g_clear_object (&data->content_path);
-  g_free (data->checksum);
-  g_free (data);
-}
-
-static void
 content_fetch_on_stage_complete (GObject        *object,
                                  GAsyncResult   *result,
                                  gpointer        user_data)
 {
-  OtFetchOneContentItemData *data = user_data;
+  FetchObjectData *fetch_data = user_data;
+  OtPullData *pull_data = fetch_data->pull_data;
   GError *local_error = NULL;
   GError **error = &local_error;
+  OstreeObjectType objtype;
+  const char *expected_checksum;
   ot_lfree guchar *csum = NULL;
   ot_lfree char *checksum = NULL;
 
@@ -543,27 +583,30 @@ content_fetch_on_stage_complete (GObject        *object,
 
   checksum = ostree_checksum_from_bytes (csum);
 
-  g_assert (strcmp (checksum, data->checksum) == 0);
+  ostree_object_name_deserialize (fetch_data->object, &expected_checksum, &objtype);
+  g_assert (objtype == OSTREE_OBJECT_TYPE_FILE);
 
-  data->pull_data->n_fetched_content++;
- out:
-  data->pull_data->outstanding_content_stage_requests--;
-  check_outstanding_requests_handle_error (data->pull_data, local_error);
-  destroy_fetch_one_content_item_data (data);
-}
+  g_debug ("stage of %s complete", ostree_object_to_string (checksum, objtype));
 
-static void
-content_fetch_on_complete (GObject        *object,
-                           GAsyncResult   *result,
-                           gpointer        user_data);
+  g_assert (strcmp (checksum, expected_checksum) == 0);
 
+  pull_data->n_fetched_content++;
+ out:
+  pull_data->n_outstanding_content_stage_requests--;
+  check_outstanding_requests_handle_error (pull_data, local_error);
+  (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
+  g_object_unref (fetch_data->temp_path);
+  g_variant_unref (fetch_data->object);
+  g_free (fetch_data);
+}
 
 static void
 content_fetch_on_complete (GObject        *object,
                            GAsyncResult   *result,
                            gpointer        user_data) 
 {
-  OtFetchOneContentItemData *data = user_data;
+  FetchObjectData *fetch_data = user_data;
+  OtPullData *pull_data = fetch_data->pull_data;
   GError *local_error = NULL;
   GError **error = &local_error;
   GCancellable *cancellable = NULL;
@@ -575,13 +618,19 @@ content_fetch_on_complete (GObject        *object,
   ot_lvariant GVariant *xattrs = NULL;
   ot_lobj GInputStream *file_in = NULL;
   ot_lobj GInputStream *object_input = NULL;
+  const char *checksum;
+  OstreeObjectType objtype;
 
-  data->content_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
-  if (!data->content_path)
+  fetch_data->temp_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
+  if (!fetch_data->temp_path)
     goto out;
 
-  g_assert (data->content_path != NULL);
-  if (!ostree_content_file_parse (TRUE, data->content_path, FALSE,
+  ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
+  g_assert (objtype == OSTREE_OBJECT_TYPE_FILE);
+
+  g_debug ("fetch of %s complete", ostree_object_to_string (checksum, objtype));
+
+  if (!ostree_content_file_parse (TRUE, fetch_data->temp_path, FALSE,
                                   &file_in, &file_info, &xattrs,
                                   cancellable, error))
     goto out;
@@ -591,62 +640,56 @@ content_fetch_on_complete (GObject        *object,
                                           cancellable, error))
     goto out;
   
-  data->pull_data->outstanding_content_stage_requests++;
-  ostree_repo_stage_content_async (data->pull_data->repo, data->checksum,
+  pull_data->n_outstanding_content_stage_requests++;
+  ostree_repo_stage_content_async (pull_data->repo, checksum,
                                    object_input, length,
                                    cancellable,
-                                   content_fetch_on_stage_complete, data);
+                                   content_fetch_on_stage_complete, fetch_data);
 
  out:
-  data->pull_data->outstanding_filecontent_requests--;
-  check_outstanding_requests_handle_error (data->pull_data, local_error);
+  pull_data->n_outstanding_content_fetches--;
+  check_outstanding_requests_handle_error (pull_data, local_error);
 }
 
-static gboolean
-idle_queue_content_request (gpointer user_data)
-{
-  OtFetchOneContentItemData *data = user_data;
-  OtPullData *pull_data = data->pull_data;
-  const char *checksum = data->checksum;
-  ot_lfree char *objpath = NULL;
-  SoupURI *obj_uri = NULL;
-
-  objpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE, TRUE);
-  obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
-      
-  ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
-                                    content_fetch_on_complete, data);
-  soup_uri_free (obj_uri);
-  
-  pull_data->outstanding_filecontent_requests++;
-
-  return FALSE;
-}
-
-typedef struct {
-  OtPullData  *pull_data;
-  GVariant *object;
-  GFile *temp_path;
-} IdleFetchMetadataObjectData;
-
 static void
 on_metadata_staged (GObject           *object,
                     GAsyncResult      *result,
                     gpointer           user_data)
 {
-  IdleFetchMetadataObjectData *fetch_data = user_data;
+  FetchObjectData *fetch_data = user_data;
   OtPullData *pull_data = fetch_data->pull_data;
+  GError *local_error = NULL;
+  GError **error = &local_error;
+  const char *expected_checksum;
+  OstreeObjectType objtype;
+  gs_free char *checksum = NULL;
+  gs_free guchar *csum = NULL;
 
-  pull_data->n_fetched_metadata++;
-  pull_data->n_outstanding_metadata_fetches--;
+  if (!ostree_repo_stage_metadata_finish ((OstreeRepo*)object, result, 
+                                          &csum, error))
+    goto out; /* staging failed: skip the rescan but still run the cleanup below */
 
-  ot_worker_queue_push (pull_data->metadata_objects_to_scan,
-                        g_variant_ref (fetch_data->object));
+  checksum = ostree_checksum_from_bytes (csum);
+
+  ostree_object_name_deserialize (fetch_data->object, &expected_checksum, &objtype);
+  g_assert (OSTREE_OBJECT_TYPE_IS_META (objtype));
+
+  g_debug ("stage of %s complete", ostree_object_to_string (checksum, objtype));
 
+  g_assert (strcmp (checksum, expected_checksum) == 0); /* staged checksum must equal the requested one */
+
+  pull_data->metadata_scan_idle = FALSE; /* scanner has work again, so the idle check below cannot quit the loop */
+  ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+                          pull_worker_message_new (PULL_MSG_SCAN,
+                                                  g_variant_ref (fetch_data->object))); /* message gets its own ref; fetch_data's is dropped below */
+ out:
+  pull_data->n_outstanding_metadata_stage_requests--; /* this stage request is finished, success or not */
   (void) gs_file_unlink (fetch_data->temp_path, NULL, NULL);
   g_object_unref (fetch_data->temp_path);
   g_variant_unref (fetch_data->object);
   g_free (fetch_data);
+
+  check_outstanding_requests_handle_error (pull_data, local_error); /* uses only pull_data and local_error, so fetch_data may already be freed */
 }
 
 static void
@@ -654,7 +697,7 @@ meta_fetch_on_complete (GObject           *object,
                         GAsyncResult      *result,
                         gpointer           user_data)
 {
-  IdleFetchMetadataObjectData *fetch_data = user_data;
+  FetchObjectData *fetch_data = user_data;
   OtPullData *pull_data = fetch_data->pull_data;
   ot_lvariant GVariant *metadata = NULL;
   const char *checksum;
@@ -668,6 +711,8 @@ meta_fetch_on_complete (GObject           *object,
 
   ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
 
+  g_debug ("fetch of %s complete", ostree_object_to_string (checksum, objtype));
+
   if (!ot_util_variant_map (fetch_data->temp_path, ostree_metadata_variant_type (objtype),
                             FALSE, &metadata, error))
     goto out;
@@ -676,7 +721,10 @@ meta_fetch_on_complete (GObject           *object,
                                     pull_data->cancellable,
                                     on_metadata_staged, fetch_data);
 
+  pull_data->n_outstanding_metadata_stage_requests++;
  out:
+  pull_data->n_outstanding_metadata_fetches--;
+  pull_data->n_fetched_metadata++;
   throw_async_error (pull_data, local_error);
   if (local_error)
     {
@@ -686,44 +734,6 @@ meta_fetch_on_complete (GObject           *object,
 }
 
 static gboolean
-idle_fetch_metadata_object (gpointer data)
-{
-  IdleFetchMetadataObjectData *fetch_data = data;
-  OtPullData *pull_data = fetch_data->pull_data;
-  ot_lfree char *objpath = NULL;
-  const char *checksum;
-  OstreeObjectType objtype;
-  SoupURI *obj_uri = NULL;
-
-  ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
-
-  objpath = ostree_get_relative_object_path (checksum, objtype, TRUE);
-  obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
-
-  pull_data->n_outstanding_metadata_fetches++;
-  ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
-                                    meta_fetch_on_complete, fetch_data);
-  soup_uri_free (obj_uri);
-
-  return FALSE;
-}
-
-/**
- * queue_metadata_object_fetch:
- *
- * Pass a request to the main thread to fetch a metadata object.
- */
-static void
-queue_metadata_object_fetch (OtPullData  *pull_data,
-                             GVariant    *object)
-{
-  IdleFetchMetadataObjectData *fetch_data = g_new (IdleFetchMetadataObjectData, 1);
-  fetch_data->pull_data = pull_data;
-  fetch_data->object = g_variant_ref (object);
-  g_idle_add (idle_fetch_metadata_object, fetch_data);
-}
-
-static gboolean
 scan_commit_object (OtPullData         *pull_data,
                     const char         *checksum,
                     guint               recursion_depth,
@@ -798,6 +808,7 @@ scan_one_metadata_object (OtPullData         *pull_data,
   gboolean ret = FALSE;
   ot_lvariant GVariant *object = NULL;
   ot_lfree char *tmp_checksum = NULL;
+  gboolean is_requested;
   gboolean is_stored;
 
   tmp_checksum = ostree_checksum_from_bytes (csum);
@@ -806,16 +817,21 @@ scan_one_metadata_object (OtPullData         *pull_data,
   if (g_hash_table_lookup (pull_data->scanned_metadata, object))
     return TRUE;
 
+  is_requested = g_hash_table_lookup (pull_data->requested_metadata, tmp_checksum) != NULL;
   if (!ostree_repo_has_object (pull_data->repo, objtype, tmp_checksum, &is_stored,
                                cancellable, error))
     goto out;
-      
-  if (!is_stored)
+  
+  if (!is_stored && !is_requested)
     {
-      g_atomic_int_inc (&pull_data->n_requested_metadata);
-      queue_metadata_object_fetch (pull_data, object);
+      char *duped_checksum = g_strdup (tmp_checksum);
+      g_hash_table_insert (pull_data->requested_metadata, duped_checksum, duped_checksum);
+      
+      ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
+                              pull_worker_message_new (PULL_MSG_FETCH,
+                                                       g_variant_ref (object)));
     }
-  else
+  else if (is_stored)
     {
       switch (objtype)
         {
@@ -837,8 +853,6 @@ scan_one_metadata_object (OtPullData         *pull_data,
         }
       g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (object), object);
       g_atomic_int_inc (&pull_data->n_scanned_metadata);
-
-      g_idle_add (idle_check_outstanding_requests, pull_data);
     }
 
   ret = TRUE;
@@ -879,28 +893,48 @@ idle_throw_error (gpointer user_data)
   return FALSE;
 }
 
-/**
- * scan_one_metadata_object_dispatch:
- *
- * Called from the metadatascan worker thread. If we're missing an
- * object from one of them, we queue a request to the main thread to
- * fetch it.  When it's fetched, we get passed the object back and
- * scan it.
- */
-static void
-scan_one_metadata_object_dispatch (gpointer item,
-                                   gpointer user_data)
+static gboolean
+on_metadata_objects_to_scan_ready (gint         fd,
+                                   GIOCondition condition,
+                                   gpointer     user_data)
 {
   OtPullData *pull_data = user_data;
+  PullWorkerMessage *msg;
+  PullWorkerMessage *last_idle_msg = NULL;
   GError *local_error = NULL;
   GError **error = &local_error;
-  ot_lvariant GVariant *v_item = NULL;
-
-  v_item = item;
 
-  if (!scan_one_metadata_object_v_name (pull_data, v_item,
-                                        pull_data->cancellable, error))
-    goto out;
+  while (ot_waitable_queue_pop (pull_data->metadata_objects_to_scan, (gpointer*)&msg))
+    {
+      if (msg->t == PULL_MSG_SCAN)
+        {
+          if (!scan_one_metadata_object_v_name (pull_data, msg->d.item,
+                                                pull_data->cancellable, error))
+            goto out;
+          g_variant_unref (msg->d.item);
+          g_free (msg);
+        }
+      else if (msg->t == PULL_MSG_MAIN_IDLE)
+        {
+          g_free (last_idle_msg);
+          last_idle_msg = msg;
+        }
+      else if (msg->t == PULL_MSG_QUIT)
+        {
+          g_free (msg);
+          g_main_loop_quit (pull_data->metadata_thread_loop);
+        }
+      else
+        g_assert_not_reached ();
+      }
+    
+  if (last_idle_msg)
+    ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
+                            last_idle_msg);
+  
+  /* When we have no queue to process, notify the main thread */
+  ot_waitable_queue_push (pull_data->metadata_objects_to_fetch,
+                          pull_worker_message_new (PULL_MSG_SCAN_IDLE, GUINT_TO_POINTER (0)));
 
  out:
   if (local_error)
@@ -910,17 +944,107 @@ scan_one_metadata_object_dispatch (gpointer item,
       throwdata->error = local_error;
       g_main_context_invoke (NULL, idle_throw_error, throwdata);
     }
+  return TRUE;
 }
 
+/**
+ * metadata_thread_main:
+ * @user_data: the #OtPullData for this pull
+ *
+ * Entry point of the "metadatascan" thread: creates this thread's own
+ * main context and loop, attaches a source on metadata_objects_to_scan
+ * that dispatches to on_metadata_objects_to_scan_ready(), then runs the loop.
+ */
+static gpointer
+metadata_thread_main (gpointer user_data)
+{
+  OtPullData *pull_data = user_data;
+  GSource *src;
+
+  pull_data->metadata_thread_context = g_main_context_new ();
+  pull_data->metadata_thread_loop = g_main_loop_new (pull_data->metadata_thread_context, TRUE);
+
+  src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_scan);
+  g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_scan_ready, pull_data, NULL);
+  g_source_attach (src, pull_data->metadata_thread_context);
+  g_source_unref (src);
+
+  g_main_loop_run (pull_data->metadata_thread_loop);
+  return NULL;
+}
 
 static gboolean
-idle_start_worker (gpointer user_data)
+on_metadata_objects_to_fetch_ready (gint         fd,
+                                    GIOCondition condition,
+                                    gpointer     user_data)
 {
   OtPullData *pull_data = user_data;
+  PullWorkerMessage *msg;
+
+  if (!ot_waitable_queue_pop (pull_data->metadata_objects_to_fetch, (gpointer*)&msg))
+    goto out;
 
-  ot_worker_queue_start (pull_data->metadata_objects_to_scan);
+  if (msg->t == PULL_MSG_MAIN_IDLE)
+    {
+      if (msg->d.idle_serial == pull_data->idle_serial)
+        {
+          g_assert (!pull_data->metadata_scan_idle);
+          pull_data->metadata_scan_idle = TRUE;
+          g_debug ("pull: metadata scan is idle");
+        }
+    }
+  else if (msg->t == PULL_MSG_SCAN_IDLE)
+    {
+      if (!pull_data->metadata_scan_idle)
+        {
+          g_debug ("pull: queue MAIN_IDLE");
+          pull_data->idle_serial++;
+          ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+                                  pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
+        }
+    }
+  else if (msg->t == PULL_MSG_FETCH)
+    {
+      const char *checksum;
+      gs_free char *objpath = NULL;
+      OstreeObjectType objtype;
+      SoupURI *obj_uri = NULL;
+      gboolean is_meta;
+      FetchObjectData *fetch_data;
+      
+      ostree_object_name_deserialize (msg->d.item, &checksum, &objtype);
+      objpath = ostree_get_relative_object_path (checksum, objtype, TRUE);
+      obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
 
-  return FALSE;
+      is_meta = OSTREE_OBJECT_TYPE_IS_META (objtype);
+      if (is_meta)
+        {
+          pull_data->n_outstanding_metadata_fetches++;
+          pull_data->n_requested_metadata++;
+        }
+      else
+        {
+          pull_data->n_outstanding_content_fetches++;
+          pull_data->n_requested_content++;
+        }
+      fetch_data = g_new (FetchObjectData, 1);
+      fetch_data->pull_data = pull_data;
+      fetch_data->object = g_variant_ref (msg->d.item);
+      ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, pull_data->cancellable,
+                                        is_meta ? meta_fetch_on_complete : content_fetch_on_complete, fetch_data);
+      soup_uri_free (obj_uri);
+      g_variant_unref (msg->d.item);
+    }
+  else
+    {
+      g_assert_not_reached ();
+    }
+  g_free (msg);
+
+ out:
+  check_outstanding_requests_handle_error (pull_data, NULL);
+  
+  return TRUE;
 }
 
 static gboolean
@@ -1097,6 +1221,8 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
                                                        (GDestroyNotify)g_variant_unref, NULL);
   pull_data->requested_content = g_hash_table_new_full (g_str_hash, g_str_equal,
                                                         (GDestroyNotify)g_free, NULL);
+  pull_data->requested_metadata = g_hash_table_new_full (g_str_hash, g_str_equal,
+                                                         (GDestroyNotify)g_free, NULL);
 
   if (argc < 2)
     {
@@ -1223,19 +1349,18 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
   if (!ostree_repo_prepare_transaction (pull_data->repo, FALSE, NULL, error))
     goto out;
 
-  pull_data->metadata_scan_active = TRUE;
-
-  pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan",
-                                                             scan_one_metadata_object_dispatch,
-                                                             pull_data);
+  pull_data->metadata_objects_to_fetch = ot_waitable_queue_new ();
+  pull_data->metadata_objects_to_scan = ot_waitable_queue_new ();
+  pull_data->metadata_thread = g_thread_new ("metadatascan", metadata_thread_main, pull_data);
 
   g_hash_table_iter_init (&hash_iter, commits_to_fetch);
   while (g_hash_table_iter_next (&hash_iter, &key, &value))
     {
       const char *commit = value;
 
-      ot_worker_queue_push (pull_data->metadata_objects_to_scan,
-                            ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT));
+      ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+                              pull_worker_message_new (PULL_MSG_SCAN,
+                                                       ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT)));
     }
 
   g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch);
@@ -1259,23 +1384,28 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
         }
       else
         {
-          ot_worker_queue_push (pull_data->metadata_objects_to_scan,
-                                ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT));
+          ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+                                  pull_worker_message_new (PULL_MSG_SCAN,
+                                                           ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT)));
           g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256));
         }
     }
   
-  /* Start metadata thread, which kicks off further metadata requests
-   * as well as content fetches.
-   */
-  if (!ot_worker_queue_is_idle (pull_data->metadata_objects_to_scan))
-    {
-      g_idle_add (idle_start_worker, pull_data);
-
-      /* Now await work completion */
-      if (!run_mainloop_monitor_fetcher (pull_data))
-        goto out;
-    }
+  {
+    GSource *src = ot_waitable_queue_create_source (pull_data->metadata_objects_to_fetch);
+    g_source_set_callback (src, (GSourceFunc)on_metadata_objects_to_fetch_ready, pull_data, NULL);
+    g_source_attach (src, NULL);
+    g_source_unref (src);
+  }
+
+  /* Prime the message queue */
+  pull_data->idle_serial++;
+  ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+                          pull_worker_message_new (PULL_MSG_MAIN_IDLE, GUINT_TO_POINTER (pull_data->idle_serial)));
+  
+  /* Now await work completion */
+  if (!run_mainloop_monitor_fetcher (pull_data))
+    goto out;
   
   if (!ostree_repo_commit_transaction (pull_data->repo, cancellable, error))
     goto out;
@@ -1323,9 +1453,17 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
   g_free (pull_data->remote_name);
   if (pull_data->base_uri)
     soup_uri_free (pull_data->base_uri);
-  g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_worker_queue_unref);
+  if (pull_data->metadata_thread)
+    {
+      ot_waitable_queue_push (pull_data->metadata_objects_to_scan,
+                              pull_worker_message_new (PULL_MSG_QUIT, NULL));
+      g_thread_join (pull_data->metadata_thread);
+    }
+  g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_waitable_queue_unref);
+  g_clear_pointer (&pull_data->metadata_objects_to_fetch, (GDestroyNotify) ot_waitable_queue_unref);
   g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->requested_content, (GDestroyNotify) g_hash_table_unref);
+  g_clear_pointer (&pull_data->requested_metadata, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref);
   if (summary_uri)
     soup_uri_free (summary_uri);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]