[gnome-shell] shell-recorder-src: rework queue handling



commit 2015fc97dc3938bceb90feace193c012f9bb2e9d
Author: Wim Taymans <wtaymans redhat com>
Date:   Tue Jun 16 09:03:30 2015 +0200

    shell-recorder-src: rework queue handling
    
    Use our own locking and queue instead of async_queue.
    Implement unlock and unlock_stop to make the create function return
    FLUSHING. This is important to be able to pause the pipeline after some
    error occured in the pipeline.
    Implement start/stop to clear the queue and its state.

 src/shell-recorder-src.c |  139 ++++++++++++++++++++++++++++++++++++----------
 1 files changed, 110 insertions(+), 29 deletions(-)
---
diff --git a/src/shell-recorder-src.c b/src/shell-recorder-src.c
index b2fb41b..58b836c 100644
--- a/src/shell-recorder-src.c
+++ b/src/shell-recorder-src.c
@@ -11,12 +11,15 @@ struct _ShellRecorderSrc
 {
   GstPushSrc parent;
 
-  GMutex mutex_data;
-  GMutex *mutex;
+  GMutex mutex;
 
   GstCaps *caps;
-  GAsyncQueue *queue;
-  gboolean closed;
+  GMutex queue_lock;
+  GCond queue_cond;
+  GQueue *queue;
+
+  gboolean eos;
+  gboolean flushing;
   guint memory_used;
   guint memory_used_update_idle;
 };
@@ -32,9 +35,6 @@ enum {
   PROP_MEMORY_USED
 };
 
-/* Special marker value once the source is closed */
-#define RECORDER_QUEUE_END ((GstBuffer *)1)
-
 #define shell_recorder_src_parent_class parent_class
 G_DEFINE_TYPE(ShellRecorderSrc, shell_recorder_src, GST_TYPE_PUSH_SRC);
 
@@ -44,9 +44,10 @@ shell_recorder_src_init (ShellRecorderSrc      *src)
   gst_base_src_set_format (GST_BASE_SRC (src), GST_FORMAT_TIME);
   gst_base_src_set_live (GST_BASE_SRC (src), TRUE);
 
-  src->queue = g_async_queue_new ();
-  src->mutex = &src->mutex_data;
-  g_mutex_init (src->mutex);
+  src->queue = g_queue_new ();
+  g_mutex_init (&src->mutex);
+  g_mutex_init (&src->queue_lock);
+  g_cond_init (&src->queue_cond);
 }
 
 static gboolean
