[tracker/multi-insert: 3/11] libtracker-bus: multi-insert: Client API impl.



commit 61941c0e1ab0c440501ed8022f2787517ea1a3e4
Author: Philip Van Hoof <philip codeminded be>
Date:   Wed Oct 6 13:19:48 2010 +0200

    libtracker-bus: multi-insert: Client API impl.

 src/libtracker-bus/tracker-bus-fd-update.c |  221 +++++++++++++++++++++++++++-
 1 files changed, 217 insertions(+), 4 deletions(-)
---
diff --git a/src/libtracker-bus/tracker-bus-fd-update.c b/src/libtracker-bus/tracker-bus-fd-update.c
index 6ee3ca1..9cf1e63 100644
--- a/src/libtracker-bus/tracker-bus-fd-update.c
+++ b/src/libtracker-bus/tracker-bus-fd-update.c
@@ -203,6 +203,56 @@ sparql_update_fast_callback (DBusPendingCall *call,
 	dbus_pending_call_unref (call);
 }
 
+/* Notify handler for the pending array-update call: completes fad->res with
+ * the D-Bus error, or with a GPtrArray meant to carry per-query errors, and
+ * then drops the reply, the FastAsyncData and the pending call. */
+static void
+sparql_update_array_fast_callback (DBusPendingCall *call,
+                                   void            *user_data)
+{
+	FastAsyncData *fad = user_data;
+	DBusMessage *reply;
+	GError *error = NULL;
+	GPtrArray *errors;
+
+	reply = dbus_pending_call_steal_reply (call);
+
+	/* Check for errors */
+	if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
+		/* NOTE(review): error is not freed here; whether that leaks depends
+		 * on who owns what sparql_error_from_dbus_message() returns. */
+		error = sparql_error_from_dbus_message (reply);
+		g_simple_async_result_set_from_error (fad->res, error);
+		dbus_message_unref (reply);
+		g_simple_async_result_complete (fad->res);
+		fast_async_data_free (fad);
+		dbus_pending_call_unref (call);
+		return;
+	}
+
+	switch (fad->operation_type) {
+	case FAST_UPDATE:
+	case FAST_UPDATE_BATCH:
+		/* todo: read the per-query errors from reply into errors. Until
+		 * then hand over an empty array rather than an uninitialized
+		 * pointer; the result owns one reference and unrefs it when it
+		 * is finalized. */
+		errors = g_ptr_array_new ();
+		g_simple_async_result_set_op_res_gpointer (fad->res, errors,
+		                                           (GDestroyNotify) g_ptr_array_unref);
+		g_simple_async_result_complete (fad->res);
+		break;
+	default:
+		g_assert_not_reached ();
+		break;
+	}
+
+	/* Clean up */
+	dbus_message_unref (reply);
+	fast_async_data_free (fad);
+	dbus_pending_call_unref (call);
+}
+
 static DBusPendingCall *
 sparql_update_fast_send (DBusConnection     *connection,
                          const gchar        *query,
@@ -297,6 +347,109 @@ sparql_update_fast_send (DBusConnection     *connection,
 	return call;
 }
 
+
+/* Opens a pipe, sends its read end to the Steroids "Update"/"BatchUpdate"
+ * method and writes to the write end: the query count (uint32), then for each
+ * query its strlen() (int32) followed by the string itself.
+ * Returns the pending call for the reply, or NULL when the pipe, the send or
+ * a write fails; error is set and fds and the call are released in that case. */
+static DBusPendingCall *
+sparql_update_array_fast_send (DBusConnection     *connection,
+                               const gchar       **queries,
+                               guint               queries_len,
+                               FastOperationType   type,
+                               GError            **error)
+{
+	const gchar *dbus_method;
+	DBusMessage *message;
+	DBusMessageIter iter;
+	DBusPendingCall *call = NULL;
+	int pipefd[2];
+	guint i;
+	GOutputStream *output_stream;
+	GOutputStream *buffered_output_stream;
+	GDataOutputStream *data_output_stream;
+	GError *inner_error = NULL;
+
+	g_return_val_if_fail (queries != NULL, NULL);
+	g_return_val_if_fail (queries_len != 0, NULL);
+
+	if (pipe (pipefd) < 0) {
+		g_set_error (error,
+		             TRACKER_SPARQL_ERROR,
+		             TRACKER_SPARQL_ERROR_UNSUPPORTED,
+		             "Cannot open pipe");
+		return NULL;
+	}
+
+	switch (type) {
+	case FAST_UPDATE:
+		dbus_method = "Update";
+		break;
+	case FAST_UPDATE_BATCH:
+		dbus_method = "BatchUpdate";
+		break;
+	default:
+		g_assert_not_reached ();
+	}
+
+	message = dbus_message_new_method_call (TRACKER_DBUS_SERVICE,
+	                                        TRACKER_DBUS_OBJECT_STEROIDS,
+	                                        TRACKER_DBUS_INTERFACE_STEROIDS,
+	                                        dbus_method);
+	dbus_message_iter_init_append (message, &iter);
+	dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[0]);
+	/* call stays NULL if the send itself fails, and is set to NULL by libdbus
+	 * when disconnected or when the connection cannot pass fds */
+	dbus_connection_send_with_reply (connection, message, &call, -1);
+	dbus_message_unref (message);
+	close (pipefd[0]);
+
+	if (!call) {
+		/* Nothing will ever read the pipe: close the write end too */
+		close (pipefd[1]);
+		g_set_error (error,
+		             TRACKER_SPARQL_ERROR,
+		             TRACKER_SPARQL_ERROR_UNSUPPORTED,
+		             "FD passing unsupported or connection disconnected");
+		return NULL;
+	}
+
+	/* output_stream owns pipefd[1] (close_fd = TRUE) */
+	output_stream = g_unix_output_stream_new (pipefd[1], TRUE);
+	buffered_output_stream = g_buffered_output_stream_new_sized (output_stream,
+	                                                             TRACKER_DBUS_PIPE_BUFFER_SIZE);
+	data_output_stream = g_data_output_stream_new (buffered_output_stream);
+
+	/* Stop at the first failed write so inner_error is never set twice */
+	g_data_output_stream_put_uint32 (data_output_stream, queries_len,
+	                                 NULL, &inner_error);
+
+	for (i = 0; i < queries_len && !inner_error; i++) {
+		const gchar *query = queries[i];
+
+		if (g_data_output_stream_put_int32 (data_output_stream, strlen (query),
+		                                    NULL, &inner_error)) {
+			g_data_output_stream_put_string (data_output_stream, query,
+			                                 NULL, &inner_error);
+		}
+	}
+
+	if (inner_error) {
+		/* The reply will never be consumed: cancel and drop the call */
+		g_propagate_error (error, inner_error);
+		dbus_pending_call_cancel (call);
+		dbus_pending_call_unref (call);
+		call = NULL;
+	}
+
+	g_object_unref (data_output_stream);
+	g_object_unref (buffered_output_stream);
+	g_object_unref (output_stream);
+
+	return call;
+}
+
 static DBusMessage *
 sparql_update_fast (DBusConnection     *connection,
                     const gchar        *query,
@@ -345,6 +498,26 @@ sparql_update_fast_async (DBusConnection      *connection,
 	dbus_pending_call_set_notify (call, sparql_update_fast_callback, fad, NULL);
 }
 
+/* Sends the queries with fad->operation_type and, if a pending call was
+ * obtained, stores it in fad and registers the array callback on it.
+ * When no call is returned nothing is registered and fad is left untouched. */
+static void
+sparql_update_array_fast_async (DBusConnection      *connection,
+                                const gchar        **queries,
+                                guint                queries_len,
+                                FastAsyncData       *fad,
+                                GError             **error)
+{
+	DBusPendingCall *pending = sparql_update_array_fast_send (connection, queries, queries_len,
+	                                                          fad->operation_type, error);
+
+	if (pending != NULL) {
+		/* fad travels to the callback as its user_data */
+		fad->dbus_call = pending;
+		dbus_pending_call_set_notify (pending, sparql_update_array_fast_callback, fad, NULL);
+	}
+}
+
 /* Public API */
 
 void
@@ -403,7 +576,27 @@ tracker_bus_fd_sparql_update_array_async (DBusGConnection       *connection,
                                           GAsyncReadyCallback    callback,
                                           gpointer               user_data)
 {
-	// todo
+	FastAsyncData *fad;
+	GError *error = NULL;
+
+	g_return_if_fail (queries != NULL);
+	g_return_if_fail (queries_len != 0);
+
+	fad = fast_async_data_new (dbus_g_connection_get_connection (connection),
+	                           FAST_UPDATE, cancellable, user_data);
+
+	fad->res = g_simple_async_result_new (NULL, callback, user_data,
+	                                      tracker_bus_fd_sparql_update_async);
+
+	sparql_update_array_fast_async (dbus_g_connection_get_connection (connection),
+	                                queries, queries_len, fad, &error);
+
+	if (error) {
+		g_critical ("Could not initiate update: %s", error->message);
+		g_error_free (error);
+		g_object_unref (fad->res);
+		fast_async_data_free (fad);
+	}
 }
 
 void
@@ -550,7 +743,27 @@ tracker_bus_fd_sparql_batch_update_array_async (DBusGConnection        *connecti
                                                 GAsyncReadyCallback     callback,
                                                 gpointer                 user_data)
 {
-	// todo
+	FastAsyncData *fad;
+	GError *error = NULL;
+
+	g_return_if_fail (queries != NULL);
+	g_return_if_fail (queries_len != 0);
+
+	fad = fast_async_data_new (dbus_g_connection_get_connection (connection),
+	                           FAST_UPDATE_BATCH, cancellable, user_data);
+
+	fad->res = g_simple_async_result_new (NULL, callback, user_data,
+	                                      tracker_bus_fd_sparql_batch_update_async);
+
+	sparql_update_array_fast_async (dbus_g_connection_get_connection (connection),
+	                                queries, queries_len, fad, &error);
+
+	if (error) {
+		g_critical ("Could not initiate update: %s", error->message);
+		g_error_free (error);
+		g_object_unref (fad->res);
+		fast_async_data_free (fad);
+	}
 }
 
 void
@@ -565,7 +778,7 @@ tracker_bus_fd_sparql_batch_update_finish (GAsyncResult     *res,
 GPtrArray*
 tracker_bus_fd_sparql_update_array_finish (GAsyncResult *res)
 {
-	g_return_if_fail (res != NULL);
+	g_return_val_if_fail (res != NULL, NULL);
 
 	// todo: check if ref is needed here 
 	return g_ptr_array_ref (g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (res)));
@@ -574,7 +787,7 @@ tracker_bus_fd_sparql_update_array_finish (GAsyncResult *res)
 GPtrArray*
 tracker_bus_fd_sparql_batch_update_array_finish (GAsyncResult *res)
 {
-	g_return_if_fail (res != NULL);
+	g_return_val_if_fail (res != NULL, NULL);
 
 	// todo: check if ref is needed here 
 	return g_ptr_array_ref (g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (res)));



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]