[rygel/wip/media-engine: 6/15] server: Introduce DataSource interface



commit b323a918aa7827f45257f1211ae5f163f6cc9f07
Author: Jens Georg <jensg openismus org>
Date:   Fri Aug 31 16:39:07 2012 +0200

    server: Introduce DataSource interface
    
    Port the streaming to using DataSources.

 src/librygel-server/filelist.am                    |    9 +-
 src/librygel-server/rygel-data-sink.vala           |   87 ++++++++
 src/librygel-server/rygel-data-source.vala         |   83 +++++++
 src/librygel-server/rygel-gst-data-source.vala     |  232 ++++++++++++++++++++
 src/librygel-server/rygel-gst-media-engine.vala    |    4 +
 ...ygel-http-gst-sink.vala => rygel-gst-sink.vala} |   60 +++--
 .../rygel-http-identity-handler.vala               |   13 +-
 src/librygel-server/rygel-http-response.vala       |  215 +++----------------
 src/librygel-server/rygel-http-seek.vala           |    6 +-
 .../rygel-http-transcode-handler.vala              |    7 +-
 src/librygel-server/rygel-media-engine.vala        |    8 +
 src/librygel-server/rygel-media-item.vala          |   29 +--
 src/librygel-server/rygel-transcoder.vala          |    2 +-
 .../gst-launch/rygel-gst-launch-audio-item.vala    |    5 +-
 .../gst-launch/rygel-gst-launch-video-item.vala    |    6 +-
 src/plugins/test/rygel-test-audio-item.vala        |    8 +-
 src/plugins/test/rygel-test-video-item.vala        |    8 +-
 tests/Makefile.am                                  |    6 +-
 tests/rygel-data-source.vala                       |    1 +
 tests/rygel-gst-utils.vala                         |    1 -
 tests/rygel-http-gst-sink.vala                     |    1 -
 tests/rygel-http-response-test.vala                |   68 +++++-
 tests/rygel-http-seek_item-creator.vala            |    1 +
 tests/rygel-item-creator-test.vala                 |    4 +
 24 files changed, 601 insertions(+), 263 deletions(-)
---
diff --git a/src/librygel-server/filelist.am b/src/librygel-server/filelist.am
index 46ad6a3..2912d96 100644
--- a/src/librygel-server/filelist.am
+++ b/src/librygel-server/filelist.am
@@ -23,7 +23,8 @@ LIBRYGEL_SERVER_VAPI_SOURCE_FILES = \
 	rygel-visual-item.vala \
 	rygel-writable-container.vala \
 	rygel-media-server.vala \
-	rygel-media-engine.vala
+	rygel-media-engine.vala \
+	rygel-data-source.vala
 
 LIBRYGEL_SERVER_NONVAPI_SOURCE_FILES = \
 	rygel-aac-transcoder.vala \
@@ -37,7 +38,7 @@ LIBRYGEL_SERVER_NONVAPI_SOURCE_FILES = \
 	rygel-http-byte-seek.vala \
 	rygel-http-get-handler.vala \
 	rygel-http-get.vala \
-	rygel-http-gst-sink.vala \
+	rygel-gst-sink.vala \
 	rygel-http-identity-handler.vala \
 	rygel-http-item-uri.vala \
 	rygel-http-post.vala \
@@ -69,7 +70,9 @@ LIBRYGEL_SERVER_NONVAPI_SOURCE_FILES = \
 	rygel-xbmc-hacks.vala \
 	rygel-xbox-hacks.vala \
 	rygel-gst-media-engine.vala \
-	rygel-gst-transcoder.vala
+	rygel-gst-transcoder.vala \
+	rygel-gst-data-source.vala \
+	rygel-data-sink.vala
 
 LIBRYGEL_SERVER_VALAFLAGS_PKG = \
 	--pkg gstreamer-0.10 \
