[gnome-software: 22/25] gs-plugin-job-refine: Make async internally

commit 1d2054d19631825e8119e709b396f409db122375
Author: Philip Withnall <pwithnall endlessos org>
Date:   Fri Feb 25 15:33:09 2022 +0000

    gs-plugin-job-refine: Make async internally
    Previously this was actually synchronous internally, as the changes to
    make it async were too big to do in the first round of porting.
    Signed-off-by: Philip Withnall <pwithnall endlessos org>
    Fixes: #1658

 lib/gs-plugin-job-refine.c | 338 ++++++++++++++++++++++++++++++++++++---------
 1 file changed, 275 insertions(+), 63 deletions(-)
diff --git a/lib/gs-plugin-job-refine.c b/lib/gs-plugin-job-refine.c
index 9e5bf7ca6..e8ac5d4df 100644
--- a/lib/gs-plugin-job-refine.c
+++ b/lib/gs-plugin-job-refine.c
@@ -32,6 +32,38 @@
  * gs_plugin_job_refine_get_result_list(). The #GsAppList which was passed
  * into the job will not be modified.
+ * Internally, the #GsPluginClass.refine_async() functions are called on all
+ * the plugins in parallel, and in parallel with a call to
+ * gs_odrs_provider_refine_async(). Once all of those calls are finished,
+ * zero or more recursive calls to run_refine_internal_async() are made in
+ * parallel to do a similar refine process on the addons, runtime and related
+ * components for all the components in the input #GsAppList. The refine job is
+ * complete once all these recursive calls complete.
+ *
+ * ```
+ *                                    run_async()
+ *                                         |
+ *                                         v
+ *           /-----------------------+-------------+----------------\
+ *           |                       |             |                |
+ * plugin->refine_async()  plugin->refine_async()  …  gs_odrs_provider_refine_async()
+ *           |                       |             |                |
+ *           v                       v             v                v
+ *           \-----------------------+-------------+----------------/
+ *                                         |
+ *                            finish_refine_internal_op()
+ *                                         |
+ *                                         v
+ *            /----------------------------+-----------------\
+ *            |                            |                 |
+ * run_refine_internal_async()  run_refine_internal_async()  …
+ *            |                            |                 |
+ *            v                            v                 v
+ *            \----------------------------+-----------------/
+ *                                         |
+ *                         finish_refine_internal_recursion()
+ * ```
+ *
  * See also: #GsPluginClass.refine_async
  * Since: 42
@@ -175,41 +207,88 @@ app_is_non_wildcard (GsApp *app, gpointer user_data)
 static void plugin_refine_cb (GObject      *source_object,
                               GAsyncResult *result,
                               gpointer      user_data);
+static void odrs_provider_refine_cb (GObject      *source_object,
+                                     GAsyncResult *result,
+                                     gpointer      user_data);
+static void finish_refine_internal_op (GTask  *task,
+                                       GError *error);
+static void recursive_internal_refine_cb (GObject      *source_object,
+                                          GAsyncResult *result,
+                                          gpointer      user_data);
+static void finish_refine_internal_recursion (GTask  *task,
+                                              GError *error);
+static gboolean run_refine_internal_finish (GsPluginJobRefine  *self,
+                                            GAsyncResult       *result,
+                                            GError            **error);
+typedef struct {
+       /* Input data. */
+       GsPluginLoader *plugin_loader;  /* (not nullable) (owned) */
+       GsAppList *list;  /* (not nullable) (owned) */
+       GsPluginRefineFlags flags;
+       /* In-progress data. */
+       guint n_pending_ops;
+       guint n_pending_recursions;
+       /* Output data. */
+       GError *error;  /* (nullable) (owned) */
+} RefineInternalData;
 static void
-plugin_refine_cb (GObject      *source_object,
-                  GAsyncResult *result,
-                  gpointer      user_data)
+refine_internal_data_free (RefineInternalData *data)
-       GAsyncResult **result_out = user_data;
+       g_clear_object (&data->plugin_loader);
+       g_clear_object (&data->list);
+       g_assert (data->n_pending_ops == 0);
+       g_assert (data->n_pending_recursions == 0);
+       /* If an error occurred, it should have been stolen to pass to
+        * g_task_return_error() by now. */
+       g_assert (data->error == NULL);
-       g_assert (*result_out == NULL);
-       *result_out = g_object_ref (result);
-       g_main_context_wakeup (g_main_context_get_thread_default ());
+       g_free (data);
-static gboolean
-run_refine_internal (GsPluginJobRefine    *self,
-                     GsPluginLoader       *plugin_loader,
-                     GsAppList            *list,
-                     GsPluginRefineFlags   flags,
-                     GCancellable         *cancellable,
-                     GError              **error)
+G_DEFINE_AUTOPTR_CLEANUP_FUNC (RefineInternalData, refine_internal_data_free)
+static void
+run_refine_internal_async (GsPluginJobRefine   *self,
+                           GsPluginLoader      *plugin_loader,
+                           GsAppList           *list,
+                           GsPluginRefineFlags  flags,
+                           GCancellable        *cancellable,
+                           GAsyncReadyCallback  callback,
+                           gpointer             user_data)
        GsOdrsProvider *odrs_provider;
        GsOdrsProviderRefineFlags odrs_refine_flags = 0;
        GPtrArray *plugins;  /* (element-type GsPlugin) */
