[tracker/tablelockedproblem: 2/2] tracker-store: Make TTL import a update_running case like any other



commit 547d3a3dfeba155ac98f33cd0be9048cfc254df5
Author: Philip Van Hoof <philip codeminded be>
Date:   Mon May 3 13:22:07 2010 +0200

    tracker-store: Make TTL import a update_running case like any other

 src/tracker-store/tracker-store.c |  235 ++++++++++++++++---------------------
 1 files changed, 100 insertions(+), 135 deletions(-)
---
diff --git a/src/tracker-store/tracker-store.c b/src/tracker-store/tracker-store.c
index a4495d3..6d3f65f 100644
--- a/src/tracker-store/tracker-store.c
+++ b/src/tracker-store/tracker-store.c
@@ -78,7 +78,6 @@ typedef struct {
 			GPtrArray    *blank_nodes;
 		} update;
 		struct {
-			gboolean      in_progress;
 			gchar        *path;
 		} turtle;
 	} data;
@@ -132,75 +131,6 @@ store_task_free (TrackerStoreTask *task)
 	g_slice_free (TrackerStoreTask, task);
 }
 
-static gboolean
-process_turtle_file_part (TrackerTurtleReader *reader, GError **error)
-{
-	int i;
-	GError *new_error = NULL;
-
-	/* process 10 statements at once before returning to main loop */
-
-	i = 0;
-
-	/* There is no logical structure in turtle files, so we have no choice
-	 * but fallback to fixed number of statements per transaction to avoid
-	 * blocking tracker-store.
-	 * Real applications should all use SPARQL update instead of turtle
-	 * import to avoid this issue.
-	 */
-	
-/*  SAVEPOINTS don't seem to work well with concurrent queries. I was getting
-	SQLITE_MISUSED as result for sqlite3_step() until I removed these 
-	tracker_data_begin_transaction (&new_error);
-	if (new_error) {
-		g_propagate_error (error, new_error);
-		return FALSE;
-	} */
-
-	while (new_error == NULL && tracker_turtle_reader_next (reader, &new_error)) {
-		/* insert statement */
-		if (tracker_turtle_reader_get_object_is_uri (reader)) {
-			tracker_data_insert_statement_with_uri (
-			                                        tracker_turtle_reader_get_graph (reader),
-			                                        tracker_turtle_reader_get_subject (reader),
-			                                        tracker_turtle_reader_get_predicate (reader),
-			                                        tracker_turtle_reader_get_object (reader),
-			                                        &new_error);
-		} else {
-			tracker_data_insert_statement_with_string (
-			                                           tracker_turtle_reader_get_graph (reader),
-			                                           tracker_turtle_reader_get_subject (reader),
-			                                           tracker_turtle_reader_get_predicate (reader),
-			                                           tracker_turtle_reader_get_object (reader),
-			                                           &new_error);
-		}
-
-		i++;
-		if (!new_error && i >= 10) {
-			/* return to main loop */
-			return TRUE;
-		}
-	}
-
-	if (new_error) {
-		/* Explanation for commenting out, see above
-		 * tracker_data_rollback_transaction (); */
-		g_propagate_error (error, new_error);
-		return FALSE;
-	}
-
-	/* Explanation for commenting out, see above
-	 * tracker_data_commit_transaction (&new_error); */
-	if (new_error) {
-		/* Explanation for commenting out, see above
-		 * tracker_data_rollback_transaction (); */
-		g_propagate_error (error, new_error);
-		return FALSE;
-	}
-
-	return FALSE;
-}
-
 static void
 begin_batch (TrackerStorePrivate *private)
 {
@@ -400,6 +330,20 @@ task_finish_cb (gpointer data)
 		}
 
 		private->update_running = FALSE;
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
+		if (!task->error) {
+			tracker_data_notify_db_transaction ();
+		}
+
+		if (task->callback.turtle_callback) {
+			task->callback.turtle_callback (task->error, task->user_data);
+		}
+
+		if (task->error) {
+			g_clear_error (&task->error);
+		}
+
+		private->update_running = FALSE;
 	}
 
 	if (task->destroy) {
@@ -478,6 +422,88 @@ pool_dispatch_cb (gpointer data,
 
 	} else if (task->type == TRACKER_STORE_TASK_TYPE_COMMIT) {
 		end_batch (private);
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
+		TrackerTurtleReader *reader = NULL;
+
+		reader = tracker_turtle_reader_new (task->data.turtle.path,
+		                                    &task->error);
+
+		if (!task->error) {
+			GError *new_error = NULL;
+
+			begin_batch (private);
+
+			tracker_data_begin_transaction (&new_error);
+
+			if (new_error) {
+				g_propagate_error (&task->error, new_error);
+				goto import_out;
+			}
+
+			while (new_error == NULL && tracker_turtle_reader_next (reader, &new_error)) {
+				/* insert statement */
+				if (tracker_turtle_reader_get_object_is_uri (reader)) {
+					tracker_data_insert_statement_with_uri (
+					                                        tracker_turtle_reader_get_graph (reader),
+					                                        tracker_turtle_reader_get_subject (reader),
+					                                        tracker_turtle_reader_get_predicate (reader),
+					                                        tracker_turtle_reader_get_object (reader),
+					                                        &new_error);
+				} else {
+					tracker_data_insert_statement_with_string (
+					                                           tracker_turtle_reader_get_graph (reader),
+					                                           tracker_turtle_reader_get_subject (reader),
+					                                           tracker_turtle_reader_get_predicate (reader),
+					                                           tracker_turtle_reader_get_object (reader),
+					                                           &new_error);
+				}
+
+				private->batch_count++;
+
+				if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
+					tracker_data_commit_transaction (&new_error);
+
+					if (new_error) {
+						g_propagate_error (&task->error, new_error);
+						goto import_out;
+					}
+					end_batch (private);
+
+					/* We could allow SELECT queries at this point, but the
+					 * private->update_running prevents this. We can't allow
+					 * SELECT queries inside of the begin-end batch parts,
+					 * else we apparently get SQLITE_MISUSE. Just FYI */
+
+					begin_batch (private);
+					tracker_data_begin_transaction (&new_error);
+					if (new_error) {
+						g_propagate_error (&task->error, new_error);
+						goto import_out;
+					}
+				}
+			}
+
+			if (new_error) {
+				tracker_data_rollback_transaction ();
+				g_propagate_error (&task->error, new_error);
+				goto import_out;
+			}
+
+			tracker_data_commit_transaction (&new_error);
+			if (new_error) {
+				tracker_data_rollback_transaction ();
+				g_propagate_error (&task->error, new_error);
+				goto import_out;
+			}
+
+			import_out:
+			end_batch (private);
+		}
+
+		if (reader) {
+			g_object_unref (reader);
+		}
+
 	}
 
 	g_idle_add (task_finish_cb, task);
