[ostree/wip/delta2: 20/21] delta: Write content directly (and async), not via tmpfiles



commit e835aef223a79b96579efdeb2d899931e18d6bf0
Author: Colin Walters <walters verbum org>
Date:   Mon Apr 28 18:43:52 2014 -0400

    delta: Write content directly (and async), not via tmpfiles
    
    This way we avoid the speed hit of writing a potentially large
    tempfile, only to read it and write it again.
    
    Here we're also doing the content writes in parallel.

 .../ostree-repo-static-delta-processing.c          |  195 ++++++++++++++------
 1 files changed, 139 insertions(+), 56 deletions(-)
---
diff --git a/src/libostree/ostree-repo-static-delta-processing.c b/src/libostree/ostree-repo-static-delta-processing.c
index afbaa3b..7e7c1d9 100644
--- a/src/libostree/ostree-repo-static-delta-processing.c
+++ b/src/libostree/ostree-repo-static-delta-processing.c
@@ -22,6 +22,10 @@
 
 #include <string.h>
 
+#include <glib-unix.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+
 #include "ostree-repo-private.h"
 #include "ostree-repo-static-delta-private.h"
 #include "ostree-lzma-decompressor.h"
@@ -32,6 +36,7 @@
 G_STATIC_ASSERT (sizeof (guint) >= sizeof (guint32));
 
 typedef struct {
+  OstreeRepo     *repo;
   guint           checksum_index;
   const guint8   *checksums;
   guint           n_checksums;
@@ -40,7 +45,10 @@ typedef struct {
   guint           oplen;
   
   gboolean        object_start;
-  guint64         object_size;
+  guint           outstanding_content_writes;
+  GMainContext   *content_writing_context;
+  gboolean        caught_error;
+  GError        **async_error;
 
   OstreeObjectType output_objtype;
   const guint8   *output_target;
@@ -52,6 +60,11 @@ typedef struct {
   guint64         payload_size; 
 } StaticDeltaExecutionState;
 
+typedef struct {  /* per-object user_data handed to on_content_written() */
+  StaticDeltaExecutionState *state;  /* state whose outstanding_content_writes the callback decrements */
+  char checksum[65];  /* NUL-terminated checksum string of the object being written */
+} StaticDeltaContentWrite;
+
 typedef gboolean (*DispatchOpFunc) (OstreeRepo                 *repo,
                                     StaticDeltaExecutionState  *state,
                                     GCancellable               *cancellable,
@@ -80,14 +93,38 @@ static OstreeStaticDeltaOperation op_dispatch_table[] = {
   { NULL }
 };
 
+static void
+on_content_written (GObject          *src,
+                    GAsyncResult     *result,
+                    gpointer          user_data);  /* forward decl: used as the async callback in open_output_target() */
+
 static gboolean
-open_output_target_csum (OstreeRepo                  *repo,
-                         StaticDeltaExecutionState   *state,
-                         GCancellable                *cancellable,
-                         GError                     **error)
+read_varuint64 (StaticDeltaExecutionState  *state,  /* in/out: opdata/oplen advanced past the varint on success */
+                guint64                    *out_value,  /* out: decoded value */
+                GError                    **error)
+{
+  gsize bytes_read;
+  if (!_ostree_read_varuint64 (state->opdata, state->oplen, out_value, &bytes_read))
+    {
+      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                           "Unexpected EOF reading varint");
+      return FALSE;  /* cursor left untouched on failure */
+    }
+  state->opdata += bytes_read;
+  state->oplen -= bytes_read;
+  return TRUE;
+}
+
+static gboolean
+open_output_target (StaticDeltaExecutionState   *state,
+                    GCancellable                *cancellable,
+                    GError                     **error)
 {
   gboolean ret = FALSE;
   guint8 *objcsum;
+  char checksum[65];
+  guint64 object_size;
+  gs_unref_object GInputStream *content_in_stream = NULL;
 
   g_assert (state->checksums != NULL);
   g_assert (state->output_target == NULL);
@@ -102,11 +139,51 @@ open_output_target_csum (OstreeRepo                  *repo,
 
   state->output_objtype = (OstreeObjectType) *objcsum;
   state->output_target = objcsum + 1;
-  if (!gs_file_open_in_tmpdir (repo->tmp_dir, 0644,
-                               &state->output_tmp_path, &state->output_tmp_stream,
-                               cancellable, error))
+
+  ostree_checksum_inplace_from_bytes (state->output_target, checksum);
+
+  /* Object size is the first element of the opstream */
+  if (!read_varuint64 (state, &object_size, error))
     goto out;
 
+  if (OSTREE_OBJECT_TYPE_IS_META (state->output_objtype))
+    {
+      if (!gs_file_open_in_tmpdir (state->repo->tmp_dir, 0644,
+                                   &state->output_tmp_path, &state->output_tmp_stream,
+                                   cancellable, error))
+        goto out;
+    }
+  else
+    {
+      int pipefds[2];
+
+      if (!g_unix_open_pipe (pipefds, FD_CLOEXEC, error))
+        goto out;
+
+      content_in_stream = g_unix_input_stream_new (pipefds[0], TRUE);
+      state->output_tmp_stream = g_unix_output_stream_new (pipefds[1], TRUE);
+      
+      if (!state->content_writing_context)
+        state->content_writing_context = g_main_context_new();
+      
+      g_main_context_push_thread_default (state->content_writing_context);
+      
+      {
+        StaticDeltaContentWrite *writedata = g_new0 (StaticDeltaContentWrite, 1);
+        writedata->state = state;
+        memcpy (writedata->checksum, checksum, sizeof (writedata->checksum));
+        ostree_repo_write_content_async (state->repo, checksum,
+                                         content_in_stream,
+                                         object_size,
+                                         cancellable,
+                                         on_content_written,
+                                         writedata);
+      }
+      state->outstanding_content_writes++;
+      
+      g_main_context_pop_thread_default (state->content_writing_context);
+    }
+
   ret = TRUE;
  out:
   return ret;
@@ -147,23 +224,6 @@ _ostree_static_delta_part_validate (OstreeRepo     *repo,
   return ret;
 }
 
-static gboolean
-read_varuint64 (StaticDeltaExecutionState  *state,
-                guint64                    *out_value,
-                GError                    **error)
-{
-  gsize bytes_read;
-  if (!_ostree_read_varuint64 (state->opdata, state->oplen, out_value, &bytes_read))
-    {
-      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED,
-                           "Unexpected EOF reading varint");
-      return FALSE;
-    }
-  state->opdata += bytes_read;
-  state->oplen -= bytes_read;
-  return TRUE;
-}
-
 gboolean
 _ostree_static_delta_part_execute_raw (OstreeRepo      *repo,
                                        GVariant        *objects,
@@ -180,6 +240,9 @@ _ostree_static_delta_part_execute_raw (OstreeRepo      *repo,
   StaticDeltaExecutionState *state = &statedata;
   guint n_executed = 0;
 
+  state->repo = repo;
+  state->async_error = error;
+
   if (!_ostree_static_delta_parse_checksum_array (objects,
                                                   &checksums_data,
                                                   &state->n_checksums,
@@ -188,8 +251,6 @@ _ostree_static_delta_part_execute_raw (OstreeRepo      *repo,
 
   state->checksums = checksums_data;
   g_assert (state->n_checksums > 0);
-  if (!open_output_target_csum (repo, state, cancellable, error))
-    goto out;
 
   g_variant_get (part, "(@ay ay)", &payload, &ops);
 
@@ -206,7 +267,7 @@ _ostree_static_delta_part_execute_raw (OstreeRepo      *repo,
 
       if (state->object_start)
         {
-          if (!read_varuint64 (state, &state->object_size, error))
+          if (!open_output_target (state, cancellable, error))
             goto out;
           state->object_start = FALSE;
         }
@@ -229,8 +290,17 @@ _ostree_static_delta_part_execute_raw (OstreeRepo      *repo,
       n_executed++;
     }
 
+  while (state->outstanding_content_writes > 0)
+    g_main_context_iteration (state->content_writing_context, TRUE);
+
+  if (state->caught_error)
+    goto out;
+
   ret = TRUE;
  out:
+  g_clear_pointer (&state->content_writing_context, g_main_context_unref);
+  g_clear_object (&state->output_tmp_path);
+  g_clear_object (&state->output_tmp_stream);
   return ret;
 }
 
@@ -323,7 +393,7 @@ _ostree_static_delta_part_execute (OstreeRepo      *repo,
   if (!_ostree_static_delta_part_execute_raw (repo, header, payload,
                                               cancellable, error))
     goto out;
-  
+
   ret = TRUE;
  out:
   return ret;
@@ -421,7 +491,41 @@ validate_ofs (StaticDeltaExecutionState  *state,
     }
   return TRUE;
 }
-  
+
+/* Completion callback for ostree_repo_write_content_async(): records the
+ * first failure in state->async_error, then frees the per-write data. */
+static void
+on_content_written (GObject          *src,
+                    GAsyncResult     *result,
+                    gpointer          user_data)
+{
+  StaticDeltaContentWrite *writedata = user_data;
+  StaticDeltaExecutionState *state = writedata->state;
+  GError *local_error = NULL;
+
+  if (!ostree_repo_write_content_finish ((OstreeRepo*)src, result, NULL, &local_error))
+    goto out;
+
+  g_print ("Wrote content object '%s'\n", writedata->checksum);
+
+ out:
+  state->outstanding_content_writes--;
+  if (state->outstanding_content_writes == 0)
+    g_main_context_wakeup (state->content_writing_context);
+  if (local_error)
+    {
+      if (!state->caught_error)
+        {
+          state->caught_error = TRUE;
+          g_main_context_wakeup (state->content_writing_context);
+          g_propagate_error (state->async_error, local_error);
+        }
+      else
+        g_error_free (local_error);  /* only the first error is reported */
+    }
+  g_free (writedata);  /* allocated with g_new0() in open_output_target() */
+}
+
 static gboolean
 dispatch_write (OstreeRepo                 *repo,
                 StaticDeltaExecutionState  *state,
@@ -521,7 +625,6 @@ dispatch_close (OstreeRepo                 *repo,
     }
 
   g_assert (state->output_tmp_stream);
-  g_assert (state->output_tmp_path);
 
   if (!g_output_stream_close (state->output_tmp_stream, cancellable, error))
     goto out;
@@ -533,6 +636,8 @@ dispatch_close (OstreeRepo                 *repo,
   if (OSTREE_OBJECT_TYPE_IS_META (state->output_objtype))
     {
       gs_unref_variant GVariant *metadata = NULL;
+      
+      g_assert (state->output_tmp_path);
 
       if (!ot_util_variant_map (state->output_tmp_path,
                                 ostree_metadata_variant_type (state->output_objtype),
@@ -548,38 +653,16 @@ dispatch_close (OstreeRepo                 *repo,
     }
   else
     {
-      gs_unref_object GInputStream *in = NULL;
-      gs_unref_object GFileInfo *info = NULL;
-
-      in = (GInputStream*)g_file_read (state->output_tmp_path, cancellable, error);
-      if (!in)
-        goto out;
-
-      info = g_file_input_stream_query_info ((GFileInputStream*)in, G_FILE_ATTRIBUTE_STANDARD_SIZE,
-                                             cancellable, error);
-      if (!info)
-        goto out;
-      
-      if (!ostree_repo_write_content (repo, tmp_checksum, in,
-                                      g_file_info_get_size (info), NULL,
-                                      cancellable, error))
-        goto out;
-
-      g_print ("Wrote content object '%s'\n",
-               tmp_checksum);
+      /* We already have an async write going, the close() above will
+       * ensure it completes.
+       */
     }
 
   state->output_target = NULL;
   g_clear_object (&state->output_tmp_path);
 
   state->object_start = TRUE;
-
   state->checksum_index++;
-  if (state->checksum_index < state->n_checksums)
-    {
-      if (!open_output_target_csum (repo, state, cancellable, error))
-        goto out;
-    }
       
   ret = TRUE;
  out:


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]