[ostree/wip/delta-continuation: 5/7] wip



commit 69a0d495c41db68059c72ececd2e0db9fe94a4ec
Author: Colin Walters <walters verbum org>
Date:   Sat Apr 12 14:54:28 2014 -0400

    wip

 src/libostree/ostree-repo-pull.c                   |   75 +++++++-
 src/libostree/ostree-repo-static-delta-core.c      |   83 +--------
 src/libostree/ostree-repo-static-delta-private.h   |   28 +++-
 .../ostree-repo-static-delta-processing.c          |  205 +++++++++++++++++++-
 4 files changed, 303 insertions(+), 88 deletions(-)
---
diff --git a/src/libostree/ostree-repo-pull.c b/src/libostree/ostree-repo-pull.c
index 8f5ce89..4ac18c0 100644
--- a/src/libostree/ostree-repo-pull.c
+++ b/src/libostree/ostree-repo-pull.c
@@ -84,6 +84,7 @@ typedef struct {
 
 typedef struct {
   OtPullData  *pull_data;
+  GVariant *objects;
   char *expected_checksum;
 } FetchStaticDeltaData;
 
@@ -707,6 +708,36 @@ meta_fetch_on_complete (GObject           *object,
 }
 
 static void
+fetch_static_delta_data_free (gpointer  data)
+{
+  FetchStaticDeltaData *fetch_data = data;
+  g_free (fetch_data->expected_checksum);
+  g_variant_unref (fetch_data->headers);
+  g_free (fetch_data);
+}
+
+static void
+on_static_delta_written (GObject           *object,
+                         GAsyncResult      *result,
+                         gpointer           user_data)
+{
+  FetchStaticDeltaData *fetch_data = user_data;
+  OtPullData *pull_data = fetch_data->pull_data;
+  GError *local_error = NULL;
+  GError **error = &local_error;
+
+
+ out:
+  g_assert (pull_data->n_outstanding_deltapart_fetches > 0);
+  pull_data->n_outstanding_deltapart_fetches--;
+  pull_data->n_fetched_deltaparts++;
+  pull_data->n_outstanding_deltapart_write_requests++;
+  throw_async_error (pull_data, local_error);
+  if (local_error)
+    fetch_static_delta_data_free (fetch_data);
+}
+
+static void
 static_deltapart_fetch_on_complete (GObject           *object,
                                     GAsyncResult      *result,
                                     gpointer           user_data)
@@ -715,7 +746,8 @@ static_deltapart_fetch_on_complete (GObject           *object,
   OtPullData *pull_data = fetch_data->pull_data;
   gs_unref_variant GVariant *metadata = NULL;
   gs_unref_object GFile *temp_path = NULL;
-  const char *checksum;
+  gs_unref_object GInputStream *in = NULL;
+  gs_free char *actual_checksum = NULL;
   OstreeObjectType objtype;
   GError *local_error = NULL;
   GError **error = &local_error;
@@ -725,18 +757,50 @@ static_deltapart_fetch_on_complete (GObject           *object,
   temp_path = ostree_fetcher_request_uri_with_partial_finish ((OstreeFetcher*)object, result, error);
   if (!temp_path)
     goto out;
+
+  in = g_file_read (temp_path, cancellable, error);
+  if (!in)
+    goto out;
   
+  /* TODO - consider making async */
+  if (!ot_gio_checksum_stream (in, &csum, cancellable, error))
+    goto out;
+
+  actual_checksum = ostree_checksum_from_bytes (csum);
+
+  if (strcmp (checksum, fetch_data->expected_checksum) != 0)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                   "Corrupted static delta part; checksum expected='%s' actual='%s'",
+                   fetch_data->expected_checksum, checksum);
+      goto out;
+    }
+
+  /* Might as well close the fd here */
+  (void) g_input_stream_close (in, NULL, NULL);
+
+  {
+    gs_unref_bytes GBytes *delta_data
+      = gs_file_map_readonly (temp_path, cancellable, error);
+    if (!delta_data)
+      goto out;
+
+    _ostree_static_delta_part_execute_async (pull_data->repo,
+                                             fetch_data->objects,
+                                             delta_data,
+                                             cancellable,
+                                             on_static_delta_written,
+                                             fetch_data);
+  }
 
  out:
   g_assert (pull_data->n_outstanding_deltapart_fetches > 0);
   pull_data->n_outstanding_deltapart_fetches--;
   pull_data->n_fetched_deltaparts++;
+  pull_data->n_outstanding_deltapart_write_requests++;
   throw_async_error (pull_data, local_error);
   if (local_error)
-    {
-      g_free (fetch_data->expected_checksum);
-      g_free (fetch_data);
-    }
+    fetch_static_delta_data_free (fetch_data);
 }
 
 static gboolean
