[ostree] pull: Download and checksum asynchronously
- From: Colin Walters <walters src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [ostree] pull: Download and checksum asynchronously
- Date: Wed, 6 Jun 2012 02:40:15 +0000 (UTC)
commit e485bace0189625c5ffc422c4c8b5987a6de5e79
Author: Colin Walters <walters verbum org>
Date: Sun May 20 16:21:57 2012 -0400
pull: Download and checksum asynchronously
This is quite a noticeable speedup when downloading loose objects.
Makefile-ostree.am | 2 +
src/libotutil/ot-gio-utils.c | 44 ++++
src/libotutil/ot-gio-utils.h | 10 +
src/ostree/ostree-fetcher.c | 313 ++++++++++++++++++++++++++
src/ostree/ostree-fetcher.h | 64 ++++++
src/ostree/ostree-pull.c | 506 +++++++++++++++++++++++++++++++-----------
6 files changed, 804 insertions(+), 135 deletions(-)
---
diff --git a/Makefile-ostree.am b/Makefile-ostree.am
index 1f7ab0e..8024974 100644
--- a/Makefile-ostree.am
+++ b/Makefile-ostree.am
@@ -53,6 +53,8 @@ if USE_LIBSOUP_GNOME
bin_PROGRAMS += ostree-pull
ostree_pull_SOURCES = src/ostree/ot-main.h \
src/ostree/ot-main.c \
+ src/ostree/ostree-fetcher.h \
+ src/ostree/ostree-fetcher.c \
src/ostree/ostree-pull.c
ostree_pull_CFLAGS = $(ostree_bin_shared_cflags) $(OT_DEP_SOUP_CFLAGS)
diff --git a/src/libotutil/ot-gio-utils.c b/src/libotutil/ot-gio-utils.c
index cf1e0cd..b27507d 100644
--- a/src/libotutil/ot-gio-utils.c
+++ b/src/libotutil/ot-gio-utils.c
@@ -358,6 +358,50 @@ ot_gio_checksum_stream (GInputStream *in,
return ot_gio_splice_get_checksum (NULL, in, out_csum, cancellable, error);
}
+static void
+checksum_stream_thread (GSimpleAsyncResult *result,
+ GObject *object,
+ GCancellable *cancellable)
+{
+ GError *error = NULL;
+ guchar *csum;
+
+ if (!ot_gio_checksum_stream ((GInputStream*)object, &csum,
+ cancellable, &error))
+ g_simple_async_result_take_error (result, error);
+ else
+ g_simple_async_result_set_op_res_gpointer (result, csum, g_free);
+}
+
+void
+ot_gio_checksum_stream_async (GInputStream *in,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GSimpleAsyncResult *result;
+
+ result = g_simple_async_result_new ((GObject*) in,
+ callback, user_data,
+ ot_gio_checksum_stream_async);
+
+ g_simple_async_result_run_in_thread (result, checksum_stream_thread, io_priority, cancellable);
+ g_object_unref (result);
+}
+
+guchar *
+ot_gio_checksum_stream_finish (GInputStream *in,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+ g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == ot_gio_checksum_stream_async);
+ return g_memdup (g_simple_async_result_get_op_res_gpointer (simple), 32);
+
+}
+
gboolean
ot_gfile_merge_dirs (GFile *destination,
GFile *src,
diff --git a/src/libotutil/ot-gio-utils.h b/src/libotutil/ot-gio-utils.h
index 16b2f12..9299a44 100644
--- a/src/libotutil/ot-gio-utils.h
+++ b/src/libotutil/ot-gio-utils.h
@@ -88,6 +88,16 @@ gboolean ot_gio_checksum_stream (GInputStream *in,
GCancellable *cancellable,
GError **error);
+void ot_gio_checksum_stream_async (GInputStream *in,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+
+guchar * ot_gio_checksum_stream_finish (GInputStream *in,
+ GAsyncResult *result,
+ GError **error);
+
gboolean ot_gfile_merge_dirs (GFile *destination,
GFile *src,
GCancellable *cancellable,
diff --git a/src/ostree/ostree-fetcher.c b/src/ostree/ostree-fetcher.c
new file mode 100644
index 0000000..cd42261
--- /dev/null
+++ b/src/ostree/ostree-fetcher.c
@@ -0,0 +1,313 @@
+/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
+ *
+ * Copyright (C) 2011 Colin Walters <walters verbum org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: Colin Walters <walters verbum org>
+ */
+
+#include "config.h"
+
+#include "ostree-fetcher.h"
+#include "ostree.h"
+
+typedef enum {
+ OSTREE_FETCHER_STATE_PENDING,
+ OSTREE_FETCHER_STATE_DOWNLOADING,
+ OSTREE_FETCHER_STATE_COMPLETE
+} OstreeFetcherState;
+
+typedef struct {
+ OstreeFetcher *self;
+ SoupURI *uri;
+
+ OstreeFetcherState state;
+
+ SoupRequest *request;
+
+ GFile *tmpfile;
+ GInputStream *request_body;
+ GOutputStream *out_stream;
+
+ guint64 content_length;
+
+ GCancellable *cancellable;
+ GSimpleAsyncResult *result;
+} OstreeFetcherPendingURI;
+
+static void
+pending_uri_free (OstreeFetcherPendingURI *pending)
+{
+ g_clear_object (&pending->self);
+ g_clear_object (&pending->tmpfile);
+ g_clear_object (&pending->request);
+ g_clear_object (&pending->request_body);
+ g_clear_object (&pending->out_stream);
+ g_clear_object (&pending->cancellable);
+ g_free (pending);
+}
+
+struct OstreeFetcher
+{
+ GObject parent_instance;
+
+ GFile *tmpdir;
+
+ SoupSession *session;
+ SoupRequester *requester;
+
+ SoupMessage *sending_message;
+
+ GHashTable *message_to_request;
+
+ guint64 total_downloaded;
+};
+
+G_DEFINE_TYPE (OstreeFetcher, ostree_fetcher, G_TYPE_OBJECT)
+
+static void
+ostree_fetcher_finalize (GObject *object)
+{
+ OstreeFetcher *self;
+
+ self = OSTREE_FETCHER (object);
+
+ g_clear_object (&self->session);
+
+ G_OBJECT_CLASS (ostree_fetcher_parent_class)->finalize (object);
+}
+
+static void
+ostree_fetcher_class_init (OstreeFetcherClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+
+ gobject_class->finalize = ostree_fetcher_finalize;
+}
+
+static void
+on_request_started (SoupSession *session,
+ SoupMessage *msg,
+ SoupSocket *socket,
+ gpointer user_data)
+{
+ OstreeFetcher *self = user_data;
+ self->sending_message = msg;
+}
+
+static void
+on_request_unqueued (SoupSession *session,
+ SoupMessage *msg,
+ gpointer user_data)
+{
+ OstreeFetcher *self = user_data;
+ if (msg == self->sending_message)
+ {
+ self->sending_message = NULL;
+ g_hash_table_remove (self->message_to_request, msg);
+ }
+}
+
+static void
+ostree_fetcher_init (OstreeFetcher *self)
+{
+ self->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
+ SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
+ SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
+ NULL);
+ self->requester = (SoupRequester *)soup_session_get_feature (self->session, SOUP_TYPE_REQUESTER);
+
+ g_signal_connect (self->session, "request-started",
+ G_CALLBACK (on_request_started), self);
+ g_signal_connect (self->session, "request-unqueued",
+ G_CALLBACK (on_request_unqueued), self);
+
+ self->message_to_request = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)g_object_unref, NULL);
+}
+
+OstreeFetcher *
+ostree_fetcher_new (GFile *tmpdir)
+{
+ OstreeFetcher *self = (OstreeFetcher*)g_object_new (OSTREE_TYPE_FETCHER, NULL);
+
+ self->tmpdir = g_object_ref (tmpdir);
+
+ return self;
+}
+
+static void
+on_splice_complete (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ OstreeFetcherPendingURI *pending = user_data;
+ ot_lobj GFileInfo *file_info = NULL;
+
+ pending->state = OSTREE_FETCHER_STATE_COMPLETE;
+ file_info = g_file_query_info (pending->tmpfile, OSTREE_GIO_FAST_QUERYINFO,
+ G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
+ NULL, NULL);
+ if (file_info)
+ pending->self->total_downloaded += g_file_info_get_size (file_info);
+
+ (void) g_input_stream_close (pending->request_body, NULL, NULL);
+
+ g_simple_async_result_complete (pending->result);
+ g_object_unref (pending->result);
+}
+
+static void
+on_request_sent (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ OstreeFetcherPendingURI *pending = user_data;
+ GError *local_error = NULL;
+
+ pending->request_body = soup_request_send_finish ((SoupRequest*) object,
+ result, &local_error);
+ if (!pending->request_body)
+ {
+ pending->state = OSTREE_FETCHER_STATE_COMPLETE;
+ g_simple_async_result_take_error (pending->result, local_error);
+ g_simple_async_result_complete (pending->result);
+ }
+ else
+ {
+ GOutputStreamSpliceFlags flags = G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET;
+
+ pending->state = OSTREE_FETCHER_STATE_DOWNLOADING;
+
+ pending->content_length = soup_request_get_content_length (pending->request);
+
+ /* TODO - make this async */
+ if (!ostree_create_temp_regular_file (pending->self->tmpdir,
+ NULL, NULL,
+ &pending->tmpfile,
+ &pending->out_stream,
+ NULL, &local_error))
+ {
+ g_simple_async_result_take_error (pending->result, local_error);
+ g_simple_async_result_complete (pending->result);
+ return;
+ }
+
+ g_output_stream_splice_async (pending->out_stream, pending->request_body, flags, G_PRIORITY_DEFAULT,
+ pending->cancellable, on_splice_complete, pending);
+ }
+}
+
+void
+ostree_fetcher_request_uri_async (OstreeFetcher *self,
+ SoupURI *uri,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ OstreeFetcherPendingURI *pending;
+ GError *local_error = NULL;
+
+ pending = g_new0 (OstreeFetcherPendingURI, 1);
+ pending->self = g_object_ref (self);
+ pending->uri = soup_uri_copy (uri);
+ pending->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+ pending->request = soup_requester_request_uri (self->requester, uri, &local_error);
+ g_assert_no_error (local_error);
+
+ g_hash_table_insert (self->message_to_request,
+ soup_request_http_get_message ((SoupRequestHTTP*)pending->request),
+ pending);
+
+ pending->result = g_simple_async_result_new ((GObject*) self,
+ callback, user_data,
+ ostree_fetcher_request_uri_async);
+ g_simple_async_result_set_op_res_gpointer (pending->result, pending,
+ (GDestroyNotify) pending_uri_free);
+
+ soup_request_send_async (pending->request, cancellable,
+ on_request_sent, pending);
+
+}
+
+GFile *
+ostree_fetcher_request_uri_finish (OstreeFetcher *self,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+ OstreeFetcherPendingURI *pending;
+
+ g_return_val_if_fail (g_simple_async_result_is_valid (result, (GObject*)self, ostree_fetcher_request_uri_async), FALSE);
+
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ if (g_simple_async_result_propagate_error (simple, error))
+ return NULL;
+ pending = g_simple_async_result_get_op_res_gpointer (simple);
+
+ return g_object_ref (pending->tmpfile);
+}
+
+static char *
+format_size_pair (guint64 start,
+ guint64 max)
+{
+ if (max < 1024)
+ return g_strdup_printf ("%lu/%lu bytes",
+ (gulong) start,
+ (gulong) max);
+ else
+ return g_strdup_printf ("%.1f/%.1f KiB", ((double) start) / 1024,
+ ((double) max) / 1024);
+}
+
+char *
+ostree_fetcher_query_state_text (OstreeFetcher *self)
+{
+ OstreeFetcherPendingURI *active;
+
+ if (self->sending_message)
+ active = g_hash_table_lookup (self->message_to_request, self->sending_message);
+ else
+ active = NULL;
+ if (active)
+ {
+ ot_lfree char *active_uri = soup_uri_to_string (active->uri, TRUE);
+
+ if (active->tmpfile)
+ {
+ ot_lobj GFileInfo *file_info = NULL;
+
+ file_info = g_file_query_info (active->tmpfile, OSTREE_GIO_FAST_QUERYINFO,
+ G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
+ NULL, NULL);
+ if (file_info)
+ {
+ ot_lfree char *size = format_size_pair (g_file_info_get_size (file_info),
+ active->content_length);
+ return g_strdup_printf ("Downloading %s [ %s, %.1f KiB downloaded ]",
+ active_uri, size, ((double)self->total_downloaded) / 1024);
+ }
+ }
+ else
+ {
+ return g_strdup_printf ("Requesting %s [ %.1f KiB downloaded ]",
+ active_uri, ((double)self->total_downloaded) / 1024);
+ }
+ }
+
+ return g_strdup_printf ("Idle [ %.1f KiB downloaded ]", ((double)self->total_downloaded) / 1024);
+}
diff --git a/src/ostree/ostree-fetcher.h b/src/ostree/ostree-fetcher.h
new file mode 100644
index 0000000..d32ea6f
--- /dev/null
+++ b/src/ostree/ostree-fetcher.h
@@ -0,0 +1,64 @@
+/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
+ *
+ * Copyright (C) 2012 Colin Walters <walters verbum org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef _OSTREE_FETCHER
+#define _OSTREE_FETCHER
+
+#define LIBSOUP_USE_UNSTABLE_REQUEST_API
+#include <libsoup/soup.h>
+#include <libsoup/soup-requester.h>
+#include <libsoup/soup-request-http.h>
+
+G_BEGIN_DECLS
+
+#define OSTREE_TYPE_FETCHER (ostree_fetcher_get_type ())
+#define OSTREE_FETCHER(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), OSTREE_TYPE_FETCHER, OstreeFetcher))
+#define OSTREE_FETCHER_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), OSTREE_TYPE_FETCHER, OstreeFetcherClass))
+#define OSTREE_IS_FETCHER(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), OSTREE_TYPE_FETCHER))
+#define OSTREE_IS_FETCHER_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), OSTREE_TYPE_FETCHER))
+#define OSTREE_FETCHER_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), OSTREE_TYPE_FETCHER, OstreeFetcherClass))
+
+typedef struct OstreeFetcherClass OstreeFetcherClass;
+typedef struct OstreeFetcher OstreeFetcher;
+
+struct OstreeFetcherClass
+{
+ GObjectClass parent_class;
+};
+
+GType ostree_fetcher_get_type (void) G_GNUC_CONST;
+
+OstreeFetcher *ostree_fetcher_new (GFile *tmpdir);
+
+char * ostree_fetcher_query_state_text (OstreeFetcher *self);
+
+void ostree_fetcher_request_uri_async (OstreeFetcher *self,
+ SoupURI *uri,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+
+GFile *ostree_fetcher_request_uri_finish (OstreeFetcher *self,
+ GAsyncResult *result,
+ GError **error);
+
+G_END_DECLS
+
+#endif
diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c
index f8ba1d8..a7be92d 100644
--- a/src/ostree/ostree-pull.c
+++ b/src/ostree/ostree-pull.c
@@ -20,13 +20,49 @@
* Author: Colin Walters <walters verbum org>
*/
+/**
+ * DESIGN:
+ *
+ * Pull refs
+ * For each ref:
+ * Pull commit
+ *
+ * Pull commits:
+ * For each commit:
+ * Verify checksum
+ * Import
+ * Traverse and queue dirtree/dirmeta
+ *
+ * Pull dirtrees:
+ * For each dirtree:
+ * Verify checksum
+ * Import
+ * Traverse and queue content/dirtree/dirmeta
+ *
+ * Pull content meta:
+ * For each content:
+ * Pull meta
+ * If contentcontent needed:
+ * Queue contentcontent
+ * else:
+ * Import
+ *
+ * Pull contentcontent:
+ * For each contentcontent
+ * Verify checksum
+ * Import
+ *
+ *
+ */
+
#include "config.h"
-#include <libsoup/soup.h>
#include "ostree.h"
#include "ot-main.h"
+#include "ostree-fetcher.h"
+
gboolean verbose;
gboolean opt_prefer_loose;
gboolean opt_related;
@@ -43,7 +79,7 @@ static GOptionEntry options[] = {
typedef struct {
OstreeRepo *repo;
char *remote_name;
- SoupSession *session;
+ OstreeFetcher *fetcher;
SoupURI *base_uri;
gboolean fetched_packs;
@@ -52,10 +88,22 @@ typedef struct {
GHashTable *file_checksums_to_fetch;
- gboolean stdout_is_tty;
+ GMainLoop *loop;
+
+ /* Used in meta fetch phase */
+ guint outstanding_uri_requests;
+ guint outstanding_meta_requests;
+
+ /* Used in content fetch phase */
+ guint outstanding_filemeta_requests;
+ guint outstanding_filecontent_requests;
+ guint outstanding_checksum_requests;
+ GHashTable *loose_files;
- guint64 dl_current_bytes;
- guint64 dl_total_bytes;
+ GError **async_error;
+ gboolean caught_error;
+
+ gboolean stdout_is_tty;
} OtPullData;
static SoupURI *
@@ -96,55 +144,87 @@ suburi_new (SoupURI *base,
return ret;
}
-typedef struct {
- OtPullData *pull_data;
- GOutputStream *stream;
- gboolean had_error;
- GError **error;
-} OstreeSoupChunkData;
+static gboolean
+uri_fetch_update_status (gpointer user_data)
+{
+ OtPullData *pull_data = user_data;
+ ot_lfree char *fetcher_status;
+ GString *status;
+
+ status = g_string_new ("");
+
+ if (pull_data->loose_files != NULL)
+ g_string_append_printf (status, "%u loose files to fetch: ",
+ g_hash_table_size (pull_data->loose_files)
+ + pull_data->outstanding_filemeta_requests
+ + pull_data->outstanding_filecontent_requests);
+
+ if (pull_data->outstanding_checksum_requests > 0)
+ g_string_append_printf (status, "Calculating %u checksums; ",
+ pull_data->outstanding_checksum_requests);
+
+ fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
+ g_string_append (status, fetcher_status);
+ g_print ("%s\n", status->str);
+
+ g_string_free (status, TRUE);
+
+ return TRUE;
+}
static void
-sync_progress (OtPullData *pull_data)
+check_outstanding_requests_handle_error (OtPullData *pull_data,
+ GError *error)
{
- if (pull_data->stdout_is_tty)
+ if (pull_data->outstanding_uri_requests == 0 &&
+ pull_data->outstanding_meta_requests == 0 &&
+ pull_data->outstanding_filemeta_requests == 0 &&
+ pull_data->outstanding_filecontent_requests == 0 &&
+ pull_data->outstanding_checksum_requests == 0 &&
+ (pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0))
+ g_main_loop_quit (pull_data->loop);
+ if (error)
{
- g_print ("%c8%" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT " KiB",
- 0x1b, (pull_data->dl_current_bytes / 1024), (pull_data->dl_total_bytes / 1024));
+ pull_data->caught_error = TRUE;
+ if (pull_data->async_error)
+ g_error_free (error);
+ else
+ g_propagate_error (pull_data->async_error, error);
}
}
static void
-on_got_chunk (SoupMessage *msg,
- SoupBuffer *buf,
- gpointer user_data)
+run_mainloop_monitor_fetcher (OtPullData *pull_data)
{
- OstreeSoupChunkData *data = user_data;
- gsize bytes_written;
+ GSource *update_timeout = NULL;
- data->pull_data->dl_current_bytes += buf->length;
- sync_progress (data->pull_data);
+ update_timeout = g_timeout_source_new_seconds (1);
+ g_source_set_callback (update_timeout, uri_fetch_update_status, pull_data, NULL);
+ g_source_attach (update_timeout, g_main_loop_get_context (pull_data->loop));
+ g_source_unref (update_timeout);
+
+ g_main_loop_run (pull_data->loop);
- if (!g_output_stream_write_all (data->stream, buf->data, buf->length,
- &bytes_written, NULL, data->error))
- {
- data->had_error = TRUE;
- soup_session_cancel_message (data->pull_data->session, msg, 500);
- }
+ g_source_destroy (update_timeout);
}
+typedef struct {
+ OtPullData *pull_data;
+ GFile *result_file;
+} OstreeFetchUriData;
+
static void
-on_got_content_length (SoupMessage *msg,
- OtPullData *pull_data)
+uri_fetch_on_complete (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
{
- goffset size;
-
- g_assert (msg->response_headers);
-
- size = soup_message_headers_get_content_length (msg->response_headers);
- if (size > 0)
- pull_data->dl_total_bytes = (guint64) size;
+ OstreeFetchUriData *data = user_data;
+ GError *local_error = NULL;
- sync_progress (pull_data);
+ data->result_file = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object,
+ result, &local_error);
+ data->pull_data->outstanding_uri_requests--;
+ check_outstanding_requests_handle_error (data->pull_data, local_error);
}
static gboolean
@@ -156,67 +236,31 @@ fetch_uri (OtPullData *pull_data,
GError **error)
{
gboolean ret = FALSE;
- guint response;
ot_lfree char *uri_string = NULL;
- ot_lobj GFile *ret_temp_filename = NULL;
- ot_lobj GOutputStream *output_stream = NULL;
- ot_lobj SoupMessage *msg = NULL;
- OstreeSoupChunkData chunkdata;
-
- if (!ostree_create_temp_regular_file (ostree_repo_get_tmpdir (pull_data->repo),
- tmp_prefix, NULL,
- &ret_temp_filename,
- &output_stream,
- NULL, error))
- goto out;
+ ot_lobj SoupRequest *request = NULL;
+ OstreeFetchUriData fetch_data;
- chunkdata.pull_data = pull_data;
- chunkdata.stream = output_stream;
- chunkdata.had_error = FALSE;
- chunkdata.error = error;
-
- uri_string = soup_uri_to_string (uri, FALSE);
- g_print ("Fetching %s\n", uri_string);
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return FALSE;
- if (pull_data->stdout_is_tty)
- {
- g_print ("%c7", 0x1B);
- g_print ("0/? KiB");
- pull_data->dl_current_bytes = 0;
- pull_data->dl_total_bytes = 0;
- sync_progress (pull_data);
- }
+ memset (&fetch_data, 0, sizeof (fetch_data));
+ fetch_data.pull_data = pull_data;
- msg = soup_message_new_from_uri (SOUP_METHOD_GET, uri);
+ uri_string = soup_uri_to_string (uri, FALSE);
+ g_print ("Fetching %s\n", uri_string);
- soup_message_body_set_accumulate (msg->response_body, FALSE);
+ pull_data->outstanding_uri_requests++;
+ ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable,
+ uri_fetch_on_complete, &fetch_data);
- soup_message_add_header_handler (msg, "got-headers",
- "Content-Length",
- G_CALLBACK (on_got_content_length),
- pull_data);
- g_signal_connect (msg, "got-chunk", G_CALLBACK (on_got_chunk), &chunkdata);
-
- response = soup_session_send_message (pull_data->session, msg);
- if (response != 200)
- {
- g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
- "Failed to retrieve '%s': %d %s",
- uri_string, response, msg->reason_phrase);
- goto out;
- }
+ run_mainloop_monitor_fetcher (pull_data);
- if (!g_output_stream_close (output_stream, NULL, error))
+ if (pull_data->caught_error)
goto out;
- if (pull_data->stdout_is_tty)
- g_print ("\n");
-
ret = TRUE;
- ot_transfer_out_value (out_temp_filename, &ret_temp_filename);
+ ot_transfer_out_value (out_temp_filename, &fetch_data.result_file);
out:
- if (ret_temp_filename)
- (void) unlink (ot_gfile_get_path_cached (ret_temp_filename));
return ret;
}
@@ -924,6 +968,231 @@ store_file_from_pack (OtPullData *pull_data,
return ret;
}
+typedef struct {
+ OtPullData *pull_data;
+
+ gboolean fetching_content;
+
+ GFile *meta_path;
+ GFile *content_path;
+
+ char *checksum;
+} OtFetchOneContentItemData;
+
+static void
+destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data)
+{
+ if (data->meta_path)
+ (void) ot_gfile_unlink (data->meta_path, NULL, NULL);
+ g_clear_object (&data->meta_path);
+ if (data->content_path)
+ (void) ot_gfile_unlink (data->content_path, NULL, NULL);
+ g_clear_object (&data->content_path);
+ g_free (data->checksum);
+ g_free (data);
+}
+
+static void
+content_fetch_on_checksum_complete (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ OtFetchOneContentItemData *data = user_data;
+ GError *local_error = NULL;
+ GError **error = &local_error;
+ guint64 length;
+ GCancellable *cancellable = NULL;
+ ot_lfree guchar *csum;
+ ot_lvariant GVariant *file_meta = NULL;
+ ot_lobj GFileInfo *file_info = NULL;
+ ot_lvariant GVariant *xattrs = NULL;
+ ot_lobj GInputStream *content_input = NULL;
+ ot_lobj GInputStream *file_object_input = NULL;
+ ot_lfree char *checksum;
+
+ csum = ot_gio_checksum_stream_finish ((GInputStream*)object, result, error);
+ if (!csum)
+ goto out;
+
+ if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
+ &file_meta, error))
+ goto out;
+
+ if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
+ goto out;
+
+ if (data->content_path)
+ {
+ content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
+ if (!content_input)
+ goto out;
+ }
+
+ if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
+ &file_object_input, &length,
+ cancellable, error))
+ goto out;
+
+ checksum = ostree_checksum_from_bytes (csum);
+
+ if (strcmp (checksum, data->checksum) != 0)
+ {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+ "Corrupted object %s (actual checksum is %s)",
+ data->checksum, checksum);
+ goto out;
+ }
+
+ if (!ostree_repo_stage_file_object_trusted (data->pull_data->repo, checksum,
+ FALSE, file_object_input, length,
+ cancellable, error))
+ goto out;
+
+ out:
+ data->pull_data->outstanding_checksum_requests--;
+ check_outstanding_requests_handle_error (data->pull_data, local_error);
+ destroy_fetch_one_content_item_data (data);
+}
+
+static void
+enqueue_loose_meta_requests (OtPullData *pull_data);
+
+static void
+content_fetch_on_complete (GObject *object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ OtFetchOneContentItemData *data = user_data;
+ GError *local_error = NULL;
+ GError **error = &local_error;
+ GCancellable *cancellable = NULL;
+ gboolean was_content_fetch = FALSE;
+ gboolean need_content_fetch = FALSE;
+ ot_lvariant GVariant *file_meta = NULL;
+ ot_lobj GFileInfo *file_info = NULL;
+ ot_lobj GInputStream *content_input = NULL;
+ ot_lobj GInputStream *file_object_input = NULL;
+ ot_lvariant GVariant *xattrs = NULL;
+
+ was_content_fetch = data->fetching_content;
+
+ if (was_content_fetch)
+ {
+ data->content_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
+ if (!data->content_path)
+ goto out;
+ }
+ else
+ {
+ data->meta_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
+ if (!data->meta_path)
+ goto out;
+ }
+
+ if (!was_content_fetch)
+ {
+ if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
+ &file_meta, error))
+ goto out;
+
+ if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
+ goto out;
+
+ if (g_file_info_get_file_type (file_info) == G_FILE_TYPE_REGULAR)
+ {
+ ot_lfree char *content_path = ostree_get_relative_archive_content_path (data->checksum);
+ SoupURI *content_uri;
+
+ content_uri = suburi_new (data->pull_data->base_uri, content_path, NULL);
+
+ data->pull_data->outstanding_filecontent_requests++;
+ need_content_fetch = TRUE;
+ data->fetching_content = TRUE;
+
+ ostree_fetcher_request_uri_async (data->pull_data->fetcher, content_uri, cancellable,
+ content_fetch_on_complete, data);
+ soup_uri_free (content_uri);
+ }
+ }
+
+ if (!need_content_fetch)
+ {
+ if (data->content_path)
+ {
+ content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
+ if (!content_input)
+ goto out;
+ }
+
+ if (file_meta == NULL)
+ {
+ if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
+ &file_meta, error))
+ goto out;
+
+ if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
+ goto out;
+ }
+
+ if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
+ &file_object_input, NULL,
+ cancellable, error))
+ goto out;
+
+ data->pull_data->outstanding_checksum_requests++;
+ ot_gio_checksum_stream_async (file_object_input, G_PRIORITY_DEFAULT, NULL,
+ content_fetch_on_checksum_complete, data);
+ }
+
+ out:
+ if (was_content_fetch)
+ data->pull_data->outstanding_filecontent_requests--;
+ else
+ {
+ data->pull_data->outstanding_filemeta_requests--;
+ enqueue_loose_meta_requests (data->pull_data);
+ }
+ check_outstanding_requests_handle_error (data->pull_data, local_error);
+}
+
+static void
+enqueue_loose_meta_requests (OtPullData *pull_data)
+{
+ GHashTableIter hash_iter;
+ gpointer key, value;
+ GCancellable *cancellable = NULL;
+
+ g_hash_table_iter_init (&hash_iter, pull_data->loose_files);
+ while (g_hash_table_iter_next (&hash_iter, &key, &value))
+ {
+ const char *checksum = key;
+ ot_lfree char *objpath = NULL;
+ SoupURI *obj_uri = NULL;
+ OtFetchOneContentItemData *one_item_data;
+
+ one_item_data = g_new0 (OtFetchOneContentItemData, 1);
+ one_item_data->pull_data = pull_data;
+ one_item_data->checksum = g_strdup (checksum);
+ one_item_data->fetching_content = FALSE;
+
+ objpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE);
+ obj_uri = suburi_new (pull_data->base_uri, objpath, NULL);
+
+ ostree_fetcher_request_uri_async (pull_data->fetcher, obj_uri, cancellable,
+ content_fetch_on_complete, one_item_data);
+ soup_uri_free (obj_uri);
+
+ pull_data->outstanding_filemeta_requests++;
+ g_hash_table_iter_remove (&hash_iter);
+
+ /* Don't let too many requests queue up; when we're fetching
+ * files we need to process the actual content.
+ */
+ if (pull_data->outstanding_filemeta_requests > 20)
+ break;
+ }
+}
+
static gboolean
fetch_content (OtPullData *pull_data,
GCancellable *cancellable,
@@ -1015,51 +1284,16 @@ fetch_content (OtPullData *pull_data,
if (g_hash_table_size (loose_files) > 0)
g_print ("Fetching %u loose objects\n",
g_hash_table_size (loose_files));
-
- g_hash_table_iter_init (&hash_iter, loose_files);
- while (g_hash_table_iter_next (&hash_iter, &key, &value))
+
+ pull_data->loose_files = loose_files;
+
+ if (g_hash_table_size (loose_files) > 0)
{
- const char *checksum = key;
- guint64 length;
- ot_lobj GInputStream *file_object_input = NULL;
- ot_lvariant GVariant *file_meta = NULL;
- ot_lobj GFileInfo *file_info = NULL;
- ot_lvariant GVariant *xattrs = NULL;
- ot_lobj GInputStream *content_input = NULL;
-
- if (!fetch_loose_object (pull_data, checksum, OSTREE_OBJECT_TYPE_FILE, &temp_path,
- cancellable, error))
- goto out;
+ enqueue_loose_meta_requests (pull_data);
- if (!ot_util_variant_map (temp_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
- &file_meta, error))
- goto out;
+ run_mainloop_monitor_fetcher (pull_data);
- if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
- goto out;
-
- if (g_file_info_get_file_type (file_info) == G_FILE_TYPE_REGULAR)
- {
- ot_lfree char *content_path = ostree_get_relative_archive_content_path (checksum);
- content_uri = suburi_new (pull_data->base_uri, content_path, NULL);
-
- if (!fetch_uri (pull_data, content_uri, "filecontent", &content_temp_path,
- cancellable, error))
- goto out;
-
- content_input = (GInputStream*)g_file_read (content_temp_path, cancellable, error);
- if (!content_input)
- goto out;
- }
-
- if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
- &file_object_input, &length,
- cancellable, error))
- goto out;
-
- if (!ostree_repo_stage_file_object (pull_data->repo, checksum,
- file_object_input, length,
- cancellable, error))
+ if (pull_data->caught_error)
goto out;
}
@@ -1203,6 +1437,9 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
if (!ostree_repo_check (repo, error))
goto out;
+ pull_data->async_error = error;
+ pull_data->loop = g_main_loop_new (NULL, FALSE);
+
pull_data->repo = repo;
pull_data->file_checksums_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
@@ -1215,10 +1452,7 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
pull_data->stdout_is_tty = isatty (1);
pull_data->remote_name = g_strdup (argv[1]);
- pull_data->session = soup_session_sync_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
- SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_COOKIE_JAR,
- NULL);
-
+ pull_data->fetcher = ostree_fetcher_new (ostree_repo_get_tmpdir (pull_data->repo));
config = ostree_repo_get_config (repo);
remote_key = g_strdup_printf ("remote \"%s\"", pull_data->remote_name);
@@ -1384,10 +1618,12 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
ret = TRUE;
out:
+ if (pull_data->loop)
+ g_main_loop_unref (pull_data->loop);
g_strfreev (configured_branches);
if (context)
g_option_context_free (context);
- g_clear_object (&pull_data->session);
+ g_clear_object (&pull_data->fetcher);
g_free (pull_data->remote_name);
if (pull_data->base_uri)
soup_uri_free (pull_data->base_uri);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]