[gnome-software/wip/jrocha/limit-parallel-ops: 3/4] Limit the number of certain operations running in parallel



commit ae857a909d06a29d9d68e6b5283f55f71baf656e
Author: Joaquim Rocha <jrocha endlessm com>
Date:   Fri Jan 19 11:28:06 2018 +0100

    Limit the number of certain operations running in parallel
    
    Operations like installation/update/upgrade-download can be resource
    intensive in slower machines, e.g. several installs at the same time may
    use memory or disk to a point where the whole system becomes unusable.
    This has been observed on machines that have 2GB of RAM (or less) and
    spinning disks.
    
    To avoid such situations, these changes limit the number of operations
    that can run in parallel by using a thread pool. The number of parallel
    operations allowed is in function of the RAM (one thread allowed per
    GB).

 lib/gs-plugin-loader.c          |  83 +++++++++++++++++++++++
 lib/gs-plugin-loader.h          |   3 +-
 lib/gs-utils.c                  |  11 +++
 lib/gs-utils.h                  |   1 +
 plugins/dummy/gs-plugin-dummy.c |   3 +-
 plugins/dummy/gs-self-test.c    | 144 +++++++++++++++++++++++++++++++++++++++-
 src/gs-details-page.c           |  62 +++++++++++++++++
 7 files changed, 304 insertions(+), 3 deletions(-)
---
diff --git a/lib/gs-plugin-loader.c b/lib/gs-plugin-loader.c
index 8ed0833d..5eae42bb 100644
--- a/lib/gs-plugin-loader.c
+++ b/lib/gs-plugin-loader.c
@@ -24,6 +24,7 @@
 #include <locale.h>
 #include <glib/gi18n.h>
 #include <appstream-glib.h>
+#include <math.h>
 
 #include "gs-app-private.h"
 #include "gs-app-list-private.h"
@@ -54,6 +55,8 @@ typedef struct
        GMutex                   pending_apps_mutex;
        GPtrArray               *pending_apps;
 
+       GThreadPool             *queued_ops_pool;
+
        GSettings               *settings;
 
        GMutex                   events_by_id_mutex;
@@ -72,6 +75,7 @@ typedef struct
 
 static void gs_plugin_loader_monitor_network (GsPluginLoader *plugin_loader);
 static void add_app_to_install_queue (GsPluginLoader *plugin_loader, GsApp *app);
+static void gs_plugin_loader_process_in_thread_pool_cb (gpointer data, gpointer user_data);
 
 G_DEFINE_TYPE_WITH_PRIVATE (GsPluginLoader, gs_plugin_loader, G_TYPE_OBJECT)
 
@@ -2692,6 +2696,12 @@ gs_plugin_loader_dispose (GObject *object)
                                             priv->network_changed_handler);
                priv->network_changed_handler = 0;
        }
+       if (priv->queued_ops_pool != NULL) {
+               /* stop accepting more requests and wait until any currently
+                * running ones are finished */
+               g_thread_pool_free (priv->queued_ops_pool, TRUE, TRUE);
+               priv->queued_ops_pool = NULL;
+       }
        g_clear_object (&priv->network_monitor);
        g_clear_object (&priv->soup_session);
        g_clear_object (&priv->profile);
@@ -2797,6 +2807,13 @@ gs_plugin_loader_settings_changed_cb (GSettings *settings,
                gs_plugin_loader_allow_updates_recheck (plugin_loader);
 }
 
