[gnome-remote-desktop] session-rdp: Run socket handling in a separate thread



commit 1e6b41fe58352218a84c98b2f1beb698682640de
Author: Pascal Nowack <Pascal Nowack gmx de>
Date:   Sun Oct 25 08:46:50 2020 +0100

    session-rdp: Run socket handling in a separate thread
    
    Currently, everything in gnome-remote-desktop is handled inside of one
    thread, with the exception of the worker threads used when encoding a
    frame for an RDP client and the PipeWire thread, where we receive the
    frame buffers.
    For clipboard support in RDP, we have to ensure that only one data
    request happens at a time, since normal clipboard content
    (everything except files) is not bound to an identification number
    that could be used to assign a response to a request.
    This implies that data requests cannot be (easily) implemented in an
    asynchronous way and that we have to wait for the response in the same
    thread.
    However, this will also block any incoming data that will be handled in
    `handle_socket_data()`, which will unavoidably lead to a deadlock.
    Similar situations will happen later with the graphics pipeline or
    other virtual channels in RDP.
    
    Moreover, the current socket handling via `handle_socket_data()` causes
    other errors:
    When a new remote desktop client connects, `handle_socket_data()` is
    not always called when it needs to be called.
    This leads to a problem where the remote desktop client waits for its
    activation procedure to complete, but glib cannot tell that it has
    to call `handle_socket_data()` to complete the procedure.
    
    To solve these situations, run the socket handling in a separate thread
    so that it cannot run into a deadlock, and perform the socket handling
    with functions provided by FreeRDP and WinPR.
    Like `handle_socket_data()`, the socket thread is completely event-
    based.
    For the socket handling, FreeRDP allows us to listen on its transport
    event handles.
    With these event handles, g-r-d can now properly handle the activation
    process for clients at any time (which is not always possible with the
    socket handling functions provided by glib).
    Because the handling of input events now happens in the socket thread,
    use the previously implemented RdpEventQueue to queue all input events
    for the main thread, since dbus handling still happens in the main
    thread.
    
    In addition to that, this also simplifies some areas of the source
    code, as there are no more attach_source/detach_source calls flying
    all around in the source code that were called during the
    initialization process.

 src/grd-session-rdp.c | 238 ++++++++++++++++++++++++++++++++++----------------
 1 file changed, 161 insertions(+), 77 deletions(-)
---
diff --git a/src/grd-session-rdp.c b/src/grd-session-rdp.c
index 06e98bf..d144a8e 100644
--- a/src/grd-session-rdp.c
+++ b/src/grd-session-rdp.c
@@ -32,6 +32,7 @@
 
 #include "grd-context.h"
 #include "grd-damage-utils.h"
+#include "grd-rdp-event-queue.h"
 #include "grd-rdp-pipewire-stream.h"
 #include "grd-rdp-sam.h"
 #include "grd-rdp-server.h"
@@ -70,9 +71,12 @@ struct _GrdSessionRdp
   GrdSession parent;
 
   GSocketConnection *connection;
-  GSource *source;
   freerdp_peer *peer;
 
+  GThread *socket_thread;
+  HANDLE start_event;
+  HANDLE stop_event;
+
   uint8_t *last_frame;
   Pointer *last_pointer;
   GHashTable *pointer_cache;
@@ -86,6 +90,8 @@ struct _GrdSessionRdp
   GHashTable *pressed_unicode_keys;
 #endif
 
+  GrdRdpEventQueue *rdp_event_queue;
+
   GThreadPool *thread_pool;
   GCond pending_jobs_cond;
   GMutex pending_jobs_mutex;
@@ -130,6 +136,7 @@ typedef struct _RdpPeerContext
   GrdSessionRdp *session_rdp;
   GrdRdpSAMFile *sam_file;
 
+  GMutex flags_mutex;
   RdpPeerFlag flags;
   uint32_t frame_id;
 
@@ -147,9 +154,6 @@ typedef struct _RdpPeerContext
 
 G_DEFINE_TYPE (GrdSessionRdp, grd_session_rdp, GRD_TYPE_SESSION);
 
