[gnome-remote-desktop] rdp/pipewire-stream: Use EGL thread for DMA buffer downloads



commit a60df1c64355e5889dbd639c10b351f9c51e1d1c
Author: Jonas Ådahl <jadahl gmail com>
Date:   Fri Nov 5 16:50:11 2021 +0100

    rdp/pipewire-stream: Use EGL thread for DMA buffer downloads
    
    As with the VNC backend, this ensures that we use the right
    driver-specific method for downloading DMA buffer contents into the
    framebuffer used for RDP.

 src/grd-rdp-pipewire-stream.c | 201 ++++++++++++++++++++++++++++++++++--------
 1 file changed, 163 insertions(+), 38 deletions(-)
---
diff --git a/src/grd-rdp-pipewire-stream.c b/src/grd-rdp-pipewire-stream.c
index bb49e59..d830d9b 100644
--- a/src/grd-rdp-pipewire-stream.c
+++ b/src/grd-rdp-pipewire-stream.c
@@ -28,8 +28,9 @@
 #include <spa/param/video/format-utils.h>
 #include <spa/utils/result.h>
 #include <sys/mman.h>
-#include <sys/syscall.h>
 
+#include "grd-context.h"
+#include "grd-egl-thread.h"
 #include "grd-pipewire-utils.h"
 
 enum
@@ -45,6 +46,7 @@ typedef struct _GrdRdpFrame GrdRdpFrame;
 
 typedef void (* GrdRdpFrameReadyCallback) (GrdRdpPipeWireStream *stream,
                                            GrdRdpFrame          *frame,
+                                           gboolean              success,
                                            gpointer              user_data);
 
 struct _GrdRdpFrame
@@ -62,6 +64,10 @@ struct _GrdRdpFrame
   uint16_t pointer_width;
   uint16_t pointer_height;
   gboolean pointer_is_hidden;
+
+  GrdRdpPipeWireStream *stream;
+  GrdRdpFrameReadyCallback callback;
+  gpointer callback_user_data;
 };
 
 struct _GrdRdpPipeWireStream
@@ -86,6 +92,8 @@ struct _GrdRdpPipeWireStream
   uint32_t src_node_id;
 
   struct spa_video_info_raw spa_format;
