[ostree] pull: Asynchronous metadata fetch



commit 5c1dc390ae0e51487f9451fe433f76461f4c4127
Author: Colin Walters <walters verbum org>
Date:   Tue Aug 28 09:41:09 2012 -0400

    pull: Asynchronous metadata fetch
    
    Create a worker thread for processing metadata, reserving the main
    thread for HTTP requests.
    
    This can create a very significant efficiency win for large pull
    requests since we are much more likely to keep a full pipeline open.
    
    The status display is also nicer now.

 Makefile-otutil.am              |    2 +
 src/libotutil/ot-worker-queue.c |  169 +++++++++++
 src/libotutil/ot-worker-queue.h |   57 ++++
 src/libotutil/otutil.h          |    1 +
 src/ostree/ostree-pull.c        |  635 +++++++++++++++++++++++++--------------
 5 files changed, 632 insertions(+), 232 deletions(-)
---
diff --git a/Makefile-otutil.am b/Makefile-otutil.am
index ea94e3b..58ea8f8 100644
--- a/Makefile-otutil.am
+++ b/Makefile-otutil.am
@@ -35,6 +35,8 @@ libotutil_la_SOURCES = \
 	src/libotutil/ot-spawn-utils.h \
 	src/libotutil/ot-variant-utils.c \
 	src/libotutil/ot-variant-utils.h \
+	src/libotutil/ot-worker-queue.c \
+	src/libotutil/ot-worker-queue.h \
 	src/libotutil/ot-gio-utils.c \
 	src/libotutil/ot-gio-utils.h \
 	src/libotutil/otutil.c \
