[tracker/multi-insert] tracker-store: Multi-insert for steroids, server side
- From: Philip Van Hoof <pvanhoof src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/multi-insert] tracker-store: Multi-insert for steroids, server side
- Date: Tue, 5 Oct 2010 12:48:31 +0000 (UTC)
commit ce8873b37ee5b001a97f00df075c3416b154c91d
Author: Philip Van Hoof <philip codeminded be>
Date: Tue Oct 5 14:48:00 2010 +0200
tracker-store: Multi-insert for steroids, server side
src/tracker-store/tracker-steroids.c | 254 ++++++++++++++++++++++++++++++++++
1 files changed, 254 insertions(+), 0 deletions(-)
---
diff --git a/src/tracker-store/tracker-steroids.c b/src/tracker-store/tracker-steroids.c
index 59de111..54dadee 100644
--- a/src/tracker-store/tracker-steroids.c
+++ b/src/tracker-store/tracker-steroids.c
@@ -51,6 +51,11 @@ typedef struct {
int fd;
guint request_id;
DBusConnection *connection;
+ struct {
+ int query_count;
+ int seen;
+ GPtrArray *errors;
+ } array_info;
} ClientInfo;
typedef struct {
@@ -174,6 +179,59 @@ update_callback (GError *error, gpointer user_data)
}
+/*
+ * update_array_callback:
+ * @error: failure of the update that just finished, or NULL on success
+ * @user_data: the ClientInfo shared by every update of one array request
+ *
+ * Completion callback passed to tracker_store_sparql_update() for each
+ * query of an UpdateArray/BatchUpdateArray call. Each invocation records
+ * one result slot (a copy of @error, or NULL). When as many callbacks
+ * have been seen as queries were counted, a single method return is sent:
+ * an array of strings holding one (error name, error message) pair per
+ * query, both empty for a query that succeeded. The ClientInfo is then
+ * destroyed.
+ */
static void
+update_array_callback (GError *error, gpointer user_data)
+{
+	ClientInfo *info = user_data;
+
+	info->array_info.seen++;
+
+	/* Successes must occupy a slot too, so create the array on first use
+	 * regardless of the outcome; the original only created it on failure
+	 * and then added to a NULL array on success.
+	 * NOTE(review): this relies on array_info.errors being either NULL or
+	 * a valid array when the first callback runs - confirm the code that
+	 * allocates the ClientInfo initialises it. */
+	if (!info->array_info.errors)
+		info->array_info.errors = g_ptr_array_new ();
+
+	g_ptr_array_add (info->array_info.errors,
+	                 error ? g_error_copy (error) : NULL);
+
+	if (info->array_info.seen == info->array_info.query_count) {
+		DBusMessage *reply;
+		DBusMessageIter iter, subiter;
+		guint i;
+
+		tracker_dbus_request_success (info->request_id, NULL);
+		reply = dbus_message_new_method_return (info->call_message);
+
+		dbus_message_iter_init_append (reply, &iter);
+		/* The element type of an array must be one complete type. The
+		 * strings are appended directly to the array, so it is "s"
+		 * (the original passed "ss", which is not a single type). */
+		dbus_message_iter_open_container (&iter,
+		                                  DBUS_TYPE_ARRAY,
+		                                  DBUS_TYPE_STRING_AS_STRING,
+		                                  &subiter);
+
+		/* Bounded by the array length; the original condition was just
+		 * "errors->len", which never became false. */
+		for (i = 0; i < info->array_info.errors->len; i++) {
+			GError *update_error = g_ptr_array_index (info->array_info.errors, i);
+			const gchar *str = "";
+			const gchar *message = "";
+
+			if (update_error) {
+				str = TRACKER_STEROIDS_INTERFACE ".UpdateError";
+				message = update_error->message;
+			}
+
+			dbus_message_iter_append_basic (&subiter, DBUS_TYPE_STRING, &str);
+			dbus_message_iter_append_basic (&subiter, DBUS_TYPE_STRING, &message);
+
+			/* Freed only after the message text has been appended */
+			if (update_error)
+				g_error_free (update_error);
+		}
+		g_ptr_array_free (info->array_info.errors, TRUE);
+		info->array_info.errors = NULL;
+
+		dbus_message_iter_close_container (&iter, &subiter);
+
+		dbus_connection_send (info->connection, reply, NULL);
+		dbus_message_unref (reply);
+
+		client_info_destroy (info);
+	}
+}
+
+static void
marshal_hash_table_item (gpointer key,
gpointer value,
gpointer user_data)
@@ -669,6 +727,192 @@ steroids_update (TrackerSteroids *steroids,
g_free (query);
}
+
+/*
+ * steroids_update_array:
+ * @steroids: not referenced by this function
+ * @connection: connection the call arrived on; replies are sent on it
+ * @message: the method call, whose only argument must be a UNIX fd
+ * @batch: TRUE queues the updates with TRACKER_STORE_PRIORITY_LOW,
+ *         FALSE with TRACKER_STORE_PRIORITY_HIGH
+ *
+ * Handles UpdateArray/BatchUpdateArray. The fd carries an int32 query
+ * count followed, for each query, by an int32 size and that many bytes
+ * of query text. All queries are read first; only when the whole payload
+ * was read successfully are they handed to tracker_store_sparql_update()
+ * with update_array_callback(), which sends the reply once every update
+ * has completed. A malformed or truncated payload is answered with a
+ * single UpdateError and no update is queued.
+ */
+static void
+steroids_update_array (TrackerSteroids *steroids,
+                       DBusConnection  *connection,
+                       DBusMessage     *message,
+                       gboolean         batch)
+{
+	DBusError dbus_error;
+	ClientInfo *info;
+	GInputStream *input_stream;
+	GDataInputStream *data_input_stream;
+	GError *error = NULL;
+	guint request_id;
+	const gchar *sender;
+	const gchar *member;
+	int i;
+	DBusMessage *reply;
+	int fd;
+	int query_count;
+	gchar **query_array = NULL;
+
+	request_id = tracker_dbus_get_next_request_id ();
+
+	/* Report the member that was actually called instead of the
+	 * hard-coded "Update" copied from steroids_update() */
+	member = dbus_message_get_member (message);
+	if (!member)
+		member = "UpdateArray";
+
+	if (g_strcmp0 (dbus_message_get_signature (message), DBUS_TYPE_UNIX_FD_AS_STRING)) {
+		tracker_dbus_request_new (request_id,
+		                          NULL,
+		                          "%s()",
+		                          __FUNCTION__);
+
+		reply = dbus_message_new_error_printf (message,
+		                                       DBUS_ERROR_UNKNOWN_METHOD,
+		                                       UNKNOWN_METHOD_MESSAGE,
+		                                       member,
+		                                       dbus_message_get_signature (message),
+		                                       dbus_message_get_interface (message),
+		                                       DBUS_TYPE_UNIX_FD_AS_STRING);
+		dbus_connection_send (connection, reply, NULL);
+		dbus_message_unref (reply);
+
+		tracker_dbus_request_failed (request_id,
+		                             NULL,
+		                             NULL,
+		                             UNKNOWN_METHOD_MESSAGE,
+		                             member,
+		                             dbus_message_get_signature (message),
+		                             dbus_message_get_interface (message),
+		                             DBUS_TYPE_UNIX_FD_AS_STRING);
+
+		return;
+	}
+
+	dbus_error_init (&dbus_error);
+
+	dbus_message_get_args (message,
+	                       &dbus_error,
+	                       DBUS_TYPE_UNIX_FD, &fd,
+	                       DBUS_TYPE_INVALID);
+
+	if (dbus_error_is_set (&dbus_error)) {
+		tracker_dbus_request_new (request_id,
+		                          NULL,
+		                          "%s()",
+		                          __FUNCTION__);
+
+		reply = dbus_message_new_error (message, dbus_error.name, dbus_error.message);
+		dbus_connection_send (connection, reply, NULL);
+
+		/* The message is data, never a printf format */
+		tracker_dbus_request_failed (request_id,
+		                             NULL,
+		                             NULL,
+		                             "%s",
+		                             dbus_error.message);
+
+		dbus_message_unref (reply);
+		dbus_error_free (&dbus_error);
+
+		return;
+	}
+
+	tracker_dbus_request_new (request_id,
+	                          NULL,
+	                          "%s(fd:%d)",
+	                          __FUNCTION__,
+	                          fd);
+
+	info = g_slice_new (ClientInfo);
+	info->connection = dbus_connection_ref (connection);
+	info->call_message = dbus_message_ref (message);
+	info->request_id = request_id;
+	info->fd = fd;
+	/* g_slice_new() does not zero the memory: give the bookkeeping that
+	 * update_array_callback() reads a defined state */
+	info->array_info.query_count = 0;
+	info->array_info.seen = 0;
+	info->array_info.errors = NULL;
+
+	sender = dbus_message_get_sender (message);
+
+	input_stream = g_unix_input_stream_new (info->fd, TRUE);
+	data_input_stream = g_data_input_stream_new (input_stream);
+	g_buffered_input_stream_set_buffer_size (G_BUFFERED_INPUT_STREAM (data_input_stream),
+	                                         TRACKER_STEROIDS_BUFFER_SIZE);
+
+	query_count = g_data_input_stream_read_int32 (data_input_stream,
+	                                              NULL,
+	                                              &error);
+
+	if (!error && query_count < 0) {
+		g_set_error (&error,
+		             G_IO_ERROR,
+		             G_IO_ERROR_INVALID_ARGUMENT,
+		             "Invalid query count: %d",
+		             query_count);
+	}
+
+	if (!error) {
+		/* One extra NULL slot so g_strfreev() can release the array */
+		query_array = g_new0 (gchar *, (gsize) query_count + 1);
+
+		for (i = 0; i < query_count; i++) {
+			gsize bytes_read = 0;
+			int query_size;
+
+			query_size = g_data_input_stream_read_int32 (data_input_stream,
+			                                             NULL,
+			                                             &error);
+			if (error)
+				break;
+
+			if (query_size < 0) {
+				g_set_error (&error,
+				             G_IO_ERROR,
+				             G_IO_ERROR_INVALID_ARGUMENT,
+				             "Invalid size %d for query %d",
+				             query_size, i);
+				break;
+			}
+
+			/* We malloc one more char to ensure string is 0 terminated.
+			 * The sum is done in gsize so that it cannot overflow int. */
+			query_array[i] = g_malloc0 ((gsize) query_size + 1);
+
+			/* Read through the buffered stream: it has already pulled
+			 * data out of the fd while reading the int32 values, so
+			 * reading the raw stream would skip those bytes. */
+			if (!g_input_stream_read_all (G_INPUT_STREAM (data_input_stream),
+			                              query_array[i],
+			                              query_size,
+			                              &bytes_read,
+			                              NULL,
+			                              &error))
+				break;
+
+			if (bytes_read != (gsize) query_size) {
+				g_set_error (&error,
+				             G_IO_ERROR,
+				             G_IO_ERROR_FAILED,
+				             "Query %d truncated: read %" G_GSIZE_FORMAT " of %d bytes",
+				             i, bytes_read, query_size);
+				break;
+			}
+		}
+	}
+
+	/* The streams are needed for every query, so they are released once,
+	 * after the read loop, rather than at the end of each iteration */
+	g_object_unref (data_input_stream);
+	g_object_unref (input_stream);
+
+	if (error) {
+		reply = dbus_message_new_error (info->call_message,
+		                                TRACKER_STEROIDS_INTERFACE ".UpdateError",
+		                                error->message);
+		dbus_connection_send (connection, reply, NULL);
+		dbus_message_unref (reply);
+
+		tracker_dbus_request_failed (request_id,
+		                             NULL,
+		                             NULL,
+		                             "%s",
+		                             error->message);
+
+		g_strfreev (query_array);
+		g_error_free (error);
+		client_info_destroy (info);
+
+		return;
+	}
+
+	if (query_count == 0) {
+		DBusMessageIter iter, subiter;
+
+		/* No update will be queued, so no callback would ever answer
+		 * the caller: reply right away with an empty result array */
+		tracker_dbus_request_success (request_id, NULL);
+
+		reply = dbus_message_new_method_return (info->call_message);
+		dbus_message_iter_init_append (reply, &iter);
+		dbus_message_iter_open_container (&iter,
+		                                  DBUS_TYPE_ARRAY,
+		                                  DBUS_TYPE_STRING_AS_STRING,
+		                                  &subiter);
+		dbus_message_iter_close_container (&iter, &subiter);
+		dbus_connection_send (connection, reply, NULL);
+		dbus_message_unref (reply);
+
+		g_strfreev (query_array);
+		client_info_destroy (info);
+
+		return;
+	}
+
+	/* Set once, before any update is queued; the callback compares its
+	 * running "seen" counter against this value */
+	info->array_info.query_count = query_count;
+	info->array_info.errors = g_ptr_array_new ();
+
+	for (i = 0; i < query_count; i++) {
+		tracker_dbus_request_debug (request_id,
+		                            NULL,
+		                            "query: '%s'",
+		                            query_array[i]);
+
+		tracker_store_sparql_update (query_array[i],
+		                             batch ? TRACKER_STORE_PRIORITY_LOW : TRACKER_STORE_PRIORITY_HIGH,
+		                             update_array_callback,
+		                             sender,
+		                             info,
+		                             NULL);
+	}
+
+	g_strfreev (query_array);
+}
+
DBusHandlerResult
tracker_steroids_connection_filter (DBusConnection *connection,
DBusMessage *message,
@@ -697,6 +941,11 @@ tracker_steroids_connection_filter (DBusConnection *connection,
return DBUS_HANDLER_RESULT_HANDLED;
}
+ if (!g_strcmp0 ("UpdateArray", dbus_message_get_member (message))) {
+ steroids_update_array (steroids, connection, message, FALSE);
+ return DBUS_HANDLER_RESULT_HANDLED;
+ }
+
if (!g_strcmp0 ("Update", dbus_message_get_member (message))) {
steroids_update (steroids, connection, message, FALSE, FALSE);
return DBUS_HANDLER_RESULT_HANDLED;
@@ -712,5 +961,10 @@ tracker_steroids_connection_filter (DBusConnection *connection,
return DBUS_HANDLER_RESULT_HANDLED;
}
+ if (!g_strcmp0 ("BatchUpdateArray", dbus_message_get_member (message))) {
+ steroids_update_array (steroids, connection, message, TRUE);
+ return DBUS_HANDLER_RESULT_HANDLED;
+ }
+
return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]