[tracker/journal-compression-review: 4/12] libtracker-db: Support reading compressed journals



commit e0321389b8ebcb5f9e5fe8abf47804f5de563bc0
Author: Jürg Billeter <j bitron ch>
Date:   Fri May 14 09:35:38 2010 +0200

    libtracker-db: Support reading compressed journals

 src/libtracker-data/tracker-data-manager.c |    5 +-
 src/libtracker-data/tracker-data-update.c  |    2 +-
 src/libtracker-db/tracker-db-journal.c     |  606 ++++++++++++++++++----------
 3 files changed, 402 insertions(+), 211 deletions(-)
---
diff --git a/src/libtracker-data/tracker-data-manager.c b/src/libtracker-data/tracker-data-manager.c
index 359d253..898badd 100644
--- a/src/libtracker-data/tracker-data-manager.c
+++ b/src/libtracker-data/tracker-data-manager.c
@@ -990,7 +990,8 @@ load_ontology_from_journal (GHashTable **classes_out,
 	properties = g_hash_table_new_full (g_direct_hash, g_direct_equal,
 	                                    NULL, (GDestroyNotify) g_object_unref);
 
-	id_uri_map = g_hash_table_new (g_direct_hash, g_direct_equal);
+	id_uri_map = g_hash_table_new_full (g_direct_hash, g_direct_equal,
+	                                    NULL, g_free);
 
 	while (tracker_db_journal_reader_next (NULL)) {
 		TrackerDBJournalEntryType type;
@@ -1001,7 +1002,7 @@ load_ontology_from_journal (GHashTable **classes_out,
 			const gchar *uri;
 
 			tracker_db_journal_reader_get_resource (&id, &uri);
-			g_hash_table_insert (id_uri_map, GINT_TO_POINTER (id), (gpointer) uri);
+			g_hash_table_insert (id_uri_map, GINT_TO_POINTER (id), g_strdup (uri));
 		} else if (type == TRACKER_DB_JOURNAL_END_TRANSACTION) {
 			/* end of initial transaction => end of ontology */
 			break;
diff --git a/src/libtracker-data/tracker-data-update.c b/src/libtracker-data/tracker-data-update.c
index a8ab034..1a34944 100644
--- a/src/libtracker-data/tracker-data-update.c
+++ b/src/libtracker-data/tracker-data-update.c
@@ -2652,7 +2652,7 @@ tracker_data_replay_journal (GHashTable          *classes,
 			tracker_db_journal_reader_get_resource (&id, &uri);
 
 			if (in_ontology) {
-				g_hash_table_insert (id_uri_map, GINT_TO_POINTER (id), (gpointer) uri);
+				g_hash_table_insert (id_uri_map, GINT_TO_POINTER (id), g_strdup (uri));
 				continue;
 			}
 
diff --git a/src/libtracker-db/tracker-db-journal.c b/src/libtracker-db/tracker-db-journal.c
index 8b95ba0..f6e2d23 100644
--- a/src/libtracker-db/tracker-db-journal.c
+++ b/src/libtracker-db/tracker-db-journal.c
@@ -68,6 +68,7 @@ typedef enum {
 
 typedef struct {
 	gchar *filename;
+	GDataInputStream *stream;
 	GMappedFile *file;
 	const gchar *current;
 	const gchar *end;
@@ -78,12 +79,12 @@ typedef struct {
 	guint32 amount_of_triples;
 	gint64 time;
 	TrackerDBJournalEntryType type;
-	const gchar *uri;
+	gchar *uri;
 	gint g_id;
 	gint s_id;
 	gint p_id;
 	gint o_id;
-	const gchar *object;
+	gchar *object;
 	guint current_file;
 	gchar *rotate_to;
 } JournalReader;
@@ -110,6 +111,28 @@ static JournalWriter writer = {0};
 
 static gboolean tracker_db_journal_rotate (void);
 
+/* Tells whether the journal reader has run out of input: the end of the
+ * mapping for mmap'ed journals, or an empty refill for stream-backed ones. */
+static gboolean
+journal_eof (JournalReader *jreader)
+{
+	GBufferedInputStream *buffered;
+
+	if (!jreader->stream) {
+		/* mmap-backed reader: compare the cursor with the end of the mapping */
+		return jreader->current >= jreader->end ? TRUE : FALSE;
+	}
+
+	buffered = G_BUFFERED_INPUT_STREAM (jreader->stream);
+	if (g_buffered_input_stream_get_available (buffered) != 0) {
+		return FALSE;
+	}
+
+	/* Buffer drained: only a refill returning exactly 0 counts as end of
+	 * stream; any other result is reported as "not at the end". */
+	return g_buffered_input_stream_fill (buffered, -1, NULL, NULL) == 0 ? TRUE : FALSE;
+}
+
 static guint32
 read_uint32 (const guint8 *data)
 {
@@ -119,6 +142,154 @@ read_uint32 (const guint8 *data)
 	       data[3];
 }
 
+/* Reads the next guint32 from the journal (stream or mmap) and advances past
+ * it. Returns 0 with @error set when the mmap has fewer than four bytes left. */
+static guint32
+journal_read_uint32 (JournalReader  *jreader,
+                     GError        **error)
+{
+	guint32 value;
+
+	if (jreader->stream) {
+		return g_data_input_stream_read_uint32 (jreader->stream, NULL, error);
+	}
+
+	if (jreader->end - jreader->current < sizeof (guint32)) {
+		/* damaged journal entry */
+		g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0,
+		             "Damaged journal entry, %d < sizeof(guint32)",
+		             (gint) (jreader->end - jreader->current));
+		return 0;
+	}
+	value = read_uint32 (jreader->current);
+	jreader->current += 4;
+	return value;
+}
+
+/* Looks for a '\0' in the stream's buffered data, starting at *checked_out.
+ * Returns its offset from the buffer start, or -1 after advancing
+ * *checked_out to the end of the buffered data. Based on GDataInputStream. */
+static gssize
+scan_for_nul (GBufferedInputStream *stream,
+              gsize                *checked_out)
+{
+	const gchar *buffer;
+	const gchar *nul;
+	gsize start, available;
+
+	start = *checked_out;
+	buffer = g_buffered_input_stream_peek_buffer (stream, &available);
+
+	/* Only the bytes in [start, available) have not been examined yet.
+	 * memchr() avoids the gint index of a manual loop, which cannot
+	 * represent offsets in very large buffers. */
+	if (start < available) {
+		nul = memchr (buffer + start, '\0', available - start);
+		if (nul != NULL) {
+			return (gssize) (nul - buffer);
+		}
+	}
+
+	/* nothing found: remember how far we looked for the next call */
+	*checked_out = available;
+	return -1;
+}
+
+/* Reads a NUL-terminated UTF-8 string from the journal and advances past it.
+ * Returns a newly allocated copy (free with g_free()), or NULL with @error
+ * set when the terminator is missing or the text is not valid UTF-8. */
+static gchar *
+journal_read_string (JournalReader  *jreader,
+                     GError        **error)
+{
+	gchar *result;
+
+	if (jreader->stream) {
+		/* based on GDataInputStream code */
+		GBufferedInputStream *bstream;
+		gsize checked;
+		gssize found_pos;
+
+		bstream = G_BUFFERED_INPUT_STREAM (jreader->stream);
+		checked = 0;
+
+		while ((found_pos = scan_for_nul (bstream, &checked)) == -1) {
+			if (g_buffered_input_stream_get_available (bstream) == g_buffered_input_stream_get_buffer_size (bstream)) {
+				g_buffered_input_stream_set_buffer_size (bstream, 2 * g_buffered_input_stream_get_buffer_size (bstream));
+			}
+
+			if (g_buffered_input_stream_fill (bstream, -1, NULL, NULL) <= 0) {
+				/* Error or end of stream. @error is not passed to the fill call,
+				 * since g_set_error() must not overwrite an already-set GError. */
+				g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0,
+					     "Damaged journal entry, no terminating zero found");
+				return NULL;
+			}
+		}
+
+		result = g_malloc (found_pos + 1);
+		g_input_stream_read (G_INPUT_STREAM (bstream), result, found_pos + 1, NULL, NULL);
+	} else {
+		gsize str_length;
+
+		str_length = strnlen (jreader->current, jreader->end - jreader->current);
+		if (str_length == jreader->end - jreader->current) {
+			/* damaged journal entry (no terminating '\0' character) */
+			g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0,
+				     "Damaged journal entry, no terminating zero found");
+			return NULL;
+		}
+
+		result = g_strdup (jreader->current);
+		jreader->current += str_length + 1;
+	}
+
+	if (!g_utf8_validate (result, -1, NULL)) {
+		/* damaged journal entry (invalid UTF-8) */
+		g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0,
+			     "Damaged journal entry, invalid UTF-8");
+		g_free (result);
+		return NULL;
+	}
+
+	return result;
+}
+
+/* Reads the 8-byte journal file header at the reader's current position and
+ * compares it with the expected magic bytes. Returns TRUE on a match (the
+ * mmap cursor is then moved past the header), FALSE otherwise. */
+static gboolean
+journal_verify_header (JournalReader *jreader)
+{
+	gchar header[8];
+	gint i;
+	GError *error = NULL;
+
+	if (!jreader->stream) {
+		/* mmap-backed journal: compare in place, then step over the header */
+		if (jreader->end - jreader->current < 8) {
+			return FALSE;
+		}
+
+		if (memcmp (jreader->current, "trlog\00003", 8) != 0) {
+			return FALSE;
+		}
+		jreader->current += 8;
+		return TRUE;
+	}
+
+	/* stream-backed journal: pull the header out one byte at a time */
+	for (i = 0; i < sizeof (header); i++) {
+		header[i] = g_data_input_stream_read_byte (jreader->stream, NULL, &error);
+		if (error) {
+			g_clear_error (&error);
+			return FALSE;
+		}
+	}
+
+	return memcmp (header, "trlog\00003", 8) == 0 ? TRUE : FALSE;
+}
+
 void
 tracker_db_journal_get_rotating (gboolean *do_rotating,
                                  gsize    *chunk_size,
@@ -866,42 +1037,68 @@ db_journal_reader_init (JournalReader *jreader,
 	}
 
 	jreader->type = TRACKER_DB_JOURNAL_START;
-	jreader->file = g_mapped_file_new (filename_open, FALSE, &error);
 
-	g_free (filename_open);
+	if (g_str_has_suffix (filename_open, ".gz")) {
+		GFile *file;
+		GInputStream *stream, *cstream;
+		GConverter *converter;
+
+		file = g_file_new_for_path (filename_open);
+		g_free (filename_open);
+
+		stream = G_INPUT_STREAM (g_file_read (file, NULL, &error));
+		g_object_unref (file);
+		if (error) {
+			if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND)) {
+				/* do not warn if the file does not exist, just return FALSE */
+				g_warning ("Could not create TrackerDBJournalReader for file '%s', %s",
+					   jreader->filename,
+					   error->message ? error->message : "no error given");
+			}
+			g_error_free (error);
+			g_free (jreader->filename);
+			jreader->filename = NULL;
 
-	if (error) {
-		if (!g_error_matches (error, G_FILE_ERROR, G_FILE_ERROR_NOENT)) {
-			/* do not warn if the file does not exist, just return FALSE */
-			g_warning ("Could not create TrackerDBJournalReader for file '%s', %s",
-			           jreader->filename,
-			           error->message ? error->message : "no error given");
+			return FALSE;
 		}
-		g_error_free (error);
-		g_free (jreader->filename);
-		jreader->filename = NULL;
 
-		return FALSE;
-	}
+		converter = G_CONVERTER (g_zlib_decompressor_new (G_ZLIB_COMPRESSOR_FORMAT_GZIP));
+		cstream = g_converter_input_stream_new (stream, converter);
+		g_object_unref (stream);
+		g_object_unref (converter);
 
-	jreader->last_success = jreader->start = jreader->current = 
-		g_mapped_file_get_contents (jreader->file);
+		jreader->stream = g_data_input_stream_new (cstream);
+		g_object_unref (cstream);
+	} else {
+		jreader->file = g_mapped_file_new (filename_open, FALSE, &error);
 
-	jreader->end = jreader->current + g_mapped_file_get_length (jreader->file);
+		g_free (filename_open);
 
-	/* verify journal file header */
-	if (jreader->end - jreader->current < 8) {
-		tracker_db_journal_reader_shutdown ();
-		return FALSE;
+		if (error) {
+			if (!g_error_matches (error, G_FILE_ERROR, G_FILE_ERROR_NOENT)) {
+				/* do not warn if the file does not exist, just return FALSE */
+				g_warning ("Could not create TrackerDBJournalReader for file '%s', %s",
+					   jreader->filename,
+					   error->message ? error->message : "no error given");
+			}
+			g_error_free (error);
+			g_free (jreader->filename);
+			jreader->filename = NULL;
+
+			return FALSE;
+		}
+
+		jreader->last_success = jreader->start = jreader->current = 
+			g_mapped_file_get_contents (jreader->file);
+
+		jreader->end = jreader->current + g_mapped_file_get_length (jreader->file);
 	}
 
-	if (memcmp (jreader->current, "trlog\00003", 8)) {
+	if (!journal_verify_header (jreader)) {
 		tracker_db_journal_reader_shutdown ();
 		return FALSE;
 	}
 
-	jreader->current += 8;
-
 	return TRUE;
 }
 
@@ -914,7 +1111,7 @@ tracker_db_journal_reader_init (const gchar *filename)
 gsize
 tracker_db_journal_reader_get_size_of_correct (void)
 {
-	g_return_val_if_fail (reader.file != NULL, FALSE);
+	g_return_val_if_fail (reader.file != NULL || reader.stream != NULL, FALSE);
 
 	return (gsize) (reader.last_success - reader.start);
 }
@@ -927,64 +1124,84 @@ reader_next_file (GError **error)
 
 	filename_open = reader_get_next_filepath (&reader);
 
+	if (g_str_has_suffix (filename_open, ".gz")) {
+		GFile *file;
+		GInputStream *stream, *cstream;
+		GConverter *converter;
+
+		g_object_unref (reader.stream);
+
+		file = g_file_new_for_path (filename_open);
+		g_free (filename_open);
+
+		stream = G_INPUT_STREAM (g_file_read (file, NULL, &new_error));
+		g_object_unref (file);
+		if (new_error) {
+			g_propagate_error (error, new_error);
+			return FALSE;
+		}
+
+		converter = G_CONVERTER (g_zlib_decompressor_new (G_ZLIB_COMPRESSOR_FORMAT_GZIP));
+		cstream = g_converter_input_stream_new (stream, converter);
+		g_object_unref (stream);
+		g_object_unref (converter);
+
+		reader.stream = g_data_input_stream_new (cstream);
+		g_object_unref (cstream);
+	} else {
 #if GLIB_CHECK_VERSION(2,22,0)
-	g_mapped_file_unref (reader.file);
+		g_mapped_file_unref (reader.file);
 #else
-	g_mapped_file_free (reader.file);
+		g_mapped_file_free (reader.file);
 #endif
 
-	reader.file = g_mapped_file_new (filename_open, FALSE, &new_error);
+		reader.file = g_mapped_file_new (filename_open, FALSE, &new_error);
+		g_free (filename_open);
 
-	if (new_error) {
-		g_propagate_error (error, new_error);
-		return FALSE;
-	}
-
-	reader.last_success = reader.start = reader.current = 
-		g_mapped_file_get_contents (reader.file);
+		if (new_error) {
+			g_propagate_error (error, new_error);
+			return FALSE;
+		}
 
-	reader.end = reader.current + g_mapped_file_get_length (reader.file);
+		reader.last_success = reader.start = reader.current = 
+			g_mapped_file_get_contents (reader.file);
 
-	/* verify journal file header */
-	if (reader.end - reader.current < 8) {
-		g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-		             "Damaged journal entry at begin of journal");
-		tracker_db_journal_reader_shutdown ();
-		return FALSE;
+		reader.end = reader.current + g_mapped_file_get_length (reader.file);
 	}
 
-	if (memcmp (reader.current, "trlog\00003", 8)) {
+	if (!journal_verify_header (&reader)) {
 		g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
 		             "Damaged journal entry at begin of journal");
 		tracker_db_journal_reader_shutdown ();
 		return FALSE;
 	}
 
-	reader.current += 8;
-
 	reader.type = TRACKER_DB_JOURNAL_END_TRANSACTION;
 
 	reader.entry_begin = NULL;
 	reader.entry_end = NULL;
 	reader.amount_of_triples = 0;
 
-	g_free (filename_open);
-
 	return TRUE;
 }
 
 static gboolean
 db_journal_reader_shutdown (JournalReader *jreader)
 {
-	g_return_val_if_fail (jreader->file != NULL, FALSE);
+	g_return_val_if_fail (jreader->file != NULL || jreader->stream != NULL, FALSE);
 
+	if (reader.stream) {
+		g_object_unref (reader.stream);
+		reader.stream = NULL;
+	} else {
 #if GLIB_CHECK_VERSION(2,22,0)
-	g_mapped_file_unref (jreader->file);
+		g_mapped_file_unref (jreader->file);
 #else
-	g_mapped_file_free (jreader->file);
+		g_mapped_file_free (jreader->file);
 #endif
 
-	jreader->file = NULL;
+		jreader->file = NULL;
+	}
 
 	g_free (jreader->filename);
 	jreader->filename = NULL;
@@ -1016,7 +1233,7 @@ tracker_db_journal_reader_shutdown (void)
 TrackerDBJournalEntryType
 tracker_db_journal_reader_get_type (void)
 {
-	g_return_val_if_fail (reader.file != NULL, FALSE);
+	g_return_val_if_fail (reader.file != NULL || reader.stream != NULL, FALSE);
 
 	return reader.type;
 }
@@ -1024,7 +1241,19 @@ tracker_db_journal_reader_get_type (void)
 static gboolean
 db_journal_reader_next (JournalReader *jreader, gboolean global_reader, GError **error)
 {
-	g_return_val_if_fail (jreader->file != NULL, FALSE);
+	GError *inner_error = NULL;
+
+	g_return_val_if_fail (jreader->file != NULL || jreader->stream != NULL, FALSE);
+
+	/* reset struct */
+	g_free (jreader->uri);
+	jreader->uri = NULL;
+	jreader->g_id = 0;
+	jreader->s_id = 0;
+	jreader->p_id = 0;
+	jreader->o_id = 0;
+	g_free (jreader->object);
+	jreader->object = NULL;
 
 	/*
 	 * Visual layout of the data in the binary journal:
@@ -1061,28 +1290,24 @@ db_journal_reader_next (JournalReader *jreader, gboolean global_reader, GError *
 		guint32 crc_check;
 		TransactionFormat t_kind;
 
-		/* Check the end is not before where we currently are */
-		if (jreader->current >= jreader->end) {
-				/* Return FALSE as there is no further entry but
-				 * do not set error as it's not an error case. */
-				if (global_reader && jreader->current_file != 0)
-					return reader_next_file (error);
-				else
-					return FALSE;
+		/* Check the end is not where we currently are */
+		if (journal_eof (jreader)) {
+			/* Return FALSE as there is no further entry but
+			 * do not set error as it's not an error case. */
+			if (global_reader && jreader->current_file != 0)
+				return reader_next_file (error);
+			else
+				return FALSE;
 		}
 
-		/* Check the end is not smaller than the first uint32
-		 * for reading the entry size.
-		 */
-		if (jreader->end - jreader->current < sizeof (guint32)) {
-			g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-			             "Damaged journal entry, %d < sizeof(guint32) at start/end of journal",
-			             (gint) (jreader->end - jreader->current));
-			return FALSE;
-		}
+		jreader->entry_begin = jreader->current;
 
 		/* Read the first uint32 which contains the size */
-		entry_size = read_uint32 (jreader->current);
+		entry_size = journal_read_uint32 (jreader, &inner_error);
+		if (inner_error) {
+			g_propagate_error (error, inner_error);
+			return FALSE;
+		}
 
 		/* Check that entry is big enough for header and footer */
 		if (entry_size < 5 * sizeof (guint32)) {
@@ -1092,63 +1317,76 @@ db_journal_reader_next (JournalReader *jreader, gboolean global_reader, GError *
 			return FALSE;
 		}
 
-		/* Set the bounds for the entry */
-		jreader->entry_begin = jreader->current;
-		jreader->entry_end = jreader->entry_begin + entry_size;
-
-		/* Check the end of the entry does not exceed the end
-		 * of the journal.
-		 */
-		if (jreader->end < jreader->entry_end) {
-			g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-			             "Damaged journal entry, end < entry end");
-			return FALSE;
-		}
+		if (!jreader->stream) {
+			/* Set the bounds for the entry */
+			jreader->entry_end = jreader->entry_begin + entry_size;
 
-		/* Move the current potision of the journal past the
-		 * entry size we read earlier.
-		 */
-		jreader->current += 4;
+			/* Check the end of the entry does not exceed the end
+			 * of the journal.
+			 */
+			if (jreader->end < jreader->entry_end) {
+				g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
+					     "Damaged journal entry, end < entry end");
+				return FALSE;
+			}
 
-		/* Read entry size check at the end of the entry */
-		entry_size_check = read_uint32 (jreader->entry_end - 4);
+			/* Read entry size check at the end of the entry */
+			entry_size_check = read_uint32 (jreader->entry_end - 4);
 
-		if (entry_size != entry_size_check) {
-			/* damaged journal entry */
-			g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-			             "Damaged journal entry, %d != %d (entry size != entry size check)", 
-			             entry_size, 
-			             entry_size_check);
-			return FALSE;
+			if (entry_size != entry_size_check) {
+				/* damaged journal entry */
+				g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
+					     "Damaged journal entry, %d != %d (entry size != entry size check)", 
+					     entry_size, 
+					     entry_size_check);
+				return FALSE;
+			}
 		}
 
 		/* Read the amount of triples */
-		jreader->amount_of_triples = read_uint32 (jreader->current);
-		jreader->current += 4;
+		jreader->amount_of_triples = journal_read_uint32 (jreader, &inner_error);
+		if (inner_error) {
+			g_propagate_error (error, inner_error);
+			return FALSE;
+		}
 
 		/* Read the crc */
-		crc_check = read_uint32 (jreader->current);
-		jreader->current += 4;
+		crc_check = journal_read_uint32 (jreader, &inner_error);
+		if (inner_error) {
+			g_propagate_error (error, inner_error);
+			return FALSE;
+		}
 
-		/* Calculate the crc */
-		crc = tracker_crc32 (jreader->entry_begin + (sizeof (guint32) * 3), entry_size - (sizeof (guint32) * 3));
+		if (!jreader->stream) {
+			// TODO: the CRC is only verified here, for mmap'ed journals. Reading the whole transaction into one buffer
+			// would allow checking it for compressed journals too, but that might be too costly memory-wise.
 
-		/* Verify checksum */
-		if (crc != crc_check) {
-			/* damaged journal entry */
-			g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-			             "Damaged journal entry, 0x%.8x != 0x%.8x (crc32 failed)",
-			             crc,
-			             crc_check);
-			return FALSE;
+			/* Calculate the crc */
+			crc = tracker_crc32 (jreader->entry_begin + (sizeof (guint32) * 3), entry_size - (sizeof (guint32) * 3));
+
+			/* Verify checksum */
+			if (crc != crc_check) {
+				/* damaged journal entry */
+				g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
+					     "Damaged journal entry, 0x%.8x != 0x%.8x (crc32 failed)",
+					     crc,
+					     crc_check);
+				return FALSE;
+			}
 		}
 
 		/* Read the timestamp */
-		jreader->time = read_uint32 (jreader->current);
-		jreader->current += 4;
+		jreader->time = journal_read_uint32 (jreader, &inner_error);
+		if (inner_error) {
+			g_propagate_error (error, inner_error);
+			return FALSE;
+		}
 
-		t_kind = read_uint32 (jreader->current);
-		jreader->current += 4;
+		t_kind = journal_read_uint32 (jreader, &inner_error);
+		if (inner_error) {
+			g_propagate_error (error, inner_error);
+			return FALSE;
+		}
 
 		if (t_kind == TRANSACTION_FORMAT_DATA)
 			jreader->type = TRACKER_DB_JOURNAL_START_TRANSACTION;
@@ -1159,67 +1397,51 @@ db_journal_reader_next (JournalReader *jreader, gboolean global_reader, GError *
 	} else if (jreader->amount_of_triples == 0) {
 		/* end of transaction */
 
-		jreader->current += 4;
-		if (jreader->current != jreader->entry_end) {
-			/* damaged journal entry */
-			g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-			             "Damaged journal entry, %p != %p (end of transaction with 0 triples)",
-			             jreader->current,
-			             jreader->entry_end);
+		/* read redundant entry size at end of transaction */
+		journal_read_uint32 (jreader, &inner_error);
+		if (inner_error) {
+			g_propagate_error (error, inner_error);
 			return FALSE;
 		}
 
+		if (!jreader->stream) {
+			if (jreader->current != jreader->entry_end) {
+				/* damaged journal entry */
+				g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
+					     "Damaged journal entry, %p != %p (end of transaction with 0 triples)",
+					     jreader->current,
+					     jreader->entry_end);
+				return FALSE;
+			}
+		}
+
 		jreader->type = TRACKER_DB_JOURNAL_END_TRANSACTION;
 		jreader->last_success = jreader->current;
 
 		return TRUE;
 	} else {
 		DataFormat df;
-		gsize str_length;
 
-		if (jreader->end - jreader->current < sizeof (guint32)) {
-			/* damaged journal entry */
-			g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-			             "Damaged journal entry, %d < sizeof(guint32)",
-			             (gint) (jreader->end - jreader->current));
+		df = journal_read_uint32 (jreader, &inner_error);
+		if (inner_error) {
+			g_propagate_error (error, inner_error);
 			return FALSE;
 		}
 
-		df = read_uint32 (jreader->current);
-		jreader->current += 4;
-
 		if (df == DATA_FORMAT_RESOURCE_INSERT) {
 			jreader->type = TRACKER_DB_JOURNAL_RESOURCE;
 
-			if (jreader->end - jreader->current < sizeof (guint32) + 1) {
-				/* damaged journal entry */
-				g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-				             "Damaged journal entry, %d < sizeof(guint32) + 1 for resource",
-				             (gint) (jreader->end - jreader->current));
-				return FALSE;
-			}
-
-			jreader->s_id = read_uint32 (jreader->current);
-			jreader->current += 4;
-
-			str_length = strnlen (jreader->current, jreader->end - jreader->current);
-			if (str_length == jreader->end - jreader->current) {
-				/* damaged journal entry (no terminating '\0' character) */
-				g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-				             "Damaged journal entry, no terminating zero found for resource");
+			jreader->s_id = journal_read_uint32 (jreader, &inner_error);
+			if (inner_error) {
+				g_propagate_error (error, inner_error);
 				return FALSE;
-
 			}
 
-			if (!g_utf8_validate (jreader->current, -1, NULL)) {
-				/* damaged journal entry (invalid UTF-8) */
-				g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-				             "Damaged journal entry, invalid UTF-8 for resource");
+			jreader->uri = journal_read_string (jreader, &inner_error);
+			if (inner_error) {
+				g_propagate_error (error, inner_error);
 				return FALSE;
 			}
-
-			jreader->uri = jreader->current;
-			jreader->current += str_length + 1;
 		} else {
 			if (df & DATA_FORMAT_OPERATION_DELETE) {
 				if (df & DATA_FORMAT_OBJECT_ID) {
@@ -1236,73 +1458,41 @@ db_journal_reader_next (JournalReader *jreader, gboolean global_reader, GError *
 			}
 
 			if (df & DATA_FORMAT_GRAPH) {
-				if (jreader->end - jreader->current < sizeof (guint32)) {
-					/* damaged journal entry */
-					g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-						     "Damaged journal entry, %d < sizeof(guint32)",
-						     (gint) (jreader->end - jreader->current));
+				/* named graph */
+				jreader->g_id = journal_read_uint32 (jreader, &inner_error);
+				if (inner_error) {
+					g_propagate_error (error, inner_error);
 					return FALSE;
 				}
-
-				/* named graph */
-				jreader->g_id = read_uint32 (jreader->current);
-				jreader->current += 4;
 			} else {
 				/* default graph */
 				jreader->g_id = 0;
 			}
 
-			if (jreader->end - jreader->current < 2 * sizeof (guint32)) {
-				/* damaged journal entry */
-				g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-				             "Damaged journal entry, %d < 2 * sizeof(guint32)",
-				             (gint) (jreader->end - jreader->current));
+			jreader->s_id = journal_read_uint32 (jreader, &inner_error);
+			if (inner_error) {
+				g_propagate_error (error, inner_error);
 				return FALSE;
 			}
 
-			jreader->s_id = read_uint32 (jreader->current);
-			jreader->current += 4;
-
-			jreader->p_id = read_uint32 (jreader->current);
-			jreader->current += 4;
+			jreader->p_id = journal_read_uint32 (jreader, &inner_error);
+			if (inner_error) {
+				g_propagate_error (error, inner_error);
+				return FALSE;
+			}
 
 			if (df & DATA_FORMAT_OBJECT_ID) {
-				if (jreader->end - jreader->current < sizeof (guint32)) {
-					/* damaged journal entry */
-					g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-					             "Damaged journal entry, %d < sizeof(guint32) for data format 2",
-					             (gint) (jreader->end - jreader->current));
+				jreader->o_id = journal_read_uint32 (jreader, &inner_error);
+				if (inner_error) {
+					g_propagate_error (error, inner_error);
 					return FALSE;
 				}
-
-				jreader->o_id = read_uint32 (jreader->current);
-				jreader->current += 4;
 			} else {
-				if (jreader->end - jreader->current < 1) {
-					/* damaged journal entry */
-					g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-					             "Damaged journal entry, %d < 1",
-					             (gint) (jreader->end - jreader->current));
+				jreader->object = journal_read_string (jreader, &inner_error);
+				if (inner_error) {
+					g_propagate_error (error, inner_error);
 					return FALSE;
 				}
-
-				str_length = strnlen (jreader->current, jreader->end - jreader->current);
-				if (str_length == jreader->end - jreader->current) {
-					/* damaged journal entry (no terminating '\0' character) */
-					g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-					             "Damaged journal entry, no terminating zero found");
-					return FALSE;
-				}
-
-				if (!g_utf8_validate (jreader->current, -1, NULL)) {
-					/* damaged journal entry (invalid UTF-8) */
-					g_set_error (error, TRACKER_DB_JOURNAL_ERROR, 0, 
-					             "Damaged journal entry, invalid UTF-8");
-					return FALSE;
-				}
-
-				jreader->object = jreader->current;
-				jreader->current += str_length + 1;
 			}
 		}
 
@@ -1357,7 +1547,7 @@ gboolean
 tracker_db_journal_reader_get_resource (gint         *id,
                                         const gchar **uri)
 {
-	g_return_val_if_fail (reader.file != NULL, FALSE);
+	g_return_val_if_fail (reader.file != NULL || reader.stream != NULL, FALSE);
 	g_return_val_if_fail (reader.type == TRACKER_DB_JOURNAL_RESOURCE, FALSE);
 
 	*id = reader.s_id;
@@ -1372,7 +1562,7 @@ tracker_db_journal_reader_get_statement (gint         *g_id,
                                          gint         *p_id,
                                          const gchar **object)
 {
-	g_return_val_if_fail (reader.file != NULL, FALSE);
+	g_return_val_if_fail (reader.file != NULL || reader.stream != NULL, FALSE);
 	g_return_val_if_fail (reader.type == TRACKER_DB_JOURNAL_INSERT_STATEMENT ||
 	                      reader.type == TRACKER_DB_JOURNAL_DELETE_STATEMENT,
 	                      FALSE);
@@ -1393,7 +1583,7 @@ tracker_db_journal_reader_get_statement_id (gint *g_id,
                                             gint *p_id,
                                             gint *o_id)
 {
-	g_return_val_if_fail (reader.file != NULL, FALSE);
+	g_return_val_if_fail (reader.file != NULL || reader.stream != NULL, FALSE);
 	g_return_val_if_fail (reader.type == TRACKER_DB_JOURNAL_INSERT_STATEMENT_ID ||
 	                      reader.type == TRACKER_DB_JOURNAL_DELETE_STATEMENT_ID,
 	                      FALSE);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]