[rhythmbox] rhythmdb: limit the number of outstanding entry adds for an import job



commit 3b260f0c2976015e9f6684bd5d290912b88d526f
Author: Jonathan Matthew <jonathan d14n org>
Date:   Sun Sep 23 15:10:21 2012 +1000

    rhythmdb: limit the number of outstanding entry adds for an import job
    
    Rather than feeding everything into the rhythmdb action queue
    immediately, limit the number of URIs in flight. This makes it
    possible to cancel an import job and wait for any outstanding
    operations to finish in a reasonable amount of time, rather than
    minutes or more for large imports, and it doesn't appear to
    impact overall time taken.  It may reduce peak memory usage too.

 rhythmdb/rhythmdb-import-job.c |  108 ++++++++++++++++++++++++---------------
 1 files changed, 66 insertions(+), 42 deletions(-)
---
diff --git a/rhythmdb/rhythmdb-import-job.c b/rhythmdb/rhythmdb-import-job.c
index ccef14c..6b7dbcc 100644
--- a/rhythmdb/rhythmdb-import-job.c
+++ b/rhythmdb/rhythmdb-import-job.c
@@ -36,6 +36,14 @@
 #include "rb-debug.h"
 #include "rb-missing-plugins.h"
 
+/* maximum number of new URIs in the rhythmdb action queue.
+ * entries bounce around between different threads and processes a bit,
+ * so having multiple in flight should help.  we also want to be able to
+ * cancel import jobs quickly.  since we can't remove things from the
+ * action queue, having fewer entries helps.
+ */
+#define PROCESSING_LIMIT		20
+
 enum
 {
 	PROP_0,
@@ -64,7 +72,8 @@ struct _RhythmDBImportJobPrivate
 	int		total;
 	int		imported;
 	int		processed;
-	GHashTable	*outstanding;
+	GQueue		*outstanding;
+	GQueue		*processing;
 	RhythmDB	*db;
 	RhythmDBEntryType *entry_type;
 	RhythmDBEntryType *ignore_type;
@@ -145,10 +154,35 @@ rhythmdb_import_job_add_uri (RhythmDBImportJob *job, const char *uri)
 	g_mutex_unlock (&job->priv->lock);
 }
 
+/* must be called with lock held */
+static void
+maybe_start_more (RhythmDBImportJob *job)
+{
+	if (g_cancellable_is_cancelled (job->priv->cancel)) {
+		return;
+	}
+
+	while (g_queue_get_length (job->priv->processing) < PROCESSING_LIMIT) {
+		char *uri;
+
+		uri = g_queue_pop_head (job->priv->outstanding);
+		if (uri == NULL) {
+			return;
+		}
+
+		g_queue_push_tail (job->priv->processing, uri);
+
+		rhythmdb_add_uri_with_types (job->priv->db,
+					     uri,
+					     job->priv->entry_type,
+					     job->priv->ignore_type,
+					     job->priv->error_type);
+	}
+}
+
 static void
 missing_plugins_retry_cb (gpointer instance, gboolean installed, RhythmDBImportJob *job)
 {
-	GSList *retry = NULL;
 	GSList *i;
 
 	g_mutex_lock (&job->priv->lock);
@@ -172,25 +206,14 @@ missing_plugins_retry_cb (gpointer instance, gboolean installed, RhythmDBImportJ
 			uri = rhythmdb_entry_dup_string (entry, RHYTHMDB_PROP_LOCATION);
 			rhythmdb_entry_delete (job->priv->db, entry);
 
-			g_hash_table_insert (job->priv->outstanding, g_strdup (uri), GINT_TO_POINTER (1));
-			retry = g_slist_prepend (retry, uri);
+			g_queue_push_tail (job->priv->outstanding, g_strdup (uri));
 		}
 		rhythmdb_commit (job->priv->db);
-		retry = g_slist_reverse (retry);
 	}
-	g_mutex_unlock (&job->priv->lock);
-
-	for (i = retry; i != NULL; i = i->next) {
-		char *uri = (char *)i->data;
 
-		rhythmdb_add_uri_with_types (job->priv->db,
-					     uri,
-					     job->priv->entry_type,
-					     job->priv->ignore_type,
-					     job->priv->error_type);
-	}
+	maybe_start_more (job);
 
-	rb_slist_deep_free (retry);
+	g_mutex_unlock (&job->priv->lock);
 }
 
 static gboolean
@@ -252,6 +275,10 @@ emit_status_changed (RhythmDBImportJob *job)
 			rb_debug ("emitting job complete");
 			g_signal_emit (job, signals[COMPLETE], 0, job->priv->total);
 		}
