[rhythmbox] playbin2: do everything asynchronously



commit f6c1c8a82e046a568b827f59b1af9d3158d0d9fc
Author: Jonathan Matthew <jonathan d14n org>
Date:   Mon Apr 5 17:28:36 2010 +1000

    playbin2: do everything asynchronously
    
    Rather than blocking until state changes finish, we now react to the
    state change bus messages.  This of course greatly reduces the amount of
    time we block the main loop, but also means that all signals are emitted
    on the main thread, even in cases where we start playback from a
    different thread (bug #614351).
    
    Also it turns out we don't need to pause the pipeline before seeking, so
    now we just send the seek event directly.

 backends/gstreamer/rb-player-gst.c |  494 ++++++++++++++++--------------------
 1 files changed, 224 insertions(+), 270 deletions(-)
---
diff --git a/backends/gstreamer/rb-player-gst.c b/backends/gstreamer/rb-player-gst.c
index 58f4d2d..ec1a4ed 100644
--- a/backends/gstreamer/rb-player-gst.c
+++ b/backends/gstreamer/rb-player-gst.c
@@ -83,6 +83,14 @@ enum
 	LAST_SIGNAL
 };
 
+enum StateChangeAction {
+	DO_NOTHING,
+	PLAYER_SHUTDOWN,
+	SET_NEXT_URI,
+	STOP_TICK_TIMER,
+	FINISH_TRACK_CHANGE
+};
+
 static guint signals[LAST_SIGNAL] = { 0 };
 
 struct _RBPlayerGstPrivate
@@ -96,6 +104,7 @@ struct _RBPlayerGstPrivate
 
 	GstElement *playbin;
 	GstElement *audio_sink;
+	enum StateChangeAction state_change_action;
 	guint buffer_size;
 
 	gboolean playing;
@@ -104,6 +113,7 @@ struct _RBPlayerGstPrivate
 	gboolean stream_change_pending;
 	gboolean current_track_finishing;
 	gboolean playbin_stream_changing;
+	gboolean track_change;
 
 	gboolean emitted_error;
 
@@ -282,6 +292,169 @@ handle_missing_plugin_message (RBPlayerGst *player, GstMessage *message)
 }
 
 static gboolean