@@ -1161,6 +1225,7 @@ process_one_static_delta (OtPullData   *pull_data,
 
       fetch_data = g_new0 (FetchStaticDeltaData, 1);
       fetch_data->pull_data = pull_data;
+      fetch_data->objects = g_variant_ref (objects);
       fetch_data->expected_checksum = ostree_checksum_from_bytes_v (csum_v);
 
       target_uri = suburi_new (pull_data->base_uri, deltapart_path, NULL);
diff --git a/src/libostree/ostree-repo-static-delta-core.c b/src/libostree/ostree-repo-static-delta-core.c
index 92174f3..e1289a1 100644
--- a/src/libostree/ostree-repo-static-delta-core.c
+++ b/src/libostree/ostree-repo-static-delta-core.c
@@ -160,31 +160,6 @@ _ostree_repo_static_delta_part_have_all_objects (OstreeRepo             *repo,
   return ret;
 }
 
-static gboolean
-zlib_uncompress_data (GBytes       *data,
-                      GBytes      **out_uncompressed,
-                      GCancellable *cancellable,
-                      GError      **error)
-{
-  gboolean ret = FALSE;
-  gs_unref_object GMemoryInputStream *memin = (GMemoryInputStream*)g_memory_input_stream_new_from_bytes 
(data);
-  gs_unref_object GMemoryOutputStream *memout = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, 
g_realloc, g_free);
-  gs_unref_object GConverter *zlib_decomp =
-    (GConverter*) g_zlib_decompressor_new (G_ZLIB_COMPRESSOR_FORMAT_RAW);
-  gs_unref_object GInputStream *convin = g_converter_input_stream_new ((GInputStream*)memin, zlib_decomp);
-
-  if (0 > g_output_stream_splice ((GOutputStream*)memout, convin,
-                                  G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
-                                  G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
-                                  cancellable, error))
-    goto out;
-
-  ret = TRUE;
-  *out_uncompressed = g_memory_output_stream_steal_as_bytes (memout);
- out:
-  return ret;
-}
-
 /**
  * ostree_repo_static_delta_execute_offline:
  * @self: Repo
@@ -227,7 +202,6 @@ ostree_repo_static_delta_execute_offline (OstreeRepo                    *self,
       gs_unref_variant GVariant *csum_v = NULL;
       gs_unref_variant GVariant *objects = NULL;
       gs_unref_object GFile *part_path = NULL;
-      gs_unref_variant GVariant *part = NULL;
       gs_unref_object GInputStream *raw_in = NULL;
       gs_unref_object GInputStream *in = NULL;
 
@@ -256,70 +230,25 @@ ostree_repo_static_delta_execute_offline (OstreeRepo                    *self,
 
       if (!skip_validation)
         {
-          gs_unref_object GInputStream *tmp_in = NULL;
-          gs_free guchar *actual_checksum = NULL;
-
-          tmp_in = (GInputStream*)g_file_read (part_path, cancellable, error);
-          if (!tmp_in)
+          gs_free char *expected_checksum = ostree_checksum_from_bytes (csum);
+          if (!_ostree_static_delta_part_validate (self, part_path, i,
+                                                   expected_checksum,
+                                                   cancellable, error))
             goto out;
-
-          if (!ot_gio_checksum_stream (tmp_in, &actual_checksum,
-                                       cancellable, error))
-            goto out;
-
-          if (ostree_cmp_checksum_bytes (csum, actual_checksum) != 0)
-            {
-              g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
-                           "Checksum mismatch in static delta %s part %u",
-                           gs_file_get_path_cached (dir), i);
-              goto out;
-            }
         }
 
       {
         GMappedFile *mfile = gs_file_map_noatime (part_path, cancellable, error);
         gs_unref_bytes GBytes *bytes = NULL;
-        gs_unref_bytes GBytes *payload = NULL;
-        gsize partlen;
-        const guint8*partdata;
 
         if (!mfile)
           goto out;
 
         bytes = g_mapped_file_get_bytes (mfile);
         g_mapped_file_unref (mfile);
-
-        partdata = g_bytes_get_data (bytes, &partlen);
-
-        if (partlen < 1)
-          {
-            g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
-                         "Corrupted 0 length byte part %s/%i",
-                         gs_file_get_basename_cached (dir),
-                         i);
-            goto out;
-          }
-        
-        switch (partdata[0])
-          {
-          case 0:
-            payload = g_bytes_new_from_bytes (bytes, 1, partlen - 1);
-            break;
-          case 'g':
-            {
-              gs_unref_bytes GBytes *subbytes = g_bytes_new_from_bytes (bytes, 1, partlen - 1);
-              if (!zlib_uncompress_data (subbytes, &payload,
-                                         cancellable, error))
-                goto out;
-            }
-            break;
-          }
-        
-        part = ot_variant_new_from_bytes (G_VARIANT_TYPE (OSTREE_STATIC_DELTA_PART_PAYLOAD_FORMAT),
-                                          payload, FALSE);
-        
         
-        if (!_ostree_static_delta_part_execute (self, objects, part, cancellable, error))
+        if (!_ostree_static_delta_part_execute (self, objects, bytes,
+                                                cancellable, error))
           {
             g_prefix_error (error, "executing delta part %i: ", i);
             goto out;
diff --git a/src/libostree/ostree-repo-static-delta-private.h 
b/src/libostree/ostree-repo-static-delta-private.h
index 37a3a90..facc25b 100644
--- a/src/libostree/ostree-repo-static-delta-private.h
+++ b/src/libostree/ostree-repo-static-delta-private.h
@@ -32,6 +32,8 @@ G_BEGIN_DECLS
 /**
  * OSTREE_STATIC_DELTA_PART_PAYLOAD_FORMAT:
  *
+ *   y  compression type (0: none, 'z': zlib)
+ *   ---
  *   ay data source
  *   ay operations
  */
