[tracker/wip/carlosg/deserialize-api: 5/17] libtracker-sparql: Implement deserialize API in direct connections
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/wip/carlosg/deserialize-api: 5/17] libtracker-sparql: Implement deserialize API in direct connections
- Date: Thu, 30 Jun 2022 21:57:56 +0000 (UTC)
commit 292c75346fcd2128767e077cb916be2b3f389d16
Author: Carlos Garnacho <carlosg gnome org>
Date: Sun May 8 00:01:30 2022 +0200
libtracker-sparql: Implement deserialize API in direct connections
Implement this as an update job that uses a TrackerDeserializer underneath
to load the data.
src/libtracker-sparql/direct/tracker-direct.c | 121 +++++++++++++++++++++++---
1 file changed, 108 insertions(+), 13 deletions(-)
---
diff --git a/src/libtracker-sparql/direct/tracker-direct.c b/src/libtracker-sparql/direct/tracker-direct.c
index 66c10410f..7b8b1fab1 100644
--- a/src/libtracker-sparql/direct/tracker-direct.c
+++ b/src/libtracker-sparql/direct/tracker-direct.c
@@ -66,6 +66,12 @@ typedef struct {
TrackerRdfFormat format;
} SerializeRdf;
+typedef struct {
+ GInputStream *stream;
+ gchar *default_graph;
+ TrackerRdfFormat format;
+} DeserializeRdf;
+
enum {
PROP_0,
PROP_FLAGS,
@@ -84,6 +90,7 @@ typedef enum {
TASK_TYPE_UPDATE_BATCH,
TASK_TYPE_RELEASE_MEMORY,
TASK_TYPE_SERIALIZE,
+ TASK_TYPE_DESERIALIZE,
} TaskType;
typedef struct {
@@ -192,6 +199,19 @@ error:
return FALSE;
}
+static TrackerSerializerFormat
+convert_format (TrackerRdfFormat format)
+{
+ switch (format) {
+ case TRACKER_RDF_FORMAT_TURTLE:
+ return TRACKER_SERIALIZER_FORMAT_TTL;
+ case TRACKER_RDF_FORMAT_TRIG:
+ return TRACKER_SERIALIZER_FORMAT_TRIG;
+ default:
+ g_assert_not_reached ();
+ }
+}
+
static void
update_thread_func (gpointer data,
gpointer user_data)
@@ -229,6 +249,30 @@ update_thread_func (gpointer data,
update_resource (tracker_data, data->graph, data->resource, &error);
break;
}
+ case TASK_TYPE_DESERIALIZE: {
+ DeserializeRdf *data = task_data->data;
+ TrackerSparqlCursor *deserializer;
+
+ tracker_data_begin_transaction (tracker_data, &error);
+ if (error)
+ break;
+
+ deserializer = tracker_deserializer_new (data->stream,
+ priv->namespace_manager,
+ convert_format (data->format));
+
+ if (tracker_data_load_from_deserializer (tracker_data,
+ TRACKER_DESERIALIZER (deserializer),
+ data->default_graph,
+ "<stream>",
+ &error)) {
+ tracker_data_commit_transaction (tracker_data, &error);
+ } else {
+ tracker_data_rollback_transaction (tracker_data);
+ }
+ g_object_unref (deserializer);
+ break;
+ }
case TASK_TYPE_UPDATE_BATCH:
tracker_direct_batch_update (task_data->data, priv->data_manager, &error);
break;
@@ -270,19 +314,6 @@ execute_query_in_thread (GTask *task,
g_task_return_error (task, error);
}
-static TrackerSerializerFormat
-convert_format (TrackerRdfFormat format)
-{
- switch (format) {
- case TRACKER_RDF_FORMAT_TURTLE:
- return TRACKER_SERIALIZER_FORMAT_TTL;
- case TRACKER_RDF_FORMAT_TRIG:
- return TRACKER_SERIALIZER_FORMAT_TRIG;
- default:
- g_assert_not_reached ();
- }
-}
-
static void
serialize_in_thread (GTask *task,
TaskData *task_data)
@@ -1365,6 +1396,68 @@ tracker_direct_connection_serialize_finish (TrackerSparqlConnection *connection
return g_task_propagate_pointer (G_TASK (res), error);
}
+static DeserializeRdf *
+deserialize_rdf_data_new (GInputStream *stream,
+ const gchar *default_graph,
+ TrackerRdfFormat format)
+{
+ DeserializeRdf *data;
+
+ data = g_new0 (DeserializeRdf, 1);
+ data->stream = g_object_ref (stream);
+ data->default_graph = g_strdup (default_graph);
+ data->format = format;
+
+ return data;
+}
+
+static void
+deserialize_rdf_data_free (gpointer user_data)
+{
+ DeserializeRdf *data = user_data;
+
+ g_object_unref (data->stream);
+ g_free (data->default_graph);
+ g_free (data);
+}
+
+static void
+tracker_direct_connection_deserialize_async (TrackerSparqlConnection *self,
+ TrackerDeserializeFlags flags,
+ TrackerRdfFormat format,
+ const gchar *default_graph,
+ GInputStream *stream,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ TrackerDirectConnectionPrivate *priv;
+ TrackerDirectConnection *conn;
+ GTask *task;
+
+ conn = TRACKER_DIRECT_CONNECTION (self);
+ priv = tracker_direct_connection_get_instance_private (conn);
+
+ task = g_task_new (self, cancellable, callback, user_data);
+ g_task_set_task_data (task,
+ task_data_query_new (TASK_TYPE_DESERIALIZE,
+ deserialize_rdf_data_new (stream,
+ default_graph,
+ format),
+ deserialize_rdf_data_free),
+ (GDestroyNotify) task_data_free);
+
+ g_thread_pool_push (priv->update_thread, task, NULL);
+}
+
+static gboolean
+tracker_direct_connection_deserialize_finish (TrackerSparqlConnection *connection,
+ GAsyncResult *res,
+ GError **error)
+{
+ return g_task_propagate_boolean (G_TASK (res), error);
+}
+
static void
tracker_direct_connection_map_connection (TrackerSparqlConnection *connection,
const gchar *handle_name,
@@ -1418,6 +1511,8 @@ tracker_direct_connection_class_init (TrackerDirectConnectionClass *klass)
sparql_connection_class->lookup_dbus_service = tracker_direct_connection_lookup_dbus_service;
sparql_connection_class->serialize_async = tracker_direct_connection_serialize_async;
sparql_connection_class->serialize_finish = tracker_direct_connection_serialize_finish;
+ sparql_connection_class->deserialize_async = tracker_direct_connection_deserialize_async;
+ sparql_connection_class->deserialize_finish = tracker_direct_connection_deserialize_finish;
sparql_connection_class->map_connection = tracker_direct_connection_map_connection;
props[PROP_FLAGS] =
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]