[rygel] core: Move flow control to HTTPGstSink



commit 164db38ab98a0ba51d95d54d143ae56496d16048
Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
Date:   Tue Mar 22 23:54:10 2011 +0200

    core: Move flow control to HTTPGstSink
    
    This is not just a move and port of the code from GstResponse; we now
    don't achieve synchronization by (un)pausing the gstreamer pipeline but
    rather by the sink blocking the pipeline thread through waiting on a
    GLib.Cond.

 src/rygel/rygel-http-gst-response.vala |   50 ++---------------------
 src/rygel/rygel-http-gst-sink.vala     |   68 ++++++++++++++++++++++++++++++-
 2 files changed, 69 insertions(+), 49 deletions(-)
---
diff --git a/src/rygel/rygel-http-gst-response.vala b/src/rygel/rygel-http-gst-response.vala
index 07f149a..8c494da 100644
--- a/src/rygel/rygel-http-gst-response.vala
+++ b/src/rygel/rygel-http-gst-response.vala
@@ -25,17 +25,10 @@
 using Gst;
 
 internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
-    // High and low threshold for number of buffered chunks
-    private const uint MAX_BUFFERED_CHUNKS = 32;
-    private const uint MIN_BUFFERED_CHUNKS = 4;
-
     private Pipeline pipeline;
 
     private HTTPSeek seek;
 
-    private int64 buffered;
-    private bool out_of_sync;
-
     public HTTPGstResponse (HTTPGet        request,
                             HTTPGetHandler request_handler,
                             Element?       gst_src = null) throws Error {
@@ -54,14 +47,9 @@ internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
 
         this.prepare_pipeline ("RygelHTTPGstResponse", src);
         this.seek = request.seek;
-
-        this.buffered = 0;
-        this.out_of_sync = false;
     }
 
     public override async void run () {
-        this.msg.wrote_chunk.connect (this.on_wrote_chunk);
-
         // Only bother attempting to seek if the offset is greater than zero.
         if (this.seek != null && this.seek.start > 0) {
             this.pipeline.set_state (State.PAUSED);
@@ -75,8 +63,10 @@ internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
     }
 
     public override void end (bool aborted, uint status) {
+        var sink = this.pipeline.get_by_name (HTTPGstSink.NAME) as HTTPGstSink;
+        sink.cancellable.cancel ();
+
         this.pipeline.set_state (State.NULL);
-        this.msg.wrote_chunk.disconnect (this.on_wrote_chunk);
 
         if (!aborted) {
             this.msg.response_body.complete ();
@@ -88,9 +78,7 @@ internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
 
     private void prepare_pipeline (string name,
                                    Element src) throws Error {
-        var sink = new HTTPGstSink ();
-
-        sink.handoff.connect (this.on_new_buffer);
+        var sink = new HTTPGstSink (this);
 
         this.pipeline = new Pipeline (name);
         assert (this.pipeline != null);
@@ -150,36 +138,6 @@ internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
         }
     }
 
-    private void on_new_buffer (Element sink,
-                                Buffer  buffer,
-                                Pad     pad) {
-        Idle.add_full (this.priority, () => {
-            if (this.cancellable.is_cancelled ()) {
-                return false;
-            }
-
-            this.push_data (buffer.data);
-            this.buffered++;
-
-            if (this.buffered > MAX_BUFFERED_CHUNKS) {
-                // Client is either not reading (Paused) or not fast enough
-                this.pipeline.set_state (State.PAUSED);
-                this.out_of_sync = true;
-            }
-
-            return false;
-        });
-    }
-
-    private void on_wrote_chunk (Soup.Message msg) {
-        this.buffered--;
-
-        if (out_of_sync && this.buffered < MIN_BUFFERED_CHUNKS) {
-            this.pipeline.set_state (State.PLAYING);
-            this.out_of_sync = false;
-        }
-    }
-
     private bool bus_handler (Gst.Bus     bus,
                               Gst.Message message) {
         bool ret = true;
diff --git a/src/rygel/rygel-http-gst-sink.vala b/src/rygel/rygel-http-gst-sink.vala
index 5e8ac0b..b92c5d7 100644
--- a/src/rygel/rygel-http-gst-sink.vala
+++ b/src/rygel/rygel-http-gst-sink.vala
@@ -26,8 +26,19 @@ using Gst;
 internal class Rygel.HTTPGstSink : BaseSink {
     public const string NAME = "http-gst-sink";
     public const string PAD_NAME = "sink";
+    // High and low threshold for number of buffered chunks
+    private const uint MAX_BUFFERED_CHUNKS = 32;
+    private const uint MIN_BUFFERED_CHUNKS = 4;
 
-    public signal void handoff (Buffer buffer, Pad pad);
+    public Cancellable cancellable;
+
+    private unowned HTTPGstResponse response;
+    private int priority;
+
+    private int64 buffered;
+
+    private Mutex buffer_mutex;
+    private Cond buffer_condition;
 
     static construct {
         var caps = new Caps.any ();
@@ -38,9 +49,24 @@ internal class Rygel.HTTPGstSink : BaseSink {
         add_pad_template (template);
     }
 
-    public HTTPGstSink () {
+    public HTTPGstSink (HTTPGstResponse response) {
+        this.buffered = 0;
+        this.buffer_mutex = new Mutex ();
+        this.buffer_condition = new Cond ();
+
+        this.cancellable = new Cancellable ();
+        this.priority = response.priority;
+        this.response = response;
+
         this.sync = false;
         this.name = NAME;
+
+        this.cancellable.cancelled.connect (this.on_cancelled);
+        response.msg.wrote_chunk.connect (this.on_wrote_chunk);
+    }
+
+    ~HTTPGstSink () {
+        this.response.msg.wrote_chunk.disconnect (this.on_wrote_chunk);
     }
 
     public override FlowReturn preroll (Buffer buffer) {
@@ -48,9 +74,45 @@ internal class Rygel.HTTPGstSink : BaseSink {
     }
 
     public override FlowReturn render (Buffer buffer) {
-        this.handoff (buffer, this.sinkpad);
+        this.buffer_mutex.lock ();
+        while (!this.cancellable.is_cancelled () &&
+               this.buffered > MAX_BUFFERED_CHUNKS) {
+            // Client is either not reading (Paused) or not fast enough
+            this.buffer_condition.wait (this.buffer_mutex);
+        }
+        this.buffer_mutex.unlock ();
+
+        if (this.cancellable.is_cancelled ()) {
+            return FlowReturn.OK;
+        }
+
+        Idle.add_full (this.priority, () => {
+            if (this.cancellable.is_cancelled ()) {
+                return false;
+            }
+
+            this.response.push_data (buffer.data);
+            this.buffered++;
+            return false;
+        });
 
         return FlowReturn.OK;
     }
+
+    private void on_wrote_chunk (Soup.Message msg) {
+        this.buffer_mutex.lock ();
+        this.buffered--;
+
+        if (this.buffered < MIN_BUFFERED_CHUNKS) {
+            this.buffer_condition.broadcast ();
+        }
+        this.buffer_mutex.unlock ();
+    }
+
+    private void on_cancelled () {
+        this.buffer_mutex.lock ();
+        this.buffer_condition.broadcast ();
+        this.buffer_mutex.unlock ();
+    }
 }
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]