[ostree] pull: Download and checksum asynchronously



commit e485bace0189625c5ffc422c4c8b5987a6de5e79
Author: Colin Walters <walters verbum org>
Date:   Sun May 20 16:21:57 2012 -0400

    pull: Download and checksum asynchronously
    
    This is quite a noticeable speedup when downloading loose objects.

 Makefile-ostree.am           |    2 +
 src/libotutil/ot-gio-utils.c |   44 ++++
 src/libotutil/ot-gio-utils.h |   10 +
 src/ostree/ostree-fetcher.c  |  313 ++++++++++++++++++++++++++
 src/ostree/ostree-fetcher.h  |   64 ++++++
 src/ostree/ostree-pull.c     |  506 +++++++++++++++++++++++++++++++-----------
 6 files changed, 804 insertions(+), 135 deletions(-)
---
diff --git a/Makefile-ostree.am b/Makefile-ostree.am
index 1f7ab0e..8024974 100644
--- a/Makefile-ostree.am
+++ b/Makefile-ostree.am
@@ -53,6 +53,8 @@ if USE_LIBSOUP_GNOME
 bin_PROGRAMS += ostree-pull
 ostree_pull_SOURCES = src/ostree/ot-main.h \
 	src/ostree/ot-main.c \
+	src/ostree/ostree-fetcher.h \
+	src/ostree/ostree-fetcher.c \
 	src/ostree/ostree-pull.c
 
 ostree_pull_CFLAGS = $(ostree_bin_shared_cflags) $(OT_DEP_SOUP_CFLAGS)
diff --git a/src/libotutil/ot-gio-utils.c b/src/libotutil/ot-gio-utils.c
index cf1e0cd..b27507d 100644
--- a/src/libotutil/ot-gio-utils.c
+++ b/src/libotutil/ot-gio-utils.c
@@ -358,6 +358,50 @@ ot_gio_checksum_stream (GInputStream   *in,
   return ot_gio_splice_get_checksum (NULL, in, out_csum, cancellable, error);
 }
 
