[tracker/wip/carlosg/serialize-api: 14/26] libtracker-sparql: Implement serialize_async/finish on DBus connection




commit 3772433ed7682c90e7ef3c878c5ec73c68012241
Author: Carlos Garnacho <carlosg gnome org>
Date:   Sun Nov 21 23:05:07 2021 +0100

    libtracker-sparql: Implement serialize_async/finish on DBus connection
    
    This new DBus call simply pipes a FD, so that the endpoint can write
    RDF to it, and the other end can receive it.

 src/libtracker-sparql/bus/tracker-bus.vala    | 16 +++++
 src/libtracker-sparql/tracker-endpoint-dbus.c | 95 +++++++++++++++++++++++++++
 src/libtracker-sparql/tracker-sparql.vapi     |  8 +++
 3 files changed, 119 insertions(+)
---
diff --git a/src/libtracker-sparql/bus/tracker-bus.vala b/src/libtracker-sparql/bus/tracker-bus.vala
index f61c6226f..05db6b7a9 100644
--- a/src/libtracker-sparql/bus/tracker-bus.vala
+++ b/src/libtracker-sparql/bus/tracker-bus.vala
@@ -391,4 +391,20 @@ public class Tracker.Bus.Connection : Tracker.Sparql.Connection {
                                                        null);
                return batch;
        }
+
+       /* Sends a "Serialize" method call for @sparql and @format to the
+        * endpoint, handing it the output end of a pipe through the message's
+        * FD list, and returns the input end to the caller.
+        */
+       public async override GLib.InputStream serialize_async (RdfFormat format, string sparql, 
GLib.Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, GLib.DBusError {
+               UnixInputStream input;
+               UnixOutputStream output;
+               // NOTE(review): pipe () is defined elsewhere; presumably it creates
+               // the two connected ends of a pipe - confirm.
+               pipe (out input, out output);
+
+               var message = new DBusMessage.method_call (dbus_name, object_path, ENDPOINT_IFACE, 
"Serialize");
+               var fd_list = new UnixFDList ();
+               // Body is "(shia{sv})": query, FD handle (the value returned by
+               // fd_list.append ()), format, and a null a{sv} arguments value.
+               message.set_body (new Variant ("(shia{sv})", sparql, fd_list.append (output.fd), format, 
null));
+               message.set_unix_fd_list (fd_list);
+
+               // int.MAX is passed as the reply timeout.
+               var reply = yield bus.send_message_with_reply (message, DBusSendMessageFlags.NONE, int.MAX, 
null, cancellable);
+               // NOTE(review): handle_error_reply () is defined elsewhere; presumably
+               // it throws when the reply is an error message - confirm.
+               handle_error_reply (reply);
+
+               return input;
+       }
 }
diff --git a/src/libtracker-sparql/tracker-endpoint-dbus.c b/src/libtracker-sparql/tracker-endpoint-dbus.c
index d022c6f04..ea654aaec 100644
--- a/src/libtracker-sparql/tracker-endpoint-dbus.c
+++ b/src/libtracker-sparql/tracker-endpoint-dbus.c
@@ -60,6 +60,12 @@ static const gchar introspection_xml[] =
        "      <arg type='a{sv}' name='arguments' direction='in' />"
        "      <arg type='as' name='result' direction='out' />"
        "    </method>"
+       "    <method name='Serialize'>"
+       "      <arg type='s' name='query' direction='in' />"
+       "      <arg type='h' name='output_stream' direction='in' />"
+       "      <arg type='i' name='format' direction='in' />"
+       "      <arg type='a{sv}' name='arguments' direction='in' />"
+       "    </method>"
        "    <method name='Update'>"
        "      <arg type='h' name='input_stream' direction='in' />"
        "    </method>"
@@ -506,6 +512,55 @@ stmt_execute_cb (GObject      *object,
        g_object_unref (task);
 }
 
