[ostree/wip/delta-continuation: 5/7] wip
- From: Colin Walters <walters src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [ostree/wip/delta-continuation: 5/7] wip
- Date: Sun, 13 Apr 2014 06:15:59 +0000 (UTC)
commit 69a0d495c41db68059c72ececd2e0db9fe94a4ec
Author: Colin Walters <walters verbum org>
Date: Sat Apr 12 14:54:28 2014 -0400
wip
src/libostree/ostree-repo-pull.c | 75 +++++++-
src/libostree/ostree-repo-static-delta-core.c | 83 +--------
src/libostree/ostree-repo-static-delta-private.h | 28 +++-
.../ostree-repo-static-delta-processing.c | 205 +++++++++++++++++++-
4 files changed, 303 insertions(+), 88 deletions(-)
---
diff --git a/src/libostree/ostree-repo-pull.c b/src/libostree/ostree-repo-pull.c
index 8f5ce89..4ac18c0 100644
--- a/src/libostree/ostree-repo-pull.c
+++ b/src/libostree/ostree-repo-pull.c
@@ -84,6 +84,7 @@ typedef struct {
typedef struct {
OtPullData *pull_data;
+ GVariant *objects;
char *expected_checksum;
} FetchStaticDeltaData;
@@ -707,6 +708,36 @@ meta_fetch_on_complete (GObject *object,
}
static void
+fetch_static_delta_data_free (gpointer data) /* releases a FetchStaticDeltaData and what it owns */
+{
+  FetchStaticDeltaData *fetch_data = data;
+  g_free (fetch_data->expected_checksum);
+  g_variant_unref (fetch_data->objects); /* reference taken when the part fetch was queued */
+  g_free (fetch_data);
+}
+
+static void
+on_static_delta_written (GObject *object,
+                         GAsyncResult *result,
+                         gpointer user_data)
+{
+  FetchStaticDeltaData *fetch_data = user_data;
+  OtPullData *pull_data = fetch_data->pull_data;
+  GError *local_error = NULL;
+  GError **error = &local_error;
+
+  /* Collect the worker's outcome so an execution failure lands in local_error */
+  if (!_ostree_static_delta_part_execute_finish (pull_data->repo, result, error))
+    goto out;
+
+ out:
+  g_assert (pull_data->n_outstanding_deltapart_write_requests > 0);
+  pull_data->n_outstanding_deltapart_write_requests--; /* fetch counters were settled by the fetch callback */
+  throw_async_error (pull_data, local_error);
+  fetch_static_delta_data_free (fetch_data); /* this callback is the last user, on success too */
+}
+
+static void
static_deltapart_fetch_on_complete (GObject *object,
GAsyncResult *result,
gpointer user_data)
@@ -715,7 +746,8 @@ static_deltapart_fetch_on_complete (GObject *object,
OtPullData *pull_data = fetch_data->pull_data;
gs_unref_variant GVariant *metadata = NULL;
gs_unref_object GFile *temp_path = NULL;
- const char *checksum;
+ gs_unref_object GInputStream *in = NULL;
+ gs_free char *actual_checksum = NULL;
OstreeObjectType objtype;
GError *local_error = NULL;
GError **error = &local_error;
@@ -725,18 +757,50 @@ static_deltapart_fetch_on_complete (GObject *object,
temp_path = ostree_fetcher_request_uri_with_partial_finish ((OstreeFetcher*)object, result, error);
if (!temp_path)
goto out;
+
+ in = g_file_read (temp_path, cancellable, error);
+ if (!in)
+ goto out;
+ /* TODO - consider making async */
+ if (!ot_gio_checksum_stream (in, &csum, cancellable, error))
+ goto out;
+
+ actual_checksum = ostree_checksum_from_bytes (csum);
+
+ if (strcmp (checksum, fetch_data->expected_checksum) != 0)
+ {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "Corrupted static delta part; checksum expected='%s' actual='%s'",
+ fetch_data->expected_checksum, checksum);
+ goto out;
+ }
+
+ /* Might as well close the fd here */
+ (void) g_input_stream_close (in, NULL, NULL);
+
+ {
+ gs_unref_bytes GBytes *delta_data
+ = gs_file_map_readonly (temp_path, cancellable, error);
+ if (!delta_data)
+ goto out;
+
+ _ostree_static_delta_part_execute_async (pull_data->repo,
+ fetch_data->objects,
+ delta_data,
+ cancellable,
+ on_static_delta_written,
+ fetch_data);
+ }
out:
g_assert (pull_data->n_outstanding_deltapart_fetches > 0);
pull_data->n_outstanding_deltapart_fetches--;
pull_data->n_fetched_deltaparts++;
+ pull_data->n_outstanding_deltapart_write_requests++;
throw_async_error (pull_data, local_error);
if (local_error)
- {
- g_free (fetch_data->expected_checksum);
- g_free (fetch_data);
- }
+ fetch_static_delta_data_free (fetch_data);
}
static gboolean
@@ -1161,6 +1225,7 @@ process_one_static_delta (OtPullData *pull_data,
fetch_data = g_new0 (FetchStaticDeltaData, 1);
fetch_data->pull_data = pull_data;
+ fetch_data->objects = g_variant_ref (objects);
fetch_data->expected_checksum = ostree_checksum_from_bytes_v (csum_v);
target_uri = suburi_new (pull_data->base_uri, deltapart_path, NULL);
diff --git a/src/libostree/ostree-repo-static-delta-core.c b/src/libostree/ostree-repo-static-delta-core.c
index 92174f3..e1289a1 100644
--- a/src/libostree/ostree-repo-static-delta-core.c
+++ b/src/libostree/ostree-repo-static-delta-core.c
@@ -160,31 +160,6 @@ _ostree_repo_static_delta_part_have_all_objects (OstreeRepo *repo,
return ret;
}
-static gboolean
-zlib_uncompress_data (GBytes *data,
- GBytes **out_uncompressed,
- GCancellable *cancellable,
- GError **error)
-{
- gboolean ret = FALSE;
- gs_unref_object GMemoryInputStream *memin = (GMemoryInputStream*)g_memory_input_stream_new_from_bytes
(data);
- gs_unref_object GMemoryOutputStream *memout = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0,
g_realloc, g_free);
- gs_unref_object GConverter *zlib_decomp =
- (GConverter*) g_zlib_decompressor_new (G_ZLIB_COMPRESSOR_FORMAT_RAW);
- gs_unref_object GInputStream *convin = g_converter_input_stream_new ((GInputStream*)memin, zlib_decomp);
-
- if (0 > g_output_stream_splice ((GOutputStream*)memout, convin,
- G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
- G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
- cancellable, error))
- goto out;
-
- ret = TRUE;
- *out_uncompressed = g_memory_output_stream_steal_as_bytes (memout);
- out:
- return ret;
-}
-
/**
* ostree_repo_static_delta_execute_offline:
* @self: Repo
@@ -227,7 +202,6 @@ ostree_repo_static_delta_execute_offline (OstreeRepo *self,
gs_unref_variant GVariant *csum_v = NULL;
gs_unref_variant GVariant *objects = NULL;
gs_unref_object GFile *part_path = NULL;
- gs_unref_variant GVariant *part = NULL;
gs_unref_object GInputStream *raw_in = NULL;
gs_unref_object GInputStream *in = NULL;
@@ -256,70 +230,25 @@ ostree_repo_static_delta_execute_offline (OstreeRepo *self,
if (!skip_validation)
{
- gs_unref_object GInputStream *tmp_in = NULL;
- gs_free guchar *actual_checksum = NULL;
-
- tmp_in = (GInputStream*)g_file_read (part_path, cancellable, error);
- if (!tmp_in)
+ gs_free char *expected_checksum = ostree_checksum_from_bytes (csum);
+ if (!_ostree_static_delta_part_validate (self, part_path, i,
+ expected_checksum,
+ cancellable, error))
goto out;
-
- if (!ot_gio_checksum_stream (tmp_in, &actual_checksum,
- cancellable, error))
- goto out;
-
- if (ostree_cmp_checksum_bytes (csum, actual_checksum) != 0)
- {
- g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
- "Checksum mismatch in static delta %s part %u",
- gs_file_get_path_cached (dir), i);
- goto out;
- }
}
{
GMappedFile *mfile = gs_file_map_noatime (part_path, cancellable, error);
gs_unref_bytes GBytes *bytes = NULL;
- gs_unref_bytes GBytes *payload = NULL;
- gsize partlen;
- const guint8*partdata;
if (!mfile)
goto out;
bytes = g_mapped_file_get_bytes (mfile);
g_mapped_file_unref (mfile);
-
- partdata = g_bytes_get_data (bytes, &partlen);
-
- if (partlen < 1)
- {
- g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
- "Corrupted 0 length byte part %s/%i",
- gs_file_get_basename_cached (dir),
- i);
- goto out;
- }
-
- switch (partdata[0])
- {
- case 0:
- payload = g_bytes_new_from_bytes (bytes, 1, partlen - 1);
- break;
- case 'g':
- {
- gs_unref_bytes GBytes *subbytes = g_bytes_new_from_bytes (bytes, 1, partlen - 1);
- if (!zlib_uncompress_data (subbytes, &payload,
- cancellable, error))
- goto out;
- }
- break;
- }
-
- part = ot_variant_new_from_bytes (G_VARIANT_TYPE (OSTREE_STATIC_DELTA_PART_PAYLOAD_FORMAT),
- payload, FALSE);
-
- if (!_ostree_static_delta_part_execute (self, objects, part, cancellable, error))
+ if (!_ostree_static_delta_part_execute (self, objects, bytes,
+ cancellable, error))
{
g_prefix_error (error, "executing delta part %i: ", i);
goto out;
diff --git a/src/libostree/ostree-repo-static-delta-private.h
b/src/libostree/ostree-repo-static-delta-private.h
index 37a3a90..facc25b 100644
--- a/src/libostree/ostree-repo-static-delta-private.h
+++ b/src/libostree/ostree-repo-static-delta-private.h
@@ -32,6 +32,8 @@ G_BEGIN_DECLS
/**
* OSTREE_STATIC_DELTA_PART_PAYLOAD_FORMAT:
*
+ * y compression type (0: none, 'g': zlib)
+ * ---
* ay data source
* ay operations
*/
@@ -73,12 +75,36 @@ G_BEGIN_DECLS
*/
#define OSTREE_STATIC_DELTA_SUPERBLOCK_FORMAT "(a{sv}taya" OSTREE_STATIC_DELTA_META_ENTRY_FORMAT ")"
+gboolean _ostree_static_delta_part_validate (OstreeRepo *repo,
+ GFile *part_path,
+ guint part_offset,
+ const char *expected_checksum,
+ GCancellable *cancellable,
+ GError **error);
+
gboolean _ostree_static_delta_part_execute (OstreeRepo *repo,
GVariant *header,
- GVariant *part,
+ GBytes *partdata,
GCancellable *cancellable,
GError **error);
+gboolean _ostree_static_delta_part_execute_raw (OstreeRepo *repo,
+ GVariant *header,
+ GVariant *part,
+ GCancellable *cancellable,
+ GError **error);
+
+void _ostree_static_delta_part_execute_async (OstreeRepo *repo,
+ GVariant *header,
+ GBytes *partdata,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+
+gboolean _ostree_static_delta_part_execute_finish (OstreeRepo *repo,
+ GAsyncResult *result,
+ GError **error);
+
typedef enum {
OSTREE_STATIC_DELTA_OP_FETCH = 1,
OSTREE_STATIC_DELTA_OP_WRITE = 2,
diff --git a/src/libostree/ostree-repo-static-delta-processing.c
b/src/libostree/ostree-repo-static-delta-processing.c
index a373f00..5ed9074 100644
--- a/src/libostree/ostree-repo-static-delta-processing.c
+++ b/src/libostree/ostree-repo-static-delta-processing.c
@@ -110,13 +110,47 @@ open_output_target_csum (OstreeRepo *repo,
return ret;
}
+/* Verify the checksum of one on-disk static delta part.
+ *
+ * Reads @part_path, checksums its content and compares the hex form
+ * against @expected_checksum.  @part_offset is only used to identify the
+ * part in the error message.  Returns FALSE with @error set on failure. */
+gboolean
+_ostree_static_delta_part_validate (OstreeRepo *repo,
+                                    GFile *part_path,
+                                    guint part_offset,
+                                    const char *expected_checksum,
+                                    GCancellable *cancellable,
+                                    GError **error)
+{
+  gs_unref_object GInputStream *part_stream = NULL;
+  gs_free guchar *digest = NULL;
+  gs_free gchar *digest_hex = NULL;
+
+  part_stream = (GInputStream*)g_file_read (part_path, cancellable, error);
+  if (part_stream == NULL)
+    return FALSE;
+
+  if (!ot_gio_checksum_stream (part_stream, &digest, cancellable, error))
+    return FALSE;
+
+  /* Compare as hex strings so both values can go straight into the message */
+  digest_hex = ostree_checksum_from_bytes (digest);
+  if (strcmp (digest_hex, expected_checksum) == 0)
+    return TRUE;
+
+  g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+               "Checksum mismatch in static delta part %u; expected=%s actual=%s",
+               part_offset, expected_checksum, digest_hex);
+  return FALSE;
+}
gboolean
-_ostree_static_delta_part_execute (OstreeRepo *repo,
- GVariant *objects,
- GVariant *part,
- GCancellable *cancellable,
- GError **error)
+_ostree_static_delta_part_execute_raw (OstreeRepo *repo,
+ GVariant *objects,
+ GVariant *part,
+ GCancellable *cancellable,
+ GError **error)
{
gboolean ret = FALSE;
guint8 *checksums_data;
@@ -172,6 +206,167 @@ _ostree_static_delta_part_execute (OstreeRepo *repo,
}
static gboolean
+zlib_uncompress_data (GBytes *data,
+                      GBytes **out_uncompressed,
+                      GCancellable *cancellable,
+                      GError **error)
+{
+  /* Decompress raw-format zlib @data entirely in memory into @out_uncompressed */
+  gs_unref_object GMemoryInputStream *src = (GMemoryInputStream*)g_memory_input_stream_new_from_bytes (data);
+  gs_unref_object GMemoryOutputStream *sink = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+  gs_unref_object GConverter *inflater =
+    (GConverter*) g_zlib_decompressor_new (G_ZLIB_COMPRESSOR_FORMAT_RAW);
+  gs_unref_object GInputStream *inflated = g_converter_input_stream_new ((GInputStream*)src, inflater);
+  const GOutputStreamSpliceFlags close_both =
+    G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE | G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET;
+
+  /* A negative splice result signals failure; nothing is written to the out parameter */
+  if (g_output_stream_splice ((GOutputStream*)sink, inflated, close_both,
+                              cancellable, error) < 0)
+    return FALSE;
+
+  /* The splice closed the sink, so its buffer can be handed to the caller */
+  *out_uncompressed = g_memory_output_stream_steal_as_bytes (sink);
+  return TRUE;
+}
+
+/* Decode one static delta part and execute it.  Byte 0 of @part_bytes is the
+ * compression type (0: none, 'g': zlib); the remaining bytes are the payload,
+ * which is unpacked into a GVariant and passed to the _raw executor. */
+gboolean
+_ostree_static_delta_part_execute (OstreeRepo *repo,
+                                   GVariant *header,
+                                   GBytes *part_bytes,
+                                   GCancellable *cancellable,
+                                   GError **error)
+{
+  gboolean ret = FALSE;
+  gsize partlen;
+  const guint8*partdata;
+  gs_unref_bytes GBytes *part_payload_bytes = NULL;
+  gs_unref_bytes GBytes *payload_data = NULL;
+  gs_unref_variant GVariant *payload = NULL;
+  guint8 comptype;
+
+  partdata = g_bytes_get_data (part_bytes, &partlen);
+
+  if (partlen < 1)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                   "Corrupted 0 length delta part");
+      goto out;
+    }
+
+  /* First byte is compression type */
+  comptype = partdata[0];
+  /* Then the rest may be compressed or uncompressed */
+  part_payload_bytes = g_bytes_new_from_bytes (part_bytes, 1, partlen - 1);
+  switch (comptype)
+    {
+    case 0:
+      /* No compression */
+      payload_data = g_bytes_ref (part_payload_bytes);
+      break;
+    case 'g':
+      if (!zlib_uncompress_data (part_payload_bytes, &payload_data,
+                                 cancellable, error))
+        goto out;
+      break;
+    default:
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                   "Invalid compression type '%u'", comptype);
+      goto out;
+    }
+
+  payload = ot_variant_new_from_bytes (G_VARIANT_TYPE (OSTREE_STATIC_DELTA_PART_PAYLOAD_FORMAT),
+                                       payload_data, FALSE);
+  if (!_ostree_static_delta_part_execute_raw (repo, header, payload,
+                                              cancellable, error))
+    goto out;
+
+  ret = TRUE;
+ out:
+  return ret;
+}
+
+typedef struct { /* state handed to the worker by _ostree_static_delta_part_execute_async */
+ OstreeRepo *repo; /* referenced on setup, released by the data free function */
+ GVariant *header; /* passed through to _ostree_static_delta_part_execute */
+ GBytes *partdata; /* part content, passed through to _ostree_static_delta_part_execute */
+ GCancellable *cancellable; /* NULL when the caller supplied none */
+ GAsyncResult *result; /* set on setup; the data free function does not unref it */
+} StaticDeltaPartExecuteAsyncData;
+
+static void
+static_delta_part_execute_async_data_free (gpointer user_data)
+{
+  /* Destroy notify for the async data: drops repo, header, partdata, cancellable */
+  StaticDeltaPartExecuteAsyncData *job = user_data;
+  g_clear_object (&job->repo);
+  g_variant_unref (job->header);
+  g_bytes_unref (job->partdata);
+  g_clear_object (&job->cancellable); /* may be NULL when no cancellable was given */
+  g_free (job);
+}
+
+static void
+static_delta_part_execute_thread (GSimpleAsyncResult *res,
+                                  GObject *object,
+                                  GCancellable *cancellable)
+{
+  /* Thread function: executes the part synchronously and records a failure on @res */
+  StaticDeltaPartExecuteAsyncData *job = g_simple_async_result_get_op_res_gpointer (res);
+  GError *exec_error = NULL;
+  gboolean ok;
+
+  ok = _ostree_static_delta_part_execute (job->repo, job->header, job->partdata,
+                                          cancellable, &exec_error);
+  if (!ok)
+    g_simple_async_result_take_error (res, exec_error);
+}
+
+void
+_ostree_static_delta_part_execute_async (OstreeRepo *repo,
+                                         GVariant *header,
+                                         GBytes *partdata,
+                                         GCancellable *cancellable,
+                                         GAsyncReadyCallback callback,
+                                         gpointer user_data)
+{
+  StaticDeltaPartExecuteAsyncData *asyncdata = g_new0 (StaticDeltaPartExecuteAsyncData, 1);
+  GSimpleAsyncResult *simple; /* typed handle for the g_simple_async_result_* calls */
+
+  asyncdata->repo = g_object_ref (repo);
+  asyncdata->header = g_variant_ref (header);
+  asyncdata->partdata = g_bytes_ref (partdata);
+  asyncdata->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+  simple = g_simple_async_result_new ((GObject*) repo, callback, user_data, /* repo is the source object */
+                                      _ostree_static_delta_part_execute_async);
+  asyncdata->result = G_ASYNC_RESULT (simple); /* not unreffed by the data free function */
+  g_simple_async_result_set_op_res_gpointer (simple, asyncdata, /* asyncdata is freed via the destroy notify */
+                                             static_delta_part_execute_async_data_free);
+  g_simple_async_result_run_in_thread (simple, static_delta_part_execute_thread,
+                                       G_PRIORITY_DEFAULT, cancellable);
+  g_object_unref (simple);
+}
+
+gboolean
+_ostree_static_delta_part_execute_finish (OstreeRepo *repo,
+                                          GAsyncResult *result,
+                                          GError **error)
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+  /* Only results created by _ostree_static_delta_part_execute_async belong here */
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _ostree_static_delta_part_execute_async);
+  /* TRUE unless the worker recorded an error, which is then moved into @error */
+  if (g_simple_async_result_propagate_error (simple, error))
+    return FALSE;
+  return TRUE;
+}
+
+static gboolean
dispatch_fetch (OstreeRepo *repo,
StaticDeltaExecutionState *state,
GCancellable *cancellable,
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]