[gnome-software: 2/8] gs-plugin: Split out and refactor download code




commit 46080de16e379764785ac94ecf8569ffe636444e
Author: Philip Withnall <pwithnall endlessos org>
Date:   Tue Feb 1 19:13:48 2022 +0000

    gs-plugin: Split out and refactor download code
    
    Split out the bulk of the download code and rewrite it to be
    asynchronous. Keep a wrapper implementation in `gs-plugin.h` which
    binds the new implementation to the existing `gs_plugin_download_file()`
    and `gs_plugin_download_rewrite_resource()` APIs, until all callers of
    those APIs can be ported.
    
    This rewrite of the download code brings a few advantages:
     * Local and remote code paths are combined so they both get more
       testing.
     * The download is chunked so that the entire downloaded file isn’t
       allocated in memory near the end of the download. This significantly
       reduces the heap allocation sizes.
     * The code operates as a splice, so downloading to different kinds of
       `GOutputStream` is supported in the same code path. While
       `gs_plugin_download_data()` wasn’t actually used any more (and hence
       has been deleted), if it needs to be re-added in future, this can
       easily be done using a `GMemoryOutputStream`.
    
    Signed-off-by: Philip Withnall <pwithnall endlessos org>
    
    Helps: #1472

 lib/gs-download-utils.c | 494 ++++++++++++++++++++++++++++++++++++++++++++++++
 lib/gs-download-utils.h |  36 ++++
 lib/gs-plugin.c         | 376 +++++++-----------------------------
 lib/gs-plugin.h         |   5 -
 4 files changed, 596 insertions(+), 315 deletions(-)
---
diff --git a/lib/gs-download-utils.c b/lib/gs-download-utils.c
index a4e203755..54e057718 100644
--- a/lib/gs-download-utils.c
+++ b/lib/gs-download-utils.c
@@ -19,7 +19,10 @@
 
 #include "config.h"
 
+#include <gio/gio.h>
 #include <glib.h>
+#include <glib-object.h>
+#include <glib/gi18n.h>
 #include <libsoup/soup.h>
 
 #include "gs-download-utils.h"
@@ -45,3 +48,494 @@ gs_build_soup_session (void)
                                              "timeout", 10,
                                              NULL);
 }
