[gnome-remote-desktop] vnc/pipewire-stream: Use EGL thread to download DMA buffers



commit 0c8709af1b4a34a051627bca41b1d2a5b9622250
Author: Jonas Ådahl <jadahl gmail com>
Date:   Fri Nov 5 15:54:02 2021 +0100

    vnc/pipewire-stream: Use EGL thread to download DMA buffers
    
    This ensures we use the driver specific method for downloading DMA
    buffers. This also allows us to explicitly ask for DMA buffers when
    negotiating PipeWire stream parameters.

 src/grd-vnc-pipewire-stream.c | 364 +++++++++++++++++++++++++++++++-----------
 src/meson.build               |   1 +
 2 files changed, 271 insertions(+), 94 deletions(-)
---
diff --git a/src/grd-vnc-pipewire-stream.c b/src/grd-vnc-pipewire-stream.c
index 8186170..98d5d0b 100644
--- a/src/grd-vnc-pipewire-stream.c
+++ b/src/grd-vnc-pipewire-stream.c
@@ -22,15 +22,16 @@
 
 #include "grd-vnc-pipewire-stream.h"
 
-#include <linux/dma-buf.h>
+#include <drm_fourcc.h>
 #include <pipewire/pipewire.h>
 #include <spa/param/props.h>
 #include <spa/param/format-utils.h>
 #include <spa/param/video/format-utils.h>
 #include <spa/utils/result.h>
 #include <sys/mman.h>
-#include <sys/syscall.h>
 
+#include "grd-context.h"
+#include "grd-egl-thread.h"
 #include "grd-pipewire-utils.h"
 #include "grd-vnc-cursor.h"
 
@@ -43,7 +44,14 @@ enum
 
 static guint signals[N_SIGNALS];
 
-typedef struct _GrdVncFrame
+typedef struct _GrdVncFrame GrdVncFrame;
+
+typedef void (* GrdVncFrameReadyCallback) (GrdVncPipeWireStream *stream,
+                                           GrdVncFrame          *frame,
+                                           gboolean              success,
+                                           gpointer              user_data);
+
+struct _GrdVncFrame
 {
   gatomicrefcount refcount;
 
@@ -52,11 +60,11 @@ typedef struct _GrdVncFrame
   gboolean cursor_moved;
   int cursor_x;
   int cursor_y;
-} GrdVncFrame;
 