@@ -54,9 +55,9 @@ shell_recorder_src_memory_used_update_idle (gpointer data)
 {
   ShellRecorderSrc *src = data;
 
-  g_mutex_lock (src->mutex);
+  g_mutex_lock (&src->mutex);
   src->memory_used_update_idle = 0;
-  g_mutex_unlock (src->mutex);
+  g_mutex_unlock (&src->mutex);
 
   g_object_notify (G_OBJECT (src), "memory-used");
 
@@ -70,14 +71,14 @@ static void
 shell_recorder_src_update_memory_used (ShellRecorderSrc *src,
                                       int               delta)
 {
-  g_mutex_lock (src->mutex);
+  g_mutex_lock (&src->mutex);
   src->memory_used += delta;
   if (src->memory_used_update_idle == 0)
     {
       src->memory_used_update_idle = g_idle_add (shell_recorder_src_memory_used_update_idle, src);
       g_source_set_name_by_id (src->memory_used_update_idle, "[gnome-shell] 
shell_recorder_src_memory_used_update_idle");
     }
-  g_mutex_unlock (src->mutex);
+  g_mutex_unlock (&src->mutex);
 }
 
 /* _negotiate() is called when we have to decide on a format. We
@@ -94,6 +95,62 @@ shell_recorder_src_negotiate (GstBaseSrc * base_src)
 }
 
 static gboolean
+shell_recorder_src_unlock (GstBaseSrc * base_src)
+{
+  ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src);
+
+  g_mutex_lock (&src->queue_lock);
+  src->flushing = TRUE;
+  g_cond_signal (&src->queue_cond);
+  g_mutex_unlock (&src->queue_lock);
+
+  return TRUE;
+}
+
+static gboolean
+shell_recorder_src_unlock_stop (GstBaseSrc * base_src)
+{
+  ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src);
+
+  g_mutex_lock (&src->queue_lock);
+  src->flushing = FALSE;
+  g_cond_signal (&src->queue_cond);
+  g_mutex_unlock (&src->queue_lock);
+
+  return TRUE;
+}
+
+static gboolean
+shell_recorder_src_start (GstBaseSrc * base_src)
+{
+  ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src);
+
+  g_mutex_lock (&src->queue_lock);
+  src->flushing = FALSE;
+  src->eos = FALSE;
+  g_cond_signal (&src->queue_cond);
+  g_mutex_unlock (&src->queue_lock);
+
+  return TRUE;
+}
+
+static gboolean
+shell_recorder_src_stop (GstBaseSrc * base_src)
+{
+  ShellRecorderSrc *src = SHELL_RECORDER_SRC (base_src);
+
+  g_mutex_lock (&src->queue_lock);
+  src->flushing = TRUE;
+  src->eos = FALSE;
+  g_queue_foreach (src->queue, (GFunc) gst_buffer_unref, NULL);
+  g_queue_clear (src->queue);
+  g_cond_signal (&src->queue_cond);
+  g_mutex_unlock (&src->queue_lock);
+
+  return TRUE;
+}
+
+static gboolean
 shell_recorder_src_send_event (GstElement * element, GstEvent * event)
 {
   ShellRecorderSrc *src = SHELL_RECORDER_SRC (element);
@@ -123,17 +180,29 @@ shell_recorder_src_create (GstPushSrc  *push_src,
   ShellRecorderSrc *src = SHELL_RECORDER_SRC (push_src);
   GstBuffer *buffer;
 
-  if (src->closed)
-    return GST_FLOW_EOS;
+  g_mutex_lock (&src->queue_lock);
+  while (TRUE) {
+    /* int the flushing state we just return FLUSHING */
+    if (src->flushing) {
+      g_mutex_unlock (&src->queue_lock);
+      return GST_FLOW_FLUSHING;
+    }
 
-  buffer = g_async_queue_pop (src->queue);
+    buffer = g_queue_pop_head (src->queue);
 
-  if (buffer == RECORDER_QUEUE_END)
-    {
-      /* Returning UNEXPECTED here will cause a EOS message to be sent */
-      src->closed = TRUE;
+    /* we have a buffer, exit the loop to handle it */
+    if (buffer != NULL)
+      break;
+
+    /* no buffer, check EOS */
+    if (src->eos) {
+      g_mutex_unlock (&src->queue_lock);
       return GST_FLOW_EOS;
     }
+    /* wait for something to happen and try again */
+    g_cond_wait (&src->queue_cond, &src->queue_lock);
+  }
+  g_mutex_unlock (&src->queue_lock);
 
   shell_recorder_src_update_memory_used (src,
                                         - (int)(gst_buffer_get_size(buffer) / 1024));
@@ -176,9 +245,11 @@ shell_recorder_src_finalize (GObject *object)
     g_source_remove (src->memory_used_update_idle);
 
   shell_recorder_src_set_caps (src, NULL);
-  g_async_queue_unref (src->queue);
+  g_queue_free_full (src->queue, (GDestroyNotify) gst_buffer_unref);
 
-  g_mutex_clear (src->mutex);
+  g_mutex_clear (&src->mutex);
+  g_mutex_clear (&src->queue_lock);
+  g_cond_clear (&src->queue_cond);
 
   G_OBJECT_CLASS (shell_recorder_src_parent_class)->finalize (object);
 }
@@ -216,9 +287,9 @@ shell_recorder_src_get_property (GObject         *object,
       gst_value_set_caps (value, src->caps);
       break;
     case PROP_MEMORY_USED:
-      g_mutex_lock (src->mutex);
+      g_mutex_lock (&src->mutex);
       g_value_set_uint (value, src->memory_used);
-      g_mutex_unlock (src->mutex);
+      g_mutex_unlock (&src->mutex);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -270,6 +341,10 @@ shell_recorder_src_class_init (ShellRecorderSrcClass *klass)
   element_class->send_event = shell_recorder_src_send_event;
 
   base_src_class->negotiate = shell_recorder_src_negotiate;
+  base_src_class->unlock = shell_recorder_src_unlock;
+  base_src_class->unlock_stop = shell_recorder_src_unlock_stop;
+  base_src_class->start = shell_recorder_src_start;
+  base_src_class->stop = shell_recorder_src_stop;
 
   push_src_class->create = shell_recorder_src_create;
 }
@@ -292,7 +367,10 @@ shell_recorder_src_add_buffer (ShellRecorderSrc *src,
   shell_recorder_src_update_memory_used (src,
                                         (int)(gst_buffer_get_size(buffer) / 1024));
 
-  g_async_queue_push (src->queue, gst_buffer_ref (buffer));
+  g_mutex_lock (&src->queue_lock);
+  g_queue_push_tail (src->queue, gst_buffer_ref (buffer));
+  g_cond_signal (&src->queue_cond);
+  g_mutex_unlock (&src->queue_lock);
 }
 
 /**
@@ -305,10 +383,13 @@ void
 shell_recorder_src_close (ShellRecorderSrc *src)
 {
   /* We can't send a message to the source immediately or buffers that haven't
-   * been pushed yet will be discarded. Instead stick a marker onto our own
-   * queue to send an event once everything has been pushed.
+   * been pushed yet will be discarded. Instead mark ourselves EOS, which will
+   * make us send an event once everything has been pushed.
    */
-  g_async_queue_push (src->queue, RECORDER_QUEUE_END);
+  g_mutex_lock (&src->queue_lock);
+  src->eos = TRUE;
+  g_cond_signal (&src->queue_cond);
+  g_mutex_unlock (&src->queue_lock);
 }
 
 static gboolean


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]