+
+typedef struct {
+       /* Input data. */
+       gchar *uri;  /* (not nullable) (owned) */
+       GInputStream *input_stream;  /* (nullable) (owned) */
+       GOutputStream *output_stream;  /* (nullable) (owned) */
+       gsize buffer_size_bytes;
+       int io_priority;
+       GsDownloadProgressCallback progress_callback;  /* (nullable) */
+       gpointer progress_user_data;
+
+       /* In-progress state. */
+       SoupMessage *message;  /* (nullable) (owned) */
+       gboolean close_input_stream;
+       gboolean close_output_stream;
+       gboolean discard_output_stream;
+       gsize total_read_bytes;
+       gsize total_written_bytes;
+       gsize expected_stream_size_bytes;
+       GBytes *currently_unwritten_chunk;  /* (nullable) (owned) */
+
+       /* Output data. */
+       gchar *new_etag;  /* (nullable) (owned) */
+       GError *error;  /* (nullable) (owned) */
+} DownloadData;
+
+static void
+download_data_free (DownloadData *data)
+{
+       g_assert (data->input_stream == NULL || g_input_stream_is_closed (data->input_stream));
+       g_assert (data->output_stream == NULL || g_output_stream_is_closed (data->output_stream));
+
+       g_assert (data->currently_unwritten_chunk == NULL || data->error != NULL);
+
+       g_clear_object (&data->input_stream);
+       g_clear_object (&data->output_stream);
+
+       g_clear_object (&data->message);
+       g_clear_pointer (&data->uri, g_free);
+       g_clear_pointer (&data->new_etag, g_free);
+       g_clear_pointer (&data->currently_unwritten_chunk, g_bytes_unref);
+       g_clear_error (&data->error);
+
+       g_free (data);
+}
+
+G_DEFINE_AUTOPTR_CLEANUP_FUNC (DownloadData, download_data_free)
+
+static void open_input_stream_cb (GObject      *source_object,
+                                  GAsyncResult *result,
+                                  gpointer      user_data);
+static void read_bytes_cb (GObject      *source_object,
+                           GAsyncResult *result,
+                           gpointer      user_data);
+static void write_bytes_cb (GObject      *source_object,
+                            GAsyncResult *result,
+                            gpointer      user_data);
+static void finish_download (GTask  *task,
+                             GError *error);
+static void close_stream_cb (GObject      *source_object,
+                             GAsyncResult *result,
+                             gpointer      user_data);
+static void download_progress (GTask *task);
+
+/**
+ * gs_download_stream_async:
+ * @soup_session: a #SoupSession
+ * @uri: (not nullable): the URI to download
+ * @output_stream: (not nullable): an output stream to write the download to
+ * @last_etag: (nullable): the last-known ETag of the URI, or %NULL if unknown
+ * @io_priority: I/O priority to download and write at
+ * @progress_callback: (nullable): callback to call with progress information
+ * @progress_user_data: (nullable) (closure progress_callback): data to pass
+ *   to @progress_callback
+ * @cancellable: (nullable): a #GCancellable, or %NULL
+ * @callback: callback to call once the operation is complete
+ * @user_data: (closure callback): data to pass to @callback
+ *
+ * Download @uri and write it to @output_stream asynchronously.
+ *
+ * If @last_etag is non-%NULL, it will be sent to the server, which may return
+ * a ‘not modified’ response. If so, @output_stream will not be written to, and
+ * will be closed with a cancelled close operation. This will ensure that the
+ * existing content of the output stream (if it’s a file, for example) will not
+ * be overwritten.
+ *
+ * If specified, @progress_callback will be called zero or more times until
+ * @callback is called, providing progress updates on the download.
+ *
+ * Since: 42
+ */
+void
+gs_download_stream_async (SoupSession                *soup_session,
+                          const gchar                *uri,
+                          GOutputStream              *output_stream,
+                          const gchar                *last_etag,
+                          int                         io_priority,
+                          GsDownloadProgressCallback  progress_callback,
+                          gpointer                    progress_user_data,
+                          GCancellable               *cancellable,
+                          GAsyncReadyCallback         callback,
+                          gpointer                    user_data)
+{
+       g_autoptr(GTask) task = NULL;
+       g_autoptr(GError) local_error = NULL;
+       g_autoptr(SoupMessage) msg = NULL;
+       DownloadData *data;
+       g_autoptr(DownloadData) data_owned = NULL;
+
+       g_return_if_fail (SOUP_IS_SESSION (soup_session));
+       g_return_if_fail (uri != NULL);
+       g_return_if_fail (G_IS_OUTPUT_STREAM (output_stream));
+       g_return_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable));
+
+       task = g_task_new (soup_session, cancellable, callback, user_data);
+       g_task_set_source_tag (task, gs_download_stream_async);
+
+       data = data_owned = g_new0 (DownloadData, 1);
+       data->uri = g_strdup (uri);
+       data->output_stream = g_object_ref (output_stream);
+       data->close_output_stream = TRUE;
+       data->buffer_size_bytes = 8192;  /* arbitrarily chosen */
+       data->io_priority = io_priority;
+       data->progress_callback = progress_callback;
+       data->progress_user_data = progress_user_data;
+
+       g_task_set_task_data (task, g_steal_pointer (&data_owned), (GDestroyNotify) download_data_free);
+
+       /* local */
+       if (g_str_has_prefix (uri, "file://")) {
+               g_autoptr(GFile) local_file = g_file_new_for_path (uri + strlen ("file://"));
+               g_file_read_async (local_file, io_priority, cancellable, open_input_stream_cb, 
g_steal_pointer (&task));
+               return;
+       }
+
+       /* remote */
+       g_debug ("Downloading %s to %s", uri, G_OBJECT_TYPE_NAME (output_stream));
+       msg = soup_message_new (SOUP_METHOD_GET, uri);
+       if (msg == NULL) {
+               finish_download (task,
+                                g_error_new (G_IO_ERROR,
+                                             G_IO_ERROR_INVALID_ARGUMENT,
+                                             "Failed to parse URI ‘%s’", uri));
+               return;
+       }
+
+       data->message = g_object_ref (msg);
+
+       if (last_etag != NULL && *last_etag != '\0') {
+#if SOUP_CHECK_VERSION(3, 0, 0)
+               soup_message_headers_append (soup_message_get_request_headers (msg), "If-None-Match", 
last_etag);
+#else
+               soup_message_headers_append (msg->request_headers, "If-None-Match", last_etag);
+#endif
+       }
+
+#if SOUP_CHECK_VERSION(3, 0, 0)
+       soup_session_send_async (soup_session, msg, data->io_priority, cancellable, open_input_stream_cb, 
g_steal_pointer (&task));
+#else
+       soup_session_send_async (soup_session, msg, cancellable, open_input_stream_cb, g_steal_pointer 
(&task));
+#endif
+}
+
+static void
+open_input_stream_cb (GObject      *source_object,
+                      GAsyncResult *result,
+                      gpointer      user_data)
+{
+       g_autoptr(GTask) task = g_steal_pointer (&user_data);
+       DownloadData *data = g_task_get_task_data (task);
+       GCancellable *cancellable = g_task_get_cancellable (task);
+       g_autoptr(GInputStream) input_stream = NULL;
+       g_autoptr(GError) local_error = NULL;
+
+       /* This function can be called as a result of either reading a local
+        * file, or sending an HTTP request, so @source_object’s type can vary. */
+       if (G_IS_FILE (source_object)) {
+               GFile *local_file = G_FILE (source_object);
+
+               /* Local file. */
+               input_stream = G_INPUT_STREAM (g_file_read_finish (local_file, result, &local_error));
+
+               if (input_stream == NULL) {
+                       g_prefix_error (&local_error, "Failed to read ‘%s’: ",
+                                       g_file_peek_path (local_file));
+                       finish_download (task, g_steal_pointer (&local_error));
+                       return;
+               }
+
+               g_assert (data->input_stream == NULL);
+               data->input_stream = g_object_ref (input_stream);
+               data->close_input_stream = TRUE;
+       } else if (SOUP_IS_SESSION (source_object)) {
+               SoupSession *soup_session = SOUP_SESSION (source_object);
+               guint status_code;
+               const gchar *new_etag;
+
+               /* HTTP request. */
+#if SOUP_CHECK_VERSION(3, 0, 0)
+               input_stream = soup_session_send_finish (soup_session, result, &local_error);
+               status_code = soup_message_get_status (data->message);
+#else
+               input_stream = soup_session_send_finish (soup_session, result, &local_error);
+               status_code = data->message->status_code;
+#endif
+
+               if (input_stream != NULL) {
+                       g_assert (data->input_stream == NULL);
+                       data->input_stream = g_object_ref (input_stream);
+                       data->close_input_stream = TRUE;
+               }
+
+               if (status_code == SOUP_STATUS_NOT_MODIFIED) {
+                       /* If the file has not been modified from the ETag we
+                        * have, finish the download early. Ensure to close the
+                        * output stream so that its existing content is *not*
+                        * overwritten. */
+                       data->discard_output_stream = TRUE;
+                       finish_download (task, NULL);
+                       return;
+               } else if (status_code != SOUP_STATUS_OK) {
+                       g_autoptr(GString) str = g_string_new (NULL);
+                       g_string_append (str, soup_status_get_phrase (status_code));
+
+                       if (local_error != NULL) {
+                               g_string_append (str, ": ");
+                               g_string_append (str, local_error->message);
+                       }
+
+                       finish_download (task,
+                                        g_error_new (G_IO_ERROR,
+                                                     G_IO_ERROR_FAILED,
+                                                     "Failed to download ‘%s’: %s",
+                                                     data->uri, str->str));
+                       return;
+               }
+
+               g_assert (input_stream != NULL);
+
+               /* Get the expected download size. */
+#if SOUP_CHECK_VERSION(3, 0, 0)
+               data->expected_stream_size_bytes = soup_message_headers_get_content_length 
(soup_message_get_response_headers (data->message));
+#else
+               data->expected_stream_size_bytes = soup_message_headers_get_content_length 
(data->message->response_headers);
+#endif
+
+               /* Store the new ETag for later use. */
+#if SOUP_CHECK_VERSION(3, 0, 0)
+               new_etag = soup_message_headers_get_one (soup_message_get_response_headers (data->message), 
"ETag");
+#else
+               new_etag = soup_message_headers_get_one (data->message->response_headers, "ETag");
+#endif
+               if (new_etag != NULL && *new_etag == '\0')
+                       new_etag = NULL;
+               data->new_etag = g_strdup (new_etag);
+       } else {
+               g_assert_not_reached ();
+       }
+
+       /* Splice in an asynchronous loop. We unfortunately can’t use
+        * g_output_stream_splice_async() here, as it doesn’t provide a progress
+        * callback. The approach is the same though. */
+       g_input_stream_read_bytes_async (input_stream, data->buffer_size_bytes, data->io_priority,
+                                        cancellable, read_bytes_cb, g_steal_pointer (&task));
+}
+
+static void
+read_bytes_cb (GObject      *source_object,
+               GAsyncResult *result,
+               gpointer      user_data)
+{
+       GInputStream *input_stream = G_INPUT_STREAM (source_object);
+       g_autoptr(GTask) task = g_steal_pointer (&user_data);
+       DownloadData *data = g_task_get_task_data (task);
+       GCancellable *cancellable = g_task_get_cancellable (task);
+       g_autoptr(GBytes) bytes = NULL;
+       g_autoptr(GError) local_error = NULL;
+
+       bytes = g_input_stream_read_bytes_finish (input_stream, result, &local_error);
+
+       if (bytes == NULL) {
+               finish_download (task, g_steal_pointer (&local_error));
+               return;
+       }
+
+       /* Report progress. */
+       data->total_read_bytes += g_bytes_get_size (bytes);
+       data->expected_stream_size_bytes = MAX (data->expected_stream_size_bytes, data->total_read_bytes);
+       download_progress (task);
+
+       /* Write the downloaded data. */
+       if (g_bytes_get_size (bytes) > 0) {
+               g_clear_pointer (&data->currently_unwritten_chunk, g_bytes_unref);
+               data->currently_unwritten_chunk = g_bytes_ref (bytes);
+
+               g_output_stream_write_bytes_async (data->output_stream, bytes, data->io_priority,
+                                                  cancellable, write_bytes_cb, g_steal_pointer (&task));
+       } else {
+               finish_download (task, NULL);
+       }
+}
+
+static void
+write_bytes_cb (GObject      *source_object,
+                GAsyncResult *result,
+                gpointer      user_data)
+{
+       GOutputStream *output_stream = G_OUTPUT_STREAM (source_object);
+       g_autoptr(GTask) task = g_steal_pointer (&user_data);
+       DownloadData *data = g_task_get_task_data (task);
+       GCancellable *cancellable = g_task_get_cancellable (task);
+       gssize bytes_written_signed;
+       gsize bytes_written;
+       g_autoptr(GError) local_error = NULL;
+
+       bytes_written_signed = g_output_stream_write_bytes_finish (output_stream, result, &local_error);
+
+       if (bytes_written_signed < 0) {
+               finish_download (task, g_steal_pointer (&local_error));
+               return;
+       }
+
+       /* We know this is non-negative now. */
+       bytes_written = (gsize) bytes_written_signed;
+
+       /* Report progress. */
+       data->total_written_bytes += bytes_written;
+       download_progress (task);
+
+       g_assert (data->currently_unwritten_chunk != NULL);
+
+       if (bytes_written < g_bytes_get_size (data->currently_unwritten_chunk)) {
+               /* Partial write; try again with the remaining bytes. */
+               g_autoptr(GBytes) sub_bytes = g_bytes_new_from_bytes (data->currently_unwritten_chunk, 
bytes_written, g_bytes_get_size (data->currently_unwritten_chunk) - bytes_written);
+               g_assert (bytes_written > 0);
+
+               g_clear_pointer (&data->currently_unwritten_chunk, g_bytes_unref);
+               data->currently_unwritten_chunk = g_bytes_ref (sub_bytes);
+
+               g_output_stream_write_bytes_async (output_stream, sub_bytes, data->io_priority,
+                                                  cancellable, write_bytes_cb, g_steal_pointer (&task));
+       } else {
+               /* Full write succeeded. Start the next read. */
+               g_clear_pointer (&data->currently_unwritten_chunk, g_bytes_unref);
+
+               g_input_stream_read_bytes_async (data->input_stream, data->buffer_size_bytes, 
data->io_priority,
+                                                cancellable, read_bytes_cb, g_steal_pointer (&task));
+       }
+}
+
+/* error is (transfer full) */
+static void
+finish_download (GTask  *task,
+                 GError *error)
+{
+       DownloadData *data = g_task_get_task_data (task);
+       GCancellable *cancellable = g_task_get_cancellable (task);
+
+       /* Final progress update. */
+       if (error == NULL) {
+               data->expected_stream_size_bytes = data->total_read_bytes;
+               download_progress (task);
+       }
+
+       /* Record the error from the operation, if set. */
+       g_assert (data->error == NULL);
+       data->error = g_steal_pointer (&error);
+
+       g_assert (!data->discard_output_stream || data->close_output_stream);
+
+       if (data->close_output_stream) {
+               g_autoptr(GCancellable) output_cancellable = NULL;
+
+               g_assert (data->output_stream != NULL);
+
+               /* If there’s been a prior error, or we are aborting writing the
+                * output stream (perhaps because of a cache hit), close the
+                * output stream but cancel the close operation so that the old
+                * output file is not overwritten. */
+               if (data->error != NULL || data->discard_output_stream) {
+                       output_cancellable = g_cancellable_new ();
+                       g_cancellable_cancel (output_cancellable);
+               } else if (g_task_get_cancellable (task) != NULL) {
+                       output_cancellable = g_object_ref (g_task_get_cancellable (task));
+               }
+
+               g_output_stream_close_async (data->output_stream, data->io_priority, output_cancellable, 
close_stream_cb, g_object_ref (task));
+       }
+
+       if (data->close_input_stream && data->input_stream != NULL) {
+               g_input_stream_close_async (data->input_stream, data->io_priority, cancellable, 
close_stream_cb, g_object_ref (task));
+       }
+
+       /* Check in case both streams are already closed. */
+       close_stream_cb (NULL, NULL, g_object_ref (task));
+}
+
+static void
+close_stream_cb (GObject      *source_object,
+                 GAsyncResult *result,
+                 gpointer      user_data)
+{
+       g_autoptr(GTask) task = g_steal_pointer (&user_data);
+       DownloadData *data = g_task_get_task_data (task);
+       g_autoptr(GError) local_error = NULL;
+
+       if (G_IS_INPUT_STREAM (source_object)) {
+               /* Errors in closing the input stream are not fatal. */
+               if (!g_input_stream_close_finish (G_INPUT_STREAM (source_object),
+                                                 result, &local_error))
+                       g_debug ("Error closing input stream: %s", local_error->message);
+               g_clear_error (&local_error);
+
+               data->close_input_stream = FALSE;
+       } else if (G_IS_OUTPUT_STREAM (source_object)) {
+               /* Errors in closing the output stream are fatal, but don’t
+                * overwrite errors set earlier in the operation. */
+               if (!g_output_stream_close_finish (G_OUTPUT_STREAM (source_object),
+                                                  result, &local_error)) {
+                       if (data->error == NULL)
+                               data->error = g_steal_pointer (&local_error);
+                       else if (!g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+                               g_debug ("Error closing output stream: %s", local_error->message);
+               }
+               g_clear_error (&local_error);
+
+               data->close_output_stream = FALSE;
+               data->discard_output_stream = FALSE;
+       } else {
+               /* finish_download() calls this with a NULL source_object */
+       }
+
+       /* Still waiting for one of the streams to close? */
+       if (data->close_input_stream || data->close_output_stream)
+               return;
+
+       if (data->error != NULL) {
+               g_task_return_error (task, g_error_copy (data->error));
+       } else {
+               g_task_return_boolean (task, TRUE);
+       }
+}
+
+static void
+download_progress (GTask *task)
+{
+       DownloadData *data = g_task_get_task_data (task);
+
+       if (data->progress_callback != NULL) {
+               /* This should be guaranteed by the rest of the download code. */
+               g_assert (data->expected_stream_size_bytes >= data->total_written_bytes);
+
+               data->progress_callback (data->total_written_bytes, data->expected_stream_size_bytes,
+                                        data->progress_user_data);
+       }
+}
+
+/**
+ * gs_download_stream_finish:
+ * @soup_session: a #SoupSession
+ * @result: result of the asynchronous operation
+ * @new_etag_out: (out callee-allocates) (transfer full) (optional) (nullable):
+ *   return location for the ETag of the downloaded file (which may be %NULL),
+ *   or %NULL to ignore it
+ * @error: return location for a #GError
+ *
+ * Finish an asynchronous download operation started with
+ * gs_download_stream_async().
+ *
+ * Returns: %TRUE on success, %FALSE otherwise
+ * Since: 42
+ */
+gboolean
+gs_download_stream_finish (SoupSession   *soup_session,
+                           GAsyncResult  *result,
+                           gchar        **new_etag_out,
+                           GError       **error)
+{
+       DownloadData *data;
+
+       g_return_val_if_fail (g_task_is_valid (result, soup_session), FALSE);
+       g_return_val_if_fail (g_task_get_source_tag (G_TASK (result)) == gs_download_stream_async, FALSE);
+       g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
+
+       data = g_task_get_task_data (G_TASK (result));
+
+       if (new_etag_out != NULL)
+               *new_etag_out = g_strdup (data->new_etag);
+
+       return g_task_propagate_boolean (G_TASK (result), error);
+}
diff --git a/lib/gs-download-utils.h b/lib/gs-download-utils.h
index e634ace78..1bbfc1eba 100644
--- a/lib/gs-download-utils.h
+++ b/lib/gs-download-utils.h
@@ -19,4 +19,40 @@ G_BEGIN_DECLS
 
 SoupSession *gs_build_soup_session (void);
 
+/**
+ * GsDownloadProgressCallback:
+ * @bytes_downloaded: number of bytes downloaded so far
+ * @total_download_size: the total size of the download, in bytes
+ * @user_data: data passed to the calling function
+ *
+ * A progress callback to indicate how far a download has progressed.
+ *
+ * @total_download_size may be zero (for example, at the start of the download),
+ * so implementations of this callback must be careful to avoid division by zero
+ * errors.
+ *
+ * @total_download_size is guaranteed to always be greater than or equal to
+ * @bytes_downloaded.
+ *
+ * Since: 42
+ */
+typedef void (*GsDownloadProgressCallback) (gsize    bytes_downloaded,
+                                            gsize    total_download_size,
+                                            gpointer user_data);
+
+void           gs_download_stream_async        (SoupSession                *soup_session,
+                                                const gchar                *uri,
+                                                GOutputStream              *output_stream,
+                                                const gchar                *last_etag,
+                                                int                         io_priority,
+                                                GsDownloadProgressCallback  progress_callback,
+                                                gpointer                    progress_user_data,
+                                                GCancellable               *cancellable,
+                                                GAsyncReadyCallback         callback,
+                                                gpointer                    user_data);
+gboolean       gs_download_stream_finish       (SoupSession   *soup_session,
+                                                GAsyncResult  *result,
+                                                gchar        **new_etag_out,
+                                                GError       **error);
+
 G_END_DECLS
diff --git a/lib/gs-plugin.c b/lib/gs-plugin.c
index 48a2ab26f..8d2c6a178 100644
--- a/lib/gs-plugin.c
+++ b/lib/gs-plugin.c
@@ -41,6 +41,7 @@
 #endif
 
 #include "gs-app-list-private.h"
+#include "gs-download-utils.h"
 #include "gs-enums.h"
 #include "gs-os-release.h"
 #include "gs-plugin-private.h"
@@ -916,215 +917,42 @@ gs_plugin_reload (GsPlugin *plugin)
        g_idle_add (gs_plugin_reload_cb, plugin);
 }
 