+tick_timeout (RBPlayerGst *mp)
+{
+	if (mp->priv->playing) {
+		gint64 position;
+
+		position = rb_player_get_time (RB_PLAYER (mp));
+
+		/* if we don't have stream-changed messages, do the track change when
+		 * the playback position is less than one second into the current track,
+		 * which pretty much has to be the new one.
+		 */
+		if (mp->priv->playbin_stream_changing && (position < GST_SECOND)) {
+			emit_playing_stream_and_tags (mp, TRUE);
+			mp->priv->playbin_stream_changing = FALSE;
+		}
+
+		_rb_player_emit_tick (RB_PLAYER (mp),
+				      mp->priv->stream_data,
+				      position,
+				      -1);
+	}
+	return TRUE;
+}
+
+static void
+set_playbin_volume (RBPlayerGst *player, float volume)
+{
+	/* ignore the deep-notify we get directly from the sink, as it causes deadlock.
+	 * we still get another one anyway.
+	 */
+	g_signal_handlers_block_by_func (player->priv->playbin, volume_notify_cb, player);
+#if GST_CHECK_VERSION(0,10,25)
+	if (gst_element_implements_interface (player->priv->playbin, GST_TYPE_STREAM_VOLUME))
+		gst_stream_volume_set_volume (GST_STREAM_VOLUME (player->priv->playbin),
+					      GST_STREAM_VOLUME_FORMAT_CUBIC, volume);
+	else
+		g_object_set (player->priv->playbin, "volume", volume, NULL);
+#else
+	g_object_set (player->priv->playbin, "volume", volume, NULL);
+#endif
+	g_signal_handlers_unblock_by_func (player->priv->playbin, volume_notify_cb, player);
+}
+
+
+
+static void
+track_change_done (RBPlayerGst *mp, GError *error)
+{
+	mp->priv->stream_change_pending = FALSE;
+
+	if (error != NULL) {
+		rb_debug ("track change failed: %s", error->message);
+		return;
+	}
+	rb_debug ("track change finished");
+
+	mp->priv->current_track_finishing = FALSE;
+	mp->priv->buffering = FALSE;
+	mp->priv->playing = TRUE;
+
+	if (mp->priv->playbin_stream_changing == FALSE) {
+		emit_playing_stream_and_tags (mp, mp->priv->track_change);
+	}
+
+	if (mp->priv->tick_timeout_id == 0) {
+		mp->priv->tick_timeout_id =
+			g_timeout_add (1000 / RB_PLAYER_GST_TICK_HZ,
+				       (GSourceFunc) tick_timeout,
+				       mp);
+	}
+
+	if (mp->priv->volume_applied == 0) {
+		GstElement *e;
+
+		/* if the sink provides volume control, ignore the first
+		 * volume setting, allowing the sink to restore its own
+		 * volume.
+		 */
+		e = rb_player_gst_find_element_with_property (mp->priv->audio_sink, "volume");
+		if (e != NULL) {
+			mp->priv->volume_applied = 1;
+			gst_object_unref (e);
+		}
+
+		if (mp->priv->volume_applied < mp->priv->volume_changed) {
+			rb_debug ("applying initial volume: %f", mp->priv->cur_volume);
+			set_playbin_volume (mp, mp->priv->cur_volume);
+		}
+
+		mp->priv->volume_applied = mp->priv->volume_changed;
+	}
+}
+
+static void
+state_change_finished (RBPlayerGst *mp, GError *error)
+{
+	enum StateChangeAction action = mp->priv->state_change_action;
+	mp->priv->state_change_action = DO_NOTHING;
+
+	switch (action) {
+	case DO_NOTHING:
+		break;
+
+	case PLAYER_SHUTDOWN:
+		if (error != NULL) {
+			g_warning ("unable to shut down player pipeline: %s\n", error->message);
+		}
+		break;
+
+	case SET_NEXT_URI:
+		if (error != NULL) {
+			g_warning ("unable to stop playback: %s\n", error->message);
+		} else {
+			rb_debug ("setting new playback URI %s", mp->priv->uri);
+			g_object_set (mp->priv->playbin, "uri", mp->priv->uri, NULL);
+			mp->priv->state_change_action = FINISH_TRACK_CHANGE;
+			gst_element_set_state (mp->priv->playbin, GST_STATE_PLAYING);
+		}
+		break;
+
+	case STOP_TICK_TIMER:
+		if (error != NULL) {
+			g_warning ("unable to pause playback: %s\n", error->message);
+		} else {
+			if (mp->priv->tick_timeout_id != 0) {
+				g_source_remove (mp->priv->tick_timeout_id);
+				mp->priv->tick_timeout_id = 0;
+			}
+		}
+		break;
+
+	case FINISH_TRACK_CHANGE:
+		track_change_done (mp, error);
+		break;
+	}
+}
+
+static gboolean
+message_from_sink (GstElement *sink, GstMessage *message)
+{
+	GstElement *src;
+	GstElement *match;
+	char *name;
+
+	src = GST_ELEMENT (GST_MESSAGE_SRC (message));
+
+	if (GST_IS_BIN (sink) == FALSE) {
+		return (src == sink);
+	}
+
+	name = gst_element_get_name (src);
+	match = gst_bin_get_by_name (GST_BIN (sink), name);
+	g_free (name);
+
+	if (match != NULL) {
+		g_object_unref (match);
+		return (match == src);
+	}
+
+	return FALSE;
+}
+
+static gboolean
 bus_cb (GstBus *bus, GstMessage *message, RBPlayerGst *mp)
 {
 	const GstStructure *structure;
@@ -315,9 +488,23 @@ bus_cb (GstBus *bus, GstMessage *message, RBPlayerGst *mp)
 		}
 
 		if (emit) {
-			sig_error = g_error_new_literal (RB_PLAYER_ERROR,
-							 code,
-							 error->message);
+			if (message_from_sink (mp->priv->audio_sink, message)) {
+				rb_debug ("got error from sink: %s (%s)", error->message, debug);
+				/* Translators: the parameter here is an error message */
+				g_set_error (&sig_error,
+					     RB_PLAYER_ERROR,
+					     code,
+					     _("Failed to open output device: %s"),
+					     error->message);
+			} else {
+				rb_debug ("got error from stream: %s (%s)", error->message, debug);
+				g_set_error (&sig_error,
+					     RB_PLAYER_ERROR,
+					     code,
+					     "%s",
+					     error->message);
+			}
+			state_change_finished (mp, sig_error);
 			mp->priv->emitted_error = TRUE;
 			_rb_player_emit_error (RB_PLAYER (mp), mp->priv->stream_data, sig_error);
 		}
