[ostree] pull: Stage content asynchronously



commit 9618232f4da325692dcf98fd6ff5b8abd9fce66c
Author: Colin Walters <walters verbum org>
Date:   Thu Oct 4 20:00:00 2012 -0400

    pull: Stage content asynchronously
    
    For similar reasons as metadata, this avoids having the main thread
    blocked in fdatasync(), and even better - we can achieve much higher
    parallelism if we have multiple threads blocked on fdatasync().

 src/libostree/ostree-repo.c |   95 +++++++++++++++++++++++++++++++++
 src/libostree/ostree-repo.h |   15 +++++-
 src/ostree/ostree-fetcher.c |    9 +++
 src/ostree/ostree-fetcher.h |    2 +
 src/ostree/ostree-pull.c    |  121 ++++++++++++++++---------------------------
 5 files changed, 164 insertions(+), 78 deletions(-)
---
diff --git a/src/libostree/ostree-repo.c b/src/libostree/ostree-repo.c
index 8840887..ac3a3dc 100644
--- a/src/libostree/ostree-repo.c
+++ b/src/libostree/ostree-repo.c
@@ -1558,6 +1558,101 @@ ostree_repo_stage_content (OstreeRepo       *self,
                        cancellable, error);
 }
 
+typedef struct {
+  OstreeRepo *repo;
+  char *expected_checksum;
+  GInputStream *object;
+  guint64 file_object_length;
+  GCancellable *cancellable;
+  GSimpleAsyncResult *result;
+  
+  guchar *result_csum;
+} StageContentAsyncData;
+
+static void
+stage_content_async_data_free (gpointer user_data)
+{
+  StageContentAsyncData *data = user_data;
+
+  g_clear_object (&data->repo);
+  g_clear_object (&data->cancellable);
+  g_clear_object (&data->object);
+  g_free (data->result_csum);
+  g_free (data->expected_checksum);
+  g_free (data);
+}
+
+static void
+stage_content_thread (GSimpleAsyncResult  *res,
+                      GObject             *object,
+                      GCancellable        *cancellable)
+{
+  GError *error = NULL;
+  StageContentAsyncData *data;
+
+  data = g_simple_async_result_get_op_res_gpointer (res);
+  if (!ostree_repo_stage_content (data->repo, data->expected_checksum,
+                                  data->object, data->file_object_length,
+                                  &data->result_csum,
+                                  cancellable, &error))
+    g_simple_async_result_take_error (res, error);
+}
+
+/**
+ * ostree_repo_stage_content_async:
+ * 
+ * Asynchronously store the content object @object.  If provided,
+ * the checksum @expected_checksum will be verified.
+ */
+void          
+ostree_repo_stage_content_async (OstreeRepo               *self,
+                                 const char               *expected_checksum,
+                                 GInputStream             *object,
+                                 guint64                   file_object_length,
+                                 GCancellable             *cancellable,
+                                 GAsyncReadyCallback       callback,
+                                 gpointer                  user_data)
+{
+  StageContentAsyncData *asyncdata;
+
+  asyncdata = g_new0 (StageContentAsyncData, 1);
+  asyncdata->repo = g_object_ref (self);
+  asyncdata->expected_checksum = g_strdup (expected_checksum);
+  asyncdata->object = g_object_ref (object);
+  asyncdata->file_object_length = file_object_length;
+  asyncdata->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+  asyncdata->result = g_simple_async_result_new ((GObject*) self,
+                                                 callback, user_data,
+                                                 ostree_repo_stage_content_async);
+
+  g_simple_async_result_set_op_res_gpointer (asyncdata->result, asyncdata,
+                                             stage_content_async_data_free);
+  g_simple_async_result_run_in_thread (asyncdata->result, stage_content_thread, G_PRIORITY_DEFAULT, cancellable);
+  g_object_unref (asyncdata->result);
+}
+
+gboolean
+ostree_repo_stage_content_finish (OstreeRepo        *self,
+                                  GAsyncResult      *result,
+                                  guchar           **out_csum,
+                                  GError           **error)
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+  StageContentAsyncData *data;
+
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == ostree_repo_stage_content_async);
+
+  if (g_simple_async_result_propagate_error (simple, error))
+    return FALSE;
+
+  data = g_simple_async_result_get_op_res_gpointer (simple);
+  /* Transfer ownership */
+  *out_csum = data->result_csum;
+  data->result_csum = NULL;
+  return TRUE;
+}
+
 static GVariant *
 create_empty_gvariant_dict (void)
 {
diff --git a/src/libostree/ostree-repo.h b/src/libostree/ostree-repo.h
index 62f51d3..2f7ac7a 100644
--- a/src/libostree/ostree-repo.h
+++ b/src/libostree/ostree-repo.h
@@ -113,7 +113,7 @@ void          ostree_repo_stage_metadata_async (OstreeRepo              *self,
 
 gboolean      ostree_repo_stage_metadata_finish (OstreeRepo        *self,
                                                  GAsyncResult      *result,
-                                                 guchar           **out_checksum,
+                                                 guchar           **out_csum,
                                                  GError           **error);
 
 gboolean      ostree_repo_stage_content (OstreeRepo       *self,
@@ -138,6 +138,19 @@ gboolean      ostree_repo_stage_content_trusted (OstreeRepo       *self,
                                                  GCancellable     *cancellable,
                                                  GError          **error);
 
+void          ostree_repo_stage_content_async (OstreeRepo              *self,
+                                               const char              *expected_checksum,
+                                               GInputStream            *object,
+                                               guint64                  file_object_length,
+                                               GCancellable            *cancellable,
+                                               GAsyncReadyCallback      callback,
+                                               gpointer                 user_data);
+
+gboolean      ostree_repo_stage_content_finish (OstreeRepo        *self,
+                                                GAsyncResult      *result,
+                                                guchar           **out_csum,
+                                                GError           **error);
+
 gboolean      ostree_repo_resolve_rev (OstreeRepo  *self,
                                        const char  *rev,
                                        gboolean     allow_noent,
diff --git a/src/ostree/ostree-fetcher.c b/src/ostree/ostree-fetcher.c
index bf6421a..6d4b5db 100644
--- a/src/ostree/ostree-fetcher.c
+++ b/src/ostree/ostree-fetcher.c
@@ -82,6 +82,7 @@ struct OstreeFetcher
   GHashTable *message_to_request; /* SoupMessage -> SoupRequest */
   
   guint64 total_downloaded;
+  guint total_requests;
 };
 
 G_DEFINE_TYPE (OstreeFetcher, ostree_fetcher, G_TYPE_OBJECT)
@@ -243,6 +244,8 @@ ostree_fetcher_request_uri_async (OstreeFetcher         *self,
   OstreeFetcherPendingURI *pending;
   GError *local_error = NULL;
 
+  self->total_requests++;
+
   pending = g_new0 (OstreeFetcherPendingURI, 1);
   pending->refcount = 1;
   pending->self = g_object_ref (self);
@@ -352,3 +355,9 @@ ostree_fetcher_bytes_transferred (OstreeFetcher       *self)
 {
   return self->total_downloaded;
 }
+
+guint
+ostree_fetcher_get_n_requests (OstreeFetcher       *self)
+{
+  return self->total_requests;
+}
diff --git a/src/ostree/ostree-fetcher.h b/src/ostree/ostree-fetcher.h
index 803db72..4f6897e 100644
--- a/src/ostree/ostree-fetcher.h
+++ b/src/ostree/ostree-fetcher.h
@@ -51,6 +51,8 @@ char * ostree_fetcher_query_state_text (OstreeFetcher              *self);
 
 guint64 ostree_fetcher_bytes_transferred (OstreeFetcher       *self);
 
+guint ostree_fetcher_get_n_requests (OstreeFetcher       *self);
+
 void ostree_fetcher_request_uri_async (OstreeFetcher         *self,
                                        SoupURI               *uri,
                                        GCancellable          *cancellable,
diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c
index b189c0b..ccbb003 100644
--- a/src/ostree/ostree-pull.c
+++ b/src/ostree/ostree-pull.c
@@ -109,7 +109,9 @@ typedef struct {
   guint         n_fetched_content;
   guint         outstanding_filemeta_requests;
   guint         outstanding_filecontent_requests;
-  guint         outstanding_checksum_requests;
+  guint         outstanding_content_stage_requests;
+
+  guint64       previous_total_downloaded;
 
   GError      **async_error;
   gboolean      caught_error;
@@ -185,18 +187,31 @@ uri_fetch_update_status (gpointer user_data)
   OtPullData *pull_data = user_data;
   ot_lfree char *fetcher_status;
   GString *status;
+  guint64 current_bytes_transferred;
+  guint64 delta_bytes_transferred;
  
   status = g_string_new ("");
 
-  g_string_append_printf (status, "%u/%u metadata %u/%u content fetched; ",
+  if (pull_data->metadata_scan_active)
+    g_string_append_printf (status, "scan: %u metadata; ",
+                            g_atomic_int_get (&pull_data->n_scanned_metadata));
+
+  g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ",
                           g_atomic_int_get (&pull_data->n_fetched_metadata),
                           g_atomic_int_get (&pull_data->n_requested_metadata),
                           pull_data->n_fetched_content,
                           g_atomic_int_get (&pull_data->n_requested_content));
 
-  if (pull_data->outstanding_checksum_requests > 0)
-    g_string_append_printf (status, "Calculating %u checksums; ",
-                            pull_data->outstanding_checksum_requests);
+  current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
+  delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded;
+  pull_data->previous_total_downloaded = current_bytes_transferred;
+
+  if (delta_bytes_transferred < 1024)
+    g_string_append_printf (status, "%u B/s; ", 
+                            (guint)delta_bytes_transferred);
+  else
+    g_string_append_printf (status, "%.1f KiB/s; ", 
+                            (double)delta_bytes_transferred / 1024);
 
   fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
   g_string_append (status, fetcher_status);
@@ -245,7 +260,7 @@ check_outstanding_requests_handle_error (OtPullData          *pull_data,
       pull_data->outstanding_uri_requests == 0 &&
       pull_data->outstanding_filemeta_requests == 0 &&
       pull_data->outstanding_filecontent_requests == 0 &&
-      pull_data->outstanding_checksum_requests == 0)
+      pull_data->outstanding_content_stage_requests == 0)
     g_main_loop_quit (pull_data->loop);
   throw_async_error (pull_data, error);
 }
@@ -507,79 +522,27 @@ destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data)
 }
 
 static void
-content_fetch_on_checksum_complete (GObject        *object,
-                                    GAsyncResult   *result,
-                                    gpointer        user_data)
+content_fetch_on_stage_complete (GObject        *object,
+                                 GAsyncResult   *result,
+                                 gpointer        user_data)
 {
   OtFetchOneContentItemData *data = user_data;
   GError *local_error = NULL;
   GError **error = &local_error;
-  guint64 length;
-  GCancellable *cancellable = NULL;
-  gboolean compressed;
-  ot_lfree guchar *csum;
-  ot_lvariant GVariant *file_meta = NULL;
-  ot_lobj GFileInfo *file_info = NULL;
-  ot_lvariant GVariant *xattrs = NULL;
-  ot_lobj GInputStream *content_input = NULL;
-  ot_lobj GInputStream *file_object_input = NULL;
-  ot_lfree char *checksum;
+  ot_lfree guchar *csum = NULL;
+  ot_lfree char *checksum = NULL;
 
-  csum = ot_gio_checksum_stream_finish ((GInputStream*)object, result, error);
-  if (!csum)
+  if (!ostree_repo_stage_content_finish ((OstreeRepo*)object, result, 
+                                         &csum, error))
     goto out;
 
   checksum = ostree_checksum_from_bytes (csum);
 
-  if (strcmp (checksum, data->checksum) != 0)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
-                   "Corrupted object %s (actual checksum is %s)",
-                   data->checksum, checksum);
-      goto out;
-    }
-
-  compressed = data->pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z;
-
-  if (compressed)
-    {
-      content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
-      if (!content_input)
-        goto out;
-      if (!ostree_zlib_content_stream_open (content_input, &length, &file_object_input, 
-                                            cancellable, error))
-        goto out;
-    }
-  else
-    {
-      if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, TRUE,
-                                &file_meta, error))
-        goto out;
-  
-      if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
-        goto out;
-
-      if (data->content_path)
-        {
-          content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
-          if (!content_input)
-            goto out;
-        }
-      
-      if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
-                                              &file_object_input, &length,
-                                              cancellable, error))
-        goto out;
-    }
-
-  if (!ostree_repo_stage_content_trusted (data->pull_data->repo, checksum,
-                                          file_object_input, length,
-                                          cancellable, error))
-    goto out;
+  g_assert (strcmp (checksum, data->checksum) == 0);
 
   data->pull_data->n_fetched_content++;
  out:
