[pitivi: 1/4] Add stream matching code.



commit 55e08e374afdbb9321be07085d3d36747cf956a0
Author: Alessandro Decina <alessandro d gmail com>
Date:   Mon Jun 1 13:11:45 2009 +0200

    Add stream matching code.
    
    Add match_stream_groups and match_stream_groups_map to pitivi.stream.
---
 pitivi/stream.py     |  125 ++++++++++++++++++++++++++++++++++++++++++++
 tests/test_stream.py |  142 +++++++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 266 insertions(+), 1 deletions(-)

diff --git a/pitivi/stream.py b/pitivi/stream.py
index 6e7ef44..f3046a3 100644
--- a/pitivi/stream.py
+++ b/pitivi/stream.py
@@ -27,6 +27,12 @@ from pitivi.log.loggable import Loggable
 import pitivi.log.log as log
 import gst
 
+STREAM_MATCH_MAXIMUM = 100  # same pad name + same caps (40 + 60)
+STREAM_MATCH_SAME_CAPS = 60  # added by stream_compare when caps are equal
+STREAM_MATCH_SAME_PAD_NAME = 40  # added when pad names are set and equal
+STREAM_MATCH_COMPATIBLE_CAPS = 30  # added when caps differ but intersect
+STREAM_MATCH_NONE = 0  # starting rank: nothing in common
+
 class MultimediaStream(Loggable):
     """
     Defines a media stream
@@ -413,3 +419,122 @@ def get_sink_pads_for_stream(element, stream):
     @rtype: List of C{gst.Pad}
     """
     return [x for x in get_pads_for_stream(element, stream) if x.get_direction() == gst.PAD_SINK]
+
+def stream_compare(stream_a, stream_b):
+    """
+    Rank how well stream_b matches stream_a: STREAM_MATCH_SAME_PAD_NAME if
+    the pad names are set and equal, plus STREAM_MATCH_SAME_CAPS for equal
+    caps or STREAM_MATCH_COMPATIBLE_CAPS for caps that merely intersect.
+    Caps missing on either stream add nothing, as they cannot be compared.
+    """
+    rank = STREAM_MATCH_NONE
+    if stream_a.pad_name is not None and (stream_a.pad_name ==
+            stream_b.pad_name):
+        rank += STREAM_MATCH_SAME_PAD_NAME
+    if stream_a.caps is not None and stream_b.caps is not None:
+        if stream_a.caps == stream_b.caps:
+            rank += STREAM_MATCH_SAME_CAPS
+        elif stream_a.caps.intersect(stream_b.caps):
+            rank += STREAM_MATCH_COMPATIBLE_CAPS
+    return rank
+
+def match_stream(stream, stream_list):
+    """
+    Find the entry of stream_list that ranks highest against stream.
+
+    Returns a (best_stream, best_rank) tuple. Ranks come from stream_compare;
+    on a tie the earliest entry wins, and if nothing ranks above
+    STREAM_MATCH_NONE the result is (None, STREAM_MATCH_NONE).
+    """
+    winner, top_rank = None, STREAM_MATCH_NONE
+    for candidate in stream_list:
+        rank = stream_compare(stream, candidate)
+        if rank > top_rank:
+            winner, top_rank = candidate, rank
+    return winner, top_rank
+
+class StreamGroupWalker(object):
+    """
+    Node of the search tree used to pair up two groups of streams.
+
+    It holds the streams still unpaired in each group, the pair it was
+    created for (if any) and its parent. See match_stream_groups for usage.
+    """
+    def __init__(self, group_a, group_b,
+            stream_a=None, stream_b=None, parent=None):
+        self.group_a = list(group_a)
+        self.group_b = list(group_b)
+        self.stream_a = stream_a
+        self.stream_b = stream_b
+        self.parent = parent
+        # ((stream_a, stream_b), rank) for this node's pair, None without one
+        self.match = None
+        if stream_a is not None and stream_b is not None:
+            rank = stream_compare(stream_a, stream_b)
+            self.match = ((stream_a, stream_b), rank)
+
+    def advance(self):
+        """
+        Return one child walker per possible (stream_a, stream_b) pair taken
+        from the remaining streams; the list is empty once either group is.
+        """
+        children = []
+        for stream_a in self.group_a:
+            rest_a = list(self.group_a)
+            rest_a.remove(stream_a)
+            for stream_b in self.group_b:
+                rest_b = list(self.group_b)
+                rest_b.remove(stream_b)
+                children.append(StreamGroupWalker(rest_a, rest_b,
+                        stream_a, stream_b, self))
+        return children
+
+    def getMatches(self):
+        """
+        Return {(stream_a, stream_b): rank} for the pairs chosen from this
+        walker up to the root, leaving out pairs ranked STREAM_MATCH_NONE.
+        """
+        matches = {}
+        node = self
+        while node is not None:
+            if node.match is not None:
+                pair, rank = node.match
+                if matches.get(pair, STREAM_MATCH_NONE) < rank:
+                    matches[pair] = rank
+            node = node.parent
+        return matches
+
+def match_stream_groups(group_a, group_b):
+    """
+    Match two groups of streams.
+
+    The function takes two sequences group_a and group_b of streams and returns
+    a dictionary of (stream_a, stream_b) -> rank, where stream_a belongs to
+    group_a, stream_b belongs to group_b and rank is stream_compare(stream_a,
+    stream_b).
+    The algorithm tries all the possible combinations of group_a and group_b and
+    returns the "best" match between group_a and group_b, ie the dictionary
+    having the sum of the ranks maximized. On a tie the first combination
+    found wins; if no combination ranks above 0 the result is empty.
+    """
+    best_rank, best_map = 0, {}
+    # Depth-first walk: popping from the end is O(1) and the stack holds only
+    # the siblings along the current path, not a whole level of the tree.
+    pending = [StreamGroupWalker(group_a, group_b)]
+    while pending:
+        walker = pending.pop()
+        child_walkers = walker.advance()
+        if child_walkers:
+            # reversed so that children are still visited in advance() order
+            pending.extend(reversed(child_walkers))
+            continue
+        # leaf: no more pairs can be formed, so score this combination
+        current_map = walker.getMatches()
+        current_rank = sum(current_map.values())
+        if current_rank > best_rank:
+            best_rank, best_map = current_rank, current_map
+    return best_map
+
+def match_stream_groups_map(group_a, group_b):
+    """Return the best match of the two groups as {stream_a: stream_b}."""
+    return dict(pair for pair in match_stream_groups(group_a, group_b))
diff --git a/tests/test_stream.py b/tests/test_stream.py
index 26b428b..7465eca 100644
--- a/tests/test_stream.py
+++ b/tests/test_stream.py
@@ -21,7 +21,11 @@
 # Boston, MA 02111-1307, USA.
 
 from common import TestCase