@@ -335,6 +522,21 @@ bus_cb (GstBus *bus, GstMessage *message, RBPlayerGst *mp)
 		_rb_player_emit_eos (RB_PLAYER (mp), mp->priv->stream_data, FALSE);
 		break;
 
+	case GST_MESSAGE_STATE_CHANGED:
+		{
+			GstState oldstate;
+			GstState newstate;
+			GstState pending;
+			gst_message_parse_state_changed (message, &oldstate, &newstate, &pending);
+			if (GST_MESSAGE_SRC (message) == GST_OBJECT (mp->priv->playbin)) {
+				rb_debug ("playbin reached state %s", gst_element_state_get_name (newstate));
+				if (pending == GST_STATE_VOID_PENDING) {
+					state_change_finished (mp, NULL);
+				}
+			}
+			break;
+		}
+
 	case GST_MESSAGE_TAG: {
 		GstTagList *tags;
 		gst_message_parse_tag (message, &tags);
@@ -348,6 +550,7 @@ bus_cb (GstBus *bus, GstMessage *message, RBPlayerGst *mp)
 		break;
 	}
 
+
 	case GST_MESSAGE_BUFFERING: {
 		gint progress;
 
@@ -540,173 +743,6 @@ construct_pipeline (RBPlayerGst *mp, GError **error)
 }
 
 static gboolean