+static gint
+get_max_parallel_ops (void)
+{
+       /* We're allowing 1 op per GB of memory */
+       return (gint) MAX (round((gdouble) gs_utils_get_memory_total () / 1024), 1.0);
+}
+
 static void
 gs_plugin_loader_init (GsPluginLoader *plugin_loader)
 {
@@ -2810,6 +2827,11 @@ gs_plugin_loader_init (GsPluginLoader *plugin_loader)
        priv->global_cache = gs_app_list_new ();
        priv->plugins = g_ptr_array_new_with_free_func ((GDestroyNotify) g_object_unref);
        priv->pending_apps = g_ptr_array_new_with_free_func ((GFreeFunc) g_object_unref);
+       priv->queued_ops_pool = g_thread_pool_new (gs_plugin_loader_process_in_thread_pool_cb,
+                                                  NULL,
+                                                  get_max_parallel_ops (),
+                                                  FALSE,
+                                                  NULL);
        priv->auth_array = g_ptr_array_new_with_free_func ((GFreeFunc) g_object_unref);
        priv->file_monitors = g_ptr_array_new_with_free_func ((GFreeFunc) g_object_unref);
        priv->locations = g_ptr_array_new_with_free_func (g_free);
@@ -3392,6 +3414,19 @@ gs_plugin_loader_process_thread_cb (GTask *task,
        g_task_return_pointer (task, g_object_ref (list), (GDestroyNotify) g_object_unref);
 }
 
+static void
+gs_plugin_loader_process_in_thread_pool_cb (gpointer data,
+                                           gpointer user_data)
+{
+       GTask *task = data;
+       gpointer source_object = g_task_get_source_object (task);
+       gpointer task_data = g_task_get_task_data (task);
+       GCancellable *cancellable = g_task_get_cancellable (task);
+
+       gs_plugin_loader_process_thread_cb (task, source_object, task_data, cancellable);
+       g_object_unref (task);
+}
+
 static gboolean
 gs_plugin_loader_job_timeout_cb (gpointer user_data)
 {
@@ -3415,6 +3450,22 @@ gs_plugin_loader_cancelled_cb (GCancellable *cancellable, GsPluginLoaderHelper *
        g_cancellable_cancel (helper->cancellable);
 }
 
+static void
+gs_plugin_loader_schedule_task (GsPluginLoader *plugin_loader,
+                               GTask *task)
+{
+       GsPluginLoaderHelper *helper = g_task_get_task_data (task);
+       GsPluginLoaderPrivate *priv = gs_plugin_loader_get_instance_private (plugin_loader);
+       GsApp *app = gs_plugin_job_get_app (helper->plugin_job);
+
+       if (app != NULL) {
+               /* set the pending-action to the app */
+               GsPluginAction action = gs_plugin_job_get_action (helper->plugin_job);
+               gs_app_set_pending_action (app, action);
+       }
+       g_thread_pool_push (priv->queued_ops_pool, g_object_ref (task), NULL);
+}
+
 /**
  * gs_plugin_loader_job_process_async:
  *
@@ -3615,6 +3666,18 @@ gs_plugin_loader_job_process_async (GsPluginLoader *plugin_loader,
                break;
        }
 
+       switch (action) {
+       case GS_PLUGIN_ACTION_INSTALL:
+       case GS_PLUGIN_ACTION_UPDATE:
+       case GS_PLUGIN_ACTION_UPGRADE_DOWNLOAD:
+               /* these actions must be performed by the thread pool because we
+                * want to limit the number of them running in parallel */
+               gs_plugin_loader_schedule_task (plugin_loader, task);
+               return;
+       default:
+               break;
+       }
+
        /* run in a thread */
        g_task_run_in_thread (task, gs_plugin_loader_process_thread_cb);
 }
@@ -3687,4 +3750,24 @@ gs_plugin_loader_get_profile (GsPluginLoader *plugin_loader)
        return priv->profile;
 }
 
+/**
+ * gs_plugin_loader_set_max_parallel_ops:
+ * @plugin_loader: a #GsPluginLoader
+ * @max_ops: the maximum number of parallel operations
+ *
+ * Sets the number of maximum number of queued operations (install/update/upgrade-download)
+ * to be processed at a time. If @max_ops is 0, then it will set the default maximum number.
+ */
+void
+gs_plugin_loader_set_max_parallel_ops (GsPluginLoader *plugin_loader,
+                                      guint max_ops)
+{
+       g_autoptr(GError) error = NULL;
+       GsPluginLoaderPrivate *priv = gs_plugin_loader_get_instance_private (plugin_loader);
+       if (max_ops == 0)
+               max_ops = get_max_parallel_ops ();
+       if (!g_thread_pool_set_max_threads (priv->queued_ops_pool, max_ops, &error))
+               g_warning ("Failed to set the maximum number of ops in parallel: %s",
+                          error->message);
+}
 /* vim: set noexpandtab: */