+/* Worker-thread body for ot_gio_checksum_stream_async(): checksums the source
+ * stream and stores either the digest (freed with g_free) or the GError. */
+static void
+checksum_stream_thread (GSimpleAsyncResult *result, GObject *object, GCancellable *cancellable)
+{
+  GInputStream *stream = (GInputStream*)object;
+  GError *failure = NULL;
+  guchar *digest;
+
+  if (ot_gio_checksum_stream (stream, &digest, cancellable, &failure))
+    g_simple_async_result_set_op_res_gpointer (result, digest, g_free);
+  else
+    g_simple_async_result_take_error (result, failure);
+}
+
+/* Start checksumming @in on a worker thread at @io_priority.  @callback is
+ * invoked with @user_data when done and should then call
+ * ot_gio_checksum_stream_finish() to collect the digest. */
+void
+ot_gio_checksum_stream_async (GInputStream         *in,
+                              int                   io_priority,
+                              GCancellable         *cancellable,
+                              GAsyncReadyCallback   callback,
+                              gpointer              user_data)
+{
+  GSimpleAsyncResult *task = g_simple_async_result_new ((GObject*) in, callback, user_data,
+                                                        ot_gio_checksum_stream_async);
+
+  g_simple_async_result_run_in_thread (task, checksum_stream_thread, io_priority, cancellable);
+  g_object_unref (task);
+}
+
+/* Returns a newly allocated copy of the 32-byte digest, or NULL with @error set. */
+guchar *
+ot_gio_checksum_stream_finish (GInputStream *in, GAsyncResult *result, GError **error)
+{
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == ot_gio_checksum_stream_async);
+  /* The worker stores no digest on failure, so surface its GError instead. */
+  if (g_simple_async_result_propagate_error (simple, error))
+    return NULL;
+  return g_memdup (g_simple_async_result_get_op_res_gpointer (simple), 32);
+}
+
 gboolean
 ot_gfile_merge_dirs (GFile    *destination,
                      GFile    *src,
diff --git a/src/libotutil/ot-gio-utils.h b/src/libotutil/ot-gio-utils.h
index 16b2f12..9299a44 100644
--- a/src/libotutil/ot-gio-utils.h
+++ b/src/libotutil/ot-gio-utils.h
@@ -88,6 +88,16 @@ gboolean ot_gio_checksum_stream (GInputStream   *in,
                                  GCancellable   *cancellable,
                                  GError        **error);
 
+void ot_gio_checksum_stream_async (GInputStream         *in,
+                                   int                   io_priority,
+                                   GCancellable         *cancellable,
+                                   GAsyncReadyCallback   callback,
+                                   gpointer              user_data);
+
+guchar * ot_gio_checksum_stream_finish (GInputStream   *in,
+                                        GAsyncResult   *result,
+                                        GError        **error);
+
 gboolean ot_gfile_merge_dirs (GFile    *destination,
                               GFile    *src,
                               GCancellable *cancellable,
diff --git a/src/ostree/ostree-fetcher.c b/src/ostree/ostree-fetcher.c
new file mode 100644
index 0000000..cd42261
--- /dev/null
+++ b/src/ostree/ostree-fetcher.c
@@ -0,0 +1,313 @@
+/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
+ *
+ * Copyright (C) 2011 Colin Walters <walters verbum org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ *
+ * Author: Colin Walters <walters verbum org>
+ */
+
+#include "config.h"
+
+#include "ostree-fetcher.h"
+#include "ostree.h"
+
+typedef enum {
+  OSTREE_FETCHER_STATE_PENDING,      /* zero value of a freshly allocated request */
+  OSTREE_FETCHER_STATE_DOWNLOADING,  /* response arrived; body is being spliced to disk */
+  OSTREE_FETCHER_STATE_COMPLETE      /* splice finished, or sending the request failed */
+} OstreeFetcherState;
+
+/* State for one in-flight URI request.  Owned by its GSimpleAsyncResult,
+ * which frees it through pending_uri_free() when the result is finalized. */
+typedef struct {
+  OstreeFetcher *self;
+  SoupURI *uri;                 /* private copy made with soup_uri_copy() */
+  OstreeFetcherState state;
+  SoupRequest *request;
+  GFile *tmpfile;               /* download target, created under self->tmpdir */
+  GInputStream *request_body;
+  GOutputStream *out_stream;
+  guint64 content_length;
+  GCancellable *cancellable;
+  GSimpleAsyncResult *result;
+} OstreeFetcherPendingURI;
+
+/* Drop every reference held by @pending, free its URI copy, then the struct. */
+static void
+pending_uri_free (OstreeFetcherPendingURI *pending)
+{
+  g_clear_object (&pending->self);
+  g_clear_object (&pending->tmpfile);
+  g_clear_object (&pending->request);
+  g_clear_object (&pending->request_body);
+  g_clear_object (&pending->out_stream);
+  g_clear_object (&pending->cancellable);
+  if (pending->uri)
+    soup_uri_free (pending->uri);
+  g_free (pending);
+}
+
+struct OstreeFetcher
+{
+  GObject parent_instance;
+  /* Directory in which downloads are written to temporary files. */
+  GFile *tmpdir;
+  /* Async libsoup session and its SoupRequester feature. */
+  SoupSession *session;
+  SoupRequester *requester;
+  /* Message most recently reported by "request-started", or NULL. */
+  SoupMessage *sending_message;
+  /* SoupMessage -> OstreeFetcherPendingURI, used for status reporting. */
+  GHashTable *message_to_request;
+  /* Sum of the sizes of all completed downloads, in bytes. */
+  guint64 total_downloaded;
+};
+
+G_DEFINE_TYPE (OstreeFetcher, ostree_fetcher, G_TYPE_OBJECT)
+
+/* GObject finalize: release the session first (its teardown may still run
+ * our signal handlers), then the tmpdir reference and the request table. */
+static void
+ostree_fetcher_finalize (GObject *object)
+{
+  OstreeFetcher *self = OSTREE_FETCHER (object);
+  g_clear_object (&self->session);
+  g_clear_object (&self->tmpdir);
+  g_hash_table_unref (self->message_to_request);
+  G_OBJECT_CLASS (ostree_fetcher_parent_class)->finalize (object);
+}
+
+/* Class initializer: only the finalize vfunc is overridden. */
+static void
+ostree_fetcher_class_init (OstreeFetcherClass *klass)
+{
+  /* Route object teardown through ostree_fetcher_finalize(). */
+  G_OBJECT_CLASS (klass)->finalize = ostree_fetcher_finalize;
+}
+
+/* "request-started" handler: remember @msg as the message currently being
+ * sent so ostree_fetcher_query_state_text() can report on it. */
+static void
+on_request_started (SoupSession *session, SoupMessage *msg,
+                    SoupSocket *socket, gpointer user_data)
+{
+  OstreeFetcher *fetcher = (OstreeFetcher *) user_data;
+  fetcher->sending_message = msg;
+}
+
+/* "request-unqueued" handler: once the message being sent leaves the queue,
+ * clear the marker and drop its entry from the lookup table. */
+static void
+on_request_unqueued (SoupSession *session, SoupMessage *msg, gpointer user_data)
+{
+  OstreeFetcher *fetcher = user_data;
+
+  if (fetcher->sending_message != msg)
+    return;
+  fetcher->sending_message = NULL;
+  g_hash_table_remove (fetcher->message_to_request, msg);
+}
+
+/* Instance initializer: creates the message -> pending-request table (keys
+ * are unreffed on removal, values are not owned), builds the async session
+ * with a SoupRequester feature, and hooks the request signals. */
+static void
+ostree_fetcher_init (OstreeFetcher *self)
+{
+  self->message_to_request = g_hash_table_new_full (NULL, NULL, (GDestroyNotify)g_object_unref, NULL);
+
+  self->session = soup_session_async_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
+                                                       SOUP_SESSION_USE_THREAD_CONTEXT, TRUE,
+                                                       SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_REQUESTER,
+                                                       NULL);
+  self->requester = (SoupRequester *)soup_session_get_feature (self->session, SOUP_TYPE_REQUESTER);
+  g_signal_connect (self->session, "request-started", G_CALLBACK (on_request_started), self);
+  g_signal_connect (self->session, "request-unqueued", G_CALLBACK (on_request_unqueued), self);
+}
+
+/* Create a fetcher that downloads into temporary files under @tmpdir; the
+ * fetcher takes its own reference on @tmpdir. */
+OstreeFetcher *
+ostree_fetcher_new (GFile *tmpdir)
+{
+  OstreeFetcher *fetcher = (OstreeFetcher*)g_object_new (OSTREE_TYPE_FETCHER, NULL);
+  fetcher->tmpdir = g_object_ref (tmpdir);
+  return fetcher;
+}
+
+/* Splice callback: records a failed/short transfer on the result, adds the
+ * temp file's size to the running total, then completes the request. */
+static void
+on_splice_complete (GObject *object, GAsyncResult *result, gpointer user_data)
+{
+  OstreeFetcherPendingURI *pending = user_data;
+  ot_lobj GFileInfo *file_info = NULL;
+  GError *local_error = NULL;
+
+  pending->state = OSTREE_FETCHER_STATE_COMPLETE;
+  if (g_output_stream_splice_finish ((GOutputStream*) object, result, &local_error) < 0)
+    g_simple_async_result_take_error (pending->result, local_error);
+  file_info = g_file_query_info (pending->tmpfile, OSTREE_GIO_FAST_QUERYINFO,
+                                 G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS, NULL, NULL);
+  if (file_info)
+    pending->self->total_downloaded += g_file_info_get_size (file_info);
+  (void) g_input_stream_close (pending->request_body, NULL, NULL);
+  g_simple_async_result_complete (pending->result);
+  g_object_unref (pending->result);   /* may free `pending`; must stay last */
+}
+
+/* Completion of soup_request_send_async(): on success, create the temp file
+ * and start splicing the response body into it; on any failure, finish the
+ * request with the error. */
+static void
+on_request_sent (GObject        *object,
+                 GAsyncResult   *result,
+                 gpointer        user_data)
+{
+  OstreeFetcherPendingURI *pending = user_data;
+  GOutputStreamSpliceFlags flags = G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET;
+  GError *local_error = NULL;
+
+  pending->request_body = soup_request_send_finish ((SoupRequest*) object,
+                                                   result, &local_error);
+  if (!pending->request_body)
+    goto failed;
+
+  pending->state = OSTREE_FETCHER_STATE_DOWNLOADING;
+  pending->content_length = soup_request_get_content_length (pending->request);
+
+  /* TODO - make this async */
+  if (!ostree_create_temp_regular_file (pending->self->tmpdir,
+                                        NULL, NULL,
+                                        &pending->tmpfile,
+                                        &pending->out_stream,
+                                        NULL, &local_error))
+    goto failed;
+
+  g_output_stream_splice_async (pending->out_stream, pending->request_body, flags, G_PRIORITY_DEFAULT,
+                                pending->cancellable, on_splice_complete, pending);
+  return;
+
+ failed:
+  pending->state = OSTREE_FETCHER_STATE_COMPLETE;
+  g_simple_async_result_take_error (pending->result, local_error);
+  g_simple_async_result_complete (pending->result);
+  /* Drop the reference taken in ostree_fetcher_request_uri_async(); this may
+   * free `pending`, so nothing may touch it afterwards. */
+  g_object_unref (pending->result);
+}
+
+/* Begin an asynchronous download of @uri into a temporary file.  @callback
+ * runs when the transfer ends; it should call
+ * ostree_fetcher_request_uri_finish() to obtain the file or the error. */
+void
+ostree_fetcher_request_uri_async (OstreeFetcher         *self,
+                                  SoupURI               *uri,
+                                  GCancellable          *cancellable,
+                                  GAsyncReadyCallback    callback,
+                                  gpointer               user_data)
+{
+  OstreeFetcherPendingURI *req = g_new0 (OstreeFetcherPendingURI, 1);
+  GError *request_error = NULL;
+
+  req->self = g_object_ref (self);
+  req->uri = soup_uri_copy (uri);
+  if (cancellable)
+    req->cancellable = g_object_ref (cancellable);
+  req->request = soup_requester_request_uri (self->requester, uri, &request_error);
+  g_assert_no_error (request_error);
+
+  /* Index the request by its SoupMessage so the status text can find it. */
+  g_hash_table_insert (self->message_to_request,
+                       soup_request_http_get_message ((SoupRequestHTTP*)req->request), req);
+
+  /* The result owns `req`: it is freed when the result is finalized. */
+  req->result = g_simple_async_result_new ((GObject*) self, callback, user_data,
+                                           ostree_fetcher_request_uri_async);
+  g_simple_async_result_set_op_res_gpointer (req->result, req, (GDestroyNotify) pending_uri_free);
+
+  soup_request_send_async (req->request, cancellable, on_request_sent, req);
+}
+
+/* Finish a request started with ostree_fetcher_request_uri_async().
+ * Returns a new reference to the downloaded temporary file, or NULL with
+ * @error set when the request failed. */
+GFile *
+ostree_fetcher_request_uri_finish (OstreeFetcher         *self,
+                                   GAsyncResult          *result,
+                                   GError               **error)
+{
+  OstreeFetcherPendingURI *req;
+
+  g_return_val_if_fail (g_simple_async_result_is_valid (result, (GObject*)self, ostree_fetcher_request_uri_async), FALSE);
+
+  if (g_simple_async_result_propagate_error (G_SIMPLE_ASYNC_RESULT (result), error))
+    return NULL;
+  req = g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (result));
+  return g_object_ref (req->tmpfile);
+}
+
+/* Format "@start/@max": plain bytes when @max is below 1 KiB, otherwise KiB
+ * with one decimal.  64-bit conversions keep large byte counts intact. */
+static char *
+format_size_pair (guint64 start, guint64 max)
+{
+  if (max < 1024)
+    return g_strdup_printf ("%" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT " bytes",
+                            start, max);
+  else
+    return g_strdup_printf ("%.1f/%.1f KiB", ((double) start) / 1024,
+                            ((double) max) / 1024);
+}
+
+/* Build a newly allocated, human-readable description of what the fetcher
+ * is doing: the URI being requested or downloaded (with progress when the
+ * temp file can be queried), or "Idle", plus the running download total. */
+char *
+ostree_fetcher_query_state_text (OstreeFetcher              *self)
+{
+  OstreeFetcherPendingURI *current = NULL;
+  ot_lfree char *uri_text = NULL;
+  ot_lfree char *progress = NULL;
+  ot_lobj GFileInfo *info = NULL;
+  double total_kib = ((double)self->total_downloaded) / 1024;
+
+  /* Find the request, if any, whose message is currently being sent. */
+  if (self->sending_message)
+    current = g_hash_table_lookup (self->message_to_request,
+                                   self->sending_message);
+  if (!current)
+    return g_strdup_printf ("Idle [ %.1f KiB downloaded ]", total_kib);
+
+  uri_text = soup_uri_to_string (current->uri, TRUE);
+  /* The temp file only exists once the response has started arriving. */
+  if (!current->tmpfile)
+    return g_strdup_printf ("Requesting %s  [ %.1f KiB downloaded ]",
+                            uri_text, total_kib);
+
+  info = g_file_query_info (current->tmpfile, OSTREE_GIO_FAST_QUERYINFO,
+                            G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
+                            NULL, NULL);
+  /* Without file info there is no progress to show; report as idle. */
+  if (!info)
+    return g_strdup_printf ("Idle [ %.1f KiB downloaded ]", total_kib);
+
+  /* Bytes on disk so far versus the advertised content length. */
+  progress = format_size_pair (g_file_info_get_size (info), current->content_length);
+  return g_strdup_printf ("Downloading %s  [ %s, %.1f KiB downloaded ]",
+                          uri_text, progress, total_kib);
+}
diff --git a/src/ostree/ostree-fetcher.h b/src/ostree/ostree-fetcher.h
new file mode 100644
index 0000000..d32ea6f
--- /dev/null
+++ b/src/ostree/ostree-fetcher.h
@@ -0,0 +1,64 @@
+/* -*- mode: C; c-file-style: "gnu"; indent-tabs-mode: nil; -*-
+ *
+ * Copyright (C) 2012 Colin Walters <walters verbum org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef _OSTREE_FETCHER
+#define _OSTREE_FETCHER
+
+#define LIBSOUP_USE_UNSTABLE_REQUEST_API
+#include <libsoup/soup.h>
+#include <libsoup/soup-requester.h>
+#include <libsoup/soup-request-http.h>
+
+G_BEGIN_DECLS
+
+#define OSTREE_TYPE_FETCHER         (ostree_fetcher_get_type ())
+#define OSTREE_FETCHER(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), OSTREE_TYPE_FETCHER, OstreeFetcher))
+#define OSTREE_FETCHER_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), OSTREE_TYPE_FETCHER, OstreeFetcherClass))
+#define OSTREE_IS_FETCHER(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), OSTREE_TYPE_FETCHER))
+#define OSTREE_IS_FETCHER_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), OSTREE_TYPE_FETCHER))
+#define OSTREE_FETCHER_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), OSTREE_TYPE_FETCHER, OstreeFetcherClass))
+
+typedef struct OstreeFetcherClass   OstreeFetcherClass;
+typedef struct OstreeFetcher   OstreeFetcher;
+
+struct OstreeFetcherClass
+{
+  GObjectClass parent_class;   /* only member: the class adds no fields of its own */
+};
+
+GType   ostree_fetcher_get_type (void) G_GNUC_CONST;
+
+OstreeFetcher *ostree_fetcher_new (GFile *tmpdir);
+
+char * ostree_fetcher_query_state_text (OstreeFetcher              *self);
+
+void ostree_fetcher_request_uri_async (OstreeFetcher         *self,
+                                       SoupURI               *uri,
+                                       GCancellable          *cancellable,
+                                       GAsyncReadyCallback    callback,
+                                       gpointer               user_data);
+
+GFile *ostree_fetcher_request_uri_finish (OstreeFetcher         *self,
+                                          GAsyncResult          *result,
+                                          GError               **error);
+
+G_END_DECLS
+
+#endif
diff --git a/src/ostree/ostree-pull.c b/src/ostree/ostree-pull.c
index f8ba1d8..a7be92d 100644
--- a/src/ostree/ostree-pull.c
+++ b/src/ostree/ostree-pull.c
@@ -20,13 +20,49 @@
  * Author: Colin Walters <walters verbum org>
  */
 