-message_from_sink (GstElement *sink, GstMessage *message)
-{
-	GstElement *src;
-	GstElement *match;
-	char *name;
-
-	src = GST_ELEMENT (GST_MESSAGE_SRC (message));
-
-	if (GST_IS_BIN (sink) == FALSE) {
-		return (src == sink);
-	}
-
-	name = gst_element_get_name (src);
-	match = gst_bin_get_by_name (GST_BIN (sink), name);
-	g_free (name);
-
-	if (match != NULL) {
-		g_object_unref (match);
-		return (match == src);
-	}
-
-	return FALSE;
-}
-
-static gboolean
-set_state_and_wait (RBPlayerGst *player, GstState target, GError **error)
-{
-	GstBus *bus;
-	gboolean waiting;
-	gboolean result;
-
-	g_assert (player->priv->playbin != NULL);
-	/* XXX probably need to remove bus watch here if we're not on the main thread */
-	/* .. probably shouldn't be doing this much anyway .. */
-
-	rb_debug ("setting playbin state to %s", gst_element_state_get_name (target));
-
-	switch (gst_element_set_state (player->priv->playbin, target)) {
-	case GST_STATE_CHANGE_SUCCESS:
-		rb_debug ("state change was successful");
-		return TRUE;
-
-	case GST_STATE_CHANGE_NO_PREROLL:
-		rb_debug ("state change was successful (no preroll)");
-		return TRUE;
-
-	case GST_STATE_CHANGE_ASYNC:
-		rb_debug ("state is changing asynchronously");
-		result = TRUE;
-		break;
-
-	case GST_STATE_CHANGE_FAILURE:
-		rb_debug ("state change failed");
-		result = FALSE;
-		break;
-
-	default:
-		rb_debug ("unknown state change return..");
-		result = TRUE;
-		break;
-	}
-
-	bus = gst_element_get_bus (player->priv->playbin);
-	waiting = TRUE;
-	while (waiting) {
-		GstMessage *message;
-
-		message = gst_bus_timed_pop (bus, GST_SECOND * STATE_CHANGE_MESSAGE_TIMEOUT);
-		if (message == NULL) {
-			rb_debug ("state change is taking too long..");
-			break;
-		}
-
-		switch (GST_MESSAGE_TYPE (message)) {
-		case GST_MESSAGE_ERROR:
-			{
-				char *debug;
-				GError *gst_error = NULL;
-
-				gst_message_parse_error (message, &gst_error, &debug);
-
-				if (message_from_sink (player->priv->audio_sink, message)) {
-					rb_debug ("got error from sink: %s (%s)", gst_error->message, debug);
-					/* Translators: the parameter here is an error message */
-					g_set_error (error,
-						     RB_PLAYER_ERROR,
-						     RB_PLAYER_ERROR_INTERNAL,
-						     _("Failed to open output device: %s"),
-						     gst_error->message);
-				} else {
-					rb_debug ("got error from stream: %s (%s)", gst_error->message, debug);
-					g_set_error (error,
-						     RB_PLAYER_ERROR,
-						     RB_PLAYER_ERROR_GENERAL,
-						     "%s",
-						     gst_error->message);
-				}
-
-				g_error_free (gst_error);
-				g_free (debug);
-
-				waiting = FALSE;
-				result = FALSE;
-				break;
-			}
-
-		case GST_MESSAGE_STATE_CHANGED:
-			{
-				GstState oldstate;
-				GstState newstate;
-				GstState pending;
-				gst_message_parse_state_changed (message, &oldstate, &newstate, &pending);
-				if (GST_MESSAGE_SRC (message) == GST_OBJECT (player->priv->playbin)) {
-					rb_debug ("playbin reached state %s", gst_element_state_get_name (newstate));
-					if (pending == GST_STATE_VOID_PENDING && newstate == target) {
-						waiting = FALSE;
-					}
-				}
-				break;
-			}
-
-		default:
-			/* pass back to regular message handler */
-			bus_cb (bus, message, player);
-			break;
-		}
-
-		gst_message_unref (message);
-	}
-
-	if (result == FALSE && error != NULL && *error == NULL) {
-		g_set_error (error,
-			     RB_PLAYER_ERROR,
-			     RB_PLAYER_ERROR_GENERAL,
-			     _("Unable to start playback pipeline"));
-	}
-
-	return result;
-}
-
-static gboolean
-tick_timeout (RBPlayerGst *mp)
-{
-	if (mp->priv->playing) {
-		gint64 position;
-
-		position = rb_player_get_time (RB_PLAYER (mp));
-
-		/* if we don't have stream-changed messages, do the track change when
-		 * the playback position is less than one second into the current track,
-		 * which pretty much has to be the new one.
-		 */
-		if (mp->priv->playbin_stream_changing && (position < GST_SECOND)) {
-			emit_playing_stream_and_tags (mp, TRUE);
-			mp->priv->playbin_stream_changing = FALSE;
-		}
-
-		_rb_player_emit_tick (RB_PLAYER (mp),
-				      mp->priv->stream_data,
-				      position,
-				      -1);
-	}
-	return TRUE;
-}
-
-
-static gboolean
 impl_close (RBPlayer *player, const char *uri, GError **error)
 {
 	RBPlayerGst *mp = RB_PLAYER_GST (player);
@@ -734,10 +770,11 @@ impl_close (RBPlayer *player, const char *uri, GError **error)
 		mp->priv->tick_timeout_id = 0;
 	}
 