@@ -529,76 +555,15 @@ queue_idle_handler (gpointer user_data)
 		private->update_running = TRUE;
 
 		task_run_async (private, task);
-	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
-		GError *error = NULL;
-		static TrackerTurtleReader *turtle_reader = NULL;
-
-		if (!task->data.turtle.in_progress) {
-			turtle_reader = tracker_turtle_reader_new (task->data.turtle.path, &error);
-			if (error) {
-				if (task->callback.turtle_callback) {
-					task->callback.turtle_callback (error, task->user_data);
-				}
-
-				turtle_reader = NULL;
-				g_clear_error (&error);
-
-				g_queue_pop_head (queue);
-
-				if (task->destroy) {
-					task->destroy (task->user_data);
-				}
-
-				store_task_free (task);
-
-				goto out;
-			}
-
-			begin_batch (private);
-			task->data.turtle.in_progress = TRUE;
-		} else {
-			if (private->batch_count == 0) {
-				begin_batch (private);
-			}
-		}
 
-		if (process_turtle_file_part (turtle_reader, &error)) {
-			/* import still in progress */
-			private->batch_count++;
-			if (private->batch_count >= TRACKER_STORE_TRANSACTION_MAX) {
-				end_batch (private);
-			}
-
-			/* Process function wont return true in case of error */
-
-			return TRUE;
-		} else {
-			/* import finished */
-			task->data.turtle.in_progress = FALSE;
-
-			end_batch (private);
-
-			if (task->callback.turtle_callback) {
-				task->callback.turtle_callback (error, task->user_data);
-			}
-
-			g_object_unref (turtle_reader);
-			turtle_reader = NULL;
-			if (error) {
-				g_clear_error (&error);
-			}
-
-			g_queue_pop_head (queue);
+	} else if (task->type == TRACKER_STORE_TASK_TYPE_TURTLE) {
+		g_queue_pop_head (queue);
 
-			if (task->destroy) {
-				task->destroy (task->user_data);
-			}
+		private->update_running = TRUE;
 
-			store_task_free (task);
-		}
+		task_run_async (private, task);
 	}
 
-out:
 	return task_ready (private);
 }
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]