[rhythmbox] encoder: open output streams asynchronously
- From: Jonathan Matthew <jmatthew src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [rhythmbox] encoder: open output streams asynchronously
- Date: Sun, 2 Oct 2016 23:32:40 +0000 (UTC)
commit ac70f6e4135de6d13315e348fbfdbb00a276f674
Author: Jonathan Matthew <jonathan d14n org>
Date: Mon Oct 3 08:38:47 2016 +1000
encoder: open output streams asynchronously
Also while we're here, get the output file size by doing a position
query on the pipeline rather than a gvfs file info query.
backends/gstreamer/rb-encoder-gst.c | 304 ++++++++++++++++-------------------
1 files changed, 141 insertions(+), 163 deletions(-)
---
diff --git a/backends/gstreamer/rb-encoder-gst.c b/backends/gstreamer/rb-encoder-gst.c
index 4bc4cc1..ff6f634 100644
--- a/backends/gstreamer/rb-encoder-gst.c
+++ b/backends/gstreamer/rb-encoder-gst.c
@@ -53,6 +53,8 @@ struct _RBEncoderGstPrivate {
GstElement *encodebin;
GstElement *pipeline;
+ GstElement *output;
+ GstElement *sink;
guint bus_watch_id;
gboolean transcoding;
@@ -66,8 +68,13 @@ struct _RBEncoderGstPrivate {
guint progress_id;
char *dest_uri;
char *dest_media_type;
+ gboolean overwrite;
+ gint64 dest_size;
GOutputStream *outstream;
+ GCancellable *open_cancel;
+ GTask *open_task;
+
GError *error;
};
@@ -100,9 +107,6 @@ static void
rb_encoder_gst_emit_completed (RBEncoderGst *encoder)
{
GError *error = NULL;
- guint64 dest_size;
- GFile *file;
- GFileInfo *file_info;
g_return_if_fail (encoder->priv->completion_emitted == FALSE);
@@ -128,26 +132,8 @@ rb_encoder_gst_emit_completed (RBEncoderGst *encoder)
error = NULL;
}
- /* find the size of the output file, assuming we can get at it with gio */
- dest_size = 0;
- if (encoder->priv->dest_uri != NULL) {
- file = g_file_new_for_uri (encoder->priv->dest_uri);
- file_info = g_file_query_info (file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE,
NULL, &error);
- if (error != NULL) {
- rb_debug ("couldn't get size of destination %s: %s",
- encoder->priv->dest_uri,
- error->message);
- g_clear_error (&error);
- } else {
- dest_size = g_file_info_get_attribute_uint64 (file_info,
G_FILE_ATTRIBUTE_STANDARD_SIZE);
- rb_debug ("destination file size: %" G_GUINT64_FORMAT, dest_size);
- g_object_unref (file_info);
- }
- g_object_unref (file);
- }
-
encoder->priv->completion_emitted = TRUE;
- _rb_encoder_emit_completed (RB_ENCODER (encoder), dest_size, encoder->priv->dest_media_type,
encoder->priv->error);
+ _rb_encoder_emit_completed (RB_ENCODER (encoder), encoder->priv->dest_size,
encoder->priv->dest_media_type, encoder->priv->error);
}
static void
@@ -198,6 +184,7 @@ bus_watch_cb (GstBus *bus, GstMessage *message, gpointer data)
break;
case GST_MESSAGE_EOS:
+ gst_element_query_position (encoder->priv->pipeline, GST_FORMAT_BYTES,
&encoder->priv->dest_size);
gst_element_set_state (encoder->priv->pipeline, GST_STATE_NULL);
if (encoder->priv->outstream != NULL) {
@@ -270,31 +257,6 @@ progress_timeout_cb (RBEncoderGst *encoder)
}
static void
-start_pipeline (RBEncoderGst *encoder)
-{
- GstStateChangeReturn result;
- GstBus *bus;
-
- g_assert (encoder->priv->pipeline != NULL);
-
- bus = gst_pipeline_get_bus (GST_PIPELINE (encoder->priv->pipeline));
- g_assert(encoder->priv->bus_watch_id == 0);
- encoder->priv->bus_watch_id = gst_bus_add_watch (bus, bus_watch_cb, encoder);
- g_object_unref (bus);
-
- result = gst_element_set_state (encoder->priv->pipeline, GST_STATE_PLAYING);
- if (result != GST_STATE_CHANGE_FAILURE) {
- /* start reporting progress */
- if (encoder->priv->total_length > 0) {
- _rb_encoder_emit_progress (RB_ENCODER (encoder), 0.0);
- encoder->priv->progress_id = g_timeout_add (250, (GSourceFunc)progress_timeout_cb,
encoder);
- } else {
- _rb_encoder_emit_progress (RB_ENCODER (encoder), -1);
- }
- }
-}
-
-static void
add_string_tag (GstTagList *tags, GstTagMergeMode mode, const char *tag, RhythmDBEntry *entry,
RhythmDBPropType property)
{
const char *v;
@@ -304,10 +266,8 @@ add_string_tag (GstTagList *tags, GstTagMergeMode mode, const char *tag, RhythmD
}
}
-static gboolean
-add_tags_from_entry (RBEncoderGst *encoder,
- RhythmDBEntry *entry,
- GError **error)
+static void
+add_tags_from_entry (RBEncoderGst *encoder, RhythmDBEntry *entry)
{
GstTagList *tags;
GValue obj = {0,};
@@ -389,8 +349,7 @@ add_tags_from_entry (RBEncoderGst *encoder,
}
}
- gst_tag_list_free (tags);
- return TRUE;
+ gst_tag_list_unref (tags);
}
static void
@@ -454,89 +413,6 @@ add_decoding_pipeline (RBEncoderGst *encoder,
return decodebin;
}
-static GFileOutputStream *
-create_stream (const char *dest, gboolean overwrite, GError **error)
-{
- GFile *file;
- GFileOutputStream *stream = NULL;
- GError *local_error = NULL;
-
- file = g_file_new_for_uri (dest);
- stream = g_file_create (file, G_FILE_CREATE_NONE, NULL, &local_error);
-
- if (local_error != NULL) {
- if (g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_EXISTS)) {
- if (overwrite) {
- g_clear_error (&local_error);
- stream = g_file_replace (file, NULL, FALSE, G_FILE_CREATE_NONE, NULL, error);
- } else {
- g_set_error_literal (error,
- RB_ENCODER_ERROR,
- RB_ENCODER_ERROR_DEST_EXISTS,
- local_error->message);
- g_clear_error (&local_error);
- }
- } else {
- g_propagate_error (error, local_error);
- }
- }
-
- g_object_unref (file);
- return stream;
-}
-
-static gboolean
-attach_output_pipeline (RBEncoderGst *encoder,
- GstElement *end,
- const char *dest,
- gboolean overwrite,
- GError **error)
-{
- GFileOutputStream *stream;
- GstElement *sink;
-
- /* if we can get to the location with gio, open the file here
- * (prompting for overwrite if it already exists) and use giostreamsink.
- * otherwise, create whatever sink element we can.
- */
- rb_debug ("attempting to open output file %s", dest);
-
- sink = gst_element_factory_make ("giostreamsink", NULL);
- if (sink != NULL) {
- stream = create_stream (dest, overwrite, error);
- if (stream != NULL) {
- g_object_set (sink, "stream", stream, NULL);
- encoder->priv->outstream = G_OUTPUT_STREAM (stream);
- } else if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED)) {
- rb_debug ("giostreamsink can't write to %s", dest);
- g_clear_object (&sink);
- g_clear_error (error);
- } else {
- g_object_unref (sink);
- return FALSE;
- }
- } else {
- rb_debug ("unable to create giostreamsink, falling back to default sink for %s", dest);
- }
-
- if (sink == NULL) {
- sink = gst_element_make_from_uri (GST_URI_SINK, dest, "sink", NULL);
- if (sink == NULL) {
- g_set_error (error, RB_ENCODER_ERROR, RB_ENCODER_ERROR_FILE_ACCESS,
- _("Could not create a GStreamer sink element to write to %s"),
- dest);
- return FALSE;
- }
- }
-
- /* provide a hook for setting sink properties */
- _rb_encoder_emit_prepare_sink (RB_ENCODER (encoder), dest, G_OBJECT (sink));
-
- gst_bin_add (GST_BIN (encoder->priv->pipeline), sink);
- gst_element_link (end, sink);
- return TRUE;
-}
-
static GstElement *
create_pipeline_and_source (RBEncoderGst *encoder,
RhythmDBEntry *entry,
@@ -631,6 +507,10 @@ impl_cancel (RBEncoder *bencoder)
{
RBEncoderGst *encoder = RB_ENCODER_GST (bencoder);
+ if (encoder->priv->open_cancel != NULL) {
+ g_cancellable_cancel (encoder->priv->open_cancel);
+ }
+
if (encoder->priv->pipeline != NULL) {
gst_element_set_state (encoder->priv->pipeline, GST_STATE_NULL);
g_object_unref (encoder->priv->pipeline);
@@ -676,6 +556,118 @@ cancel_idle (RBEncoder *encoder)
}
+/*
+ * Completion callback for the GTask that impl_encode creates to run
+ * sink_open.
+ *
+ * source_object is the RBEncoderGst the task was created for; result is the
+ * GTask itself; data is unused.
+ *
+ * If the task failed, the error is recorded with set_error and completion is
+ * emitted.  Otherwise the sink prepared by sink_open is added to the
+ * pipeline and linked after priv->output, a bus watch is installed, and the
+ * pipeline is set to PLAYING.
+ */
static void
+sink_open_cb (GObject *source_object, GAsyncResult *result, gpointer data)
+{
+	RBEncoderGst *encoder = RB_ENCODER_GST (source_object);
+	GstStateChangeReturn state_change;
+	GstBus *bus;
+	GError *error = NULL;
+
+	if (g_task_propagate_boolean (G_TASK (result), &error) == FALSE) {
+		set_error (encoder, error);
+		g_error_free (error);
+		rb_encoder_gst_emit_completed (encoder);
+	} else {
+		/*
+		 * sink_open only opens an output stream when it is using
+		 * giostreamsink.  When it fell back to a sink created from the
+		 * URI there is no stream to hand over, and that element is not
+		 * known to have a "stream" property, so don't try to set it.
+		 */
+		if (encoder->priv->outstream != NULL) {
+			g_object_set (encoder->priv->sink, "stream", encoder->priv->outstream, NULL);
+		}
+
+		/* provide a hook for setting sink properties */
+		_rb_encoder_emit_prepare_sink (RB_ENCODER (encoder), encoder->priv->dest_uri, G_OBJECT (encoder->priv->sink));
+
+		gst_bin_add (GST_BIN (encoder->priv->pipeline), encoder->priv->sink);
+		gst_element_link (encoder->priv->output, encoder->priv->sink);
+
+		bus = gst_pipeline_get_bus (GST_PIPELINE (encoder->priv->pipeline));
+		encoder->priv->bus_watch_id = gst_bus_add_watch (bus, bus_watch_cb, encoder);
+		g_object_unref (bus);
+
+		state_change = gst_element_set_state (encoder->priv->pipeline, GST_STATE_PLAYING);
+		if (state_change != GST_STATE_CHANGE_FAILURE) {
+			/* start reporting progress; -1 means the total length is unknown */
+			if (encoder->priv->total_length > 0) {
+				_rb_encoder_emit_progress (RB_ENCODER (encoder), 0.0);
+				encoder->priv->progress_id = g_timeout_add (250, (GSourceFunc)progress_timeout_cb, encoder);
+			} else {
+				_rb_encoder_emit_progress (RB_ENCODER (encoder), -1);
+			}
+		}
+	}
+
+	/* presumably balances the g_object_ref taken in impl_encode - confirm against the full file */
+	g_object_unref (encoder);
+}
+
+/*
+ * Opens an output stream for @file.
+ *
+ * When priv->overwrite is set the file is replaced, otherwise it is created
+ * and an existing file is an error.  A G_IO_ERROR_EXISTS failure is reported
+ * to the caller as RB_ENCODER_ERROR_DEST_EXISTS (keeping the original
+ * message); every other failure is passed through unchanged.
+ *
+ * Returns the new stream, or NULL with @error set.  @error may be NULL if
+ * the caller is not interested in the failure reason.
+ */
+static GOutputStream *
+stream_open (RBEncoderGst *encoder, GFile *file, GCancellable *cancellable, GError **error)
+{
+	GFileOutputStream *stream;
+	GError *local_error = NULL;
+
+	if (encoder->priv->overwrite) {
+		stream = g_file_replace (file, NULL, FALSE, G_FILE_CREATE_NONE, cancellable, &local_error);
+	} else {
+		stream = g_file_create (file, G_FILE_CREATE_NONE, cancellable, &local_error);
+	}
+
+	if (stream != NULL) {
+		return G_OUTPUT_STREAM (stream);
+	}
+
+	/*
+	 * Work on a local error so a NULL @error is never dereferenced, then
+	 * hand the (possibly translated) error back to the caller.
+	 */
+	if (g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_EXISTS)) {
+		g_set_error_literal (error,
+				     RB_ENCODER_ERROR,
+				     RB_ENCODER_ERROR_DEST_EXISTS,
+				     local_error->message);
+		g_clear_error (&local_error);
+	} else if (local_error != NULL) {
+		g_propagate_error (error, local_error);
+	}
+
+	return NULL;
+}
+
+/*
+ * GTask thread function (impl_encode passes it to g_task_run_in_thread) that
+ * prepares the output sink for encoder->priv->dest_uri.
+ *
+ * It first tries giostreamsink with an output stream opened through gio:
+ *  - if gio reports G_IO_ERROR_NOT_SUPPORTED, giostreamsink is dropped and a
+ *    sink element is created from the URI instead;
+ *  - if gio reports G_IO_ERROR_INVALID_FILENAME, priv->dest_uri is replaced
+ *    with a sanitized version and the open is retried once;
+ *  - any other failure (including a failed retry) fails the task.
+ *
+ * On success priv->sink is set (and priv->outstream too when giostreamsink
+ * is used) and the task returns TRUE.  On failure priv->sink is left NULL
+ * and the task returns the error.  The task is completed exactly once.
+ */
+static void
+sink_open (GTask *task, gpointer source_object, gpointer task_data, GCancellable *cancellable)
+{
+	RBEncoderGst *encoder = RB_ENCODER_GST (source_object);
+	GError *error = NULL;
+
+	encoder->priv->sink = gst_element_factory_make ("giostreamsink", NULL);
+	if (encoder->priv->sink != NULL) {
+		GFile *file;
+
+		file = g_file_new_for_uri (encoder->priv->dest_uri);
+		encoder->priv->outstream = stream_open (encoder, file, cancellable, &error);
+		g_object_unref (file);
+
+		if (encoder->priv->outstream != NULL) {
+			rb_debug ("opened output stream for %s", encoder->priv->dest_uri);
+		} else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED)) {
+			rb_debug ("using default sink for %s as gio can't do it", encoder->priv->dest_uri);
+			g_clear_error (&error);
+			g_clear_object (&encoder->priv->sink);
+		} else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_INVALID_FILENAME)) {
+			char *dest;
+
+			g_clear_error (&error);
+
+			dest = rb_sanitize_uri_for_filesystem (encoder->priv->dest_uri, "msdos");
+			g_free (encoder->priv->dest_uri);
+			encoder->priv->dest_uri = dest;
+			rb_debug ("sanitized destination uri to %s", dest);
+
+			file = g_file_new_for_uri (encoder->priv->dest_uri);
+			encoder->priv->outstream = stream_open (encoder, file, cancellable, &error);
+			g_object_unref (file);
+		}
+	}
+
+	if (error != NULL) {
+		/*
+		 * The output could not be opened (destination exists, access
+		 * denied, cancelled, ...).  The giostreamsink will never be
+		 * added to the pipeline, so release it rather than leaving an
+		 * unusable sink behind.
+		 */
+		g_clear_object (&encoder->priv->sink);
+	} else if (encoder->priv->sink == NULL) {
+		rb_debug ("unable to create giostreamsink, using default sink for %s", encoder->priv->dest_uri);
+		encoder->priv->sink = gst_element_make_from_uri (GST_URI_SINK, encoder->priv->dest_uri, "sink", NULL);
+		if (encoder->priv->sink == NULL) {
+			g_set_error (&error, RB_ENCODER_ERROR, RB_ENCODER_ERROR_FILE_ACCESS,
+				     _("Could not create a GStreamer sink element to write to %s"),
+				     encoder->priv->dest_uri);
+		}
+	}
+
+	/* complete the task exactly once; g_task_return_error takes ownership of error */
+	if (error != NULL) {
+		g_task_return_error (task, error);
+	} else {
+		g_task_return_boolean (task, TRUE);
+	}
+
+	/* impl_encode does not unref the task after g_task_run_in_thread, so release its reference here */
+	g_object_unref (task);
+}
+
+static void
impl_encode (RBEncoder *bencoder,
RhythmDBEntry *entry,
const char *dest,
@@ -684,14 +676,16 @@ impl_encode (RBEncoder *bencoder,
{
RBEncoderGst *encoder = RB_ENCODER_GST (bencoder);
GError *error = NULL;
- char *freedest = NULL;
- GstElement *end;
+ GTask *task;
g_return_if_fail (encoder->priv->pipeline == NULL);
+ g_clear_object (&encoder->priv->profile);
g_free (encoder->priv->dest_media_type);
g_free (encoder->priv->dest_uri);
- encoder->priv->dest_uri = NULL;
+ encoder->priv->dest_uri = g_strdup (dest);
+ encoder->priv->overwrite = overwrite;
+ encoder->priv->dest_size = 0;
/* keep ourselves alive in case we get cancelled by a signal handler */
g_object_ref (encoder);
@@ -701,46 +695,30 @@ impl_encode (RBEncoder *bencoder,
encoder->priv->total_length = rhythmdb_entry_get_uint64 (entry, RHYTHMDB_PROP_FILE_SIZE);
encoder->priv->position_format = GST_FORMAT_BYTES;
encoder->priv->dest_media_type = rhythmdb_entry_dup_string (entry, RHYTHMDB_PROP_MEDIA_TYPE);
-
- end = create_pipeline_and_source (encoder, entry, &error);
+ encoder->priv->output = create_pipeline_and_source (encoder, entry, &error);
} else {
gst_encoding_profile_ref (profile);
encoder->priv->profile = profile;
encoder->priv->total_length = rhythmdb_entry_get_ulong (entry, RHYTHMDB_PROP_DURATION);
encoder->priv->position_format = GST_FORMAT_TIME;
encoder->priv->dest_media_type = rb_gst_encoding_profile_get_media_type (profile);
+ encoder->priv->output = transcode_track (encoder, entry, &error);
- end = transcode_track (encoder, entry, &error);
- }
-
- if (error == NULL) {
- attach_output_pipeline (encoder, end, dest, overwrite, &error);
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_INVALID_FILENAME) && freedest == NULL) {
- freedest = rb_sanitize_uri_for_filesystem (dest, "msdos");
- dest = freedest;
-
- g_clear_error (&error);
- attach_output_pipeline (encoder, end, dest, overwrite, &error);
- }
- }
-
- if (error == NULL && profile != NULL) {
- add_tags_from_entry (encoder, entry, &error);
+ add_tags_from_entry (encoder, entry);
}
if (error != NULL) {
if (encoder->priv->cancelled == FALSE) {
set_error (encoder, error);
- g_idle_add ((GSourceFunc) cancel_idle, g_object_ref (encoder));
+ g_idle_add ((GSourceFunc) cancel_idle, encoder);
}
g_error_free (error);
} else {
- encoder->priv->dest_uri = g_strdup (dest);
- start_pipeline (encoder);
- }
+ encoder->priv->open_cancel = g_cancellable_new ();
- g_object_unref (encoder);
- g_free (freedest);
+ task = g_task_new (encoder, encoder->priv->open_cancel, sink_open_cb, NULL);
+ g_task_run_in_thread (task, sink_open);
+ }
}
static gboolean
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]