@@ -73,12 +75,36 @@ G_BEGIN_DECLS
  */ 
 #define OSTREE_STATIC_DELTA_SUPERBLOCK_FORMAT "(a{sv}taya" OSTREE_STATIC_DELTA_META_ENTRY_FORMAT ")"
 
+gboolean _ostree_static_delta_part_validate (OstreeRepo     *repo,
+                                             GFile          *part_path,
+                                             guint           part_offset,
+                                             const char     *expected_checksum,
+                                             GCancellable   *cancellable,
+                                             GError        **error);
+
 gboolean _ostree_static_delta_part_execute (OstreeRepo      *repo,
                                             GVariant        *header,
-                                            GVariant        *part,
+                                            GBytes          *partdata,
                                             GCancellable    *cancellable,
                                             GError         **error);
 
+gboolean _ostree_static_delta_part_execute_raw (OstreeRepo      *repo,
+                                                GVariant        *header,
+                                                GVariant        *part,
+                                                GCancellable    *cancellable,
+                                                GError         **error);
+
+void _ostree_static_delta_part_execute_async (OstreeRepo      *repo,
+                                              GVariant        *header,
+                                              GBytes          *partdata,
+                                              GCancellable    *cancellable,
+                                              GAsyncReadyCallback  callback,
+                                              gpointer         user_data);
+
+gboolean _ostree_static_delta_part_execute_finish (OstreeRepo      *repo,
+                                                   GAsyncResult    *result,
+                                                   GError         **error); 
+
 typedef enum {
   OSTREE_STATIC_DELTA_OP_FETCH = 1,
   OSTREE_STATIC_DELTA_OP_WRITE = 2,
diff --git a/src/libostree/ostree-repo-static-delta-processing.c 
b/src/libostree/ostree-repo-static-delta-processing.c
index a373f00..5ed9074 100644
--- a/src/libostree/ostree-repo-static-delta-processing.c
+++ b/src/libostree/ostree-repo-static-delta-processing.c
@@ -110,13 +110,47 @@ open_output_target_csum (OstreeRepo                  *repo,
   return ret;
 }
 
+gboolean
+_ostree_static_delta_part_validate (OstreeRepo     *repo,
+                                    GFile          *part_path,
+                                    guint           part_offset,
+                                    const char     *expected_checksum,
+                                    GCancellable   *cancellable,
+                                    GError        **error)
+{
+  gboolean ret = FALSE;
+  gs_unref_object GInputStream *tmp_in = NULL;
+  gs_free guchar *actual_checksum_bytes = NULL;
+  gs_free gchar *actual_checksum = NULL;
+  
+  tmp_in = (GInputStream*)g_file_read (part_path, cancellable, error);
+  if (!tmp_in)
+    goto out;
+  
+  if (!ot_gio_checksum_stream (tmp_in, &actual_checksum_bytes,
+                               cancellable, error))
+    goto out;
+
+  actual_checksum = ostree_checksum_from_bytes (actual_checksum_bytes);
+  if (strcmp (actual_checksum, expected_checksum) != 0)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                   "Checksum mismatch in static delta part %u; expected=%s actual=%s",
+                   part_offset, expected_checksum, actual_checksum);
+      goto out;
+    }
+  
+  ret = TRUE;
+ out:
+  return ret;
+}
 
 gboolean