-#if SOUP_CHECK_VERSION(3, 0, 0)
-static GBytes * /* (transfer full) */
-gs_plugin_download_with_progress (GsPlugin *plugin,
-                                 GsApp *app,
-                                 SoupMessage *msg,
-                                 GInputStream *stream,
-                                 GCancellable *cancellable,
-                                 GError **error)
-{
-       g_autoptr(GByteArray) byte_array = NULL;
-       gsize nread, total_read, expected_length;
-       guint8 buffer[16384];
-       gboolean success = FALSE;
-
-       if (stream == NULL || !SOUP_STATUS_IS_SUCCESSFUL (soup_message_get_status (msg)) ||
-           g_cancellable_is_cancelled (cancellable))
-               return NULL;
-
-       byte_array = g_byte_array_new ();
-
-       total_read = 0;
-       expected_length = soup_message_headers_get_content_length (soup_message_get_response_headers (msg));
-
-       while (g_input_stream_read_all (stream, buffer, sizeof (buffer), &nread, cancellable, error)) {
-               if (!nread) {
-                       success = TRUE;
-                       break;
-               }
-               g_byte_array_append (byte_array, buffer, nread);
-               total_read += nread;
-               if (app != NULL && expected_length > 0) {
-                       /* calculate percentage */
-                       guint percentage = (guint) ((100 * total_read) / expected_length);
-                       g_debug ("%s progress: %u%%", gs_app_get_id (app), percentage);
-                       gs_app_set_progress (app, percentage);
-                       gs_plugin_status_update (plugin, app, GS_PLUGIN_STATUS_DOWNLOADING);
-               }
-               if (nread < sizeof (buffer)) {
-                       success = TRUE;
-                       break;
-               }
-       }
-
-       if (success) {
-               GBytes *bytes = g_byte_array_free_to_bytes (byte_array);
-               byte_array = NULL;
-               return bytes;
-       }
-
-       return NULL;
-}
-#else
 typedef struct {
        GsPlugin        *plugin;
        GsApp           *app;
-       GCancellable    *cancellable;
 } GsPluginDownloadHelper;
 
 static void
