[tracker/multi-insert] tracker-store: Multi-insert for steroids, server side



commit ce8873b37ee5b001a97f00df075c3416b154c91d
Author: Philip Van Hoof <philip codeminded be>
Date:   Tue Oct 5 14:48:00 2010 +0200

    tracker-store: Multi-insert for steroids, server side

 src/tracker-store/tracker-steroids.c |  254 ++++++++++++++++++++++++++++++++++
 1 files changed, 254 insertions(+), 0 deletions(-)
---
diff --git a/src/tracker-store/tracker-steroids.c b/src/tracker-store/tracker-steroids.c
index 59de111..54dadee 100644
--- a/src/tracker-store/tracker-steroids.c
+++ b/src/tracker-store/tracker-steroids.c
@@ -51,6 +51,11 @@ typedef struct {
 	int fd;
 	guint request_id;
 	DBusConnection *connection;
+	struct {
+		int query_count;
+		int seen;
+		GPtrArray *errors;
+	} array_info;
 } ClientInfo;
 
 typedef struct {
@@ -174,6 +179,59 @@ update_callback (GError *error, gpointer user_data)
 }
 
 static void
+update_array_callback (GError *error, gpointer user_data)
+{
+	ClientInfo *info = user_data;
+	DBusMessage *reply;
+
+	info->array_info.seen++;
+
+	if (error) {
+		if (!info->array_info.errors)
+			info->array_info.errors = g_ptr_array_new ();
+		g_ptr_array_add (info->array_info.errors, g_error_copy (error));
+	} else {
+		g_ptr_array_add (info->array_info.errors, NULL);
+	}
+
+	if (info->array_info.seen == info->array_info.query_count) {
+		guint i;
+		DBusMessageIter iter, subiter;
+
+		tracker_dbus_request_success (info->request_id, NULL);
+		reply = dbus_message_new_method_return (info->call_message);
+
+		dbus_message_iter_init_append (reply, &iter);
+		dbus_message_iter_open_container (&iter, DBUS_TYPE_ARRAY, "ss", &subiter);
+
+		for (i = 0; info->array_info.errors->len; i++) {
+			GError *error = g_ptr_array_index (info->array_info.errors, i);
+			const gchar *str = "";
+			const gchar *message = "";
+
+			if (error) {
+				str = TRACKER_STEROIDS_INTERFACE ".UpdateError";
+				message = error->message;
+			}
+
+			dbus_message_iter_append_basic (&subiter, DBUS_TYPE_STRING, &str);
+			dbus_message_iter_append_basic (&subiter, DBUS_TYPE_STRING, &message);
+
+			if (error)
+				g_error_free (error);
+		}
+		g_ptr_array_free (info->array_info.errors, TRUE);
+
+		dbus_message_iter_close_container (&iter, &subiter);
+
+		dbus_connection_send (info->connection, reply, NULL);
+		dbus_message_unref (reply);
+
+		client_info_destroy (info);
+	}
+}
+
+static void
 marshal_hash_table_item (gpointer key,
                          gpointer value,
                          gpointer user_data)
@@ -669,6 +727,192 @@ steroids_update (TrackerSteroids *steroids,
 	g_free (query);
 }
 