-_ostree_static_delta_part_execute (OstreeRepo      *repo,
-                                   GVariant        *objects,
-                                   GVariant        *part,
-                                   GCancellable    *cancellable,
-                                   GError         **error)
+_ostree_static_delta_part_execute_raw (OstreeRepo      *repo,
+                                       GVariant        *objects,
+                                       GVariant        *part,
+                                       GCancellable    *cancellable,
+                                       GError         **error)
 {
   gboolean ret = FALSE;
   guint8 *checksums_data;
@@ -172,6 +206,167 @@ _ostree_static_delta_part_execute (OstreeRepo      *repo,
 }
 
 static gboolean
+zlib_uncompress_data (GBytes       *data,
+                      GBytes      **out_uncompressed,
+                      GCancellable *cancellable,
+                      GError      **error)
+{
+  gboolean ret = FALSE;
+  gs_unref_object GMemoryInputStream *memin = (GMemoryInputStream*)g_memory_input_stream_new_from_bytes 
(data);
+  gs_unref_object GMemoryOutputStream *memout = (GMemoryOutputStream*)g_memory_output_stream_new (NULL, 0, 
g_realloc, g_free);
+  gs_unref_object GConverter *zlib_decomp =
+    (GConverter*) g_zlib_decompressor_new (G_ZLIB_COMPRESSOR_FORMAT_RAW);
+  gs_unref_object GInputStream *convin = g_converter_input_stream_new ((GInputStream*)memin, zlib_decomp);
+
+  if (0 > g_output_stream_splice ((GOutputStream*)memout, convin,
+                                  G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+                                  G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+                                  cancellable, error))
+    goto out;
+
+  ret = TRUE;
+  *out_uncompressed = g_memory_output_stream_steal_as_bytes (memout);
+ out:
+  return ret;
+}
+
+gboolean
+_ostree_static_delta_part_execute (OstreeRepo      *repo,
+                                   GVariant        *header,
+                                   GBytes          *part_bytes,
+                                   GCancellable    *cancellable,
+                                   GError         **error)
+{
+  gboolean ret = FALSE;
+  gsize partlen;
+  const guint8*partdata;
+  gs_unref_variant GVariant *part = NULL;
+  gs_unref_bytes GBytes *part_payload_bytes = NULL;
+  gs_unref_bytes GBytes *payload_data = NULL;
+  gs_unref_variant GVariant *payload = NULL;
+  guint8 comptype;
+
+  partdata = g_bytes_get_data (part_bytes, &partlen);
+  
+  if (partlen < 1)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                   "Corrupted 0 length delta part");
+      goto out;
+    }
+        
+  /* First byte is compression type */
+  comptype = partdata[0];
+  /* Then the rest may be compressed or uncompressed */
+  part_payload_bytes = g_bytes_new_from_bytes (part_bytes, 1, partlen - 1);
+  switch (comptype)
+    {
+    case 0:
+      /* No compression */
+      payload_data = g_bytes_ref (part_payload_bytes);
+      break;
+    case 'g':
+      {
+        if (!zlib_uncompress_data (part_payload_bytes, &payload_data,
+                                   cancellable, error))
+          goto out;
+      }
+      break;
+    default:
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                   "Invalid compression type '%u'", comptype);
+      goto out;
+    }
+        
+  payload = ot_variant_new_from_bytes (G_VARIANT_TYPE (OSTREE_STATIC_DELTA_PART_PAYLOAD_FORMAT),
+                                       payload_data, FALSE);
+  if (!_ostree_static_delta_part_execute_raw (repo, header, part,
+                                              cancellable, error))
+    goto out;
+  
+  ret = TRUE;
+ out:
+  return ret;
+}
+
+typedef struct {
+  OstreeRepo *repo;
+  GVariant *header;
+  GBytes *partdata;
+  GCancellable *cancellable;
+  GAsyncResult *result;
+} StaticDeltaPartExecuteAsyncData;
+
+static void
+static_delta_part_execute_async_data_free (gpointer user_data)
+{
+  StaticDeltaPartExecuteAsyncData *data = user_data;
+
+  g_clear_object (&data->repo);
+  g_variant_unref (data->header);
+  g_bytes_unref (data->partdata);
+  g_clear_object (&data->cancellable);
+  g_free (data);
+}
+
+static void
+static_delta_part_execute_thread (GSimpleAsyncResult  *res,
+                                  GObject             *object,
+                                  GCancellable        *cancellable)
+{
+  GError *error = NULL;
+  StaticDeltaPartExecuteAsyncData *data;
+
+  data = g_simple_async_result_get_op_res_gpointer (res);
+  if (!_ostree_static_delta_part_execute (data->repo,
+                                          data->header,
+                                          data->partdata,
+                                          cancellable, &error))
+    g_simple_async_result_take_error (res, error);
+}
+
+void
+_ostree_static_delta_part_execute_async (OstreeRepo      *repo,
+                                         GVariant        *header,
+                                         GBytes          *partdata,
+                                         GCancellable    *cancellable,
+                                         GAsyncReadyCallback  callback,
+                                         gpointer         user_data)
+{
+  StaticDeltaPartExecuteAsyncData *asyncdata;
+
+  asyncdata = g_new0 (StaticDeltaPartExecuteAsyncData, 1);
+  asyncdata->repo = g_object_ref (repo);
+  asyncdata->header = g_variant_ref (header);
+  asyncdata->partdata = g_bytes_ref (partdata);
+  asyncdata->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+  asyncdata->result = g_simple_async_result_new ((GObject*) self,
+                                                 callback, user_data,
+                                                 _ostree_static_delta_part_execute_async);
+
+  g_simple_async_result_set_op_res_gpointer (asyncdata->result, asyncdata,
+                                             static_delta_part_execute_async_data_free);
+  g_simple_async_result_run_in_thread (asyncdata->result, static_delta_part_execute_thread, 
G_PRIORITY_DEFAULT, cancellable);
+  g_object_unref (asyncdata->result);
+}
+
+gboolean
+_ostree_static_delta_part_execute_finish (OstreeRepo      *repo,
+                                          GAsyncResult    *result,
+                                          GError         **error) 
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+  StaticDeltaPartExecuteAsyncData *asyncdata;
+
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == _ostree_static_delta_part_execute_async);
+
+  if (g_simple_async_result_propagate_error (simple, error))
+    return FALSE;
+  return TRUE;
+}
+
+static gboolean
 dispatch_fetch (OstreeRepo    *repo,   
                 StaticDeltaExecutionState  *state,
                 GCancellable  *cancellable,  


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]