+       g_autoptr(GTask) task = NULL;
+       RefineInternalData *data;
+       g_autoptr(RefineInternalData) data_owned = NULL;
+       task = g_task_new (self, cancellable, callback, user_data);
+       g_task_set_source_tag (task, run_refine_internal_async);
+       data = data_owned = g_new0 (RefineInternalData, 1);
+       data->plugin_loader = g_object_ref (plugin_loader);
+       data->list = g_object_ref (list);
+       data->flags = flags;
+       g_task_set_task_data (task, g_steal_pointer (&data_owned), (GDestroyNotify) 
        /* try to adopt each application with a plugin */
        gs_plugin_loader_run_adopt (plugin_loader, list);
+       data->n_pending_ops = 1;
        /* run each plugin */
        plugins = gs_plugin_loader_get_plugins (plugin_loader);
        for (guint i = 0; i < plugins->len; i++) {
                GsPlugin *plugin = g_ptr_array_index (plugins, i);
                GsPluginClass *plugin_class = GS_PLUGIN_GET_CLASS (plugin);
-               g_autoptr(GAsyncResult) refine_result = NULL;
                if (!gs_plugin_get_enabled (plugin))
@@ -217,18 +296,9 @@ run_refine_internal (GsPluginJobRefine    *self,
                /* run the batched plugin symbol */
+               data->n_pending_ops++;
                plugin_class->refine_async (plugin, list, flags,
-                                           cancellable, plugin_refine_cb, &refine_result);
-               /* FIXME: Make this sync until the calling function is rearranged
-                * to be async. */
-               while (refine_result == NULL)
-                       g_main_context_iteration (g_main_context_get_thread_default (), TRUE);
-               if (!plugin_class->refine_finish (plugin, refine_result, error))
-                       return FALSE;
-               gs_plugin_status_update (plugin, NULL, GS_PLUGIN_STATUS_FINISHED);
+                                           cancellable, plugin_refine_cb, g_object_ref (task));
        /* Add ODRS data if needed */
@@ -241,18 +311,79 @@ run_refine_internal (GsPluginJobRefine    *self,
                odrs_refine_flags |= GS_ODRS_PROVIDER_REFINE_FLAGS_GET_RATINGS;
        if (odrs_provider != NULL && odrs_refine_flags != 0) {
-               g_autoptr(GAsyncResult) odrs_refine_result = NULL;
+               data->n_pending_ops++;
                gs_odrs_provider_refine_async (odrs_provider, list, odrs_refine_flags,
-                                              cancellable, plugin_refine_cb, &odrs_refine_result);
+                                              cancellable, odrs_provider_refine_cb, g_object_ref (task));
+       }
+       finish_refine_internal_op (task, NULL);
+static void
+plugin_refine_cb (GObject      *source_object,
+                  GAsyncResult *result,
+                  gpointer      user_data)
+       GsPlugin *plugin = GS_PLUGIN (source_object);
+       g_autoptr(GTask) task = g_steal_pointer (&user_data);
+       GsPluginClass *plugin_class = GS_PLUGIN_GET_CLASS (plugin);
+       g_autoptr(GError) local_error = NULL;
+       if (!plugin_class->refine_finish (plugin, result, &local_error)) {
+               finish_refine_internal_op (task, g_steal_pointer (&local_error));
+               return;
+       }
+       gs_plugin_status_update (plugin, NULL, GS_PLUGIN_STATUS_FINISHED);
+       finish_refine_internal_op (task, NULL);
+static void
+odrs_provider_refine_cb (GObject      *source_object,
+                         GAsyncResult *result,
+                         gpointer      user_data)
+       GsOdrsProvider *odrs_provider = GS_ODRS_PROVIDER (source_object);
+       g_autoptr(GTask) task = g_steal_pointer (&user_data);
+       g_autoptr(GError) local_error = NULL;
+       gs_odrs_provider_refine_finish (odrs_provider, result, &local_error);
+       finish_refine_internal_op (task, g_steal_pointer (&local_error));
+/* @error is (transfer full) if non-NULL */
+static void
+finish_refine_internal_op (GTask  *task,
+                           GError *error)
+       GsPluginJobRefine *self = g_task_get_source_object (task);
+       GCancellable *cancellable = g_task_get_cancellable (task);
+       g_autoptr(GError) error_owned = g_steal_pointer (&error);
+       RefineInternalData *data = g_task_get_task_data (task);
+       GsPluginLoader *plugin_loader = data->plugin_loader;
+       GsAppList *list = data->list;
+       GsPluginRefineFlags flags = data->flags;
+       if (data->error == NULL && error_owned != NULL) {
+               data->error = g_steal_pointer (&error_owned);
+       } else if (error_owned != NULL) {
+               g_debug ("Additional error while refining: %s", error_owned->message);
+       }
+       g_assert (data->n_pending_ops > 0);
+       data->n_pending_ops--;
-               /* FIXME: Make this sync until the calling function is rearranged
-                * to be async. */
-               while (odrs_refine_result == NULL)
-                       g_main_context_iteration (g_main_context_get_thread_default (), TRUE);
+       if (data->n_pending_ops > 0)
+               return;
-               if (!gs_odrs_provider_refine_finish (odrs_provider, odrs_refine_result, error))
-                       return FALSE;
+       /* At this point, all the plugin->refine() calls are complete and the
+        * gs_odrs_provider_refine_async() call is also complete. If an error
+        * occurred during those calls, return with it now rather than
+        * proceeding to the recursive calls below. */
+       if (data->error != NULL) {
+               g_task_return_error (task, g_steal_pointer (&data->error));
+               return;
        /* filter any wildcard apps left in the list */
@@ -268,6 +399,10 @@ run_refine_internal (GsPluginJobRefine    *self,
+       /* Now run several recursive calls to run_refine_internal_async() in
+        * parallel, to refine related components. */
+       data->n_pending_recursions = 1;
        /* refine addons one layer deep */
                g_autoptr(GsAppList) addons_list = gs_app_list_new ();
@@ -290,11 +425,11 @@ run_refine_internal (GsPluginJobRefine    *self,
                if (gs_app_list_length (addons_list) > 0 && addons_flags != 0) {
-                       if (!run_refine_internal (self, plugin_loader,
-                                                 addons_list, addons_flags,
-                                                 cancellable, error)) {
-                               return FALSE;
-                       }
+                       data->n_pending_recursions++;
+                       run_refine_internal_async (self, plugin_loader,
+                                                  addons_list, addons_flags,
+                                                  cancellable, recursive_internal_refine_cb,
+                                                  g_object_ref (task));
@@ -314,11 +449,11 @@ run_refine_internal (GsPluginJobRefine    *self,
                if (gs_app_list_length (runtimes_list) > 0 && runtimes_flags != 0) {
-                       if (!run_refine_internal (self, plugin_loader,
-                                                 runtimes_list, runtimes_flags,
-                                                 cancellable, error)) {
-                               return FALSE;
-                       }
+                       data->n_pending_recursions++;
+                       run_refine_internal_async (self, plugin_loader,
+                                                  runtimes_list, runtimes_flags,
+                                                  cancellable, recursive_internal_refine_cb,
+                                                  g_object_ref (task));
@@ -342,16 +477,64 @@ run_refine_internal (GsPluginJobRefine    *self,
                if (gs_app_list_length (related_list) > 0 && related_flags != 0) {
-                       if (!run_refine_internal (self, plugin_loader,
-                                                 related_list, related_flags,
-                                                 cancellable, error)) {
-                               return FALSE;
-                       }
+                       data->n_pending_recursions++;
+                       run_refine_internal_async (self, plugin_loader,
+                                                  related_list, related_flags,
+                                                  cancellable, recursive_internal_refine_cb,
+                                                  g_object_ref (task));
-       /* success */
-       return TRUE;
+       finish_refine_internal_recursion (task, NULL);
+static void
+recursive_internal_refine_cb (GObject      *source_object,
+                              GAsyncResult *result,
+                              gpointer      user_data)
+       GsPluginJobRefine *self = GS_PLUGIN_JOB_REFINE (source_object);
+       g_autoptr(GTask) task = g_steal_pointer (&user_data);
+       g_autoptr(GError) local_error = NULL;
+       run_refine_internal_finish (self, result, &local_error);
+       finish_refine_internal_recursion (task, g_steal_pointer (&local_error));
+/* @error is (transfer full) if non-NULL */
+static void
+finish_refine_internal_recursion (GTask  *task,
+                                  GError *error)
+       g_autoptr(GError) error_owned = g_steal_pointer (&error);
+       RefineInternalData *data = g_task_get_task_data (task);
+       if (data->error == NULL && error_owned != NULL) {
+               data->error = g_steal_pointer (&error_owned);
+       } else if (error_owned != NULL) {
+               g_debug ("Additional error while refining: %s", error_owned->message);
+       }
+       g_assert (data->n_pending_recursions > 0);
+       data->n_pending_recursions--;
+       if (data->n_pending_recursions > 0)
+               return;
+       /* The entire refine operation (and all its sub-operations and
+        * recursions) is complete. */
+       if (data->error != NULL)
+               g_task_return_error (task, g_steal_pointer (&data->error));
+       else
+               g_task_return_boolean (task, TRUE);
+static gboolean
+run_refine_internal_finish (GsPluginJobRefine  *self,
+                            GAsyncResult       *result,
+                            GError            **error)
+       return g_task_propagate_boolean (G_TASK (result), error);
 static gboolean
@@ -363,6 +546,12 @@ app_thaw_notify_idle (gpointer data)
        return G_SOURCE_REMOVE;
+static void run_cb (GObject      *source_object,
+                    GAsyncResult *result,
+                    gpointer      user_data);
+static void finish_run (GTask     *task,
+                        GsAppList *result_list);
 static void
 gs_plugin_job_refine_run_async (GsPluginJob         *job,
                                 GsPluginLoader      *plugin_loader,
@@ -372,8 +561,6 @@ gs_plugin_job_refine_run_async (GsPluginJob         *job,
        GsPluginJobRefine *self = GS_PLUGIN_JOB_REFINE (job);
        g_autoptr(GTask) task = NULL;
-       g_autoptr(GError) local_error = NULL;
-       g_autofree gchar *job_debug = NULL;
        g_autoptr(GsAppList) result_list = NULL;
        /* check required args */
@@ -383,14 +570,15 @@ gs_plugin_job_refine_run_async (GsPluginJob         *job,
        /* Operate on a copy of the input list so we don’t modify it when
         * resolving wildcards. */
        result_list = gs_app_list_copy (self->app_list);
+       g_task_set_task_data (task, g_object_ref (result_list), (GDestroyNotify) g_object_unref);
        /* nothing to do */
-       if (self->flags == 0) {
-               g_debug ("no refine flags set for transaction");
-               goto results;
+       if (self->flags == 0 ||
+           gs_app_list_length (result_list) == 0) {
+               g_debug ("no refine flags set for transaction or app list is empty");
+               finish_run (task, result_list);
+               return;
-       if (gs_app_list_length (result_list) == 0)
-               goto results;
        /* freeze all apps */
        for (guint i = 0; i < gs_app_list_length (self->app_list); i++) {
@@ -398,8 +586,23 @@ gs_plugin_job_refine_run_async (GsPluginJob         *job,
                g_object_freeze_notify (G_OBJECT (app));
-       /* first pass */
-       if (run_refine_internal (self, plugin_loader, result_list, self->flags, cancellable, &local_error)) {
+       /* Start refining the apps. */
+       run_refine_internal_async (self, plugin_loader, result_list,
+                                  self->flags, cancellable,
+                                  run_cb, g_steal_pointer (&task));
+static void
+run_cb (GObject      *source_object,
+        GAsyncResult *result,
+        gpointer      user_data)
+       GsPluginJobRefine *self = GS_PLUGIN_JOB_REFINE (source_object);
+       g_autoptr(GTask) task = g_steal_pointer (&user_data);
+       GsAppList *result_list = g_task_get_task_data (task);
+       g_autoptr(GError) local_error = NULL;
+       if (run_refine_internal_finish (self, result, &local_error)) {
                /* remove any addons that have the same source as the parent app */
                for (guint i = 0; i < gs_app_list_length (result_list); i++) {
                        g_autoptr(GPtrArray) to_remove = g_ptr_array_new ();
@@ -443,7 +646,16 @@ gs_plugin_job_refine_run_async (GsPluginJob         *job,
+       finish_run (task, result_list);
+static void
+finish_run (GTask     *task,
+            GsAppList *result_list)
+       GsPluginJobRefine *self = g_task_get_source_object (task);
+       g_autofree gchar *job_debug = NULL;
        /* Internal calls to #GsPluginJobRefine may want to do their own
         * filtering, typically if the refine is being done as part of another
         * plugin job. If so, only filter to remove wildcards. Wildcards should
@@ -458,7 +670,7 @@ results:
                gs_app_list_filter (result_list, app_is_valid_filter, self);
        /* show elapsed time */
-       job_debug = gs_plugin_job_to_string (job);
+       job_debug = gs_plugin_job_to_string (GS_PLUGIN_JOB (self));
        g_debug ("%s", job_debug);
        /* success */

[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]