[tracker/dbus-fd-experiment-gio: 1/11] Steroids server: port to GIO



commit 46f7e691e9ab60fe19bd2188f69cf4bb49413f82
Author: Adrien Bustany <abustany gnome org>
Date:   Tue Jun 8 09:49:22 2010 -0400

    Steroids server: port to GIO

 src/tracker-store/tracker-steroids.c |  265 ++++++++++++++--------------------
 1 files changed, 105 insertions(+), 160 deletions(-)
---
diff --git a/src/tracker-store/tracker-steroids.c b/src/tracker-store/tracker-steroids.c
index 2587a40..57f5d09 100644
--- a/src/tracker-store/tracker-steroids.c
+++ b/src/tracker-store/tracker-steroids.c
@@ -20,6 +20,8 @@
 #include <errno.h>
 #include <sys/types.h>
 #include <unistd.h>
+#include <gio/gunixoutputstream.h>
+#include <gio/gunixinputstream.h>
 
 #include "tracker-dbus.h"
 #include "tracker-steroids.h"
@@ -105,89 +107,6 @@ destroy_client_info (gpointer user_data)
 }
 
 static void
-buffer_write_int (char *dst, int value)
-{
-	memset (dst++, (value      ) & 0xff, sizeof (char));
-	memset (dst++, (value >>  8) & 0xff, sizeof (char));
-	memset (dst++, (value >> 16) & 0xff, sizeof (char));
-	memset (dst++, (value >> 24) & 0xff, sizeof (char));
-}
-
-static int
-buffer_read_int (char *src)
-{
-	int result = 0;
-
-	result += (((unsigned char)*(src++)));
-	result += (((unsigned char)*(src++)) <<  8);
-	result += (((unsigned char)*(src++)) << 16);
-	result += (((unsigned char)*(src++)) << 24);
-
-	return result;
-}
-
-static void
-client_write_int (ClientInfo *info, int value)
-{
-	char *dst = info->send_buffer + info->send_buffer_index;
-	buffer_write_int (dst, value);
-	info->send_buffer_index += sizeof (int);
-}
-
-static void
-buffer_send (int fd, char *buf, int size)
-{
-	ssize_t sent = 0;
-
-	while (sent != size) {
-		ssize_t ret = write (fd,
-		                     buf + sent,
-		                     size - sent);
-
-		if (ret == -1) {
-			g_critical ("Could not send buffer (error code %d)", errno);
-		}
-
-		sent += ret;
-	}
-}
-
-static void
-buffer_read (int fd, char *buf, int size)
-{
-	ssize_t received = 0;
-
-	while (received != size) {
-		ssize_t ret = read (fd,
-		                    buf + received,
-		                    size - received);
-
-		if (ret == -1) {
-			g_critical ("Could not read buffer (error code %d)", errno);
-		}
-
-		received += ret;
-	}
-}
-
-static void
-page_flush (ClientInfo *info)
-{
-	/* Put an "end of page" marker if there was still some space left
-	 * The "end of page" marker is one byte long, so we're sure there's at
-	 * least space for the marker */
-	if (TRACKER_STEROIDS_BUFFER_SIZE != info->send_buffer_index) {
-		memset (info->send_buffer + info->send_buffer_index,
-		        TRACKER_STEROIDS_EOP,
-		        sizeof (char));
-	}
-
-	buffer_send (info->fd, info->send_buffer, TRACKER_STEROIDS_BUFFER_SIZE);
-
-	info->send_buffer_index = 0;
-}
-
-static void
 query_callback (gpointer  inthread_data,
                 GError   *error,
                 gpointer  user_data)