+/**
+ * DESIGN:
+ *
+ * Pull refs
+ *   For each ref:
+ *     Pull commit
+ * 
+ * Pull commits:
+ *  For each commit:
+ *    Verify checksum
+ *    Import
+ *    Traverse and queue dirtree/dirmeta
+ * 
+ * Pull dirtrees:
+ *  For each dirtree:
+ *    Verify checksum
+ *    Import
+ *    Traverse and queue content/dirtree/dirmeta
+ *
+ * Pull content meta:
+ *  For each content:
+ *    Pull meta
+ *    If contentcontent needed:
+ *      Queue contentcontent
+ *    else:
+ *      Import
+ *
+ * Pull contentcontent:
+ *  For each contentcontent
+ *    Verify checksum
+ *    Import
+ *    
+ *  
+ */
+
 #include "config.h"
 
-#include <libsoup/soup.h>
 
 #include "ostree.h"
 #include "ot-main.h"
 
+#include "ostree-fetcher.h"
+
 gboolean verbose;
 gboolean opt_prefer_loose;
 gboolean opt_related;
@@ -43,7 +79,7 @@ static GOptionEntry options[] = {
 typedef struct {
   OstreeRepo   *repo;
   char         *remote_name;
-  SoupSession  *session;
+  OstreeFetcher *fetcher;
   SoupURI      *base_uri;
 
   gboolean      fetched_packs;
@@ -52,10 +88,22 @@ typedef struct {
 
   GHashTable   *file_checksums_to_fetch;
 
-  gboolean      stdout_is_tty;
+  GMainLoop    *loop;
+
+  /* Used in meta fetch phase */
+  guint         outstanding_uri_requests;
+  guint         outstanding_meta_requests;
+
+  /* Used in content fetch phase */
+  guint         outstanding_filemeta_requests;
+  guint         outstanding_filecontent_requests;
+  guint         outstanding_checksum_requests;
+  GHashTable   *loose_files;
 
-  guint64       dl_current_bytes;
-  guint64       dl_total_bytes;
+  GError      **async_error;
+  gboolean      caught_error;
+
+  gboolean      stdout_is_tty;
 } OtPullData;
 
 static SoupURI *
@@ -96,55 +144,87 @@ suburi_new (SoupURI   *base,
   return ret;
 }
 
-typedef struct {
-  OtPullData     *pull_data;
-  GOutputStream  *stream;
-  gboolean        had_error;
-  GError        **error;
-} OstreeSoupChunkData;
+static gboolean
+uri_fetch_update_status (gpointer user_data)
+{
+  OtPullData *pull_data = user_data;
+  ot_lfree char *fetcher_status;
+  GString *status;
+ 
+  status = g_string_new ("");
+
+  if (pull_data->loose_files != NULL)
+    g_string_append_printf (status, "%u loose files to fetch: ",
+                            g_hash_table_size (pull_data->loose_files)
+                            + pull_data->outstanding_filemeta_requests
+                            + pull_data->outstanding_filecontent_requests);
+
+  if (pull_data->outstanding_checksum_requests > 0)
+    g_string_append_printf (status, "Calculating %u checksums; ",
+                            pull_data->outstanding_checksum_requests);
+
+  fetcher_status = ostree_fetcher_query_state_text (pull_data->fetcher);
+  g_string_append (status, fetcher_status);
+  g_print ("%s\n", status->str);
+
+  g_string_free (status, TRUE);
+
+  return TRUE;
+}
 
 static void
-sync_progress (OtPullData   *pull_data)
+check_outstanding_requests_handle_error (OtPullData          *pull_data,
+                                         GError              *error)
 {
-  if (pull_data->stdout_is_tty)
+  if (pull_data->outstanding_uri_requests == 0 &&
+      pull_data->outstanding_meta_requests == 0 &&
+      pull_data->outstanding_filemeta_requests == 0 &&
+      pull_data->outstanding_filecontent_requests == 0 &&
+      pull_data->outstanding_checksum_requests == 0 &&
+      (pull_data->loose_files == NULL || g_hash_table_size (pull_data->loose_files) == 0))
+    g_main_loop_quit (pull_data->loop);
+  if (error)
     {
-      g_print ("%c8%" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT " KiB",
-               0x1b, (pull_data->dl_current_bytes / 1024), (pull_data->dl_total_bytes / 1024));
+      pull_data->caught_error = TRUE;
+      if (pull_data->async_error)
+        g_error_free (error);
+      else
+        g_propagate_error (pull_data->async_error, error);
     }
 }
 
 static void
-on_got_chunk (SoupMessage   *msg,
-              SoupBuffer    *buf,
-              gpointer       user_data)
+run_mainloop_monitor_fetcher (OtPullData   *pull_data)
 {
-  OstreeSoupChunkData *data = user_data;
-  gsize bytes_written;
+  GSource *update_timeout = NULL;
 
-  data->pull_data->dl_current_bytes += buf->length;
-  sync_progress (data->pull_data);
+  update_timeout = g_timeout_source_new_seconds (1);
+  g_source_set_callback (update_timeout, uri_fetch_update_status, pull_data, NULL);
+  g_source_attach (update_timeout, g_main_loop_get_context (pull_data->loop));
+  g_source_unref (update_timeout);
+  
+  g_main_loop_run (pull_data->loop);
 
-  if (!g_output_stream_write_all (data->stream, buf->data, buf->length,
-                                  &bytes_written, NULL, data->error))
-    {
-      data->had_error = TRUE;
-      soup_session_cancel_message (data->pull_data->session, msg, 500);
-    }
+  g_source_destroy (update_timeout);
 }
 
+typedef struct {
+  OtPullData     *pull_data;
+  GFile          *result_file;
+} OstreeFetchUriData;
+
 static void
-on_got_content_length (SoupMessage        *msg,
-                       OtPullData         *pull_data)
+uri_fetch_on_complete (GObject        *object,
+                       GAsyncResult   *result,
+                       gpointer        user_data) 
 {
-  goffset size;
-
-  g_assert (msg->response_headers);
-  
-  size = soup_message_headers_get_content_length (msg->response_headers);
-  if (size > 0)
-    pull_data->dl_total_bytes = (guint64) size;
+  OstreeFetchUriData *data = user_data;
+  GError *local_error = NULL;
 
-  sync_progress (pull_data);
+  data->result_file = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object,
+                                                         result, &local_error);
+  data->pull_data->outstanding_uri_requests--;
+  check_outstanding_requests_handle_error (data->pull_data, local_error);
 }
 
 static gboolean
@@ -156,67 +236,31 @@ fetch_uri (OtPullData  *pull_data,
            GError     **error)
 {
   gboolean ret = FALSE;
-  guint response;
   ot_lfree char *uri_string = NULL;
-  ot_lobj GFile *ret_temp_filename = NULL;
-  ot_lobj GOutputStream *output_stream = NULL;
-  ot_lobj SoupMessage *msg = NULL;
-  OstreeSoupChunkData chunkdata;
-
-  if (!ostree_create_temp_regular_file (ostree_repo_get_tmpdir (pull_data->repo),
-                                        tmp_prefix, NULL,
-                                        &ret_temp_filename,
-                                        &output_stream,
-                                        NULL, error))
-    goto out;
+  ot_lobj SoupRequest *request = NULL;
+  OstreeFetchUriData fetch_data;
 
-  chunkdata.pull_data = pull_data;
-  chunkdata.stream = output_stream;
-  chunkdata.had_error = FALSE;
-  chunkdata.error = error;
-  
-  uri_string = soup_uri_to_string (uri, FALSE);
-  g_print ("Fetching %s\n", uri_string);
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return FALSE;
 
-  if (pull_data->stdout_is_tty)
-    {
-      g_print ("%c7", 0x1B);
-      g_print ("0/? KiB");
-      pull_data->dl_current_bytes = 0;
-      pull_data->dl_total_bytes = 0;
-      sync_progress (pull_data);
-    }
+  memset (&fetch_data, 0, sizeof (fetch_data));
+  fetch_data.pull_data = pull_data;
 
-  msg = soup_message_new_from_uri (SOUP_METHOD_GET, uri);
+  uri_string = soup_uri_to_string (uri, FALSE);
+  g_print ("Fetching %s\n", uri_string);
 
-  soup_message_body_set_accumulate (msg->response_body, FALSE);
+  pull_data->outstanding_uri_requests++;
+  ostree_fetcher_request_uri_async (pull_data->fetcher, uri, cancellable,
+                                    uri_fetch_on_complete, &fetch_data);
 
-  soup_message_add_header_handler (msg, "got-headers",
-                                   "Content-Length",
-                                   G_CALLBACK (on_got_content_length),
-                                   pull_data);
-  g_signal_connect (msg, "got-chunk", G_CALLBACK (on_got_chunk), &chunkdata);
-  
-  response = soup_session_send_message (pull_data->session, msg);
-  if (response != 200)
-    {
-      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
-                   "Failed to retrieve '%s': %d %s",
-                   uri_string, response, msg->reason_phrase);
-      goto out;
-    }
+  run_mainloop_monitor_fetcher (pull_data);
 
-  if (!g_output_stream_close (output_stream, NULL, error))
+  if (pull_data->caught_error)
     goto out;
 
-  if (pull_data->stdout_is_tty)
-    g_print ("\n");
-  
   ret = TRUE;
-  ot_transfer_out_value (out_temp_filename, &ret_temp_filename);
+  ot_transfer_out_value (out_temp_filename, &fetch_data.result_file);
  out:
-  if (ret_temp_filename)
-    (void) unlink (ot_gfile_get_path_cached (ret_temp_filename));
   return ret;
 }
 
