[rygel] core: Move flow control to HTTPGstSink
- From: Zeeshan Ali Khattak <zeeshanak src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [rygel] core: Move flow control to HTTPGstSink
- Date: Sat, 26 Mar 2011 14:06:35 +0000 (UTC)
commit 164db38ab98a0ba51d95d54d143ae56496d16048
Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
Date: Tue Mar 22 23:54:10 2011 +0200
core: Move flow control to HTTPGstSink
This is not just a move and port of the code from GstResponse; we now
don't achieve synchronization by (un)pausing the gstreamer pipeline but
rather by the sink blocking the pipeline thread through waiting on a
GLib.Cond.
src/rygel/rygel-http-gst-response.vala | 50 ++---------------------
src/rygel/rygel-http-gst-sink.vala | 68 ++++++++++++++++++++++++++++++-
2 files changed, 69 insertions(+), 49 deletions(-)
---
diff --git a/src/rygel/rygel-http-gst-response.vala b/src/rygel/rygel-http-gst-response.vala
index 07f149a..8c494da 100644
--- a/src/rygel/rygel-http-gst-response.vala
+++ b/src/rygel/rygel-http-gst-response.vala
@@ -25,17 +25,10 @@
using Gst;
internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
- // High and low threshold for number of buffered chunks
- private const uint MAX_BUFFERED_CHUNKS = 32;
- private const uint MIN_BUFFERED_CHUNKS = 4;
-
private Pipeline pipeline;
private HTTPSeek seek;
- private int64 buffered;
- private bool out_of_sync;
-
public HTTPGstResponse (HTTPGet request,
HTTPGetHandler request_handler,
Element? gst_src = null) throws Error {
@@ -54,14 +47,9 @@ internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
this.prepare_pipeline ("RygelHTTPGstResponse", src);
this.seek = request.seek;
-
- this.buffered = 0;
- this.out_of_sync = false;
}
public override async void run () {
- this.msg.wrote_chunk.connect (this.on_wrote_chunk);
-
// Only bother attempting to seek if the offset is greater than zero.
if (this.seek != null && this.seek.start > 0) {
this.pipeline.set_state (State.PAUSED);
@@ -75,8 +63,10 @@ internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
}
public override void end (bool aborted, uint status) {
+ var sink = this.pipeline.get_by_name (HTTPGstSink.NAME) as HTTPGstSink;
+ sink.cancellable.cancel ();
+
this.pipeline.set_state (State.NULL);
- this.msg.wrote_chunk.disconnect (this.on_wrote_chunk);
if (!aborted) {
this.msg.response_body.complete ();
@@ -88,9 +78,7 @@ internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
private void prepare_pipeline (string name,
Element src) throws Error {
- var sink = new HTTPGstSink ();
-
- sink.handoff.connect (this.on_new_buffer);
+ var sink = new HTTPGstSink (this);
this.pipeline = new Pipeline (name);
assert (this.pipeline != null);
@@ -150,36 +138,6 @@ internal class Rygel.HTTPGstResponse : Rygel.HTTPResponse {
}
}
- private void on_new_buffer (Element sink,
- Buffer buffer,
- Pad pad) {
- Idle.add_full (this.priority, () => {
- if (this.cancellable.is_cancelled ()) {
- return false;
- }
-
- this.push_data (buffer.data);
- this.buffered++;
-
- if (this.buffered > MAX_BUFFERED_CHUNKS) {
- // Client is either not reading (Paused) or not fast enough
- this.pipeline.set_state (State.PAUSED);
- this.out_of_sync = true;
- }
-
- return false;
- });
- }
-
- private void on_wrote_chunk (Soup.Message msg) {
- this.buffered--;
-
- if (out_of_sync && this.buffered < MIN_BUFFERED_CHUNKS) {
- this.pipeline.set_state (State.PLAYING);
- this.out_of_sync = false;
- }
- }
-
private bool bus_handler (Gst.Bus bus,
Gst.Message message) {
bool ret = true;
diff --git a/src/rygel/rygel-http-gst-sink.vala b/src/rygel/rygel-http-gst-sink.vala
index 5e8ac0b..b92c5d7 100644
--- a/src/rygel/rygel-http-gst-sink.vala
+++ b/src/rygel/rygel-http-gst-sink.vala
@@ -26,8 +26,19 @@ using Gst;
internal class Rygel.HTTPGstSink : BaseSink {
public const string NAME = "http-gst-sink";
public const string PAD_NAME = "sink";
+ // High and low threshold for number of buffered chunks
+ private const uint MAX_BUFFERED_CHUNKS = 32;
+ private const uint MIN_BUFFERED_CHUNKS = 4;
- public signal void handoff (Buffer buffer, Pad pad);
+ public Cancellable cancellable;
+
+ private unowned HTTPGstResponse response;
+ private int priority;
+
+ private int64 buffered;
+
+ private Mutex buffer_mutex;
+ private Cond buffer_condition;
static construct {
var caps = new Caps.any ();
@@ -38,9 +49,24 @@ internal class Rygel.HTTPGstSink : BaseSink {
add_pad_template (template);
}
- public HTTPGstSink () {
+ public HTTPGstSink (HTTPGstResponse response) {
+ this.buffered = 0;
+ this.buffer_mutex = new Mutex ();
+ this.buffer_condition = new Cond ();
+
+ this.cancellable = new Cancellable ();
+ this.priority = response.priority;
+ this.response = response;
+
this.sync = false;
this.name = NAME;
+
+ this.cancellable.cancelled.connect (this.on_cancelled);
+ response.msg.wrote_chunk.connect (this.on_wrote_chunk);
+ }
+
+ ~HTTPGstSink () {
+ this.response.msg.wrote_chunk.disconnect (this.on_wrote_chunk);
}
public override FlowReturn preroll (Buffer buffer) {
@@ -48,9 +74,45 @@ internal class Rygel.HTTPGstSink : BaseSink {
}
public override FlowReturn render (Buffer buffer) {
- this.handoff (buffer, this.sinkpad);
+ this.buffer_mutex.lock ();
+ while (!this.cancellable.is_cancelled () &&
+ this.buffered > MAX_BUFFERED_CHUNKS) {
+ // Client is either not reading (Paused) or not fast enough
+ this.buffer_condition.wait (this.buffer_mutex);
+ }
+ this.buffer_mutex.unlock ();
+
+ if (this.cancellable.is_cancelled ()) {
+ return FlowReturn.OK;
+ }
+
+ Idle.add_full (this.priority, () => {
+ if (this.cancellable.is_cancelled ()) {
+ return false;
+ }
+
+ this.response.push_data (buffer.data);
+ this.buffered++;
+ return false;
+ });
return FlowReturn.OK;
}
+
+ private void on_wrote_chunk (Soup.Message msg) {
+ this.buffer_mutex.lock ();
+ this.buffered--;
+
+ if (this.buffered < MIN_BUFFERED_CHUNKS) {
+ this.buffer_condition.broadcast ();
+ }
+ this.buffer_mutex.unlock ();
+ }
+
+ private void on_cancelled () {
+ this.buffer_mutex.lock ();
+ this.buffer_condition.broadcast ();
+ this.buffer_mutex.unlock ();
+ }
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]