[tracker/wip/carlosg/serialize-api: 36/53] libtracker-sparql: Implement serialize_async/finish in direct connection




commit dbc19d2e2b1f06d28bf9a292b51964f8c5055fe4
Author: Carlos Garnacho <carlosg gnome org>
Date:   Sun Nov 21 00:02:06 2021 +0100

    libtracker-sparql: Implement serialize_async/finish in direct connection
    
    This allows serialization of RDF into GInputStream, currently only TTL
    format.

 src/libtracker-sparql/direct/tracker-direct.c | 163 ++++++++++++++++++++++++--
 1 file changed, 151 insertions(+), 12 deletions(-)
---
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c
index a4a69d44c..649da93eb 100644
--- a/src/libtracker-sparql/direct/tracker-direct.c
+++ b/src/libtracker-sparql/direct/tracker-direct.c
@@ -29,6 +29,7 @@
 #include <libtracker-data/tracker-sparql.h>
 #include <libtracker-sparql/tracker-notifier-private.h>
 #include <libtracker-sparql/tracker-private.h>
+#include <libtracker-sparql/tracker-serializer.h>
 
 typedef struct _TrackerDirectConnectionPrivate TrackerDirectConnectionPrivate;
 
@@ -61,6 +62,11 @@ typedef struct {
        TrackerResource *resource;
 } UpdateResource;
 