diff --git a/lib/gs-plugin-loader.h b/lib/gs-plugin-loader.h
index c4d9921b..81626bfc 100644
--- a/lib/gs-plugin-loader.h
+++ b/lib/gs-plugin-loader.h
@@ -109,7 +109,8 @@ void                 gs_plugin_loader_setup_again           (GsPluginLoader 
*plugin_loader);
 void            gs_plugin_loader_clear_caches          (GsPluginLoader *plugin_loader);
 GsPlugin       *gs_plugin_loader_find_plugin           (GsPluginLoader *plugin_loader,
                                                         const gchar    *plugin_name);
-
+void            gs_plugin_loader_set_max_parallel_ops  (GsPluginLoader *plugin_loader,
+                                                        guint           num_ops);
 
 G_END_DECLS
 
diff --git a/lib/gs-utils.c b/lib/gs-utils.c
index c1d86df1..9b754ed9 100644
--- a/lib/gs-utils.c
+++ b/lib/gs-utils.c
@@ -38,6 +38,7 @@
 #include <string.h>
 #include <glib/gstdio.h>
 #include <json-glib/json-glib.h>
+#include <sys/sysinfo.h>
 
 #ifdef HAVE_POLKIT
 #include <polkit/polkit.h>
@@ -50,6 +51,8 @@
 #define LOW_RESOLUTION_WIDTH  800
 #define LOW_RESOLUTION_HEIGHT 600
 
+#define MB_IN_BYTES (1024 * 1024)
+
 /**
  * gs_mkdir_parent:
  * @path: A full pathname
@@ -1029,4 +1032,12 @@ gs_utils_is_low_resolution (GtkWidget *toplevel)
        return geometry.width < LOW_RESOLUTION_WIDTH || geometry.height < LOW_RESOLUTION_HEIGHT;
 }
 
+guint
+gs_utils_get_memory_total (void)
+{
+       struct sysinfo si = { 0 };
+       sysinfo (&si);
+       return si.totalram / MB_IN_BYTES / si.mem_unit;
+}
+
 /* vim: set noexpandtab: */
diff --git a/lib/gs-utils.h b/lib/gs-utils.h
index 383fda96..2a048447 100644
--- a/lib/gs-utils.h
+++ b/lib/gs-utils.h
@@ -92,6 +92,7 @@ void           gs_utils_append_key_value      (GString        *str,
                                                 gsize           align_len,
                                                 const gchar    *key,
                                                 const gchar    *value);
+guint           gs_utils_get_memory_total      (void);
 
 G_END_DECLS
 
