[tracker/wip/carlosg/bus-query-cancellation] libtracker-sparql: Handle query cancellation on TrackerEndpointDBus




commit 5883cebf8d7b2948bcb808162e78a4629ab3315c
Author: Carlos Garnacho <carlosg gnome org>
Date:   Mon Nov 8 13:27:19 2021 +0100

    libtracker-sparql: Handle query cancellation on TrackerEndpointDBus
    
    Since the cursor contents are handed via a pipe FD, detect the situation
    where the other end of the pipe gets closed (say, client died or cancelled
    the operation) and cancel the DBus service side operation as well.
    
    This avoids these operations from finishing to completion if they are
    going nowhere. Now that each pending query request gets its own cancellable,
    ensure these are also cancelled on TrackerEndpointDBus finalization.
    
    Fixes: https://gitlab.gnome.org/GNOME/tracker/-/issues/264

 src/libtracker-sparql/tracker-endpoint-dbus.c | 68 +++++++++++++++++++++++----
 1 file changed, 60 insertions(+), 8 deletions(-)
---
diff --git a/src/libtracker-sparql/tracker-endpoint-dbus.c b/src/libtracker-sparql/tracker-endpoint-dbus.c
index 527e3f33a..d022c6f04 100644
--- a/src/libtracker-sparql/tracker-endpoint-dbus.c
+++ b/src/libtracker-sparql/tracker-endpoint-dbus.c
@@ -49,6 +49,7 @@
 #include <gio/gunixinputstream.h>
 #include <gio/gunixoutputstream.h>
 #include <gio/gunixfdlist.h>
+#include <glib-unix.h>
 
 static const gchar introspection_xml[] =
        "<node>"
@@ -86,6 +87,10 @@ typedef struct {
        TrackerEndpointDBus *endpoint;
        GDBusMethodInvocation *invocation;
        GDataOutputStream *data_stream;
+       GCancellable *global_cancellable;
+       GCancellable *cancellable;
+       gulong cancellable_id;
+       GSource *source;
 } QueryRequest;
 
 typedef struct {
@@ -175,6 +180,28 @@ tracker_endpoint_dbus_add_prologue (TrackerEndpointDBus *endpoint_dbus,
        }
 }
 
+static gboolean
+fd_watch_cb (gint          fd,
+             GIOCondition  condition,
+             gpointer      user_data)
+{
+       QueryRequest *request = user_data;
+
+       if ((condition & (G_IO_ERR | G_IO_HUP)) != 0) {
+               g_cancellable_cancel (request->cancellable);
+               return G_SOURCE_REMOVE;
+       }
+
+       return G_SOURCE_CONTINUE;
+}
+
+static void
+propagate_cancelled (GCancellable *cancellable,
+                     GCancellable *connected_cancellable)
+{
+       g_cancellable_cancel (connected_cancellable);
+}
+
 static QueryRequest *
 query_request_new (TrackerEndpointDBus   *endpoint,
                    GDBusMethodInvocation *invocation,
@@ -186,6 +213,21 @@ query_request_new (TrackerEndpointDBus   *endpoint,
        request = g_new0 (QueryRequest, 1);
        request->invocation = g_object_ref (invocation);
        request->endpoint = endpoint;
+       request->global_cancellable = g_object_ref (endpoint->cancellable);
+       request->cancellable = g_cancellable_new ();
+       request->cancellable_id =
+               g_cancellable_connect (request->global_cancellable,
+                                      G_CALLBACK (propagate_cancelled),
+                                      g_object_ref (request->cancellable),
+                                      g_object_unref);
+
+       request->source = g_unix_fd_source_new (fd, G_IO_ERR | G_IO_HUP);
+
+       g_source_set_callback (request->source,
+                              G_SOURCE_FUNC (fd_watch_cb),
+                              request,
+                              NULL);
+       g_source_attach (request->source, g_main_context_get_thread_default ());
 
        stream = g_unix_output_stream_new (fd, TRUE);
        buffered_stream = g_buffered_output_stream_new_sized (stream,
@@ -204,6 +246,14 @@ query_request_new (TrackerEndpointDBus   *endpoint,
 static void
 query_request_free (QueryRequest *request)
 {
+       g_cancellable_disconnect (request->global_cancellable,
+                                 request->cancellable_id);
+       g_object_unref (request->global_cancellable);
+
+       g_source_destroy (request->source);
+       g_source_unref (request->source);
+       g_object_unref (request->cancellable);
+
        g_output_stream_close (G_OUTPUT_STREAM (request->data_stream),
                               NULL, NULL);
 
@@ -305,7 +355,7 @@ write_cursor (QueryRequest          *request,
        values = g_new0 (const char *, n_columns);
        offsets = g_new0 (glong, n_columns);
 
-       while (tracker_sparql_cursor_next (cursor, NULL, &inner_error)) {
+       while (tracker_sparql_cursor_next (cursor, request->cancellable, &inner_error)) {
                glong cur_offset = -1;
 
                if (!g_data_output_stream_put_int32 (request->data_stream, n_columns,
@@ -397,8 +447,10 @@ finish_query (GObject      *source_object,
        TrackerSparqlCursor *cursor = TRACKER_SPARQL_CURSOR (source_object);
        GError *error = NULL;
 
-       if (!g_task_propagate_boolean (G_TASK (res), &error))
-               g_critical ("Error writing cursor: %s\n", error->message);
+       if (!g_task_propagate_boolean (G_TASK (res), &error)) {
+               if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+                       g_critical ("Error writing cursor: %s\n", error->message);
+       }
 
        g_object_unref (cursor);
        g_clear_error (&error);
@@ -423,7 +475,7 @@ query_cb (GObject      *object,
                return;
        }
 
-       task = g_task_new (cursor, request->endpoint->cancellable, finish_query, NULL);
+       task = g_task_new (cursor, request->cancellable, finish_query, NULL);
        g_task_set_task_data (task, request, (GDestroyNotify) query_request_free);
        g_task_run_in_thread (task, handle_cursor_reply);
        g_object_unref (task);
@@ -448,7 +500,7 @@ stmt_execute_cb (GObject      *object,
                return;
        }
 
-       task = g_task_new (cursor, request->endpoint->cancellable, finish_query, NULL);
+       task = g_task_new (cursor, request->cancellable, finish_query, NULL);
        g_task_set_task_data (task, request, (GDestroyNotify) query_request_free);
        g_task_run_in_thread (task, handle_cursor_reply);
        g_object_unref (task);
@@ -645,11 +697,11 @@ endpoint_dbus_iface_method_call (GDBusConnection       *connection,
                                TrackerSparqlStatement *stmt;
 
                                stmt = create_statement (conn, query, arguments,
-                                                        endpoint_dbus->cancellable,
+                                                        request->cancellable,
                                                         &error);
                                if (stmt) {
                                        tracker_sparql_statement_execute_async (stmt,
-                                                                               endpoint_dbus->cancellable,
+                                                                               request->cancellable,
                                                                                stmt_execute_cb,
                                                                                request);
                                        /* Statements are single use here... */
@@ -662,7 +714,7 @@ endpoint_dbus_iface_method_call (GDBusConnection       *connection,
                        } else {
                                tracker_sparql_connection_query_async (conn,
                                                                       query,
-                                                                      endpoint_dbus->cancellable,
+                                                                      request->cancellable,
                                                                       query_cb,
                                                                       request);
                        }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]