-gs_plugin_download_chunk_cb (SoupMessage *msg, SoupBuffer *chunk,
-                            GsPluginDownloadHelper *helper)
+download_file_progress_cb (gsize    total_written_bytes,
+                           gsize    total_download_size,
+                           gpointer user_data)
 {
-       GsPluginPrivate *priv = gs_plugin_get_instance_private (helper->plugin);
+       GsPluginDownloadHelper *helper = user_data;
        guint percentage;
-       goffset header_size;
-       goffset body_length;
-
-       /* cancelled? */
-       if (g_cancellable_is_cancelled (helper->cancellable)) {
-               g_debug ("cancelling download of %s",
-                        gs_app_get_id (helper->app));
-               soup_session_cancel_message (priv->soup_session,
-                                            msg,
-                                            SOUP_STATUS_CANCELLED);
-               return;
-       }
 
-       /* if it's returning "Found" or an error, ignore the percentage */
-       if (msg->status_code != SOUP_STATUS_OK) {
-               g_debug ("ignoring status code %u (%s)",
-                        msg->status_code, msg->reason_phrase);
-               return;
-       }
-
-       /* get data */
-       body_length = msg->response_body->length;
-       header_size = soup_message_headers_get_content_length (msg->response_headers);
-
-       /* size is not known */
-       if (header_size < body_length)
-               return;
+       if (total_download_size > 0)
+               percentage = (guint) ((100 * total_written_bytes) / total_download_size);
+       else
+               percentage = 0;
 
-       /* calculate percentage */
-       percentage = (guint) ((100 * body_length) / header_size);
        g_debug ("%s progress: %u%%", gs_app_get_id (helper->app), percentage);
        gs_app_set_progress (helper->app, percentage);
        gs_plugin_status_update (helper->plugin,
                                 helper->app,
                                 GS_PLUGIN_STATUS_DOWNLOADING);
+
 }
