[ostree] pull: Stage content asynchronously
- From: Colin Walters <walters src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [ostree] pull: Stage content asynchronously
- Date: Fri, 5 Oct 2012 00:03:08 +0000 (UTC)
commit 9618232f4da325692dcf98fd6ff5b8abd9fce66c
Author: Colin Walters <walters verbum org>
Date: Thu Oct 4 20:00:00 2012 -0400
pull: Stage content asynchronously
For similar reasons as metadata, this avoids having the main thread
blocked in fdatasync(), and even better - we can achieve much higher
parallelism if we have multiple threads blocked on fdatasync().
src/libostree/ostree-repo.c | 95 +++++++++++++++++++++++++++++++++
src/libostree/ostree-repo.h | 15 +++++-
src/ostree/ostree-fetcher.c | 9 +++
src/ostree/ostree-fetcher.h | 2 +
src/ostree/ostree-pull.c | 121 ++++++++++++++++---------------------------
5 files changed, 164 insertions(+), 78 deletions(-)
---
diff --git a/src/libostree/ostree-repo.c b/src/libostree/ostree-repo.c
index 8840887..ac3a3dc 100644
--- a/src/libostree/ostree-repo.c
+++ b/src/libostree/ostree-repo.c
@@ -1558,6 +1558,101 @@ ostree_repo_stage_content (OstreeRepo *self,
cancellable, error);
}
+typedef struct {
+ OstreeRepo *repo;
+ char *expected_checksum;
+ GInputStream *object;
+ guint64 file_object_length;
+ GCancellable *cancellable;
+ GSimpleAsyncResult *result;
+
+ guchar *result_csum;
+} StageContentAsyncData;
+
+static void
+stage_content_async_data_free (gpointer user_data)
+{
+ StageContentAsyncData *data = user_data;
+
+ g_clear_object (&data->repo);
+ g_clear_object (&data->cancellable);
+ g_clear_object (&data->object);
+ g_free (data->result_csum);
+ g_free (data->expected_checksum);
+ g_free (data);
+}
+
+static void
+stage_content_thread (GSimpleAsyncResult *res,
+ GObject *object,
+ GCancellable *cancellable)
+{
+ GError *error = NULL;
+ StageContentAsyncData *data;
+
+ data = g_simple_async_result_get_op_res_gpointer (res);
+ if (!ostree_repo_stage_content (data->repo, data->expected_checksum,
+ data->object, data->file_object_length,
+ &data->result_csum,
+ cancellable, &error))
+ g_simple_async_result_take_error (res, error);
+}
+
+/**
+ * ostree_repo_stage_content_async:
+ *
+ * Asynchronously store the content object @object. If provided,
+ * the checksum @expected_checksum will be verified.
+ */
+void
+ostree_repo_stage_content_async (OstreeRepo *self,
+ const char *expected_checksum,
+ GInputStream *object,
+ guint64 file_object_length,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ StageContentAsyncData *asyncdata;
+
+ asyncdata = g_new0 (StageContentAsyncData, 1);
+ asyncdata->repo = g_object_ref (self);
+ asyncdata->expected_checksum = g_strdup (expected_checksum);
+ asyncdata->object = g_object_ref (object);
+ asyncdata->file_object_length = file_object_length;
+ asyncdata->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+ asyncdata->result = g_simple_async_result_new ((GObject*) self,
+ callback, user_data,
+ ostree_repo_stage_content_async);
+
+ g_simple_async_result_set_op_res_gpointer (asyncdata->result, asyncdata,
+ stage_content_async_data_free);
+ g_simple_async_result_run_in_thread (asyncdata->result, stage_content_thread, G_PRIORITY_DEFAULT, cancellable);
+ g_object_unref (asyncdata->result);
+}
+
+gboolean
+ostree_repo_stage_content_finish (OstreeRepo *self,
+ GAsyncResult *result,
+ guchar **out_csum,
+ GError **error)
+{
+ GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+ StageContentAsyncData *data;
+
+ g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == ostree_repo_stage_content_async);
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return FALSE;
+
+ data = g_simple_async_result_get_op_res_gpointer (simple);
+ /* Transfer ownership */
+ *out_csum = data->result_csum;
+ data->result_csum = NULL;
+ return TRUE;
+}
+
static GVariant *
create_empty_gvariant_dict (void)
{
diff --git a/src/libostree/ostree-repo.h b/src/libostree/ostree-repo.h
index 62f51d3..2f7ac7a 100644
--- a/src/libostree/ostree-repo.h
+++ b/src/libostree/ostree-repo.h
@@ -113,7 +113,7 @@ void ostree_repo_stage_metadata_async (OstreeRepo *self,
gboolean ostree_repo_stage_metadata_finish (OstreeRepo *self,
GAsyncResult *result,
- guchar **out_checksum,
+ guchar **out_csum,
GError **error);
gboolean ostree_repo_stage_content (OstreeRepo *self,
@@ -138,6 +138,19 @@ gboolean ostree_repo_stage_content_trusted (OstreeRepo *self,
GCancellable *cancellable,
GError **error);
+void ostree_repo_stage_content_async (OstreeRepo *self,
+ const char *expected_checksum,
+ GInputStream *object,
+ guint64 file_object_length,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+
+gboolean ostree_repo_stage_content_finish (OstreeRepo *self,
+ GAsyncResult *result,
+ guchar **out_csum,
+ GError **error);
+
gboolean ostree_repo_resolve_rev (OstreeRepo *self,
const char *rev,
gboolean allow_noent,
diff --git a/src/ostree/ostree-fetcher.c b/src/ostree/ostree-fetcher.c
index bf6421a..6d4b5db 100644
--- a/src/ostree/ostree-fetcher.c
+++ b/src/ostree/ostree-fetcher.c
@@ -82,6 +82,7 @@ struct OstreeFetcher
GHashTable *message_to_request; /* SoupMessage -> SoupRequest */
guint64 total_downloaded;
+ guint total_requests;
};
G_DEFINE_TYPE (OstreeFetcher, ostree_fetcher, G_TYPE_OBJECT)
@@ -243,6 +244,8 @@ ostree_fetcher_request_uri_async (OstreeFetcher *self,
OstreeFetcherPendingURI *pending;
GError *local_error = NULL;
+ self->total_requests++;
+
pending = g_new0 (OstreeFetcherPendingURI, 1);
pending->refcount = 1;
pending->self = g_object_ref (self);
@@ -352,3 +355,9 @@ ostree_fetcher_bytes_transferred (OstreeFetcher *self)
{
return self->total_downloaded;
}
+
+guint
+ostree_fetcher_get_n_requests (OstreeFetcher *self)
+{
+ return self->total_requests;
+}
diff --git a/src/ostree/ostree-fetcher.h b/src/ostree/ostree-fetcher.h
index 803db72..4f6897e 100644
--- a/src/ostree/ostree-fetcher.h
+++ b/src/ostree/ostree-fetcher.h
@@ -51,6 +51,8 @@ char * ostree_fetcher_query_state_text (OstreeFetcher *self);
guint64 ostree_fetcher_bytes_transferred (OstreeFetcher *self);
+guint ostree_fetcher_get_n_requests (OstreeFetcher *self);
+
void ostree_fetcher_request_uri_async (OstreeFetcher *self,
SoupURI *uri,
GCancellable *cancellable,
diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c
index b189c0b..ccbb003 100644
--- a/src/ostree/ostree-pull.c
+++ b/src/ostree/ostree-pull.c
@@ -109,7 +109,9 @@ typedef struct {
guint n_fetched_content;
guint outstanding_filemeta_requests;
guint outstanding_filecontent_requests;
- guint outstanding_checksum_requests;
+ guint outstanding_content_stage_requests;
+
+ guint64 previous_total_downloaded;
GError **async_error;
gboolean caught_error;
@@ -185,18 +187,31 @@ uri_fetch_update_status (gpointer user_data)
OtPullData *pull_data = user_data;
ot_lfree char *fetcher_status;
GString *status;
+ guint64 current_bytes_transferred;
+ guint64 delta_bytes_transferred;
status = g_string_new ("");
- g_string_append_printf (status, "%u/%u metadata %u/%u content fetched; ",
+ if (pull_data->metadata_scan_active)
+ g_string_append_printf (status, "scan: %u metadata; ",
+ g_atomic_int_get (&pull_data->n_scanned_metadata));
+
+ g_string_append_printf (status, "fetch: %u/%u metadata %u/%u content; ",
g_atomic_int_get (&pull_data->n_fetched_metadata),
g_atomic_int_get (&pull_data->n_requested_metadata),
pull_data->n_fetched_content,
g_atomic_int_get (&pull_data->n_requested_content));
- if (pull_data->outstanding_checksum_requests > 0)
- g_string_append_printf (status, "Calculating %u checksums; ",
- pull_data->outstanding_checksum_requests);
+ current_bytes_transferred = ostree_fetcher_bytes_transferred (pull_data->fetcher);
+ delta_bytes_transferred = current_bytes_transferred - pull_data->previous_total_downloaded;
+ pull_data->previous_total_downloaded = current_bytes_transferred;
+
+ if (delta_bytes_transferred < 1024)
+ g_string_append_printf (status, "%u B/s; ",
+ (guint)delta_bytes_transferred);
+ else
+ g_string_append_printf (status, "%.1f KiB/s; ",
+ (double)delta_bytes_transferred / 1024);
fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
g_string_append (status, fetcher_status);
@@ -245,7 +260,7 @@ check_outstanding_requests_handle_error (OtPullData *pull_data,
pull_data->outstanding_uri_requests == 0 &&
pull_data->outstanding_filemeta_requests == 0 &&
pull_data->outstanding_filecontent_requests == 0 &&
- pull_data->outstanding_checksum_requests == 0)
+ pull_data->outstanding_content_stage_requests == 0)
g_main_loop_quit (pull_data->loop);
throw_async_error (pull_data, error);
}
@@ -507,79 +522,27 @@ destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data)
}
static void
-content_fetch_on_checksum_complete (GObject *object,
- GAsyncResult *result,
- gpointer user_data)
+content_fetch_on_stage_complete (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
OtFetchOneContentItemData *data = user_data;
GError *local_error = NULL;
GError **error = &local_error;
- guint64 length;
- GCancellable *cancellable = NULL;
- gboolean compressed;
- ot_lfree guchar *csum;
- ot_lvariant GVariant *file_meta = NULL;
- ot_lobj GFileInfo *file_info = NULL;
- ot_lvariant GVariant *xattrs = NULL;
- ot_lobj GInputStream *content_input = NULL;
- ot_lobj GInputStream *file_object_input = NULL;
- ot_lfree char *checksum;
+ ot_lfree guchar *csum = NULL;
+ ot_lfree char *checksum = NULL;
- csum = ot_gio_checksum_stream_finish ((GInputStream*)object, result, error);
- if (!csum)
+ if (!ostree_repo_stage_content_finish ((OstreeRepo*)object, result,
+ &csum, error))
goto out;
checksum = ostree_checksum_from_bytes (csum);
- if (strcmp (checksum, data->checksum) != 0)
- {
- g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
- "Corrupted object %s (actual checksum is %s)",
- data->checksum, checksum);
- goto out;
- }
-
- compressed = data->pull_data->remote_mode == OSTREE_REPO_MODE_ARCHIVE_Z;
-
- if (compressed)
- {
- content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
- if (!content_input)
- goto out;
- if (!ostree_zlib_content_stream_open (content_input, &length, &file_object_input,
- cancellable, error))
- goto out;
- }
- else
- {
- if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, TRUE,
- &file_meta, error))
- goto out;
-
- if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
- goto out;
-
- if (data->content_path)
- {
- content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
- if (!content_input)
- goto out;
- }
-
- if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
- &file_object_input, &length,
- cancellable, error))
- goto out;
- }
-
- if (!ostree_repo_stage_content_trusted (data->pull_data->repo, checksum,
- file_object_input, length,
- cancellable, error))
- goto out;
+ g_assert (strcmp (checksum, data->checksum) == 0);
data->pull_data->n_fetched_content++;
out:
- data->pull_data->outstanding_checksum_requests--;
+ data->pull_data->outstanding_content_stage_requests--;
check_outstanding_requests_handle_error (data->pull_data, local_error);
destroy_fetch_one_content_item_data (data);
}
@@ -623,6 +586,7 @@ content_fetch_on_complete (GObject *object,
GCancellable *cancellable = NULL;
gboolean was_content_fetch = FALSE;
gboolean need_content_fetch = FALSE;
+ guint64 length;
ot_lvariant GVariant *file_meta = NULL;
ot_lobj GFileInfo *file_info = NULL;
ot_lobj GInputStream *content_input = NULL;
@@ -674,20 +638,21 @@ content_fetch_on_complete (GObject *object,
if (!need_content_fetch && compressed)
{
ot_lobj GInputStream *uncomp_input = NULL;
- guint64 uncompressed_len;
g_assert (data->content_path != NULL);
content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
if (!content_input)
goto out;
- if (!ostree_zlib_content_stream_open (content_input, &uncompressed_len, &uncomp_input,
+ if (!ostree_zlib_content_stream_open (content_input, &length, &uncomp_input,
cancellable, error))
goto out;
-
- data->pull_data->outstanding_checksum_requests++;
- ot_gio_checksum_stream_async (uncomp_input, G_PRIORITY_DEFAULT, NULL,
- content_fetch_on_checksum_complete, data);
+
+ data->pull_data->outstanding_content_stage_requests++;
+ ostree_repo_stage_content_async (data->pull_data->repo, data->checksum,
+ uncomp_input, length,
+ cancellable,
+ content_fetch_on_stage_complete, data);
}
else if (!need_content_fetch)
{
@@ -709,13 +674,15 @@ content_fetch_on_complete (GObject *object,
}
if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
- &file_object_input, NULL,
+ &file_object_input, &length,
cancellable, error))
goto out;
- data->pull_data->outstanding_checksum_requests++;
- ot_gio_checksum_stream_async (file_object_input, G_PRIORITY_DEFAULT, NULL,
- content_fetch_on_checksum_complete, data);
+ data->pull_data->outstanding_content_stage_requests++;
+ ostree_repo_stage_content_async (data->pull_data->repo, data->checksum,
+ file_object_input, length,
+ cancellable,
+ content_fetch_on_stage_complete, data);
}
while (data->pull_data->outstanding_filemeta_requests < 10)
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]