diff --git a/src/libotutil/ot-worker-queue.c b/src/libotutil/ot-worker-queue.c
new file mode 100644
index 0000000..797c168
--- /dev/null
+++ b/src/libotutil/ot-worker-queue.c
@@ -0,0 +1,169 @@
+/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
+ *
+ * Copyright (C) 2012 Colin Walters <walters verbum org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: Colin Walters <walters verbum org>
+ */
+
+#include "config.h"
+
+#include "otutil.h"
+
+#include <string.h>
+
+struct OtWorkerQueue { /* one worker thread consuming a mutex-protected FIFO */
+  GMutex mutex;                /* guards @queue; paired with @cond */
+  GCond cond;                  /* signalled by ot_worker_queue_push() */
+  GQueue queue;                /* pending items; a NULL item stops the worker */
+
+  volatile gint holds;         /* accessed atomically; nonzero suppresses the idle callback */
+
+  char *thread_name;           /* owned copy, passed to g_thread_new() */
+  
+  gboolean destroyed;          /* not referenced anywhere in this file */
+
+  GThread *worker;             /* NULL until ot_worker_queue_start() */
+
+  OtWorkerQueueFunc work_func; /* run on the worker thread for each item */
+  gpointer work_data;          /* opaque user data for @work_func; a data pointer, not a function pointer */
+  
+  GMainContext *idle_context;  /* referenced context for the idle callback; NULL if never set */
+  OtWorkerQueueIdleFunc idle_callback;
+  gpointer idle_data;
+};
+
+static gpointer
+ot_worker_queue_thread_main (gpointer user_data); /* defined below; used by ot_worker_queue_start() */
+
+OtWorkerQueue *
+ot_worker_queue_new (const char          *thread_name,
+                     OtWorkerQueueFunc    func,
+                     gpointer             data)
+{
+  /* Zero-filled allocation: worker, holds and the idle fields start unset. */
+  OtWorkerQueue *self = g_new0 (OtWorkerQueue, 1);
+
+  self->work_func = func;
+  self->work_data = data;
+  self->thread_name = g_strdup (thread_name);
+  g_queue_init (&self->queue);
+  g_cond_init (&self->cond);
+  g_mutex_init (&self->mutex);
+  return self;     /* no thread yet; see ot_worker_queue_start() */
+}
+
+void
+ot_worker_queue_start (OtWorkerQueue  *queue)
+{ /* Spawn the worker thread, named thread_name, running ot_worker_queue_thread_main(). */
+  queue->worker = g_thread_new (queue->thread_name, ot_worker_queue_thread_main, queue);
+}
+
+void
+ot_worker_queue_hold (OtWorkerQueue  *queue)
+{ /* Add one hold. */
+  g_atomic_int_inc (&queue->holds); /* while nonzero, the worker does not request the idle callback */
+}
+
+void
+ot_worker_queue_release (OtWorkerQueue  *queue)
+{ /* Drop one hold.  NOTE: the condition is not signalled, so a waiting worker is not woken by this. */
+  g_atomic_int_add (&queue->holds, -1);
+}
+
+void
+ot_worker_queue_push (OtWorkerQueue *queue,
+                            gpointer            data)
+{ /* Queue @data for the worker thread; a NULL @data makes the worker exit. */
+  g_mutex_lock (&queue->mutex);
+  g_queue_push_head (&queue->queue, data); /* worker pops from the tail, so order is FIFO */
+  g_cond_signal (&queue->cond);            /* wake the worker if it is waiting */
+  g_mutex_unlock (&queue->mutex);
+}
+
+static gboolean
+invoke_idle_callback (gpointer user_data)
+{
+  OtWorkerQueue *self = user_data;   /* trampoline handed to g_main_context_invoke() */
+  (*self->idle_callback) (self->idle_data);
+  return FALSE;                      /* one-shot */
+}
+
+static gpointer
+ot_worker_queue_thread_main (gpointer user_data)
+{
+  OtWorkerQueue *queue = user_data;
+  /* Worker loop: take items oldest-first and run work_func on each. */
+  while (TRUE)
+    {
+      gpointer item;
+
+      g_mutex_lock (&queue->mutex);
+      /* Empty queue: request the idle callback unless holds remain, then wait. */
+      while (!g_queue_peek_tail_link (&queue->queue))
+        {
+          if (queue->idle_callback && g_atomic_int_get (&queue->holds) == 0)
+            g_main_context_invoke (queue->idle_context,
+                                   invoke_idle_callback,
+                                   queue);
+          g_cond_wait (&queue->cond, &queue->mutex);
+        }
+
+      item = g_queue_pop_tail (&queue->queue);
+
+      g_mutex_unlock (&queue->mutex);
+      /* A NULL item is the exit request pushed by ot_worker_queue_unref(). */
+      if (!item)
+        break;
+      /* The mutex is not held while the work function runs. */
+      queue->work_func (item, queue->work_data);
+    }
+
+  return NULL;
+}
+
+void
+ot_worker_queue_set_idle_callback (OtWorkerQueue *queue,
+                                   GMainContext  *context,
+                                   OtWorkerQueueIdleFunc idle_callback,
+                                   gpointer       data)
+{
+  /* Only valid before ot_worker_queue_start(); this is asserted. */
+  g_assert (queue->worker == NULL);
+  queue->idle_data = data;
+  queue->idle_callback = idle_callback;
+  /* A NULL @context selects the default main context; a reference is kept. */
+  queue->idle_context = g_main_context_ref (context ? context : g_main_context_default ());
+}
+
+void
+ot_worker_queue_unref (OtWorkerQueue *queue)
+{ /* Stop and join the worker (if started), then free @queue; there is no reference count. */
+  if (queue->worker)
+    {
+      ot_worker_queue_push (queue, NULL); /* NULL item tells the worker to exit */
+      g_thread_join (queue->worker);
+    }
+
+  g_free (queue->thread_name);
+  if (queue->idle_context) /* NULL when no idle callback was ever installed */
+    g_main_context_unref (queue->idle_context);
+  g_mutex_clear (&queue->mutex);
+  g_cond_clear (&queue->cond);
+  g_queue_clear (&queue->queue);
+  g_free (queue);
+}
diff --git a/src/libotutil/ot-worker-queue.h b/src/libotutil/ot-worker-queue.h
new file mode 100644
index 0000000..590480e
--- /dev/null
+++ b/src/libotutil/ot-worker-queue.h
@@ -0,0 +1,57 @@
+/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
+ *
+ * Copyright (C) 2012 Colin Walters <walters verbum org>.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: Colin Walters <walters verbum org>
+ */
+
+#ifndef __OSTREE_WORKER_QUEUE_H__
+#define __OSTREE_WORKER_QUEUE_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+typedef struct OtWorkerQueue OtWorkerQueue;
+/* Work function: @data is the pushed item, @user_data the pointer given to ot_worker_queue_new(). */
+typedef void (*OtWorkerQueueFunc) (gpointer data,
+                                   gpointer user_data);
+typedef void (*OtWorkerQueueIdleFunc) (gpointer user_data);
+/* Create a queue; its thread is only spawned by ot_worker_queue_start(). */
+OtWorkerQueue *ot_worker_queue_new (const char         *thread_name,
+                                    OtWorkerQueueFunc   func,
+                                    gpointer            data);
+/* Spawn the worker thread. */
+void ot_worker_queue_start (OtWorkerQueue  *queue);
+/* While the hold count is nonzero, the idle callback is not requested. */
+void ot_worker_queue_hold (OtWorkerQueue  *queue);
+void ot_worker_queue_release (OtWorkerQueue  *queue);
+/* Call before start; a NULL @context means the default main context. */
+void ot_worker_queue_set_idle_callback (OtWorkerQueue          *queue,
+                                        GMainContext           *context,
+                                        OtWorkerQueueIdleFunc   idle_callback,
+                                        gpointer                data);
+/* Queue @data for the worker; pushing NULL makes the worker thread exit. */
+void ot_worker_queue_push (OtWorkerQueue      *queue,
+                           gpointer            data);
+/* Stop and join the worker if started, then free @queue (there is no refcount). */
+void ot_worker_queue_unref (OtWorkerQueue *queue);
+
+G_END_DECLS
+
+#endif
diff --git a/src/libotutil/otutil.h b/src/libotutil/otutil.h
index c6bb4b3..841ee7b 100644
--- a/src/libotutil/otutil.h
+++ b/src/libotutil/otutil.h
@@ -35,6 +35,7 @@
     }                                                      \
   } G_STMT_END;
 
+#include <ot-worker-queue.h>
 #include <ot-local-alloc.h>
 #include <ot-keyfile-utils.h>
 #include <ot-gio-utils.h>
diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c
index 3ccd598..af06bdb 100644
--- a/src/ostree/ostree-pull.c
+++ b/src/ostree/ostree-pull.c
@@ -28,9 +28,16 @@
  *
  * Pull refs
  *   For each ref:
- *     Pull commit
+ *     Queue scan of commit
+ *
+ * Mainloop:
+ *  Process requests, await idle scan
+ *  
+ * Async queue:
+ *  Scan commit
+ *   If already cached, recursively scan content
+ *   If not, queue fetch
  * 
- * Pull commits:
  *  For each commit:
  *    Verify checksum
  *    Import
@@ -66,14 +73,13 @@
 
 #include "ostree-fetcher.h"
 
+
 gboolean verbose;
 gboolean opt_related;
-gint opt_depth;
 
 static GOptionEntry options[] = {
   { "verbose", 'v', 0, G_OPTION_ARG_NONE, &verbose, "Show more information", NULL },
   { "related", 0, 0, G_OPTION_ARG_NONE, &opt_related, "Download related commits", NULL },
-  { "depth", 0, 0, G_OPTION_ARG_INT, &opt_depth, "Download parent commits up to this depth (default: 0)", NULL },
   { NULL },
 };
 
@@ -87,12 +93,20 @@ typedef struct {
   GHashTable   *file_checksums_to_fetch;
 
   GMainLoop    *loop;
+  GCancellable *cancellable;
 
   /* Used in meta fetch phase */
+  gboolean      fetching_metadata;
+  volatile gint n_scanned_metadata;
+  guint         n_fetched_metadata;
   guint         outstanding_uri_requests;
-  guint         outstanding_meta_requests;
 
+  GThread      *metadata_scan_thread;
+  OtWorkerQueue  *metadata_objects_to_scan;
+  GHashTable   *scanned_metadata; /* Maps object name to itself */
+  
   /* Used in content fetch phase */
+  guint         n_fetched_content;
   guint         outstanding_filemeta_requests;
   guint         outstanding_filecontent_requests;
   guint         outstanding_checksum_requests;
@@ -110,6 +124,18 @@ suburi_new (SoupURI   *base,
             const char *first,
             ...) G_GNUC_NULL_TERMINATED;
 
+static gboolean scan_one_metadata_object (OtPullData         *pull_data,
+                                          const guchar       *csum,
+                                          OstreeObjectType    objtype,
+                                          guint               recursion_depth,
+                                          GCancellable       *cancellable,
+                                          GError            **error);
+static gboolean scan_one_metadata_object_v_name (OtPullData         *pull_data,
+                                                 GVariant           *object,
+                                                 GCancellable       *cancellable,
+                                                 GError            **error);
+
+
 static SoupURI *
 suburi_new (SoupURI   *base,
             const char *first,
@@ -152,6 +178,13 @@ uri_fetch_update_status (gpointer user_data)
  
   status = g_string_new ("");
 
+  if (pull_data->fetching_metadata)
+    {
+      g_string_append_printf (status, "Metadata phase: %u fetched, %u scanned; ",
+                              g_atomic_int_get (&pull_data->n_fetched_metadata),
+                              g_atomic_int_get (&pull_data->n_scanned_metadata));
+    }
+
   if (pull_data->loose_files != NULL)
     g_string_append_printf (status, "%u loose files to fetch: ",
                             g_hash_table_size (pull_data->loose_files)
@@ -162,6 +195,7 @@ uri_fetch_update_status (gpointer user_data)
     g_string_append_printf (status, "Calculating %u checksums; ",
                             pull_data->outstanding_checksum_requests);
 
+
   fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
   g_string_append (status, fetcher_status);
   if (status->len > pull_data->last_padding)
@@ -183,16 +217,9 @@ uri_fetch_update_status (gpointer user_data)
 }
 
 static void
-check_outstanding_requests_handle_error (OtPullData          *pull_data,
-                                         GError              *error)
+throw_async_error (OtPullData          *pull_data,
+                   GError              *error)
 {
-  if (pull_data->outstanding_uri_requests == 0 &&
-      pull_data->outstanding_meta_requests == 0 &&
-      pull_data->outstanding_filemeta_requests == 0 &&
-      pull_data->outstanding_filecontent_requests == 0 &&
-      pull_data->outstanding_checksum_requests == 0 &&
-      (pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0))
-    g_main_loop_quit (pull_data->loop);
   if (error)
     {
       if (!pull_data->caught_error)
@@ -209,6 +236,19 @@ check_outstanding_requests_handle_error (OtPullData          *pull_data,
 }
 
 static void
+check_outstanding_requests_handle_error (OtPullData          *pull_data,
+                                         GError              *error)
+{
+  if (pull_data->outstanding_uri_requests == 0 &&
+      pull_data->outstanding_filemeta_requests == 0 &&
+      pull_data->outstanding_filecontent_requests == 0 &&
+      pull_data->outstanding_checksum_requests == 0 &&
+      (pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0))
+    g_main_loop_quit (pull_data->loop);
+  throw_async_error (pull_data, error);
+}
+
+static gboolean
 run_mainloop_monitor_fetcher (OtPullData   *pull_data)
 {
   GSource *update_timeout = NULL;
@@ -229,6 +269,8 @@ run_mainloop_monitor_fetcher (OtPullData   *pull_data)
       g_print ("\n");
       g_source_destroy (update_timeout);
     }
+
+  return !pull_data->caught_error;
 }
 
 typedef struct {
@@ -276,9 +318,7 @@ fetch_uri (OtPullData  *pull_data,
   ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable,
                                     uri_fetch_on_complete, &fetch_data);
 
-  run_mainloop_monitor_fetcher (pull_data);
-
-  if (pull_data->caught_error)
+  if (!run_mainloop_monitor_fetcher (pull_data))
     goto out;
 
   ret = TRUE;
@@ -321,103 +361,11 @@ fetch_uri_contents_utf8 (OtPullData  *pull_data,
 }
 
 static gboolean
-fetch_loose_object (OtPullData  *pull_data,
-                    const char  *checksum,
-                    OstreeObjectType objtype,
-                    GFile           **out_temp_path,
-                    GCancellable *cancellable,
-                    GError     **error)
-{
-  gboolean ret = FALSE;
-  ot_lfree char *objpath = NULL;
-  ot_lobj GFile *ret_temp_path = NULL;
-  SoupURI *obj_uri = NULL;
-
-  objpath = ostree_get_relative_object_path (checksum, objtype,
-                                             pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z);
-  obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
-  
-  if (!fetch_uri (pull_data, obj_uri, ostree_object_type_to_string (objtype), &ret_temp_path,
-                  cancellable, error))
-    goto out;
-
-  ret = TRUE;
-  ot_transfer_out_value (out_temp_path, &ret_temp_path);
- out:
-  if (obj_uri)
-    soup_uri_free (obj_uri);
-  return ret;
-}
-
-static gboolean
-fetch_and_store_metadata (OtPullData          *pull_data,
-                          const char          *checksum,
-                          OstreeObjectType     objtype,
-                          GVariant           **out_variant,
-                          GCancellable        *cancellable,
-                          GError             **error)
-{
-  gboolean ret = FALSE;
-  gboolean is_stored;
-  ot_lvariant GVariant *ret_variant = NULL;
-  ot_lobj GFile *temp_path = NULL;
-  ot_lvariant GVariant *metadata = NULL;
-
-  g_assert (OSTREE_OBJECT_TYPE_IS_META (objtype));
-
-  if (!ostree_repo_has_object (pull_data->repo, objtype, checksum, &is_stored,
-                               cancellable, error))
-    goto out;
-      
-  if (!is_stored)
-    {
-      ot_lvariant GVariant *tmp_metadata = NULL;
-      const GVariantType *vtype;
-  
-      if (!fetch_loose_object (pull_data, checksum, objtype, &temp_path, cancellable, error))
-        goto out;
-
-      switch (objtype)
-        {
-        case OSTREE_OBJECT_TYPE_DIR_TREE:
-          vtype = OSTREE_TREE_GVARIANT_FORMAT;
-          break;
-        case OSTREE_OBJECT_TYPE_DIR_META:
-          vtype = OSTREE_DIRMETA_GVARIANT_FORMAT;
-          break;
-        case OSTREE_OBJECT_TYPE_COMMIT:
-          vtype = OSTREE_COMMIT_GVARIANT_FORMAT;
-          break;
-        default:
-          g_assert_not_reached ();
-        }
-
-      if (!ot_util_variant_map (temp_path, vtype, FALSE, &tmp_metadata, error))
-        goto out;
-      
-      if (!ostree_repo_stage_metadata (pull_data->repo, objtype, checksum, tmp_metadata, NULL,
-                                       cancellable, error))
-        goto out;
-    }
-
-  if (!ostree_repo_load_variant (pull_data->repo, objtype, checksum,
-                                 &ret_variant, error))
-    goto out;
-
-  ret = TRUE;
-  ot_transfer_out_value (out_variant, &ret_variant);
- out:
-  if (temp_path)
-    (void) ot_gfile_unlink (temp_path, NULL, NULL);
-  return ret;
-}
-
-static gboolean
-fetch_and_store_tree_metadata_recurse (OtPullData   *pull_data,
-                                       int           depth,
-                                       const char   *rev,
-                                       GCancellable *cancellable,
-                                       GError      **error)
+scan_dirtree_object (OtPullData   *pull_data,
+                     const char   *checksum,
+                     int           recursion_depth,
+                     GCancellable *cancellable,
+                     GError      **error)
 {
   gboolean ret = FALSE;
   int i, n;
@@ -426,15 +374,15 @@ fetch_and_store_tree_metadata_recurse (OtPullData   *pull_data,
   ot_lvariant GVariant *dirs_variant = NULL;
   ot_lobj GFile *stored_path = NULL;
 
-  if (depth > OSTREE_MAX_RECURSION)
+  if (recursion_depth > OSTREE_MAX_RECURSION)
     {
       g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
                    "Exceeded maximum recursion");
       goto out;
     }
 
-  if (!fetch_and_store_metadata (pull_data, rev, OSTREE_OBJECT_TYPE_DIR_TREE,
-                                 &tree, cancellable, error))
+  if (!ostree_repo_load_variant (pull_data->repo, OSTREE_OBJECT_TYPE_DIR_TREE, checksum,
+                                 &tree, error))
     goto out;
 
   /* PARSE OSTREE_SERIALIZED_TREE_VARIANT */
@@ -473,15 +421,14 @@ fetch_and_store_tree_metadata_recurse (OtPullData   *pull_data,
       if (!ot_util_filename_validate (dirname, error))
         goto out;
 
-      g_free (tmp_checksum);
-      tmp_checksum = ostree_checksum_from_bytes_v (meta_csum);
-      if (!fetch_and_store_metadata (pull_data, tmp_checksum, OSTREE_OBJECT_TYPE_DIR_META,
-                                     NULL, cancellable, error))
+      if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_csum),
+                                     OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
+                                     cancellable, error))
         goto out;
-
-      g_free (tmp_checksum);
-      tmp_checksum = ostree_checksum_from_bytes_v (tree_csum);
-      if (!fetch_and_store_tree_metadata_recurse (pull_data, depth+1, tmp_checksum, cancellable, error))
+      
+      if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (meta_csum),
+                                     OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
+                                     cancellable, error))
         goto out;
     }
 