diff --git a/plugins/dummy/gs-plugin-dummy.c b/plugins/dummy/gs-plugin-dummy.c
index 6962852a..fbd322cf 100644
--- a/plugins/dummy/gs-plugin-dummy.c
+++ b/plugins/dummy/gs-plugin-dummy.c
@@ -548,7 +548,8 @@ gs_plugin_app_install (GsPlugin *plugin,
                return TRUE;
 
        /* install app */
-       if (g_strcmp0 (gs_app_get_id (app), "chiron.desktop") == 0) {
+       if (g_strcmp0 (gs_app_get_id (app), "chiron.desktop") == 0 ||
+           g_strcmp0 (gs_app_get_id (app), "zeus.desktop") == 0) {
                gs_app_set_state (app, AS_APP_STATE_INSTALLING);
                if (!gs_plugin_dummy_delay (plugin, app, 500, cancellable, error)) {
                        gs_app_set_state_recover (app);
diff --git a/plugins/dummy/gs-self-test.c b/plugins/dummy/gs-self-test.c
index a8ea380f..c2df107c 100644
--- a/plugins/dummy/gs-self-test.c
+++ b/plugins/dummy/gs-self-test.c
@@ -27,6 +27,29 @@
 
 static guint _status_changed_cnt = 0;
 
+typedef struct {
+       GError *error;
+       GMainLoop *loop;
+} GsDummyTestHelper;
+
+static GsDummyTestHelper *
+gs_dummy_test_helper_new (void)
+{
+        return g_new0 (GsDummyTestHelper, 1);
+}
+
+static void
+gs_dummy_test_helper_free (GsDummyTestHelper *helper)
+{
+       if (helper->error != NULL)
+               g_error_free (helper->error);
+       if (helper->loop != NULL)
+               g_main_loop_unref (helper->loop);
+       g_free (helper);
+}
+
+G_DEFINE_AUTOPTR_CLEANUP_FUNC(GsDummyTestHelper, gs_dummy_test_helper_free)
+
 static void
 gs_plugin_loader_status_changed_cb (GsPluginLoader *plugin_loader,
                                    GsApp *app,
@@ -673,6 +696,123 @@ gs_plugins_dummy_purchase_func (GsPluginLoader *plugin_loader)
        g_assert_cmpint (gs_app_get_state (app), ==, AS_APP_STATE_AVAILABLE);
 }
 
+static void
+plugin_job_action_cb (GObject *source,
+                     GAsyncResult *res,
+                     gpointer user_data)
+{
+      GsPluginLoader *plugin_loader = GS_PLUGIN_LOADER (source);
+      GsDummyTestHelper *helper = (GsDummyTestHelper *) user_data;
+
+      gs_plugin_loader_job_action_finish (plugin_loader, res, &helper->error);
+      if (helper->loop != NULL)
+              g_main_loop_quit (helper->loop);
+}
+
+static void
+gs_plugins_dummy_limit_parallel_ops_func (GsPluginLoader *plugin_loader)
+{
+       g_autoptr(GsAppList) list = NULL;
+        GsApp *app1 = NULL;
+       g_autoptr(GsApp) app2 = NULL;
+       g_autoptr(GsApp) app3 = NULL;
+       g_autoptr(GsPluginJob) plugin_job1 = NULL;
+       g_autoptr(GsPluginJob) plugin_job2 = NULL;
+       g_autoptr(GsPluginJob) plugin_job3 = NULL;
+       g_autoptr(GMainContext) context = NULL;
+       g_autoptr(GsDummyTestHelper) helper1 = gs_dummy_test_helper_new ();
+       g_autoptr(GsDummyTestHelper) helper2 = gs_dummy_test_helper_new ();
+       g_autoptr(GsDummyTestHelper) helper3 = gs_dummy_test_helper_new ();
+
+       /* drop all caches */
+       gs_plugin_loader_setup_again (plugin_loader);
+
+       /* get the updates list */
+       plugin_job1 = gs_plugin_job_newv (GS_PLUGIN_ACTION_GET_DISTRO_UPDATES, NULL);
+       list = gs_plugin_loader_job_process (plugin_loader, plugin_job1, NULL, &helper3->error);
+       gs_test_flush_main_context ();
+       g_assert_no_error (helper3->error);
+       g_assert (list != NULL);
+       g_assert_cmpint (gs_app_list_length (list), ==, 1);
+       app1 = gs_app_list_index (list, 0);
+       g_assert_cmpstr (gs_app_get_id (app1), ==, "org.fedoraproject.release-rawhide.upgrade");
+       g_assert_cmpint (gs_app_get_kind (app1), ==, AS_APP_KIND_OS_UPGRADE);
+       g_assert_cmpint (gs_app_get_state (app1), ==, AS_APP_STATE_AVAILABLE);
+
+       /* allow only one operation at a time */
+       gs_plugin_loader_set_max_parallel_ops (plugin_loader, 1);
+
+       app2 = gs_app_new ("chiron.desktop");
+       gs_app_set_management_plugin (app2, "dummy");
+       gs_app_set_state (app2, AS_APP_STATE_AVAILABLE);
+
+       /* use "proxy" prefix so the update function succeeds... */
+       app3 = gs_app_new ("proxy-zeus.desktop");
+       gs_app_set_management_plugin (app3, "dummy");
+       gs_app_set_state (app3, AS_APP_STATE_UPDATABLE_LIVE);
+
+       context = g_main_context_new ();
+       helper3->loop = g_main_loop_new (context, FALSE);
+       g_main_context_push_thread_default (context);
+
+       /* call a few operations at the "same time" */
+
+       /* download an upgrade */
+       g_object_unref (plugin_job1);
+       plugin_job1 = gs_plugin_job_newv (GS_PLUGIN_ACTION_UPGRADE_DOWNLOAD,
+                                         "app", app1,
+                                         NULL);
+       gs_plugin_loader_job_process_async (plugin_loader,
+                                           plugin_job1,
+                                           NULL,
+                                           plugin_job_action_cb,
+                                           helper1);
+
+       /* install an app */
+       plugin_job2 = gs_plugin_job_newv (GS_PLUGIN_ACTION_INSTALL,
+                                         "app", app2,
+                                         NULL);
+       gs_plugin_loader_job_process_async (plugin_loader,
+                                           plugin_job2,
+                                           NULL,
+                                           plugin_job_action_cb,
+                                           helper2);
+
+       /* update an app */
+       plugin_job3 = gs_plugin_job_newv (GS_PLUGIN_ACTION_UPDATE,
+                                         "app", app3,
+                                         NULL);
+       gs_plugin_loader_job_process_async (plugin_loader,
+                                           plugin_job3,
+                                           NULL,
+                                           plugin_job_action_cb,
+                                           helper3);
+
+       /* since we have only 1 parallel installation op possible,
+        * verify the last operations are pending */
+       g_assert_cmpint (gs_app_get_state (app2), ==, AS_APP_STATE_AVAILABLE);
+       g_assert_cmpint (gs_app_get_pending_action (app2), ==, GS_PLUGIN_ACTION_INSTALL);
+       g_assert_cmpint (gs_app_get_state (app3), ==, AS_APP_STATE_UPDATABLE_LIVE);
+       g_assert_cmpint (gs_app_get_pending_action (app3), ==, GS_PLUGIN_ACTION_UPDATE);
+
+       /* wait for the 2nd installation to finish, it means the 1st should have been
+        * finished too */
+       g_main_loop_run (helper3->loop);
+       g_main_context_pop_thread_default (context);
+
+       gs_test_flush_main_context ();
+       g_assert_no_error (helper1->error);
+       g_assert_no_error (helper2->error);
+       g_assert_no_error (helper3->error);
+
+       g_assert_cmpint (gs_app_get_state (app1), ==, AS_APP_STATE_UPDATABLE);
+       g_assert_cmpint (gs_app_get_state (app2), ==, AS_APP_STATE_INSTALLED);
+       g_assert_cmpint (gs_app_get_state (app3), ==, AS_APP_STATE_INSTALLED);
+
+       /* set the default max parallel ops */
+       gs_plugin_loader_set_max_parallel_ops (plugin_loader, 0);
+}
+
 int
 main (int argc, char **argv)
 {
@@ -823,7 +963,9 @@ main (int argc, char **argv)
        g_test_add_data_func ("/gnome-software/plugins/dummy/metadata-quirks",
                              plugin_loader,
                              (GTestDataFunc) gs_plugins_dummy_metadata_quirks);
-
+       g_test_add_data_func ("/gnome-software/plugins/dummy/limit-parallel-ops",
+                             plugin_loader,
+                             (GTestDataFunc) gs_plugins_dummy_limit_parallel_ops_func);
        return g_test_run ();
 }
 
diff --git a/src/gs-details-page.c b/src/gs-details-page.c
index b5f613c7..16b1688c 100644
--- a/src/gs-details-page.c
+++ b/src/gs-details-page.c
@@ -233,6 +233,19 @@ gs_details_page_update_shortcut_button (GsDetailsPage *self)
        }
 }
 
+static gboolean
+app_has_pending_action (GsApp *app)
+{
+       /* sanitize the pending state change by verifying we're in one of the
+        * expected states */
+       if (gs_app_get_state (app) != AS_APP_STATE_AVAILABLE &&
+           gs_app_get_state (app) != AS_APP_STATE_UPDATABLE_LIVE &&
+           gs_app_get_state (app) != AS_APP_STATE_UPDATABLE)
+               return FALSE;
+
+       return gs_app_get_pending_action (app) != GS_PLUGIN_ACTION_UNKNOWN;
+}
+
 static void
 gs_details_page_switch_to (GsPage *page, gboolean scroll_up)
 {
@@ -398,6 +411,12 @@ gs_details_page_switch_to (GsPage *page, gboolean scroll_up)
                }
        }
 
+       if (app_has_pending_action (self->app)) {
+               gtk_widget_set_visible (self->button_install, FALSE);
+               gtk_widget_set_visible (self->button_details_launch, FALSE);
+               gtk_widget_set_visible (self->button_remove, FALSE);
+       }
+
        adj = gtk_scrolled_window_get_vadjustment (GTK_SCROLLED_WINDOW (self->scrolledwindow_details));
        gtk_adjustment_set_value (adj, gtk_adjustment_get_lower (adj));
 
@@ -428,6 +447,12 @@ gs_details_page_refresh_progress (GsDetailsPage *self)
                gtk_widget_set_visible (self->button_cancel, FALSE);
                break;
        }