diff --git a/src/librygel-server/rygel-data-sink.vala b/src/librygel-server/rygel-data-sink.vala
new file mode 100644
index 0000000..9ff7d51
--- /dev/null
+++ b/src/librygel-server/rygel-data-sink.vala
@@ -0,0 +1,87 @@
+/*
+ * Copyright (C) 2012 Intel Corporation.
+ *
+ * Author: Jens Georg <jensg openismus com>
+ *
+ * This file is part of Rygel.
+ *
+ * Rygel is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Rygel is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+using Soup;
+
+/**
+ * Class that converts the push DataSource into the pull required by libsoup.
+ */
+internal class Rygel.DataSink : Object {
+    private DataSource source;
+    private Server server;
+    private Message message;
+
+    private const uint MAX_BUFFERED_CHUNKS = 32; // freeze source above this
+    private const uint MIN_BUFFERED_CHUNKS = 4; // thaw source below this
+
+    private int64 chunks_buffered; // chunks appended but not yet written out
+    private int64 bytes_sent; // bytes appended to the response body so far
+    private int64 max_bytes; // byte cap; int64.MAX unless a byte seek sets it
+
+    public DataSink (DataSource source,
+                     Server     server,
+                     Message    message,
+                     HTTPSeek?  offsets) {
+        this.source = source;
+        this.server = server;
+        this.message = message;
+
+        this.chunks_buffered = 0;
+        this.bytes_sent = 0;
+        this.max_bytes = int64.MAX;
+        if (offsets != null &&
+            offsets is HTTPByteSeek) {
+            this.max_bytes = offsets.length; // only byte ranges cap the output
+        }
+
+        this.source.data_available.connect (this.on_data_available);
+        this.message.wrote_chunk.connect (this.on_wrote_chunk);
+    }
+
+    private void on_wrote_chunk (Soup.Message msg) {
+        this.chunks_buffered--;
+        if (this.chunks_buffered < MIN_BUFFERED_CHUNKS) {
+            this.source.thaw (); // backlog drained enough, resume the source
+        }
+    }
+
+    private void on_data_available (uint8[] buffer) {
+        var left = this.max_bytes - this.bytes_sent;
+
+        if (left <= 0) {
+            return; // byte cap already reached, drop the data
+        }
+
+        var to_send = int64.min (buffer.length, left); // clip to the byte cap
+
+        this.message.response_body.append (Soup.MemoryUse.COPY,
+                                           buffer[0:to_send]);
+        this.chunks_buffered++;
+        this.bytes_sent += to_send;
+
+        this.server.unpause_message (this.message);
+
+        if (this.chunks_buffered > MAX_BUFFERED_CHUNKS) {
+            this.source.freeze (); // too much pending output, hold the source
+        }
+    }
+}
diff --git a/src/librygel-server/rygel-data-source.vala b/src/librygel-server/rygel-data-source.vala
new file mode 100644
index 0000000..2093420
--- /dev/null
+++ b/src/librygel-server/rygel-data-source.vala
@@ -0,0 +1,83 @@
+/*
+ * Copyright (C) 2012 Intel Corporation.
+ *
+ * Author: Jens Georg <jensg openismus com>
+ *
+ * This file is part of Rygel.
+ *
+ * Rygel is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Rygel is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+public errordomain Rygel.DataSourceError {
+    GENERAL, // generic failure, e.g. pipeline creation
+    SEEK_FAILED // seek method unsupported or range not fulfillable
+}
+
+/**
+ * Interface for all data streams from a media engine. The data source is
+ * responsible for providing the streamable byte-stream via its data_available
+ * signal. End-of-stream is signalled through the done signal, while errors
+ * are signalled by using the error signal.
+ */
+public interface Rygel.DataSource : GLib.Object {
+    /**
+     * Start producing the data.
+     *
+     * @param offsets optional limits of the stream for partial streaming
+     * @throws Error if anything goes wrong while starting the stream. Throws
+     * DataSourceError.SEEK_FAILED if a seek method is not supported or the
+     * range is not fulfillable.
+     */
+    public abstract void start (HTTPSeek? offsets) throws Error;
+
+    /**
+     * Temporarily stop data generation.
+     *
+     * May be called multiple times. If the source is already frozen, the
+     * following calls to freeze are ignored. After calling freeze(), no
+     * data_available() signal should be emitted.
+     */
+    public abstract void freeze ();
+
+    /**
+     * Resume data generation from a previous freeze call.
+     *
+     * May be called multiple times, will be ignored if the source is not
+     * frozen.
+     */
+    public abstract void thaw ();
+
+    /**
+     * Stop producing data.
+     * After calling stop(), calling start() should produce data from the
+     * beginning and not resume streaming.
+     */
+    public abstract void stop ();
+
+    /**
+     * Emitted when the source has produced some data.
+     */
+    public signal void data_available (uint8[] data);
+
+    /**
+     * Emitted when the source does not have data anymore.
+     */
+    public signal void done ();
+
+    /**
+     * Emitted when the source encounters a problem during data generation.
+     */
+    public signal void error (Error error);
+}
diff --git a/src/librygel-server/rygel-gst-data-source.vala b/src/librygel-server/rygel-gst-data-source.vala
new file mode 100644
index 0000000..53a944f
--- /dev/null
+++ b/src/librygel-server/rygel-gst-data-source.vala
@@ -0,0 +1,286 @@
+/*
+ * Copyright (C) 2012 Intel Corporation.
+ *
+ * Author: Jens Georg <jensg openismus com>
+ *
+ * This file is part of Rygel.
+ *
+ * Rygel is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * Rygel is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ */
+
+using Gst;
+
+/**
+ * DataSource implementation that streams through a GStreamer pipeline.
+ *
+ * The source element is linked to a GstSink, which hands the rendered
+ * buffers back through the data_available signal.
+ */
+public class Rygel.GstDataSource : Rygel.DataSource, GLib.Object {
+    private dynamic Element src;
+    private Pipeline pipeline;
+    private HTTPSeek seek = null;
+    private GstSink sink;
+    // Id of the bus watch; 0 while no watch is installed.
+    private uint bus_watch_id;
+
+    public GstDataSource (string uri) {
+        this.src = GstUtils.create_source_for_uri (uri);
+    }
+
+    public GstDataSource.from_element (Element element) {
+        this.src = element;
+    }
+
+    /**
+     * Build the pipeline and start streaming.
+     *
+     * With offsets the pipeline is only taken to PAUSED; the bus handler
+     * performs the seek and then switches to PLAYING.
+     *
+     * @param offsets optional range to stream
+     * @throws Error if the pipeline cannot be created or linked
+     */
+    public void start (HTTPSeek? offsets) throws Error {
+        this.seek = offsets;
+        this.prepare_pipeline ("RygelGstDataSource", this.src);
+        if (this.seek != null) {
+            this.pipeline.set_state (State.PAUSED);
+        } else {
+            this.pipeline.set_state (State.PLAYING);
+        }
+    }
+
+    public void freeze () {
+        // No sink exists before start(); nothing to freeze then.
+        if (this.sink != null) {
+            this.sink.freeze ();
+        }
+    }
+
+    public void thaw () {
+        // No sink exists before start(); nothing to thaw then.
+        if (this.sink != null) {
+            this.sink.thaw ();
+        }
+    }
+
+    /**
+     * Tear the pipeline down.
+     *
+     * Safe to call before start(), after a failed start() and more than
+     * once.
+     */
+    public void stop () {
+        if (this.sink != null) {
+            // Unlock eventually frozen sink
+            this.sink.cancellable.cancel ();
+        }
+
+        if (this.pipeline != null) {
+            this.pipeline.set_state (State.NULL);
+        }
+
+        // The watch is absent if start() failed early and already gone once
+        // bus_handler() returned false; only remove a live one.
+        if (this.bus_watch_id != 0) {
+            Source.remove (this.bus_watch_id);
+            this.bus_watch_id = 0;
+        }
+    }
+
+    // Creates the sink and the pipeline, links src to the sink (directly or
+    // once src exposes a pad) and installs the bus watch.
+    private void prepare_pipeline (string   name,
+                                   Element  src) throws Error {
+        this.sink = new GstSink (this, this.seek);
+
+        this.pipeline = new Pipeline (name);
+        if (pipeline == null) {
+            throw new DataSourceError.GENERAL
+                                        (_("Failed to create pipeline"));
+        }
+
+        this.pipeline.add_many (src, sink);
+
+        if (src.numsrcpads == 0) {
+            // Seems source uses dynamic pads, link when pad available
+            src.pad_added.connect (this.src_pad_added);
+        } else {
+            // static pads? easy!
+            if (!src.link (sink)) {
+                throw new GstError.LINK (_("Failed to link %s to %s"),
+                                         src.name,
+                                         sink.name);
+            }
+        }
+
+        // Bus handler
+        var bus = this.pipeline.get_bus ();
+        this.bus_watch_id = bus.add_watch (this.bus_handler);
+    }
+
+    // Links a dynamically added source pad to the sink, going through an
+    // RTP depayloader when GstUtils provides one for the pad's caps.
+    private void src_pad_added (Element src, Pad src_pad) {
+        var caps = src_pad.get_caps_reffed ();
+
+        var sink = this.pipeline.get_by_name (GstSink.NAME);
+        Pad sink_pad;
+
+        dynamic Element depay = GstUtils.get_rtp_depayloader (caps);
+        if (depay != null) {
+            this.pipeline.add (depay);
+            if (!depay.link (sink)) {
+                critical (_("Failed to link %s to %s"),
+                          depay.name,
+                          sink.name);
+                this.done ();
+
+                return;
+            }
+
+            sink_pad = depay.get_compatible_pad (src_pad, caps);
+        } else {
+            sink_pad = sink.get_compatible_pad (src_pad, caps);
+        }
+
+        // get_compatible_pad () may come back empty-handed; treat that like
+        // a failed link instead of dereferencing null below.
+        if (sink_pad == null ||
+            src_pad.link (sink_pad) != PadLinkReturn.OK) {
+            critical (_("Failed to link pad %s to %s"),
+                      src_pad.name,
+                      (sink_pad != null) ? sink_pad.name : "(null)");
+            this.done ();
+
+            return;
+        }
+
+        if (depay != null) {
+            depay.sync_state_with_parent ();
+        }
+    }
+
+    // Bus watch: ends the stream on EOS or error and triggers the seek once
+    // a ranged pipeline reaches PAUSED. Returning false removes the watch.
+    private bool bus_handler (Gst.Bus bus, Gst.Message message) {
+        bool ret = true;
+
+        if (message.type == MessageType.EOS) {
+            ret = false;
+        } else if (message.type == MessageType.STATE_CHANGED) {
+            if (message.src != this.pipeline) {
+                return true;
+            }
+
+            State old_state;
+            State new_state;
+
+            message.parse_state_changed (out old_state,
+                                         out new_state,
+                                         null);
+
+            if (old_state == State.NULL && new_state == State.READY) {
+                dynamic Element element = this.pipeline.get_by_name ("muxer");
+                if (element != null) {
+                    var name = element.get_factory ().get_name ();
+                    // Awesome gross hack, really.
+                    if (name == "mp4mux") {
+                        element.streamable = true;
+                        element.fragment_duration = 1000;
+                    }
+                }
+            }
+
+            if (this.seek != null) {
+                if (old_state == State.READY && new_state == State.PAUSED) {
+                    if (this.perform_seek ()) {
+                        this.pipeline.set_state (State.PLAYING);
+                    }
+                }
+            }
+        } else {
+            GLib.Error err;
+            string err_msg;
+
+            if (message.type == MessageType.ERROR) {
+                message.parse_error (out err, out err_msg);
+                critical (_("Error from pipeline %s: %s"),
+                          this.pipeline.name,
+                          err_msg);
+
+                ret = false;
+            } else if (message.type == MessageType.WARNING) {
+                message.parse_warning (out err, out err_msg);
+                warning (_("Warning from pipeline %s: %s"),
+                         this.pipeline.name,
+                         err_msg);
+            }
+        }
+
+        if (!ret) {
+            // Returning false drops this watch, so forget its id to keep
+            // stop() from removing a source that no longer exists.
+            this.bus_watch_id = 0;
+
+            Idle.add_full (Priority.DEFAULT, () => {
+                this.done ();
+
+                return false;
+            });
+        }
+
+        return ret;
+    }
+
+    // Seeks the pipeline to the requested range (TIME for HTTPTimeSeek,
+    // BYTES otherwise); emits error with SEEK_FAILED if the seek fails.
+    private bool perform_seek () {
+        var stop_type = Gst.SeekType.NONE;
+        Format format;
+        var flags = SeekFlags.FLUSH;
+
+        if (this.seek is HTTPTimeSeek) {
+            format = Format.TIME;
+            flags |= SeekFlags.KEY_UNIT;
+        } else {
+            format = Format.BYTES;
+            flags |= SeekFlags.ACCURATE;
+        }
+
+        if (this.seek.stop > 0) {
+            stop_type = Gst.SeekType.SET;
+        }
+
+        if (!this.pipeline.seek (1.0,
+                                 format,
+                                 flags,
+                                 Gst.SeekType.SET,
+                                 this.seek.start,
+                                 stop_type,
+                                 this.seek.stop + 1)) {
+            warning (_("Failed to seek to offsets %lld:%lld"),
+                     this.seek.start,
+                     this.seek.stop);
+
+            this.error (new DataSourceError.SEEK_FAILED (_("Failed to seek")));
+
+            return false;
+        }
+
+        return true;
+    }
+}
diff --git a/src/librygel-server/rygel-gst-media-engine.vala b/src/librygel-server/rygel-gst-media-engine.vala
index 8a7ba70..890de4c 100644
--- a/src/librygel-server/rygel-gst-media-engine.vala
+++ b/src/librygel-server/rygel-gst-media-engine.vala
@@ -87,4 +87,8 @@ internal class Rygel.GstMediaEngine : Rygel.MediaEngine {
     public override unowned GLib.List<Transcoder>? get_transcoders () {
         return this.transcoders;
     }
+
+    public override DataSource create_data_source (string uri) {
+        return new GstDataSource (uri);
+    }
 }