-	if (mp->priv->playbin == NULL)
-		return TRUE;
-
-	return set_state_and_wait (mp, GST_STATE_READY, error);
+	if (mp->priv->playbin != NULL) {
+		mp->priv->state_change_action = PLAYER_SHUTDOWN;
+		gst_element_set_state (mp->priv->playbin, GST_STATE_READY);
+	}
+	return TRUE;
 }
 
 static gboolean
@@ -781,39 +818,21 @@ impl_opened (RBPlayer *player)
 	return mp->priv->uri != NULL;
 }
 
-static void
-set_playbin_volume (RBPlayerGst *player, float volume)
-{
-	/* ignore the deep-notify we get directly from the sink, as it causes deadlock.
-	 * we still get another one anyway.
-	 */
-	g_signal_handlers_block_by_func (player->priv->playbin, volume_notify_cb, player);
-#if GST_CHECK_VERSION(0,10,25)
-	if (gst_element_implements_interface (player->priv->playbin, GST_TYPE_STREAM_VOLUME))
-		gst_stream_volume_set_volume (GST_STREAM_VOLUME (player->priv->playbin),
-					      GST_STREAM_VOLUME_FORMAT_CUBIC, volume);
-	else
-		g_object_set (player->priv->playbin, "volume", volume, NULL);
-#else
-	g_object_set (player->priv->playbin, "volume", volume, NULL);
-#endif
-	g_signal_handlers_unblock_by_func (player->priv->playbin, volume_notify_cb, player);
-}
-
 static gboolean
 impl_play (RBPlayer *player, RBPlayerPlayType play_type, gint64 crossfade, GError **error)
 {
 	RBPlayerGst *mp = RB_PLAYER_GST (player);
 	gboolean result;
-	gboolean track_change = TRUE;
 
 	g_return_val_if_fail (mp->priv->playbin != NULL, FALSE);
 
+	mp->priv->track_change = TRUE;
+
 	if (mp->priv->stream_change_pending == FALSE) {
 		rb_debug ("no stream change pending, just restarting playback");
-		result = set_state_and_wait (mp, GST_STATE_PLAYING, error);
-		track_change = FALSE;
-
+		mp->priv->state_change_action = FINISH_TRACK_CHANGE;
+		mp->priv->track_change = FALSE;
+		gst_element_set_state (mp->priv->playbin, GST_STATE_PLAYING);
 	} else if (mp->priv->current_track_finishing) {
 		rb_debug ("current track finishing -> just setting URI on playbin");
 		g_object_set (mp->priv->playbin, "uri", mp->priv->uri, NULL);
@@ -821,6 +840,7 @@ impl_play (RBPlayer *player, RBPlayerPlayType play_type, gint64 crossfade, GErro
 
 		mp->priv->playbin_stream_changing = TRUE;
 
+		track_change_done (mp, NULL);
 	} else {
 		gboolean reused = FALSE;
 
@@ -837,61 +857,19 @@ impl_play (RBPlayer *player, RBPlayerPlayType play_type, gint64 crossfade, GErro
 					       signals[REUSE_STREAM], 0,
 					       mp->priv->uri, mp->priv->prev_uri, mp->priv->playbin);
 				result = TRUE;
+				track_change_done (mp, *error);
 			}
 		}
 
 		/* no stream reuse, so stop, set the new URI, then start */
 		if (reused == FALSE) {
 			rb_debug ("not in transition, stopping current track to start the new one");
-			result = set_state_and_wait (mp, GST_STATE_READY, error);
-			if (result == TRUE) {
-				g_object_set (mp->priv->playbin, "uri", mp->priv->uri, NULL);
-				result = set_state_and_wait (mp, GST_STATE_PLAYING, error);
-			}
+			mp->priv->state_change_action = SET_NEXT_URI;
+			gst_element_set_state (mp->priv->playbin, GST_STATE_READY);
 		}
 
 	}
 