-#endif
 
-/**
- * gs_plugin_download_data:
- * @plugin: a #GsPlugin
- * @app: a #GsApp, or %NULL
- * @uri: a remote URI
- * @cancellable: a #GCancellable, or %NULL
- * @error: a #GError, or %NULL
- *
- * Downloads data.
- *
- * Returns: the downloaded data, or %NULL
- *
- * Since: 3.22
- **/
-GBytes *
-gs_plugin_download_data (GsPlugin *plugin,
-                        GsApp *app,
-                        const gchar *uri,
-                        GCancellable *cancellable,
-                        GError **error)
+static void
+async_result_cb (GObject      *source_object,
+                 GAsyncResult *result,
+                 gpointer      user_data)
 {
-       GsPluginPrivate *priv = gs_plugin_get_instance_private (plugin);
-#if SOUP_CHECK_VERSION(3, 0, 0)
-       g_autoptr(GInputStream) stream = NULL;
-       g_autoptr(GError) error_local = NULL;
-       GBytes *bytes;
-#else
-       GsPluginDownloadHelper helper;
-       guint status_code;
-#endif
-       g_autoptr(SoupMessage) msg = NULL;
+       GAsyncResult **result_out = user_data;
 
-       g_return_val_if_fail (GS_IS_PLUGIN (plugin), NULL);
-       g_return_val_if_fail (uri != NULL, NULL);
-       g_return_val_if_fail (error == NULL || *error == NULL, NULL);
-
-       /* local */
-       if (g_str_has_prefix (uri, "file://")) {
-               gsize length = 0;
-               g_autofree gchar *contents = NULL;
-#if !SOUP_CHECK_VERSION(3, 0, 0)
-               g_autoptr(GError) error_local = NULL;
-#endif
-               g_debug ("copying %s from plugin %s", uri, priv->name);
-               if (!g_file_get_contents (uri + 7, &contents, &length, &error_local)) {
-                       g_set_error (error,
-                                    GS_PLUGIN_ERROR,
-                                    GS_PLUGIN_ERROR_DOWNLOAD_FAILED,
-                                    "failed to copy %s: %s",
-                                    uri, error_local->message);
-                       return NULL;
-               }
-               return g_bytes_new (contents, length);
-       }
-
-       /* remote */
-       g_debug ("downloading %s from plugin %s", uri, priv->name);
-       msg = soup_message_new (SOUP_METHOD_GET, uri);
-       if (msg == NULL) {
-               g_set_error (error,
-                            GS_PLUGIN_ERROR,
-                            GS_PLUGIN_ERROR_DOWNLOAD_FAILED,
-                            "failed to parse URI %s", uri);
-               return NULL;
-       }
-#if SOUP_CHECK_VERSION(3, 0, 0)
-       stream = soup_session_send (priv->soup_session, msg, cancellable, &error_local);
-       bytes = gs_plugin_download_with_progress (plugin, app, msg, stream, cancellable, &error_local);
-       if (bytes == NULL) {
-               if (g_error_matches (error_local, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
-                       g_propagate_error (error, error_local);
-               } else {
-                       g_set_error (error,
-                                    GS_PLUGIN_ERROR,
-                                    GS_PLUGIN_ERROR_DOWNLOAD_FAILED,
-                                    "failed to download %s: %s",
-                                    uri, error_local ? error_local->message : "Unknown error");
-               }
-       }
-       return bytes;
-#else
-       if (app != NULL) {
-               helper.plugin = plugin;
-               helper.app = app;
-               helper.cancellable = cancellable;
-               g_signal_connect (msg, "got-chunk",
-                                 G_CALLBACK (gs_plugin_download_chunk_cb),
-                                 &helper);
-       }
-       status_code = soup_session_send_message (priv->soup_session, msg);
-       if (status_code != SOUP_STATUS_OK) {
-               g_autoptr(GString) str = g_string_new (NULL);
-               g_string_append (str, soup_status_get_phrase (status_code));
-               if (msg->response_body->data != NULL) {
-                       g_string_append (str, ": ");
-                       g_string_append (str, msg->response_body->data);
-               }
-               g_set_error (error,
-                            GS_PLUGIN_ERROR,
-                            GS_PLUGIN_ERROR_DOWNLOAD_FAILED,
-                            "failed to download %s: %s",
-                            uri, str->str);
-               return NULL;
-       }
-       return g_bytes_new (msg->response_body->data,
-                           (gsize) msg->response_body->length);
-#endif
+       g_assert (*result_out == NULL);
+       *result_out = g_object_ref (result);
+       g_main_context_wakeup (g_main_context_get_thread_default ());
 }
 
 /**
@@ -1150,134 +978,62 @@ gs_plugin_download_file (GsPlugin *plugin,
                         GCancellable *cancellable,
                         GError **error)
 {
-       GsPluginPrivate *priv = gs_plugin_get_instance_private (plugin);
-#if SOUP_CHECK_VERSION(3, 0, 0)
-       g_autoptr(GInputStream) stream = NULL;
-       g_autoptr(GBytes) bytes = NULL;
-#else
+       g_autoptr(SoupSession) soup_session = NULL;
+       g_autoptr(GFile) output_file = NULL;
+       g_autoptr(GFileOutputStream) output_stream = NULL;
+       g_autofree gchar *last_etag = NULL;
+       g_autoptr(GAsyncResult) result = NULL;
+       g_autoptr(GMainContext) context = g_main_context_new ();
+       g_autoptr(GMainContextPusher) context_pusher = g_main_context_pusher_new (context);
        GsPluginDownloadHelper helper;
-#endif
-       const gchar *new_etag;
-       guint status_code;
-       gconstpointer downloaded_data = NULL;
-       gsize downloaded_data_length = 0;
-       g_autoptr(GError) error_local = NULL;
-       g_autoptr(SoupMessage) msg = NULL;
-       g_autoptr(GFile) file = NULL;
+       g_autofree gchar *new_etag = NULL;
+       g_autoptr(GError) local_error = NULL;
 
-       g_return_val_if_fail (GS_IS_PLUGIN (plugin), FALSE);
-       g_return_val_if_fail (uri != NULL, FALSE);
-       g_return_val_if_fail (filename != NULL, FALSE);
-       g_return_val_if_fail (error == NULL || *error == NULL, FALSE);
-
-       file = g_file_new_for_path (filename);
-
-       /* local */
-       if (g_str_has_prefix (uri, "file://")) {
-               gsize length = 0;
-               g_autofree gchar *contents = NULL;
-               g_debug ("copying %s from plugin %s", uri, priv->name);
-               if (!g_file_get_contents (uri + 7, &contents, &length, &error_local)) {
-                       g_set_error (error,
-                                    GS_PLUGIN_ERROR,
-                                    GS_PLUGIN_ERROR_DOWNLOAD_FAILED,
-                                    "failed to copy %s: %s",
-                                    uri, error_local->message);
-                       return FALSE;
-               }
-               if (!g_file_set_contents (filename, contents, length, &error_local)) {
-                       g_set_error (error,
-                                    GS_PLUGIN_ERROR,
-                                    GS_PLUGIN_ERROR_WRITE_FAILED,
-                                    "Failed to save file: %s",
-                                    error_local->message);
-                       return FALSE;
-               }
-               return TRUE;
-       }
+       helper.plugin = plugin;
+       helper.app = app;
 
-       /* remote */
-       g_debug ("downloading %s to %s from plugin %s", uri, filename, priv->name);
-       msg = soup_message_new (SOUP_METHOD_GET, uri);
-       if (msg == NULL) {
-               g_set_error (error,
-                            GS_PLUGIN_ERROR,
-                            GS_PLUGIN_ERROR_DOWNLOAD_FAILED,
-                            "failed to parse URI %s", uri);
+       soup_session = gs_build_soup_session ();
+
+       /* Create the destination file’s directory. */
+       if (!gs_mkdir_parent (filename, error))
                return FALSE;
-       }
-       if (g_file_test (filename, G_FILE_TEST_EXISTS)) {
-               g_autofree gchar *last_etag = gs_utils_get_file_etag (file, cancellable);
-               if (last_etag != NULL && *last_etag != '\0') {
-#if SOUP_CHECK_VERSION(3, 0, 0)
-                       soup_message_headers_append (soup_message_get_request_headers (msg), "If-None-Match", 
last_etag);
-#else
-                       soup_message_headers_append (msg->request_headers, "If-None-Match", last_etag);
-#endif
-               }
-       }
-#if SOUP_CHECK_VERSION(3, 0, 0)
-       stream = soup_session_send (priv->soup_session, msg, cancellable, &error_local);
-       bytes = gs_plugin_download_with_progress (plugin, app, msg, stream, cancellable, &error_local);
-       if (bytes != NULL)
-               downloaded_data = g_bytes_get_data (bytes, &downloaded_data_length);
-       status_code = soup_message_get_status (msg);
-#else
-       if (app != NULL) {
-               helper.plugin = plugin;
-               helper.app = app;
-               helper.cancellable = cancellable;
-               g_signal_connect (msg, "got-chunk",
-                                 G_CALLBACK (gs_plugin_download_chunk_cb),
-                                 &helper);
-       }
-       status_code = soup_session_send_message (priv->soup_session, msg);
-       downloaded_data = msg->response_body ? msg->response_body->data : NULL;
-       downloaded_data_length = msg->response_body ? msg->response_body->length : 0;
-#endif
-       if (status_code == SOUP_STATUS_NOT_MODIFIED)
-               return TRUE;
-       if (status_code != SOUP_STATUS_OK) {
-               g_autoptr(GString) str = g_string_new (NULL);
-               g_string_append (str, soup_status_get_phrase (status_code));
-#if SOUP_CHECK_VERSION(3, 0, 0)
-               if (error_local != NULL) {
-                       g_string_append (str, ": ");
-                       g_string_append (str, error_local->message);
-               }
-#endif
-               if (downloaded_data != NULL && downloaded_data_length > 0) {
-                       g_string_append (str, ": ");
-                       g_string_append_len (str, downloaded_data, downloaded_data_length);
-               }
-               g_set_error (error,
-                            GS_PLUGIN_ERROR,
-                            GS_PLUGIN_ERROR_DOWNLOAD_FAILED,
-                            "failed to download %s: %s",
-                            uri, str->str);
+
+       /* Query the old ETag if the file already exists. */
+       last_etag = gs_utils_get_file_etag (output_file, cancellable);
+
+       /* Create the output file. */
+       output_stream = g_file_replace (output_file, last_etag, FALSE,
+                                       G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION,
+                                       cancellable, &local_error);
+
+       if (output_stream == NULL) {
+               g_set_error_literal (error,
+                                    GS_PLUGIN_ERROR,
+                                    GS_PLUGIN_ERROR_DOWNLOAD_FAILED,
+                                    local_error->message);
                return FALSE;
        }
-#if SOUP_CHECK_VERSION(3, 0, 0)
-       g_clear_error (&error_local);
-#endif
-       if (!gs_mkdir_parent (filename, error))
-               return FALSE;
-       if (!g_file_set_contents (filename, downloaded_data, downloaded_data_length, &error_local)) {
-               g_set_error (error,
-                            GS_PLUGIN_ERROR,
-                            GS_PLUGIN_ERROR_WRITE_FAILED,
-                            "Failed to save file: %s",
-                            error_local->message);
+
+       /* Do the download. */
+       gs_download_stream_async (soup_session, uri, G_OUTPUT_STREAM (output_stream),
+                                 last_etag, G_PRIORITY_LOW,
+                                 download_file_progress_cb, &helper,
+                                 cancellable, async_result_cb, &result);
+
+       while (result == NULL)
+               g_main_context_iteration (context, TRUE);
+
+       if (!gs_download_stream_finish (soup_session, result, &new_etag, &local_error)) {
+               g_set_error_literal (error,
+                                    GS_PLUGIN_ERROR,
+                                    GS_PLUGIN_ERROR_DOWNLOAD_FAILED,
+                                    local_error->message);
                return FALSE;
        }
-#if SOUP_CHECK_VERSION(3, 0, 0)
-       new_etag = soup_message_headers_get_one (soup_message_get_response_headers (msg), "ETag");
-#else
-       new_etag = soup_message_headers_get_one (msg->response_headers, "ETag");
-#endif
-       if (new_etag != NULL && *new_etag == '\0')
-               new_etag = NULL;
-       gs_utils_set_file_etag (file, new_etag, cancellable);
+
+       /* Update the ETag. */
+       gs_utils_set_file_etag (output_file, new_etag, cancellable);
+
        return TRUE;
 }
 
diff --git a/lib/gs-plugin.h b/lib/gs-plugin.h
index 500e3171a..4eadb8ee6 100644
--- a/lib/gs-plugin.h
+++ b/lib/gs-plugin.h
@@ -152,11 +152,6 @@ void                gs_plugin_add_rule                     (GsPlugin       *plugin,
                                                         const gchar    *name);
 
 /* helpers */
-GBytes         *gs_plugin_download_data                (GsPlugin       *plugin,
-                                                        GsApp          *app,
-                                                        const gchar    *uri,
-                                                        GCancellable   *cancellable,
-                                                        GError         **error);
 gboolean        gs_plugin_download_file                (GsPlugin       *plugin,
                                                         GsApp          *app,
                                                         const gchar    *uri,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]