+       if (app_has_pending_action (self->app)) {
+               gtk_widget_set_visible (self->button_cancel, TRUE);
+               gtk_widget_set_sensitive (self->button_cancel,
+                                         !g_cancellable_is_cancelled (self->app_cancellable) &&
+                                         gs_app_get_allow_cancel (self->app));
+       }
 
        /* progress status label */
        switch (state) {
@@ -445,6 +470,28 @@ gs_details_page_refresh_progress (GsDetailsPage *self)
                gtk_widget_set_visible (self->label_progress_status, FALSE);
                break;
        }
+       if (app_has_pending_action (self->app)) {
+               GsPluginAction action = gs_app_get_pending_action (self->app);
+               gtk_widget_set_visible (self->label_progress_status, TRUE);
+               switch (action) {
+               case GS_PLUGIN_ACTION_INSTALL:
+                       /* TRANSLATORS: This is a label on top of the app's progress
+                        * bar to inform the user that the app should be installed soon */
+                       gtk_label_set_label (GTK_LABEL (self->label_progress_status),
+                                            _("Pending installation…"));
+                       break;
+               case GS_PLUGIN_ACTION_UPDATE:
+               case GS_PLUGIN_ACTION_UPGRADE_DOWNLOAD:
+                       /* TRANSLATORS: This is a label on top of the app's progress
+                        * bar to inform the user that the app should be updated soon */
+                       gtk_label_set_label (GTK_LABEL (self->label_progress_status),
+                                            _("Pending update…"));
+                       break;
+               default:
+                       gtk_widget_set_visible (self->label_progress_status, FALSE);
+                       break;
+               }
+       }
 
        /* percentage bar */
        switch (state) {
@@ -465,6 +512,10 @@ gs_details_page_refresh_progress (GsDetailsPage *self)
                gtk_widget_set_visible (self->progressbar_top, FALSE);
                break;
        }