-from pitivi.stream import AudioStream, VideoStream, TextStream
+from pitivi.stream import AudioStream, VideoStream, match_stream, \
+        match_stream_groups, StreamGroupWalker, \
+        STREAM_MATCH_MAXIMUM, STREAM_MATCH_SAME_CAPS, \
+        STREAM_MATCH_COMPATIBLE_CAPS, STREAM_MATCH_NONE, \
+        STREAM_MATCH_SAME_PAD_NAME
 import gst
 
 class TestMultimediaStream(object):
@@ -139,3 +143,139 @@ class TestVideoStream(TestMultimediaStream, TestCase):
         stream = VideoStream(caps)
         self.failUnlessEqual(stream.par, gst.Fraction(1, 1))
         self.failUnlessEqual(stream.dar, gst.Fraction(4, 3))
+
+
+class TestMatchStream(TestCase):
+    """Check the (stream, rank) pair returned by match_stream."""
+    def testMatchStreamNoMatch(self):
+        # neither an empty list nor an unrelated stream produces a match
+        audio = AudioStream(gst.Caps("audio/x-vorbis"))
+        video = VideoStream(gst.Caps("video/x-theora"))
+        for candidates in ([], [video]):
+            stream, rank = match_stream(audio, candidates)
+            self.failUnlessEqual(stream, None)
+            self.failUnlessEqual(rank, STREAM_MATCH_NONE)
+
+    def testMatchStreamSameCaps(self):
+        # a stream that is itself among the candidates is the one picked
+        sa = AudioStream(gst.Caps("audio/x-vorbis, a=1"))
+        sb = AudioStream(gst.Caps("audio/x-vorbis, a=2"))
+        stream, rank = match_stream(sa, [sa, sb])
+        self.failUnlessEqual(id(sa), id(stream))
+        self.failUnlessEqual(rank, STREAM_MATCH_SAME_CAPS)
+
+    def testMatchSamePadName(self):
+        sa = AudioStream(gst.Caps("audio/x-vorbis"), pad_name="src0")
+        sb = AudioStream(gst.Caps("audio/x-speex"), pad_name="src0")
+        stream, rank = match_stream(sa, [sb])
+        self.failUnlessEqual(id(sb), id(stream))
+        self.failUnlessEqual(rank, STREAM_MATCH_SAME_PAD_NAME)
+
+    def testMatchStreamCompatibleCaps(self):
+        sa = AudioStream(gst.Caps("audio/x-vorbis, a={1, 2}"))
+        sb = AudioStream(gst.Caps("audio/x-vorbis, a={2, 3}"))
+        stream, rank = match_stream(sa, [sb])
+        self.failUnlessEqual(id(sb), id(stream))
+        self.failUnlessEqual(rank, STREAM_MATCH_COMPATIBLE_CAPS)
+
+    def testMatchStreamSameNameAndSameCaps(self):
+        # same pad name plus equal caps is the best rank there is
+        sa = AudioStream(gst.Caps("audio/x-vorbis"), pad_name="src0")
+        sb = AudioStream(gst.Caps("audio/x-vorbis"), pad_name="src0")
+        stream, rank = match_stream(sa, [sb])
+        self.failUnlessEqual(id(sb), id(stream))
+        expected = STREAM_MATCH_SAME_PAD_NAME + STREAM_MATCH_SAME_CAPS
+        self.failUnlessEqual(rank, expected)
+        self.failUnlessEqual(rank, STREAM_MATCH_MAXIMUM)
+
+    def testMatchStreamSameNameAndCompatibleCaps(self):
+        sa = AudioStream(gst.Caps("audio/x-vorbis, a={1, 2}"), pad_name="src0")
+        sb = AudioStream(gst.Caps("audio/x-vorbis, a={2, 3}"), pad_name="src0")
+        stream, rank = match_stream(sa, [sb])
+        self.failUnlessEqual(id(sb), id(stream))
+        expected = STREAM_MATCH_SAME_PAD_NAME + STREAM_MATCH_COMPATIBLE_CAPS
+        self.failUnlessEqual(rank, expected)
+
+
+class TestStreamGroupMatching(TestCase):
+    """Exercise StreamGroupWalker and match_stream_groups."""
+
+    def testEmptyGroups(self):
+        # A root walker with at least one empty group has no pair to offer:
+        # advance() yields no children and getMatches() no matches.
+        stream = AudioStream(gst.Caps("audio/x-vorbis"))
+        empty_cases = (
+            ([], []),
+            ([stream], []),
+            ([], [stream]),
+        )
+        for group_a, group_b in empty_cases:
+            walker = StreamGroupWalker(group_a, group_b)
+            self.failUnlessEqual(walker.advance(), [])
+            self.failUnlessEqual(walker.getMatches(), {})
+
+    def testSimpleMatch(self):
+        vorbis = AudioStream(gst.Caps("audio/x-vorbis"))
+        raw_int = AudioStream(gst.Caps("audio/x-raw-int"))
+        vorbis_meh = AudioStream(gst.Caps("audio/x-vorbis, meh=asd"))
+
+        group_a = [vorbis, raw_int]
+        group_b = [vorbis_meh]
+        root = StreamGroupWalker(group_a, group_b)
+        children = root.advance()
+        # one child per (stream_a, stream_b) pair: 2 x 1
+        self.failUnlessEqual(len(children), 2)
+
+        # vorbis / vorbis_meh: the caps differ but intersect
+        child = children[0]
+        self.failUnlessEqual(child.advance(), [])
+        self.failUnlessEqual(child.getMatches(),
+                {(vorbis, vorbis_meh): STREAM_MATCH_COMPATIBLE_CAPS})
+
+        # raw_int / vorbis_meh: nothing in common, so no pair is reported
+        child = children[1]
+        self.failUnlessEqual(child.advance(), [])
+        self.failUnlessEqual(child.getMatches(), {})
+
+    def testMatchStreamGroupsOrder(self):
+        # Both candidates rank the same against stream_a; the one listed
+        # first in group_b is the one reported.
+        stream_a = AudioStream(gst.Caps("audio/x-vorbis"))
+        first_b = AudioStream(gst.Caps("audio/x-vorbis"))
+        second_b = AudioStream(gst.Caps("audio/x-vorbis"))
+
+        known_best_map = {(stream_a, first_b): STREAM_MATCH_SAME_CAPS}
+
+        group_a = [stream_a]
+        group_b = [first_b, second_b]
+        best_map = match_stream_groups(group_a, group_b)
+        self.failUnlessEqual(known_best_map, best_map)
+
+    def testMatchStreamGroupsBestMatch(self):
+        # group_a
+        theora_a = AudioStream(gst.Caps("video/x-theora"))
+        meh_a = AudioStream(gst.Caps("audio/x-vorbis, meh={FAIL, WIN}"))
+        vorbis_a = AudioStream(gst.Caps("audio/x-vorbis"))
+        # group_b, listed so that each stream pairs with the one above
+        theora_b = AudioStream(gst.Caps("video/x-theora"))
+        meh_b = AudioStream(gst.Caps("audio/x-vorbis, meh=WIN"))
+        vorbis_b = AudioStream(gst.Caps("audio/x-vorbis"))
+
+        known_best_map = {(theora_a, theora_b): STREAM_MATCH_SAME_CAPS,
+                (meh_a, meh_b): STREAM_MATCH_COMPATIBLE_CAPS,
+                (vorbis_a, vorbis_b): STREAM_MATCH_SAME_CAPS}
+
+        # The best pairing must not depend on the order of group_b.
+        group_a = [theora_a, meh_a, vorbis_a]
+        group_b = [theora_b, meh_b, vorbis_b]
+        best_map = match_stream_groups(group_a, group_b)
+        self.failUnlessEqual(known_best_map, best_map)
+
+        group_b = [vorbis_b, meh_b, theora_b]
+        best_map = match_stream_groups(group_a, group_b)
+        self.failUnlessEqual(known_best_map, best_map)
+
+        group_b = [meh_b, vorbis_b, theora_b]
+        best_map = match_stream_groups(group_a, group_b)
+        self.failUnlessEqual(known_best_map, best_map)
+



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]