+
+  gboolean destroyed;
 };
 
 G_DEFINE_TYPE (GrdRdpPipeWireStream, grd_rdp_pipewire_stream,
@@ -96,17 +104,29 @@ static void grd_rdp_frame_unref (GrdRdpFrame *frame);
 G_DEFINE_AUTOPTR_CLEANUP_FUNC (GrdRdpFrame, grd_rdp_frame_unref)
 
 static GrdRdpFrame *
-grd_rdp_frame_new (void)
+grd_rdp_frame_new (GrdRdpPipeWireStream     *stream,
+                   GrdRdpFrameReadyCallback  callback,
+                   gpointer                  callback_user_data)
 {
   GrdRdpFrame *frame;
 
   frame = g_new0 (GrdRdpFrame, 1);
 
   g_atomic_ref_count_init (&frame->refcount);
+  frame->stream = stream;
+  frame->callback = callback;
+  frame->callback_user_data = callback_user_data;
 
   return frame;
 }
 
+static GrdRdpFrame *
+grd_rdp_frame_ref (GrdRdpFrame *frame)
+{
+  g_atomic_ref_count_inc (&frame->refcount);
+  return frame;
+}
+
 static void
 grd_rdp_frame_unref (GrdRdpFrame *frame)
 {
@@ -276,8 +296,11 @@ on_stream_param_changed (void                 *user_data,
                          const struct spa_pod *format)
 {
   GrdRdpPipeWireStream *stream = GRD_RDP_PIPEWIRE_STREAM (user_data);
+  GrdSession *session = GRD_SESSION (stream->session_rdp);
+  GrdContext *context = grd_session_get_context (session);
   uint8_t params_buffer[1024];
   struct spa_pod_builder pod_builder;
+  enum spa_data_type allowed_buffer_types;
   const struct spa_pod *params[3];
 
   if (!format || id != SPA_PARAM_Format)
@@ -287,10 +310,15 @@ on_stream_param_changed (void                 *user_data,
 
   pod_builder = SPA_POD_BUILDER_INIT (params_buffer, sizeof (params_buffer));
 
+  allowed_buffer_types = 1 << SPA_DATA_MemFd;
+  if (grd_context_get_egl_thread (context))
+    allowed_buffer_types |= 1 << SPA_DATA_DmaBuf;
+
   params[0] = spa_pod_builder_add_object (
     &pod_builder,
     SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers,
     SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int (8, 1, 8),
+    SPA_PARAM_BUFFERS_dataType, SPA_POD_Int (allowed_buffer_types),
     0);
 
   params[1] = spa_pod_builder_add_object (
@@ -333,6 +361,18 @@ copy_frame_data (GrdRdpFrame *frame,
     }
 }
 
+static void
+on_dma_buf_downloaded (gboolean success,
+                       gpointer user_data)
+{
+  GrdRdpFrame *frame = user_data;
+
+  frame->callback (frame->stream,
+                   frame,
+                   success,
+                   frame->callback_user_data);
+}
+
 static void
 process_buffer (GrdRdpPipeWireStream     *stream,
                 struct spa_buffer        *buffer,
@@ -345,9 +385,6 @@ process_buffer (GrdRdpPipeWireStream     *stream,
   int height;
   int src_stride;
   int dst_stride;
-  size_t size;
-  uint8_t *map;
-  void *src_data;
   struct spa_meta_cursor *spa_meta_cursor;
   g_autoptr (GrdRdpFrame) frame = NULL;
 
@@ -359,7 +396,7 @@ process_buffer (GrdRdpPipeWireStream     *stream,
   grd_get_spa_format_details (stream->spa_format.format,
                               &drm_format, &bpp);
 
-  frame = grd_rdp_frame_new ();
+  frame = grd_rdp_frame_new (stream, callback, user_data);
 
   spa_meta_cursor = spa_buffer_find_meta_data (buffer, SPA_META_Cursor,
                                                sizeof *spa_meta_cursor);
@@ -401,75 +438,108 @@ process_buffer (GrdRdpPipeWireStream     *stream,
 
   if (buffer->datas[0].chunk->size == 0)
     {
-      callback (stream, g_steal_pointer (&frame), user_data);
+      callback (stream, g_steal_pointer (&frame), TRUE, user_data);
       return;
     }
   else if (buffer->datas[0].type == SPA_DATA_MemFd)
     {
+      size_t size;
+      uint8_t *map;
+      void *src_data;
+
       size = buffer->datas[0].maxsize + buffer->datas[0].mapoffset;
       map = mmap (NULL, size, PROT_READ, MAP_PRIVATE, buffer->datas[0].fd, 0);
       if (map == MAP_FAILED)
         {
           g_warning ("Failed to mmap buffer: %s", g_strerror (errno));
-          callback (stream, g_steal_pointer (&frame), user_data);
+          callback (stream, g_steal_pointer (&frame), TRUE, user_data);
           return;
         }
       src_data = SPA_MEMBER (map, buffer->datas[0].mapoffset, uint8_t);
       frame->width = width;
       frame->height = height;
+
+      copy_frame_data (frame,
+                       src_data,
+                       width, height,
+                       dst_stride,
+                       src_stride,
+                       bpp);
+
+      munmap (map, size);
+
+      callback (stream, g_steal_pointer (&frame), TRUE, user_data);
     }
   else if (buffer->datas[0].type == SPA_DATA_DmaBuf)
     {
-      int fd;
+      GrdSession *session = GRD_SESSION (stream->session_rdp);
+      GrdContext *context = grd_session_get_context (session);
+      GrdEglThread *egl_thread = grd_context_get_egl_thread (context);
+      int row_width;
+      int *fds;
+      uint32_t *offsets;
+      uint32_t *strides;
+      uint64_t *modifiers;
+      uint32_t n_planes;
+      unsigned int i;
+      uint8_t *dst_data;
 
-      fd = buffer->datas[0].fd;
-      size = buffer->datas[0].maxsize + buffer->datas[0].mapoffset;
+      frame->width = width;
+      frame->height = height;
 
-      map = mmap (NULL, size, PROT_READ, MAP_PRIVATE, fd, 0);
-      if (map == MAP_FAILED)
+      row_width = dst_stride / bpp;
+
+      n_planes = buffer->n_datas;
+      fds = g_alloca (sizeof (int) * n_planes);
+      offsets = g_alloca (sizeof (uint32_t) * n_planes);
+      strides = g_alloca (sizeof (uint32_t) * n_planes);
+      modifiers = g_alloca (sizeof (uint64_t) * n_planes);
+
+      for (i = 0; i < n_planes; i++)
         {
-          g_warning ("Failed to mmap DMA buffer: %s", g_strerror (errno));
-          callback (stream, g_steal_pointer (&frame), user_data);
-          return;
+          fds[i] = buffer->datas[i].fd;
+          offsets[i] = buffer->datas[i].chunk->offset;
+          strides[i] = buffer->datas[i].chunk->stride;
+          modifiers[i] = stream->spa_format.modifier;
         }
-      grd_sync_dma_buf (fd, DMA_BUF_SYNC_START);
-
-      src_data = SPA_MEMBER (map, buffer->datas[0].mapoffset, uint8_t);
-      frame->width = width;
-      frame->height = height;
+      dst_data = g_malloc0 (height * dst_stride);
+
+      frame->data = dst_data;
+      grd_egl_thread_download (egl_thread,
+                               dst_data,
+                               row_width,
+                               drm_format,
+                               width, height,
+                               n_planes,
+                               fds,
+                               strides,
+                               offsets,
+                               modifiers,
+                               on_dma_buf_downloaded,
+                               grd_rdp_frame_ref (g_steal_pointer (&frame)),
+                               (GDestroyNotify) grd_rdp_frame_unref);
     }
   else if (buffer->datas[0].type == SPA_DATA_MemPtr)
     {
-      size = buffer->datas[0].maxsize + buffer->datas[0].mapoffset;
-      map = NULL;
+      void *src_data;
+
       src_data = buffer->datas[0].data;
       frame->width = width;
       frame->height = height;
-    }
-  else
-    {
-      callback (stream, g_steal_pointer (&frame), user_data);
-      return;
-    }
 
-  if (src_data)
-    {
       copy_frame_data (frame,
                        src_data,
                        width, height,
                        dst_stride,
                        src_stride,
                        bpp);
-    }
 
-  if (map)
+      callback (stream, g_steal_pointer (&frame), TRUE, user_data);
+    }
+  else
     {
-      if (buffer->datas[0].type == SPA_DATA_DmaBuf)
-        grd_sync_dma_buf (buffer->datas[0].fd, DMA_BUF_SYNC_END);
-      munmap (map, size);
+      callback (stream, g_steal_pointer (&frame), TRUE, user_data);
     }
-
-  callback (stream, g_steal_pointer (&frame), user_data);
 }
 
 static void
@@ -499,12 +569,17 @@ take_pointer_data_from (GrdRdpFrame *src_frame,
 static void
 on_frame_ready (GrdRdpPipeWireStream *stream,
                 GrdRdpFrame          *frame,
+                gboolean              success,
                 gpointer              user_data)
 {
   struct pw_buffer *buffer = user_data;
   GrdRdpFrame *pending_frame;
 
   g_assert (frame);
+
+  if (!success)
+    goto out;
+
   g_mutex_lock (&stream->frame_mutex);
   pending_frame = g_steal_pointer (&stream->pending_frame);
   if (pending_frame)
@@ -519,6 +594,7 @@ on_frame_ready (GrdRdpPipeWireStream *stream,
   stream->pending_frame = frame;
   g_mutex_unlock (&stream->frame_mutex);
 
+out:
   pw_stream_queue_buffer (stream->pipewire_stream, buffer);
 
   g_source_set_ready_time (stream->render_source, 0);
@@ -531,6 +607,9 @@ on_stream_process (void *user_data)
   struct pw_buffer *next_buffer;
   struct pw_buffer *buffer = NULL;
 
+  if (stream->destroyed)
+    return;
+
   next_buffer = pw_stream_dequeue_buffer (stream->pipewire_stream);
   while (next_buffer)
     {
@@ -691,10 +770,56 @@ grd_rdp_pipewire_stream_new (GrdSessionRdp  *session_rdp,
   return g_steal_pointer (&stream);
 }
 
+typedef struct
+{
+  GMutex mutex;
+  GCond cond;
+  gboolean done;
+} SyncData;
+
+static void
+on_sync_done (gboolean success,
+              gpointer user_data)
+{
+  SyncData *sync_data = user_data;
+
+  g_mutex_lock (&sync_data->mutex);
+  sync_data->done = TRUE;
+  g_cond_signal (&sync_data->cond);
+  g_mutex_unlock (&sync_data->mutex);
+}
+
 static void
 grd_rdp_pipewire_stream_finalize (GObject *object)
 {
   GrdRdpPipeWireStream *stream = GRD_RDP_PIPEWIRE_STREAM (object);
+  GrdSession *session = GRD_SESSION (stream->session_rdp);
+  GrdContext *context = grd_session_get_context (session);
+  GrdEglThread *egl_thread;
+
+  stream->destroyed = TRUE;
+
+  if (stream->pipewire_stream)
+    pw_stream_flush (stream->pipewire_stream, false);
+
+  egl_thread = grd_context_get_egl_thread (context);
+  if (egl_thread && stream->pipewire_stream)
+    {
+      SyncData sync_data = {};
+
+      g_mutex_init (&sync_data.mutex);
+      g_cond_init (&sync_data.cond);
+
+      grd_egl_thread_sync (egl_thread, on_sync_done, &sync_data, NULL);
+
+      g_mutex_lock (&sync_data.mutex);
+      while (!sync_data.done)
+        g_cond_wait (&sync_data.cond, &sync_data.mutex);
+      g_mutex_unlock (&sync_data.mutex);
+
+      g_cond_clear (&sync_data.cond);
+      g_mutex_clear (&sync_data.mutex);
+    }
 
   /*
    * We can't clear stream->pipewire_stream before destroying it, as the data


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]