[tracker/miner-priority-queues: 21/22] libtracker-miner: Add priorities to the sparql buffer



commit bf57817873c68409a5b52de4c345ee66e9885b57
Author: Carlos Garnacho <carlos lanedo com>
Date:   Thu Jul 14 15:03:19 2011 +0200

    libtracker-miner: Add priorities to the sparql buffer
    
    If a non-bulk task has priority G_PRIORITY_HIGH or higher, the buffer
    issues an Update() instead of an UpdateArray(), so the update is applied
    immediately rather than waiting for the next buffer flush.

 src/libtracker-miner/tracker-miner-fs.c      |    4 +
 src/libtracker-miner/tracker-sparql-buffer.c |   84 +++++++++++++++++++++-----
 src/libtracker-miner/tracker-sparql-buffer.h |    1 +
 3 files changed, 73 insertions(+), 16 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs.c b/src/libtracker-miner/tracker-miner-fs.c
index 49070bd..f243ddf 100644
--- a/src/libtracker-miner/tracker-miner-fs.c
+++ b/src/libtracker-miner/tracker-miner-fs.c
@@ -1860,6 +1860,7 @@ item_add_or_update_cb (TrackerMinerFS *fs,
 
 	tracker_sparql_buffer_push (fs->priv->sparql_buffer,
 	                            sparql_task,
+	                            ctxt->priority,
 	                            sparql_buffer_task_finished_cb,
 	                            fs);
 
@@ -1967,6 +1968,7 @@ item_remove (TrackerMinerFS *fs,
 
 	tracker_sparql_buffer_push (fs->priv->sparql_buffer,
 	                            task,
+	                            G_PRIORITY_DEFAULT,
 	                            sparql_buffer_task_finished_cb,
 	                            fs);
 
@@ -1985,6 +1987,7 @@ item_remove (TrackerMinerFS *fs,
 
 	tracker_sparql_buffer_push (fs->priv->sparql_buffer,
 	                            task,
+	                            G_PRIORITY_DEFAULT,
 	                            sparql_buffer_task_finished_cb,
 	                            fs);
 
@@ -2332,6 +2335,7 @@ item_move (TrackerMinerFS *fs,
 	                                                               FALSE));
 	tracker_sparql_buffer_push (fs->priv->sparql_buffer,
 	                            task,
+	                            G_PRIORITY_DEFAULT,
 	                            sparql_buffer_task_finished_cb,
 	                            fs);
 
diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c
index bd3abe9..c79e534 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.c
+++ b/src/libtracker-miner/tracker-sparql-buffer.c
@@ -26,6 +26,7 @@
 typedef struct _TrackerSparqlBufferPrivate TrackerSparqlBufferPrivate;
 typedef struct _SparqlTaskData SparqlTaskData;
 typedef struct _UpdateArrayData UpdateArrayData;
+typedef struct _UpdateData UpdateData;
 typedef struct _BulkOperationMerge BulkOperationMerge;
 
 enum {
@@ -64,6 +65,11 @@ struct _SparqlTaskData
 	GSimpleAsyncResult *result;
 };
 
+struct _UpdateData {
+	TrackerSparqlBuffer *buffer;
+	TrackerTask *task;
+};
+
 struct _UpdateArrayData {
 	TrackerSparqlBuffer *buffer;
 	GPtrArray *tasks;
@@ -544,9 +550,30 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
 	return TRUE;
 }
 
+static void
+tracker_sparql_buffer_update_cb (GObject      *object,
+                                 GAsyncResult *result,
+                                 gpointer      user_data)
+{
+	UpdateData *update_data = user_data;
+	GError *error = NULL;
+
+	tracker_sparql_connection_update_finish (TRACKER_SPARQL_CONNECTION (object),
+	                                         result, &error);
+	if (error) {
+		g_critical ("Error in prioritized update: %s\n", error->message);
+		g_error_free (error);
+	}
+
+	tracker_task_pool_remove (TRACKER_TASK_POOL (update_data->buffer),
+	                          update_data->task);
+	g_slice_free (UpdateData, update_data);
+}
+
 void
 tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
                             TrackerTask         *task,
+                            gint                 priority,
                             GAsyncReadyCallback  cb,
                             gpointer             user_data)
 {
@@ -558,27 +585,52 @@ tracker_sparql_buffer_push (TrackerSparqlBuffer *buffer,
 
 	priv = buffer->priv;
 
-	if (tracker_task_pool_get_size (TRACKER_TASK_POOL (buffer)) == 0) {
-		reset_flush_timeout (buffer);
-	}
+	data = tracker_task_get_data (task);
+	data->result = g_simple_async_result_new (G_OBJECT (buffer),
+	                                          cb, user_data, NULL);
 
-	tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
+	if (priority <= G_PRIORITY_HIGH &&
+	    data->type != TASK_TYPE_BULK) {
+		UpdateData *update_data;
+		const gchar *sparql;
 
-	if (!priv->tasks) {
-		priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref);
-	}
+		/* High priority task */
+		update_data = g_slice_new0 (UpdateData);
+		update_data->buffer = buffer;
+		update_data->task = task;
 
-	g_ptr_array_add (priv->tasks, task);
+		if (data->type == TASK_TYPE_SPARQL_STR) {
+			sparql = data->data.str;
+		} else if (data->type == TASK_TYPE_SPARQL) {
+			sparql = tracker_sparql_builder_get_result (data->data.builder);
+		}
 
-	data = tracker_task_get_data (task);
-	data->result = g_simple_async_result_new (G_OBJECT (buffer),
-	                                          cb, user_data, NULL);
+		tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
+		tracker_sparql_connection_update_async (priv->connection,
+		                                        sparql,
+		                                        G_PRIORITY_HIGH,
+		                                        NULL,
+		                                        tracker_sparql_buffer_update_cb,
+		                                        update_data);
+	} else {
+		if (tracker_task_pool_get_size (TRACKER_TASK_POOL (buffer)) == 0) {
+			reset_flush_timeout (buffer);
+		}
+
+		tracker_task_pool_add (TRACKER_TASK_POOL (buffer), task);
+
+		if (!priv->tasks) {
+			priv->tasks = g_ptr_array_new_with_free_func ((GDestroyNotify) tracker_task_unref);
+		}
+
+		g_ptr_array_add (priv->tasks, task);
 
-	if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
-		tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
-	} else if (priv->tasks->len > tracker_task_pool_get_limit (TRACKER_TASK_POOL (buffer)) / 2) {
-		/* We've filled half of the buffer, flush it as we receive more tasks */
-		tracker_sparql_buffer_flush (buffer, "SPARQL buffer half-full");
+		if (tracker_task_pool_limit_reached (TRACKER_TASK_POOL (buffer))) {
+			tracker_sparql_buffer_flush (buffer, "SPARQL buffer limit reached");
+		} else if (priv->tasks->len > tracker_task_pool_get_limit (TRACKER_TASK_POOL (buffer)) / 2) {
+			/* We've filled half of the buffer, flush it as we receive more tasks */
+			tracker_sparql_buffer_flush (buffer, "SPARQL buffer half-full");
+		}
 	}
 }
 
diff --git a/src/libtracker-miner/tracker-sparql-buffer.h b/src/libtracker-miner/tracker-sparql-buffer.h
index 6ee13b4..919ad9a 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.h
+++ b/src/libtracker-miner/tracker-sparql-buffer.h
@@ -65,6 +65,7 @@ gboolean             tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
 
 void                 tracker_sparql_buffer_push  (TrackerSparqlBuffer *buffer,
                                                   TrackerTask         *task,
+                                                  gint                 priority,
                                                   GAsyncReadyCallback  cb,
                                                   gpointer             user_data);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]