tracker r3011 - in trunk: . src/libtracker-db src/tracker-indexer
- From: carlosg svn gnome org
- To: svn-commits-list gnome org
- Subject: tracker r3011 - in trunk: . src/libtracker-db src/tracker-indexer
- Date: Tue, 3 Mar 2009 17:57:30 +0000 (UTC)
Author: carlosg
Date: Tue Mar 3 17:57:30 2009
New Revision: 3011
URL: http://svn.gnome.org/viewvc/tracker?rev=3011&view=rev
Log:
2009-03-03 Carlos Garnacho <carlos imendio com>
Make flushing to index asynchronous.
* src/libtracker-db/tracker-db-index.[ch]: Make asynchronous, so the
information is committed in batches, and the cache is layered. Add
boolean properties to indicate whether it's currently flushing or
saturated (i.e. too many cache layers left to flush).
* src/tracker-indexer/tracker-indexer.c: Update to these changes.
Listen for changes in these new properties in order to pause the
indexer or wait for the index to be flushed before finishing.
Modified:
trunk/ChangeLog
trunk/src/libtracker-db/tracker-db-index.c
trunk/src/libtracker-db/tracker-db-index.h
trunk/src/tracker-indexer/tracker-indexer.c
Modified: trunk/src/libtracker-db/tracker-db-index.c
==============================================================================
--- trunk/src/libtracker-db/tracker-db-index.c (original)
+++ trunk/src/libtracker-db/tracker-db-index.c Tue Mar 3 17:57:30 2009
@@ -37,6 +37,8 @@
/* Size of free block pool of inverted index */
#define MAX_HIT_BUFFER 480000
+#define MAX_CACHE_DEPTH 2
+#define MAX_FLUSH_TIME 0.5 /* In fractions of a second */
#define TRACKER_DB_INDEX_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), TRACKER_TYPE_DB_INDEX, TrackerDBIndexPrivate))
@@ -53,16 +55,18 @@
guint readonly : 1;
guint in_pause : 1;
guint in_flush : 1;
+ guint overloaded : 1;
+
+ /* Internal caches */
+ guint idle_flush_id;
+ GList *cache_layers;
+ GHashTable *cur_cache;
/* From the indexer */
- GHashTable *cache;
gchar *filename;
gint bucket_count;
};
-/*
-static void tracker_db_index_class_init (TrackerDBIndexClass *class);
-static void tracker_db_index_init (TrackerDBIndex *tree);
-*/
+
static void tracker_db_index_finalize (GObject *object);
static void tracker_db_index_set_property (GObject *object,
guint prop_id,
@@ -74,13 +78,16 @@
GParamSpec *pspec);
static void free_cache_values (GArray *array);
+
enum {
PROP_0,
PROP_FILENAME,
PROP_MIN_BUCKET,
PROP_MAX_BUCKET,
PROP_RELOAD,
- PROP_READONLY
+ PROP_READONLY,
+ PROP_FLUSHING,
+ PROP_OVERLOADED
};
G_DEFINE_TYPE (TrackerDBIndex, tracker_db_index, G_TYPE_OBJECT)
@@ -139,9 +146,33 @@
G_PARAM_READWRITE |
G_PARAM_CONSTRUCT));
+ g_object_class_install_property (object_class,
+ PROP_FLUSHING,
+ g_param_spec_boolean ("flushing",
+ "Whether the index is currently being flushed",
+ "Whether the index is currently being flushed",
+ FALSE,
+ G_PARAM_READABLE));
+ g_object_class_install_property (object_class,
+ PROP_OVERLOADED,
+ g_param_spec_boolean ("overloaded",
+ "Whether the index cache is overloaded",
+ "Whether the index cache is overloaded",
+ FALSE,
+ G_PARAM_READABLE));
+
g_type_class_add_private (object_class, sizeof (TrackerDBIndexPrivate));
}
+static GHashTable *
+index_cache_new (void)
+{
+ return g_hash_table_new_full (g_str_hash,
+ g_str_equal,
+ (GDestroyNotify) g_free,
+ (GDestroyNotify) free_cache_values);
+}
+
static void
tracker_db_index_init (TrackerDBIndex *indez)
{
@@ -150,11 +181,6 @@
priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
priv->reload = TRUE;
-
- priv->cache = g_hash_table_new_full (g_str_hash,
- g_str_equal,
- (GDestroyNotify) g_free,
- (GDestroyNotify) free_cache_values);
}
static void
@@ -166,13 +192,22 @@
indez = TRACKER_DB_INDEX (object);
priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
- tracker_db_index_flush (indez);
+ tracker_db_index_flush_sync (indez);
tracker_db_index_close (indez);
- g_hash_table_destroy (priv->cache);
+ if (priv->idle_flush_id) {
+ g_source_remove (priv->idle_flush_id);
+ priv->idle_flush_id = 0;
+ }
- g_free (priv->filename);
+ g_list_foreach (priv->cache_layers, (GFunc) g_hash_table_destroy, NULL);
+ g_list_free (priv->cache_layers);
+ if (priv->cur_cache) {
+ g_hash_table_destroy (priv->cur_cache);
+ }
+
+ g_free (priv->filename);
G_OBJECT_CLASS (tracker_db_index_parent_class)->finalize (object);
}
@@ -236,6 +271,12 @@
case PROP_READONLY:
g_value_set_boolean (value, priv->readonly);
break;
+ case PROP_FLUSHING:
+ g_value_set_boolean (value, priv->in_flush);
+ break;
+ case PROP_OVERLOADED:
+ g_value_set_boolean (value, priv->overloaded);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -537,13 +578,29 @@
return !priv->reload;
}
+static void
+update_overloaded_status (TrackerDBIndex *indez)
+{
+ TrackerDBIndexPrivate *priv;
+ gboolean overloaded;
+
+ priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
+
+ overloaded = g_list_length (priv->cache_layers) > MAX_CACHE_DEPTH;
+
+ if (priv->overloaded != overloaded) {
+ priv->overloaded = overloaded;
+ g_object_notify (G_OBJECT (indez), "overloaded");
+ }
+}
+
/* Use for deletes or updates of multiple entities when they are not
* new.
*/
static gboolean
-indexer_update_word (DEPOT *indez,
- const gchar *word,
- GArray *new_hits)
+indexer_update_word (const gchar *word,
+ GArray *new_hits,
+ DEPOT *indez)
{
TrackerDBIndexItem *new_hit;
TrackerDBIndexItem *previous_hits;
@@ -696,32 +753,105 @@
return TRUE;
}
+static void
+set_in_flush (TrackerDBIndex *indez,
+ gboolean in_flush)
+{
+ TrackerDBIndexPrivate *priv;
+
+ priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
+
+ if (in_flush != priv->in_flush) {
+ priv->in_flush = in_flush;
+ g_object_notify (G_OBJECT (indez), "flushing");
+ }
+}
+
static gboolean
-cache_flush_item (gpointer key,
- gpointer value,
- gpointer user_data)
-{
- GArray *array;
- DEPOT *indez;
- gchar *word;
-
- word = (gchar *) key;
- array = (GArray *) value;
- indez = (DEPOT *) user_data;
-
- /* Mark element for removal if succesfull insertion */
-
- /**
- * FIXME:
- *
- * Not removing the word from the memory-queue is not a good solution.
- * That's because the only thing we'll achieve is letting this queue
- * grow until it starts succeeding again. Which might end up being
- * never. Making tracker-indexer both becoming increasingly slow and
- * start consuming increasing amounts of memory.
- **/
+index_flush_item (gpointer user_data)
+{
+ TrackerDBIndex *indez;
+ TrackerDBIndexPrivate *priv;
+ GHashTableIter iter;
+ gpointer key, value;
+
+ indez = TRACKER_DB_INDEX (user_data);
+ priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
+
+ if (priv->in_pause || !priv->index) {
+ g_debug ("Flushing was paused or index was closed, waiting...");
+ priv->idle_flush_id = 0;
+ return FALSE;
+ }
+
+ if (g_hash_table_size (priv->cache_layers->data) > 0) {
+ GTimer *timer;
+
+ timer = g_timer_new ();
+ g_hash_table_iter_init (&iter, (GHashTable *) priv->cache_layers->data);
+
+ while (g_hash_table_iter_next (&iter, &key, &value)) {
+ /* Process words from cache */
+ if (indexer_update_word (key, value, priv->index)) {
+ g_hash_table_iter_remove (&iter);
+ }
+
+ if (g_timer_elapsed (timer, NULL) > MAX_FLUSH_TIME) {
+ break;
+ }
+ }
+
+ g_timer_destroy (timer);
+
+ return TRUE;
+ } else {
+ GList *link;
+
+ /* Current cache being flushed is already empty, proceed with the next one */
+ link = priv->cache_layers;
+ priv->cache_layers = g_list_remove_link (priv->cache_layers, link);
+ g_hash_table_destroy (link->data);
+ g_list_free_1 (link);
+
+ update_overloaded_status (indez);
+
+ if (priv->cache_layers) {
+ g_debug ("Flushing next batch (%d words) to index...",
+ g_hash_table_size (priv->cache_layers->data));
+ return TRUE;
+ } else {
+ g_debug ("Finished flushing elements to index");
+
+ set_in_flush (indez, FALSE);
+ priv->idle_flush_id = 0;
+
+ return FALSE;
+ }
+ }
+
+ return TRUE;
+}
+
+static void
+init_flush (TrackerDBIndex *indez)
+{
+ TrackerDBIndexPrivate *priv;
+
+ priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
+
+ if (priv->in_pause) {
+ g_debug ("Index was paused, waiting for it being resumed...");
+ return;
+ }
+
+ if (!priv->index) {
+ g_debug ("Index was not open for flush, waiting...");
+ return;
+ }
- return indexer_update_word (indez, word, array);
+ if (priv->idle_flush_id == 0) {
+ priv->idle_flush_id = g_idle_add (index_flush_item, indez);
+ }
}
gboolean
@@ -789,6 +919,11 @@
rec_count);
priv->reload = FALSE;
+
+ if (priv->in_flush) {
+ g_debug ("Resuming flushing...");
+ init_flush (indez);
+ }
} else {
priv->reload = TRUE;
}
@@ -840,64 +975,73 @@
}
}
-guint
+void
tracker_db_index_flush (TrackerDBIndex *indez)
{
TrackerDBIndexPrivate *priv;
- guint size, removed_items;
- g_return_val_if_fail (TRACKER_IS_DB_INDEX (indez), 0);
+ g_return_if_fail (TRACKER_IS_DB_INDEX (indez));
priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
- if (priv->in_pause) {
- g_debug ("Index was paused");
- return 0;
+#if 0
+ if (!priv->cur_cache || g_hash_table_size (priv->cur_cache) == 0) {
+ /* Nothing to flush */
+ return;
}
+#endif
- if (priv->in_flush) {
- g_debug ("Index was already in the middle of a flush");
- return 0;
+ if (!priv->in_flush) {
+ set_in_flush (indez, TRUE);
}
- if (!priv->index) {
- g_debug ("Index was not open for flush, waiting...");
- return 0;
- }
+ g_debug ("Pushing a new batch (%d words) to be flushed to index...",
+ g_hash_table_size (priv->cur_cache));
- priv->in_flush = TRUE;
- size = g_hash_table_size (priv->cache);
- removed_items = 0;
+ /* Put current cache into the queue and create a
+ * new one for keeping appending words
+ */
+ priv->cache_layers = g_list_append (priv->cache_layers, priv->cur_cache);
+ priv->cur_cache = index_cache_new ();
- if (size > 0) {
- GList *keys, *k;
- gpointer value;
+ init_flush (indez);
+ update_overloaded_status (indez);
+}
- g_debug ("Flushing index with %d items in cache", size);
+void
+tracker_db_index_flush_sync (TrackerDBIndex *indez)
+{
+ TrackerDBIndexPrivate *priv;
+ GList *cache;
- keys = g_hash_table_get_keys (priv->cache);
+ g_return_if_fail (TRACKER_IS_DB_INDEX (indez));
- for (k = keys; k; k = k->next) {
- value = g_hash_table_lookup (priv->cache, k->data);
+ priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
- if (cache_flush_item (k->data, value, priv->index)) {
- g_hash_table_remove (priv->cache, k->data);
- removed_items++;
- }
+ if (priv->idle_flush_id) {
+ g_source_remove (priv->idle_flush_id);
+ priv->idle_flush_id = 0;
+ }
- g_main_context_iteration (NULL, FALSE);
+ set_in_flush (indez, TRUE);
- if (priv->in_pause) {
- break;
- }
- }
+ if (priv->cur_cache && g_hash_table_size (priv->cur_cache) > 0) {
+ priv->cache_layers = g_list_append (priv->cache_layers, priv->cur_cache);
+ priv->cur_cache = NULL;
+ }
- g_list_free (keys);
+ for (cache = priv->cache_layers; cache; cache = cache->next) {
+ g_hash_table_foreach_remove (cache->data,
+ (GHRFunc) indexer_update_word,
+ priv->index);
}
- priv->in_flush = FALSE;
+ g_list_foreach (priv->cache_layers, (GFunc) g_hash_table_destroy, NULL);
+ g_list_free (priv->cache_layers);
+ priv->cache_layers = NULL;
- return removed_items;
+ set_in_flush (indez, FALSE);
+ update_overloaded_status (indez);
}
guint32
@@ -1048,19 +1192,21 @@
priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
- g_return_if_fail (priv->in_flush == FALSE);
+ if (G_UNLIKELY (!priv->cur_cache)) {
+ priv->cur_cache = index_cache_new ();
+ }
elem.id = service_id;
elem.amalgamated = tracker_db_index_item_calc_amalgamated (service_type, weight);
- array = g_hash_table_lookup (priv->cache, word);
+ array = g_hash_table_lookup (priv->cur_cache, word);
if (!array) {
/* Create the array if it didn't exist (first time we
* find the word)
*/
array = g_array_new (FALSE, TRUE, sizeof (TrackerDBIndexItem));
- g_hash_table_insert (priv->cache, g_strdup (word), array);
+ g_hash_table_insert (priv->cur_cache, g_strdup (word), array);
g_array_append_val (array, elem);
return;
@@ -1089,6 +1235,30 @@
g_array_append_val (array, elem);
}
+gboolean
+tracker_db_index_get_flushing (TrackerDBIndex *indez)
+{
+ TrackerDBIndexPrivate *priv;
+
+ g_return_val_if_fail (TRACKER_IS_DB_INDEX (indez), FALSE);
+
+ priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
+
+ return priv->in_flush;
+}
+
+gboolean
+tracker_db_index_get_overloaded (TrackerDBIndex *indez)
+{
+ TrackerDBIndexPrivate *priv;
+
+ g_return_val_if_fail (TRACKER_IS_DB_INDEX (indez), FALSE);
+
+ priv = TRACKER_DB_INDEX_GET_PRIVATE (indez);
+
+ return priv->overloaded;
+}
+
/*
* UNUSED
*
Modified: trunk/src/libtracker-db/tracker-db-index.h
==============================================================================
--- trunk/src/libtracker-db/tracker-db-index.h (original)
+++ trunk/src/libtracker-db/tracker-db-index.h Tue Mar 3 17:57:30 2009
@@ -72,7 +72,8 @@
/* Open/Close/Flush */
gboolean tracker_db_index_open (TrackerDBIndex *index);
gboolean tracker_db_index_close (TrackerDBIndex *index);
-guint tracker_db_index_flush (TrackerDBIndex *index);
+void tracker_db_index_flush (TrackerDBIndex *index);
+void tracker_db_index_flush_sync (TrackerDBIndex *index);
/* Using the index */
guint32 tracker_db_index_get_size (TrackerDBIndex *index);
@@ -87,6 +88,8 @@
guint32 service_id,
gint service_type,
gint weight);
+gboolean tracker_db_index_get_flushing (TrackerDBIndex *indez);
+gboolean tracker_db_index_get_overloaded (TrackerDBIndex *indez);
gboolean tracker_db_index_remove_dud_hits (TrackerDBIndex *index,
const gchar *word,
GSList *dud_list);
Modified: trunk/src/tracker-indexer/tracker-indexer.c
==============================================================================
--- trunk/src/tracker-indexer/tracker-indexer.c (original)
+++ trunk/src/tracker-indexer/tracker-indexer.c Tue Mar 3 17:57:30 2009
@@ -144,8 +144,9 @@
guint items_to_index;
guint subelements_processed;
- gboolean in_transaction;
- gboolean in_process;
+ guint in_transaction : 1;
+ guint in_process : 1;
+ guint interrupted : 1;
guint state;
};
@@ -173,7 +174,7 @@
};
enum TrackerIndexerState {
- TRACKER_INDEXER_STATE_FLUSHING = 1 << 0,
+ TRACKER_INDEXER_STATE_INDEX_OVERLOADED = 1 << 0,
TRACKER_INDEXER_STATE_PAUSED = 1 << 1,
TRACKER_INDEXER_STATE_STOPPED = 1 << 2,
};
@@ -205,6 +206,7 @@
PathInfo *info,
const gchar *dirname,
const gchar *basename);
+static void check_finished (TrackerIndexer *indexer);
static guint signals[LAST_SIGNAL] = { 0, };
@@ -348,8 +350,6 @@
{
indexer->private->flush_id = 0;
- state_set_flags (indexer, TRACKER_INDEXER_STATE_FLUSHING);
-
if (indexer->private->in_transaction) {
stop_transaction (indexer);
}
@@ -361,8 +361,6 @@
indexer->private->items_indexed += indexer->private->items_to_index;
indexer->private->items_to_index = 0;
- state_unset_flags (indexer, TRACKER_INDEXER_STATE_FLUSHING);
-
return FALSE;
}
@@ -469,6 +467,38 @@
#endif /* HAVE_HAL */
static void
+index_flushing_notify_cb (GObject *object,
+ GParamSpec *pspec,
+ TrackerIndexer *indexer)
+{
+ TrackerIndexerState state;
+
+ state = indexer->private->state;
+
+ if ((state & TRACKER_INDEXER_STATE_STOPPED) != 0 &&
+ !tracker_db_index_get_flushing (indexer->private->file_index) &&
+ !tracker_db_index_get_flushing (indexer->private->email_index)) {
+ /* The indexer has been already stopped and all indices are flushed */
+ check_finished (indexer);
+ }
+}
+
+static void
+index_overloaded_notify_cb (GObject *object,
+ GParamSpec *pspec,
+ TrackerIndexer *indexer)
+{
+ if (tracker_db_index_get_overloaded (indexer->private->file_index) ||
+ tracker_db_index_get_overloaded (indexer->private->email_index)) {
+ g_debug ("Index overloaded, stopping indexer to let it process items");
+ state_set_flags (indexer, TRACKER_INDEXER_STATE_INDEX_OVERLOADED);
+ } else {
+ g_debug ("Index no longer overloaded, resuming data harvesting");
+ state_unset_flags (indexer, TRACKER_INDEXER_STATE_INDEX_OVERLOADED);
+ }
+}
+
+static void
tracker_indexer_finalize (GObject *object)
{
TrackerIndexerPrivate *priv;
@@ -507,7 +537,20 @@
g_object_unref (priv->language);
g_object_unref (priv->config);
+ g_signal_handlers_disconnect_by_func (priv->file_index,
+ index_flushing_notify_cb,
+ object);
+ g_signal_handlers_disconnect_by_func (priv->file_index,
+ index_overloaded_notify_cb,
+ object);
g_object_unref (priv->file_index);
+
+ g_signal_handlers_disconnect_by_func (priv->email_index,
+ index_flushing_notify_cb,
+ object);
+ g_signal_handlers_disconnect_by_func (priv->email_index,
+ index_overloaded_notify_cb,
+ object);
g_object_unref (priv->email_index);
g_free (priv->db_dir);
@@ -654,9 +697,13 @@
return;
}
+ indexer->private->interrupted = FALSE;
state_unset_flags (indexer, TRACKER_INDEXER_STATE_STOPPED);
- g_timer_destroy (indexer->private->timer);
+ if (indexer->private->timer) {
+ g_timer_destroy (indexer->private->timer);
+ }
+
indexer->private->timer = g_timer_new ();
/* Open indexes */
@@ -667,8 +714,7 @@
}
static void
-check_stopped (TrackerIndexer *indexer,
- gboolean interrupted)
+check_finished (TrackerIndexer *indexer)
{
TrackerIndexerState state;
gdouble seconds_elapsed = 0;
@@ -676,21 +722,18 @@
state = indexer->private->state;
- /* No more modules to query, we're done */
- if ((state & TRACKER_INDEXER_STATE_STOPPED) == 0) {
+ if (indexer->private->timer) {
g_timer_stop (indexer->private->timer);
seconds_elapsed = g_timer_elapsed (indexer->private->timer, NULL);
- }
- /* Flush remaining items */
- schedule_flush (indexer, TRUE);
+ g_timer_destroy (indexer->private->timer);
+ indexer->private->timer = NULL;
+ }
/* Close indexes */
tracker_db_index_close (indexer->private->file_index);
tracker_db_index_close (indexer->private->email_index);
- state_set_flags (indexer, TRACKER_INDEXER_STATE_STOPPED);
-
/* Print out how long it took us */
str = tracker_seconds_to_string (seconds_elapsed, FALSE);
@@ -705,7 +748,7 @@
seconds_elapsed,
indexer->private->items_processed,
indexer->private->items_indexed,
- interrupted);
+ indexer->private->interrupted);
/* Reset stats */
indexer->private->items_processed = 0;
@@ -714,6 +757,15 @@
indexer->private->subelements_processed = 0;
}
+static void
+check_stopped (TrackerIndexer *indexer,
+ gboolean interrupted)
+{
+ schedule_flush (indexer, TRUE);
+ state_set_flags (indexer, TRACKER_INDEXER_STATE_STOPPED);
+ indexer->private->interrupted = (interrupted != FALSE);
+}
+
static gboolean
signal_status_cb (TrackerIndexer *indexer)
{
@@ -834,9 +886,19 @@
lindex = tracker_db_index_manager_get_index (TRACKER_DB_INDEX_FILE);
priv->file_index = g_object_ref (lindex);
+ g_signal_connect (priv->file_index, "notify::flushing",
+ G_CALLBACK (index_flushing_notify_cb), indexer);
+ g_signal_connect (priv->file_index, "notify::overloaded",
+ G_CALLBACK (index_overloaded_notify_cb), indexer);
+
lindex = tracker_db_index_manager_get_index (TRACKER_DB_INDEX_EMAIL);
priv->email_index = g_object_ref (lindex);
+ g_signal_connect (priv->email_index, "notify::flushing",
+ G_CALLBACK (index_flushing_notify_cb), indexer);
+ g_signal_connect (priv->email_index, "notify::overloaded",
+ G_CALLBACK (index_overloaded_notify_cb), indexer);
+
/* Set up databases, these pointers are mostly used to
* start/stop transactions, since TrackerDBManager treats
* interfaces as singletons, it's safe to just ask it
@@ -849,9 +911,6 @@
priv->email_metadata = tracker_db_manager_get_db_interface (TRACKER_DB_EMAIL_METADATA);
priv->email_contents = tracker_db_manager_get_db_interface (TRACKER_DB_EMAIL_CONTENTS);
- /* Set up timer to know how long the process will take and took */
- priv->timer = g_timer_new ();
-
/* Set up idle handler to process files/directories */
state_check (indexer);
}
@@ -2511,8 +2570,8 @@
s = g_string_new ("");
- if (state & TRACKER_INDEXER_STATE_FLUSHING) {
- s = g_string_append (s, "FLUSHING | ");
+ if (state & TRACKER_INDEXER_STATE_INDEX_OVERLOADED) {
+ s = g_string_append (s, "INDEX_OVERLOADED | ");
}
if (state & TRACKER_INDEXER_STATE_PAUSED) {
s = g_string_append (s, "PAUSED | ");
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]