+/* Completion callback for the splice started in serialize_cb().
+ *
+ * Finishes the splice on the output stream given as @object, logs a
+ * warning if it failed, and frees the QueryRequest passed as @user_data.
+ */
+static void
+splice_rdf_cb (GObject      *object,
+               GAsyncResult *res,
+               gpointer      user_data)
+{
+       GOutputStream *ostream = G_OUTPUT_STREAM (object);
+       QueryRequest *request = user_data;
+       GError *splice_error = NULL;
+
+       g_output_stream_splice_finish (ostream, res, &splice_error);
+
+       if (splice_error != NULL) {
+               /* The method invocation has already been replied to,
+                * so the failure can only be logged here.
+                */
+               g_warning ("Error splicing RDF data: %s", splice_error->message);
+               g_error_free (splice_error);
+       }
+
+       query_request_free (request);
+}
+
+/* Completion callback for tracker_sparql_connection_serialize_async().
+ *
+ * On failure, the error is returned through the D-Bus method invocation
+ * and the request is freed. On success, the invocation is replied to
+ * with no return value, and the resulting RDF stream is spliced
+ * asynchronously into the request's data stream; the request is then
+ * freed by splice_rdf_cb().
+ *
+ * @object: the TrackerSparqlConnection the serialize call was issued on
+ * @res: result of the asynchronous serialize operation
+ * @user_data: the QueryRequest for this method invocation
+ */
+static void
+serialize_cb (GObject      *object,
+              GAsyncResult *res,
+              gpointer      user_data)
+{
+       QueryRequest *request = user_data;
+       GInputStream *istream;
+       GError *error = NULL;
+
+       istream = tracker_sparql_connection_serialize_finish (TRACKER_SPARQL_CONNECTION (object),
+                                                             res, &error);
+       if (!istream) {
+               g_dbus_method_invocation_return_gerror (request->invocation, error);
+               g_error_free (error);
+               query_request_free (request);
+               return;
+       }
+
+       /* Reply before any data is transferred; splice errors can only
+        * be logged afterwards.
+        */
+       g_dbus_method_invocation_return_value (request->invocation, NULL);
+       g_output_stream_splice_async (G_OUTPUT_STREAM (request->data_stream),
+                                     istream,
+                                     G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+                                     G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+                                     G_PRIORITY_DEFAULT,
+                                     request->global_cancellable,
+                                     splice_rdf_cb,
+                                     request);
+
+       /* The splice operation holds its own reference on the source
+        * stream; drop ours so the stream is not leaked.
+        */
+       g_object_unref (istream);
+}
+
 static void
 update_cb (GObject      *object,
            GAsyncResult *res,
@@ -721,6 +776,46 @@ endpoint_dbus_iface_method_call (GDBusConnection       *connection,
                }
 
                g_variant_iter_free (arguments);
+               g_free (query);
+       } else if (g_strcmp0 (method_name, "Serialize") == 0) {
+               TrackerRdfFormat format;
+
+               if (tracker_endpoint_dbus_forbid_operation (endpoint_dbus,
+                                                           invocation,
+                                                           TRACKER_OPERATION_TYPE_SELECT)) {
+                       g_dbus_method_invocation_return_error (invocation,
+                                                              G_DBUS_ERROR,
+                                                              G_DBUS_ERROR_ACCESS_DENIED,
+                                                              "Operation not allowed");
+                       return;
+               }
+
+               /* FIXME: arguments is currently unused */
+               g_variant_get (parameters, "(shia{sv})", &query, &handle, &format, &arguments);
+
+               if (fd_list)
+                       fd = g_unix_fd_list_get (fd_list, handle, &error);
+
+               if (fd < 0) {
+                       g_dbus_method_invocation_return_error (invocation,
+                                                              G_DBUS_ERROR,
+                                                              G_DBUS_ERROR_INVALID_ARGS,
+                                                              "Did not get a file descriptor");
+               } else {
+                       QueryRequest *request;
+
+                       query = tracker_endpoint_dbus_add_prologue (endpoint_dbus,
+                                                                   query);
+
+                       request = query_request_new (endpoint_dbus, invocation, fd);
+                       tracker_sparql_connection_serialize_async (conn,
+                                                                  format,
+                                                                  query,
+                                                                  request->cancellable,
+                                                                  serialize_cb,
+                                                                  request);
+               }
+
                g_free (query);
        } else if (g_strcmp0 (method_name, "Update") == 0 ||
                   g_strcmp0 (method_name, "UpdateArray") == 0) {
diff --git a/src/libtracker-sparql/tracker-sparql.vapi b/src/libtracker-sparql/tracker-sparql.vapi
index dec936737..79252618a 100644
--- a/src/libtracker-sparql/tracker-sparql.vapi
+++ b/src/libtracker-sparql/tracker-sparql.vapi
@@ -53,6 +53,12 @@ namespace Tracker {
                BOOLEAN
        }
 
+       /* Format argument taken by serialize_async (). */
+       [CCode (cheader_filename = "libtracker-sparql/tracker-connection.h")]
+       public enum RdfFormat {
+               TURTLE,
+               TRIG,
+       }
+
        namespace Sparql {
                [CCode (cheader_filename = "libtracker-sparql/tracker-sparql.h")]
                public static string escape_string (string literal);
@@ -101,6 +107,8 @@ namespace Tracker {
                 public virtual Notifier? create_notifier ();
                 public virtual void close ();
                 public async virtual bool close_async () throws GLib.IOError;
+
+                /* Asynchronously serializes the given SPARQL in the requested RdfFormat; the result is read from the returned input stream. */
+                public async virtual GLib.InputStream serialize_async (RdfFormat format, string sparql, 
GLib.Cancellable? cancellable = null) throws Sparql.Error, GLib.Error, GLib.IOError, GLib.DBusError;
        }
 
        [CCode (cheader_filename = "libtracker-sparql/tracker-sparql.h")]


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]