[libgdata] core: Add support for g_output_stream_flush() to GDataUploadStream



commit 26b8b853f3c24f47077b277a95ca1f49e37d7584
Author: Philip Withnall <philip tecnocode co uk>
Date:   Fri Dec 17 23:55:59 2010 +0000

    core: Add support for g_output_stream_flush() to GDataUploadStream
    
    Now, calling g_output_stream_flush() on a GDataUploadStream will block until
    all outstanding bytes have been written to the network.

 gdata/gdata-upload-stream.c |   63 +++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 63 insertions(+), 0 deletions(-)
---
diff --git a/gdata/gdata-upload-stream.c b/gdata/gdata-upload-stream.c
index 913b2c6..7e261c3 100644
--- a/gdata/gdata-upload-stream.c
+++ b/gdata/gdata-upload-stream.c
@@ -74,6 +74,7 @@ static void gdata_upload_stream_get_property (GObject *object, guint property_id
 static void gdata_upload_stream_set_property (GObject *object, guint property_id, const GValue *value, GParamSpec *pspec);
 
 static gssize gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error);
+static gboolean gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GError **error);
 static gboolean gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GError **error);
 
 static void create_network_thread (GDataUploadStream *self, GError **error);
@@ -133,6 +134,7 @@ gdata_upload_stream_class_init (GDataUploadStreamClass *klass)
 	/* We use the default implementations of the async functions, which just run
 	 * our implementation of the sync function in a thread. */
 	stream_class->write_fn = gdata_upload_stream_write;
+	stream_class->flush = gdata_upload_stream_flush;
 	stream_class->close_fn = gdata_upload_stream_close;
 
 	/**
@@ -542,6 +544,67 @@ done:
 }
 
 static void
+flush_cancelled_cb (GCancellable *cancellable, CancelledData *data)
+{
+	GDataUploadStreamPrivate *priv = data->upload_stream->priv;
+
+	/* Signal the gdata_upload_stream_flush() function that it should stop blocking and cancel */
+	g_static_mutex_lock (&(priv->write_mutex));
+	*(data->cancelled) = TRUE;
+	g_cond_signal (priv->write_cond);
+	g_static_mutex_unlock (&(priv->write_mutex));
+}
+
+/* Block until ->network_bytes_outstanding reaches zero. Cancelling the cancellable passed to gdata_upload_stream_flush() breaks out of the wait(),
+ * but doesn't stop the network thread from continuing to write the remaining bytes to the network.
+ * The wrapper function, g_output_stream_flush(), calls g_output_stream_set_pending() before calling this function, and calls
+ * g_output_stream_clear_pending() afterwards, so we don't need to worry about other operations happening concurrently. We also don't need to worry
+ * about being called after the stream has been closed (though the network activity could finish before or during this function). */
+static gboolean
+gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GError **error)
+{
+	GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (stream)->priv;
+	gulong cancelled_signal = 0, global_cancelled_signal = 0;
+	gboolean cancelled = FALSE;
+	gboolean success = TRUE;
+	CancelledData data;
+
+	/* Listen for cancellation events */
+	data.upload_stream = GDATA_UPLOAD_STREAM (stream);
+	data.cancelled = &cancelled;
+
+	global_cancelled_signal = g_cancellable_connect (priv->cancellable, (GCallback) flush_cancelled_cb, &data, NULL);
+
+	if (cancellable != NULL)
+		cancelled_signal = g_cancellable_connect (cancellable, (GCallback) flush_cancelled_cb, &data, NULL);
+
+	/* Start the flush operation proper */
+	g_static_mutex_lock (&(priv->write_mutex));
+
+	/* Wait for all outstanding bytes to be written to the network */
+	while (priv->network_bytes_outstanding > 0 && cancelled == FALSE)
+		g_cond_wait (priv->write_cond, g_static_mutex_get_mutex (&(priv->write_mutex)));
+
+	/* Check for an error and return if necessary */
+	if (cancelled == TRUE) {
+		g_assert (g_cancellable_set_error_if_cancelled (cancellable, error) == TRUE ||
+		          g_cancellable_set_error_if_cancelled (priv->cancellable, error) == TRUE);
+		success = FALSE;
+	}
+
+	g_static_mutex_unlock (&(priv->write_mutex));
+
+	/* Disconnect from the cancelled signals. Note that we have to do this without @write_mutex held, as g_cancellable_disconnect() blocks
+	 * until any outstanding cancellation callbacks return, and they will block on @write_mutex. */
+	if (cancelled_signal != 0)
+		g_cancellable_disconnect (cancellable, cancelled_signal);
+	if (global_cancelled_signal != 0)
+		g_cancellable_disconnect (priv->cancellable, global_cancelled_signal);
+
+	return success;
+}
+
+static void
 close_cancelled_cb (GCancellable *cancellable, CancelledData *data)
 {
 	GDataUploadStreamPrivate *priv = data->upload_stream->priv;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]