-  data->pull_data->outstanding_checksum_requests--;
+  data->pull_data->outstanding_content_stage_requests--;
   check_outstanding_requests_handle_error (data->pull_data, local_error);
   destroy_fetch_one_content_item_data (data);
 }
@@ -623,6 +586,7 @@ content_fetch_on_complete (GObject        *object,
   GCancellable *cancellable = NULL;
   gboolean was_content_fetch = FALSE;
   gboolean need_content_fetch = FALSE;
+  guint64 length;
   ot_lvariant GVariant *file_meta = NULL;
   ot_lobj GFileInfo *file_info = NULL;
   ot_lobj GInputStream *content_input = NULL;
@@ -674,20 +638,21 @@ content_fetch_on_complete (GObject        *object,
   if (!need_content_fetch && compressed)
     {
       ot_lobj GInputStream *uncomp_input = NULL;
-      guint64 uncompressed_len;
 
       g_assert (data->content_path != NULL);
       content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
       if (!content_input)
         goto out;
 
-      if (!ostree_zlib_content_stream_open (content_input, &uncompressed_len, &uncomp_input, 
+      if (!ostree_zlib_content_stream_open (content_input, &length, &uncomp_input, 
                                             cancellable, error))
         goto out;
-      
-      data->pull_data->outstanding_checksum_requests++;
-      ot_gio_checksum_stream_async (uncomp_input, G_PRIORITY_DEFAULT, NULL,
-                                    content_fetch_on_checksum_complete, data);
+
+      data->pull_data->outstanding_content_stage_requests++;
+      ostree_repo_stage_content_async (data->pull_data->repo, data->checksum,
+                                       uncomp_input, length,
+                                       cancellable,
+                                       content_fetch_on_stage_complete, data);
     }
   else if (!need_content_fetch)
     {
@@ -709,13 +674,15 @@ content_fetch_on_complete (GObject        *object,
         }
 
       if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
-                                              &file_object_input, NULL,
+                                              &file_object_input, &length,
                                               cancellable, error))
         goto out;
 
-      data->pull_data->outstanding_checksum_requests++;
-      ot_gio_checksum_stream_async (file_object_input, G_PRIORITY_DEFAULT, NULL,
-                                    content_fetch_on_checksum_complete, data);
+      data->pull_data->outstanding_content_stage_requests++;
+      ostree_repo_stage_content_async (data->pull_data->repo, data->checksum,
+                                       file_object_input, length,
+                                       cancellable,
+                                       content_fetch_on_stage_complete, data);
     }
 
   while (data->pull_data->outstanding_filemeta_requests < 10)



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]