[tracker/dbus-fd-experiment-gio: 1/6] Steroids server: port to GIO
- From: Adrien Bustany <abustany src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/dbus-fd-experiment-gio: 1/6] Steroids server: port to GIO
- Date: Thu, 10 Jun 2010 16:37:36 +0000 (UTC)
commit 46f7e691e9ab60fe19bd2188f69cf4bb49413f82
Author: Adrien Bustany <abustany gnome org>
Date: Tue Jun 8 09:49:22 2010 -0400
Steroids server: port to GIO
src/tracker-store/tracker-steroids.c | 265 ++++++++++++++--------------------
1 files changed, 105 insertions(+), 160 deletions(-)
---
diff --git a/src/tracker-store/tracker-steroids.c b/src/tracker-store/tracker-steroids.c
index 2587a40..57f5d09 100644
--- a/src/tracker-store/tracker-steroids.c
+++ b/src/tracker-store/tracker-steroids.c
@@ -20,6 +20,8 @@
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>
+#include <gio/gunixoutputstream.h>
+#include <gio/gunixinputstream.h>
#include "tracker-dbus.h"
#include "tracker-steroids.h"
@@ -105,89 +107,6 @@ destroy_client_info (gpointer user_data)
}
static void
-buffer_write_int (char *dst, int value)
-{
- memset (dst++, (value ) & 0xff, sizeof (char));
- memset (dst++, (value >> 8) & 0xff, sizeof (char));
- memset (dst++, (value >> 16) & 0xff, sizeof (char));
- memset (dst++, (value >> 24) & 0xff, sizeof (char));
-}
-
-static int
-buffer_read_int (char *src)
-{
- int result = 0;
-
- result += (((unsigned char)*(src++)));
- result += (((unsigned char)*(src++)) << 8);
- result += (((unsigned char)*(src++)) << 16);
- result += (((unsigned char)*(src++)) << 24);
-
- return result;
-}
-
-static void
-client_write_int (ClientInfo *info, int value)
-{
- char *dst = info->send_buffer + info->send_buffer_index;
- buffer_write_int (dst, value);
- info->send_buffer_index += sizeof (int);
-}
-
-static void
-buffer_send (int fd, char *buf, int size)
-{
- ssize_t sent = 0;
-
- while (sent != size) {
- ssize_t ret = write (fd,
- buf + sent,
- size - sent);
-
- if (ret == -1) {
- g_critical ("Could not send buffer (error code %d)", errno);
- }
-
- sent += ret;
- }
-}
-
-static void
-buffer_read (int fd, char *buf, int size)
-{
- ssize_t received = 0;
-
- while (received != size) {
- ssize_t ret = read (fd,
- buf + received,
- size - received);
-
- if (ret == -1) {
- g_critical ("Could not read buffer (error code %d)", errno);
- }
-
- received += ret;
- }
-}
-
-static void
-page_flush (ClientInfo *info)
-{
- /* Put an "end of page" marker if there was still some space left
- * The "end of page" marker is one byte long, so we're sure there's at
- * least space for the marker */
- if (TRACKER_STEROIDS_BUFFER_SIZE != info->send_buffer_index) {
- memset (info->send_buffer + info->send_buffer_index,
- TRACKER_STEROIDS_EOP,
- sizeof (char));
- }
-
- buffer_send (info->fd, info->send_buffer, TRACKER_STEROIDS_BUFFER_SIZE);
-
- info->send_buffer_index = 0;
-}
-
-static void
query_callback (gpointer inthread_data,
GError *error,
gpointer user_data)
@@ -196,10 +115,6 @@ query_callback (gpointer inthread_data,
ClientInfo *info = user_data;
DBusMessage *reply;
- if (info->fd) {
- close (info->fd);
- }
-
if (ptr && ptr->error) {
/* Client is still there, but query failed */
tracker_dbus_request_failed (info->request_id,
@@ -242,10 +157,6 @@ update_callback (GError *error, gpointer user_data)
ClientInfo *info = user_data;
DBusMessage *reply;
- if (info->fd) {
- close (info->fd);
- }
-
if (error) {
tracker_dbus_request_failed (info->request_id,
NULL,
@@ -348,14 +259,24 @@ query_inthread (TrackerDBCursor *cursor,
InThreadPtr *ptr = g_slice_new0 (InThreadPtr);
ClientInfo *info = user_data;
GError *loop_error = NULL;
+ GOutputStream *output_stream;
+ GDataOutputStream *data_output_stream;
guint n_columns;
int *column_sizes;
int *column_offsets;
const gchar **column_data;
+ output_stream = g_buffered_output_stream_new_sized (g_unix_output_stream_new (info->fd, TRUE),
+ TRACKER_STEROIDS_BUFFER_SIZE);
+ data_output_stream = g_data_output_stream_new (output_stream);
+
if (error) {
- client_write_int (info, TRACKER_STEROIDS_RC_ERROR);
- page_flush (info);
+ g_data_output_stream_put_int32 (data_output_stream,
+ TRACKER_STEROIDS_RC_ERROR,
+ NULL,
+ NULL);
+ g_object_unref (data_output_stream);
+ g_object_unref (output_stream);
ptr->error = g_error_copy (error);
return ptr;
}
@@ -369,10 +290,9 @@ query_inthread (TrackerDBCursor *cursor,
while (tracker_db_cursor_iter_next (cursor, &loop_error)) {
int i;
guint last_offset = -1;
- guint row_size;
if (loop_error != NULL) {
- break;
+ goto end_query_inthread;
}
for (i = 0; i < n_columns ; i++) {
@@ -387,76 +307,57 @@ query_inthread (TrackerDBCursor *cursor,
column_offsets[i] = last_offset;
}
- row_size = sizeof (int) /* return code */
- + sizeof (int) /* column count */
- + n_columns * sizeof (int) /* column offsets */
- + last_offset /* columns data */
- + n_columns * sizeof (char); /* trailing \0 for each column */
-
- if (row_size > TRACKER_STEROIDS_BUFFER_SIZE) {
- char *buffer = g_malloc (sizeof (char) * row_size + sizeof (int));
- char *dst = buffer;
-
- /* Flush the current page, if there was something in it */
- if (info->send_buffer_index) {
- page_flush (info);
- }
+ g_data_output_stream_put_int32 (data_output_stream,
+ TRACKER_STEROIDS_RC_ROW,
+ NULL,
+ &loop_error);
- buffer_write_int (dst, TRACKER_STEROIDS_RC_LARGEROW);
- dst += sizeof (int);
- buffer_write_int (dst, row_size);
- dst += sizeof (int);
- buffer_write_int (dst, n_columns);
- dst += sizeof (int);
- memcpy (dst,
- column_offsets,
- n_columns * sizeof (int));
- dst += n_columns * sizeof (int);
-
- for (i = 0; i < n_columns; i++) {
- memcpy (dst,
- column_data[i],
- column_sizes[i]);
- dst += column_sizes[i];
- memset (dst, 0, 1);
- dst ++;
- }
+ if (loop_error) {
+ goto end_query_inthread;
+ }
- buffer_send (info->fd, buffer, row_size + 2 * sizeof (int));
+ g_data_output_stream_put_int32 (data_output_stream,
+ n_columns,
+ NULL,
+ &loop_error);
- g_free (buffer);
- continue;
+ if (loop_error) {
+ goto end_query_inthread;
}
- if (row_size > TRACKER_STEROIDS_BUFFER_SIZE - info->send_buffer_index) {
- page_flush (info);
+ for (i = 0; i < n_columns; i++) {
+ g_data_output_stream_put_int32 (data_output_stream,
+ column_offsets[i],
+ NULL,
+ &loop_error);
+ if (loop_error) {
+ goto end_query_inthread;
+ }
}
- client_write_int (info, TRACKER_STEROIDS_RC_ROW);
- client_write_int (info, n_columns);
- memcpy (info->send_buffer + info->send_buffer_index,
- column_offsets,
- n_columns * sizeof (int));
- info->send_buffer_index += n_columns * sizeof (int);
-
for (i = 0; i < n_columns; i++) {
- memcpy (info->send_buffer + info->send_buffer_index,
- column_data[i],
- column_sizes[i]);
- info->send_buffer_index += column_sizes[i];
- memset (info->send_buffer + info->send_buffer_index, 0, 1);
- info->send_buffer_index ++;
+ g_data_output_stream_put_string (data_output_stream,
+ column_data[i] ? column_data[i] : "",
+ NULL,
+ &loop_error);
+
+ if (loop_error) {
+ goto end_query_inthread;
+ }
}
}
+end_query_inthread:
- /* We flush to ensure we'll have enough space to write the DONE code */
- if (info->send_buffer_index) {
- page_flush (info);
+ if (!loop_error) {
+ g_data_output_stream_put_int32 (data_output_stream,
+ TRACKER_STEROIDS_RC_DONE,
+ NULL,
+ &loop_error);
}
- client_write_int (info, TRACKER_STEROIDS_RC_DONE);
- page_flush (info);
- close (info->fd);
+ /* Will force flushing */
+ g_object_unref (data_output_stream);
+ g_object_unref (output_stream);
if (loop_error) {
ptr->error = loop_error;
@@ -488,7 +389,7 @@ tracker_steroids_query (TrackerSteroids *steroids,
"Query",
dbus_message_get_signature (message),
dbus_message_get_interface (message));
- dbus_connection_send (connection, message, NULL);
+ dbus_connection_send (connection, reply, NULL);
dbus_message_unref (reply);
return;
}
@@ -530,9 +431,12 @@ tracker_steroids_update (TrackerSteroids *steroids,
{
DBusError dbus_error;
ClientInfo *info;
+ GInputStream *input_stream;
+ GDataInputStream *data_input_stream;
+ GError *error = NULL;
+ gsize bytes_read;
guint request_id;
const gchar *sender;
- static char query_size_buffer[sizeof(int)];
int query_size;
DBusMessage *reply;
gchar *query;
@@ -544,7 +448,7 @@ tracker_steroids_update (TrackerSteroids *steroids,
"Update",
dbus_message_get_signature (message),
dbus_message_get_interface (message));
- dbus_connection_send (connection, message, NULL);
+ dbus_connection_send (connection, reply, NULL);
dbus_message_unref (reply);
return;
}
@@ -572,16 +476,57 @@ tracker_steroids_update (TrackerSteroids *steroids,
sender = dbus_message_get_sender (message);
- buffer_read (info->fd, query_size_buffer, sizeof (int));
- query_size = buffer_read_int (query_size_buffer);
+ input_stream = g_buffered_input_stream_new_sized (g_unix_input_stream_new (info->fd, TRUE),
+ TRACKER_STEROIDS_BUFFER_SIZE);
+ data_input_stream = g_data_input_stream_new (input_stream);
+
+ query_size = g_data_input_stream_read_int32 (data_input_stream,
+ NULL,
+ &error);
+
+ if (error) {
+ reply = dbus_message_new_error (info->call_message,
+ TRACKER_STEROIDS_INTERFACE ".UpdateError",
+ error->message);
+ dbus_connection_send (connection, reply, NULL);
+ dbus_message_unref (reply);
+
+ g_object_unref (data_input_stream);
+ g_object_unref (input_stream);
+ g_error_free (error);
+ destroy_client_info (info);
+
+ return;
+ }
/* We malloc one more char to ensure string is 0 terminated */
query = g_malloc0 ((1 + query_size) * sizeof (char));
- buffer_read (info->fd, query, query_size);
+ g_input_stream_read_all (input_stream,
+ query,
+ query_size,
+ &bytes_read,
+ NULL,
+ &error);
+
+ if (error) {
+ reply = dbus_message_new_error (info->call_message,
+ TRACKER_STEROIDS_INTERFACE ".UpdateError",
+ error->message);
+ dbus_connection_send (connection, reply, NULL);
+ dbus_message_unref (reply);
+
+ g_free (query);
+ g_object_unref (data_input_stream);
+ g_object_unref (input_stream);
+ g_error_free (error);
+ destroy_client_info (info);
+
+ return;
+ }
- close (info->fd);
- info->fd = 0;
+ g_object_unref (data_input_stream);
+ g_object_unref (input_stream);
if (update_blank) {
tracker_store_sparql_update_blank (query, TRACKER_STORE_PRIORITY_HIGH,
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]