[tracker/wip/carlosg/deserialize-api: 5/17] libtracker-sparql: Implement deserialize API in direct connections




commit 292c75346fcd2128767e077cb916be2b3f389d16
Author: Carlos Garnacho <carlosg gnome org>
Date:   Sun May 8 00:01:30 2022 +0200

    libtracker-sparql: Implement deserialize API in direct connections
    
    Implement this as an update job that uses a TrackerDeserializer underneath
    to load the data.

 src/libtracker-sparql/direct/tracker-direct.c | 121 +++++++++++++++++++++++---
 1 file changed, 108 insertions(+), 13 deletions(-)
---
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c
index 66c10410f..7b8b1fab1 100644
--- a/src/libtracker-sparql/direct/tracker-direct.c
+++ b/src/libtracker-sparql/direct/tracker-direct.c
@@ -66,6 +66,12 @@ typedef struct {
        TrackerRdfFormat format;
 } SerializeRdf;
 
+typedef struct { /* Payload carried by a TASK_TYPE_DESERIALIZE task */
+       GInputStream *stream; /* RDF input; deserialize_rdf_data_new() takes its own reference */
+       gchar *default_graph; /* Copied with g_strdup(); handed to tracker_data_load_from_deserializer() */
+       TrackerRdfFormat format; /* Run through convert_format() when the deserializer is created */
+} DeserializeRdf;
+
 enum {
        PROP_0,
        PROP_FLAGS,
@@ -84,6 +90,7 @@ typedef enum {
        TASK_TYPE_UPDATE_BATCH,
        TASK_TYPE_RELEASE_MEMORY,
        TASK_TYPE_SERIALIZE,
+       TASK_TYPE_DESERIALIZE,
 } TaskType;
 
 typedef struct {
@@ -192,6 +199,19 @@ error:
        return FALSE;
 }
 
+static TrackerSerializerFormat /* Maps a TrackerRdfFormat value to the matching TrackerSerializerFormat */
+convert_format (TrackerRdfFormat format) /* Defined ahead of update_thread_func(), which calls it for deserialize tasks */
+{
+       switch (format) {
+       case TRACKER_RDF_FORMAT_TURTLE:
+               return TRACKER_SERIALIZER_FORMAT_TTL;
+       case TRACKER_RDF_FORMAT_TRIG:
+               return TRACKER_SERIALIZER_FORMAT_TRIG;
+       default:
+               g_assert_not_reached (); /* Only Turtle and TriG are mapped; any other value trips this assertion */
+       }
+}
+
 static void
 update_thread_func (gpointer data,
                     gpointer user_data)
@@ -229,6 +249,30 @@ update_thread_func (gpointer data,
                update_resource (tracker_data, data->graph, data->resource, &error);
                break;
        }
+       case TASK_TYPE_DESERIALIZE: { /* Load RDF from the task's input stream inside one transaction */
+               DeserializeRdf *data = task_data->data;
+               TrackerSparqlCursor *deserializer;
+
+               tracker_data_begin_transaction (tracker_data, &error);
+               if (error)
+                       break; /* Bail out before the deserializer exists, so there is nothing to unref */
+
+               deserializer = tracker_deserializer_new (data->stream,
+                                                        priv->namespace_manager,
+                                                        convert_format (data->format));
+
+               if (tracker_data_load_from_deserializer (tracker_data,
+                                                        TRACKER_DESERIALIZER (deserializer),
+                                                        data->default_graph,
+                                                        "<stream>", /* Fixed placeholder; presumably a source label for the loader - TODO confirm */
+                                                        &error)) {
+                       tracker_data_commit_transaction (tracker_data, &error); /* Commit can still set error */
+               } else {
+                       tracker_data_rollback_transaction (tracker_data); /* Load reported failure; error is already set */
+               }
+               g_object_unref (deserializer); /* Released on both the commit and the rollback path */
+               break;
+       }
        case TASK_TYPE_UPDATE_BATCH:
                tracker_direct_batch_update (task_data->data, priv->data_manager, &error);
                break;
@@ -270,19 +314,6 @@ execute_query_in_thread (GTask    *task,
                g_task_return_error (task, error);
 }
 
-static TrackerSerializerFormat
-convert_format (TrackerRdfFormat format)
-{
-       switch (format) {
-       case TRACKER_RDF_FORMAT_TURTLE:
-               return TRACKER_SERIALIZER_FORMAT_TTL;
-       case TRACKER_RDF_FORMAT_TRIG:
-               return TRACKER_SERIALIZER_FORMAT_TRIG;
-       default:
-               g_assert_not_reached ();
-       }
-}
-
 static void
 serialize_in_thread (GTask    *task,
                      TaskData *task_data)
@@ -1365,6 +1396,68 @@ tracker_direct_connection_serialize_finish (TrackerSparqlConnection  *connection
        return g_task_propagate_pointer (G_TASK (res), error);
 }
 
+static DeserializeRdf * /* Builds the heap-allocated payload for a deserialize task; release with deserialize_rdf_data_free() */
+deserialize_rdf_data_new (GInputStream     *stream,
+                          const gchar      *default_graph,
+                          TrackerRdfFormat  format)
+{
+       DeserializeRdf *data;
+
+       data = g_new0 (DeserializeRdf, 1); /* Zero-filled allocation of a single struct */
+       data->stream = g_object_ref (stream); /* Payload keeps its own reference on the stream */
+       data->default_graph = g_strdup (default_graph); /* Private copy of the caller's string */
+       data->format = format;
+
+       return data;
+}
+
+static void /* Releases everything deserialize_rdf_data_new() acquired */
+deserialize_rdf_data_free (gpointer user_data) /* gpointer argument: handed to task_data_query_new() alongside the payload */
+{
+       DeserializeRdf *data = user_data;
+
+       g_object_unref (data->stream); /* Drops the reference taken at creation */
+       g_free (data->default_graph);
+       g_free (data);
+}
+
+static void /* deserialize_async vfunc: wraps the request in a GTask and queues it on the update thread pool */
+tracker_direct_connection_deserialize_async (TrackerSparqlConnection *self,
+                                             TrackerDeserializeFlags  flags, /* Accepted but not read anywhere in this function */
+                                             TrackerRdfFormat         format,
+                                             const gchar             *default_graph,
+                                             GInputStream            *stream,
+                                             GCancellable            *cancellable,
+                                             GAsyncReadyCallback      callback,
+                                             gpointer                 user_data)
+{
+       TrackerDirectConnectionPrivate *priv;
+       TrackerDirectConnection *conn;
+       GTask *task;
+
+       conn = TRACKER_DIRECT_CONNECTION (self);
+       priv = tracker_direct_connection_get_instance_private (conn);
+
+       task = g_task_new (self, cancellable, callback, user_data);
+       g_task_set_task_data (task, /* The task owns the TaskData and frees it through task_data_free() */
+                             task_data_query_new (TASK_TYPE_DESERIALIZE,
+                                                  deserialize_rdf_data_new (stream,
+                                                                            default_graph,
+                                                                            format),
+                                                  deserialize_rdf_data_free),
+                             (GDestroyNotify) task_data_free);
+
+       g_thread_pool_push (priv->update_thread, task, NULL); /* NULL GError**: a failed push is not reported here */
+}
+
+static gboolean /* deserialize_finish vfunc: yields the boolean outcome stored in the GTask */
+tracker_direct_connection_deserialize_finish (TrackerSparqlConnection  *connection, /* Not used in the body */
+                                              GAsyncResult             *res,
+                                              GError                  **error)
+{
+       return g_task_propagate_boolean (G_TASK (res), error); /* Fills *error if the task ended with an error */
+}
+
 static void
 tracker_direct_connection_map_connection (TrackerSparqlConnection *connection,
                                          const gchar             *handle_name,
@@ -1418,6 +1511,8 @@ tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
        sparql_connection_class->lookup_dbus_service = tracker_direct_connection_lookup_dbus_service;
        sparql_connection_class->serialize_async = tracker_direct_connection_serialize_async;
        sparql_connection_class->serialize_finish = tracker_direct_connection_serialize_finish;
+       sparql_connection_class->deserialize_async = tracker_direct_connection_deserialize_async;
+       sparql_connection_class->deserialize_finish = tracker_direct_connection_deserialize_finish;
        sparql_connection_class->map_connection = tracker_direct_connection_map_connection;
 
        props[PROP_FLAGS] =


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]