[pitivi] autoaligner: Port autoaligner to use new audio data
- From: Alexandru Băluț <alexbalut src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [pitivi] autoaligner: Port autoaligner to use new audio data
- Date: Wed, 21 Sep 2022 07:53:55 +0000 (UTC)
commit cefdb6930ae3e2915e30a2f18d09b6a687356bf8
Author: Thejas Kiran P S <thejaskiranps gmail com>
Date: Wed Sep 21 07:53:44 2022 +0000
autoaligner: Port autoaligner to use new audio data
Fixes #1345
.pre-commit-config.yaml | 15 +-
data/ui/timelinetoolbar.ui | 3 +-
pitivi/autoaligner.py | 746 ++++++--------------------------------------
pitivi/timeline/timeline.py | 23 +-
pitivi/undo/undo.py | 9 +
pitivi/utils/extract.py | 247 ---------------
tests/test_autoaligner.py | 100 ++++++
7 files changed, 226 insertions(+), 917 deletions(-)
---
diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index f37ddefee..23819d73c 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -27,7 +27,6 @@ repos:
args:
# http://www.pydocstyle.org/en/latest/error_codes.html
- --ignore=D1,D203,D213,D401,D406,D407,D413
- exclude: '.*pitivi/utils/extract.py$|.*pitivi/autoaligner.py$'
- repo: https://gitlab.com/PyCQA/flake8
rev: 3.9.2
hooks:
@@ -36,16 +35,16 @@ repos:
# http://flake8.pycqa.org/en/latest/user/error-codes.html
# https://pycodestyle.readthedocs.io/en/latest/intro.html#error-codes
- --ignore=E402,E501,E722,F401,F841,W504
- exclude: >
- (?x)^(
- pitivi/utils/extract.py|
- pitivi/autoaligner.py|
- )$
- repo: https://github.com/pre-commit/mirrors-mypy
rev: 'v0.910-1'
hooks:
- id: mypy
- files: '^pitivi/(clipproperties.py|timeline/timeline.py)$'
+ files: >
+ (?x)^pitivi/(
+ autoaligner.py|
+ clipproperties.py|
+ timeline/timeline.py
+ )$
args:
- --no-strict-optional
- repo: local
@@ -57,8 +56,6 @@ repos:
types: [python]
exclude: >
(?x)^(
- pitivi/utils/extract.py|
- pitivi/autoaligner.py|
tests/validate-tests/manager.py|
bin/pitivi.in
)$
diff --git a/data/ui/timelinetoolbar.ui b/data/ui/timelinetoolbar.ui
index c887dc2a4..d5bfe99ce 100644
--- a/data/ui/timelinetoolbar.ui
+++ b/data/ui/timelinetoolbar.ui
@@ -119,8 +119,7 @@
<property name="visible">True</property>
<property name="can_focus">False</property>
<property name="tooltip_text" translatable="yes">Align clips based on their soundtracks</property>
- <property name="visible_horizontal">False</property>
- <property name="visible_vertical">False</property>
+ <property name="action_name">timeline.align-clips</property>
<property name="label" translatable="yes">Align</property>
<property name="use_underline">True</property>
<property name="icon_name">stopwatch-symbolic</property>
diff --git a/pitivi/autoaligner.py b/pitivi/autoaligner.py
index fa7bfeed7..2d3977bc8 100644
--- a/pitivi/autoaligner.py
+++ b/pitivi/autoaligner.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Pitivi video editor
# Copyright (c) 2011, Benjamin M. Schwartz <bens alum mit edu>
+# Copyright (c) 2022, Thejas Kiran P S <thejaskiranps gmail com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
@@ -14,668 +15,121 @@
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program; if not, see <http://www.gnu.org/licenses/>.
-"""Automatic alignment of `Clip`s."""
-import array
-import os
-import time
+import os.path
+from typing import List
-from gi.repository import Gst
-from gi.repository import Gtk
+import numpy.typing
+from gi.repository import GES
+from scipy.signal import correlate
+from scipy.signal import correlation_lags
-
-try:
- import numpy
-except ImportError:
- numpy = None
-
-from gettext import gettext as _
-
-import pitivi.configure as configure
-
-from pitivi.utils.ui import beautify_eta
-from pitivi.utils.misc import call_false
-from pitivi.utils.extract import Extractee
+from pitivi.timeline.previewers import get_wavefile_location_for_uri
+from pitivi.timeline.previewers import SAMPLE_DURATION
from pitivi.utils.loggable import Loggable
-def nextpow2(x):
- a = 1
- while a < x:
- a *= 2
- return a
-
-
-def submax(left, middle, right):
- """
- Find the maximum of a quadratic function from three samples.
-
- Given samples from a quadratic P(x) at x=-1, 0, and 1, find the x
- that extremizes P. This is useful for determining the subsample
- position of the extremum given three samples around the observed
- extreme.
-
- @param left: value at x=-1
- @type left: L{float}
- @param middle: value at x=0
- @type middle: L{float}
- @param right: value at x=1
- @type right: L{float}
- @returns: value of x that extremizes the interpolating quadratic
- @rtype: L{float}
-
- """
- L = middle - left # L and R are both positive if middle is the
- R = middle - right # observed max of the integer samples
- return 0.5 * (R - L) / (R + L)
- # Derivation: Consider a quadratic q(x) := P(0) - P(x). Then q(x) has
- # two roots, one at 0 and one at z, and the extreme is at (0+z)/2
- # (i.e. at z/2)
- # q(x) = bx*(x-z) # a may be positive or negative
- # q(1) = b*(1 - z) = R
- # q(-1) = b*(1 + z) = L
- # (1+z)/(1-z) = L/R (from here it's just algebra to find a)
- # z + 1 = R/L - (R/L)*z
- # z*(1+R/L) = R/L - 1
- # z = (R/L - 1)/(R/L + 1) = (R-L)/(R+L)
-
-
-def rigidalign(reference, targets):
- """
- Estimate the relative shift between reference and targets.
-
- The algorithm works by subtracting the mean, and then locating
- the maximum of the cross-correlation. For inputs of length M{N},
- the running time is M{O(C{len(targets)}*N*log(N))}.
-
- @param reference: the waveform to regard as fixed
- @type reference: Sequence(Number)
- @param targets: the waveforms that should be aligned to reference
- @type targets: Sequence(Sequence(Number))
- @returns: The shift necessary to bring each target into alignment
- with the reference. The returned shift may not be an integer,
- indicating that the best alignment would be achieved by a
- non-integer shift and appropriate interpolation.
- @rtype: Sequence(Number)
-
- """
- # L is the maximum size of a cross-correlation between the
- # reference and any of the targets.
- L = len(reference) + max(len(t) for t in targets) - 1
- # We round up L to the next power of 2 for speed in the FFT.
- L = nextpow2(L)
- reference = reference - numpy.mean(reference)
- fref = numpy.fft.rfft(reference, L).conj()
- shifts = []
- for t in targets:
- t = t - numpy.mean(t)
- # Compute cross-correlation
- xcorr = numpy.fft.irfft(fref * numpy.fft.rfft(t, L))
- # shift maximizes dotproduct(t[shift:],reference)
- # int() to convert numpy.int32 to python int
- shift = int(numpy.argmax(xcorr))
- subsample_shift = submax(xcorr[(shift - 1) % L],
- xcorr[shift],
- xcorr[(shift + 1) % L])
- shift = shift + subsample_shift
- # shift is now a float indicating the interpolated maximum
- if shift >= len(t): # Negative shifts appear large and positive
- shift -= L # This corrects them to be negative
- shifts.append(-shift)
- # Sign reversed to move the target instead of the reference
- return shifts
-
-
-def _findslope(a):
- # Helper function for affinealign
- # The provided matrix a contains a bright line whose slope we want to know,
- # against a noisy background.
- # The line starts at 0,0. If the slope is positive, it runs toward the
- # center of the matrix (i.e. toward (-1,-1))
- # If the slope is negative, it wraps from 0,0 to 0,-1 and continues toward
- # the center, (i.e. toward (-1,0)).
- # The line segment terminates at the midline along the X direction.
- # We locate the line by simply checking the sum along each possible line
- # up to the Y-max edge of a. The caller sets the limit by choosing the
- # size of a.
- # The function returns a floating-point slope assuming that the matrix
- # has "square pixels".
- Y, X = a.shape
- X /= 2
- x_pos = numpy.arange(1, X)
- x_neg = numpy.arange(2 * X - 1, X, -1)
- best_end = 0
- max_sum = 0
- for end in range(Y):
- y = (x_pos * end) // X
- s = numpy.sum(a[y, x_pos])
- if s > max_sum:
- max_sum = s
- best_end = end
- s = numpy.sum(a[y, x_neg])
- if s > max_sum:
- max_sum = s
- best_end = -end
- return float(best_end) / X
-
-
-def affinealign(reference, targets, max_drift=0.02):
- """ EXPERIMENTAL FUNCTION.
-
- Perform an affine registration between a reference and a number of
- targets. Designed for aligning the amplitude envelopes of recordings of
- the same event by different devices.
-
- NOTE: This method is currently NOT USED by Pitivi, as it has proven both
- unnecessary and unusable. So far every test case has been registered
- successfully by rigidalign, and until Pitivi supports time-stretching of
- audio, the drift calculation cannot actually be used.
-
- @param reference: the reference signal to which others will be registered
- @type reference: array(number)
- @param targets: the signals to register
- @type targets: ordered iterable(array(number))
- @param max_drift: the maximum absolute clock drift rate
- (i.e. stretch factor) that will be considered during search
- @type max_drift: positive L{float}
- @return: (offsets, drifts). offsets[i] is the point in reference at which
- targets[i] starts. drifts[i] is the speed of targets[i] relative to
- the reference (positive is faster, meaning the target should be
- slowed down to be in sync with the reference)
- """
- L = len(reference) + max(len(t) for t in targets) - 1
- L2 = nextpow2(L)
- bsize = int(20. / max_drift) # NEEDS TUNING
- num_blocks = nextpow2(1.0 * len(reference) // bsize) # NEEDS TUNING
- bspace = (len(reference) - bsize) // num_blocks
- reference -= numpy.mean(reference)
-
- # Construct FFT'd reference blocks
- freference_blocks = numpy.zeros((L2 / 2 + 1, num_blocks),
- dtype=numpy.complex)
- for i in range(num_blocks):
- s = i * bspace
- tmp = numpy.zeros((L2,))
- tmp[s:s + bsize] = reference[s:s + bsize]
- freference_blocks[:, i] = numpy.fft.rfft(tmp, L2).conj()
- freference_blocks[:10, :] = 0 # High-pass to ignore slow volume variations
-
- offsets = []
- drifts = []
- for t in targets:
- t -= numpy.mean(t)
- ft = numpy.fft.rfft(t, L2)
- # fxcorr is the FFT'd cross-correlation with the reference blocks
- fxcorr_blocks = numpy.zeros((L2 / 2 + 1, num_blocks),
- dtype=numpy.complex)
- for i in range(num_blocks):
- fxcorr_blocks[:, i] = ft * freference_blocks[:, i]
- fxcorr_blocks[:, i] /= numpy.sqrt(
- numpy.sum(fxcorr_blocks[:, i] ** 2))
- del ft
- # At this point xcorr_blocks would show a distinct bright line, nearly
- # orthogonal to time, indicating where each of these blocks found their
- # peak. Each point on this line represents the time in t where block i
- # found its match. The time-intercept gives the time in b at which the
- # reference starts, and the slope gives the amount by which the
- # reference is faster relative to b.
-
- # The challenge now is to find this line. Our strategy is to reduce the
- # search to one dimension by first finding the slope.
- # The Fourier Transform of a smooth real line in 2D is an orthogonal
- # line through the origin, with phase that gives its position.
- # Unfortunately this line is not clearly visible in fxcorr_blocks, so
- # we discard the phase (by taking the absolute value) and then inverse
- # transform. This places the line at the origin, so we can find its
- # slope.
-
- # Construct the half-autocorrelation matrix
- # (A true autocorrelation matrix would be ifft(abs(fft(x))**2), but this
- # is just ifft(abs(fft(x))).)
- # Construction is stepwise partly in an attempt to save memory
- # The width is 2*num_blocks in order to avoid overlapping positive and
- # negative correlations
- halfautocorr = numpy.fft.fft(fxcorr_blocks, 2 * num_blocks, 1)
- halfautocorr = numpy.abs(halfautocorr)
- halfautocorr = numpy.fft.ifft(halfautocorr, None, 1)
- halfautocorr = numpy.fft.irfft(halfautocorr, None, 0)
- # Now it's actually the half-autocorrelation.
- # Chop out the bit we don't care about
- halfautocorr = halfautocorr[:bspace * num_blocks * max_drift, :]
- # Remove the local-correlation peak.
- halfautocorr[-1:2, -1:2] = 0 # NEEDS TUNING
- # Normalize each column (appears to be necessary)
- for i in range(2 * num_blocks):
- halfautocorr[:, i] /= numpy.sqrt(
- numpy.sum(halfautocorr[:, i] ** 2))
- drift = _findslope(halfautocorr) / bspace
- del halfautocorr
-
- # inverse transform and shift everything into alignment
- xcorr_blocks = numpy.fft.irfft(fxcorr_blocks, None, 0)
- del fxcorr_blocks
- # TODO: see if phase ramps are worthwhile here
- for i in range(num_blocks):
- blockcenter = i * bspace + bsize / 2
- shift = int(blockcenter * drift)
- if shift > 0:
- temp = xcorr_blocks[:shift, i].copy()
- xcorr_blocks[:-shift, i] = xcorr_blocks[shift:, i].copy()
- xcorr_blocks[-shift:, i] = temp
- elif shift < 0:
- temp = xcorr_blocks[shift:, i].copy()
- xcorr_blocks[-shift:, i] = xcorr_blocks[:shift, i].copy()
- xcorr_blocks[:-shift, i] = temp
-
- # xcorr is the drift-compensated cross-correlation
- xcorr = numpy.sum(xcorr_blocks, axis=1)
- del xcorr_blocks
-
- offset = numpy.argmax(xcorr)
- del xcorr
- if offset >= len(t):
- offset -= L2
-
- # now offset is the point in target at which reference starts and
- # drift is the speed with which the reference drifts relative to the
- # target. We reverse these relationships for the caller.
- slope = 1 + drift
- offsets.append(-offset / slope)
- drifts.append(1 / slope - 1)
- return offsets, drifts
-
-
-def getAudioTrack(clip):
- """
- Helper function for getting an audio track from a Clip
-
- @param clip: The Clip from which to locate an audio track
- @type clip: L{Clip}
- @returns: An audio track from clip, or None if clip has no audio track
- @rtype: audio L{TrackElement} or L{NoneType}
- """
- for track in clip.track_elements:
- if track.stream_type == AudioStream:
- return track
- return None
-
-
-class ProgressMeter:
-
- """Abstract interface representing a progress meter."""
-
- def addWatcher(self, function):
- """ Add a progress watching callback function. This callback will
- always be called from the main thread.
-
- @param function: a function to call with progress updates.
- @type function: callable(fractional_progress, time_remaining_text).
- fractional_progress is a float normalized to [0,1].
- time_remaining_text is a localized text string indicating the
- estimated time remaining.
- """
- raise NotImplementedError
-
-
-class ProgressAggregator(ProgressMeter):
-
- """A ProgressMeter that aggregates progress reports.
-
- Reports from multiple sources are combined into a unified progress
- report.
-
- """
-
- def __init__(self):
- # _targets is a list giving the size of each task.
- self._targets = []
- # _portions is a list of the same length as _targets, indicating
- # the portion of each task that as been completed (initially 0).
- self._portions = []
- self._start = time.time()
- self._watchers = []
-
- def getPortionCB(self, target):
- """Prepare a new input for the Aggregator.
-
- Given a target size
- (in arbitrary units, but should be consistent across all calls on
- a single ProgressAggregator object), it returns a callback that
- can be used to update progress on this portion of the task.
-
- @param target: the total task size for this portion
- @type target: number
- @returns: a callback that can be used to inform the Aggregator of
- subsequent updates to this portion
- @rtype: function(x), where x should be a number indicating the
- absolute amount of this subtask that has been completed.
-
- """
- i = len(self._targets)
- self._targets.append(target)
- self._portions.append(0)
-
- def cb(thusfar):
- self._portions[i] = thusfar
- GLib.idle_add(self._callForward)
- return cb
-
- def addWatcher(self, function):
- self._watchers.append(function)
-
- def _callForward(self):
- # This function always returns False so that it may be safely
- # invoked via GLib.idle_add(). Use of idle_add() is necessary
- # to ensure that watchers are always called from the main thread,
- # even if progress updates are received from other threads.
- total_target = sum(self._targets)
- total_completed = sum(self._portions)
- if total_target == 0:
- return False
- frac = min(1.0, float(total_completed) / total_target)
- now = time.time()
- remaining = (now - self._start) * (1 - frac) / frac
- for function in self._watchers:
- function(frac, beautify_eta(int(remaining * Gst.SECOND)))
- return False
-
-
-class EnvelopeExtractee(Extractee, Loggable):
-
- """Class that computes the envelope of a 1-D signal (audio).
-
- The envelope is defined as the sum of the absolute value of the signal
- over each block. This class computes the envelope incrementally,
- so that the entire signal does not ever need to be stored.
-
- """
-
- def __init__(self, blocksize, callback, *cbargs):
- """
- @param blocksize: the number of samples in a block
- @type blocksize: L{int}
- @param callback: a function to call when the extraction is complete.
- The function's first argument will be a numpy array
- representing the envelope, and any later argument to this
- function will be passed as subsequent arguments to callback.
-
- """
- Loggable.__init__(self)
- self._blocksize = blocksize
- self._cb = callback
- self._cbargs = cbargs
- self._blocks = numpy.zeros((0,), dtype=numpy.float32)
- self._empty = array.array('f', [])
- # self._samples buffers up to self._threshold samples, before
- # their envelope is computed and store in self._blocks, in order
- # to amortize some of the function call overheads.
- self._samples = array.array('f', [])
- self._threshold = 2000 * blocksize
- self._progress_watchers = []
-
- def receive(self, a):
- self._samples.extend(a)
- if len(self._samples) < self._threshold:
- return
- else:
- self._process_samples()
-
- def addWatcher(self, w):
- """
- Add a function to call with progress updates.
-
- @param w: callback function
- @type w: function(# of samples received so far)
-
- """
- self._progress_watchers.append(w)
-
- def _process_samples(self):
- excess = len(self._samples) % self._blocksize
- if excess != 0:
- samples_to_process = self._samples[:-excess]
- self._samples = self._samples[-excess:]
- else:
- samples_to_process = self._samples
- self._samples = array.array('f', [])
- self.debug("Adding %s samples to %s blocks",
- len(samples_to_process), len(self._blocks))
- newblocks = len(samples_to_process) // self._blocksize
- samples_abs = numpy.abs(
- samples_to_process).reshape((newblocks, self._blocksize))
- self._blocks.resize((len(self._blocks) + newblocks,))
- # This numpy.sum() call relies on samples_abs being a
- # floating-point type. If samples_abs.dtype is int16
- # then the sum may overflow.
- self._blocks[-newblocks:] = numpy.sum(samples_abs, 1)
- for w in self._progress_watchers:
- w(self._blocksize * len(self._blocks) + excess)
-
- def finalize(self):
- self._process_samples() # absorb any remaining buffered samples
- self._cb(self._blocks, *self._cbargs)
-
-
class AutoAligner(Loggable):
+ """Logic for aligning clips based on their audio."""
- """
- Class for aligning a set of L{Clip}s automatically.
-
- The alignment is based on their contents, so that the shifted tracks
- are synchronized. The current implementation only analyzes audio
- data, so timeline objects without an audio track cannot be aligned.
-
- """
-
- BLOCKRATE = 25
- """
- @ivar BLOCKRATE: The number of amplitude blocks per second.
-
- The AutoAligner works by computing the "amplitude envelope" of each
- audio stream. We define an amplitude envelope as the absolute value
- of the audio samples, downsampled to a low samplerate. This
- samplerate, in Hz, is given by BLOCKRATE. (It is given this name
- because the downsampling filter is implemented by very simple
- averaging over blocks, i.e. a box filter.) 25 Hz appears to be a
- good choice because it evenly divides all common audio samplerates
- (e.g. 11025 and 8000). Lower blockrate requires less CPU time but
- produces less accurate alignment. Higher blockrate is the reverse
- (and also cannot evenly divide all samplerates).
-
- """
-
- def __init__(self, clips, callback):
- """
- @param clips: an iterable of L{Clip}s.
- In this implementation, only L{Clip}s with at least one
- audio track will be aligned.
- @type clips: iter(L{Clip})
- @param callback: A function to call when alignment is complete. No
- arguments will be provided.
- @type callback: function
-
- """
+ def __init__(self, selection):
Loggable.__init__(self)
- # self._clips maps each object to its envelope. The values
- # are initially None prior to envelope extraction.
- self._clips = dict.fromkeys(clips)
- self._callback = callback
- # stack of (Track, Extractee) pairs waiting to be processed
- # When start() is called, the stack will be populated, and then
- # processed sequentially. Only one item from the stack will be
- # actively in process at a time.
- self._extraction_stack = []
+ # Remove transition clips if any.
+ clips = [clip for clip in selection if isinstance(clip, GES.UriClip)]
+ # Sorting the clip in descending order according to their length
+ self._clips: List[GES.Clip] = sorted(clips,
+ key=lambda clip: clip.props.duration,
+ reverse=True)
+
+ def _get_peaks(self,
+ clips: List[GES.Clip]
+ ) -> List[numpy.typing.NDArray[numpy.float64]]:
+ """Returns peak values of each clip from its wave cache."""
+ peaks = []
+ for clip in clips:
+ wavefile = get_wavefile_location_for_uri(clip.get_uri())
+ clip_peaks = numpy.load(wavefile)
+
+ # Slice out samples of trimmed part.
+ start = clip.inpoint // SAMPLE_DURATION
+ end = (clip.inpoint + clip.duration) // SAMPLE_DURATION
+ peaks.append(clip_peaks[start:end])
+ return peaks
@staticmethod
- def canAlign(clips):
- """
- Can an AutoAligner align these objects?
-
- Determine whether a group of timeline objects can all
- be aligned together by an AutoAligner.
-
- @param clips: a group of timeline objects
- @type clips: iterable(L{Clip})
- @returns: True iff the objects can aligned.
- @rtype: L{bool}
-
- """
- # numpy is a "soft dependency". If you're running without numpy,
- # this False return value is your only warning not to
- # use the AutoAligner, which will crash immediately.
- return all(getAudioTrack(t) is not None for t in clips)
-
- def _extractNextEnvelope(self):
- audiotrack, extractee = self._extraction_stack.pop()
- r = RandomAccessAudioExtractor(audiotrack.factory,
- audiotrack.stream)
- r.extract(extractee, audiotrack.in_point,
- audiotrack.out_point - audiotrack.in_point)
- return False
-
- def _envelopeCb(self, array, clip):
- self.debug("Receiving envelope for %s", clip)
- self._clips[clip] = array
- if self._extraction_stack:
- self._extractNextEnvelope()
- else: # This was the last envelope
- self._performShifts()
- self._callback()
-
- def start(self):
- """
- Initiate the auto-alignment process.
-
- @returns: a L{ProgressMeter} indicating the progress of the
- alignment
- @rtype: L{ProgressMeter}
-
- """
- progress_aggregator = ProgressAggregator()
- pairs = [] # (Clip, {audio}TrackElement) pairs
- for clip in list(self._clips.keys()):
- audiotrack = getAudioTrack(clip)
- if audiotrack is not None:
- pairs.append((clip, audiotrack))
- else: # forget any Clip without an audio track
- self._clips.pop(clip)
- if len(pairs) >= 2:
- for clip, audiotrack in pairs:
- # blocksize is the number of samples per block
- blocksize = audiotrack.stream.rate // self.BLOCKRATE
- extractee = EnvelopeExtractee(
- blocksize, self._envelopeCb, clip)
- # numsamples is the total number of samples in the track,
- # which is used by progress_aggregator to determine
- # the percent completion.
- numsamples = ((audiotrack.duration / Gst.SECOND) *
- audiotrack.stream.rate)
- extractee.addWatcher(
- progress_aggregator.getPortionCB(numsamples))
- self._extraction_stack.append((audiotrack, extractee))
- # After we return, start the extraction cycle.
- # This GLib.idle_add call should not be necessary;
- # we should be able to invoke _extractNextEnvelope directly
- # here. However, there is some as-yet-unexplained
- # race condition between the Python GIL, GTK UI updates,
- # GLib mainloop, and pygst multithreading, resulting in
- # occasional deadlocks during autoalignment.
- # This call to idle_add() reportedly eliminates the deadlock.
- # No one knows why.
- GLib.idle_add(self._extractNextEnvelope)
- else: # We can't do anything without at least two audio tracks
- # After we return, call the callback function (once)
- GLib.idle_add(call_false, self._callback)
- return progress_aggregator
-
- def _chooseReference(self):
- """
- Chooses the timeline object to use as a reference.
+ def can_align(clips: List[GES.Clip]) -> bool:
+ """Checks if auto alignment of the clips is possible."""
+ if len(clips) < 2:
+ return False
- This function currently selects the one with lowest priority,
- i.e. appears highest in the GUI. The behavior of this function
- affects user interaction, because the user may want to
- determine which object moves and which stays put.
+ # Check all clips have an audio track.
+ if not (all(c.get_track_types() & GES.TrackType.AUDIO
+ for c in clips)):
+ return False
- @returns: the timeline object with lowest priority.
- @rtype: L{Clip}
+ # Check every clip is from a different layer.
+ layers = [clip.get_layer() for clip in clips]
+ if len(set(layers)) < len(layers):
+ return False
+ # Check if peaks data have been generated by the previewer.
+ for clip in clips:
+ peaks_file_uri = get_wavefile_location_for_uri(clip.get_uri())
+ if not os.path.isfile(peaks_file_uri):
+ return False
+
+ return True
+
+ def _xalign(self,
+ peaks1: numpy.typing.NDArray[numpy.float64],
+ peaks2: numpy.typing.NDArray[numpy.float64]
+ ) -> numpy.int64:
+ """Calculates lag in peak-arrays of a pair of clips using cross correlation."""
+ corr = correlate(peaks1, peaks2)
+ lags = correlation_lags(peaks1.size, peaks2.size)
+ lag = lags[numpy.argmax(corr)]
+ return lag
+
+ def _calculate_shifts(self,
+ peaks: List[numpy.typing.NDArray[numpy.float64]]
+ ) -> List[numpy.int64]:
+ """Calculates the shift required by target clips wrt to reference clip.
+
+ Args:
+ peaks: List of peak values of each clip.
"""
- def priority(clip):
- return clip.priority
- return min(iter(self._clips.keys()), key=priority)
-
- def _performShifts(self):
- self.debug("performing shifts")
- reference = self._chooseReference()
- # By using pop(), this line also removes the reference
- # Clip and its envelope from further consideration,
- # saving some CPU time in rigidalign.
- reference_envelope = self._clips.pop(reference)
- # We call list() because we need a reliable ordering of the pairs
- # (In python 3, dict.items() returns an unordered dictview)
- pairs = list(self._clips.items())
- envelopes = [p[1] for p in pairs]
- offsets = rigidalign(reference_envelope, envelopes)
- for (movable, envelope), offset in zip(pairs, offsets):
- # tshift is the offset rescaled to units of nanoseconds
- tshift = int((offset * Gst.SECOND) / self.BLOCKRATE)
- self.debug("Shifting %s to %i ns from %i",
- movable, tshift, reference.start)
- newstart = reference.start + tshift
- if newstart >= 0:
- movable.start = newstart
- else:
- # Timeline objects always must have a positive start point, so
- # if alignment would move an object to start at negative time,
- # we instead make it start at zero and chop off the required
- # amount at the beginning.
- movable.start = 0
- movable.in_point = movable.in_point - newstart
- movable.duration += newstart
-
-
-class AlignmentProgressDialog:
-
- """ Dialog indicating the progress of the auto-alignment process.
- Code derived from L{RenderingProgressDialog}, but greatly simplified
- (read-only, no buttons)."""
-
- def __init__(self, app):
- self.builder = Gtk.Builder()
- self.builder.add_from_file(
- os.path.join(configure.get_ui_dir(), "alignmentprogress.ui"))
- self.builder.connect_signals(self)
+ # Select peaks of largest clip as reference.
+ reference = peaks[0]
+ reference -= reference.mean()
+
+ shifts = []
+ # Adding 0 shift for the reference clip.
+ shifts.append(numpy.int64(0))
+ for clip_peaks in peaks[1:]:
+ clip_peaks -= clip_peaks.mean()
+ shift = self._xalign(reference, clip_peaks)
+ # Converting shift to time to be shifted in ns.
+ shift *= SAMPLE_DURATION
+ shifts.append(shift)
+
+ return shifts
+
+ def run(self) -> None:
+ if not self.can_align(self._clips):
+ return
- self.window = self.builder.get_object("align-progress")
- self.progressbar = self.builder.get_object("progressbar")
- # Parent this dialog with mainwindow
- # set_transient_for allows this dialog to properly
- # minimize together with the mainwindow. This method is
- # taken from RenderingProgressDialog. In both cases, it appears
- # to work correctly, although there is a known bug for Gnome 3 in
- # RenderingProgressDialog (bug #652917)
- self.window.set_transient_for(app.gui)
+ peaks = self._get_peaks(self._clips)
- # FIXME: Add a cancel button
+ shifts = self._calculate_shifts(peaks)
+ self._perform_shifts(shifts)
- def updatePosition(self, fraction, estimated):
- self.progressbar.set_fraction(fraction)
- self.window.set_title(_("%d%% Analyzed") % int(100 * fraction))
- if estimated:
- self.progressbar.set_text(_("About %s left") % estimated)
+ def _perform_shifts(self, shifts: List[numpy.int64]) -> None:
+ reference = self._clips[0]
+ starts = [reference.props.start + shift for shift in shifts]
+ min_start = min(starts)
+ if min_start < 0:
+ # Adjust the starts to avoid placing clips at a negative position.
+ starts = [start - min_start for start in starts]
-if __name__ == '__main__':
- # Simple command-line test
- from sys import argv
- names = argv[1:]
- envelopes = [numpy.fromfile(n) for n in names]
- reference = envelopes[-1]
- offsets, drifts = affinealign(reference, envelopes, 0.02)
- print(offsets, drifts)
- import matplotlib.pyplot as plt
- fig, ax = plt.subplots()
- for o, d, e in zip(offsets, drifts, envelopes):
- t = o + (1 + d) * numpy.arange(len(e))
- ax.plot(t, e / numpy.sqrt(numpy.sum(e ** 2)))
- plt.show()
+ for clip, start in zip(self._clips, starts):
+ clip.props.start = start
diff --git a/pitivi/timeline/timeline.py b/pitivi/timeline/timeline.py
index 95786378c..8b341d67e 100644
--- a/pitivi/timeline/timeline.py
+++ b/pitivi/timeline/timeline.py
@@ -28,7 +28,6 @@ from gi.repository import Gst
from gi.repository import Gtk
from pitivi.action_search_bar import ActionSearchBar
-from pitivi.autoaligner import AlignmentProgressDialog
from pitivi.autoaligner import AutoAligner
from pitivi.configure import get_ui_dir
from pitivi.configure import in_devel
@@ -1670,6 +1669,7 @@ class TimelineContainer(Gtk.Grid, Zoomable, Loggable):
self.forward_one_frame_action.set_enabled(project_loaded)
self.backward_one_second_action.set_enabled(project_loaded)
self.forward_one_second_action.set_enabled(project_loaded)
+ self.align_clips_action.set_enabled(AutoAligner.can_align(selection))
# Internal API
@@ -1861,6 +1861,10 @@ class TimelineContainer(Gtk.Grid, Zoomable, Loggable):
self.add_effect_action,
_("Add an effect to the selected clip"))
+ self.align_clips_action = Gio.SimpleAction.new("align-clips", None)
+ self.align_clips_action.connect("activate", self._align_selected_cb)
+ group.add_action(self.align_clips_action)
+
if in_devel():
self.gapless_action = Gio.SimpleAction.new("toggle-gapless-mode", None)
self.gapless_action.connect("activate", self._gaplessmode_toggled_cb)
@@ -2184,18 +2188,11 @@ class TimelineContainer(Gtk.Grid, Zoomable, Loggable):
if not self.ges_timeline:
return
- progress_dialog = AlignmentProgressDialog(self.app)
- progress_dialog.window.show()
- self.app.action_log.begin("align", toplevel=True)
-
- def aligned_cb(): # Called when alignment is complete
- self.app.action_log.commit()
- self._project.pipeline.commit_timeline()
- progress_dialog.window.destroy()
-
- auto_aligner = AutoAligner(self.timeline.selection, aligned_cb)
- progress_meter = auto_aligner.start()
- progress_meter.add_watcher(progress_dialog.update_position)
+ with self.app.action_log.started("Align clips",
+ finalizing_action=CommitTimelineFinalizingAction(self._project.pipeline),
+ toplevel=True):
+ auto_aligner = AutoAligner(self.timeline.selection)
+ auto_aligner.run()
def _split_cb(self, unused_action, unused_parameter):
"""Splits clips.
diff --git a/pitivi/undo/undo.py b/pitivi/undo/undo.py
index c2497456d..90c561075 100644
--- a/pitivi/undo/undo.py
+++ b/pitivi/undo/undo.py
@@ -172,6 +172,15 @@ class UndoableActionLog(GObject.Object, Loggable):
The operation will be composed of all the actions which have been
pushed and also of the committed sub-operations.
+
+ Args:
+ action_group_name (str): The name of the operation.
+ finalizing_action (FinalizingAction): The action to be performed
+ at the end of undoing or redoing the stacked actions.
+ mergeable (bool): Whether this stack accepts merges with future
+ compatible stacks.
+ toplevel (bool): If True, raises an error if this operation is
+ started while another one is being recorded.
"""
if self.running:
self.debug("Abort because running")
diff --git a/tests/test_autoaligner.py b/tests/test_autoaligner.py
new file mode 100644
index 000000000..191f37187
--- /dev/null
+++ b/tests/test_autoaligner.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+# Pitivi video editor
+# Copyright (c) 2022, Thejas Kiran P S <thejaskiranps gmail com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU Lesser General Public
+# License as published by the Free Software Foundation; either
+# version 2.1 of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Lesser General Public License for more details.
+#
+# You should have received a copy of the GNU Lesser General Public
+# License along with this program; if not, see <http://www.gnu.org/licenses/>.
+import os
+
+from gi.repository import GES
+from gi.repository import Gst
+
+from pitivi.autoaligner import AutoAligner
+from pitivi.timeline.previewers import AudioPreviewer
+from pitivi.timeline.previewers import get_wavefile_location_for_uri
+from pitivi.undo.timeline import CommitTimelineFinalizingAction
+from tests import common
+
+
+class TestAutoAligner(common.TestCase):
+    """Tests for AutoAligner class."""
+
+    def __generate_wavefile(self, clip):
+        """Generates the wavefile for the clip's URI, unless it already exists."""
+        wavefile = get_wavefile_location_for_uri(clip.props.uri)
+        if os.path.exists(wavefile):
+            return
+
+        audio_sources = [source for source in clip.get_children(False)
+                         if isinstance(source, GES.AudioUriSource)]
+        self.assertTrue(audio_sources, "The clip has no audio source")
+        mainloop = common.create_main_loop()
+        previewer = AudioPreviewer(audio_sources[-1], 90)
+        previewer.connect("done", lambda x: mainloop.quit())
+        previewer.start_generation()
+        mainloop.run()
+        self.assertTrue(os.path.exists(wavefile))
+
+    @common.setup_timeline
+    def test_auto_aligner(self):
+        """Checks the clips have the same start after being aligned."""
+        # Prevent magnetic snapping from interfering with the alignment of clips.
+        self.timeline.props.snapping_distance = 0
+        self.timeline.append_layer()
+        layers = self.timeline.get_layers()
+        # Add clips (tears_of_steel.webm) to both layers with a
+        # slight difference in their starting positions.
+        clip1 = self.add_clip(layers[0], start=0, duration=Gst.SECOND)
+        clip2 = self.add_clip(layers[1], start=Gst.SECOND, duration=Gst.SECOND)
+        self.__generate_wavefile(clip1)
+
+        self.assertNotEqual(clip1.start, clip2.start)
+        AutoAligner([clip1, clip2]).run()
+        self.assertEqual(clip1.start, clip2.start)
+
+    @common.setup_timeline
+    def test_negative_shifts(self):
+        """Tests that shifts which would cause a negative clip.start are handled properly."""
+        self.timeline.props.snapping_distance = 0
+        self.timeline.append_layer()
+        layers = self.timeline.get_layers()
+        clip1 = self.add_clip(layers[0], start=0, inpoint=Gst.SECOND // 2, duration=Gst.SECOND)
+        clip2 = self.add_clip(layers[1], start=0, duration=Gst.SECOND)
+        self.__generate_wavefile(clip1)
+
+        AutoAligner([clip1, clip2]).run()
+        # clip2 stays at 0 and clip1 is moved later, so no start is negative.
+        self.assertEqual(clip1.start, Gst.SECOND // 2)
+        self.assertEqual(clip2.start, 0)
+
+    @common.setup_timeline
+    def test_align_undo_redo(self):
+        """Checks the alignment can be undone and redone through the action log."""
+        self.timeline.props.snapping_distance = 0
+        self.timeline.append_layer()
+        layers = self.timeline.get_layers()
+
+        clip1 = self.add_clip(layers[0], start=0, duration=Gst.SECOND)
+        clip2 = self.add_clip(layers[1], start=Gst.SECOND, duration=Gst.SECOND)
+        self.__generate_wavefile(clip1)
+
+        with self.action_log.started("Align clips",
+                                     finalizing_action=CommitTimelineFinalizingAction(self.project.pipeline),
+                                     toplevel=True):
+            AutoAligner([clip1, clip2]).run()
+        self.assertEqual([clip1.start, clip2.start], [0, 0])
+
+        self.action_log.undo()
+        self.assertEqual([clip1.start, clip2.start], [0, Gst.SECOND])
+        self.action_log.redo()
+        self.assertEqual([clip1.start, clip2.start], [0, 0])
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]