@@ -924,6 +968,231 @@ store_file_from_pack (OtPullData          *pull_data,
   return ret;
 }
 
+/* Per-object state for fetching one loose file: first its metadata, then
+ * (for regular files) its content. */
+typedef struct {
+  OtPullData *pull_data;
+  gboolean fetching_content;   /* FALSE while the meta request is in flight */
+  GFile *meta_path;            /* downloaded temp files; unlinked on destroy */
+  GFile *content_path;
+  char *checksum;
+} OtFetchOneContentItemData;
+
+/* Unlink the temporary files (best effort), then free @data. */
+static void
+destroy_fetch_one_content_item_data (OtFetchOneContentItemData *data)
+{
+  g_free (data->checksum);
+  if (data->meta_path)
+    (void) ot_gfile_unlink (data->meta_path, NULL, NULL);
+  if (data->content_path)
+    (void) ot_gfile_unlink (data->content_path, NULL, NULL);
+  g_clear_object (&data->meta_path);
+  g_clear_object (&data->content_path);
+  g_free (data);
+}
+
+/* Checksum finished for one loose file object: compare the digest with the
+ * expected checksum and, on a match, stage the object into the repository. */
+static void
+content_fetch_on_checksum_complete (GObject *object, GAsyncResult *result, gpointer user_data)
+{
+  OtFetchOneContentItemData *data = user_data;
+  GError *local_error = NULL;
+  GError **error = &local_error;
+  guint64 length;
+  GCancellable *cancellable = NULL;
+  ot_lfree guchar *csum = NULL;
+  ot_lvariant GVariant *file_meta = NULL;
+  ot_lobj GFileInfo *file_info = NULL;
+  ot_lvariant GVariant *xattrs = NULL;
+  ot_lobj GInputStream *content_input = NULL;
+  ot_lobj GInputStream *file_object_input = NULL;
+  ot_lfree char *checksum = NULL;   /* NULL so an early "goto out" frees nothing */
+
+  csum = ot_gio_checksum_stream_finish ((GInputStream*)object, result, error);
+  if (!csum)
+    goto out;
+
+  if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
+                            &file_meta, error))
+    goto out;
+  
+  if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
+    goto out;
+
+  if (data->content_path)
+    {
+      content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
+      if (!content_input)
+        goto out;
+    }
+
+  if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
+                                          &file_object_input, &length,
+                                          cancellable, error))
+    goto out;
+
+  checksum = ostree_checksum_from_bytes (csum);
+
+  if (strcmp (checksum, data->checksum) != 0)
+    {
+      g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED,
+                   "Corrupted object %s (actual checksum is %s)",
+                   data->checksum, checksum);
+      goto out;
+    }
+
+  if (!ostree_repo_stage_file_object_trusted (data->pull_data->repo, checksum,
+                                              FALSE, file_object_input, length,
+                                              cancellable, error))
+    goto out;
+
+ out:
+  data->pull_data->outstanding_checksum_requests--;
+  check_outstanding_requests_handle_error (data->pull_data, local_error);
+  destroy_fetch_one_content_item_data (data);
+}
+
+static void
+enqueue_loose_meta_requests (OtPullData   *pull_data);
+
+/* Completion of a loose-file fetch.  A finished meta request either queues
+ * the content request (regular files) or goes straight to checksumming; a
+ * finished content request always goes on to checksumming. */
+static void
+content_fetch_on_complete (GObject *object, GAsyncResult *result, gpointer user_data)
+{
+  OtFetchOneContentItemData *data = user_data;
+  OtPullData *pull_data = data->pull_data;
+  GError *local_error = NULL;
+  GError **error = &local_error;
+  GCancellable *cancellable = NULL;
+  gboolean was_content_fetch = data->fetching_content;
+  gboolean need_content_fetch = FALSE;
+  ot_lvariant GVariant *file_meta = NULL;
+  ot_lobj GFileInfo *file_info = NULL;
+  ot_lobj GInputStream *content_input = NULL;
+  ot_lobj GInputStream *file_object_input = NULL;
+  ot_lvariant GVariant *xattrs = NULL;
+
+  if (was_content_fetch)
+    {
+      data->content_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
+      if (!data->content_path)
+        goto out;
+    }
+  else
+    {
+      data->meta_path = ostree_fetcher_request_uri_finish ((OstreeFetcher*)object, result, error);
+      if (!data->meta_path)
+        goto out;
+    }
+
+  if (!was_content_fetch)
+    {
+      if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
+                                &file_meta, error))
+        goto out;
+
+      if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
+        goto out;
+
+      if (g_file_info_get_file_type (file_info) == G_FILE_TYPE_REGULAR)
+        {
+          ot_lfree char *content_path = ostree_get_relative_archive_content_path (data->checksum);
+          SoupURI *content_uri = suburi_new (pull_data->base_uri, content_path, NULL);
+
+          pull_data->outstanding_filecontent_requests++;
+          need_content_fetch = TRUE;
+          data->fetching_content = TRUE;
+
+          ostree_fetcher_request_uri_async (pull_data->fetcher, content_uri, cancellable,
+                                            content_fetch_on_complete, data);
+          soup_uri_free (content_uri);
+        }
+    }
+
+  if (!need_content_fetch)
+    {
+      if (data->content_path)
+        {
+          content_input = (GInputStream*)g_file_read (data->content_path, cancellable, error);
+          if (!content_input)
+            goto out;
+        }
+
+      if (file_meta == NULL)
+        {
+          if (!ot_util_variant_map (data->meta_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
+                                    &file_meta, error))
+            goto out;
+
+          if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
+            goto out;
+        }
+
+      if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
+                                              &file_object_input, NULL,
+                                              cancellable, error))
+        goto out;
+
+      pull_data->outstanding_checksum_requests++;
+      ot_gio_checksum_stream_async (file_object_input, G_PRIORITY_DEFAULT, NULL,
+                                    content_fetch_on_checksum_complete, data);
+    }
+
+ out:
+  if (was_content_fetch)
+    pull_data->outstanding_filecontent_requests--;
+  else
+    {
+      pull_data->outstanding_filemeta_requests--;
+      enqueue_loose_meta_requests (pull_data);
+    }
+  if (local_error)   /* every failure happens before @data is handed on */
+    destroy_fetch_one_content_item_data (data);
+  check_outstanding_requests_handle_error (pull_data, local_error);
+}
+
+/* Turn queued loose-file checksums into metadata requests, removing each
+ * from pull_data->loose_files as it is issued.  Stops once more than 20
+ * metadata requests are outstanding; content_fetch_on_complete() calls this
+ * again as requests finish. */
+static void
+enqueue_loose_meta_requests (OtPullData *pull_data)
+{
+  GCancellable *cancellable = NULL;
+  GHashTableIter iter;
+  gpointer key, value;
+
+  g_hash_table_iter_init (&iter, pull_data->loose_files);
+  while (g_hash_table_iter_next (&iter, &key, &value))
+    {
+      const char *checksum = key;
+      ot_lfree char *relpath = ostree_get_relative_object_path (checksum, OSTREE_OBJECT_TYPE_FILE);
+      SoupURI *meta_uri = suburi_new (pull_data->base_uri, relpath, NULL);
+      OtFetchOneContentItemData *item = g_new0 (OtFetchOneContentItemData, 1);
+
+      item->pull_data = pull_data;
+      item->checksum = g_strdup (checksum);
+      item->fetching_content = FALSE;
+
+      ostree_fetcher_request_uri_async (pull_data->fetcher, meta_uri, cancellable,
+                                        content_fetch_on_complete, item);
+      soup_uri_free (meta_uri);
+
+      pull_data->outstanding_filemeta_requests++;
+      g_hash_table_iter_remove (&iter);
+
+      /* Don't let too many requests queue up; when we're fetching
+       * files we need to process the actual content.
+       */
+      if (pull_data->outstanding_filemeta_requests > 20)
+        break;
+    }
+}
+
 static gboolean
 fetch_content (OtPullData           *pull_data,
                GCancellable         *cancellable,
@@ -1015,51 +1284,16 @@ fetch_content (OtPullData           *pull_data,
   if (g_hash_table_size (loose_files) > 0)
     g_print ("Fetching %u loose objects\n",
              g_hash_table_size (loose_files));
-
-  g_hash_table_iter_init (&hash_iter, loose_files);
-  while (g_hash_table_iter_next (&hash_iter, &key, &value))
+  
+  pull_data->loose_files = loose_files;
+  
+  if (g_hash_table_size (loose_files) > 0)
     {
-      const char *checksum = key;
-      guint64 length;
-      ot_lobj GInputStream *file_object_input = NULL;
-      ot_lvariant GVariant *file_meta = NULL;
-      ot_lobj GFileInfo *file_info = NULL;
-      ot_lvariant GVariant *xattrs = NULL;
-      ot_lobj GInputStream *content_input = NULL;
-
-      if (!fetch_loose_object (pull_data, checksum, OSTREE_OBJECT_TYPE_FILE, &temp_path,
-                               cancellable, error))
-        goto out;
+      enqueue_loose_meta_requests (pull_data);
 
-      if (!ot_util_variant_map (temp_path, OSTREE_FILE_HEADER_GVARIANT_FORMAT, FALSE,
-                                &file_meta, error))
-        goto out;
+      run_mainloop_monitor_fetcher (pull_data);
 
-      if (!ostree_file_header_parse (file_meta, &file_info, &xattrs, error))
-        goto out;
-
-      if (g_file_info_get_file_type (file_info) == G_FILE_TYPE_REGULAR)
-        {
-          ot_lfree char *content_path = ostree_get_relative_archive_content_path (checksum);
-          content_uri = suburi_new (pull_data->base_uri, content_path, NULL);
-
-          if (!fetch_uri (pull_data, content_uri, "filecontent", &content_temp_path,
-                          cancellable, error))
-            goto out;
-
-          content_input = (GInputStream*)g_file_read (content_temp_path, cancellable, error);
-          if (!content_input)
-            goto out;
-        }
-
-      if (!ostree_raw_file_to_content_stream (content_input, file_info, xattrs,
-                                              &file_object_input, &length,
-                                              cancellable, error))
-        goto out;
-
-      if (!ostree_repo_stage_file_object (pull_data->repo, checksum,
-                                          file_object_input, length,
-                                          cancellable, error))
+      if (pull_data->caught_error)
         goto out;
     }
 
@@ -1203,6 +1437,9 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
   if (!ostree_repo_check (repo, error))
     goto out;
 
+  pull_data->async_error = error;
+  pull_data->loop = g_main_loop_new (NULL, FALSE);
+
   pull_data->repo = repo;
   pull_data->file_checksums_to_fetch = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
 
@@ -1215,10 +1452,7 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
   pull_data->stdout_is_tty = isatty (1);
 
   pull_data->remote_name = g_strdup (argv[1]);
-  pull_data->session = soup_session_sync_new_with_options (SOUP_SESSION_USER_AGENT, "ostree ",
-                                                           SOUP_SESSION_ADD_FEATURE_BY_TYPE, SOUP_TYPE_COOKIE_JAR,
-                                                           NULL);
-
+  pull_data->fetcher = ostree_fetcher_new (ostree_repo_get_tmpdir (pull_data->repo));
   config = ostree_repo_get_config (repo);
 
   remote_key = g_strdup_printf ("remote \"%s\"", pull_data->remote_name);
@@ -1384,10 +1618,12 @@ ostree_builtin_pull (int argc, char **argv, GFile *repo_path, GError **error)
 
   ret = TRUE;
  out:
+  if (pull_data->loop)
+    g_main_loop_unref (pull_data->loop);
   g_strfreev (configured_branches);
   if (context)
     g_option_context_free (context);
-  g_clear_object (&pull_data->session);
+  g_clear_object (&pull_data->fetcher);
   g_free (pull_data->remote_name);
   if (pull_data->base_uri)
     soup_uri_free (pull_data->base_uri);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]