[ostree] pull-local: Make multithreaded



commit 05e7b6d5964540429ba0346212b635b738caa35f
Author: Colin Walters <walters verbum org>
Date:   Mon Nov 19 11:35:30 2012 -0500

    pull-local: Make multithreaded
    
    We were easily blocking for 1/10 to 1/5 of a second in fdatasync(),
    which drastically slows down the whole process.
    
    This threading isn't quite as good as the ostree-pull command's, but it
    lets us avoid depending on libsoup everywhere, and it's simpler.

 src/libotutil/ot-spawn-utils.c     |   22 ++++++
 src/libotutil/ot-spawn-utils.h     |    4 +
 src/ostree/ot-builtin-pull-local.c |  141 +++++++++++++++++++++++++-----------
 3 files changed, 124 insertions(+), 43 deletions(-)
---
diff --git a/src/libotutil/ot-spawn-utils.c b/src/libotutil/ot-spawn-utils.c
index e125661..c05ac3b 100644
--- a/src/libotutil/ot-spawn-utils.c
+++ b/src/libotutil/ot-spawn-utils.c
@@ -56,3 +56,25 @@ ot_spawn_sync_checked (const char           *cwd,
  out:
   return ret;
 }
+
+/**
+ * ot_thread_pool_new_nproc:
+ * @func: Function run in a worker thread for each pushed item
+ * @user_data: Passed as the second argument to @func
+ *
+ * Like g_thread_pool_new (), but sized to the number of online CPUs
+ * (2 when that count is unavailable).  Also aborts on error.
+ */
+GThreadPool *
+ot_thread_pool_new_nproc (GFunc     func,
+                          gpointer  user_data)
+{
+  GThreadPool *ret;
+  GError *local_error = NULL;
+  long nproc_onln = sysconf (_SC_NPROCESSORS_ONLN);
+
+  if (nproc_onln < 1)  /* GLib reads -1 as "no limit"; 0 would run nothing */
+    nproc_onln = 2;
+  ret = g_thread_pool_new (func, user_data, (int)nproc_onln, FALSE, &local_error);
+  g_assert_no_error (local_error);
+  return ret;
+}
diff --git a/src/libotutil/ot-spawn-utils.h b/src/libotutil/ot-spawn-utils.h
index 86b4beb..6bd4058 100644
--- a/src/libotutil/ot-spawn-utils.h
+++ b/src/libotutil/ot-spawn-utils.h
@@ -37,6 +37,10 @@ gboolean ot_spawn_sync_checked (const char           *cwd,
                                 char                **stderr_data,
                                 GError              **error);
 
+/* Like g_thread_pool_new (), but sized to the online CPU count; aborts on error. */
+GThreadPool * ot_thread_pool_new_nproc (GFunc     func,
+                                        gpointer  user_data);
+
 G_END_DECLS
 
 #endif
diff --git a/src/ostree/ot-builtin-pull-local.c b/src/ostree/ot-builtin-pull-local.c
index 94f0b88..c45ae4e 100644
--- a/src/ostree/ot-builtin-pull-local.c
+++ b/src/ostree/ot-builtin-pull-local.c
@@ -35,6 +35,12 @@ static GOptionEntry options[] = {
 typedef struct {
   OstreeRepo *src_repo;
   OstreeRepo *dest_repo;
+  GThreadPool *threadpool;         /* runs import_one_object_thread () */
+  GMainLoop *loop;                 /* quit by the worker that checks the last object */
+  gboolean stdout_is_tty;          /* TRUE when stdout (fd 1) is a terminal */
+  int n_objects_to_check;          /* number of objects handed to the pool */
+  volatile int n_objects_checked;  /* incremented atomically by workers */
+  volatile int n_objects_copied;   /* incremented atomically by workers */
 } OtLocalCloneData;
 
 static gboolean
@@ -84,11 +90,61 @@ import_one_object (OtLocalCloneData *data,
         goto out;
     }
 
+  g_atomic_int_inc (&data->n_objects_copied);
+
   ret = TRUE;
  out:
   return ret;
 }
 