@@ -491,94 +438,6 @@ fetch_and_store_tree_metadata_recurse (OtPullData   *pull_data,
 }
 
 static gboolean
-fetch_and_store_commit_metadata_recurse (OtPullData   *pull_data,
-                                         int           parent_depth,
-                                         int           related_depth,
-                                         const char   *rev,
-                                         GCancellable *cancellable,
-                                         GError      **error)
-{
-  gboolean ret = FALSE;
-  ot_lvariant GVariant *commit = NULL;
-  ot_lvariant GVariant *related_objects = NULL;
-  ot_lvariant GVariant *tree_contents_csum = NULL;
-  ot_lvariant GVariant *tree_meta_csum = NULL;
-  ot_lfree char *tmp_checksum = NULL;
-  GVariantIter *iter = NULL;
-
-  if (!fetch_and_store_metadata (pull_data, rev, OSTREE_OBJECT_TYPE_COMMIT,
-                                 &commit, cancellable, error))
-    goto out;
-
-  /* PARSE OSTREE_SERIALIZED_COMMIT_VARIANT */
-  g_variant_get_child (commit, 6, "@ay", &tree_contents_csum);
-  g_variant_get_child (commit, 7, "@ay", &tree_meta_csum);
-
-  g_free (tmp_checksum);
-  tmp_checksum = ostree_checksum_from_bytes_v (tree_meta_csum);
-  if (!fetch_and_store_metadata (pull_data, tmp_checksum, OSTREE_OBJECT_TYPE_DIR_META,
-                                 NULL, cancellable, error))
-    goto out;
-  
-  g_free (tmp_checksum);
-  tmp_checksum = ostree_checksum_from_bytes_v (tree_contents_csum);
-  if (!fetch_and_store_tree_metadata_recurse (pull_data, 0, tmp_checksum,
-                                              cancellable, error))
-    goto out;
-
-  if (opt_related)
-    {
-      const char *name;
-      ot_lvariant GVariant *csum_v = NULL;
-
-      if (parent_depth > OSTREE_MAX_RECURSION
-          || related_depth > OSTREE_MAX_RECURSION)
-        {
-          g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
-                       "Exceeded maximum recursion");
-          goto out;
-        }
-
-      related_objects = g_variant_get_child_value (commit, 2);
-      iter = g_variant_iter_new (related_objects);
-
-      while (g_variant_iter_loop (iter, "(&s ay)", &name, &csum_v))
-        {
-          ot_lfree char *checksum = ostree_checksum_from_bytes_v (csum_v);
-
-          /* Pass opt_depth here to ensure we aren't fetching parents of related */
-          if (!fetch_and_store_commit_metadata_recurse (pull_data, opt_depth,
-                                                        related_depth + 1, checksum,
-                                                        cancellable, error))
-            goto out;
-        }
-    }
-
-  if (parent_depth < opt_depth)
-    {
-      ot_lvariant GVariant *parent_csum_v = NULL;
-
-      parent_csum_v = g_variant_get_child_value (commit, 1);
-
-      if (g_variant_n_children (parent_csum_v) > 0)
-        {
-          ot_lfree char *checksum = ostree_checksum_from_bytes_v (parent_csum_v);
-
-          if (!fetch_and_store_commit_metadata_recurse (pull_data, parent_depth + 1,
-                                                        0, checksum,
-                                                        cancellable, error))
-            goto out;
-        }
-    }
-
-  ret = TRUE;
- out:
-  if (iter)
-    g_variant_iter_free (iter);
-  return ret;
-}
-
-static gboolean
 fetch_ref_contents (OtPullData    *pull_data,
                     const char    *ref,
                     char         **out_contents,
@@ -702,6 +561,7 @@ content_fetch_on_checksum_complete (GObject        *object,
                                           cancellable, error))
     goto out;
 
