[tracker/batch-signal: 4/4] tracker-store: Stop signal timeout for sequential batch updates



commit 98f6eb9c2e48e4fe4ab0c6d1ae5a7f80b916c968
Author: Jürg Billeter <j bitron ch>
Date:   Tue May 10 15:15:14 2011 +0200

    tracker-store: Stop signal timeout for sequential batch updates
    
    If tracker-store receives a batch request after processing all previous
    batch requests but before the signal timeout expires, stop the signal
    timeout. This reduces the number of signals sent for batch updates where
    requests do not trigger any queueing in tracker-store, e.g., when
    sending a simple batch update every 0.5 s.
    
    Fixes NB#254154.

 src/libtracker-data/libtracker-data.vapi  |   11 ++++++++-
 src/libtracker-data/tracker-data-update.c |    4 +-
 src/libtracker-data/tracker-data-update.h |   30 ++++++++++++++++----------
 src/tracker-store/tracker-resources.vala  |   32 +++++++++++++++++++++++++---
 src/tracker-store/tracker-store.vala      |   31 +++++++++++++++------------
 5 files changed, 74 insertions(+), 34 deletions(-)
---
diff --git a/src/libtracker-data/libtracker-data.vapi b/src/libtracker-data/libtracker-data.vapi
index d01f9a6..e9ef977 100644
--- a/src/libtracker-data/libtracker-data.vapi
+++ b/src/libtracker-data/libtracker-data.vapi
@@ -161,10 +161,17 @@ namespace Tracker {
 	}
 
 	public delegate void StatementCallback (int graph_id, string? graph, int subject_id, string subject, int predicate_id, int object_id, string object, GLib.PtrArray rdf_types);