+       if (app_has_pending_action (self->app)) {
+               gtk_widget_set_visible (self->progressbar_top, TRUE);
+               gtk_progress_bar_set_fraction (GTK_PROGRESS_BAR (self->progressbar_top), 0);
+       }
 
        /* spinner */
        switch (state) {
@@ -488,6 +539,8 @@ gs_details_page_refresh_progress (GsDetailsPage *self)
                gtk_widget_set_visible (self->box_progress, FALSE);
                break;
        }
+       if (app_has_pending_action (self->app))
+               gtk_widget_set_visible (self->box_progress, TRUE);
 }
 
 static gboolean
@@ -1572,6 +1625,9 @@ set_app (GsDetailsPage *self, GsApp *app)
        g_signal_connect_object (self->app, "notify::allow-cancel",
                                 G_CALLBACK (gs_details_page_allow_cancel_changed_cb),
                                 self, 0);
+       g_signal_connect_object (self->app, "notify::pending-action",
+                                G_CALLBACK (gs_details_page_notify_state_changed_cb),
+                                self, 0);
 
        /* print what we've got */
        tmp = gs_app_to_string (self->app);
@@ -1783,6 +1839,9 @@ gs_details_page_set_app (GsDetailsPage *self, GsApp *app)
        g_signal_connect_object (self->app, "notify::allow-cancel",
                                 G_CALLBACK (gs_details_page_allow_cancel_changed_cb),
                                 self, 0);
+       g_signal_connect_object (self->app, "notify::pending-action",
+                                G_CALLBACK (gs_details_page_notify_state_changed_cb),
+                                self, 0);
 
        g_set_object (&self->app_cancellable, gs_app_get_cancellable (self->app));
 
@@ -1814,6 +1873,9 @@ gs_details_page_app_cancel_button_cb (GtkWidget *widget, GsDetailsPage *self)
 {
        g_cancellable_cancel (self->app_cancellable);
        gtk_widget_set_sensitive (widget, FALSE);
+
+       /* reset the pending-action from the app if needed */
+       gs_app_set_pending_action (self->app, GS_PLUGIN_ACTION_UNKNOWN);
 }
 
 static void


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]