[gnome-remote-desktop] rdp/pipewire-stream: Make buffer processing a bit more async



commit b45ee6953ec01ea56e50fdfc21c1b262191e7391
Author: Jonas Ådahl <jadahl gmail com>
Date:   Fri Nov 5 16:12:52 2021 +0100

    rdp/pipewire-stream: Make buffer processing a bit more async
    
    Get the frame via a callback instead of as a return value. Will simplify
    future changes when the result may be more async.

 src/grd-rdp-pipewire-stream.c | 78 ++++++++++++++++++++++++++-----------------
 1 file changed, 48 insertions(+), 30 deletions(-)
---
diff --git a/src/grd-rdp-pipewire-stream.c b/src/grd-rdp-pipewire-stream.c
index afb1051..705e280 100644
--- a/src/grd-rdp-pipewire-stream.c
+++ b/src/grd-rdp-pipewire-stream.c
@@ -41,7 +41,13 @@ enum
 
 static guint signals[N_SIGNALS];
 
-typedef struct _GrdRdpFrame
+typedef struct _GrdRdpFrame GrdRdpFrame;
+
+typedef void (* GrdRdpFrameReadyCallback) (GrdRdpPipeWireStream *stream,
+                                           GrdRdpFrame          *frame,
+                                           gpointer              user_data);
+
+struct _GrdRdpFrame
 {
   gatomicrefcount refcount;
 
@@ -56,7 +62,7 @@ typedef struct _GrdRdpFrame
   uint16_t pointer_width;
   uint16_t pointer_height;
   gboolean pointer_is_hidden;
-} GrdRdpFrame;
+};
 
 struct _GrdRdpPipeWireStream
 {
@@ -307,9 +313,11 @@ on_stream_param_changed (void                 *user_data,
                            params, G_N_ELEMENTS (params));
 }
 
-static GrdRdpFrame *
-process_buffer (GrdRdpPipeWireStream *stream,
-                struct spa_buffer    *buffer)
+static void
+process_buffer (GrdRdpPipeWireStream     *stream,
+                struct spa_buffer        *buffer,
+                GrdRdpFrameReadyCallback  callback,
+                gpointer                  user_data)
 {
   size_t size;
   uint8_t *map;
@@ -321,8 +329,8 @@ process_buffer (GrdRdpPipeWireStream *stream,
 
   if (buffer->datas[0].chunk->size == 0)
     {
-      map = NULL;
-      src_data = NULL;
+      callback (stream, g_steal_pointer (&frame), user_data);
+      return;
     }
   else if (buffer->datas[0].type == SPA_DATA_MemFd)
     {
@@ -331,7 +339,8 @@ process_buffer (GrdRdpPipeWireStream *stream,
       if (map == MAP_FAILED)
         {
           g_warning ("Failed to mmap buffer: %s", g_strerror (errno));
-          return NULL;
+          callback (stream, g_steal_pointer (&frame), user_data);
+          return;
         }
       src_data = SPA_MEMBER (map, buffer->datas[0].mapoffset, uint8_t);
     }
@@ -346,7 +355,8 @@ process_buffer (GrdRdpPipeWireStream *stream,
       if (map == MAP_FAILED)
         {
           g_warning ("Failed to mmap DMA buffer: %s", g_strerror (errno));
-          return NULL;
+          callback (stream, g_steal_pointer (&frame), user_data);
+          return;
         }
       grd_sync_dma_buf (fd, DMA_BUF_SYNC_START);
 
@@ -360,7 +370,8 @@ process_buffer (GrdRdpPipeWireStream *stream,
     }
   else
     {
-      return NULL;
+      callback (stream, g_steal_pointer (&frame), user_data);
+      return;
     }
 
   if (src_data)
@@ -433,7 +444,7 @@ process_buffer (GrdRdpPipeWireStream *stream,
         }
     }
 
-  return g_steal_pointer (&frame);
+  callback (stream, g_steal_pointer (&frame), user_data);
 }
 
 static void
@@ -461,28 +472,13 @@ take_pointer_data_from (GrdRdpFrame *src_frame,
 }
 
 static void
-on_stream_process (void *user_data)
+on_frame_ready (GrdRdpPipeWireStream *stream,
+                GrdRdpFrame          *frame,
+                gpointer              user_data)
 {
-  GrdRdpPipeWireStream *stream = GRD_RDP_PIPEWIRE_STREAM (user_data);
-  struct pw_buffer *next_buffer;
-  struct pw_buffer *buffer = NULL;
-  GrdRdpFrame *frame;
+  struct pw_buffer *buffer = user_data;
   GrdRdpFrame *pending_frame;
 
-  next_buffer = pw_stream_dequeue_buffer (stream->pipewire_stream);
-  while (next_buffer)
-    {
-      buffer = next_buffer;
-      next_buffer = pw_stream_dequeue_buffer (stream->pipewire_stream);
-
-      if (next_buffer)
-        pw_stream_queue_buffer (stream->pipewire_stream, buffer);
-    }
-  if (!buffer)
-    return;
-
-  frame = process_buffer (stream, buffer->buffer);
-
   g_assert (frame);
   g_mutex_lock (&stream->frame_mutex);
   pending_frame = g_steal_pointer (&stream->pending_frame);
@@ -503,6 +499,28 @@ on_stream_process (void *user_data)
   g_source_set_ready_time (stream->render_source, 0);
 }
 
+static void
+on_stream_process (void *user_data)
+{
+  GrdRdpPipeWireStream *stream = GRD_RDP_PIPEWIRE_STREAM (user_data);
+  struct pw_buffer *next_buffer;
+  struct pw_buffer *buffer = NULL;
+
+  next_buffer = pw_stream_dequeue_buffer (stream->pipewire_stream);
+  while (next_buffer)
+    {
+      buffer = next_buffer;
+      next_buffer = pw_stream_dequeue_buffer (stream->pipewire_stream);
+
+      if (next_buffer)
+        pw_stream_queue_buffer (stream->pipewire_stream, buffer);
+    }
+  if (!buffer)
+    return;
+
+  process_buffer (stream, buffer->buffer, on_frame_ready, buffer);
+}
+
 static const struct pw_stream_events stream_events = {
   PW_VERSION_STREAM_EVENTS,
   .state_changed = on_stream_state_changed,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]