-typedef void (* GrdVncFrameReadyCallback) (GrdVncPipeWireStream *stream,
-                                           GrdVncFrame          *frame,
-                                           gpointer              user_data);
+  GrdVncPipeWireStream *stream;
+  GrdVncFrameReadyCallback callback;
+  gpointer callback_user_data;
+};
 
 struct _GrdVncPipeWireStream
 {
@@ -80,6 +88,8 @@ struct _GrdVncPipeWireStream
   uint32_t src_node_id;
 
   struct spa_video_info_raw spa_format;
+
+  gboolean destroyed;
 };
 
 G_DEFINE_TYPE (GrdVncPipeWireStream, grd_vnc_pipewire_stream,
@@ -183,10 +193,13 @@ on_stream_param_changed (void                 *user_data,
                          const struct spa_pod *format)
 {
   GrdVncPipeWireStream *stream = GRD_VNC_PIPEWIRE_STREAM (user_data);
+  GrdSession *session = GRD_SESSION (stream->session);
+  GrdContext *context = grd_session_get_context (session);
   uint8_t params_buffer[1024];
   struct spa_pod_builder pod_builder;
   int width;
   int height;
+  enum spa_data_type allowed_buffer_types;
   const struct spa_pod *params[3];
 
   if (!format || id != SPA_PARAM_Format)
@@ -201,10 +214,15 @@ on_stream_param_changed (void                 *user_data,
 
   grd_session_vnc_queue_resize_framebuffer (stream->session, width, height);
 
+  allowed_buffer_types = 1 << SPA_DATA_MemFd;
+  if (grd_context_get_egl_thread (context))
+    allowed_buffer_types |= 1 << SPA_DATA_DmaBuf;
+
   params[0] = spa_pod_builder_add_object (
     &pod_builder,
     SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers,
     SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int (8, 1, 8),
+    SPA_PARAM_BUFFERS_dataType, SPA_POD_Int (allowed_buffer_types),
     0);
 
   params[1] = spa_pod_builder_add_object (
@@ -228,17 +246,29 @@ on_stream_param_changed (void                 *user_data,
 }
 
 static GrdVncFrame *
-grd_vnc_frame_new (void)
+grd_vnc_frame_new (GrdVncPipeWireStream     *stream,
+                   GrdVncFrameReadyCallback  callback,
+                   gpointer                  callback_user_data)
 {
   GrdVncFrame *frame;
 
   frame = g_new0 (GrdVncFrame, 1);
 
   g_atomic_ref_count_init (&frame->refcount);
+  frame->stream = stream;
+  frame->callback = callback;
+  frame->callback_user_data = callback_user_data;
 
   return frame;
 }
 
+static GrdVncFrame *
+grd_vnc_frame_ref (GrdVncFrame *frame)
+{
+  g_atomic_ref_count_inc (&frame->refcount);
+  return frame;
+}
+
 static void
 grd_vnc_frame_unref (GrdVncFrame *frame)
 {
@@ -298,95 +328,91 @@ do_render (gpointer user_data)
 }
 
 static void
-process_buffer (GrdVncPipeWireStream     *stream,
-                struct spa_buffer        *buffer,
-                GrdVncFrameReadyCallback  callback,
-                gpointer                  user_data)
+copy_frame_data (GrdVncFrame *frame,
+                 uint8_t     *src_data,
+                 int          width,
+                 int          height,
+                 int          dst_stride,
+                 int          src_stride,
+                 int          bpp)
 {
-  size_t size;
-  uint8_t *map;
-  void *src_data;
-  struct spa_meta_cursor *spa_meta_cursor;
-  g_autoptr (GrdVncFrame) frame = NULL;
+  int y;
 
-  frame = grd_vnc_frame_new ();
-
-  if (buffer->datas[0].chunk->size == 0)
+  frame->data = g_malloc (height * dst_stride);
+  for (y = 0; y < height; y++)
     {
-      map = NULL;
-      src_data = NULL;
+      memcpy (((uint8_t *) frame->data) + y * dst_stride,
+              ((uint8_t *) src_data) + y * src_stride,
+              width * bpp);
     }
-  else if (buffer->datas[0].type == SPA_DATA_MemFd)
+}
+
+static struct
+{
+  enum spa_video_format spa_format;
+  uint32_t drm_format;
+  int bpp;
+} format_table[] = {
+      { SPA_VIDEO_FORMAT_ARGB, DRM_FORMAT_BGRA8888, 4 },
+      { SPA_VIDEO_FORMAT_BGRA, DRM_FORMAT_ARGB8888, 4 },
+      { SPA_VIDEO_FORMAT_xRGB, DRM_FORMAT_BGRX8888, 4 },
+      { SPA_VIDEO_FORMAT_BGRx, DRM_FORMAT_XRGB8888, 4 },
+};
+
+static void
+get_spa_format_details (enum spa_video_format  spa_format,
+                        uint32_t              *drm_format,
+                        int                   *bpp)
+{
+  int i;
+
+  for (i = 0; i < G_N_ELEMENTS (format_table); i++)
     {
-      size = buffer->datas[0].maxsize + buffer->datas[0].mapoffset;
-      map = mmap (NULL, size, PROT_READ, MAP_PRIVATE, buffer->datas[0].fd, 0);
-      if (map == MAP_FAILED)
+      if (format_table[i].spa_format == spa_format)
         {
-          g_warning ("Failed to mmap buffer: %s", g_strerror (errno));
-          callback (stream, NULL, user_data);
+          *drm_format = format_table[i].drm_format;
+          *bpp = format_table[i].bpp;
           return;
         }
-      src_data = SPA_MEMBER (map, buffer->datas[0].mapoffset, uint8_t);
     }
-  else if (buffer->datas[0].type == SPA_DATA_DmaBuf)
-    {
-      int fd;
-
-      fd = buffer->datas[0].fd;
-      size = buffer->datas[0].maxsize + buffer->datas[0].mapoffset;
 
-      map = mmap (NULL, size, PROT_READ, MAP_PRIVATE, fd, 0);
-      if (map == MAP_FAILED)
-        {
-          g_warning ("Failed to mmap DMA buffer: %s", g_strerror (errno));
-          callback (stream, NULL, user_data);
-          return;
-        }
-      grd_sync_dma_buf (fd, DMA_BUF_SYNC_START);
+  g_assert_not_reached ();
+}
 
-      src_data = SPA_MEMBER (map, buffer->datas[0].mapoffset, uint8_t);
-    }
-  else if (buffer->datas[0].type == SPA_DATA_MemPtr)
-    {
-      size = buffer->datas[0].maxsize + buffer->datas[0].mapoffset;
-      map = NULL;
-      src_data = buffer->datas[0].data;
-    }
-  else
-    {
-      callback (stream, NULL, user_data);
-      return;
-    }
+static void
+on_dma_buf_downloaded (gboolean success,
+                       gpointer user_data)
+{
+  GrdVncFrame *frame = user_data;
 
-  if (src_data)
-    {
-      int src_stride;
-      int dst_stride;
-      int width;
-      int height;
-      int y;
+  frame->callback (frame->stream,
+                   frame,
+                   success,
+                   frame->callback_user_data);
+}
 
-      height = stream->spa_format.size.height;
-      width = stream->spa_format.size.width;
-      src_stride = buffer->datas[0].chunk->stride;
-      dst_stride = grd_session_vnc_get_stride_for_width (stream->session,
-                                                         width);
+static void
+process_buffer (GrdVncPipeWireStream     *stream,
+                struct spa_buffer        *buffer,
+                GrdVncFrameReadyCallback  callback,
+                gpointer                  user_data)
+{
+  int dst_stride;
+  uint32_t drm_format;
+  int bpp;
+  int width;
+  int height;
+  struct spa_meta_cursor *spa_meta_cursor;
+  g_autoptr (GrdVncFrame) frame = NULL;
 
-      frame->data = g_malloc (height * dst_stride);
-      for (y = 0; y < height; y++)
-        {
-          memcpy (((uint8_t *) frame->data) + y * dst_stride,
-                  ((uint8_t *) src_data) + y * src_stride,
-                  width * 4);
-        }
-    }
+  height = stream->spa_format.size.height;
+  width = stream->spa_format.size.width;
+  dst_stride = grd_session_vnc_get_stride_for_width (stream->session,
+                                                     width);
+  get_spa_format_details (stream->spa_format.format,
+                          &drm_format, &bpp);
 
-  if (map)
-    {
-      if (buffer->datas[0].type == SPA_DATA_DmaBuf)
-        grd_sync_dma_buf (buffer->datas[0].fd, DMA_BUF_SYNC_END);
-      munmap (map, size);
-    }
+  frame = grd_vnc_frame_new (stream, callback, user_data);
 
   spa_meta_cursor = spa_buffer_find_meta_data (buffer, SPA_META_Cursor,
                                                sizeof *spa_meta_cursor);
@@ -436,7 +462,105 @@ process_buffer (GrdVncPipeWireStream     *stream,
       frame->cursor_y = spa_meta_cursor->position.y;
     }
 
-  callback (stream, g_steal_pointer (&frame), user_data);
+  if (buffer->datas[0].chunk->size == 0)
+    {
+      callback (stream, g_steal_pointer (&frame), TRUE, user_data);
+    }
+  else if (buffer->datas[0].type == SPA_DATA_MemFd)
+    {
+      size_t size;
+      uint8_t *map;
+      int src_stride;
+      uint8_t *src_data;
+
+      size = buffer->datas[0].maxsize + buffer->datas[0].mapoffset;
+      map = mmap (NULL, size, PROT_READ, MAP_PRIVATE, buffer->datas[0].fd, 0);
+      if (map == MAP_FAILED)
+        {
+          g_warning ("Failed to mmap buffer: %s", g_strerror (errno));
+          callback (stream, NULL, FALSE, user_data);
+          return;
+        }
+
+      frame->data = g_malloc0 (height * dst_stride);
+      src_data = SPA_MEMBER (map, buffer->datas[0].mapoffset, uint8_t);
+      src_stride = buffer->datas[0].chunk->stride;
+      copy_frame_data (frame, src_data,
+                       width, height,
+                       dst_stride, src_stride,
+                       bpp);
+
+      munmap (map, size);
+
+      callback (stream, g_steal_pointer (&frame), TRUE, user_data);
+    }
+  else if (buffer->datas[0].type == SPA_DATA_DmaBuf)
+    {
+      GrdSession *session = GRD_SESSION (stream->session);
+      GrdContext *context = grd_session_get_context (session);
+      GrdEglThread *egl_thread = grd_context_get_egl_thread (context);
+      int row_width;
+      int *fds;
+      uint32_t *offsets;
+      uint32_t *strides;
+      uint64_t *modifiers;
+      uint32_t n_planes;
+      unsigned int i;
+      uint8_t *dst_data;
+
+      row_width = dst_stride / bpp;
+
+      n_planes = buffer->n_datas;
+      fds = g_alloca (sizeof (int) * n_planes);
+      offsets = g_alloca (sizeof (uint32_t) * n_planes);
+      strides = g_alloca (sizeof (uint32_t) * n_planes);
+      modifiers = g_alloca (sizeof (uint64_t) * n_planes);
+
+      for (i = 0; i < n_planes; i++)
+        {
+          fds[i] = buffer->datas[i].fd;
+          offsets[i] = buffer->datas[i].chunk->offset;
+          strides[i] = buffer->datas[i].chunk->stride;
+          modifiers[i] = stream->spa_format.modifier;
+        }
+      dst_data = g_malloc0 (height * dst_stride);
+
+      frame->data = dst_data;
+      grd_egl_thread_download (egl_thread,
+                               dst_data,
+                               row_width,
+                               drm_format,
+                               width, height,
+                               n_planes,
+                               fds,
+                               strides,
+                               offsets,
+                               modifiers,
+                               on_dma_buf_downloaded,
+                               grd_vnc_frame_ref (g_steal_pointer (&frame)),
+                               (GDestroyNotify) grd_vnc_frame_unref);
+    }
+  else if (buffer->datas[0].type == SPA_DATA_MemPtr)
+    {
+      uint8_t *src_data;
+      int src_stride;
+
+      src_data = buffer->datas[0].data;
+
+      frame->data = g_malloc0 (height * dst_stride);
+
+      src_stride = buffer->datas[0].chunk->stride;
+      copy_frame_data (frame, src_data,
+                       width, height,
+                       dst_stride, src_stride,
+                       bpp);
+
+      callback (stream, g_steal_pointer (&frame), TRUE, user_data);
+    }
+  else
+    {
+      callback (stream, NULL, FALSE, user_data);
+    }
 }
 
 static gboolean
@@ -457,6 +581,7 @@ static GSourceFuncs pending_frame_source_funcs =
 static void
 on_frame_ready (GrdVncPipeWireStream *stream,
                 GrdVncFrame          *frame,
+                gboolean              success,
                 gpointer              user_data)
 {
   GrdVncFrame *pending_frame;
@@ -464,6 +589,9 @@ on_frame_ready (GrdVncPipeWireStream *stream,
 
   g_assert (frame);
 
+  if (!success)
+    goto out;
+
   g_mutex_lock (&stream->frame_mutex);
 
   pending_frame = g_steal_pointer (&stream->pending_frame);
@@ -482,23 +610,15 @@ on_frame_ready (GrdVncPipeWireStream *stream,
 
       grd_vnc_frame_unref (pending_frame);
     }
-  if (!stream->pending_frame_source)
-    {
-      GSource *source;
-
-      source = g_source_new (&pending_frame_source_funcs, sizeof (GSource));
-      stream->pending_frame_source = source;
-      g_source_set_callback (source, do_render, stream, NULL);
-      g_source_attach (source, NULL);
-      g_source_unref (source);
-    }
-  g_source_set_ready_time (stream->pending_frame_source, 0);
 
   stream->pending_frame = frame;
 
   g_mutex_unlock (&stream->frame_mutex);
 
+out:
   pw_stream_queue_buffer (stream->pipewire_stream, buffer);
+
+  g_source_set_ready_time (stream->pending_frame_source, 0);
 }
 
 static void
@@ -508,6 +628,9 @@ on_stream_process (void *user_data)
   struct pw_buffer *next_buffer;
   struct pw_buffer *buffer = NULL;
 
+  if (stream->destroyed)
+    return;
+
   next_buffer = pw_stream_dequeue_buffer (stream->pipewire_stream);
   while (next_buffer)
     {
@@ -619,6 +742,7 @@ grd_vnc_pipewire_stream_new (GrdSessionVnc  *session_vnc,
 {
   g_autoptr (GrdVncPipeWireStream) stream = NULL;
   GrdPipeWireSource *pipewire_source;
+  GSource *source;
 
   grd_maybe_initialize_pipewire ();
 
@@ -652,6 +776,12 @@ grd_vnc_pipewire_stream_new (GrdSessionVnc  *session_vnc,
       return NULL;
     }
 
+  source = g_source_new (&pending_frame_source_funcs, sizeof (GSource));
+  stream->pending_frame_source = source;
+  g_source_set_callback (source, do_render, stream, NULL);
+  g_source_attach (source, NULL);
+  g_source_unref (source);
+
   pw_core_add_listener (stream->pipewire_core,
                         &stream->pipewire_core_listener,
                         &core_events,
@@ -663,10 +793,56 @@ grd_vnc_pipewire_stream_new (GrdSessionVnc  *session_vnc,
   return g_steal_pointer (&stream);
 }
 
+typedef struct
+{
+  GMutex mutex;
+  GCond cond;
+  gboolean done;
+} SyncData;
+
+static void
+on_sync_done (gboolean success,
+              gpointer user_data)
+{
+  SyncData *sync_data = user_data;
+
+  g_mutex_lock (&sync_data->mutex);
+  sync_data->done = TRUE;
+  g_cond_signal (&sync_data->cond);
+  g_mutex_unlock (&sync_data->mutex);
+}
+
 static void
 grd_vnc_pipewire_stream_finalize (GObject *object)
 {
   GrdVncPipeWireStream *stream = GRD_VNC_PIPEWIRE_STREAM (object);
+  GrdSession *session = GRD_SESSION (stream->session);
+  GrdContext *context = grd_session_get_context (session);
+  GrdEglThread *egl_thread;
+
+  stream->destroyed = TRUE;
+
+  if (stream->pipewire_stream)
+    pw_stream_flush (stream->pipewire_stream, false);
+
+  egl_thread = grd_context_get_egl_thread (context);
+  if (egl_thread)
+    {
+      SyncData sync_data = {};
+
+      g_mutex_init (&sync_data.mutex);
+      g_cond_init (&sync_data.cond);
+
+      grd_egl_thread_sync (egl_thread, on_sync_done, &sync_data, NULL);
+
+      g_mutex_lock (&sync_data.mutex);
+      while (!sync_data.done)
+        g_cond_wait (&sync_data.cond, &sync_data.mutex);
+      g_mutex_unlock (&sync_data.mutex);
+
+      g_cond_clear (&sync_data.cond);
+      g_mutex_clear (&sync_data.mutex);
+    }
 
   /*
    * We can't clear stream->pipewire_stream before destroying it, as the data
diff --git a/src/meson.build b/src/meson.build
index a65a62b..fb98d22 100644
--- a/src/meson.build
+++ b/src/meson.build
@@ -9,6 +9,7 @@ deps = [
   libsecret_dep,
   libnotify_dep,
   epoxy_dep,
+  drm_dep,
 ]
 
 daemon_sources = files([


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]