[rhythmbox] encoder: open output streams asynchronously



commit ac70f6e4135de6d13315e348fbfdbb00a276f674
Author: Jonathan Matthew <jonathan d14n org>
Date:   Mon Oct 3 08:38:47 2016 +1000

    encoder: open output streams asynchronously
    
    Also while we're here, get the output file size by doing a position
    query on the pipeline rather than a gvfs file info query.

 backends/gstreamer/rb-encoder-gst.c |  304 ++++++++++++++++-------------------
 1 files changed, 141 insertions(+), 163 deletions(-)
---
diff --git a/backends/gstreamer/rb-encoder-gst.c b/backends/gstreamer/rb-encoder-gst.c
index 4bc4cc1..ff6f634 100644
--- a/backends/gstreamer/rb-encoder-gst.c
+++ b/backends/gstreamer/rb-encoder-gst.c
@@ -53,6 +53,8 @@ struct _RBEncoderGstPrivate {
 
        GstElement *encodebin;
        GstElement *pipeline;
+       GstElement *output;
+       GstElement *sink;
        guint bus_watch_id;
 
        gboolean transcoding;
@@ -66,8 +68,13 @@ struct _RBEncoderGstPrivate {
        guint progress_id;
        char *dest_uri;
        char *dest_media_type;
+       gboolean overwrite;
+       gint64 dest_size;
 
        GOutputStream *outstream;
+       GCancellable *open_cancel;
+       GTask *open_task;
+
 
        GError *error;
 };
@@ -100,9 +107,6 @@ static void
 rb_encoder_gst_emit_completed (RBEncoderGst *encoder)
 {
        GError *error = NULL;
-       guint64 dest_size;
-       GFile *file;
-       GFileInfo *file_info;
 
        g_return_if_fail (encoder->priv->completion_emitted == FALSE);
 
@@ -128,26 +132,8 @@ rb_encoder_gst_emit_completed (RBEncoderGst *encoder)
                error = NULL;
        }
 
-       /* find the size of the output file, assuming we can get at it with gio */
-       dest_size = 0;
-       if (encoder->priv->dest_uri != NULL) {
-               file = g_file_new_for_uri (encoder->priv->dest_uri);
-               file_info = g_file_query_info (file, G_FILE_ATTRIBUTE_STANDARD_SIZE, G_FILE_QUERY_INFO_NONE, 
NULL, &error);
-               if (error != NULL) {
-                       rb_debug ("couldn't get size of destination %s: %s",
-                                 encoder->priv->dest_uri,
-                                 error->message);
-                       g_clear_error (&error);
-               } else {
-                       dest_size = g_file_info_get_attribute_uint64 (file_info, 
G_FILE_ATTRIBUTE_STANDARD_SIZE);
-                       rb_debug ("destination file size: %" G_GUINT64_FORMAT, dest_size);
-                       g_object_unref (file_info);
-               }
-               g_object_unref (file);
-       }
-
        encoder->priv->completion_emitted = TRUE;
-       _rb_encoder_emit_completed (RB_ENCODER (encoder), dest_size, encoder->priv->dest_media_type, 
encoder->priv->error);
+       _rb_encoder_emit_completed (RB_ENCODER (encoder), encoder->priv->dest_size, 
encoder->priv->dest_media_type, encoder->priv->error);
 }
 
 static void
@@ -198,6 +184,7 @@ bus_watch_cb (GstBus *bus, GstMessage *message, gpointer data)
                break;
 
        case GST_MESSAGE_EOS:
+               gst_element_query_position (encoder->priv->pipeline, GST_FORMAT_BYTES, 
&encoder->priv->dest_size);
 
                gst_element_set_state (encoder->priv->pipeline, GST_STATE_NULL);
                if (encoder->priv->outstream != NULL) {
@@ -270,31 +257,6 @@ progress_timeout_cb (RBEncoderGst *encoder)
 }
 
 static void