diff --git a/src/librygel-server/rygel-http-gst-sink.vala b/src/librygel-server/rygel-gst-sink.vala
similarity index 77%
rename from src/librygel-server/rygel-http-gst-sink.vala
rename to src/librygel-server/rygel-gst-sink.vala
index 67a3271..eed08d0 100644
--- a/src/librygel-server/rygel-http-gst-sink.vala
+++ b/src/librygel-server/rygel-gst-sink.vala
@@ -1,8 +1,10 @@
 /*
  * Copyright (C) 2011 Nokia Corporation.
+ * Copyright (C) 2012 Intel Corporation.
  *
  * Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
  *                               <zeeshan ali nokia com>
+ *         Jens Georg <jensg openismus com>
  *
  * This file is part of Rygel.
  *
@@ -23,7 +25,7 @@
 
 using Gst;
 
-internal class Rygel.HTTPGstSink : BaseSink {
+internal class Rygel.GstSink : BaseSink {
     public const string NAME = "http-gst-sink";
     public const string PAD_NAME = "sink";
     // High and low threshold for number of buffered chunks
@@ -32,7 +34,6 @@ internal class Rygel.HTTPGstSink : BaseSink {
 
     public Cancellable cancellable;
 
-    private unowned HTTPResponse response;
     private int priority;
 
     private int64 chunks_buffered;
@@ -41,6 +42,10 @@ internal class Rygel.HTTPGstSink : BaseSink {
 
     private Mutex buffer_mutex = Mutex ();
     private Cond buffer_condition = Cond ();
+    private unowned DataSource source;
+    private HTTPSeek offsets;
+
+    private bool frozen;
 
     static construct {
         var caps = new Caps.any ();
@@ -51,32 +56,53 @@ internal class Rygel.HTTPGstSink : BaseSink {
         add_pad_template (template);
     }
 
-    public HTTPGstSink (HTTPResponse response) {
+    public GstSink (DataSource source, HTTPSeek? offsets) {
         this.chunks_buffered = 0;
         this.bytes_sent = 0;
         this.max_bytes = int64.MAX;
+        this.source = source;
+        this.offsets = offsets;
 
         this.cancellable = new Cancellable ();
-        this.priority = response.priority;
-        this.response = response;
 
         this.sync = false;
         this.name = NAME;
+        this.frozen = false;
 
-        if (response.seek != null) {
-            if (response.seek is HTTPByteSeek) {
-                this.max_bytes = response.seek.length;
+        if (this.offsets != null) {
+            if (this.offsets is HTTPByteSeek) {
+                this.max_bytes = this.offsets.length;
             }
         }
 
         this.cancellable.cancelled.connect (this.on_cancelled);
-        response.msg.wrote_chunk.connect (this.on_wrote_chunk);
+    }
+
+    public void freeze () {
+        if (this.frozen) {
+            return;
+        }
+
+        this.buffer_mutex.lock ();
+        this.frozen = true;
+        this.buffer_mutex.unlock ();
+    }
+
+    public void thaw () {
+        if (!this.frozen) {
+            return;
+        }
+
+        this.buffer_mutex.lock ();
+        this.frozen = false;
+        this.buffer_condition.broadcast ();
+        this.buffer_mutex.unlock ();
     }
 
     public override FlowReturn render (Buffer buffer) {
         this.buffer_mutex.lock ();
         while (!this.cancellable.is_cancelled () &&
-               this.chunks_buffered > MAX_BUFFERED_CHUNKS) {
+                this.frozen) {
             // Client is either not reading (Paused) or not fast enough
             this.buffer_condition.wait (this.buffer_mutex);
         }
@@ -103,28 +129,16 @@ internal class Rygel.HTTPGstSink : BaseSink {
 
         var to_send = int64.min (buffer.size, left);
 
-        this.response.push_data (buffer.data[0:to_send]);
+        this.source.data_available (buffer.data[0:to_send]);
         this.chunks_buffered++;
         this.bytes_sent += to_send;
 
         return false;
     }
 
-    private void on_wrote_chunk (Soup.Message msg) {
-        this.buffer_mutex.lock ();
-        this.chunks_buffered--;
-
-        if (this.chunks_buffered < MIN_BUFFERED_CHUNKS) {
-            this.buffer_condition.broadcast ();
-        }
-        this.buffer_mutex.unlock ();
-    }
-
     private void on_cancelled () {
         this.buffer_mutex.lock ();
         this.buffer_condition.broadcast ();
         this.buffer_mutex.unlock ();
-
-        this.response.msg.wrote_chunk.disconnect (this.on_wrote_chunk);
     }
 }
diff --git a/src/librygel-server/rygel-http-identity-handler.vala b/src/librygel-server/rygel-http-identity-handler.vala
index 845fe32..186ef40 100644
--- a/src/librygel-server/rygel-http-identity-handler.vala
+++ b/src/librygel-server/rygel-http-identity-handler.vala
@@ -1,8 +1,10 @@
 /*
  * Copyright (C) 2008, 2009 Nokia Corporation.
+ * Copyright (C) 2012 Intel Corporation.
  *
  * Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
  *                               <zeeshan ali nokia com>
+ *         Jens Georg <jensg openismus com>
  *
  * This file is part of Rygel.
  *
@@ -74,12 +76,13 @@ internal class Rygel.HTTPIdentityHandler : Rygel.HTTPGetHandler {
     }
 
     private HTTPResponse render_body_real (HTTPGet request) throws Error {
-        Element src;
+        DataSource src;
+        var engine = MediaEngine.get_default ();
 
         if (request.subtitle != null) {
-            src = GstUtils.create_source_for_uri (request.subtitle.uri);
+            src = engine.create_data_source (request.subtitle.uri);
         } else if (request.thumbnail != null) {
-            src = GstUtils.create_source_for_uri (request.thumbnail.uri);
+            src = engine.create_data_source (request.thumbnail.uri);
         } else {
             src = request.item.create_stream_source
                                         (request.http_server.context.host_ip);
@@ -89,10 +92,6 @@ internal class Rygel.HTTPIdentityHandler : Rygel.HTTPGetHandler {
             throw new HTTPRequestError.NOT_FOUND (_("Not found"));
         }
 
-        if (src.is_floating ()) {
-            src.ref_sink ();
-        }
-
         return new HTTPResponse (request, this, src);
     }
 }
diff --git a/src/librygel-server/rygel-http-response.vala b/src/librygel-server/rygel-http-response.vala
index 2cc5238..7c209b1 100644
--- a/src/librygel-server/rygel-http-response.vala
+++ b/src/librygel-server/rygel-http-response.vala
@@ -1,8 +1,10 @@
 /*
- * Copyright (C) 2008 Nokia Corporation.
+ * Copyright (C) 2008-2012 Nokia Corporation.
+ * Copyright (C) 2012 Intel Corporation.
  *
  * Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
  *                               <zeeshan ali nokia com>
+ *         Jens Georg <jensg openismus com>
  *
  * This file is part of Rygel.
  *
@@ -57,17 +59,30 @@ internal class Rygel.HTTPResponse : GLib.Object, Rygel.StateMachine {
         }
     }
 
-    private Pipeline pipeline;
-    private uint bus_watch_id;
+    private DataSource src;
+    private DataSink sink;
     private bool unref_soup_server;
 
     public HTTPResponse (HTTPGet        request,
                          HTTPGetHandler request_handler,
-                         Element        src) throws Error {
+                         DataSource     src) throws Error {
         this.server = request.server;
         this.msg = request.msg;
         this.cancellable = request_handler.cancellable;
         this.seek = request.seek;
+        this.src = src;
+        this.sink = new DataSink (this.src, this.server, this.msg, this.seek);
+        this.src.done.connect ( () => {
+            this.end (false, KnownStatusCode.NONE);
+        });
+        this.src.error.connect ( (error) => {
+            if (error is DataSourceError.SEEK_FAILED) {
+                this.end (false,
+                          KnownStatusCode.REQUESTED_RANGE_NOT_SATISFIABLE);
+            } else {
+                this.end (false, KnownStatusCode.NONE);
+            }
+        });
 
         if (this.cancellable != null) {
             this.cancellable.cancelled.connect (this.on_cancelled);
@@ -75,7 +90,6 @@ internal class Rygel.HTTPResponse : GLib.Object, Rygel.StateMachine {
 
         this.msg.response_body.set_accumulate (false);
 
-        this.prepare_pipeline ("RygelHTTPGstResponse", src);
         this.server.weak_ref (this.on_server_weak_ref);
         this.unref_soup_server = true;
     }
@@ -87,30 +101,22 @@ internal class Rygel.HTTPResponse : GLib.Object, Rygel.StateMachine {
     }
 
     public async void run () {
-        // Only bother attempting to seek if the offset is greater than zero.
-        if (this.seek != null) {
-            this.pipeline.set_state (State.PAUSED);
-        } else {
-            this.pipeline.set_state (State.PLAYING);
-        }
-
         this.run_continue = run.callback;
+        try {
+            this.src.start (this.seek);
+        } catch (Error error) {
+            Idle.add (() => {
+                this.end (false, KnownStatusCode.NONE);
 
-        yield;
-    }
-
-    public void push_data (uint8[] data) {
-        this.msg.response_body.append (Soup.MemoryUse.COPY, data);
+                return false;
+            });
+        }
 
-        this.server.unpause_message (this.msg);
+        yield;
     }
 
     public virtual void end (bool aborted, uint status) {
-        var sink = this.pipeline.get_by_name (HTTPGstSink.NAME) as HTTPGstSink;
-        sink.cancellable.cancel ();
-
-        this.pipeline.set_state (State.NULL);
-        Source.remove (this.bus_watch_id);
+        this.src.stop ();
 
         var encoding = this.msg.response_headers.get_encoding ();
 
@@ -138,167 +144,4 @@ internal class Rygel.HTTPResponse : GLib.Object, Rygel.StateMachine {
         this.unref_soup_server = false;
         this.cancellable.cancel ();
     }
-
-    private void prepare_pipeline (string name, Element src) throws Error {
-        var sink = new HTTPGstSink (this);
-
-        this.pipeline = new Pipeline (name);
-        assert (this.pipeline != null);
-
-        this.pipeline.add_many (src, sink);
-
-        if (src.numsrcpads == 0) {
-            // Seems source uses dynamic pads, link when pad available
-            src.pad_added.connect (this.src_pad_added);
-        } else {
-            // static pads? easy!
-            if (!src.link (sink)) {
-                throw new GstError.LINK (_("Failed to link %s to %s"),
-                                         src.name,
-                                         sink.name);
-            }
-        }
-
-        // Bus handler
-        var bus = this.pipeline.get_bus ();
-        this.bus_watch_id = bus.add_watch (this.bus_handler);
-    }
-
-    private void src_pad_added (Element src, Pad src_pad) {
-        var caps = src_pad.get_caps_reffed ();
-
-        var sink = this.pipeline.get_by_name (HTTPGstSink.NAME);
-        Pad sink_pad;
-
-        dynamic Element depay = GstUtils.get_rtp_depayloader (caps);
-        if (depay != null) {
-            this.pipeline.add (depay);
-            if (!depay.link (sink)) {
-                critical (_("Failed to link %s to %s"),
-                          depay.name,
-                          sink.name);
-
-                this.end (false, KnownStatusCode.NONE);
-
-                return;
-            }
-
-            sink_pad = depay.get_compatible_pad (src_pad, caps);
-        } else {
-            sink_pad = sink.get_compatible_pad (src_pad, caps);
-        }
-
-        if (src_pad.link (sink_pad) != PadLinkReturn.OK) {
-            critical (_("Failed to link pad %s to %s"),
-                      src_pad.name,
-                      sink_pad.name);
-            this.end (false, KnownStatusCode.NONE);
-
-            return;
-        }
-
-        if (depay != null) {
-            depay.sync_state_with_parent ();
-        }
-    }
-
-    private bool bus_handler (Gst.Bus bus, Gst.Message message) {
-        bool ret = true;
-
-        if (message.type == MessageType.EOS) {
-            ret = false;
-        } else if (message.type == MessageType.STATE_CHANGED) {
-            if (message.src != this.pipeline) {
-                return true;
-            }
-            State old_state;
-            State new_state;
-
-            message.parse_state_changed (out old_state,
-                                         out new_state,
-                                         null);
-            if (old_state == State.NULL && new_state == State.READY) {
-                dynamic Element element = this.pipeline.get_by_name ("muxer");
-                if (element != null) {
-                    var name = element.get_factory ().get_name ();
-                    // Awesome gross hack, really.
-                    if (name == "mp4mux") {
-                        element.streamable = true;
-                        element.fragment_duration = 1000;
-                    }
-                }
-            }
-
-            if (this.seek != null) {
-                if (old_state == State.READY && new_state == State.PAUSED) {
-                    if (this.perform_seek ()) {
-                        this.pipeline.set_state (State.PLAYING);
-                    }
-                }
-            }
-        } else {
-            GLib.Error err;
-            string err_msg;
-
-            if (message.type == MessageType.ERROR) {
-                message.parse_error (out err, out err_msg);
-                critical (_("Error from pipeline %s: %s"),
-                          this.pipeline.name,
-                          err_msg);
-
-                ret = false;
-            } else if (message.type == MessageType.WARNING) {
-                message.parse_warning (out err, out err_msg);
-                warning (_("Warning from pipeline %s: %s"),
-                         this.pipeline.name,
-                         err_msg);
-            }
-        }
-
-        // If pipeline state didn't change due to the request being cancelled,
-        // end this request. Otherwise it was already ended.
-        if (!ret) {
-            Idle.add_full (this.priority, () => {
-                if (!this.cancellable.is_cancelled ()) {
-                    this.end (false, KnownStatusCode.NONE);
-                }
-
-                return false;
-            });
-        }
-
-        return ret;
-    }
-
-    private bool perform_seek () {
-        var stop_type = Gst.SeekType.NONE;
-        Format format;
-
-        if (this.seek is HTTPTimeSeek) {
-            format = Format.TIME;
-
-        } else {
-            format = Format.BYTES;
-        }
-
-        if (this.seek.stop > 0) {
-            stop_type = Gst.SeekType.SET;
-        }
-
-        if (!this.pipeline.seek (1.0,
-                                 format,
-                                 SeekFlags.FLUSH | SeekFlags.ACCURATE,
-                                 Gst.SeekType.SET,
-                                 this.seek.start,
-                                 stop_type,
-                                 this.seek.stop + 1)) {
-            warning (_("Failed to seek to offset %lld"), this.seek.start);
-
-            this.end (false, KnownStatusCode.REQUESTED_RANGE_NOT_SATISFIABLE);
-
-            return false;
-        }
-
-        return true;
-    }
 }
diff --git a/src/librygel-server/rygel-http-seek.vala b/src/librygel-server/rygel-http-seek.vala
index a857734..670816b 100644
--- a/src/librygel-server/rygel-http-seek.vala
+++ b/src/librygel-server/rygel-http-seek.vala
@@ -1,8 +1,10 @@
 /*
  * Copyright (C) 2008-2009 Nokia Corporation.
+ * Copyright (C) 2012 Intel Corporation.
  *
  * Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
  *                               <zeeshan ali nokia com>
+ *         Jens Georg <jensg openismus com>
  *
  * This file is part of Rygel.
  *
@@ -21,12 +23,12 @@
  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  */
 