-	public delegate void CommitCallback (bool start_timer);
+	public delegate void CommitCallback (Data.CommitType commit_type);
 
 	[CCode (cheader_filename = "libtracker-data/tracker-data-query.h,libtracker-data/tracker-data-update.h,libtracker-data/tracker-data-backup.h")]
 	namespace Data {
+		[CCode (cprefix = "TRACKER_DATA_COMMIT_")]
+		public enum CommitType {
+			REGULAR,
+			BATCH,
+			BATCH_LAST
+		}
+
 		public int query_resource_id (string uri);
 		public DBCursor query_sparql_cursor (string query) throws Sparql.Error;
 		public void begin_db_transaction ();
@@ -175,7 +182,7 @@ namespace Tracker {
 		public void update_sparql (string update) throws Sparql.Error;
 		public GLib.Variant update_sparql_blank (string update) throws Sparql.Error;
 		public void load_turtle_file (GLib.File file) throws Sparql.Error;
-		public void notify_transaction (bool start_timer);
+		public void notify_transaction (CommitType commit_type);
 		public void delete_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError;
 		public void update_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError;
 		public void insert_statement (string? graph, string subject, string predicate, string object) throws Sparql.Error, DateError;
diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c
index 955b4b7..c72409a 100644
--- a/src/libtracker-data/tracker-data-update.c
+++ b/src/libtracker-data/tracker-data-update.c
@@ -3070,14 +3070,14 @@ tracker_data_commit_transaction (GError **error)
 }
 
 void
-tracker_data_notify_transaction (gboolean start_timer)
+tracker_data_notify_transaction (TrackerDataCommitType commit_type)
 {
 	if (commit_callbacks) {
 		guint n;
 		for (n = 0; n < commit_callbacks->len; n++) {
 			TrackerCommitDelegate *delegate;
 			delegate = g_ptr_array_index (commit_callbacks, n);
-			delegate->callback (start_timer, delegate->user_data);
+			delegate->callback (commit_type, delegate->user_data);
 		}
 	}
 }
diff --git a/src/libtracker-data/tracker-data-update.h b/src/libtracker-data/tracker-data-update.h
index 1bed0d9..9921333 100644
--- a/src/libtracker-data/tracker-data-update.h
+++ b/src/libtracker-data/tracker-data-update.h
@@ -34,17 +34,23 @@ G_BEGIN_DECLS
 #error "only <libtracker-data/tracker-data.h> must be included directly."
 #endif
 
-typedef void (*TrackerStatementCallback) (gint         graph_id,
-                                          const gchar *graph,
-                                          gint         subject_id,
-                                          const gchar *subject,
-                                          gint         predicate_id,
-                                          gint         object_id,
-                                          const gchar *object,
-                                          GPtrArray   *rdf_types,
-                                          gpointer     user_data);
-typedef void (*TrackerCommitCallback)    (gboolean     start_timer,
-                                          gpointer     user_data);
+typedef enum {
+	TRACKER_DATA_COMMIT_REGULAR,
+	TRACKER_DATA_COMMIT_BATCH,
+	TRACKER_DATA_COMMIT_BATCH_LAST
+} TrackerDataCommitType;
+
+typedef void (*TrackerStatementCallback) (gint                  graph_id,
+                                          const gchar          *graph,
+                                          gint                  subject_id,
+                                          const gchar          *subject,
+                                          gint                  predicate_id,
+                                          gint                  object_id,
+                                          const gchar          *object,
+                                          GPtrArray            *rdf_types,
+                                          gpointer              user_data);
+typedef void (*TrackerCommitCallback)    (TrackerDataCommitType commit_type,
+                                          gpointer              user_data);
 
 GQuark   tracker_data_error_quark                   (void);
 
@@ -79,7 +85,7 @@ void     tracker_data_begin_ontology_transaction    (GError                   **
 void     tracker_data_begin_transaction_for_replay  (time_t                     time,
                                                      GError                   **error);
 void     tracker_data_commit_transaction            (GError                   **error);
-void     tracker_data_notify_transaction            (gboolean                   start_timer);
+void     tracker_data_notify_transaction            (TrackerDataCommitType      commit_type);
 void     tracker_data_rollback_transaction          (void);
 void     tracker_data_update_sparql                 (const gchar               *update,
                                                      GError                   **error);
diff --git a/src/tracker-store/tracker-resources.vala b/src/tracker-store/tracker-resources.vala
index bf491cf..2615f71 100644
--- a/src/tracker-store/tracker-resources.vala
+++ b/src/tracker-store/tracker-resources.vala
@@ -52,6 +52,7 @@ public class Tracker.Resources : Object {
 
 	DBusConnection connection;
 	uint signal_timeout;
+	bool regular_commit_pending;
 
 	public signal void writeback ([DBus (signature = "a{iai}")] Variant subjects);
 	public signal void graph_updated (string classname, [DBus (signature = "a(iiii)")] Variant deletes, [DBus (signature = "a(iiii)")] Variant inserts);
@@ -251,26 +252,49 @@ public class Tracker.Resources : Object {
 
 		Tracker.Writeback.reset_ready ();
 
+		regular_commit_pending = false;
 		signal_timeout = 0;
 		return false;
 	}
 
-	void on_statements_committed (bool start_timer) {
+	void on_statements_committed (Tracker.Data.CommitType commit_type) {
 		/* Class signal feature */
 
 		foreach (var cl in Tracker.Events.get_classes ()) {
 			cl.transact_events ();
 		}
 
-		if (start_timer && signal_timeout == 0) {
-			signal_timeout = Timeout.add (SIGNALS_SECONDS_PER_EMIT * 1000, on_emit_signals);
+		if (!regular_commit_pending) {
+			// never cancel timeout for non-batch commits as we want
+			// to ensure that the signal corresponding to a certain
+			// update arrives within a fixed time limit
+
+			// cancel it in all other cases
+			// in the BATCH_LAST case, the timeout will be reenabled
+			// further down but it's important to cancel it first
+			// to reset the timeout to 1 s starting now
+			if (signal_timeout != 0) {
+				Source.remove (signal_timeout);
+				signal_timeout = 0;
+			}
+		}
+
+		if (commit_type == Tracker.Data.CommitType.REGULAR) {
+			regular_commit_pending = true;
+		}
+
+		if (regular_commit_pending || commit_type == Tracker.Data.CommitType.BATCH_LAST) {
+			// timer wanted for non-batch commits and the last in a series of batch commits
+			if (signal_timeout == 0) {
+				signal_timeout = Timeout.add (SIGNALS_SECONDS_PER_EMIT * 1000, on_emit_signals);
+			}
 		}
 
 		/* Writeback feature */
 		Tracker.Writeback.transact ();
 	}
 
-	void on_statements_rolled_back (bool start_timer) {
+	void on_statements_rolled_back (Tracker.Data.CommitType commit_type) {
 		Tracker.Events.reset_pending ();
 		Tracker.Writeback.reset_pending ();
 	}
diff --git a/src/tracker-store/tracker-store.vala b/src/tracker-store/tracker-store.vala
index 0b086c7..4fc2f6b 100644
--- a/src/tracker-store/tracker-store.vala
+++ b/src/tracker-store/tracker-store.vala
@@ -135,24 +135,27 @@ public class Tracker.Store {
 		}
 	}
 
-	static bool start_timer_or_not (Task task) {
-		bool result;
-
+	static Tracker.Data.CommitType commit_type (Task task) {
 		switch (task.type) {
 			case TaskType.UPDATE:
 			case TaskType.UPDATE_BLANK:
-				result = !(((UpdateTask) task).priority == Priority.LOW && update_queues[Priority.LOW].get_length () > 0);
-				break;
+				if (((UpdateTask) task).priority == Priority.HIGH) {
+					return Tracker.Data.CommitType.REGULAR;
+				} else if (update_queues[Priority.LOW].get_length () > 0) {
+					return Tracker.Data.CommitType.BATCH;
+				} else {
+					return Tracker.Data.CommitType.BATCH_LAST;
+				}
 			case TaskType.TURTLE:
-				result = update_queues[Priority.TURTLE].get_length () == 0;
-				break;
-			case TaskType.QUERY:
+				if (update_queues[Priority.TURTLE].get_length () > 0) {
+					return Tracker.Data.CommitType.BATCH;
+				} else {
+					return Tracker.Data.CommitType.BATCH_LAST;
+				}
 			default:
-				result = false;
-				break;
+				warn_if_reached ();
+				return Tracker.Data.CommitType.REGULAR;
 		}
-
-		return result;
 	}
 
 	static bool task_finish_cb (Task task) {
@@ -174,7 +177,7 @@ public class Tracker.Store {
 			n_queries_running--;
 		} else if (task.type == TaskType.UPDATE || task.type == TaskType.UPDATE_BLANK) {
 			if (task.error == null) {
-				Tracker.Data.notify_transaction (start_timer_or_not (task));
+				Tracker.Data.notify_transaction (commit_type (task));
 			}
 
 			task.callback ();
@@ -183,7 +186,7 @@ public class Tracker.Store {
 			update_running = false;
 		} else if (task.type == TaskType.TURTLE) {
 			if (task.error == null) {
-				Tracker.Data.notify_transaction (start_timer_or_not (task));
+				Tracker.Data.notify_transaction (commit_type (task));
 			}
 
 			task.callback ();



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]