[rhythmbox] rhythmdb: limit the number of outstanding entry adds for an import job
- From: Jonathan Matthew <jmatthew src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [rhythmbox] rhythmdb: limit the number of outstanding entry adds for an import job
- Date: Sun, 23 Sep 2012 05:30:38 +0000 (UTC)
commit 3b260f0c2976015e9f6684bd5d290912b88d526f
Author: Jonathan Matthew <jonathan d14n org>
Date: Sun Sep 23 15:10:21 2012 +1000
rhythmdb: limit the number of outstanding entry adds for an import job
Rather than feeding everything into the rhythmdb action queue
immediately, limit the number of URIs in flight. This makes it
possible to cancel an import job and wait for any outstanding
operations to finish in a reasonable amount of time, rather than
minutes or more for large imports, and it doesn't appear to
impact the overall time taken. It may reduce peak memory usage too.
rhythmdb/rhythmdb-import-job.c | 108 ++++++++++++++++++++++++---------------
1 file changed, 66 insertions(+), 42 deletions(-)
---
diff --git a/rhythmdb/rhythmdb-import-job.c b/rhythmdb/rhythmdb-import-job.c
index ccef14c..6b7dbcc 100644
--- a/rhythmdb/rhythmdb-import-job.c
+++ b/rhythmdb/rhythmdb-import-job.c
@@ -36,6 +36,14 @@
#include "rb-debug.h"
#include "rb-missing-plugins.h"
+/* maximum number of new URIs in the rhythmdb action queue.
+ * entries bounce around between different threads and processes a bit,
+ * so having multiple in flight should help. we also want to be able to
+ * cancel import jobs quickly. since we can't remove things from the
+ * action queue, having fewer entries helps.
+ */
+#define PROCESSING_LIMIT 20
+
enum
{
PROP_0,
@@ -64,7 +72,8 @@ struct _RhythmDBImportJobPrivate
int total;
int imported;
int processed;
- GHashTable *outstanding;
+ GQueue *outstanding;
+ GQueue *processing;
RhythmDB *db;
RhythmDBEntryType *entry_type;
RhythmDBEntryType *ignore_type;
@@ -145,10 +154,35 @@ rhythmdb_import_job_add_uri (RhythmDBImportJob *job, const char *uri)
g_mutex_unlock (&job->priv->lock);
}
+/* must be called with lock held */
+static void
+maybe_start_more (RhythmDBImportJob *job)
+{
+ if (g_cancellable_is_cancelled (job->priv->cancel)) {
+ return;
+ }
+
+ while (g_queue_get_length (job->priv->processing) < PROCESSING_LIMIT) {
+ char *uri;
+
+ uri = g_queue_pop_head (job->priv->outstanding);
+ if (uri == NULL) {
+ return;
+ }
+
+ g_queue_push_tail (job->priv->processing, uri);
+
+ rhythmdb_add_uri_with_types (job->priv->db,
+ uri,
+ job->priv->entry_type,
+ job->priv->ignore_type,
+ job->priv->error_type);
+ }
+}
+
static void
missing_plugins_retry_cb (gpointer instance, gboolean installed, RhythmDBImportJob *job)
{
- GSList *retry = NULL;
GSList *i;
g_mutex_lock (&job->priv->lock);
@@ -172,25 +206,14 @@ missing_plugins_retry_cb (gpointer instance, gboolean installed, RhythmDBImportJ
uri = rhythmdb_entry_dup_string (entry, RHYTHMDB_PROP_LOCATION);
rhythmdb_entry_delete (job->priv->db, entry);
- g_hash_table_insert (job->priv->outstanding, g_strdup (uri), GINT_TO_POINTER (1));
- retry = g_slist_prepend (retry, uri);
+ g_queue_push_tail (job->priv->outstanding, g_strdup (uri));
}
rhythmdb_commit (job->priv->db);
- retry = g_slist_reverse (retry);
}
- g_mutex_unlock (&job->priv->lock);
-
- for (i = retry; i != NULL; i = i->next) {
- char *uri = (char *)i->data;
- rhythmdb_add_uri_with_types (job->priv->db,
- uri,
- job->priv->entry_type,
- job->priv->ignore_type,
- job->priv->error_type);
- }
+ maybe_start_more (job);
- rb_slist_deep_free (retry);
+ g_mutex_unlock (&job->priv->lock);
}
static gboolean
@@ -252,6 +275,10 @@ emit_status_changed (RhythmDBImportJob *job)
rb_debug ("emitting job complete");
g_signal_emit (job, signals[COMPLETE], 0, job->priv->total);
}
+ } else if (g_cancellable_is_cancelled (job->priv->cancel) &&
+ g_queue_is_empty (job->priv->processing)) {
+ rb_debug ("cancelled job has no processing entries, emitting complete");
+ g_signal_emit (job, signals[COMPLETE], 0, job->priv->total);
}
g_mutex_unlock (&job->priv->lock);
g_object_unref (job);
@@ -263,7 +290,6 @@ static void
uri_recurse_func (GFile *file, gboolean dir, RhythmDBImportJob *job)
{
RhythmDBEntry *entry;
- gboolean add_uri;
char *uri;
if (dir) {
@@ -275,43 +301,36 @@ uri_recurse_func (GFile *file, gboolean dir, RhythmDBImportJob *job)
uri = g_file_get_uri (file);
- /* only add the entry to the map of entries we're waiting for
- * if it's not already in the db.
- */
+ /* if it's not already in the db, add it to the list of things to process */
entry = rhythmdb_entry_lookup_by_location (job->priv->db, uri);
if (entry == NULL) {
rb_debug ("waiting for entry %s", uri);
g_mutex_lock (&job->priv->lock);
job->priv->total++;
- g_hash_table_insert (job->priv->outstanding, g_strdup (uri), GINT_TO_POINTER (1));
+ g_queue_push_tail (job->priv->outstanding, g_strdup (uri));
if (job->priv->status_changed_id == 0) {
job->priv->status_changed_id = g_idle_add ((GSourceFunc) emit_status_changed, job);
}
- g_mutex_unlock (&job->priv->lock);
+ maybe_start_more (job);
- add_uri = TRUE;
+ g_mutex_unlock (&job->priv->lock);
} else {
/* skip it if it's a different entry type */
RhythmDBEntryType *et;
et = rhythmdb_entry_get_entry_type (entry);
- if (et != job->priv->entry_type &&
- et != job->priv->ignore_type &&
- et != job->priv->error_type) {
- add_uri = FALSE;
- } else {
- add_uri = TRUE;
+ if (et == job->priv->entry_type ||
+ et == job->priv->ignore_type ||
+ et == job->priv->error_type) {
+ rhythmdb_add_uri_with_types (job->priv->db,
+ uri,
+ job->priv->entry_type,
+ job->priv->ignore_type,
+ job->priv->error_type);
}
}
- if (add_uri) {
- rhythmdb_add_uri_with_types (job->priv->db,
- uri,
- job->priv->entry_type,
- job->priv->ignore_type,
- job->priv->error_type);
- }
g_free (uri);
}
@@ -468,14 +487,14 @@ entry_added_cb (RhythmDB *db,
RhythmDBImportJob *job)
{
const char *uri;
- gboolean ours;
+ GList *link;
uri = rhythmdb_entry_get_string (entry, RHYTHMDB_PROP_LOCATION);
g_mutex_lock (&job->priv->lock);
- ours = g_hash_table_remove (job->priv->outstanding, uri);
+ link = g_queue_find_custom (job->priv->processing, uri, (GCompareFunc) g_strcmp0);
- if (ours) {
+ if (link != NULL) {
const char *details;
RhythmDBEntryType *entry_type;
@@ -500,6 +519,9 @@ entry_added_cb (RhythmDB *db,
if (job->priv->status_changed_id == 0) {
job->priv->status_changed_id = g_idle_add ((GSourceFunc) emit_status_changed, job);
}
+
+ g_queue_delete_link (job->priv->processing, link);
+ maybe_start_more (job);
}
g_mutex_unlock (&job->priv->lock);
}
@@ -512,7 +534,8 @@ rhythmdb_import_job_init (RhythmDBImportJob *job)
RhythmDBImportJobPrivate);
g_mutex_init (&job->priv->lock);
- job->priv->outstanding = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
+ job->priv->outstanding = g_queue_new ();
+ job->priv->processing = g_queue_new ();
job->priv->cancel = g_cancellable_new ();
}
@@ -596,8 +619,9 @@ static void
impl_finalize (GObject *object)
{
RhythmDBImportJob *job = RHYTHMDB_IMPORT_JOB (object);
-
- g_hash_table_destroy (job->priv->outstanding);
+
+ g_queue_free_full (job->priv->outstanding, g_free);
+ g_queue_free_full (job->priv->processing, g_free);
rb_slist_deep_free (job->priv->uri_list);
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]