-start_pipeline (RBEncoderGst *encoder)
-{
-       GstStateChangeReturn result;
-       GstBus *bus;
-
-       g_assert (encoder->priv->pipeline != NULL);
-
-       bus = gst_pipeline_get_bus (GST_PIPELINE (encoder->priv->pipeline));
-       g_assert(encoder->priv->bus_watch_id == 0);
-       encoder->priv->bus_watch_id = gst_bus_add_watch (bus, bus_watch_cb, encoder);
-       g_object_unref (bus);
-
-       result = gst_element_set_state (encoder->priv->pipeline, GST_STATE_PLAYING);
-       if (result != GST_STATE_CHANGE_FAILURE) {
-               /* start reporting progress */
-               if (encoder->priv->total_length > 0) {
-                       _rb_encoder_emit_progress (RB_ENCODER (encoder), 0.0);
-                       encoder->priv->progress_id = g_timeout_add (250, (GSourceFunc)progress_timeout_cb, 
encoder);
-               } else {
-                       _rb_encoder_emit_progress (RB_ENCODER (encoder), -1);
-               }
-       }
-}
-
-static void
 add_string_tag (GstTagList *tags, GstTagMergeMode mode, const char *tag, RhythmDBEntry *entry, 
RhythmDBPropType property)
 {
        const char *v;
@@ -304,10 +266,8 @@ add_string_tag (GstTagList *tags, GstTagMergeMode mode, const char *tag, RhythmD
        }
 }
 
-static gboolean
-add_tags_from_entry (RBEncoderGst *encoder,
-                    RhythmDBEntry *entry,
-                    GError **error)
+static void
+add_tags_from_entry (RBEncoderGst *encoder, RhythmDBEntry *entry)
 {
        GstTagList *tags;
        GValue obj = {0,};
@@ -389,8 +349,7 @@ add_tags_from_entry (RBEncoderGst *encoder,
                }
        }
 
-       gst_tag_list_free (tags);
-       return TRUE;
+       gst_tag_list_unref (tags);
 }
 
 static void
@@ -454,89 +413,6 @@ add_decoding_pipeline (RBEncoderGst *encoder,
        return decodebin;
 }
 
