[tracker/wip/carlosg/serialize-api: 11/22] libtracker-sparql: Implement serialize_async/finish on DBus connection
- From: Carlos Garnacho <carlosg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [tracker/wip/carlosg/serialize-api: 11/22] libtracker-sparql: Implement serialize_async/finish on DBus connection
- Date: Sun, 28 Nov 2021 13:04:33 +0000 (UTC)
commit b8db8898f48f0abd8a84837bd5f2b4df9f084bf9
Author: Carlos Garnacho <carlosg gnome org>
Date: Sun Nov 21 23:05:07 2021 +0100
libtracker-sparql: Implement serialize_async/finish on DBus connection
This new DBus call simply pipes a FD, so that the endpoint can write
RDF to it, and the other end receive it.
src/libtracker-sparql/bus/tracker-bus.vala | 16 +++++
src/libtracker-sparql/tracker-endpoint-dbus.c | 95 +++++++++++++++++++++++++++
src/libtracker-sparql/tracker-sparql.vapi | 8 +++
3 files changed, 119 insertions(+)
---
diff --git a/src/libtracker-sparql/bus/tracker-bus.vala b/src/libtracker-sparql/bus/tracker-bus.vala
index f61c6226f..05db6b7a9 100644
--- a/src/libtracker-sparql/bus/tracker-bus.vala
+++ b/src/libtracker-sparql/bus/tracker-bus.vala
@@ -391,4 +391,20 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
null);
return batch;
}
+
+ public async override GLib.InputStream serialize_async (RdfFormat format, string sparql,
GLib.Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, GLib.DBusError {
+ UnixInputStream input;
+ UnixOutputStream output;
+ pipe (out input, out output);
+
+ var message = new DBusMessage.method_call (dbus_name, object_path, ENDPOINT_IFACE,
"Serialize");
+ var fd_list = new UnixFDList ();
+ message.set_body (new Variant ("(shia{sv})", sparql, fd_list.append (output.fd), format,
null));
+ message.set_unix_fd_list (fd_list);
+
+ var reply = yield bus.send_message_with_reply (message, DBusSendMessageFlags.NONE, int.MAX,
null, cancellable);
+ handle_error_reply (reply);
+
+ return input;
+ }
}
diff --git a/src/libtracker-sparql/tracker-endpoint-dbus.c b/src/libtracker-sparql/tracker-endpoint-dbus.c
index d022c6f04..ea654aaec 100644
--- a/src/libtracker-sparql/tracker-endpoint-dbus.c
+++ b/src/libtracker-sparql/tracker-endpoint-dbus.c
@@ -60,6 +60,12 @@ static const gchar introspection_xml[] =
" <arg type='a{sv}' name='arguments' direction='in' />"
" <arg type='as' name='result' direction='out' />"
" </method>"
+ " <method name='Serialize'>"
+ " <arg type='s' name='query' direction='in' />"
+ " <arg type='h' name='output_stream' direction='in' />"
+ " <arg type='i' name='format' direction='in' />"
+ " <arg type='a{sv}' name='arguments' direction='in' />"
+ " </method>"
" <method name='Update'>"
" <arg type='h' name='input_stream' direction='in' />"
" </method>"
@@ -506,6 +512,55 @@ stmt_execute_cb (GObject *object,
g_object_unref (task);
}
+static void
+splice_rdf_cb (GObject *object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ QueryRequest *request = user_data;
+ GError *error = NULL;
+
+ g_output_stream_splice_finish (G_OUTPUT_STREAM (object),
+ res, &error);
+
+ if (error) {
+ /* The query request method invocations has been already replied */
+ g_warning ("Error splicing RDF data: %s", error->message);
+ g_error_free (error);
+ }
+
+ query_request_free (request);
+}
+
+static void
+serialize_cb (GObject *object,
+ GAsyncResult *res,
+ gpointer user_data)
+{
+ QueryRequest *request = user_data;
+ GInputStream *istream;
+ GError *error = NULL;
+
+ istream = tracker_sparql_connection_serialize_finish (TRACKER_SPARQL_CONNECTION (object),
+ res, &error);
+ if (!istream) {
+ g_dbus_method_invocation_return_gerror (request->invocation, error);
+ g_error_free (error);
+ query_request_free (request);
+ return;
+ }
+
+ g_dbus_method_invocation_return_value (request->invocation, NULL);
+ g_output_stream_splice_async (G_OUTPUT_STREAM (request->data_stream),
+ istream,
+ G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ G_PRIORITY_DEFAULT,
+ request->global_cancellable,
+ splice_rdf_cb,
+ request);
+}
+
static void
update_cb (GObject *object,
GAsyncResult *res,
@@ -721,6 +776,46 @@ endpoint_dbus_iface_method_call (GDBusConnection *connection,
}
g_variant_iter_free (arguments);
+ g_free (query);
+ } else if (g_strcmp0 (method_name, "Serialize") == 0) {
+ TrackerRdfFormat format;
+
+ if (tracker_endpoint_dbus_forbid_operation (endpoint_dbus,
+ invocation,
+ TRACKER_OPERATION_TYPE_SELECT)) {
+ g_dbus_method_invocation_return_error (invocation,
+ G_DBUS_ERROR,
+ G_DBUS_ERROR_ACCESS_DENIED,
+ "Operation not allowed");
+ return;
+ }
+
+ /* FIXME: arguments is currently unused */
+ g_variant_get (parameters, "(shia{sv})", &query, &handle, &format, &arguments);
+
+ if (fd_list)
+ fd = g_unix_fd_list_get (fd_list, handle, &error);
+
+ if (fd < 0) {
+ g_dbus_method_invocation_return_error (invocation,
+ G_DBUS_ERROR,
+ G_DBUS_ERROR_INVALID_ARGS,
+ "Did not get a file descriptor");
+ } else {
+ QueryRequest *request;
+
+ query = tracker_endpoint_dbus_add_prologue (endpoint_dbus,
+ query);
+
+ request = query_request_new (endpoint_dbus, invocation, fd);
+ tracker_sparql_connection_serialize_async (conn,
+ format,
+ query,
+ request->cancellable,
+ serialize_cb,
+ request);
+ }
+
g_free (query);
} else if (g_strcmp0 (method_name, "Update") == 0 ||
g_strcmp0 (method_name, "UpdateArray") == 0) {
diff --git a/src/libtracker-sparql/tracker-sparql.vapi b/src/libtracker-sparql/tracker-sparql.vapi
index dec936737..79252618a 100644
--- a/src/libtracker-sparql/tracker-sparql.vapi
+++ b/src/libtracker-sparql/tracker-sparql.vapi
@@ -53,6 +53,12 @@ namespace Tracker {
BOOLEAN
}
+ [CCode (cheader_filename = "libtracker-sparql/tracker-connection.h")]
+ public enum RdfFormat {
+ TURTLE,
+ TRIG,
+ }
+
namespace Sparql {
[CCode (cheader_filename = "libtracker-sparql/tracker-sparql.h")]
public static string escape_string (string literal);
@@ -101,6 +107,8 @@ namespace Tracker {
public virtual Notifier? create_notifier ();
public virtual void close ();
public async virtual bool close_async () throws GLib.IOError;
+
+ public async virtual GLib.InputStream serialize_async (RdfFormat format, string sparql,
GLib.Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, GLib.DBusError;
}
[CCode (cheader_filename = "libtracker-sparql/tracker-sparql.h")]
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]