+  data->pull_data->n_fetched_content++;
  out:
   data->pull_data->outstanding_checksum_requests--;
   check_outstanding_requests_handle_error (data->pull_data, local_error);
@@ -918,9 +778,7 @@ fetch_content (OtPullData           *pull_data,
     {
       enqueue_loose_meta_requests (pull_data);
 
-      run_mainloop_monitor_fetcher (pull_data);
-
-      if (pull_data->caught_error)
+      if (!run_mainloop_monitor_fetcher (pull_data))
         goto out;
     }
 
@@ -935,6 +793,299 @@ fetch_content (OtPullData           *pull_data,
   return ret;
 }
 
+typedef struct { /* request record: built by queue_metadata_object_fetch(), freed by meta_fetch_on_complete() */
+  OtPullData  *pull_data;
+  GVariant *object; /* serialized object name; the record holds its own reference */
+} IdleFetchMetadataObjectData;
+
+static void
+meta_fetch_on_complete (GObject           *object,
+                        GAsyncResult      *result,
+                        gpointer           user_data)
+{
+  IdleFetchMetadataObjectData *fetch_data = user_data;
+  OtPullData *pull_data = fetch_data->pull_data;
+  ot_lobj GFile *temp_path = NULL;
+  ot_lvariant GVariant *metadata = NULL;
+  const char *checksum;
+  OstreeObjectType objtype;
+  GError *local_error = NULL;
+  GError **error = &local_error;
+  /* Completion of a metadata fetch: stage the object, then queue it for scanning. */
+  temp_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
+  if (!temp_path)
+    goto out;
+
+  ostree_object_name_deserialize (fetch_data->object, &checksum, &objtype);
+
+  if (!ot_util_variant_map (temp_path, ostree_metadata_variant_type (objtype),
+                            FALSE, &metadata, error))
+    goto out;
+
+  if (!ostree_repo_stage_metadata (pull_data->repo, objtype, checksum, metadata, (guchar**)NULL,
+                                   pull_data->cancellable, error))
+    goto out;
+
+  pull_data->n_fetched_metadata++;
+  /* Push before release: the worker treats "empty queue, no holds" as idle. */
+  ot_worker_queue_push (pull_data->metadata_objects_to_scan,
+                        g_variant_ref (fetch_data->object));
+  ot_worker_queue_release (pull_data->metadata_objects_to_scan);
+ out: /* NOTE(review): error paths arrive here with the hold from queue_metadata_object_fetch() still taken */
+  if (temp_path) /* NULL when the fetch itself failed: nothing to unlink */
+    (void) ot_gfile_unlink (temp_path, NULL, NULL);
+  throw_async_error (pull_data, local_error);
+  g_variant_unref (fetch_data->object);
+  g_free (fetch_data);
+}
+
+static gboolean
+idle_fetch_metadata_object (gpointer data)
+{
+  /* Idle handler: start the HTTP request for one metadata object.
+   * @data is handed on to meta_fetch_on_complete(), which frees it. */
+  IdleFetchMetadataObjectData *req = data;
+  OtPullData *pull = req->pull_data;
+  gboolean is_archive_z = (pull->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z);
+  OstreeObjectType type;
+  const char *csum;
+  ot_lfree char *relpath = NULL;
+  SoupURI *uri = NULL;
+
+  ostree_object_name_deserialize (req->object, &csum, &type);
+  relpath = ostree_get_relative_object_path (csum, type, is_archive_z);
+
+  uri = suburi_new (pull->base_uri, relpath, NULL);
+  ostree_fetcher_request_uri_async (pull->fetcher, uri, pull->cancellable,
+                                    meta_fetch_on_complete, req);
+  soup_uri_free (uri);
+
+  /* One-shot: returning FALSE removes this idle source. */
+  return FALSE;
+}
+
+/**
+ * queue_metadata_object_fetch:
+ * Take a hold on the scan queue and schedule idle_fetch_metadata_object()
+ * (through g_idle_add()) with a new reference to @object.
+ */
+static void
+queue_metadata_object_fetch (OtPullData  *pull_data,
+                             GVariant    *object)
+{
+  IdleFetchMetadataObjectData *req = g_new (IdleFetchMetadataObjectData, 1);
+  req->object = g_variant_ref (object);
+  req->pull_data = pull_data;
+  ot_worker_queue_hold (pull_data->metadata_objects_to_scan);
+  g_idle_add (idle_fetch_metadata_object, req);
+}
+
+static gboolean
+scan_commit_object (OtPullData         *pull_data,
+                    const char         *checksum,
+                    guint               recursion_depth,
+                    GCancellable       *cancellable,
+                    GError            **error)
+{
+  gboolean ret = FALSE;
+  ot_lvariant GVariant *commit = NULL;
+  ot_lvariant GVariant *related_objects = NULL;
+  ot_lvariant GVariant *tree_contents_csum = NULL;
+  ot_lvariant GVariant *tree_meta_csum = NULL;
+  GVariantIter *iter = NULL;
+
+  /* Scan a commit loaded from the repo: its root dirtree and dirmeta and,
+   * when --related was given, each related commit.  Returns FALSE on failure. */
+  if (recursion_depth > OSTREE_MAX_RECURSION)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                   "Exceeded maximum recursion");
+      goto out;
+    }
+
+  if (!ostree_repo_load_variant (pull_data->repo, OSTREE_OBJECT_TYPE_COMMIT, checksum,
+                                 &commit, error))
+    goto out;
+
+  /* PARSE OSTREE_SERIALIZED_COMMIT_VARIANT */
+  g_variant_get_child (commit, 6, "@ay", &tree_contents_csum);
+  g_variant_get_child (commit, 7, "@ay", &tree_meta_csum);
+
+  if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_contents_csum),
+                                 OSTREE_OBJECT_TYPE_DIR_TREE, recursion_depth + 1,
+                                 cancellable, error))
+    goto out;
+
+  if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (tree_meta_csum),
+                                 OSTREE_OBJECT_TYPE_DIR_META, recursion_depth + 1,
+                                 cancellable, error))
+    goto out;
+  if (opt_related)
+    {
+      const char *name;
+      ot_lvariant GVariant *csum_v = NULL;
+
+      related_objects = g_variant_get_child_value (commit, 2);
+      iter = g_variant_iter_new (related_objects);
+      /* NOTE(review): a plain "ay" normally unpacks to a GVariantIter*, yet csum_v is used as a GVariant* - confirm the format string. */
+      while (g_variant_iter_loop (iter, "(&s ay)", &name, &csum_v))
+        {
+          if (!scan_one_metadata_object (pull_data, ostree_checksum_bytes_peek (csum_v),
+                                         OSTREE_OBJECT_TYPE_COMMIT, recursion_depth + 1,
+                                         cancellable, error))
+            goto out;
+        }
+    }
+
+  ret = TRUE;
+ out:
+  if (iter)
+    g_variant_iter_free (iter);
+  return ret;
+}
+
+static gboolean
+scan_one_metadata_object (OtPullData         *pull_data,
+                          const guchar       *csum,
+                          OstreeObjectType    objtype,
+                          guint               recursion_depth,
+                          GCancellable       *cancellable,
+                          GError            **error)
+{
+  /* Scan one metadata object identified by raw checksum bytes and type.
+   *
+   * - Already recorded in scanned_metadata: nothing to do.
+   * - Not in the local repo: ask for it to be fetched; it is not marked
+   *   as scanned here.
+   * - In the repo: scan commits and dirtrees for further references
+   *   (dirmeta needs none), then record the object as scanned.
+   *
+   * Returns FALSE when the repo query or a nested scan fails.
+   */
+  ot_lfree char *hex_checksum = ostree_checksum_from_bytes (csum);
+  ot_lvariant GVariant *name = ostree_object_name_serialize (hex_checksum, objtype);
+  gboolean have_object;
+
+  if (g_hash_table_lookup (pull_data->scanned_metadata, name) != NULL)
+    return TRUE;
+
+  if (!ostree_repo_has_object (pull_data->repo, objtype, hex_checksum, &have_object,
+                               cancellable, error))
+    return FALSE;
+
+  if (!have_object)
+    {
+      queue_metadata_object_fetch (pull_data, name);
+      return TRUE;
+    }
+
+  if (objtype == OSTREE_OBJECT_TYPE_COMMIT)
+    {
+      if (!scan_commit_object (pull_data, hex_checksum, recursion_depth,
+                               pull_data->cancellable, error))
+        return FALSE;
+    }
+  else if (objtype == OSTREE_OBJECT_TYPE_DIR_TREE)
+    {
+      if (!scan_dirtree_object (pull_data, hex_checksum, recursion_depth,
+                                pull_data->cancellable, error))
+        return FALSE;
+    }
+  else if (objtype == OSTREE_OBJECT_TYPE_FILE)
+    g_assert_not_reached ();
+
+  /* The table gets its own key reference; the value is the same pointer. */
+  g_hash_table_insert (pull_data->scanned_metadata, g_variant_ref (name), name);
+  g_atomic_int_inc (&pull_data->n_scanned_metadata);
+
+  return TRUE;
+}
+
+static gboolean
+scan_one_metadata_object_v_name (OtPullData         *pull_data,
+                                 GVariant           *object,
+                                 GCancellable       *cancellable,
+                                 GError            **error)
+{
+  /* Unpack a serialized object name and scan it at recursion depth 0. */
+  ot_lfree guchar *raw_csum = NULL;
+  const char *hex_csum = NULL;
+  OstreeObjectType type;
+
+  ostree_object_name_deserialize (object, &hex_csum, &type);
+  raw_csum = ostree_checksum_to_bytes (hex_csum);
+  return scan_one_metadata_object (pull_data, raw_csum, type, 0,
+                                   cancellable, error);
+}
+
+typedef struct { /* carrier for an error raised on the worker thread; freed by idle_throw_error() */
+  OtPullData *pull_data;
+  GError *error; /* passed on to throw_async_error() by idle_throw_error() */
+} IdleThrowErrorData;
+
+static gboolean
+idle_throw_error (gpointer user_data)
+{
+  /* Forward a worker-thread error to throw_async_error(), then free the carrier. */
+  IdleThrowErrorData *report = user_data;
+  GError *err = report->error;
+  throw_async_error (report->pull_data, err);
+  g_free (report);
+  return FALSE;
+}
+
+/**
+ * scan_one_metadata_object_dispatch:
+ *
+ * Work function of the "metadatascan" worker queue.  @item is a
+ * serialized object name.  Objects missing from the repo are queued
+ * for fetching; once stored they are pushed back and scanned again.
+ * A scan error is passed to throw_async_error() via the default context.
+ */
+static void
+scan_one_metadata_object_dispatch (gpointer item,
+                                   gpointer user_data)
+{
+  /* Adopt the reference that was pushed along with @item. */
+  ot_lvariant GVariant *name = item;
+  OtPullData *pull = user_data;
+  GError *scan_error = NULL;
+  IdleThrowErrorData *report;
+
+  (void) scan_one_metadata_object_v_name (pull, name, pull->cancellable,
+                                          &scan_error);
+
+  /* Only a set GError is reported, whatever the call returned. */
+  if (scan_error == NULL)
+    return;
+
+  /* idle_throw_error() frees @report after forwarding the error. */
+  report = g_new0 (IdleThrowErrorData, 1);
+  report->pull_data = pull;
+  report->error = scan_error;
+
+  g_main_context_invoke (NULL, idle_throw_error, report);
+}
+
+static void
+on_metadata_worker_idle (gpointer user_data)
+{
+  /* Idle callback of the metadata scan queue: leave the main loop. */
+  OtPullData *pull = user_data;
+  g_main_loop_quit (pull->loop);
+}
+
+static gboolean
+idle_start_worker (gpointer user_data)
+{
+  /* Start the metadata scan worker thread. */
+  OtPullData *pull = user_data;
+  OtWorkerQueue *scan_queue = pull->metadata_objects_to_scan;
+  ot_worker_queue_start (scan_queue);
+  return FALSE;  /* run once */
+}
+
+
 static gboolean
 parse_ref_summary (const char    *contents,
                    GHashTable   **out_refs,
@@ -1104,6 +1255,9 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
   pull_data->repo = repo;
   pull_data->file_checksums_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
 
+  pull_data->scanned_metadata = g_hash_table_new_full (ostree_hash_object_name, g_variant_equal,
+                                                       (GDestroyNotify)g_variant_unref, NULL);
+
   if (argc < 2)
     {
       ot_util_usage_error (context, "REMOTE must be specified", error);
@@ -1232,16 +1386,23 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
   if (!ostree_repo_prepare_transaction (pull_data->repo, NULL, error))
     goto out;
 
+  pull_data->fetching_metadata = TRUE;
+
   g_print ("Analyzing objects needed...\n");
 
+  pull_data->metadata_objects_to_scan = ot_worker_queue_new ("metadatascan",
+                                                             scan_one_metadata_object_dispatch,
+                                                             pull_data);
+  ot_worker_queue_set_idle_callback (pull_data->metadata_objects_to_scan,
+                                     NULL, on_metadata_worker_idle, pull_data);
+
   g_hash_table_iter_init (&hash_iter, commits_to_fetch);
   while (g_hash_table_iter_next (&hash_iter, &key, &value))
     {
       const char *commit = value;
-      
-      if (!fetch_and_store_commit_metadata_recurse (pull_data, 0, 0, commit,
-                                                    cancellable, error))
-        goto out;
+
+      ot_worker_queue_push (pull_data->metadata_objects_to_scan,
+                            ostree_object_name_serialize (commit, OSTREE_OBJECT_TYPE_COMMIT));
     }
 
   g_hash_table_iter_init (&hash_iter, requested_refs_to_fetch);
@@ -1259,26 +1420,26 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
       if (!ostree_repo_resolve_rev (pull_data->repo, remote_ref, TRUE, &original_rev, error))
         goto out;
 
-      /* Only skip traversal if depth == 0; otherwise, we have to
-       * handle the case where the user specified a bigger depth than
-       * they originally did.
-       */
-      if (original_rev && strcmp (sha256, original_rev) == 0 && opt_depth == 0)
+      if (original_rev && strcmp (sha256, original_rev) == 0)
         {
           g_print ("No changes in %s\n", remote_ref);
         }
       else
         {
-          if (!ostree_validate_checksum_string (sha256, error))
-            goto out;
-
-          if (!fetch_and_store_commit_metadata_recurse (pull_data, 0, 0, sha256, cancellable, error))
-            goto out;
-         
+          ot_worker_queue_push (pull_data->metadata_objects_to_scan,
+                                ostree_object_name_serialize (sha256, OSTREE_OBJECT_TYPE_COMMIT));
           g_hash_table_insert (updated_refs, g_strdup (ref), g_strdup (sha256));
         }
     }
 
+  g_idle_add (idle_start_worker, pull_data);
+
+  /* Handle queued metadata requests here */
+  if (!run_mainloop_monitor_fetcher (pull_data))
+    goto out;
+
+  pull_data->fetching_metadata = FALSE;
+
   if (!fetch_content (pull_data, cancellable, error))
     goto out;
 
@@ -1303,7 +1464,15 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
   bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
   if (bytes_transferred > 0)
     {
-      g_print ("%" G_GUINT64_FORMAT " KiB transferred\n", (guint64)(bytes_transferred / 1024.0));
+      guint shift; 
+      if (bytes_transferred < 1024)
+        shift = 1;
+      else
+        shift = 1024;
+      g_print ("%u metadata, %u content objects fetched; %" G_GUINT64_FORMAT " %s transferred\n", 
+               pull_data->n_fetched_metadata, pull_data->n_fetched_content,
+               (guint64)(bytes_transferred / shift),
+               shift == 1 ? "B" : "KiB");
     }
 
   ret = TRUE;
@@ -1317,6 +1486,8 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
   g_free (pull_data->remote_name);
   if (pull_data->base_uri)
     soup_uri_free (pull_data->base_uri);
+  g_clear_pointer (&pull_data->metadata_objects_to_scan, (GDestroyNotify) ot_worker_queue_unref);
+  g_clear_pointer (&pull_data->scanned_metadata, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&pull_data->file_checksums_to_fetch, (GDestroyNotify) g_hash_table_unref);
   g_clear_pointer (&remote_config, (GDestroyNotify) g_key_file_unref);
   if (summary_uri)



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]