+
+static void
+steroids_update_array (TrackerSteroids *steroids,
+                       DBusConnection  *connection,
+                       DBusMessage     *message,
+                       gboolean         batch)
+{
+	DBusError dbus_error;
+	ClientInfo *info;
+	GInputStream *input_stream;
+	GDataInputStream *data_input_stream;
+	GError *error = NULL;
+	guint request_id;
+	const gchar *sender;
+	int i;
+	DBusMessage *reply;
+	int fd;
+	gchar **query_array;
+
+	request_id = tracker_dbus_get_next_request_id ();
+
+	if (g_strcmp0 (dbus_message_get_signature (message), DBUS_TYPE_UNIX_FD_AS_STRING)) {
+		tracker_dbus_request_new (request_id,
+		                          NULL,
+		                          "%s()",
+		                          __FUNCTION__);
+
+		reply = dbus_message_new_error_printf (message,
+		                                       DBUS_ERROR_UNKNOWN_METHOD,
+		                                       UNKNOWN_METHOD_MESSAGE,
+		                                       "Update",
+		                                       dbus_message_get_signature (message),
+		                                       dbus_message_get_interface (message),
+		                                       DBUS_TYPE_UNIX_FD_AS_STRING);
+		dbus_connection_send (connection, reply, NULL);
+		dbus_message_unref (reply);
+
+		tracker_dbus_request_failed (request_id,
+		                             NULL,
+		                             NULL,
+		                             UNKNOWN_METHOD_MESSAGE,
+		                             "Update",
+		                             dbus_message_get_signature (message),
+		                             dbus_message_get_interface (message),
+		                             DBUS_TYPE_UNIX_FD_AS_STRING);
+
+		return;
+	}
+
+	dbus_error_init (&dbus_error);
+
+	dbus_message_get_args (message,
+	                       &dbus_error,
+	                       DBUS_TYPE_UNIX_FD, &fd,
+	                       DBUS_TYPE_INVALID);
+
+	if (dbus_error_is_set (&dbus_error)) {
+		tracker_dbus_request_new (request_id,
+		                          NULL,
+		                          "%s()",
+		                          __FUNCTION__);
+
+		reply = dbus_message_new_error (message, dbus_error.name, dbus_error.message);
+		dbus_connection_send (connection, reply, NULL);
+
+		tracker_dbus_request_failed (request_id,
+		                             NULL,
+		                             NULL,
+		                             dbus_error.message);
+
+		dbus_message_unref (reply);
+		dbus_error_free (&dbus_error);
+
+		return;
+	}
+
+	tracker_dbus_request_new (request_id,
+	                          NULL,
+	                          "%s(fd:%d)",
+	                          __FUNCTION__,
+	                          fd);
+
+	info = g_slice_new (ClientInfo);
+	info->connection = dbus_connection_ref (connection);
+	info->call_message = dbus_message_ref (message);
+	info->request_id = request_id;
+	info->fd = fd;
+
+	sender = dbus_message_get_sender (message);
+
+	input_stream = g_unix_input_stream_new (info->fd, TRUE);
+	data_input_stream = g_data_input_stream_new (input_stream);
+	g_buffered_input_stream_set_buffer_size (G_BUFFERED_INPUT_STREAM (data_input_stream),
+	                                         TRACKER_STEROIDS_BUFFER_SIZE);
+
+	info->array_info.query_count = g_data_input_stream_read_int32 (data_input_stream,
+	                                                               NULL,
+	                                                               &error);
+
+	info->array_info.seen = 0;
+	query_array = g_new0 (gchar*, info->array_info.query_count + 1);
+
+	for (i = 0; i < info->array_info.query_count; i++) {
+		gsize bytes_read;
+		int query_size;
+
+		query_size = g_data_input_stream_read_int32 (data_input_stream,
+		                                             NULL,
+		                                             &error);
+
+		if (error) {
+			reply = dbus_message_new_error (info->call_message,
+			                                TRACKER_STEROIDS_INTERFACE ".UpdateError",
+			                                error->message);
+			dbus_connection_send (connection, reply, NULL);
+			dbus_message_unref (reply);
+
+			tracker_dbus_request_failed (request_id,
+			                             NULL,
+			                             NULL,
+			                             error->message);
+
+			g_strfreev (query_array);
+			g_object_unref (data_input_stream);
+			g_object_unref (input_stream);
+			g_error_free (error);
+			client_info_destroy (info);
+
+			return;
+		}
+
+		/* We malloc one more char to ensure string is 0 terminated */
+		query_array[i] = g_malloc0 ((1 + query_size) * sizeof (char));
+
+		g_input_stream_read_all (input_stream,
+		                         query_array[i],
+		                         query_size,
+		                         &bytes_read,
+		                         NULL,
+		                         &error);
+
+		if (error) {
+			reply = dbus_message_new_error (info->call_message,
+			                                TRACKER_STEROIDS_INTERFACE ".UpdateError",
+			                                error->message);
+			dbus_connection_send (connection, reply, NULL);
+			dbus_message_unref (reply);
+
+			tracker_dbus_request_failed (request_id,
+			                             NULL,
+			                             NULL,
+			                             error->message);
+
+			g_strfreev (query_array);
+			g_object_unref (data_input_stream);
+			g_object_unref (input_stream);
+			g_error_free (error);
+			client_info_destroy (info);
+
+			return;
+		}
+
+		g_object_unref (data_input_stream);
+		g_object_unref (input_stream);
+
+		info->array_info.query_count++;
+	}
+
+	for (i = 0; query_array[i] != NULL; i++) {
+
+		tracker_dbus_request_debug (request_id,
+		                            NULL,
+		                            "query: '%s'",
+		                            query_array[i]);
+
+		tracker_store_sparql_update (query_array[i],
+		                             batch ? TRACKER_STORE_PRIORITY_LOW : TRACKER_STORE_PRIORITY_HIGH,
+		                             update_array_callback,
+		                             sender,
+		                             info,
+		                             NULL);
+	}
+
+	g_strfreev (query_array);
+}
+
 DBusHandlerResult
 tracker_steroids_connection_filter (DBusConnection *connection,
                                     DBusMessage    *message,
@@ -697,6 +941,11 @@ tracker_steroids_connection_filter (DBusConnection *connection,
 		return DBUS_HANDLER_RESULT_HANDLED;
 	}
 
+	if (!g_strcmp0 ("UpdateArray", dbus_message_get_member (message))) {
+		steroids_update_array (steroids, connection, message, FALSE);
+		return DBUS_HANDLER_RESULT_HANDLED;
+	}
+
 	if (!g_strcmp0 ("Update", dbus_message_get_member (message))) {
 		steroids_update (steroids, connection, message, FALSE, FALSE);
 		return DBUS_HANDLER_RESULT_HANDLED;
@@ -712,5 +961,10 @@ tracker_steroids_connection_filter (DBusConnection *connection,
 		return DBUS_HANDLER_RESULT_HANDLED;
 	}
 
+	if (!g_strcmp0 ("BatchUpdateArray", dbus_message_get_member (message))) {
+		steroids_update_array (steroids, connection, message, TRUE);
+		return DBUS_HANDLER_RESULT_HANDLED;
+	}
+
 	return DBUS_HANDLER_RESULT_NOT_YET_HANDLED;
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]