-static GFileOutputStream *
-create_stream (const char *dest, gboolean overwrite, GError **error)
-{
-       GFile *file;
-       GFileOutputStream *stream = NULL;
-       GError *local_error = NULL;
-
-       file = g_file_new_for_uri (dest);
-       stream = g_file_create (file, G_FILE_CREATE_NONE, NULL, &local_error);
-
-       if (local_error != NULL) {
-               if (g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_EXISTS)) {
-                       if (overwrite) {
-                               g_clear_error (&local_error);
-                               stream = g_file_replace (file, NULL, FALSE, G_FILE_CREATE_NONE, NULL, error);
-                       } else {
-                               g_set_error_literal (error,
-                                                    RB_ENCODER_ERROR,
-                                                    RB_ENCODER_ERROR_DEST_EXISTS,
-                                                    local_error->message);
-                               g_clear_error (&local_error);
-                       }
-               } else {
-                       g_propagate_error (error, local_error);
-               }
-       }
-
-       g_object_unref (file);
-       return stream;
-}
-
-static gboolean
-attach_output_pipeline (RBEncoderGst *encoder,
-                       GstElement *end,
-                       const char *dest,
-                       gboolean overwrite,
-                       GError **error)
-{
-       GFileOutputStream *stream;
-       GstElement *sink;
-
-       /* if we can get to the location with gio, open the file here
-        * (prompting for overwrite if it already exists) and use giostreamsink.
-        * otherwise, create whatever sink element we can.
-        */
-       rb_debug ("attempting to open output file %s", dest);
-
-       sink = gst_element_factory_make ("giostreamsink", NULL);
-       if (sink != NULL) {
-               stream = create_stream (dest, overwrite, error);
-               if (stream != NULL) {
-                       g_object_set (sink, "stream", stream, NULL);
-                       encoder->priv->outstream = G_OUTPUT_STREAM (stream);
-               } else if (g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED)) {
-                       rb_debug ("giostreamsink can't write to %s", dest);
-                       g_clear_object (&sink);
-                       g_clear_error (error);
-               } else {
-                       g_object_unref (sink);
-                       return FALSE;
-               }
-       } else {
-               rb_debug ("unable to create giostreamsink, falling back to default sink for %s", dest);
-       }
-
-       if (sink == NULL) {
-               sink = gst_element_make_from_uri (GST_URI_SINK, dest, "sink", NULL);
-               if (sink == NULL) {
-                       g_set_error (error, RB_ENCODER_ERROR, RB_ENCODER_ERROR_FILE_ACCESS,
-                                    _("Could not create a GStreamer sink element to write to %s"),
-                                    dest);
-                       return FALSE;
-               }
-       }
-
-       /* provide a hook for setting sink properties */
-       _rb_encoder_emit_prepare_sink (RB_ENCODER (encoder), dest, G_OBJECT (sink));
-
-       gst_bin_add (GST_BIN (encoder->priv->pipeline), sink);
-       gst_element_link (end, sink);
-       return TRUE;
-}
-
 static GstElement *
 create_pipeline_and_source (RBEncoderGst *encoder,
                            RhythmDBEntry *entry,
@@ -631,6 +507,10 @@ impl_cancel (RBEncoder *bencoder)
 {
        RBEncoderGst *encoder = RB_ENCODER_GST (bencoder);
 
+       if (encoder->priv->open_cancel != NULL) {
+               g_cancellable_cancel (encoder->priv->open_cancel);
+       }
+
        if (encoder->priv->pipeline != NULL) {
                gst_element_set_state (encoder->priv->pipeline, GST_STATE_NULL);
                g_object_unref (encoder->priv->pipeline);
@@ -676,6 +556,118 @@ cancel_idle (RBEncoder *encoder)
 }
 
 static void
+sink_open_cb (GObject *source_object, GAsyncResult *result, gpointer data)
+{
+       RBEncoderGst *encoder = RB_ENCODER_GST (source_object);
+       GstStateChangeReturn state_change;
+       GstBus *bus;
+       GError *error = NULL;
+
+       if (g_task_propagate_boolean (G_TASK (result), &error) == FALSE) {
+               set_error (encoder, error);
+               g_error_free (error);
+               rb_encoder_gst_emit_completed (encoder);
+       } else {
+               g_object_set (encoder->priv->sink, "stream", encoder->priv->outstream, NULL);
+               _rb_encoder_emit_prepare_sink (RB_ENCODER (encoder), encoder->priv->dest_uri, G_OBJECT 
(encoder->priv->sink));
+
+               gst_bin_add (GST_BIN (encoder->priv->pipeline), encoder->priv->sink);
+               gst_element_link (encoder->priv->output, encoder->priv->sink);
+
+               bus = gst_pipeline_get_bus (GST_PIPELINE (encoder->priv->pipeline));
+               encoder->priv->bus_watch_id = gst_bus_add_watch (bus, bus_watch_cb, encoder);
+               g_object_unref (bus);
+
+               state_change = gst_element_set_state (encoder->priv->pipeline, GST_STATE_PLAYING);
+               if (state_change != GST_STATE_CHANGE_FAILURE) {
+                       if (encoder->priv->total_length > 0) {
+                               _rb_encoder_emit_progress (RB_ENCODER (encoder), 0.0);
+                               encoder->priv->progress_id = g_timeout_add (250, 
(GSourceFunc)progress_timeout_cb, encoder);
+                       } else {
+                               _rb_encoder_emit_progress (RB_ENCODER (encoder), -1);
+                       }
+               }
+       }
+
+       g_object_unref (encoder);
+}
+
+static GOutputStream *
+stream_open (RBEncoderGst *encoder, GFile *file, GCancellable *cancellable, GError **error)
+{
+       GFileOutputStream *stream;
+
+       if (encoder->priv->overwrite) {
+               stream = g_file_replace (file, NULL, FALSE, G_FILE_CREATE_NONE, cancellable, error);
+       } else {
+               stream = g_file_create (file, G_FILE_CREATE_NONE, cancellable, error);
+       }
+
+       if (*error != NULL && g_error_matches (*error, G_IO_ERROR, G_IO_ERROR_EXISTS)) {
+               char *msg = g_strdup ((*error)->message);
+
+               g_clear_error (error);
+               g_set_error_literal (error,
+                                    RB_ENCODER_ERROR,
+                                    RB_ENCODER_ERROR_DEST_EXISTS,
+                                    msg);
+               g_free (msg);
+       }
+
+       return G_OUTPUT_STREAM (stream);
+}
+
+static void
+sink_open (GTask *task, gpointer source_object, gpointer task_data, GCancellable *cancellable)
+{
+       RBEncoderGst *encoder = RB_ENCODER_GST (source_object);
+       GError *error = NULL;
+
+       encoder->priv->sink = gst_element_factory_make ("giostreamsink", NULL);
+       if (encoder->priv->sink != NULL) {
+               GFile *file;
+
+               file = g_file_new_for_uri (encoder->priv->dest_uri);
+
+               encoder->priv->outstream = stream_open (encoder, file, cancellable, &error);
+               if (encoder->priv->outstream != NULL) {
+                       rb_debug ("opened output stream for %s", encoder->priv->dest_uri);
+               } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED)) {
+                       rb_debug ("using default sink for %s as gio can't do it", encoder->priv->dest_uri);
+                       g_clear_error (&error);
+                       g_clear_object (&encoder->priv->sink);
+               } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_INVALID_FILENAME)) {
+                       char *dest;
+
+                       g_clear_error (&error);
+
+                       dest = rb_sanitize_uri_for_filesystem (encoder->priv->dest_uri, "msdos");
+                       g_free (encoder->priv->dest_uri);
+                       encoder->priv->dest_uri = dest;
+                       rb_debug ("sanitized destination uri to %s", dest);
+
+                       file = g_file_new_for_uri (encoder->priv->dest_uri);
+                       encoder->priv->outstream = stream_open (encoder, file, cancellable, &error);
+               }
+       }
+
+       if (encoder->priv->sink == NULL) {
+               rb_debug ("unable to create giostreamsink, using default sink for %s", 
encoder->priv->dest_uri);
+               encoder->priv->sink = gst_element_make_from_uri (GST_URI_SINK, encoder->priv->dest_uri, 
"sink", NULL);
+               if (encoder->priv->sink == NULL) {
+                       g_set_error (&error, RB_ENCODER_ERROR, RB_ENCODER_ERROR_FILE_ACCESS,
+                                    _("Could not create a GStreamer sink element to write to %s"),
+                                    encoder->priv->dest_uri);
+                       g_task_return_error (task, error);
+               }
+               g_task_return_boolean (task, TRUE);
+       } else {
+               g_task_return_boolean (task, TRUE);
+       }
+       g_object_unref (task);
+}
+
+static void
 impl_encode (RBEncoder *bencoder,
             RhythmDBEntry *entry,
             const char *dest,
@@ -684,14 +676,16 @@ impl_encode (RBEncoder *bencoder,
 {
        RBEncoderGst *encoder = RB_ENCODER_GST (bencoder);
        GError *error = NULL;
-       char *freedest = NULL;
-       GstElement *end;
+       GTask *task;
 
        g_return_if_fail (encoder->priv->pipeline == NULL);
 
+       g_clear_object (&encoder->priv->profile);
        g_free (encoder->priv->dest_media_type);
        g_free (encoder->priv->dest_uri);
-       encoder->priv->dest_uri = NULL;
+       encoder->priv->dest_uri = g_strdup (dest);
+       encoder->priv->overwrite = overwrite;
+       encoder->priv->dest_size = 0;
 
        /* keep ourselves alive in case we get cancelled by a signal handler */
        g_object_ref (encoder);
@@ -701,46 +695,30 @@ impl_encode (RBEncoder *bencoder,
                encoder->priv->total_length = rhythmdb_entry_get_uint64 (entry, RHYTHMDB_PROP_FILE_SIZE);
                encoder->priv->position_format = GST_FORMAT_BYTES;
                encoder->priv->dest_media_type = rhythmdb_entry_dup_string (entry, RHYTHMDB_PROP_MEDIA_TYPE);
-
-               end = create_pipeline_and_source (encoder, entry, &error);
+               encoder->priv->output = create_pipeline_and_source (encoder, entry, &error);
        } else {
                gst_encoding_profile_ref (profile);
                encoder->priv->profile = profile;
                encoder->priv->total_length = rhythmdb_entry_get_ulong (entry, RHYTHMDB_PROP_DURATION);
                encoder->priv->position_format = GST_FORMAT_TIME;
                encoder->priv->dest_media_type = rb_gst_encoding_profile_get_media_type (profile);
+               encoder->priv->output = transcode_track (encoder, entry, &error);
 
-               end = transcode_track (encoder, entry, &error);
-       }
-
-       if (error == NULL) {
-               attach_output_pipeline (encoder, end, dest, overwrite, &error);
-               if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_INVALID_FILENAME) && freedest == NULL) {
-                       freedest = rb_sanitize_uri_for_filesystem (dest, "msdos");
-                       dest = freedest;
-
-                       g_clear_error (&error);
-                       attach_output_pipeline (encoder, end, dest, overwrite, &error);
-               }
-       }
-
-       if (error == NULL && profile != NULL) {
-               add_tags_from_entry (encoder, entry, &error);
+               add_tags_from_entry (encoder, entry);
        }
 
        if (error != NULL) {
                if (encoder->priv->cancelled == FALSE) {
                        set_error (encoder, error);
-                       g_idle_add ((GSourceFunc) cancel_idle, g_object_ref (encoder));
+                       g_idle_add ((GSourceFunc) cancel_idle, encoder);
                }
                g_error_free (error);
        } else {
-               encoder->priv->dest_uri = g_strdup (dest);
-               start_pipeline (encoder);
-       }
+               encoder->priv->open_cancel = g_cancellable_new ();
 
-       g_object_unref (encoder);
-       g_free (freedest);
+               task = g_task_new (encoder, encoder->priv->open_cancel, sink_open_cb, NULL);
+               g_task_run_in_thread (task, sink_open);
+       }
 }
 
 static gboolean


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]