-	mp->priv->stream_change_pending = FALSE;
-
-	if (result) {
-		mp->priv->current_track_finishing = FALSE;
-		mp->priv->buffering = FALSE;
-		mp->priv->playing = TRUE;
-
-		if (mp->priv->playbin_stream_changing == FALSE) {
-			emit_playing_stream_and_tags (mp, track_change);
-		}
-
-		if (mp->priv->tick_timeout_id == 0) {
-			mp->priv->tick_timeout_id =
-				g_timeout_add (1000 / RB_PLAYER_GST_TICK_HZ,
-					       (GSourceFunc) tick_timeout,
-					       mp);
-		}
-
-		if (mp->priv->volume_applied == 0) {
-			GstElement *e;
-
-			/* if the sink provides volume control, ignore the first
-			 * volume setting, allowing the sink to restore its own
-			 * volume.
-			 */
-			e = rb_player_gst_find_element_with_property (mp->priv->audio_sink, "volume");
-			if (e != NULL) {
-				mp->priv->volume_applied = 1;
-				gst_object_unref (e);
-			}
-
-			if (mp->priv->volume_applied < mp->priv->volume_changed) {
-				rb_debug ("applying initial volume: %f", mp->priv->cur_volume);
-				set_playbin_volume (mp, mp->priv->cur_volume);
-			}
-
-			mp->priv->volume_applied = mp->priv->volume_changed;
-		}
-	}
-
 	return result;
 }
 
@@ -899,7 +877,6 @@ static void
 impl_pause (RBPlayer *player)
 {
 	RBPlayerGst *mp = RB_PLAYER_GST (player);
-	GError *error = NULL;
 
 	if (!mp->priv->playing)
 		return;
@@ -908,15 +885,8 @@ impl_pause (RBPlayer *player)
 
 	g_return_if_fail (mp->priv->playbin != NULL);
 
-	if (set_state_and_wait (mp, GST_STATE_PAUSED, &error) == FALSE) {
-		g_warning ("unable to pause playback: %s\n", error->message);
-		g_error_free (error);
-	}
-
-	if (mp->priv->tick_timeout_id != 0) {
-		g_source_remove (mp->priv->tick_timeout_id);
-		mp->priv->tick_timeout_id = 0;
-	}
+	mp->priv->state_change_action = STOP_TICK_TIMER;
+	gst_element_set_state (mp->priv->playbin, GST_STATE_PAUSED);
 }
 
 static gboolean
@@ -981,30 +951,14 @@ static void
 impl_set_time (RBPlayer *player, gint64 time)
 {
 	RBPlayerGst *mp = RB_PLAYER_GST (player);
-	GError *error = NULL;
-
-	g_return_if_fail (time >= 0);
-
-	g_return_if_fail (mp->priv->playbin != NULL);
-
-	if (set_state_and_wait (mp, GST_STATE_PAUSED, &error) == FALSE) {
-		g_warning ("got error while pausing the pipelink for seeking: %s\n", error->message);
-		g_clear_error (&error);
-
-		/* keep going anyway? */
-	}
 
+	rb_debug ("seeking to %" G_GINT64_FORMAT, time);
 	gst_element_seek (mp->priv->playbin, 1.0,
 			  GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
 			  GST_SEEK_TYPE_SET, time,
 			  GST_SEEK_TYPE_NONE, -1);
 
-	if (mp->priv->playing) {
-		if (set_state_and_wait (mp, GST_STATE_PLAYING, &error) == FALSE) {
-			g_warning ("unable to resume playback after seeking: %s\n", error->message);
-			g_clear_error (&error);
-		}
-	}
+	gst_element_get_state (mp->priv->playbin, NULL, NULL, 100 * GST_MSECOND);
 }
 
 static gint64



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]