+	} else if (g_cancellable_is_cancelled (job->priv->cancel) &&
+		   g_queue_is_empty (job->priv->processing)) {
+		rb_debug ("cancelled job has no processing entries, emitting complete");
+		g_signal_emit (job, signals[COMPLETE], 0, job->priv->total);
 	}
 	g_mutex_unlock (&job->priv->lock);
 	g_object_unref (job);
@@ -263,7 +290,6 @@ static void
 uri_recurse_func (GFile *file, gboolean dir, RhythmDBImportJob *job)
 {
 	RhythmDBEntry *entry;
-	gboolean add_uri;
 	char *uri;
 
 	if (dir) {
@@ -275,43 +301,36 @@ uri_recurse_func (GFile *file, gboolean dir, RhythmDBImportJob *job)
 
 	uri = g_file_get_uri (file);
 
-	/* only add the entry to the map of entries we're waiting for
-	 * if it's not already in the db.
-	 */
+	/* if it's not already in the db, add it to the list of things to process */
 	entry = rhythmdb_entry_lookup_by_location (job->priv->db, uri);
 	if (entry == NULL) {
 		rb_debug ("waiting for entry %s", uri);
 		g_mutex_lock (&job->priv->lock);
 		job->priv->total++;
-		g_hash_table_insert (job->priv->outstanding, g_strdup (uri), GINT_TO_POINTER (1));
+		g_queue_push_tail (job->priv->outstanding, g_strdup (uri));
 
 		if (job->priv->status_changed_id == 0) {
 			job->priv->status_changed_id = g_idle_add ((GSourceFunc) emit_status_changed, job);
 		}
 
-		g_mutex_unlock (&job->priv->lock);
+		maybe_start_more (job);
 
-		add_uri = TRUE;
+		g_mutex_unlock (&job->priv->lock);
 	} else {
 		/* skip it if it's a different entry type */
 		RhythmDBEntryType *et;
 		et = rhythmdb_entry_get_entry_type (entry);
-		if (et != job->priv->entry_type &&
-		    et != job->priv->ignore_type &&
-		    et != job->priv->error_type) {
-			add_uri = FALSE;
-		} else {
-			add_uri = TRUE;
+		if (et == job->priv->entry_type ||
+		    et == job->priv->ignore_type ||
+		    et == job->priv->error_type) {
+			rhythmdb_add_uri_with_types (job->priv->db,
+						     uri,
+						     job->priv->entry_type,
+						     job->priv->ignore_type,
+						     job->priv->error_type);
 		}
 	}
 
-	if (add_uri) {
-		rhythmdb_add_uri_with_types (job->priv->db,
-					     uri,
-					     job->priv->entry_type,
-					     job->priv->ignore_type,
-					     job->priv->error_type);
-	}
 	g_free (uri);
 }
 
@@ -468,14 +487,14 @@ entry_added_cb (RhythmDB *db,
 		RhythmDBImportJob *job)
 {
 	const char *uri;
-	gboolean ours;
+	GList *link;
 
 	uri = rhythmdb_entry_get_string (entry, RHYTHMDB_PROP_LOCATION);
 
 	g_mutex_lock (&job->priv->lock);
-	ours = g_hash_table_remove (job->priv->outstanding, uri);
+	link = g_queue_find_custom (job->priv->processing, uri, (GCompareFunc) g_strcmp0);
 
-	if (ours) {
+	if (link != NULL) {
 		const char *details;
 		RhythmDBEntryType *entry_type;
 
@@ -500,6 +519,9 @@ entry_added_cb (RhythmDB *db,
 		if (job->priv->status_changed_id == 0) {
 			job->priv->status_changed_id = g_idle_add ((GSourceFunc) emit_status_changed, job);
 		}
+
+		g_queue_delete_link (job->priv->processing, link);
+		maybe_start_more (job);
 	}
 	g_mutex_unlock (&job->priv->lock);
 }
@@ -512,7 +534,8 @@ rhythmdb_import_job_init (RhythmDBImportJob *job)
 						 RhythmDBImportJobPrivate);
 
 	g_mutex_init (&job->priv->lock);
-	job->priv->outstanding = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL);
+	job->priv->outstanding = g_queue_new ();
+	job->priv->processing = g_queue_new ();
 
 	job->priv->cancel = g_cancellable_new ();
 }
@@ -596,8 +619,9 @@ static void
 impl_finalize (GObject *object)
 {
 	RhythmDBImportJob *job = RHYTHMDB_IMPORT_JOB (object);
-	
-	g_hash_table_destroy (job->priv->outstanding);
+
+	g_queue_free_full (job->priv->outstanding, g_free);
+	g_queue_free_full (job->priv->processing, g_free);
 
 	rb_slist_deep_free (job->priv->uri_list);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]