-internal errordomain Rygel.HTTPSeekError {
+public errordomain Rygel.HTTPSeekError {
     INVALID_RANGE = Soup.KnownStatusCode.BAD_REQUEST,
     OUT_OF_RANGE = Soup.KnownStatusCode.REQUESTED_RANGE_NOT_SATISFIABLE,
 }
 
-internal abstract class Rygel.HTTPSeek : GLib.Object {
+public abstract class Rygel.HTTPSeek : GLib.Object {
     public Soup.Message msg { get; private set; }
 
     // These are either number of bytes or microseconds
diff --git a/src/librygel-server/rygel-http-transcode-handler.vala b/src/librygel-server/rygel-http-transcode-handler.vala
index b4746e7..f4bbc53 100644
--- a/src/librygel-server/rygel-http-transcode-handler.vala
+++ b/src/librygel-server/rygel-http-transcode-handler.vala
@@ -1,8 +1,10 @@
 /*
  * Copyright (C) 2009 Nokia Corporation.
+ * Copyright (C) 2012 Intel Corporation.
  *
  * Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
  *                               <zeeshan ali nokia com>
+ *         Jens Georg <jensg openismus com>
  *
  * This file is part of Rygel.
  *
@@ -57,13 +59,14 @@ internal class Rygel.HTTPTranscodeHandler : HTTPGetHandler {
             throw new HTTPRequestError.NOT_FOUND (_("Not found"));
         }
 
-        try {
+/*        try {
             src = this.transcoder.create_source (item, src);
 
             return new HTTPResponse (request, this, src);
         } catch (GLib.Error err) {
             throw new HTTPRequestError.NOT_FOUND (err.message);
-        }
+        }*/
+        throw new HTTPRequestError.NOT_FOUND ("Transcoding temporarily disabled");
     }
 
     protected override DIDLLiteResource add_resource (DIDLLiteItem didl_item,
diff --git a/src/librygel-server/rygel-media-engine.vala b/src/librygel-server/rygel-media-engine.vala
index 04e374f..5bbe6c2 100644
--- a/src/librygel-server/rygel-media-engine.vala
+++ b/src/librygel-server/rygel-media-engine.vala
@@ -77,4 +77,12 @@ public abstract class Rygel.MediaEngine : GLib.Object {
      * @return A list of #Transcoder<!-- -->s or null if not supported.
      */
     public abstract unowned List<Transcoder>? get_transcoders ();
+
+    /**
+     * Get a data source for the URI.
+     *
+     * @param uri the URI to create the data source for.
+     * @return A data source representing the URI
+     */
+    public abstract DataSource create_data_source (string uri);
 }
diff --git a/src/librygel-server/rygel-media-item.vala b/src/librygel-server/rygel-media-item.vala
index d43c2ae..82200d9 100644
--- a/src/librygel-server/rygel-media-item.vala
+++ b/src/librygel-server/rygel-media-item.vala
@@ -24,7 +24,6 @@
  */
 
 using GUPnP;
-using Gst;
 
 private errordomain Rygel.MediaItemError {
     BAD_URI
@@ -103,24 +102,22 @@ public abstract class Rygel.MediaItem : MediaObject {
 
     // Live media items need to provide a nice working implementation of this
     // method if they can/do not provide a valid URI
-    public virtual Element? create_stream_source (string? host_ip = null) {
-        dynamic Element src = null;
-
-        if (this.uris.size != 0) {
-            string translated_uri = this.uris.get (0);
-            if (host_ip != null) {
-                try {
-                    translated_uri = this.address_regex.replace_literal
-                                        (this.uris.get (0), -1, 0, host_ip);
-                } catch (Error error) {
-                    assert_not_reached ();
-                }
-            }
+    public virtual DataSource? create_stream_source (string? host_ip = null) {
+        if (this.uris.size == 0) {
+            return null;
+        }
 
-            src = GstUtils.create_source_for_uri (translated_uri);
+        string translated_uri = this.uris.get (0);
+        if (host_ip != null) {
+            try {
+                translated_uri = this.address_regex.replace_literal
+                    (this.uris.get (0), -1, 0, host_ip);
+            } catch (Error error) {
+                assert_not_reached ();
+            }
         }
 
-        return src;
+        return MediaEngine.get_default ().create_data_source (translated_uri);
     }
 
     public bool is_live_stream () {
diff --git a/src/librygel-server/rygel-transcoder.vala b/src/librygel-server/rygel-transcoder.vala
index 64d3a14..cc0e2fd 100644
--- a/src/librygel-server/rygel-transcoder.vala
+++ b/src/librygel-server/rygel-transcoder.vala
@@ -23,8 +23,8 @@
  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
  */
 
-using Gst;
 using GUPnP;
+using Gst;
 
 /**
  * The base Transcoder class. Each implementation derives from it and must
diff --git a/src/plugins/gst-launch/rygel-gst-launch-audio-item.vala b/src/plugins/gst-launch/rygel-gst-launch-audio-item.vala
index 81e3eaa..e847291 100644
--- a/src/plugins/gst-launch/rygel-gst-launch-audio-item.vala
+++ b/src/plugins/gst-launch/rygel-gst-launch-audio-item.vala
@@ -5,6 +5,7 @@
  * Author: Thijs Vermeir <thijsvermeir gmail com>
  * Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
  *                               <zeeshan ali nokia com>
+ *         Jens Georg <jensg openismus com>
  *
  * This file is part of Rygel.
  *
@@ -42,7 +43,7 @@ public class Rygel.GstLaunch.AudioItem : Rygel.AudioItem, Item {
         this.launch_line = launch_line;
     }
 
-    public override Element? create_stream_source (string? host_ip) {
-        return this.create_source ();
+    public override DataSource? create_stream_source (string? host_ip) {
+        return new GstDataSource.from_element (this.create_source ());
     }
 }
diff --git a/src/plugins/gst-launch/rygel-gst-launch-video-item.vala b/src/plugins/gst-launch/rygel-gst-launch-video-item.vala
index b25900c..7b7f2a4 100644
--- a/src/plugins/gst-launch/rygel-gst-launch-video-item.vala
+++ b/src/plugins/gst-launch/rygel-gst-launch-video-item.vala
@@ -1,10 +1,12 @@
 /*
  * Copyright (C) 2009 Thijs Vermeir <thijsvermeir gmail com>
  * Copyright (C) 2010 Nokia Corporation.
+ * Copyright (C) 2012 Intel Corporation.
  *
  * Author: Thijs Vermeir <thijsvermeir gmail com>
  * Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
  *                               <zeeshan ali nokia com>
+ *         Jens Georg <jensg openismus com>
  *
  * This file is part of Rygel.
  *
@@ -42,7 +44,7 @@ public class Rygel.GstLaunch.VideoItem : Rygel.VideoItem, Item {
         this.launch_line = launch_line;
     }
 
-    public override Element? create_stream_source (string? host_ip) {
-        return this.create_source ();
+    public override DataSource? create_stream_source (string? host_ip) {
+        return new GstDataSource.from_element (this.create_source ());
     }
 }
diff --git a/src/plugins/test/rygel-test-audio-item.vala b/src/plugins/test/rygel-test-audio-item.vala
index 9a27c8a..021ecec 100644
--- a/src/plugins/test/rygel-test-audio-item.vala
+++ b/src/plugins/test/rygel-test-audio-item.vala
@@ -1,9 +1,11 @@
 /*
  * Copyright (C) 2008 Zeeshan Ali (Khattak) <zeeshanak gnome org>.
  * Copyright (C) 2008 Nokia Corporation.
+ * Copyright (C) 2012 Intel Corporation.
  *
  * Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
  *                               <zeeshan ali nokia com>
+ *         Jens Georg <jensg openismus com>
  *
  * This file is part of Rygel.
  *
@@ -37,9 +39,11 @@ public class Rygel.Test.AudioItem : Rygel.AudioItem {
         this.mime_type = TEST_MIMETYPE;
     }
 
-    public override Element? create_stream_source (string? host_ip) {
+    public override DataSource? create_stream_source (string? host_ip) {
         try {
-            return parse_bin_from_description (PIPELINE, true);
+            var element = parse_bin_from_description (PIPELINE, true);
+
+            return new GstDataSource.from_element (element);
         } catch (Error err) {
             warning ("Required plugin missing (%s)", err.message);
 
diff --git a/src/plugins/test/rygel-test-video-item.vala b/src/plugins/test/rygel-test-video-item.vala
index 237f943..991a488 100644
--- a/src/plugins/test/rygel-test-video-item.vala
+++ b/src/plugins/test/rygel-test-video-item.vala
@@ -1,9 +1,11 @@
 /*
  * Copyright (C) 2008 Zeeshan Ali (Khattak) <zeeshanak gnome org>.
  * Copyright (C) 2008 Nokia Corporation.
+ * Copyright (C) 2012 Intel Corporation.
  *
  * Author: Zeeshan Ali (Khattak) <zeeshanak gnome org>
  *                               <zeeshan ali nokia com>
+ *         Jens Georg <jensg openismus com>
  *
  * This file is part of Rygel.
  *
@@ -39,9 +41,11 @@ public class Rygel.Test.VideoItem : Rygel.VideoItem {
         this.mime_type = TEST_MIMETYPE;
     }
 
-    public override Element? create_stream_source (string? host_ip) {
+    public override DataSource? create_stream_source (string? host_ip) {
         try {
-            return parse_bin_from_description (PIPELINE, true);
+            var element =  parse_bin_from_description (PIPELINE, true);
+
+            return new GstDataSource.from_element (element);
         } catch (Error err) {
             warning ("Required plugin missing (%s)", err.message);
 
diff --git a/tests/Makefile.am b/tests/Makefile.am
index dc431e2..6c7086a 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -54,8 +54,8 @@ rygel_http_item_uri_test_SOURCES = rygel-http-item-uri-test.vala \
 rygel_http_response_test_SOURCES = rygel-http-response-test.vala \
 				   rygel-http-response.vala \
 				   rygel-state-machine_http-response.vala \
-				   rygel-http-gst-sink.vala \
-				   rygel-gst-utils.vala
+				   rygel-data-source_http-response.vala \
+				   rygel-data-sink_http-response.vala
 
 rygel_http_byte_seek_test_SOURCES = rygel-http-byte-seek-test.vala \
 				    rygel-http-byte-seek.vala \
@@ -91,6 +91,8 @@ rygel_searchable_container_test_SOURCES = \
 
 rygel_item_creator_test_SOURCES = rygel-item-creator-test.vala \
 				  rygel-item-creator.vala \
+				  rygel-data-source.vala \
+				  rygel-http-seek_item-creator.vala \
 				  rygel-state-machine_item-creator.vala \
 				  rygel-relational-expression.vala \
 				  rygel-search-expression.vala \
diff --git a/tests/rygel-data-source.vala b/tests/rygel-data-source.vala
new file mode 120000
index 0000000..e1065ed
--- /dev/null
+++ b/tests/rygel-data-source.vala
@@ -0,0 +1 @@
+../src/librygel-server/rygel-data-source.vala
\ No newline at end of file
diff --git a/tests/rygel-http-response-test.vala b/tests/rygel-http-response-test.vala
index 64e0bf4..9f36384 100644
--- a/tests/rygel-http-response-test.vala
+++ b/tests/rygel-http-response-test.vala
@@ -22,7 +22,6 @@
  */
 
 using Soup;
-using Gst;
 
 public errordomain Rygel.TestError {
     SKIP = 77,
@@ -338,8 +337,58 @@ public class Rygel.HTTPGetHandler : GLib.Object {
     }
 }
 
+internal class Rygel.TestDataSource : Rygel.DataSource, Object {
+    private long block_size;
+    private long buffers;
+    private uint64 data_sent;
+    private bool frozen;
+
+    public TestDataSource (long block_size, long buffers) {
+        this.block_size = block_size;
+        this.buffers = buffers;
+        this.data_sent = 0;
+    }
+
+    public void start (HTTPSeek? seek) throws Error {
+        Idle.add ( () => {
+            if (frozen) {
+                return false;
+            }
+
+            var data = new uint8[block_size];
+            this.data_sent += block_size;
+            if (this.data_sent > HTTPResponseTest.MAX_BYTES) {
+                this.done ();
+
+                return false;
+            }
+
+            this.data_available (data);
+
+            return true;
+        });
+    }
+
+    public void freeze () {
+        this.frozen = true;
+    }
+
+    public void thaw () {
+        if (!this.frozen) {
+            return;
+        }
+
+        this.frozen = false;
+        this.start (null);
+    }
+
+    public void stop () {
+        this.freeze ();
+    }
+}
+
 public class Rygel.MediaItem {
-    private static const long BLOCK_SIZE = HTTPResponseTest.MAX_BYTES / 16 + 1;
+    private static const long BLOCK_SIZE = HTTPResponseTest.MAX_BYTES / 16;
     private static const long MAX_BUFFERS = 25;
 
     public int64 size {
@@ -348,26 +397,23 @@ public class Rygel.MediaItem {
         }
     }
 
-    private dynamic Element src;
+    private DataSource src;
+    bool is_live = false;
 
     public MediaItem () {
-        this.src = GstUtils.create_element ("fakesrc", null);
-        this.src.sizetype = 2; // fixed
+        this.src = new TestDataSource (BLOCK_SIZE, MAX_BUFFERS);
+        this.is_live = true;
     }
 
     public MediaItem.fixed_size () {
         this ();
-
-        this.src.blocksize = BLOCK_SIZE;
-        this.src.num_buffers = MAX_BUFFERS;
-        this.src.sizemax = MAX_BUFFERS * BLOCK_SIZE;
     }
 
-    public Element? create_stream_source () {
+    public DataSource? create_stream_source () {
         return this.src;
     }
 
     public bool is_live_stream () {
-        return ((int) this.src.num_buffers) < 0;
+        return this.is_live;
     }
 }
diff --git a/tests/rygel-http-seek_item-creator.vala b/tests/rygel-http-seek_item-creator.vala
new file mode 120000
index 0000000..3396600
--- /dev/null
+++ b/tests/rygel-http-seek_item-creator.vala
@@ -0,0 +1 @@
+../src/librygel-server/rygel-http-seek.vala
\ No newline at end of file
diff --git a/tests/rygel-item-creator-test.vala b/tests/rygel-item-creator-test.vala
index 482693d..e0fda39 100644
--- a/tests/rygel-item-creator-test.vala
+++ b/tests/rygel-item-creator-test.vala
@@ -253,6 +253,10 @@ public class Rygel.GstMediaEngine : Rygel.MediaEngine {
     public override unowned GLib.List<Transcoder>? get_transcoders () {
         return null;
     }
+
+    public override DataSource create_data_source (string uri) {
+        return null;
+    }
 }
 
 public class Rygel.HTTPItemCreatorTest : GLib.Object {



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]