[ostree/wip/delta2: 20/21] delta: Write content directly (and async), not via tmpfiles
- From: Colin Walters <walters src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [ostree/wip/delta2: 20/21] delta: Write content directly (and async), not via tmpfiles
- Date: Sun, 4 May 2014 20:56:25 +0000 (UTC)
commit e835aef223a79b96579efdeb2d899931e18d6bf0
Author: Colin Walters <walters verbum org>
Date: Mon Apr 28 18:43:52 2014 -0400
delta: Write content directly (and async), not via tmpfiles
This way we avoid the speed hit of writing a potentially large
tempfile, only to read it back and write it out again.
The content writes are now also performed in parallel.
.../ostree-repo-static-delta-processing.c | 195 ++++++++++++++------
1 files changed, 139 insertions(+), 56 deletions(-)
---
diff --git a/src/libostree/ostree-repo-static-delta-processing.c b/src/libostree/ostree-repo-static-delta-processing.c
index afbaa3b..7e7c1d9 100644
--- a/src/libostree/ostree-repo-static-delta-processing.c
+++ b/src/libostree/ostree-repo-static-delta-processing.c
@@ -22,6 +22,10 @@
#include <string.h>
+#include <glib-unix.h>
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+
#include "ostree-repo-private.h"
#include "ostree-repo-static-delta-private.h"
#include "ostree-lzma-decompressor.h"
@@ -32,6 +36,7 @@
G_STATIC_ASSERT (sizeof (guint) >= sizeof (guint32));
typedef struct {
+ OstreeRepo *repo;
guint checksum_index;
const guint8 *checksums;
guint n_checksums;
@@ -40,7 +45,10 @@ typedef struct {
guint oplen;
gboolean object_start;
- guint64 object_size;
+ guint outstanding_content_writes;
+ GMainContext *content_writing_context;
+ gboolean caught_error;
+ GError **async_error;
OstreeObjectType output_objtype;
const guint8 *output_target;
@@ -52,6 +60,11 @@ typedef struct {
guint64 payload_size;
} StaticDeltaExecutionState;
+typedef struct {
+ StaticDeltaExecutionState *state;
+ char checksum[65];
+} StaticDeltaContentWrite;
+
typedef gboolean (*DispatchOpFunc) (OstreeRepo *repo,
StaticDeltaExecutionState *state,
GCancellable *cancellable,
@@ -80,14 +93,38 @@ static OstreeStaticDeltaOperation op_dispatch_table[] = {
{ NULL }
};
+static void
+on_content_written (GObject *src,
+ GAsyncResult *result,
+ gpointer user_data);
+
static gboolean
-open_output_target_csum (OstreeRepo *repo,
- StaticDeltaExecutionState *state,
- GCancellable *cancellable,
- GError **error)
+read_varuint64 (StaticDeltaExecutionState *state,
+ guint64 *out_value,
+ GError **error)
+{
+ gsize bytes_read;
+ if (!_ostree_read_varuint64 (state->opdata, state->oplen, out_value, &bytes_read))
+ {
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "Unexpected EOF reading varint");
+ return FALSE;
+ }
+ state->opdata += bytes_read;
+ state->oplen -= bytes_read;
+ return TRUE;
+}
+
+static gboolean
+open_output_target (StaticDeltaExecutionState *state,
+ GCancellable *cancellable,
+ GError **error)
{
gboolean ret = FALSE;
guint8 *objcsum;
+ char checksum[65];
+ guint64 object_size;
+ gs_unref_object GInputStream *content_in_stream = NULL;
g_assert (state->checksums != NULL);
g_assert (state->output_target == NULL);
@@ -102,11 +139,51 @@ open_output_target_csum (OstreeRepo *repo,
state->output_objtype = (OstreeObjectType) *objcsum;
state->output_target = objcsum + 1;
- if (!gs_file_open_in_tmpdir (repo->tmp_dir, 0644,
- &state->output_tmp_path, &state->output_tmp_stream,
- cancellable, error))
+
+ ostree_checksum_inplace_from_bytes (state->output_target, checksum);
+
+ /* Object size is the first element of the opstream */
+ if (!read_varuint64 (state, &object_size, error))
goto out;
+ if (OSTREE_OBJECT_TYPE_IS_META (state->output_objtype))
+ {
+ if (!gs_file_open_in_tmpdir (state->repo->tmp_dir, 0644,
+ &state->output_tmp_path, &state->output_tmp_stream,
+ cancellable, error))
+ goto out;
+ }
+ else
+ {
+ int pipefds[2];
+
+ if (!g_unix_open_pipe (pipefds, FD_CLOEXEC, error))
+ goto out;
+
+ content_in_stream = g_unix_input_stream_new (pipefds[0], TRUE);
+ state->output_tmp_stream = g_unix_output_stream_new (pipefds[1], TRUE);
+
+ if (!state->content_writing_context)
+ state->content_writing_context = g_main_context_new();
+
+ g_main_context_push_thread_default (state->content_writing_context);
+
+ {
+ StaticDeltaContentWrite *writedata = g_new0 (StaticDeltaContentWrite, 1);
+ writedata->state = state;
+ memcpy (writedata->checksum, checksum, sizeof (writedata->checksum));
+ ostree_repo_write_content_async (state->repo, checksum,
+ content_in_stream,
+ object_size,
+ cancellable,
+ on_content_written,
+ writedata);
+ }
+ state->outstanding_content_writes++;
+
+ g_main_context_pop_thread_default (state->content_writing_context);
+ }
+
ret = TRUE;
out:
return ret;
@@ -147,23 +224,6 @@ _ostree_static_delta_part_validate (OstreeRepo *repo,
return ret;
}
-static gboolean
-read_varuint64 (StaticDeltaExecutionState *state,
- guint64 *out_value,
- GError **error)
-{
- gsize bytes_read;
- if (!_ostree_read_varuint64 (state->opdata, state->oplen, out_value, &bytes_read))
- {
- g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_FAILED,
- "Unexpected EOF reading varint");
- return FALSE;
- }
- state->opdata += bytes_read;
- state->oplen -= bytes_read;
- return TRUE;
-}
-
gboolean
_ostree_static_delta_part_execute_raw (OstreeRepo *repo,
GVariant *objects,
@@ -180,6 +240,9 @@ _ostree_static_delta_part_execute_raw (OstreeRepo *repo,
StaticDeltaExecutionState *state = &statedata;
guint n_executed = 0;
+ state->repo = repo;
+ state->async_error = error;
+
if (!_ostree_static_delta_parse_checksum_array (objects,
&checksums_data,
&state->n_checksums,
@@ -188,8 +251,6 @@ _ostree_static_delta_part_execute_raw (OstreeRepo *repo,
state->checksums = checksums_data;
g_assert (state->n_checksums > 0);
- if (!open_output_target_csum (repo, state, cancellable, error))
- goto out;
g_variant_get (part, "(@ay ay)", &payload, &ops);
@@ -206,7 +267,7 @@ _ostree_static_delta_part_execute_raw (OstreeRepo *repo,
if (state->object_start)
{
- if (!read_varuint64 (state, &state->object_size, error))
+ if (!open_output_target (state, cancellable, error))
goto out;
state->object_start = FALSE;
}
@@ -229,8 +290,17 @@ _ostree_static_delta_part_execute_raw (OstreeRepo *repo,
n_executed++;
}
+ while (state->outstanding_content_writes > 0)
+ g_main_context_iteration (state->content_writing_context, TRUE);
+
+ if (state->caught_error)
+ goto out;
+
ret = TRUE;
out:
+ g_clear_pointer (&state->content_writing_context, g_main_context_unref);
+ g_clear_object (&state->output_tmp_path);
+ g_clear_object (&state->output_tmp_stream);
return ret;
}
@@ -323,7 +393,7 @@ _ostree_static_delta_part_execute (OstreeRepo *repo,
if (!_ostree_static_delta_part_execute_raw (repo, header, payload,
cancellable, error))
goto out;
-
+
ret = TRUE;
out:
return ret;
@@ -421,7 +491,41 @@ validate_ofs (StaticDeltaExecutionState *state,
}
return TRUE;
}
-
+
+static void
+on_content_written (GObject *src,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ StaticDeltaContentWrite *writedata = user_data;
+ StaticDeltaExecutionState *state = writedata->state;
+ GError *local_error = NULL;
+ GError **error = &local_error;
+
+ if (!ostree_repo_write_content_finish ((OstreeRepo*)src, result, NULL, error))
+ goto out;
+
+ g_print ("Wrote content object '%s'\n", writedata->checksum);
+
+ out:
+ state->outstanding_content_writes--;
+ if (state->outstanding_content_writes == 0)
+ g_main_context_wakeup (state->content_writing_context);
+ if (local_error)
+ {
+ if (!state->caught_error)
+ {
+ state->caught_error = TRUE;
+ g_main_context_wakeup (state->content_writing_context);
+ g_propagate_error (state->async_error, local_error);
+ }
+ else
+ {
+ g_error_free (local_error);
+ }
+ }
+}
+
static gboolean
dispatch_write (OstreeRepo *repo,
StaticDeltaExecutionState *state,
@@ -521,7 +625,6 @@ dispatch_close (OstreeRepo *repo,
}
g_assert (state->output_tmp_stream);
- g_assert (state->output_tmp_path);
if (!g_output_stream_close (state->output_tmp_stream, cancellable, error))
goto out;
@@ -533,6 +636,8 @@ dispatch_close (OstreeRepo *repo,
if (OSTREE_OBJECT_TYPE_IS_META (state->output_objtype))
{
gs_unref_variant GVariant *metadata = NULL;
+
+ g_assert (state->output_tmp_path);
if (!ot_util_variant_map (state->output_tmp_path,
ostree_metadata_variant_type (state->output_objtype),
@@ -548,38 +653,16 @@ dispatch_close (OstreeRepo *repo,
}
else
{
- gs_unref_object GInputStream *in = NULL;
- gs_unref_object GFileInfo *info = NULL;
-
- in = (GInputStream*)g_file_read (state->output_tmp_path, cancellable, error);
- if (!in)
- goto out;
-
- info = g_file_input_stream_query_info ((GFileInputStream*)in, G_FILE_ATTRIBUTE_STANDARD_SIZE,
- cancellable, error);
- if (!info)
- goto out;
-
- if (!ostree_repo_write_content (repo, tmp_checksum, in,
- g_file_info_get_size (info), NULL,
- cancellable, error))
- goto out;
-
- g_print ("Wrote content object '%s'\n",
- tmp_checksum);
+ /* We already have an async write going, the close() above will
+ * ensure it completes.
+ */
}
state->output_target = NULL;
g_clear_object (&state->output_tmp_path);
state->object_start = TRUE;
-
state->checksum_index++;
- if (state->checksum_index < state->n_checksums)
- {
- if (!open_output_target_csum (repo, state, cancellable, error))
- goto out;
- }
ret = TRUE;
out:
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]