[tracker/multi-insert: 3/11] libtracker-bus: Client API implementations for multi-insert
- From: Philip Van Hoof <pvanhoof src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/multi-insert: 3/11] libtracker-bus: Client API implementations for multi-insert
- Date: Thu, 7 Oct 2010 12:43:50 +0000 (UTC)
commit 4d5764ec26130f5d72264cf739eefe5835559631
Author: Philip Van Hoof <philip codeminded be>
Date: Wed Oct 6 13:19:48 2010 +0200
libtracker-bus: Client API implementations for multi-insert
src/libtracker-bus/tracker-bus-fd-update.c | 221 +++++++++++++++++++++++++++-
1 files changed, 217 insertions(+), 4 deletions(-)
---
diff --git a/src/libtracker-bus/tracker-bus-fd-update.c b/src/libtracker-bus/tracker-bus-fd-update.c
index 6ee3ca1..9cf1e63 100644
--- a/src/libtracker-bus/tracker-bus-fd-update.c
+++ b/src/libtracker-bus/tracker-bus-fd-update.c
@@ -203,6 +203,56 @@ sparql_update_fast_callback (DBusPendingCall *call,
dbus_pending_call_unref (call);
}
+/*
+ * Pending-call notify handler for the array (multi-insert) update methods.
+ *
+ * @call:      the pending call whose reply has arrived
+ * @user_data: the FastAsyncData passed to dbus_pending_call_set_notify()
+ *
+ * Steals the reply from @call. If the reply is a D-Bus error message, the
+ * error is set on fad->res; otherwise a GPtrArray is stored as the
+ * operation result. In both cases fad->res is completed, and the reply,
+ * the FastAsyncData and the pending call are released before returning.
+ */
+static void
+sparql_update_array_fast_callback (DBusPendingCall *call,
+                                   void            *user_data)
+{
+	FastAsyncData *fad = user_data;
+	DBusMessage *reply;
+	GError *error = NULL;
+	GPtrArray *errors;
+
+	/* Check for errors */
+	reply = dbus_pending_call_steal_reply (call);
+
+	if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
+		error = sparql_error_from_dbus_message (reply);
+
+		/* NOTE(review): g_simple_async_result_set_from_error() copies the
+		 * error; if sparql_error_from_dbus_message() returns a newly
+		 * allocated GError it should be released with g_error_free()
+		 * here - confirm ownership against its definition. */
+		g_simple_async_result_set_from_error (fad->res, error);
+		g_simple_async_result_complete (fad->res);
+
+		goto out;
+	}
+
+	/* Call iterator callback */
+	switch (fad->operation_type) {
+	case FAST_UPDATE:
+	case FAST_UPDATE_BATCH:
+		/* todo: read the per-query errors from the reply into errors */
+
+		/* Always hand over a valid array: the finish functions take a
+		 * reference on the stored pointer, so it must never be an
+		 * uninitialised value. The result owns one reference, dropped
+		 * through the destroy notify. */
+		errors = g_ptr_array_new ();
+
+		g_simple_async_result_set_op_res_gpointer (fad->res,
+		                                           errors,
+		                                           (GDestroyNotify) g_ptr_array_unref);
+		g_simple_async_result_complete (fad->res);
+		break;
+	default:
+		g_assert_not_reached ();
+		break;
+	}
+
+out:
+	/* Clean up */
+	dbus_message_unref (reply);
+
+	fast_async_data_free (fad);
+
+	dbus_pending_call_unref (call);
+}
+
+
static DBusPendingCall *
sparql_update_fast_send (DBusConnection *connection,
const gchar *query,
@@ -297,6 +347,109 @@ sparql_update_fast_send (DBusConnection *connection,
return call;
}
+
+/*
+ * Starts an array (multi-insert) update over the FD-passing D-Bus interface.
+ *
+ * @connection:  D-Bus connection the method call is sent on
+ * @queries:     array of SPARQL update strings, must not be NULL
+ * @queries_len: number of entries in @queries, must not be 0
+ * @type:        FAST_UPDATE ("Update") or FAST_UPDATE_BATCH ("BatchUpdate")
+ * @error:       return location for an error, or NULL
+ *
+ * Creates a pipe, sends its read end to the service as the method argument
+ * and then writes to the write end: the query count as a uint32, followed,
+ * for every query, by its strlen() as an int32 and the query string itself.
+ *
+ * Returns: the pending call for the reply, owned by the caller, or NULL with
+ * @error set when the pipe could not be created, no pending call could be
+ * obtained, or writing to the pipe failed.
+ */
+static DBusPendingCall *
+sparql_update_array_fast_send (DBusConnection     *connection,
+                               const gchar       **queries,
+                               guint               queries_len,
+                               FastOperationType   type,
+                               GError            **error)
+{
+	const gchar *dbus_method;
+	DBusMessage *message;
+	DBusMessageIter iter;
+	DBusPendingCall *call = NULL;
+	int pipefd[2];
+	guint i;
+	GOutputStream *output_stream;
+	GOutputStream *buffered_output_stream;
+	GDataOutputStream *data_output_stream;
+	GError *inner_error = NULL;
+
+	g_return_val_if_fail (queries != NULL, NULL);
+	g_return_val_if_fail (queries_len != 0, NULL);
+
+	/* Resolve the method name before acquiring any resource */
+	switch (type) {
+	case FAST_UPDATE:
+		dbus_method = "Update";
+		break;
+	case FAST_UPDATE_BATCH:
+		dbus_method = "BatchUpdate";
+		break;
+	default:
+		g_assert_not_reached ();
+		return NULL;
+	}
+
+	if (pipe (pipefd) < 0) {
+		g_set_error (error,
+		             TRACKER_SPARQL_ERROR,
+		             TRACKER_SPARQL_ERROR_UNSUPPORTED,
+		             "Cannot open pipe");
+		return NULL;
+	}
+
+	message = dbus_message_new_method_call (TRACKER_DBUS_SERVICE,
+	                                        TRACKER_DBUS_OBJECT_STEROIDS,
+	                                        TRACKER_DBUS_INTERFACE_STEROIDS,
+	                                        dbus_method);
+	dbus_message_iter_init_append (message, &iter);
+	dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[0]);
+	dbus_connection_send_with_reply (connection, message, &call, -1);
+	dbus_message_unref (message);
+	close (pipefd[0]);
+
+	if (!call) {
+		/* Nothing will ever read from the pipe: do not leak the write end */
+		close (pipefd[1]);
+
+		g_set_error (error,
+		             TRACKER_SPARQL_ERROR,
+		             TRACKER_SPARQL_ERROR_UNSUPPORTED,
+		             "FD passing unsupported or connection disconnected");
+		return NULL;
+	}
+
+	/* The unix output stream is created with close_fd = TRUE, so from here
+	 * on pipefd[1] is closed together with the stream. */
+	output_stream = g_unix_output_stream_new (pipefd[1], TRUE);
+	buffered_output_stream = g_buffered_output_stream_new_sized (output_stream,
+	                                                             TRACKER_DBUS_PIPE_BUFFER_SIZE);
+	data_output_stream = g_data_output_stream_new (buffered_output_stream);
+
+	g_data_output_stream_put_uint32 (data_output_stream,
+	                                 queries_len,
+	                                 NULL,
+	                                 &inner_error);
+
+	/* Stop at the first failed write so that inner_error is never handed
+	 * to another call while already set. */
+	for (i = 0; inner_error == NULL && i < queries_len; i++) {
+		const gchar *query = queries[i];
+
+		if (g_data_output_stream_put_int32 (data_output_stream,
+		                                    strlen (query),
+		                                    NULL,
+		                                    &inner_error)) {
+			g_data_output_stream_put_string (data_output_stream,
+			                                 query,
+			                                 NULL,
+			                                 &inner_error);
+		}
+	}
+
+	g_object_unref (data_output_stream);
+	g_object_unref (buffered_output_stream);
+	g_object_unref (output_stream);
+
+	if (inner_error) {
+		g_propagate_error (error, inner_error);
+
+		/* The caller only gets NULL back, so the pending call has to be
+		 * dropped here or it would leak. */
+		dbus_pending_call_cancel (call);
+		dbus_pending_call_unref (call);
+
+		return NULL;
+	}
+
+	return call;
+}
+
+
static DBusMessage *
sparql_update_fast (DBusConnection *connection,
const gchar *query,
@@ -345,6 +498,26 @@ sparql_update_fast_async (DBusConnection *connection,
dbus_pending_call_set_notify (call, sparql_update_fast_callback, fad, NULL);
}
+/*
+ * Sends an array update and arranges for
+ * sparql_update_array_fast_callback() to be invoked with @fad once the
+ * reply arrives.
+ *
+ * @connection:  D-Bus connection to send on
+ * @queries:     array of SPARQL update strings
+ * @queries_len: number of entries in @queries
+ * @fad:         async state; its operation_type selects the D-Bus method
+ *               and its dbus_call is set to the pending call on success
+ * @error:       return location passed on to sparql_update_array_fast_send()
+ *
+ * When no pending call is obtained, @fad is left untouched and no
+ * notification is registered.
+ */
+static void
+sparql_update_array_fast_async (DBusConnection  *connection,
+                                const gchar    **queries,
+                                guint            queries_len,
+                                FastAsyncData   *fad,
+                                GError         **error)
+{
+	DBusPendingCall *pending;
+
+	pending = sparql_update_array_fast_send (connection,
+	                                         queries,
+	                                         queries_len,
+	                                         fad->operation_type,
+	                                         error);
+
+	if (pending != NULL) {
+		fad->dbus_call = pending;
+
+		dbus_pending_call_set_notify (pending,
+		                              sparql_update_array_fast_callback,
+		                              fad,
+		                              NULL);
+	}
+}
+
+
/* Public API */
void
@@ -403,7 +576,27 @@ tracker_bus_fd_sparql_update_array_async (DBusGConnection *connection,
GAsyncReadyCallback callback,
gpointer user_data)
{
- // todo
+ FastAsyncData *fad;
+ GError *error = NULL;
+
+ g_return_if_fail (queries != NULL);
+ g_return_if_fail (queries_len != 0);
+
+ fad = fast_async_data_new (dbus_g_connection_get_connection (connection),
+ FAST_UPDATE, cancellable, user_data);
+
+ fad->res = g_simple_async_result_new (NULL, callback, user_data,
+ tracker_bus_fd_sparql_update_async);
+
+ sparql_update_array_fast_async (dbus_g_connection_get_connection (connection),
+ queries, queries_len, fad, &error);
+
+ if (error) {
+ g_critical ("Could not initiate update: %s", error->message);
+ g_error_free (error);
+ g_object_unref (fad->res);
+ fast_async_data_free (fad);
+ }
}
void
@@ -550,7 +743,27 @@ tracker_bus_fd_sparql_batch_update_array_async (DBusGConnection *connecti
GAsyncReadyCallback callback,
gpointer user_data)
{
- // todo
+ FastAsyncData *fad;
+ GError *error = NULL;
+
+ g_return_if_fail (queries != NULL);
+ g_return_if_fail (queries_len != 0);
+
+ fad = fast_async_data_new (dbus_g_connection_get_connection (connection),
+ FAST_UPDATE_BATCH, cancellable, user_data);
+
+ fad->res = g_simple_async_result_new (NULL, callback, user_data,
+ tracker_bus_fd_sparql_batch_update_async);
+
+ sparql_update_array_fast_async (dbus_g_connection_get_connection (connection),
+ queries, queries_len, fad, &error);
+
+ if (error) {
+ g_critical ("Could not initiate update: %s", error->message);
+ g_error_free (error);
+ g_object_unref (fad->res);
+ fast_async_data_free (fad);
+ }
}
void
@@ -565,7 +778,7 @@ tracker_bus_fd_sparql_batch_update_finish (GAsyncResult *res,
GPtrArray*
tracker_bus_fd_sparql_update_array_finish (GAsyncResult *res)
{
- g_return_if_fail (res != NULL);
+ g_return_val_if_fail (res != NULL, NULL);
// todo: check if ref is needed here
return g_ptr_array_ref (g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (res)));
@@ -574,7 +787,7 @@ tracker_bus_fd_sparql_update_array_finish (GAsyncResult *res)
GPtrArray*
tracker_bus_fd_sparql_batch_update_array_finish (GAsyncResult *res)
{
- g_return_if_fail (res != NULL);
+ g_return_val_if_fail (res != NULL, NULL);
// todo: check if ref is needed here
return g_ptr_array_ref (g_simple_async_result_get_op_res_gpointer (G_SIMPLE_ASYNC_RESULT (res)));
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]