[tracker/dbus-fd-experiment-squashed: 2/6] libtracker-client: Add Steroids support
- From: Adrien Bustany <abustany src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/dbus-fd-experiment-squashed: 2/6] libtracker-client: Add Steroids support
- Date: Fri, 4 Jun 2010 01:19:22 +0000 (UTC)
commit 426116aad042b78deba34dcc2820de2bdd10901e
Author: Adrien Bustany <abustany gnome org>
Date: Mon May 24 08:52:02 2010 -0400
libtracker-client: Add Steroids support
This commit adds several functions to libtracker-client to exploit the new
Steroids interface.
The main function is tracker_resources_sparql_query_iterate, which will
return a TrackerResultIterator. This iterator can then be used to
iterate over the results using the tracker_result_iterator_* functions.
Note that iteration is not done on the DB side: all results are first fetched
into a buffer on the client side. This is because keeping an iterator on the
server side would block access to other clients, SQLite not being MVCC.
src/libtracker-client/tracker.c | 1052 +++++++++++++++++++++++++++++++++++++++
src/libtracker-client/tracker.h | 50 ++
2 files changed, 1102 insertions(+), 0 deletions(-)
---
diff --git a/src/libtracker-client/tracker.c b/src/libtracker-client/tracker.c
index 455fb86..0e24620 100644
--- a/src/libtracker-client/tracker.c
+++ b/src/libtracker-client/tracker.c
@@ -27,12 +27,20 @@
#include <dbus/dbus-glib-bindings.h>
#include <libtracker-common/tracker-dbus.h>
+#include <tracker-store/tracker-steroids.h>
+
+#include <errno.h>
+#include <sys/types.h>
+#include <unistd.h>
#include "tracker.h"
#include "tracker-resources-glue.h"
#include "tracker-statistics-glue.h"
+/* sleep delay to emulate dbus_pending_call_block, in us */
+#define NOT_TOO_SHORT_DELAY 1000
+
/**
* SECTION:tracker
* @short_description: A client library for querying and inserting
@@ -92,6 +100,7 @@
**/
typedef struct {
+ DBusGConnection *connection;
DBusGProxy *proxy_statistics;
DBusGProxy *proxy_resources;
@@ -104,6 +113,10 @@ typedef struct {
GList *writeback_callbacks;
gboolean is_constructed;
+
+#ifdef HAVE_DBUS_FD_PASSING
+ GThreadPool *thread_pool;
+#endif
} TrackerClientPrivate;
typedef struct {
@@ -143,6 +156,52 @@ typedef struct {
#endif /* TRACKER_DISABLE_DEPRECATED */
+/* Cursor over the rows of a SPARQL query result. The layout depends on
+ * whether D-Bus FD passing support is compiled in. */
+struct TrackerResultIterator {
+#ifdef HAVE_DBUS_FD_PASSING
+ /* Read end of the pipe the results are read from */
+ int fd;
+ /* Last return code decoded from the buffer */
+ int rc;
+ /* Heap buffer holding the raw result stream */
+ char *buffer;
+ /* Grows by TRACKER_STEROIDS_BUFFER_SIZE for every page read */
+ int message_size;
+ /* Allocated size of buffer, in bytes */
+ int buffer_size;
+ /* Current read offset into buffer */
+ int buffer_index;
+ /* NOTE(review): not referenced by the code visible in this patch */
+ char *large_row_buf;
+
+ /* Number of columns of the current row */
+ guint n_columns;
+ /* Points into buffer, at the per-column offsets of the current row */
+ int *offsets;
+ /* Points into buffer, at the string data of the current row */
+ char *data;
+ /* TRUE when the record following the current one is a row */
+ gboolean has_next;
+#else
+ /* Rows as returned by tracker_resources_sparql_query() */
+ GPtrArray *results;
+ /* Index of the current row; starts at -1, before the first row */
+ gint current_row;
+#endif
+};
+
+#ifdef HAVE_DBUS_FD_PASSING
+/* Kind of operation carried by a FastAsyncData request */
+typedef enum {
+ FAST_QUERY,
+ FAST_UPDATE,
+ FAST_UPDATE_BLANK,
+ FAST_UPDATE_BATCH
+} FastOperationType;
+
+/* One asynchronous request: filled in by the "fast" async entry points,
+ * executed by thread_dispatch() and reported by thread_callback(). */
+typedef struct {
+ /* Selects the synchronous call to run and the callback member to invoke */
+ FastOperationType operation;
+ /* Copy of the SPARQL text, made with g_strdup() when the request is created */
+ gchar *query;
+ /* Error reported by the synchronous call, or NULL */
+ GError *error;
+ /* Opaque pointer handed back to the user callback */
+ gpointer user_data;
+ /* Result of the operation; the valid member depends on operation */
+ union {
+ GPtrArray *result_gptrarray;
+ TrackerResultIterator *result_iterator;
+ };
+ /* User callback; the valid member depends on operation */
+ union {
+ TrackerReplyGPtrArray gptrarray_callback;
+ TrackerReplyVoid void_callback;
+ TrackerReplyArray array_callback;
+ TrackerReplyIterator iterator_callback;
+ };
+} FastAsyncData;
+#endif
+
static gboolean is_service_available (void);
static void client_finalize (GObject *object);
static void client_set_property (GObject *object,
@@ -222,6 +281,74 @@ writeback_cb (DBusGProxy *proxy,
}
}
+/*
+ * Idle callback scheduled by thread_dispatch(): hands the outcome of a
+ * finished fast operation to the user callback matching the operation type,
+ * then releases the request.
+ *
+ * @data: the FastAsyncData describing the finished request.
+ *
+ * Returns: FALSE, so the idle source is removed after one invocation.
+ */
+static gboolean
+thread_callback (gpointer data)
+{
+	FastAsyncData *async_data = data;
+
+	switch (async_data->operation) {
+	case FAST_QUERY:
+		(* async_data->iterator_callback) (async_data->result_iterator,
+		                                   async_data->error,
+		                                   async_data->user_data);
+		break;
+	case FAST_UPDATE:
+	case FAST_UPDATE_BATCH:
+		(* async_data->void_callback) (async_data->error,
+		                               async_data->user_data);
+		break;
+	case FAST_UPDATE_BLANK:
+		(* async_data->gptrarray_callback) (async_data->result_gptrarray,
+		                                    async_data->error,
+		                                    async_data->user_data);
+		break;
+	default:
+		g_assert_not_reached ();
+	}
+
+	g_free (async_data->query);
+
+	if (async_data->error) {
+		g_error_free (async_data->error);
+	}
+
+	/* The request was allocated with g_slice_new0() when it was queued and
+	 * was previously never released. */
+	g_slice_free (FastAsyncData, async_data);
+
+	return FALSE;
+}
+
+/*
+ * Thread pool worker: runs the synchronous call selected by the request's
+ * operation, stores its result and error in the request, and schedules
+ * thread_callback() with g_idle_add() to report the outcome.
+ *
+ * @data: the FastAsyncData request to execute.
+ * @user_data: the TrackerClient the pool was created for.
+ */
+static void
+thread_dispatch (gpointer data, gpointer user_data)
+{
+	TrackerClient *client = user_data;
+	FastAsyncData *job = data;
+	GError **job_error = &job->error;
+
+	if (job->operation == FAST_QUERY) {
+		job->result_iterator =
+			tracker_resources_sparql_query_iterate (client,
+			                                        job->query,
+			                                        job_error);
+	} else if (job->operation == FAST_UPDATE) {
+		tracker_resources_sparql_update_fast (client,
+		                                      job->query,
+		                                      job_error);
+	} else if (job->operation == FAST_UPDATE_BLANK) {
+		job->result_gptrarray =
+			tracker_resources_sparql_update_blank_fast (client,
+			                                            job->query,
+			                                            job_error);
+	} else if (job->operation == FAST_UPDATE_BATCH) {
+		tracker_resources_batch_sparql_update_fast (client,
+		                                            job->query,
+		                                            job_error);
+	} else {
+		g_assert_not_reached ();
+	}
+
+	g_idle_add (thread_callback, job);
+}
+
static void
tracker_client_class_init (TrackerClientClass *klass)
{
@@ -280,6 +407,12 @@ client_finalize (GObject *object)
if (private->pending_calls) {
g_hash_table_unref (private->pending_calls);
}
+
+ if (private->thread_pool) {
+ g_thread_pool_free (private->thread_pool,
+ TRUE,
+ TRUE);
+ }
}
static void
@@ -360,6 +493,8 @@ client_constructed (GObject *object)
return;
}
+ private->connection = connection;
+
private->proxy_statistics =
dbus_g_proxy_new_for_name (connection,
TRACKER_DBUS_SERVICE,
@@ -383,9 +518,23 @@ client_constructed (GObject *object)
TRACKER_TYPE_STR_STRV_MAP,
G_TYPE_INVALID);
+#ifdef HAVE_DBUS_FD_PASSING
+ private->thread_pool = g_thread_pool_new (thread_dispatch,
+ object,
+ 5,
+ FALSE,
+ NULL);
+#endif
+
private->is_constructed = TRUE;
}
+/* Returns the GQuark identifying the TRACKER_CLIENT_ERROR_DOMAIN error domain. */
+GQuark
+tracker_client_error_quark (void)
+{
+	GQuark domain;
+
+	domain = g_quark_from_static_string (TRACKER_CLIENT_ERROR_DOMAIN);
+
+	return domain;
+}
+
static void
callback_with_gptrarray (DBusGProxy *proxy,
GPtrArray *OUT_result,
@@ -677,6 +826,311 @@ find_conversion (const char *format,
return start;
}
+#ifdef HAVE_DBUS_FD_PASSING
+/*
+ * Decodes the four bytes at @buf as a little-endian 32-bit integer.
+ *
+ * The value is assembled in an unsigned accumulator: shifting a promoted
+ * (signed) int left by 24 is undefined when the top byte is >= 0x80.
+ *
+ * @buf: at least four readable bytes.
+ *
+ * Returns: the decoded value.
+ */
+static int
+buffer_read_int (char *buf)
+{
+	const unsigned char *bytes = (const unsigned char *) buf;
+	unsigned int result;
+
+	result  =  (unsigned int) bytes[0];
+	result |= ((unsigned int) bytes[1]) << 8;
+	result |= ((unsigned int) bytes[2]) << 16;
+	result |= ((unsigned int) bytes[3]) << 24;
+
+	return (int) result;
+}
+
+/*
+ * Encodes @value as four little-endian bytes at @buf.
+ *
+ * The value is converted to unsigned first so that the shifts are well
+ * defined for negative inputs, and the bytes are stored directly instead of
+ * through one-byte memset() calls.
+ *
+ * @buf: at least four writable bytes.
+ * @value: the integer to encode.
+ */
+static void
+buffer_write_int (char *buf, int value)
+{
+	unsigned char *out = (unsigned char *) buf;
+	unsigned int bits = (unsigned int) value;
+
+	out[0] = (unsigned char) ( bits        & 0xff);
+	out[1] = (unsigned char) ((bits >> 8)  & 0xff);
+	out[2] = (unsigned char) ((bits >> 16) & 0xff);
+	out[3] = (unsigned char) ((bits >> 24) & 0xff);
+}
+
+/*
+ * Decodes the integer stored at the iterator's current buffer position and
+ * advances the position by sizeof (int).
+ */
+static int
+iterator_buffer_read_int (TrackerResultIterator *iterator)
+{
+	char *cursor = iterator->buffer + iterator->buffer_index;
+	int value = buffer_read_int (cursor);
+
+	iterator->buffer_index += sizeof (int);
+
+	return value;
+}
+
+/*
+ * Reads exactly @size bytes from @fd into @dst, retrying short reads.
+ *
+ * A read() returning 0 means the other end closed the pipe before @size
+ * bytes arrived. It is reported as a failure; previously the loop would spin
+ * forever in that case. EINTR is retried like EAGAIN.
+ *
+ * @fd: descriptor to read from.
+ * @dst: destination buffer of at least @size bytes.
+ * @size: number of bytes to read.
+ *
+ * Returns: TRUE when @size bytes were read, FALSE on error or early EOF.
+ */
+static gboolean
+pipe_read (int fd, char *dst, int size)
+{
+	ssize_t readsofar = 0;
+
+	while (readsofar < size) {
+		ssize_t ret = read (fd,
+		                    dst + readsofar,
+		                    size - readsofar);
+
+		if (ret > 0) {
+			readsofar += ret;
+			continue;
+		}
+
+		if (ret == 0) {
+			g_critical ("Unexpected end of stream in pipe_read");
+			return FALSE;
+		}
+
+		switch (errno) {
+		case EAGAIN:
+		case EINTR:
+			/* Transient condition, try again */
+			break;
+		case EPIPE:
+			g_critical ("SIGPIPE in pipe_read");
+			return FALSE;
+		default:
+			g_critical ("read returned %d in pipe_read", errno);
+			return FALSE;
+		}
+	}
+
+	return TRUE;
+}
+
+/*
+ * Writes exactly @size bytes from @src to @fd, retrying short writes.
+ * A write interrupted by a signal (EINTR) is retried like EAGAIN instead of
+ * being reported as a failure.
+ *
+ * @fd: descriptor to write to.
+ * @src: source buffer of at least @size bytes.
+ * @size: number of bytes to write.
+ *
+ * Returns: TRUE when @size bytes were written, FALSE on error.
+ */
+static gboolean
+pipe_write (int fd, const char *src, int size)
+{
+	ssize_t sent = 0;
+
+	while (sent < size) {
+		ssize_t ret = write (fd,
+		                     src + sent,
+		                     size - sent);
+
+		if (ret >= 0) {
+			sent += ret;
+			continue;
+		}
+
+		switch (errno) {
+		case EAGAIN:
+		case EINTR:
+			/* Transient condition, try again */
+			break;
+		case EPIPE:
+			g_critical ("SIGPIPE in pipe_write");
+			return FALSE;
+		default:
+			g_critical ("write returned %d in pipe_write", errno);
+			return FALSE;
+		}
+	}
+
+	return TRUE;
+}
+
+/*
+ * Reads the result stream from iterator->fd into iterator->buffer, one
+ * TRACKER_STEROIDS_BUFFER_SIZE-sized read at a time, doubling the buffer
+ * when needed, and walks the records found to move iterator->buffer_index
+ * past every row.
+ *
+ * Returns: the number of rows walked once a return code other than
+ * TRACKER_STEROIDS_RC_ROW / TRACKER_STEROIDS_RC_LARGEROW is decoded, or -1
+ * if pipe_read() fails.
+ *
+ * NOTE(review): the wire layout is defined by the store and is not visible
+ * here; the comments below only describe what this code does with it.
+ */
+static int
+iterator_buffer_fill (TrackerResultIterator *iterator)
+{
+ int rc;
+ int n_columns;
+ int *offsets;
+ int row_size;
+ int rows = 0;
+ gboolean force_page_load = FALSE;
+
+ for (;;) {
+ /* Account for the page about to be read and make room for it */
+ iterator->message_size += TRACKER_STEROIDS_BUFFER_SIZE;
+
+ while (iterator->message_size > iterator->buffer_size) {
+ iterator->buffer = g_realloc (iterator->buffer, 2 * iterator->buffer_size);
+ iterator->buffer_size *= 2;
+ }
+
+ /* The page lands at the current index */
+ if (!pipe_read (iterator->fd,
+ iterator->buffer + iterator->buffer_index,
+ TRACKER_STEROIDS_BUFFER_SIZE)) {
+ return -1;
+ }
+
+ /* Walk the records of the page just read */
+ for (;;) {
+ /* Leave the page on the TRACKER_STEROIDS_EOP marker byte
+ * (presumably "end of page" -- confirm), or right after a
+ * large row has been handled */
+ if ((unsigned char) (*(iterator->buffer + iterator->buffer_index)) == TRACKER_STEROIDS_EOP ||
+ force_page_load) {
+ force_page_load = FALSE;
+ break;
+ }
+
+ rc = iterator_buffer_read_int (iterator);
+
+ switch (rc) {
+ case TRACKER_STEROIDS_RC_LARGEROW:
+ row_size = iterator_buffer_read_int (iterator);
+
+ /* Grow until row_size bytes fit after the current index */
+ while (iterator->buffer_size - iterator->buffer_index < row_size) {
+ iterator->buffer = g_realloc (iterator->buffer, 2 * iterator->buffer_size);
+ iterator->buffer_size *= 2;
+ }
+
+ /* Read the part of the row that did not fit in the page;
+ * the two ints decoded above account for 2 * sizeof (int) */
+ if (!pipe_read (iterator->fd,
+ iterator->buffer + iterator->buffer_index + TRACKER_STEROIDS_BUFFER_SIZE - 2 * sizeof (int),
+ row_size - TRACKER_STEROIDS_BUFFER_SIZE + 2 * sizeof (int))) {
+ return -1;
+ }
+
+ force_page_load = TRUE;
+
+
+ /* Fall through to normal row handling */
+ case TRACKER_STEROIDS_RC_ROW:
+ n_columns = iterator_buffer_read_int (iterator);
+ iterator->n_columns = n_columns;
+ /* NOTE(review): offsets are read through an int pointer
+ * (native byte order, alignment not checked) while the other
+ * integers go through buffer_read_int() -- confirm intended */
+ offsets = (int *) (iterator->buffer + iterator->buffer_index);
+ /* Skip the offsets table, then offsets[last] + 1 bytes of data */
+ iterator->buffer_index += sizeof (int) * n_columns;
+ iterator->buffer_index += offsets[iterator->n_columns - 1] + 1;
+ rows ++;
+ break;
+ default:
+ /* Any other return code ends the walk */
+ goto end_of_results;
+ }
+ }
+ }
+
+end_of_results:
+ return rows;
+}
+
+/*
+ * Sends a SPARQL update to the store through the Steroids interface: the
+ * read end of a pipe is passed in the D-Bus method call and the query
+ * (4-byte little-endian length, then the text without terminator) is
+ * written to the write end.
+ *
+ * @client: a #TrackerClient.
+ * @query: the SPARQL update text.
+ * @type: FAST_UPDATE, FAST_UPDATE_BLANK or FAST_UPDATE_BATCH; selects the
+ *        D-Bus method that is called.
+ * @error: return location for an error, or NULL.
+ *
+ * Returns: the method reply (to be released with dbus_message_unref()), or
+ * NULL with @error set.
+ *
+ * Fixes over the previous version: the pending call is initialised and the
+ * send result checked, the write end of the pipe is closed on every path,
+ * the reply and the DBusError are released when the store returns an error,
+ * and a failed pipe write is reported instead of ignored.
+ */
+static DBusMessage*
+sparql_update_fast (TrackerClient      *client,
+                    const gchar        *query,
+                    FastOperationType   type,
+                    GError            **error)
+{
+	TrackerClientPrivate *private;
+	DBusConnection *connection;
+	const gchar *dbus_method = NULL;
+	DBusMessage *message;
+	DBusMessageIter iter;
+	DBusMessage *reply;
+	DBusPendingCall *call = NULL;
+	DBusError dbus_error;
+	int pipefd[2];
+	char query_size_buffer[sizeof (int)];
+	int query_len;
+	gboolean written;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), NULL);
+	g_return_val_if_fail (query != NULL, NULL);
+
+	private = TRACKER_CLIENT_GET_PRIVATE (client);
+
+	/* Resolve the method name before any resource is acquired */
+	switch (type) {
+	case FAST_UPDATE:
+		dbus_method = "Update";
+		break;
+	case FAST_UPDATE_BLANK:
+		dbus_method = "UpdateBlank";
+		break;
+	case FAST_UPDATE_BATCH:
+		dbus_method = "BatchUpdate";
+		break;
+	default:
+		g_assert_not_reached ();
+	}
+
+	if (pipe (pipefd) < 0) {
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "Cannot open pipe");
+		return NULL;
+	}
+
+	connection = dbus_g_connection_get_connection (private->connection);
+
+	message = dbus_message_new_method_call (TRACKER_STEROIDS_SERVICE,
+	                                        TRACKER_STEROIDS_PATH,
+	                                        TRACKER_STEROIDS_INTERFACE,
+	                                        dbus_method);
+	dbus_message_iter_init_append (message, &iter);
+	dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[0]);
+
+	/* On failure the pending call is not filled in; keep it NULL so the
+	 * check below catches it */
+	if (!dbus_connection_send_with_reply (connection,
+	                                      message,
+	                                      &call,
+	                                      -1)) {
+		call = NULL;
+	}
+	dbus_message_unref (message);
+	close (pipefd[0]);
+
+	if (!call) {
+		close (pipefd[1]);
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "FD passing unsupported or connection disconnected");
+		return NULL;
+	}
+
+	/* We don't need to null terminate, the store will do it for us.
+	 * pipe_write() loops until everything is written, so one call per
+	 * piece produces the same byte stream as writing in chunks. */
+	query_len = strlen (query);
+	buffer_write_int (query_size_buffer, query_len);
+
+	written = pipe_write (pipefd[1], query_size_buffer, sizeof (int)) &&
+	          pipe_write (pipefd[1], query, query_len);
+
+	close (pipefd[1]);
+
+	/* dbus_pending_call_block (call); */
+	/* NOTE(review): this loop only polls the completion flag; it presumes
+	 * the connection is dispatched elsewhere (e.g. a main loop in another
+	 * thread) -- confirm for synchronous callers. */
+	while (!dbus_pending_call_get_completed (call)) {
+		g_usleep (NOT_TOO_SHORT_DELAY);
+	}
+
+	reply = dbus_pending_call_steal_reply (call);
+	dbus_pending_call_unref (call);
+
+	g_assert (reply);
+
+	if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
+		dbus_error_init (&dbus_error);
+		dbus_set_error_from_message (&dbus_error, reply);
+		dbus_set_g_error (error, &dbus_error);
+		dbus_error_free (&dbus_error);
+		dbus_message_unref (reply);
+		return NULL;
+	}
+
+	if (!written) {
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_BROKEN_PIPE,
+		             "Couldn't send query to server");
+		dbus_message_unref (reply);
+		return NULL;
+	}
+
+	return reply;
+}
+
+/*
+ * Builds a string -> string GHashTable from the D-Bus dictionary @iter
+ * points at.
+ *
+ * Keys and values are copies owned by the table: it is created with
+ * g_free() as key and value destroy function, so destroying the table
+ * releases them (they used to leak).
+ *
+ * @iter: iterator positioned on an array of dict entries with two basic
+ *        (string) members each.
+ *
+ * Returns: a newly created hash table.
+ */
+static GHashTable*
+unmarshall_hash_table (DBusMessageIter *iter) {
+	GHashTable *result;
+	DBusMessageIter subiter, subsubiter;
+
+	result = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_free);
+
+	dbus_message_iter_recurse (iter, &subiter);
+
+	while (dbus_message_iter_get_arg_type (&subiter) != DBUS_TYPE_INVALID) {
+		const gchar *key, *value;
+
+		/* Each entry holds the key followed by the value */
+		dbus_message_iter_recurse (&subiter, &subsubiter);
+		dbus_message_iter_get_basic (&subsubiter, &key);
+		dbus_message_iter_next (&subsubiter);
+		dbus_message_iter_get_basic (&subsubiter, &value);
+		g_hash_table_insert (result, g_strdup (key), g_strdup (value));
+
+		dbus_message_iter_next (&subiter);
+	}
+
+	return result;
+}
+
+#endif
+
/**
* tracker_uri_vprintf_escaped:
* @format: a standard printf() format string, but notice
@@ -1046,6 +1500,380 @@ tracker_resources_sparql_query (TrackerClient *client,
}
/**
+ * tracker_resources_sparql_query_iterate:
+ * @client: a #TrackerClient.
+ * @query: a string representing SPARQL.
+ * @error: a #GError.
+ *
+ * Queries the database using SPARQL, and returns an iterator instead of an
+ * array with all the results inside.
+ *
+ * Using an iterator will lower the memory usage. Additionally, this function
+ * uses a pipe, when available, to get the results from Tracker store, which
+ * is roughly two times faster than using DBus.
+ *
+ * This API call is completely synchronous so it may block.
+ *
+ * <example>
+ * <title>Using tracker_resources_sparql_query_iterate(<!-- -->)</title>
+ * An example of using tracker_resources_sparql_query_iterate() to list all
+ * albums by title and include their song count and song total length.
+ * <programlisting>
+ * TrackerClient *client;
+ * TrackerResultIterator *iterator;
+ * GError *error = NULL;
+ * const gchar *query;
+ *
+ * /&ast; Create D-Bus connection with no warnings and maximum timeout. &ast;/
+ * client = tracker_client_new (0, G_MAXINT);
+ * query = "SELECT {"
+ * " ?album"
+ * " ?title"
+ * " COUNT(?song) AS songs"
+ * " SUM(?length) AS totallength"
+ * "} WHERE {"
+ * " ?album a nmm:MusicAlbum ;"
+ * " nie:title ?title ."
+ * " ?song nmm:musicAlbum ?album ;"
+ * " nfo:duration ?length"
+ * "} "
+ * "GROUP BY (?album)";
+ *
+ * iterator = tracker_resources_sparql_query_iterate (client, query, &error);
+ *
+ * if (error) {
+ * g_warning ("Could not query Tracker, %s", error->message);
+ * g_error_free (error);
+ * g_object_unref (client);
+ * return;
+ * }
+ *
+ * while (tracker_result_iterator_has_next (iterator)) {
+ * tracker_result_iterator_next (iterator);
+ *
+ * g_message ("Album: %s, Title: %s",
+ * tracker_result_iterator_value (iterator, 0),
+ * tracker_result_iterator_value (iterator, 1));
+ * }
+ *
+ * tracker_result_iterator_free (iterator);
+ *
+ * </programlisting>
+ * </example>
+ *
+ * Returns: A #TrackerResultIterator pointing before the first result row, or
+ * %NULL with @error set on failure. This iterator must be disposed when done
+ * using tracker_result_iterator_free().
+ *
+ * Since: 0.9
+ **/
+TrackerResultIterator*
+tracker_resources_sparql_query_iterate (TrackerClient  *client,
+                                        const gchar    *query,
+                                        GError        **error)
+{
+#ifdef HAVE_DBUS_FD_PASSING
+	TrackerClientPrivate *private;
+	DBusConnection *connection;
+	DBusMessage *message;
+	DBusMessageIter iter;
+	DBusMessage *reply;
+	DBusPendingCall *call = NULL;
+	DBusError dbus_error;
+	TrackerResultIterator *iterator;
+	int pipefd[2];
+	int fill_result;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), NULL);
+	g_return_val_if_fail (query, NULL);
+
+	private = TRACKER_CLIENT_GET_PRIVATE (client);
+
+	if (pipe (pipefd) < 0) {
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "Cannot open pipe");
+		return NULL;
+	}
+
+	connection = dbus_g_connection_get_connection (private->connection);
+
+	message = dbus_message_new_method_call (TRACKER_STEROIDS_SERVICE,
+	                                        TRACKER_STEROIDS_PATH,
+	                                        TRACKER_STEROIDS_INTERFACE,
+	                                        "Query");
+
+	dbus_message_iter_init_append (message, &iter);
+	dbus_message_iter_append_basic (&iter, DBUS_TYPE_STRING, &query);
+	dbus_message_iter_append_basic (&iter, DBUS_TYPE_UNIX_FD, &pipefd[1]);
+
+	/* On failure the pending call is not filled in; keep it NULL so the
+	 * check below catches it */
+	if (!dbus_connection_send_with_reply (connection,
+	                                      message,
+	                                      &call,
+	                                      -1)) {
+		call = NULL;
+	}
+	dbus_message_unref (message);
+	close (pipefd[1]);
+
+	if (!call) {
+		/* The read end is not owned by an iterator yet */
+		close (pipefd[0]);
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "FD passing unsupported or connection disconnected");
+		return NULL;
+	}
+
+	iterator = g_slice_new0 (TrackerResultIterator);
+	iterator->fd = pipefd[0];
+	iterator->buffer_size = TRACKER_STEROIDS_BUFFER_SIZE;
+	iterator->buffer = g_malloc (TRACKER_STEROIDS_BUFFER_SIZE);
+
+	fill_result = iterator_buffer_fill (iterator);
+
+	/* iterator_buffer_fill() is the only reader of the pipe: once it has
+	 * returned, the descriptor is no longer needed. */
+	close (iterator->fd);
+	iterator->fd = -1;
+
+	if (fill_result < 0) {
+		/* Nobody will wait for the reply any more */
+		dbus_pending_call_cancel (call);
+		dbus_pending_call_unref (call);
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_BROKEN_PIPE,
+		             "Couldn't get results from server");
+		tracker_result_iterator_free (iterator);
+		return NULL;
+	}
+
+	iterator->buffer_index = 0;
+	iterator->rc = iterator_buffer_read_int (iterator);
+
+	/* Reset the iterator internal state */
+	iterator->buffer_index = 0;
+
+	if (iterator->rc == TRACKER_STEROIDS_RC_ROW ||
+	    iterator->rc == TRACKER_STEROIDS_RC_LARGEROW) {
+		iterator->has_next = TRUE;
+	}
+
+	/* dbus_pending_call_block (call); */
+	/* This is an ugly workaround for a race condition in libdbus when you're
+	 * calling dbus_pending_call_block from a thread which is not the thread
+	 * where the DBus dispatch call happens. This is the case when the function
+	 * is run from tracker_resources_sparql_query_iterate_async.
+	 * See http://lists.freedesktop.org/archives/dbus/2009-March/011105.html
+	 * for a lengthy explanation
+	 */
+	while (!dbus_pending_call_get_completed (call)) {
+		g_usleep (NOT_TOO_SHORT_DELAY);
+	}
+
+	reply = dbus_pending_call_steal_reply (call);
+	dbus_pending_call_unref (call);
+
+	g_assert (reply);
+
+	if (dbus_message_get_type (reply) == DBUS_MESSAGE_TYPE_ERROR) {
+		dbus_error_init (&dbus_error);
+		dbus_set_error_from_message (&dbus_error, reply);
+		dbus_set_g_error (error, &dbus_error);
+		dbus_error_free (&dbus_error);
+		dbus_message_unref (reply);
+		/* The iterator is not handed to the caller on this path */
+		tracker_result_iterator_free (iterator);
+		return NULL;
+	}
+
+	dbus_message_unref (reply);
+
+	return iterator;
+#else
+	TrackerResultIterator *iterator;
+	GError *inner_error = NULL;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), NULL);
+	g_return_val_if_fail (query, NULL);
+
+	iterator = g_slice_new0 (TrackerResultIterator);
+
+	iterator->results = tracker_resources_sparql_query (client, query, &inner_error);
+	iterator->current_row = -1;
+
+	if (inner_error) {
+		g_propagate_error (error, inner_error);
+		g_slice_free (TrackerResultIterator, iterator);
+		iterator = NULL;
+	}
+
+	return iterator;
+#endif
+}
+
+/**
+ * tracker_result_iterator_free:
+ * @iterator: A TrackerResultIterator, or %NULL
+ *
+ * Frees a TrackerResultIterator and its associated resources: the result
+ * buffer and the pipe descriptor when FD passing is used, the result rows
+ * and their strings otherwise. Does nothing if @iterator is %NULL.
+ *
+ * Since: 0.9
+ **/
+void
+tracker_result_iterator_free (TrackerResultIterator *iterator)
+{
+	if (!iterator) {
+		return;
+	}
+
+#ifdef HAVE_DBUS_FD_PASSING
+	/* The descriptor is owned by the iterator; a negative value means it
+	 * has already been closed */
+	if (iterator->fd >= 0) {
+		close (iterator->fd);
+	}
+
+	g_free (iterator->buffer);
+#else
+	if (iterator->results) {
+		/* Rows are NULL-terminated string arrays: free the strings too,
+		 * not only the arrays */
+		g_ptr_array_foreach (iterator->results, (GFunc) g_strfreev, NULL);
+		g_ptr_array_free (iterator->results, TRUE);
+	}
+#endif
+
+	/* The iterator itself must be released in both configurations */
+	g_slice_free (TrackerResultIterator, iterator);
+}
+
+/**
+ * tracker_result_iterator_n_columns:
+ * @iterator: A TrackerResultIterator
+ *
+ * Returns: the number of columns in the row pointed by @iterator
+ *
+ * Since: 0.9
+ **/
+guint
+tracker_result_iterator_n_columns (TrackerResultIterator *iterator)
+{
+#ifdef HAVE_DBUS_FD_PASSING
+	g_return_val_if_fail (iterator, 0);
+
+	return iterator->n_columns;
+#else
+	GStrv first_row;
+
+	g_return_val_if_fail (iterator, 0);
+
+	if (iterator->results->len == 0) {
+		return 0;
+	}
+
+	/* The column count is taken from the first row: count its entries up
+	 * to the terminating NULL */
+	first_row = g_ptr_array_index (iterator->results, 0);
+
+	return g_strv_length (first_row);
+#endif
+}
+
+/**
+ * tracker_result_iterator_has_next:
+ * @iterator: A TrackerResultIterator
+ *
+ * Tells whether another row can be fetched with
+ * tracker_result_iterator_next().
+ *
+ * Returns: TRUE if there are more rows to fetch, FALSE otherwise
+ *
+ * Since: 0.9
+ **/
+gboolean
+tracker_result_iterator_has_next (TrackerResultIterator *iterator)
+{
+#ifdef HAVE_DBUS_FD_PASSING
+	g_return_val_if_fail (iterator, FALSE);
+
+	return iterator->has_next;
+#else
+	gint last_row;
+
+	g_return_val_if_fail (iterator, FALSE);
+
+	if (iterator->results->len == 0) {
+		return FALSE;
+	}
+
+	last_row = (gint) (iterator->results->len - 1);
+
+	return iterator->current_row < last_row;
+#endif
+}
+
+/**
+ * tracker_result_iterator_next:
+ * @iterator: A TrackerResultIterator
+ *
+ * Fetches the next results row. Call this only after
+ * tracker_result_iterator_has_next() returned TRUE.
+ *
+ * Since: 0.9
+ **/
+void
+tracker_result_iterator_next (TrackerResultIterator *iterator)
+{
+#ifdef HAVE_DBUS_FD_PASSING
+	int nextrc;
+
+	/* Same guard as the other iterator accessors */
+	g_return_if_fail (iterator);
+
+	iterator->rc = iterator_buffer_read_int (iterator);
+	switch (iterator->rc) {
+	case TRACKER_STEROIDS_RC_LARGEROW:
+		/* Skip row size int */
+		iterator_buffer_read_int (iterator);
+		/* Fall through to normal row handling */
+	case TRACKER_STEROIDS_RC_ROW:
+		iterator->n_columns = iterator_buffer_read_int (iterator);
+
+		if (iterator->n_columns == 0) {
+			/* n_columns is unsigned: indexing offsets[n_columns - 1]
+			 * below would read far outside the buffer */
+			iterator->has_next = FALSE;
+			break;
+		}
+
+		iterator->offsets = (int *)(iterator->buffer + iterator->buffer_index);
+		iterator->buffer_index += iterator->n_columns * sizeof (int);
+		iterator->data = iterator->buffer + iterator->buffer_index;
+		/* The last offset, plus one, is the size of the row's data */
+		iterator->buffer_index += iterator->offsets[iterator->n_columns - 1] + 1;
+
+		/* Peek at the following return code without consuming it */
+		nextrc = buffer_read_int (iterator->buffer + iterator->buffer_index);
+		iterator->has_next = (nextrc == TRACKER_STEROIDS_RC_ROW || nextrc == TRACKER_STEROIDS_RC_LARGEROW);
+		break;
+	case TRACKER_STEROIDS_RC_DONE:
+		break;
+	default:
+		/* If an error happened, it has been reported by
+		 * tracker_resources_sparql_query_iterate */
+		break;
+	}
+#else
+	g_return_if_fail (iterator);
+
+	if (!iterator->results->len) {
+		return;
+	}
+
+	if (iterator->current_row < (gint)iterator->results->len) {
+		iterator->current_row++;
+	}
+#endif
+}
+
+/**
+ * tracker_result_iterator_value:
+ * @iterator: A TrackerResultIterator
+ * @column: index of the column to read, starting at 0
+ *
+ * Get a column's value as a string. A row must have been fetched with
+ * tracker_result_iterator_next() first.
+ *
+ * Returns: the value of the column as a string. The returned string belongs to
+ * the iterator and should not be freed.
+ *
+ * Since: 0.9
+ **/
+const gchar *
+tracker_result_iterator_value (TrackerResultIterator *iterator,
+                               guint                  column)
+{
+#ifdef HAVE_DBUS_FD_PASSING
+	g_return_val_if_fail (iterator, NULL);
+	g_return_val_if_fail (column < iterator->n_columns, NULL);
+
+	if (column == 0) {
+		return iterator->data;
+	} else {
+		/* A column starts one byte after the offset stored for the
+		 * previous column */
+		return iterator->data + iterator->offsets[column-1] + 1;
+	}
+#else
+	GStrv row;
+
+	g_return_val_if_fail (iterator, NULL);
+	g_return_val_if_fail (column < tracker_result_iterator_n_columns (iterator), NULL);
+
+	if (!iterator->results->len) {
+		return NULL;
+	}
+
+	/* current_row is -1 until the first tracker_result_iterator_next();
+	 * indexing the array with it would read out of bounds */
+	g_return_val_if_fail (iterator->current_row >= 0, NULL);
+	g_return_val_if_fail (iterator->current_row < (gint)iterator->results->len, NULL);
+
+	row = g_ptr_array_index (iterator->results, iterator->current_row);
+
+	return row[column];
+#endif
+}
+
+/**
* tracker_resources_sparql_update:
* @client: a #TrackerClient.
* @query: a string representing SPARQL.
@@ -1078,6 +1906,26 @@ tracker_resources_sparql_update (TrackerClient *client,
error);
}
+/*
+ * Synchronous SPARQL update. Uses the FD-passing path when it is compiled
+ * in, and tracker_resources_sparql_update() otherwise.
+ */
+void
+tracker_resources_sparql_update_fast (TrackerClient  *client,
+                                      const gchar    *query,
+                                      GError        **error)
+{
+#ifdef HAVE_DBUS_FD_PASSING
+	DBusMessage *answer = sparql_update_fast (client, query, FAST_UPDATE, error);
+
+	/* The reply carries nothing of interest, only release it */
+	if (answer) {
+		dbus_message_unref (answer);
+	}
+#else
+	tracker_resources_sparql_update (client, query, error);
+#endif
+}
+
GPtrArray *
tracker_resources_sparql_update_blank (TrackerClient *client,
const gchar *query,
@@ -1101,6 +1949,58 @@ tracker_resources_sparql_update_blank (TrackerClient *client,
return result;
}
+/*
+ * Synchronous SPARQL update returning blank node information. Uses the
+ * FD-passing path when it is compiled in, and
+ * tracker_resources_sparql_update_blank() otherwise.
+ *
+ * With FD passing the reply must have signature "aaa{ss}"; it is unpacked
+ * into a GPtrArray of GPtrArrays of GHashTables. Returns NULL with @error
+ * set on failure.
+ */
+GPtrArray*
+tracker_resources_sparql_update_blank_fast (TrackerClient  *client,
+                                            const gchar    *query,
+                                            GError        **error)
+{
+#ifdef HAVE_DBUS_FD_PASSING
+	DBusMessage *reply;
+	DBusMessageIter outer, middle, inner;
+	GPtrArray *result;
+
+	reply = sparql_update_fast (client, query, FAST_UPDATE_BLANK, error);
+
+	if (reply == NULL) {
+		return NULL;
+	}
+
+	if (g_strcmp0 (dbus_message_get_signature (reply), "aaa{ss}") != 0) {
+		g_set_error (error,
+		             TRACKER_CLIENT_ERROR,
+		             TRACKER_CLIENT_ERROR_UNSUPPORTED,
+		             "Server returned invalid results");
+		dbus_message_unref (reply);
+		return NULL;
+	}
+
+	result = g_ptr_array_new ();
+	dbus_message_iter_init (reply, &outer);
+
+	/* One inner array per element of the outer array, one hash table per
+	 * element of each inner array */
+	for (dbus_message_iter_recurse (&outer, &middle);
+	     dbus_message_iter_get_arg_type (&middle) != DBUS_TYPE_INVALID;
+	     dbus_message_iter_next (&middle)) {
+		GPtrArray *inner_array = g_ptr_array_new ();
+
+		g_ptr_array_add (result, inner_array);
+
+		for (dbus_message_iter_recurse (&middle, &inner);
+		     dbus_message_iter_get_arg_type (&inner) != DBUS_TYPE_INVALID;
+		     dbus_message_iter_next (&inner)) {
+			g_ptr_array_add (inner_array, unmarshall_hash_table (&inner));
+		}
+	}
+
+	dbus_message_unref (reply);
+
+	return result;
+#else
+	return tracker_resources_sparql_update_blank (client, query, error);
+#endif
+}
+
/**
* tracker_resources_batch_sparql_update:
* @client: a #TrackerClient.
@@ -1131,6 +2031,26 @@ tracker_resources_batch_sparql_update (TrackerClient *client,
error);
}
+/*
+ * Synchronous batch SPARQL update. Uses the FD-passing path when it is
+ * compiled in, and tracker_resources_batch_sparql_update() otherwise.
+ */
+void
+tracker_resources_batch_sparql_update_fast (TrackerClient  *client,
+                                            const gchar    *query,
+                                            GError        **error)
+{
+#ifdef HAVE_DBUS_FD_PASSING
+	DBusMessage *answer = sparql_update_fast (client, query, FAST_UPDATE_BATCH, error);
+
+	/* The reply carries nothing of interest, only release it */
+	if (answer) {
+		dbus_message_unref (answer);
+	}
+#else
+	tracker_resources_batch_sparql_update (client, query, error);
+#endif
+}
+
/**
* tracker_resources_batch_commit:
* @client: a #TrackerClient.
@@ -1274,6 +2194,39 @@ tracker_resources_sparql_query_async (TrackerClient *client,
return cb->id;
}
+/*
+ * Asynchronous variant of tracker_resources_sparql_query_iterate(): queues
+ * the query on the client's thread pool; @callback is later invoked with
+ * the iterator (or an error) and @user_data.
+ *
+ * Returns: always 0.
+ */
+guint
+tracker_resources_sparql_query_iterate_async (TrackerClient        *client,
+                                              const gchar          *query,
+                                              TrackerReplyIterator  callback,
+                                              gpointer              user_data)
+{
+	TrackerClientPrivate *priv;
+	FastAsyncData *request;
+	GError *push_error = NULL;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+	g_return_val_if_fail (query != NULL, 0);
+	g_return_val_if_fail (callback != NULL, 0);
+
+	request = g_slice_new0 (FastAsyncData);
+	request->user_data = user_data;
+	request->iterator_callback = callback;
+	request->query = g_strdup (query);
+	request->operation = FAST_QUERY;
+
+	priv = TRACKER_CLIENT_GET_PRIVATE (client);
+	g_thread_pool_push (priv->thread_pool, request, &push_error);
+
+	/* NOTE(review): GLib's documentation says the item stays queued when
+	 * thread creation fails -- confirm before ever freeing it here */
+	if (push_error != NULL) {
+		g_critical ("Could not create thread: %s", push_error->message);
+		g_error_free (push_error);
+	}
+
+	return 0;
+}
+
/**
* tracker_resources_sparql_update_async:
* @client: a #TrackerClient.
@@ -1320,6 +2273,39 @@ tracker_resources_sparql_update_async (TrackerClient *client,
}
guint
+tracker_resources_sparql_update_fast_async (TrackerClient    *client,
+                                            const gchar      *query,
+                                            TrackerReplyVoid  callback,
+                                            gpointer          user_data)
+{
+	TrackerClientPrivate *priv;
+	FastAsyncData *request;
+	GError *push_error = NULL;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+	g_return_val_if_fail (query != NULL, 0);
+	g_return_val_if_fail (callback != NULL, 0);
+
+	/* Describe the update; it runs on the client's thread pool and
+	 * callback is invoked once it is finished */
+	request = g_slice_new0 (FastAsyncData);
+	request->user_data = user_data;
+	request->void_callback = callback;
+	request->query = g_strdup (query);
+	request->operation = FAST_UPDATE;
+
+	priv = TRACKER_CLIENT_GET_PRIVATE (client);
+	g_thread_pool_push (priv->thread_pool, request, &push_error);
+
+	/* NOTE(review): GLib's documentation says the item stays queued when
+	 * thread creation fails -- confirm before ever freeing it here */
+	if (push_error != NULL) {
+		g_critical ("Could not create thread: %s", push_error->message);
+		g_error_free (push_error);
+	}
+
+	/* No request id is handed out: 0 is returned in every case */
+	return 0;
+}
+
+guint
tracker_resources_sparql_update_blank_async (TrackerClient *client,
const gchar *query,
TrackerReplyGPtrArray callback,
@@ -1350,6 +2336,39 @@ tracker_resources_sparql_update_blank_async (TrackerClient *client,
return cb->id;
}
+/*
+ * Asynchronous variant of tracker_resources_sparql_update_blank_fast():
+ * queues the update on the client's thread pool; @callback is later invoked
+ * with the resulting array (or an error) and @user_data.
+ *
+ * Returns: always 0.
+ */
+guint
+tracker_resources_sparql_update_blank_fast_async (TrackerClient         *client,
+                                                  const gchar           *query,
+                                                  TrackerReplyGPtrArray  callback,
+                                                  gpointer               user_data)
+{
+	TrackerClientPrivate *priv;
+	FastAsyncData *request;
+	GError *push_error = NULL;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+	g_return_val_if_fail (query != NULL, 0);
+	g_return_val_if_fail (callback != NULL, 0);
+
+	request = g_slice_new0 (FastAsyncData);
+	request->user_data = user_data;
+	request->gptrarray_callback = callback;
+	request->query = g_strdup (query);
+	request->operation = FAST_UPDATE_BLANK;
+
+	priv = TRACKER_CLIENT_GET_PRIVATE (client);
+	g_thread_pool_push (priv->thread_pool, request, &push_error);
+
+	/* NOTE(review): GLib's documentation says the item stays queued when
+	 * thread creation fails -- confirm before ever freeing it here */
+	if (push_error != NULL) {
+		g_critical ("Could not create thread: %s", push_error->message);
+		g_error_free (push_error);
+	}
+
+	return 0;
+}
+
/**
* tracker_resources_batch_sparql_update_async:
* @client: a #TrackerClient.
@@ -1395,6 +2414,39 @@ tracker_resources_batch_sparql_update_async (TrackerClient *client,
return cb->id;
}
+/*
+ * Asynchronous variant of tracker_resources_batch_sparql_update_fast():
+ * queues the batch update on the client's thread pool; @callback is later
+ * invoked with the error (if any) and @user_data.
+ *
+ * Returns: always 0.
+ */
+guint
+tracker_resources_batch_sparql_update_fast_async (TrackerClient    *client,
+                                                  const gchar      *query,
+                                                  TrackerReplyVoid  callback,
+                                                  gpointer          user_data)
+{
+	TrackerClientPrivate *private;
+	FastAsyncData *data;
+	GError *error = NULL;
+
+	g_return_val_if_fail (TRACKER_IS_CLIENT (client), 0);
+	g_return_val_if_fail (query != NULL, 0);
+	g_return_val_if_fail (callback != NULL, 0);
+
+	private = TRACKER_CLIENT_GET_PRIVATE (client);
+
+	data = g_slice_new0 (FastAsyncData);
+	/* Must be FAST_UPDATE_BATCH: with FAST_UPDATE_BLANK the worker runs
+	 * the blank-node update and the void callback stored below is then
+	 * invoked through the three-argument gptrarray_callback pointer */
+	data->operation = FAST_UPDATE_BATCH;
+	data->query = g_strdup (query);
+	data->void_callback = callback;
+	data->user_data = user_data;
+
+	g_thread_pool_push (private->thread_pool, data, &error);
+
+	if (error) {
+		g_critical ("Could not create thread: %s", error->message);
+		g_error_free (error);
+		return 0;
+	}
+
+	return 0;
+}
+
/**
* tracker_resources_batch_commit_async:
* @client: a #TrackerClient.
diff --git a/src/libtracker-client/tracker.h b/src/libtracker-client/tracker.h
index 0069907..3bb397e 100644
--- a/src/libtracker-client/tracker.h
+++ b/src/libtracker-client/tracker.h
@@ -49,6 +49,8 @@ typedef struct {
GObjectClass parent;
} TrackerClientClass;
+typedef struct TrackerResultIterator TrackerResultIterator;
+
/**
* TrackerClientFlags:
* @TRACKER_CLIENT_ENABLE_WARNINGS: If supplied warnings will be
@@ -59,6 +61,14 @@ typedef enum {
TRACKER_CLIENT_ENABLE_WARNINGS = 1 << 0
} TrackerClientFlags;
+#define TRACKER_CLIENT_ERROR tracker_client_error_quark ()
+#define TRACKER_CLIENT_ERROR_DOMAIN "TrackerClient"
+
+/**
+ * TrackerClientError:
+ * @TRACKER_CLIENT_ERROR_UNSUPPORTED: set when the pipe cannot be opened,
+ * when no pending call is obtained for the request, or when the reply does
+ * not have the expected signature.
+ * @TRACKER_CLIENT_ERROR_BROKEN_PIPE: set when the results cannot be read
+ * from the pipe.
+ *
+ * Error codes of the #TRACKER_CLIENT_ERROR domain.
+ */
+typedef enum {
+ TRACKER_CLIENT_ERROR_UNSUPPORTED,
+ TRACKER_CLIENT_ERROR_BROKEN_PIPE
+} TrackerClientError;
+
/**
* TrackerReplyGPtrArray:
* @result: a #GPtrArray with the results of the query.
@@ -85,6 +95,10 @@ typedef void (*TrackerReplyGPtrArray) (GPtrArray *result,
typedef void (*TrackerReplyVoid) (GError *error,
gpointer user_data);
+/**
+ * TrackerReplyIterator:
+ * @iterator: a #TrackerResultIterator over the results of the query.
+ * @error: a #GError, or %NULL. It is freed by the library after the
+ * callback returns.
+ * @user_data: a #gpointer for user data.
+ *
+ * The callback passed to tracker_resources_sparql_query_iterate_async().
+ */
+typedef void (*TrackerReplyIterator) (TrackerResultIterator *iterator,
+ GError *error,
+ gpointer user_data);
+
/**
* TrackerWritebackCallback:
* @resources: a hash table where each key is the uri of a resources which
@@ -98,6 +112,7 @@ typedef void (*TrackerWritebackCallback) (const GHashTable *resources,
gpointer user_data);
GType tracker_client_get_type (void) G_GNUC_CONST;
+GQuark tracker_client_error_quark (void);
TrackerClient *tracker_client_new (TrackerClientFlags flags,
gint timeout);
@@ -122,15 +137,34 @@ void tracker_resources_load (TrackerClient
GPtrArray * tracker_resources_sparql_query (TrackerClient *client,
const gchar *query,
GError **error);
+TrackerResultIterator *
+ tracker_resources_sparql_query_iterate (TrackerClient *client,
+ const gchar *query,
+ GError **error);
+void tracker_result_iterator_free (TrackerResultIterator *iterator);
+guint tracker_result_iterator_n_columns (TrackerResultIterator *iterator);
+gboolean tracker_result_iterator_has_next (TrackerResultIterator *iterator);
+void tracker_result_iterator_next (TrackerResultIterator *iterator);
+const gchar * tracker_result_iterator_value (TrackerResultIterator *iterator,
+ guint column);
void tracker_resources_sparql_update (TrackerClient *client,
const gchar *query,
GError **error);
+void tracker_resources_sparql_update_fast (TrackerClient *client,
+ const gchar *query,
+ GError **error);
GPtrArray * tracker_resources_sparql_update_blank (TrackerClient *client,
const gchar *query,
GError **error);
+GPtrArray * tracker_resources_sparql_update_blank_fast (TrackerClient *client,
+ const gchar *query,
+ GError **error);
void tracker_resources_batch_sparql_update (TrackerClient *client,
const gchar *query,
GError **error);
+void tracker_resources_batch_sparql_update_fast (TrackerClient *client,
+ const gchar *query,
+ GError **error);
void tracker_resources_batch_commit (TrackerClient *client,
GError **error);
/* Asynchronous API */
@@ -145,18 +179,34 @@ guint tracker_resources_sparql_query_async (TrackerClient
const gchar *query,
TrackerReplyGPtrArray callback,
gpointer user_data);
+guint tracker_resources_sparql_query_iterate_async (TrackerClient *client,
+ const gchar *query,
+ TrackerReplyIterator callback,
+ gpointer user_data);
guint tracker_resources_sparql_update_async (TrackerClient *client,
const gchar *query,
TrackerReplyVoid callback,
gpointer user_data);
+guint tracker_resources_sparql_update_fast_async (TrackerClient *client,
+ const gchar *query,
+ TrackerReplyVoid callback,
+ gpointer user_data);
guint tracker_resources_sparql_update_blank_async (TrackerClient *client,
const gchar *query,
TrackerReplyGPtrArray callback,
gpointer user_data);
+guint tracker_resources_sparql_update_blank_fast_async (TrackerClient *client,
+ const gchar *query,
+ TrackerReplyGPtrArray callback,
+ gpointer user_data);
guint tracker_resources_batch_sparql_update_async (TrackerClient *client,
const gchar *query,
TrackerReplyVoid callback,
gpointer user_data);
+guint tracker_resources_batch_sparql_update_fast_async (TrackerClient *client,
+ const gchar *query,
+ TrackerReplyVoid callback,
+ gpointer user_data);
guint tracker_resources_batch_commit_async (TrackerClient *client,
TrackerReplyVoid callback,
gpointer user_data);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]