+/* Thread-pool worker: copy one object (@object is a serialized object
+ * name whose reference this function now owns) into dest_repo if missing. */
+static void
+import_one_object_thread (gpointer   object,
+                          gpointer   user_data)
+{
+  OtLocalCloneData *data = user_data;
+  ot_lvariant GVariant *serialized_key = object;
+  GError *local_error = NULL;
+  GError **error = &local_error;
+  const char *checksum;
+  OstreeObjectType objtype;
+  gboolean has_object;
+  GCancellable *cancellable = NULL;
+
+  ostree_object_name_deserialize (serialized_key, &checksum, &objtype);
+  if (!ostree_repo_has_object (data->dest_repo, objtype, checksum, &has_object,
+                               cancellable, error))
+    goto out;
+  if (!has_object
+      && !import_one_object (data, checksum, objtype, cancellable, error))
+    goto out;
+
+ out:
+  /* Fail before counting this object; otherwise the main thread could see
+   * the pull as complete and commit it while this process is exiting. */
+  if (local_error != NULL)
+    {
+      g_printerr ("%s\n", local_error->message);
+      exit (1);
+    }
+  if (g_atomic_int_add (&data->n_objects_checked, 1) == data->n_objects_to_check - 1)
+    g_main_loop_quit (data->loop);
+}
+
+static gboolean
+idle_print_status (gpointer user_data)
+{
+  OtLocalCloneData *clone = user_data;
+  int scanned = g_atomic_int_get (&clone->n_objects_checked);
+  int copied = g_atomic_int_get (&clone->n_objects_copied);
+
+  /* ESC 8 restores the cursor position saved by a prior ESC 7. */
+  g_print ("%c8pull: %d/%d scanned, %d objects copied", 0x1B,
+           scanned, clone->n_objects_to_check, copied);
+  return TRUE;  /* keep the periodic timeout source installed */
+}
+
 gboolean
 ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **error)
 {
@@ -108,18 +164,18 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
   ot_lhash GHashTable *refs_to_clone = NULL;
   ot_lhash GHashTable *source_objects = NULL;
-  ot_lhash GHashTable *objects_to_copy = NULL;
-  OtLocalCloneData data;
+  OtLocalCloneData datav;
+  OtLocalCloneData *data = &datav;  /* also handed to the worker threads */

   context = g_option_context_new ("SRC_REPO [REFS...] -  Copy data from SRC_REPO");
   g_option_context_add_main_entries (context, options, NULL);

-  memset (&data, 0, sizeof (data));
+  memset (&datav, 0, sizeof (datav));

   if (!g_option_context_parse (context, &argc, &argv, error))
     goto out;

-  data.dest_repo = ostree_repo_new (repo_path);
-  if (!ostree_repo_check (data.dest_repo, error))
+  data->dest_repo = ostree_repo_new (repo_path);
+  if (!ostree_repo_check (data->dest_repo, error))
     goto out;

   if (argc < 2)
@@ -135,16 +192,20 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
   src_repo_path = argv[1];
   src_f = g_file_new_for_path (src_repo_path);
 
-  data.src_repo = ostree_repo_new (src_f);
-  if (!ostree_repo_check (data.src_repo, error))
+  data->src_repo = ostree_repo_new (src_f);
+  if (!ostree_repo_check (data->src_repo, error))
     goto out;
 
-  src_repo_dir = g_object_ref (ostree_repo_get_path (data.src_repo));
-  dest_repo_dir = g_object_ref (ostree_repo_get_path (data.dest_repo));
+  data->threadpool = ot_thread_pool_new_nproc (import_one_object_thread, data);
+  data->loop = g_main_loop_new (NULL, TRUE);
+  data->stdout_is_tty = isatty (1);
+
+  src_repo_dir = g_object_ref (ostree_repo_get_path (data->src_repo));
+  dest_repo_dir = g_object_ref (ostree_repo_get_path (data->dest_repo));
 
   if (argc == 2)
     {
-      if (!ostree_repo_list_all_refs (data.src_repo, &refs_to_clone, cancellable, error))
+      if (!ostree_repo_list_all_refs (data->src_repo, &refs_to_clone, cancellable, error))
         goto out;
     }
   else
@@ -155,7 +216,7 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
           const char *ref = argv[i];
           char *rev;
           
-          if (!ostree_repo_resolve_rev (data.src_repo, ref, FALSE, &rev, error))
+          if (!ostree_repo_resolve_rev (data->src_repo, ref, FALSE, &rev, error))
             goto out;
           
           /* Transfer ownership of rev */
@@ -172,47 +233,60 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
     {
       const char *checksum = value;

-      if (!ostree_traverse_commit (data.src_repo, checksum, 0, source_objects, cancellable, error))
+      if (!ostree_traverse_commit (data->src_repo, checksum, 0, source_objects, cancellable, error))
         goto out;
     }

-  objects_to_copy = ostree_traverse_new_reachable ();
+  if (!ostree_repo_prepare_transaction (data->dest_repo, FALSE, cancellable, error))
+    goto out;
+
+  /* Set the total before pushing anything: workers compare against it. */
+  data->n_objects_to_check = (int) g_hash_table_size (source_objects);
   g_hash_table_iter_init (&hash_iter, source_objects);
   while (g_hash_table_iter_next (&hash_iter, &key, &value))
     {
       GVariant *serialized_key = key;
-      gboolean has_object;
-      const char *checksum;
-      OstreeObjectType objtype;
-
-      ostree_object_name_deserialize (serialized_key, &checksum, &objtype);
-
-      if (!ostree_repo_has_object (data.dest_repo, objtype, checksum, &has_object,
-                                   cancellable, error))
-        goto out;
-      if (!has_object)
-        g_hash_table_insert (objects_to_copy, g_variant_ref (serialized_key), serialized_key);
+      /* The worker takes ownership of this new reference. */
+      g_thread_pool_push (data->threadpool, g_variant_ref (serialized_key), NULL);
     }

-  g_print ("%u objects to copy\n", g_hash_table_size (objects_to_copy));
-
-  if (!ostree_repo_prepare_transaction (data.dest_repo, FALSE, cancellable, error))
-    goto out;
-  
-  g_hash_table_iter_init (&hash_iter, objects_to_copy);
-  while (g_hash_table_iter_next (&hash_iter, &key, &value))
+  if (data->n_objects_to_check > 0)
     {
-      GVariant *serialized_key = key;
-      const char *checksum;
-      OstreeObjectType objtype;
-
-      ostree_object_name_deserialize (serialized_key, &checksum, &objtype);
+      guint status_id = 0;
+
+      if (data->stdout_is_tty)
+        {
+          /* ESC 7 saves the cursor position; idle_print_status () restores
+           * it with ESC 8, so each update redraws the same line. */
+          g_print ("%c7", 0x1B);
+          status_id = g_timeout_add_seconds (1, idle_print_status, data);
+          idle_print_status (data);
+        }
+
+      /* Not g_main_loop_run (): it sets the running flag again on entry, so
+       * a g_main_loop_quit () from a worker that finished before we got here
+       * would be lost and we would block forever.  The loop was created
+       * running, and the worker's quit also wakes the default context. */
+      while (g_main_loop_is_running (data->loop))
+        g_main_context_iteration (NULL, TRUE);

-      if (!import_one_object (&data, checksum, objtype, cancellable, error))
-        goto out;
+      /* The timeout source points at data on this stack frame. */
+      if (status_id != 0)
+        g_source_remove (status_id);
+
+      if (data->stdout_is_tty)
+        {
+          idle_print_status (data);
+          g_print ("\n");
+        }
+      else
+        g_print ("pull: %d/%d scanned, %d objects copied\n",
+                 g_atomic_int_get (&data->n_objects_checked),
+                 data->n_objects_to_check,
+                 g_atomic_int_get (&data->n_objects_copied));
     }

-  if (!ostree_repo_commit_transaction (data.dest_repo, NULL, error))
+  if (!ostree_repo_commit_transaction (data->dest_repo, NULL, error))
     goto out;

   g_print ("Writing %u refs\n", g_hash_table_size (refs_to_clone));
@@ -223,16 +276,22 @@ ostree_builtin_pull_local (int argc, char **argv, GFile *repo_path, GError **err
       const char *name = key;
       const char *checksum = value;

-      if (!ostree_repo_write_ref (data.dest_repo, NULL, name, checksum, error))
+      if (!ostree_repo_write_ref (data->dest_repo, NULL, name, checksum, error))
         goto out;
     }

   ret = TRUE;
  out:
-  if (data.src_repo)
-    g_object_unref (data.src_repo);
-  if (data.dest_repo)
-    g_object_unref (data.dest_repo);
+  /* g_thread_pool_free () takes (pool, immediate, wait_), so it must not be
+   * called through a GDestroyNotify cast.  Wait for queued work to finish,
+   * since the workers use data, which lives on this stack frame. */
+  if (data->threadpool)
+    g_thread_pool_free (data->threadpool, FALSE, TRUE);
+  g_clear_pointer (&data->loop, (GDestroyNotify) g_main_loop_unref);
+  if (data->src_repo)
+    g_object_unref (data->src_repo);
+  if (data->dest_repo)
+    g_object_unref (data->dest_repo);
   if (context)
     g_option_context_free (context);
   return ret;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]