-static void
-grd_session_rdp_detach_source (GrdSessionRdp *session_rdp);
-
 static gboolean
 close_session_idle (gpointer user_data);
 
@@ -162,6 +166,37 @@ static gboolean
 are_pointer_bitmaps_equal (gconstpointer a,
                            gconstpointer b);
 
+static gboolean
+is_rdp_peer_flag_set (RdpPeerContext *rdp_peer_context,
+                      RdpPeerFlag     flag)
+{
+  gboolean state;
+
+  g_mutex_lock (&rdp_peer_context->flags_mutex);
+  state = rdp_peer_context->flags & flag;
+  g_mutex_unlock (&rdp_peer_context->flags_mutex);
+
+  return state;
+}
+
+static void
+set_rdp_peer_flag (RdpPeerContext *rdp_peer_context,
+                   RdpPeerFlag     flag)
+{
+  g_mutex_lock (&rdp_peer_context->flags_mutex);
+  rdp_peer_context->flags |= flag;
+  g_mutex_unlock (&rdp_peer_context->flags_mutex);
+}
+
+static void
+unset_rdp_peer_flag (RdpPeerContext *rdp_peer_context,
+                     RdpPeerFlag     flag)
+{
+  g_mutex_lock (&rdp_peer_context->flags_mutex);
+  rdp_peer_context->flags &= ~flag;
+  g_mutex_unlock (&rdp_peer_context->flags_mutex);
+}
+
 void
 grd_session_rdp_resize_framebuffer (GrdSessionRdp *session_rdp,
                                     uint32_t       width,
@@ -205,8 +240,8 @@ grd_session_rdp_take_buffer (GrdSessionRdp *session_rdp,
   uint32_t stride = grd_session_rdp_get_framebuffer_stride (session_rdp);
   cairo_region_t *region;
 
-  if (rdp_peer_context->flags & RDP_PEER_ACTIVATED &&
-      rdp_peer_context->flags & RDP_PEER_OUTPUT_ENABLED)
+  if (is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_ACTIVATED) &&
+      is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_OUTPUT_ENABLED))
     {
       region = grd_get_damage_region ((uint8_t *) data,
                                       session_rdp->last_frame,
@@ -279,7 +314,7 @@ grd_session_rdp_update_pointer (GrdSessionRdp *session_rdp,
   uint8_t r, g, b, a;
   uint32_t x, y;
 
-  if (!(rdp_peer_context->flags & RDP_PEER_ACTIVATED))
+  if (!is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_ACTIVATED))
     {
       g_free (data);
       return;
@@ -463,7 +498,7 @@ grd_session_rdp_hide_pointer (GrdSessionRdp *session_rdp)
   rdpUpdate *rdp_update = peer->update;
   POINTER_SYSTEM_UPDATE pointer_system = {0};
 
-  if (!(rdp_peer_context->flags & RDP_PEER_ACTIVATED))
+  if (!is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_ACTIVATED))
     return;
 
   if (session_rdp->pointer_type == POINTER_TYPE_HIDDEN)
@@ -486,7 +521,7 @@ grd_session_rdp_move_pointer (GrdSessionRdp *session_rdp,
   rdpUpdate *rdp_update = peer->update;
   POINTER_POSITION_UPDATE pointer_position = {0};
 
-  if (!(rdp_peer_context->flags & RDP_PEER_ACTIVATED))
+  if (!is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_ACTIVATED))
     return;
 
   g_mutex_lock (&session_rdp->pointer_mutex);
@@ -511,14 +546,19 @@ maybe_queue_close_session_idle (GrdSessionRdp *session_rdp)
 
   session_rdp->close_session_idle_id =
     g_idle_add (close_session_idle, session_rdp);
+
+  SetEvent (session_rdp->stop_event);
 }
 
 static void
 handle_client_gone (GrdSessionRdp *session_rdp)
 {
+  freerdp_peer *peer = session_rdp->peer;
+  RdpPeerContext *rdp_peer_context = (RdpPeerContext *) peer->context;
+
   g_debug ("RDP client gone");
 
-  grd_session_rdp_detach_source (session_rdp);
+  unset_rdp_peer_flag (rdp_peer_context, RDP_PEER_ACTIVATED);
   maybe_queue_close_session_idle (session_rdp);
 }
 
@@ -1050,9 +1090,13 @@ notify_keysym_released (gpointer key,
                         gpointer user_data)
 {
   GrdSession *session = (GrdSession *) user_data;
+  GrdSessionRdp *session_rdp = GRD_SESSION_RDP (session);
+  GrdRdpEventQueue *rdp_event_queue = session_rdp->rdp_event_queue;
   xkb_keysym_t keysym = GPOINTER_TO_UINT (key);
 
-  grd_session_notify_keyboard_keysym (session, keysym, GRD_KEY_STATE_RELEASED);
+  grd_rdp_event_queue_add_input_event_keyboard_keysym (rdp_event_queue,
+                                                       keysym,
+                                                       GRD_KEY_STATE_RELEASED);
 
   return TRUE;
 }
@@ -1068,7 +1112,7 @@ rdp_input_synchronize_event (rdpInput *rdp_input,
   GrdSession *session = GRD_SESSION (session_rdp);
 #endif
 
-  if (!(rdp_peer_context->flags & RDP_PEER_ACTIVATED))
+  if (!is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_ACTIVATED))
     return TRUE;
 
 #ifdef HAS_RDP_UNICODE_INPUT
@@ -1088,12 +1132,12 @@ rdp_input_mouse_event (rdpInput *rdp_input,
 {
   RdpPeerContext *rdp_peer_context = (RdpPeerContext *) rdp_input->context;
   GrdSessionRdp *session_rdp = rdp_peer_context->session_rdp;
-  GrdSession *session = GRD_SESSION (session_rdp);
+  GrdRdpEventQueue *rdp_event_queue = session_rdp->rdp_event_queue;
   GrdButtonState button_state;
   int32_t button = 0;
   double axis_step;
 
-  if (!(rdp_peer_context->flags & RDP_PEER_ACTIVATED) ||
+  if (!is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_ACTIVATED) ||
       is_view_only (session_rdp))
     return TRUE;
 
@@ -1104,7 +1148,8 @@ rdp_input_mouse_event (rdpInput *rdp_input,
       session_rdp->pointer_y = y;
       g_mutex_unlock (&session_rdp->pointer_mutex);
 
-      grd_session_notify_pointer_motion_absolute (session, x, y);
+      grd_rdp_event_queue_add_input_event_pointer_motion_abs (rdp_event_queue,
+                                                              x, y);
     }
 
   button_state = flags & PTR_FLAGS_DOWN ? GRD_BUTTON_STATE_PRESSED
@@ -1118,7 +1163,10 @@ rdp_input_mouse_event (rdpInput *rdp_input,
     button = BTN_MIDDLE;
 
   if (button)
-    grd_session_notify_pointer_button (session, button, button_state);
+    {
+      grd_rdp_event_queue_add_input_event_pointer_button (rdp_event_queue,
+                                                          button, button_state);
+    }
 
   if (!(flags & PTR_FLAGS_WHEEL) && !(flags & PTR_FLAGS_HWHEEL))
     return TRUE;
@@ -1129,13 +1177,15 @@ rdp_input_mouse_event (rdpInput *rdp_input,
 
   if (flags & PTR_FLAGS_WHEEL)
     {
-      grd_session_notify_pointer_axis (session, 0, axis_step * SCROLL_STEP,
-                                       GRD_POINTER_AXIS_FLAGS_SOURCE_WHEEL);
+      grd_rdp_event_queue_add_input_event_pointer_axis (
+        rdp_event_queue, 0, axis_step * SCROLL_STEP,
+        GRD_POINTER_AXIS_FLAGS_SOURCE_WHEEL);
     }
   if (flags & PTR_FLAGS_HWHEEL)
     {
-      grd_session_notify_pointer_axis (session, -axis_step * SCROLL_STEP, 0,
-                                       GRD_POINTER_AXIS_FLAGS_SOURCE_WHEEL);
+      grd_rdp_event_queue_add_input_event_pointer_axis (
+        rdp_event_queue, -axis_step * SCROLL_STEP, 0,
+        GRD_POINTER_AXIS_FLAGS_SOURCE_WHEEL);
     }
 
   return TRUE;
@@ -1149,11 +1199,11 @@ rdp_input_extended_mouse_event (rdpInput *rdp_input,
 {
   RdpPeerContext *rdp_peer_context = (RdpPeerContext *) rdp_input->context;
   GrdSessionRdp *session_rdp = rdp_peer_context->session_rdp;
-  GrdSession *session = GRD_SESSION (session_rdp);
+  GrdRdpEventQueue *rdp_event_queue = session_rdp->rdp_event_queue;
   GrdButtonState button_state;
   int32_t button = 0;
 
-  if (!(rdp_peer_context->flags & RDP_PEER_ACTIVATED) ||
+  if (!is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_ACTIVATED) ||
       is_view_only (session_rdp))
     return TRUE;
 
@@ -1169,7 +1219,10 @@ rdp_input_extended_mouse_event (rdpInput *rdp_input,
     button = BTN_EXTRA;
 
   if (button)
-    grd_session_notify_pointer_button (session, button, button_state);
+    {
+      grd_rdp_event_queue_add_input_event_pointer_button (rdp_event_queue,
+                                                          button, button_state);
+    }
 
   return TRUE;
 }
@@ -1182,7 +1235,7 @@ rdp_input_keyboard_event (rdpInput *rdp_input,
   RdpPeerContext *rdp_peer_context = (RdpPeerContext *) rdp_input->context;
   GrdSessionRdp *session_rdp = rdp_peer_context->session_rdp;
 
-  if (!(rdp_peer_context->flags & RDP_PEER_ACTIVATED) ||
+  if (!is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_ACTIVATED) ||
       is_view_only (session_rdp))
     return TRUE;
 
@@ -1197,13 +1250,13 @@ rdp_input_unicode_keyboard_event (rdpInput *rdp_input,
   RdpPeerContext *rdp_peer_context = (RdpPeerContext *) rdp_input->context;
   GrdSessionRdp *session_rdp = rdp_peer_context->session_rdp;
 #ifdef HAS_RDP_UNICODE_INPUT
-  GrdSession *session = GRD_SESSION (session_rdp);
+  GrdRdpEventQueue *rdp_event_queue = session_rdp->rdp_event_queue;
   uint32_t *code_utf32;
   xkb_keysym_t keysym;
   GrdKeyState key_state;
 #endif
 
-  if (!(rdp_peer_context->flags & RDP_PEER_ACTIVATED) ||
+  if (!is_rdp_peer_flag_set (rdp_peer_context, RDP_PEER_ACTIVATED) ||
       is_view_only (session_rdp))
     return TRUE;
 
@@ -1231,7 +1284,8 @@ rdp_input_unicode_keyboard_event (rdpInput *rdp_input,
         return TRUE;
     }
 
-  grd_session_notify_keyboard_keysym (session, keysym, key_state);
+  grd_rdp_event_queue_add_input_event_keyboard_keysym (rdp_event_queue,
+                                                       keysym, key_state);
 #endif
 
   return TRUE;
@@ -1245,9 +1299,9 @@ rdp_suppress_output (rdpContext         *rdp_context,
   RdpPeerContext *rdp_peer_context = (RdpPeerContext *) rdp_context;
 
   if (allow)
-    rdp_peer_context->flags |= RDP_PEER_OUTPUT_ENABLED;
+    set_rdp_peer_flag (rdp_peer_context, RDP_PEER_OUTPUT_ENABLED);
   else
-    rdp_peer_context->flags &= ~RDP_PEER_OUTPUT_ENABLED;
+    unset_rdp_peer_flag (rdp_peer_context, RDP_PEER_OUTPUT_ENABLED);
 
   return TRUE;
 }
@@ -1310,12 +1364,11 @@ rdp_peer_post_connect (freerdp_peer *peer)
   rdp_settings->PointerCacheSize = MIN (rdp_settings->PointerCacheSize, 100);
 
   grd_session_start (GRD_SESSION (session_rdp));
-  grd_session_rdp_detach_source (session_rdp);
 
   sam_file = g_steal_pointer (&rdp_peer_context->sam_file);
   grd_rdp_sam_maybe_close_and_free_sam_file (sam_file);
 
-  rdp_peer_context->flags |= RDP_PEER_ACTIVATED;
+  set_rdp_peer_flag (rdp_peer_context, RDP_PEER_ACTIVATED);
 
   return TRUE;
 }
@@ -1355,6 +1408,7 @@ rdp_peer_context_new (freerdp_peer   *peer,
 
   rdp_peer_context->vcm = WTSOpenServerA ((LPSTR) peer->context);
 
+  g_mutex_init (&rdp_peer_context->flags_mutex);
   rdp_peer_context->flags = RDP_PEER_OUTPUT_ENABLED;
 
   return TRUE;
@@ -1453,57 +1507,67 @@ init_rdp_session (GrdSessionRdp *session_rdp,
   session_rdp->last_frame = NULL;
   session_rdp->last_pointer = NULL;
   session_rdp->thread_pool = NULL;
+
+  SetEvent (session_rdp->start_event);
 }
 
-static gboolean
-handle_socket_data (GSocket      *socket,
-                    GIOCondition  condition,
-                    gpointer      user_data)
+gpointer
+socket_thread_func (gpointer data)
 {
-  GrdSessionRdp *session_rdp = user_data;
-  freerdp_peer *peer = session_rdp->peer;
-  RdpPeerContext *rdp_peer_context = (RdpPeerContext *) peer->context;
+  GrdSessionRdp *session_rdp = data;
+  freerdp_peer *peer;
+  RdpPeerContext *rdp_peer_context;
+  HANDLE channel_event;
+  HANDLE events[32];
+  uint32_t n_events;
+  uint32_t n_freerdp_handles;
+
+  WaitForSingleObject (session_rdp->start_event, INFINITE);
+
+  peer = session_rdp->peer;
+  rdp_peer_context = (RdpPeerContext *) peer->context;
+  channel_event = WTSVirtualChannelManagerGetEventHandle (rdp_peer_context->vcm);
 
-  if (condition & G_IO_IN)
+  while (TRUE)
     {
-      if (!peer->CheckFileDescriptor (peer) ||
-          !WTSVirtualChannelManagerCheckFileDescriptor (rdp_peer_context->vcm))
+      n_events = 0;
+
+      events[n_events++] = session_rdp->stop_event;
+
+      n_freerdp_handles = peer->GetEventHandles (peer, &events[n_events],
+                                                 32 - n_events);
+      if (!n_freerdp_handles)
         {
-          g_message ("Unable to check (VCM) file descriptor, closing connection");
-          rdp_peer_context->flags &= ~RDP_PEER_ACTIVATED;
+          g_warning ("Failed to get FreeRDP transport event handles");
           handle_client_gone (session_rdp);
-
-          return G_SOURCE_REMOVE;
+          break;
         }
-    }
-  else
-    {
-      g_debug ("Unhandled socket condition %d\n", condition);
-      return G_SOURCE_REMOVE;
-    }
+      n_events += n_freerdp_handles;
 
-  return G_SOURCE_CONTINUE;
-}
+      events[n_events++] = channel_event;
 
-static void
-grd_session_rdp_attach_source (GrdSessionRdp *session_rdp)
-{
-  GSocket *socket;
-
-  socket = g_socket_connection_get_socket (session_rdp->connection);
-  session_rdp->source = g_socket_create_source (socket,
-                                                G_IO_IN | G_IO_PRI,
-                                                NULL);
-  g_source_set_callback (session_rdp->source,
-                         (GSourceFunc) handle_socket_data,
-                         session_rdp, NULL);
-  g_source_attach (session_rdp->source, NULL);
-}
+      WaitForMultipleObjects (n_events, events, FALSE, INFINITE);
 
-static void
-grd_session_rdp_detach_source (GrdSessionRdp *session_rdp)
-{
-  g_clear_pointer (&session_rdp->source, g_source_destroy);
+      if (WaitForSingleObject (session_rdp->stop_event, 0) == WAIT_OBJECT_0)
+        break;
+
+      if (!peer->CheckFileDescriptor (peer))
+        {
+          g_message ("Unable to check file descriptor, closing connection");
+          handle_client_gone (session_rdp);
+          break;
+        }
+
+      if (WaitForSingleObject (channel_event, 0) == WAIT_OBJECT_0 &&
+          !WTSVirtualChannelManagerCheckFileDescriptor (rdp_peer_context->vcm))
+        {
+          g_message ("Unable to check VCM file descriptor, closing connection");
+          handle_client_gone (session_rdp);
+          break;
+        }
+    }
+
+  return NULL;
 }
 
 GrdSessionRdp *
@@ -1538,8 +1602,24 @@ grd_session_rdp_new (GrdRdpServer      *rdp_server,
                               NULL);
 
   session_rdp->connection = g_object_ref (connection);
+  session_rdp->start_event = CreateEvent (NULL, TRUE, FALSE, NULL);
+  session_rdp->stop_event = CreateEvent (NULL, TRUE, FALSE, NULL);
 
-  grd_session_rdp_attach_source (session_rdp);
+  session_rdp->socket_thread = g_thread_new ("RDP socket thread",
+                                             socket_thread_func,
+                                             session_rdp);
+  if (!session_rdp->socket_thread)
+    {
+      g_warning ("Failed to create socket thread");
+
+      g_clear_pointer (&session_rdp->stop_event, CloseHandle);
+      g_clear_pointer (&session_rdp->start_event, CloseHandle);
+      g_object_unref (connection);
+      g_free (password);
+      g_free (username);
+
+      return NULL;
+    }
 
   init_rdp_session (session_rdp, username, password);
 
@@ -1571,10 +1651,11 @@ grd_session_rdp_stop (GrdSession *session)
 
   g_debug ("Stopping RDP session");
 
-  g_clear_object (&session_rdp->pipewire_stream);
+  SetEvent (session_rdp->stop_event);
 
-  grd_session_rdp_detach_source (session_rdp);
+  g_clear_object (&session_rdp->pipewire_stream);
 
+  g_clear_pointer (&session_rdp->socket_thread, g_thread_join);
   g_clear_object (&session_rdp->connection);
 
   if (rdp_peer_context->sam_file)
@@ -1592,12 +1673,16 @@ grd_session_rdp_stop (GrdSession *session)
                                notify_keysym_released,
                                session);
 #endif
+  g_clear_object (&session_rdp->rdp_event_queue);
 
   g_clear_pointer (&session_rdp->last_frame, g_free);
   g_hash_table_foreach_remove (session_rdp->pointer_cache,
                                clear_pointer_bitmap,
                                NULL);
 
+  g_clear_pointer (&session_rdp->stop_event, CloseHandle);
+  g_clear_pointer (&session_rdp->start_event, CloseHandle);
+
   g_clear_handle_id (&session_rdp->close_session_idle_id, g_source_remove);
 }
 
@@ -1643,9 +1728,6 @@ grd_session_rdp_stream_ready (GrdSession *session,
   g_signal_connect (session_rdp->pipewire_stream, "closed",
                     G_CALLBACK (on_pipewire_stream_closed),
                     session_rdp);
-
-  if (!session_rdp->source)
-    grd_session_rdp_attach_source (session_rdp);
 }
 
 static void
@@ -1698,6 +1780,8 @@ grd_session_rdp_init (GrdSessionRdp *session_rdp)
   g_cond_init (&session_rdp->pending_jobs_cond);
   g_mutex_init (&session_rdp->pending_jobs_mutex);
   g_mutex_init (&session_rdp->pointer_mutex);
+
+  session_rdp->rdp_event_queue = grd_rdp_event_queue_new (session_rdp);
 }
 
 static void


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]