+typedef struct {
+       gchar *query;
+       TrackerRdfFormat format;
+} SerializeRdf;
+
 enum {
        PROP_0,
        PROP_FLAGS,
@@ -78,6 +84,7 @@ typedef enum {
        TASK_TYPE_UPDATE_RESOURCE,
        TASK_TYPE_UPDATE_BATCH,
        TASK_TYPE_RELEASE_MEMORY,
+       TASK_TYPE_SERIALIZE,
 } TaskType;
 
 typedef struct {
@@ -208,6 +215,7 @@ update_thread_func (gpointer data,
 
        switch (task_data->type) {
        case TASK_TYPE_QUERY:
+       case TASK_TYPE_SERIALIZE:
                g_warning ("Queries don't go through this thread");
                break;
        case TASK_TYPE_UPDATE:
@@ -246,18 +254,86 @@ update_thread_func (gpointer data,
        g_mutex_unlock (&priv->mutex);
 }
 
+static void
+execute_query_in_thread (GTask    *task,
+                         TaskData *task_data)
+{
+       TrackerSparqlCursor *cursor;
+       GError *error = NULL;
+
+       cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
+                                                 task_data->data,
+                                                 g_task_get_cancellable (task),
+                                                 &error);
+       if (cursor)
+               g_task_return_pointer (task, cursor, g_object_unref);
+       else
+               g_task_return_error (task, error);
+}
+
+static TrackerSerializerFormat
+convert_format (TrackerRdfFormat format)
+{
+       switch (format) {
+       case TRACKER_RDF_FORMAT_TURTLE:
+               return TRACKER_SERIALIZER_FORMAT_TTL;
+       default:
+               g_assert_not_reached ();
+       }
+}
+
+static void
+serialize_in_thread (GTask    *task,
+                     TaskData *task_data)
+{
+       TrackerDirectConnectionPrivate *priv;
+       TrackerDirectConnection *conn;
+       TrackerSparql *query = NULL;
+       TrackerSparqlCursor *cursor = NULL;
+       GInputStream *istream = NULL;
+       SerializeRdf *data = task_data->data;
+       GError *error = NULL;
+
+       conn = g_task_get_source_object (task);
+       priv = tracker_direct_connection_get_instance_private (conn);
+
+       g_mutex_lock (&priv->mutex);
+       query = tracker_sparql_new (priv->data_manager, data->query);
+       if (!tracker_sparql_is_serializable (query)) {
+               g_set_error (&error,
+                            TRACKER_SPARQL_ERROR,
+                            TRACKER_SPARQL_ERROR_PARSE,
+                            "Query is not DESCRIBE or CONSTRUCT");
+               goto out;
+       }
+
+       cursor = tracker_sparql_execute_cursor (query, NULL, &error);
+       tracker_direct_connection_update_timestamp (conn);
+       if (!cursor)
+               goto out;
+
+       tracker_sparql_cursor_set_connection (cursor, TRACKER_SPARQL_CONNECTION (conn));
+       istream = tracker_serializer_new (cursor, convert_format (data->format));
+
+ out:
+       g_clear_object (&query);
+       g_clear_object (&cursor);
+       g_mutex_unlock (&priv->mutex);
+
+       if (istream)
+               g_task_return_pointer (task, istream, g_object_unref);
+       else
+               g_task_return_error (task, error);
+}
+
 static void
 query_thread_pool_func (gpointer data,
                         gpointer user_data)
 {
        TrackerDirectConnection *conn = user_data;
        TrackerDirectConnectionPrivate *priv;
-       TrackerSparqlCursor *cursor;
        GTask *task = data;
        TaskData *task_data = g_task_get_task_data (task);
-       GError *error = NULL;
-
-       g_assert (task_data->type == TASK_TYPE_QUERY);
 
        priv = tracker_direct_connection_get_instance_private (conn);
 
@@ -270,14 +346,16 @@ query_thread_pool_func (gpointer data,
                return;
        }
 
-       cursor = tracker_sparql_connection_query (TRACKER_SPARQL_CONNECTION (g_task_get_source_object (task)),
-                                                 task_data->data,
-                                                 g_task_get_cancellable (task),
-                                                 &error);
-       if (cursor)
-               g_task_return_pointer (task, cursor, g_object_unref);
-       else
-               g_task_return_error (task, error);
+       switch (task_data->type) {
+       case TASK_TYPE_QUERY:
+               execute_query_in_thread (task, task_data);
+               break;
+       case TASK_TYPE_SERIALIZE:
+               serialize_in_thread (task, task_data);
+               break;
+       default:
+               g_assert_not_reached ();
+       }
 
        g_object_unref (task);
 }
@@ -1227,6 +1305,65 @@ tracker_direct_connection_lookup_dbus_service (TrackerSparqlConnection  *connect
        return TRUE;
 }
 
+static SerializeRdf *
+serialize_rdf_data_new (const gchar      *query,
+                        TrackerRdfFormat  format)
+{
+       SerializeRdf *data;
+
+       data = g_new0 (SerializeRdf, 1);
+       data->query = g_strdup (query);
+       data->format = format;
+
+       return data;
+}
+
+static void
+serialize_rdf_data_free (gpointer user_data)
+{
+       SerializeRdf *data = user_data;
+
+       g_free (data->query);
+       g_free (data);
+}
+
+static void
+tracker_direct_connection_serialize_async (TrackerSparqlConnection  *self,
+                                           TrackerRdfFormat          format,
+                                           const gchar              *query,
+                                           GCancellable             *cancellable,
+                                           GAsyncReadyCallback      callback,
+                                           gpointer                 user_data)
+{
+       TrackerDirectConnectionPrivate *priv;
+       TrackerDirectConnection *conn;
+       GError *error = NULL;
+       GTask *task;
+
+       conn = TRACKER_DIRECT_CONNECTION (self);
+       priv = tracker_direct_connection_get_instance_private (conn);
+
+       task = g_task_new (self, cancellable, callback, user_data);
+       g_task_set_task_data (task,
+                             task_data_query_new (TASK_TYPE_SERIALIZE,
+                                                  serialize_rdf_data_new (query, format),
+                                                  serialize_rdf_data_free),
+                             (GDestroyNotify) task_data_free);
+
+       if (!g_thread_pool_push (priv->select_pool, task, &error)) {
+               g_task_return_error (task, _translate_internal_error (error));
+               g_object_unref (task);
+       }
+}
+
+static GInputStream *
+tracker_direct_connection_serialize_finish (TrackerSparqlConnection  *connection,
+                                            GAsyncResult             *res,
+                                            GError                  **error)
+{
+       return g_task_propagate_pointer (G_TASK (res), error);
+}
+
 static void
 tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
 {
@@ -1262,6 +1399,8 @@ tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
        sparql_connection_class->update_resource_finish = tracker_direct_connection_update_resource_finish;
        sparql_connection_class->create_batch = tracker_direct_connection_create_batch;
        sparql_connection_class->lookup_dbus_service = tracker_direct_connection_lookup_dbus_service;
+       sparql_connection_class->serialize_async = tracker_direct_connection_serialize_async;
+       sparql_connection_class->serialize_finish = tracker_direct_connection_serialize_finish;
 
        props[PROP_FLAGS] =
                g_param_spec_flags ("flags",


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]