[tracker/grouped-deletes] processing-pool: Map correctly errors to tasks after array updates



commit 79cd772997fc3d8c2654a8925fcb8cfc8f4319c8
Author: Carlos Garnacho <carlosg gnome org>
Date:   Wed Mar 9 11:47:53 2011 +0100

    processing-pool: Map correctly errors to tasks after array updates
    
    An error map is now passed, so the error position in the returned
    array can be guessed for all tasks.

 .../tracker-miner-fs-processing-pool.c             |   75 +++++++++++++++-----
 1 files changed, 56 insertions(+), 19 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-miner-fs-processing-pool.c b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
index 947a603..e6f4816 100644
--- a/src/libtracker-miner/tracker-miner-fs-processing-pool.c
+++ b/src/libtracker-miner/tracker-miner-fs-processing-pool.c
@@ -198,6 +198,12 @@ struct _TrackerProcessingPool {
 #endif /* PROCESSING_POOL_ENABLE_TRACE */
 };
 
+typedef struct {
+	GPtrArray *tasks;
+	GArray *error_map;
+	guint n_bulk_operations;
+} UpdateArrayData;
+
 /*------------------- PROCESSING TASK ----------------------*/
 
 TrackerProcessingTask *
@@ -583,54 +589,71 @@ tracker_processing_pool_sparql_update_array_cb (GObject      *object,
 {
 	GError *global_error = NULL;
 	GPtrArray *sparql_array_errors;
-	GPtrArray *sparql_array;
+	UpdateArrayData *update_data;
 	guint i;
 
 	/* Get arrays of errors and queries */
-	sparql_array = user_data;
+	update_data = user_data;
 
 	trace ("(Processing Pool) Finished array-update of tasks %p",
-	       sparql_array);
+	       update_data->tasks);
 
 	sparql_array_errors = tracker_sparql_connection_update_array_finish (TRACKER_SPARQL_CONNECTION (object),
 	                                                                     result,
 	                                                                     &global_error);
 	if (global_error) {
 		g_critical ("(Processing Pool) Could not execute array-update of tasks %p with '%u' items: %s",
-		            sparql_array,
-		            sparql_array->len,
+		            update_data->tasks,
+		            update_data->tasks->len,
 		            global_error->message);
 	}
 
 	/* Report status on each task of the batch update */
-	for (i = 0; i < sparql_array->len; i++) {
+	for (i = 0; i < update_data->tasks->len; i++) {
 		TrackerProcessingTask *task;
+		GError *error = NULL;
 
-		task = g_ptr_array_index (sparql_array, i);
+		task = g_ptr_array_index (update_data->tasks, i);
 
 		/* Before calling user-provided callback, REMOVE the task from the pool;
 		 * as the user-provided callback may actually modify the pool again */
 		tracker_processing_pool_remove_task (task->pool, task);
 
-		/* Call finished handler with the error, if any */
-		task->finished_handler (task, task->finished_user_data, global_error);
+		if (global_error) {
+			error = global_error;
+		} else {
+			gint error_pos;
 
-		/* FIXME: sparql_array_errors don't match the task list anymore.
-		 *                      (global_error ?
-		 *                       global_error :
-		 *                       g_ptr_array_index (sparql_array_errors, i)));
-		 */
+			error_pos = g_array_index (update_data->error_map, gint, i);
+
+			/* Find the corresponding error according to the passed map,
+			 * numbers >= 0 are non-bulk tasks, and < 0 are bulk tasks,
+			 * so the number of bulk operations must be added, as these
+			 * tasks are prepended.
+			 */
+			error_pos += update_data->n_bulk_operations;
+			error = g_ptr_array_index (sparql_array_errors, error_pos);
+		}
+
+		/* Call finished handler with the error, if any */
+		task->finished_handler (task, task->finished_user_data, error);
 
 		/* No need to deallocate the task here, it will be done when
-		 * unref-ing the GPtrArray below */
+		 * unref-ing the UpdateArrayData below */
 	}
 
 	/* Unref the arrays of errors and queries */
 	if (sparql_array_errors)
 		g_ptr_array_unref (sparql_array_errors);
+
 	/* Note that tasks are actually deallocated here */
-	g_ptr_array_unref (sparql_array);
-	g_clear_error (&global_error);
+	g_ptr_array_free (update_data->tasks, TRUE);
+	g_array_free (update_data->error_map, TRUE);
+	g_slice_free (UpdateArrayData, update_data);
+
+	if (global_error) {
+		g_error_free (global_error);
+	}
 }
 
 static void
@@ -738,7 +761,8 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
                                       const gchar           *reason)
 {
 	GPtrArray *bulk_ops;
-	GArray *sparql_array;
+	GArray *sparql_array, *error_map;
+	UpdateArrayData *update_data;
 	guint i, j;
 
 	if (!pool->sparql_buffer)
@@ -747,9 +771,11 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 	/* Loop buffer and construct array of strings */
 	sparql_array = g_array_new (FALSE, TRUE, sizeof (gchar *));
 	bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) bulk_operation_merge_free);
+	error_map = g_array_new (TRUE, TRUE, sizeof (gint));
 
 	for (i = 0; i < pool->sparql_buffer->len; i++) {
 		TrackerProcessingTask *task;
+		gint pos;
 
 		task = g_ptr_array_index (pool->sparql_buffer, i);
 
@@ -765,9 +791,11 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 
 		if (task->content == CONTENT_SPARQL_STRING) {
 			g_array_append_val (sparql_array, task->data.string);
+			pos = sparql_array->len - 1;
 		} else if (task->content == CONTENT_SPARQL_BUILDER) {
 			const gchar *str = tracker_sparql_builder_get_result (task->data.builder);
 			g_array_append_val (sparql_array, str);
+			pos = sparql_array->len - 1;
 		} else if (task->content == CONTENT_BULK_OPERATION) {
 			BulkOperationMerge *bulk = NULL;
 			gint j;
@@ -779,6 +807,7 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 
 				if (cur->bulk_operation == task->data.bulk.bulk_operation) {
 					bulk = cur;
+					pos = - 1 - j;
 					break;
 				}
 			}
@@ -786,10 +815,13 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 			if (!bulk) {
 				bulk = bulk_operation_merge_new (task->data.bulk.bulk_operation);
 				g_ptr_array_add (bulk_ops, bulk);
+				pos = - bulk_ops->len;
 			}
 
 			bulk->tasks = g_list_prepend (bulk->tasks, task);
 		}
+
+		g_array_append_val (error_map, pos);
 	}
 
 	for (j = 0; j < bulk_ops->len; j++) {
@@ -809,13 +841,18 @@ tracker_processing_pool_buffer_flush (TrackerProcessingPool *pool,
 	       pool->sparql_buffer->len,
 	       reason ? reason : "Unknown reason");
 
+	update_data = g_slice_new0 (UpdateArrayData);
+	update_data->tasks = pool->sparql_buffer;
+	update_data->n_bulk_operations = bulk_ops->len;
+	update_data->error_map = error_map;
+
 	tracker_sparql_connection_update_array_async (tracker_miner_get_connection (TRACKER_MINER (pool->miner)),
 	                                              (gchar **) sparql_array->data,
 	                                              sparql_array->len,
 	                                              G_PRIORITY_DEFAULT,
 	                                              NULL,
 	                                              tracker_processing_pool_sparql_update_array_cb,
-	                                              pool->sparql_buffer);
+	                                              update_data);
 
 	/* Clear current parent */
 	if (pool->sparql_buffer_current_parent) {



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]