[ostree] deltas: Drop async content writes



commit 6d1de23f878611e9de6a9a20e2f02d818a60edb0
Author: Colin Walters <walters verbum org>
Date:   Tue Jan 20 23:21:26 2015 -0500

    deltas: Drop async content writes
    
    This caused deadlocks and/or EMFILE ("too many open files") errors
    due to the interaction between threads and fds.  What we really want
    here is a better pull-based model for parsing content objects.
    
    Another idea would be to change static deltas so that content objects
    have a special opcode that includes their metadata first, and then do
    rollsums etc. only over actual content.

 .../ostree-repo-static-delta-processing.c          |  116 +++++---------------
 1 files changed, 28 insertions(+), 88 deletions(-)
---
diff --git a/src/libostree/ostree-repo-static-delta-processing.c b/src/libostree/ostree-repo-static-delta-processing.c
index 703a52e..a5ad5fe 100644
--- a/src/libostree/ostree-repo-static-delta-processing.c
+++ b/src/libostree/ostree-repo-static-delta-processing.c
@@ -25,6 +25,7 @@
 #include <glib-unix.h>
 #include <gio/gunixinputstream.h>
 #include <gio/gunixoutputstream.h>
+#include <gio/gfiledescriptorbased.h>
 
 #include "ostree-repo-private.h"
 #include "ostree-repo-static-delta-private.h"
@@ -45,8 +46,6 @@ typedef struct {
   guint           oplen;
   
   gboolean        object_start;
-  guint           outstanding_content_writes;
-  GMainContext   *content_writing_context;
   gboolean        caught_error;
   GError        **async_error;
 
@@ -93,11 +92,6 @@ static OstreeStaticDeltaOperation op_dispatch_table[] = {
   { NULL }
 };
 
-static void
-on_content_written (GObject          *src,
-                    GAsyncResult     *result,
-                    gpointer          user_data);
-
 static gboolean
 read_varuint64 (StaticDeltaExecutionState  *state,
                 guint64                    *out_value,
@@ -146,43 +140,10 @@ open_output_target (StaticDeltaExecutionState   *state,
   if (!read_varuint64 (state, &object_size, error))
     goto out;
 
-  if (OSTREE_OBJECT_TYPE_IS_META (state->output_objtype))
-    {
-      if (!gs_file_open_in_tmpdir_at (state->repo->tmp_dir_fd, 0644,
-                                      &state->output_tmp_path, &state->output_tmp_stream,
-                                      cancellable, error))
-        goto out;
-    }
-  else
-    {
-      int pipefds[2];
-
-      if (!g_unix_open_pipe (pipefds, FD_CLOEXEC, error))
-        goto out;
-
-      content_in_stream = g_unix_input_stream_new (pipefds[0], TRUE);
-      state->output_tmp_stream = g_unix_output_stream_new (pipefds[1], TRUE);
-      
-      if (!state->content_writing_context)
-        state->content_writing_context = g_main_context_new();
-      
-      g_main_context_push_thread_default (state->content_writing_context);
-      
-      {
-        StaticDeltaContentWrite *writedata = g_new0 (StaticDeltaContentWrite, 1);
-        writedata->state = state;
-        memcpy (writedata->checksum, checksum, sizeof (writedata->checksum));
-        ostree_repo_write_content_async (state->repo, checksum,
-                                         content_in_stream,
-                                         object_size,
-                                         cancellable,
-                                         on_content_written,
-                                         writedata);
-      }
-      state->outstanding_content_writes++;
-      
-      g_main_context_pop_thread_default (state->content_writing_context);
-    }
+  if (!gs_file_open_in_tmpdir_at (state->repo->tmp_dir_fd, 0644,
+                                  &state->output_tmp_path, &state->output_tmp_stream,
+                                  cancellable, error))
+    goto out;
 
   ret = TRUE;
  out:
@@ -265,11 +226,6 @@ _ostree_static_delta_part_execute_raw (OstreeRepo      *repo,
       guint8 opcode;
       OstreeStaticDeltaOperation *op;
 
-      /* Limit the number of outstanding writes to 1 to prevent too many open files
-         at the same time.  */
-      while (state->outstanding_content_writes > 1)
-        g_main_context_iteration (state->content_writing_context, TRUE);
-
       if (state->object_start)
         {
           if (!open_output_target (state, cancellable, error))
@@ -294,15 +250,11 @@ _ostree_static_delta_part_execute_raw (OstreeRepo      *repo,
       n_executed++;
     }
 
-  while (state->outstanding_content_writes > 0)
-    g_main_context_iteration (state->content_writing_context, TRUE);
-
   if (state->caught_error)
     goto out;
 
   ret = TRUE;
  out:
-  g_clear_pointer (&state->content_writing_context, g_main_context_unref);
   g_clear_pointer (&state->output_tmp_path, g_free);
   g_clear_object (&state->output_tmp_stream);
   return ret;
@@ -496,38 +448,6 @@ validate_ofs (StaticDeltaExecutionState  *state,
   return TRUE;
 }
 
-static void
-on_content_written (GObject          *src,
-                    GAsyncResult     *result,
-                    gpointer          user_data)
-{
-  StaticDeltaContentWrite *writedata = user_data;
-  StaticDeltaExecutionState *state = writedata->state;
-  GError *local_error = NULL;
-  GError **error = &local_error;
-
-  if (!ostree_repo_write_content_finish ((OstreeRepo*)src, result, NULL, error))
-    goto out;
-
- out:
-  state->outstanding_content_writes--;
-  if (state->outstanding_content_writes == 0)
-    g_main_context_wakeup (state->content_writing_context);
-  if (local_error)
-    {
-      if (!state->caught_error)
-        {
-          state->caught_error = TRUE;
-          g_main_context_wakeup (state->content_writing_context);
-          g_propagate_error (state->async_error, local_error);
-        }
-      else
-        {
-          g_error_free (local_error);
-        }
-    }
-}
-
 static gboolean
 dispatch_write (OstreeRepo                 *repo,
                 StaticDeltaExecutionState  *state,
@@ -663,9 +583,29 @@ dispatch_close (OstreeRepo                 *repo,
     }
   else
     {
-      /* We already have an async write going, the close() above will
-       * ensure it completes.
-       */
+      gs_unref_object GInputStream *instream = NULL;
+      int fd;
+      struct stat stbuf;
+
+      if (!ot_openat_read_stream (state->repo->tmp_dir_fd,
+                                  state->output_tmp_path, FALSE,
+                                  &instream, cancellable, error))
+        goto out;
+
+      fd = g_file_descriptor_based_get_fd (G_FILE_DESCRIPTOR_BASED (instream));
+      if (fstat (fd, &stbuf) == -1)
+        {
+          gs_set_error_from_errno (error, errno);
+          goto out;
+        }
+
+      /* Now get rid of the temporary */
+      (void) unlinkat (state->repo->tmp_dir_fd, state->output_tmp_path, 0);
+
+      if (!ostree_repo_write_content (repo, tmp_checksum,
+                                      instream, stbuf.st_size,
+                                      NULL, cancellable, error))
+        goto out;
     }
 
   state->output_target = NULL;


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]