@@ -196,10 +115,6 @@ query_callback (gpointer  inthread_data,
 	ClientInfo  *info = user_data;
 	DBusMessage *reply;
 
-	if (info->fd) {
-		close (info->fd);
-	}
-
 	if (ptr && ptr->error) {
 		/* Client is still there, but query failed */
 		tracker_dbus_request_failed (info->request_id,
@@ -242,10 +157,6 @@ update_callback (GError *error, gpointer user_data)
 	ClientInfo *info = user_data;
 	DBusMessage *reply;
 
-	if (info->fd) {
-		close (info->fd);
-	}
-
 	if (error) {
 		tracker_dbus_request_failed (info->request_id,
 		                             NULL,
@@ -348,14 +259,24 @@ query_inthread (TrackerDBCursor *cursor,
 	InThreadPtr *ptr  = g_slice_new0 (InThreadPtr);
 	ClientInfo  *info = user_data;
 	GError *loop_error = NULL;
+	GOutputStream *output_stream;
+	GDataOutputStream *data_output_stream;
 	guint n_columns;
 	int *column_sizes;
 	int *column_offsets;
 	const gchar **column_data;
 
+	output_stream = g_buffered_output_stream_new_sized (g_unix_output_stream_new (info->fd, TRUE),
+	                                                    TRACKER_STEROIDS_BUFFER_SIZE);
+	data_output_stream = g_data_output_stream_new (output_stream);
+
 	if (error) {
-		client_write_int (info, TRACKER_STEROIDS_RC_ERROR);
-		page_flush (info);
+		g_data_output_stream_put_int32 (data_output_stream,
+		                                TRACKER_STEROIDS_RC_ERROR,
+		                                NULL,
+		                                NULL);
+		g_object_unref (data_output_stream);
+		g_object_unref (output_stream);
 		ptr->error = g_error_copy (error);
 		return ptr;
 	}
@@ -369,10 +290,9 @@ query_inthread (TrackerDBCursor *cursor,
 	while (tracker_db_cursor_iter_next (cursor, &loop_error)) {
 		int i;
 		guint last_offset = -1;
-		guint row_size;
 
 		if (loop_error != NULL) {
-			break;
+			goto end_query_inthread;
 		}
 
 		for (i = 0; i < n_columns ; i++) {
@@ -387,76 +307,57 @@ query_inthread (TrackerDBCursor *cursor,
 			column_offsets[i] = last_offset;
 		}
 
-		row_size = sizeof (int)                   /* return code */
-		         + sizeof (int)                   /* column count */
-		         + n_columns * sizeof (int)    /* column offsets */
-		         + last_offset                    /* columns data */
-		         + n_columns * sizeof (char);  /* trailing \0 for each column */
-
-		if (row_size > TRACKER_STEROIDS_BUFFER_SIZE) {
-			char *buffer = g_malloc (sizeof (char) * row_size + sizeof (int));
-			char *dst = buffer;
-
-			/* Flush the current page, if there was something in it */
-			if (info->send_buffer_index) {
-				page_flush (info);
-			}
+		g_data_output_stream_put_int32 (data_output_stream,
+		                                TRACKER_STEROIDS_RC_ROW,
+		                                NULL,
+		                                &loop_error);
 
-			buffer_write_int (dst, TRACKER_STEROIDS_RC_LARGEROW);
-			dst += sizeof (int);
-			buffer_write_int (dst, row_size);
-			dst += sizeof (int);
-			buffer_write_int (dst, n_columns);
-			dst += sizeof (int);
-			memcpy (dst,
-			        column_offsets,
-			        n_columns * sizeof (int));
-			dst += n_columns * sizeof (int);
-
-			for (i = 0; i < n_columns; i++) {
-				memcpy (dst,
-				        column_data[i],
-				        column_sizes[i]);
-				dst += column_sizes[i];
-				memset (dst, 0, 1);
-				dst ++;
-			}
+		if (loop_error) {
+			goto end_query_inthread;
+		}
 
-			buffer_send (info->fd, buffer, row_size + 2 * sizeof (int));
+		g_data_output_stream_put_int32 (data_output_stream,
+		                                n_columns,
+		                                NULL,
+		                                &loop_error);
 
-			g_free (buffer);
-			continue;
+		if (loop_error) {
+			goto end_query_inthread;
 		}
 
-		if (row_size > TRACKER_STEROIDS_BUFFER_SIZE - info->send_buffer_index) {
-			page_flush (info);
+		for (i = 0; i < n_columns; i++) {
+			g_data_output_stream_put_int32 (data_output_stream,
+			                                column_offsets[i],
+			                                NULL,
+			                                &loop_error);
+			if (loop_error) {
+				goto end_query_inthread;
+			}
 		}
 
-		client_write_int (info, TRACKER_STEROIDS_RC_ROW);
-		client_write_int (info, n_columns);
-		memcpy (info->send_buffer + info->send_buffer_index,
-		        column_offsets,
-		        n_columns * sizeof (int));
-		info->send_buffer_index += n_columns * sizeof (int);
-
 		for (i = 0; i < n_columns; i++) {
-			memcpy (info->send_buffer + info->send_buffer_index,
-			        column_data[i],
-			        column_sizes[i]);
-			info->send_buffer_index += column_sizes[i];
-			memset (info->send_buffer + info->send_buffer_index, 0, 1);
-			info->send_buffer_index ++;
+			g_data_output_stream_put_string (data_output_stream,
+			                                 column_data[i] ? column_data[i] : "",
+			                                 NULL,
+			                                 &loop_error);
+
+			if (loop_error) {
+				goto end_query_inthread;
+			}
 		}
 	}
+end_query_inthread:
 
-	/* We flush to ensure we'll have enough space to write the DONE code */
-	if (info->send_buffer_index) {
-		page_flush (info);
+	if (!loop_error) {
+		g_data_output_stream_put_int32 (data_output_stream,
+		                                TRACKER_STEROIDS_RC_DONE,
+		                                NULL,
+		                                &loop_error);
 	}
-	client_write_int (info, TRACKER_STEROIDS_RC_DONE);
-	page_flush (info);
 
-	close (info->fd);
+	/* Will force flushing */
+	g_object_unref (data_output_stream);
+	g_object_unref (output_stream);
 
 	if (loop_error) {
 		ptr->error = loop_error;
@@ -488,7 +389,7 @@ tracker_steroids_query (TrackerSteroids *steroids,
 		                                       "Query",
 		                                       dbus_message_get_signature (message),
 		                                       dbus_message_get_interface (message));
-		dbus_connection_send (connection, message, NULL);
+		dbus_connection_send (connection, reply, NULL);
 		dbus_message_unref (reply);
 		return;
 	}
@@ -530,9 +431,12 @@ tracker_steroids_update (TrackerSteroids *steroids,
 {
 	DBusError               dbus_error;
 	ClientInfo             *info;
+	GInputStream           *input_stream;
+	GDataInputStream       *data_input_stream;
+	GError                 *error = NULL;
+	gsize                   bytes_read;
 	guint                   request_id;
 	const gchar            *sender;
-	static char             query_size_buffer[sizeof(int)];
 	int                     query_size;
 	DBusMessage            *reply;
 	gchar                  *query;
@@ -544,7 +448,7 @@ tracker_steroids_update (TrackerSteroids *steroids,
 		                                       "Update",
 		                                       dbus_message_get_signature (message),
 		                                       dbus_message_get_interface (message));
-		dbus_connection_send (connection, message, NULL);
+		dbus_connection_send (connection, reply, NULL);
 		dbus_message_unref (reply);
 		return;
 	}
@@ -572,16 +476,57 @@ tracker_steroids_update (TrackerSteroids *steroids,
 
 	sender = dbus_message_get_sender (message);
 
-	buffer_read (info->fd, query_size_buffer, sizeof (int));
-	query_size = buffer_read_int (query_size_buffer);
+	input_stream = g_buffered_input_stream_new_sized (g_unix_input_stream_new (info->fd, TRUE),
+	                                                  TRACKER_STEROIDS_BUFFER_SIZE);
+	data_input_stream = g_data_input_stream_new (input_stream);
+
+	query_size = g_data_input_stream_read_int32 (data_input_stream,
+	                                             NULL,
+	                                             &error);
+
+	if (error) {
+		reply = dbus_message_new_error (info->call_message,
+		                                TRACKER_STEROIDS_INTERFACE ".UpdateError",
+		                                error->message);
+		dbus_connection_send (connection, reply, NULL);
+		dbus_message_unref (reply);
+
+		g_object_unref (data_input_stream);
+		g_object_unref (input_stream);
+		g_error_free (error);
+		destroy_client_info (info);
+
+		return;
+	}
 
 	/* We malloc one more char to ensure string is 0 terminated */
 	query = g_malloc0 ((1 + query_size) * sizeof (char));
 
-	buffer_read (info->fd, query, query_size);
+	g_input_stream_read_all (input_stream,
+	                         query,
+	                         query_size,
+	                         &bytes_read,
+	                         NULL,
+	                         &error);
+
+	if (error) {
+		reply = dbus_message_new_error (info->call_message,
+		                                TRACKER_STEROIDS_INTERFACE ".UpdateError",
+		                                error->message);
+		dbus_connection_send (connection, reply, NULL);
+		dbus_message_unref (reply);
+
+		g_free (query);
+		g_object_unref (data_input_stream);
+		g_object_unref (input_stream);
+		g_error_free (error);
+		destroy_client_info (info);
+
+		return;
+	}
 
-	close (info->fd);
-	info->fd = 0;
+	g_object_unref (data_input_stream);
+	g_object_unref (input_stream);
 
 	if (update_blank) {
 		tracker_store_sparql_update_blank (query, TRACKER_STORE_PRIORITY_HIGH,



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]