[libgdata] core: Fix synchronisation between threads in GDataUploadStream



commit 8962b93e81f3095cd3c78f0bf2af81c2360311cf
Author: Philip Withnall <philip tecnocode co uk>
Date:   Thu Dec 16 23:37:54 2010 +0000

    core: Fix synchronisation between threads in GDataUploadStream
    
    This upgrades GDataUploadStream to keep track of the number of bytes in
    its various buffers, so that gdata_upload_stream_write() can return a more
    accurate count of the bytes written by each call, and so that a single write
    can be cancelled without cancelling the entire upload.
    
    Helps: bgo#637036

 gdata/gdata-upload-stream.c |   70 ++++++++++++++++++++++++++++++++----------
 1 files changed, 53 insertions(+), 17 deletions(-)
---
diff --git a/gdata/gdata-upload-stream.c b/gdata/gdata-upload-stream.c
index 15de40b..199a2d3 100644
--- a/gdata/gdata-upload-stream.c
+++ b/gdata/gdata-upload-stream.c
@@ -92,7 +92,9 @@ struct _GDataUploadStreamPrivate {
 	GThread *network_thread;
 
 	GStaticMutex write_mutex; /* mutex for write operations (specifically, write_finished) */
-	gboolean write_finished; /* set when the network thread has finished writing a chunk (before it signals write_cond) */
+	gsize message_bytes_outstanding; /* the number of bytes which have been written to the buffer but not libsoup (signalled by write_cond) */
+	gsize network_bytes_outstanding; /* the number of bytes which have been written to libsoup but not the network (signalled by write_cond) */
+	gsize network_bytes_written; /* the number of bytes which have been written to the network (signalled by write_cond) */
 	GCond *write_cond; /* signalled when a chunk has been written (protected by write_mutex) */
 
 	guint response_status; /* set once we finish receiving the response (0 otherwise) (protected by response_mutex) */
@@ -381,13 +383,9 @@ write_cancelled_cb (GCancellable *cancellable, CancelledData *data)
 {
 	GDataUploadStreamPrivate *priv = data->upload_stream->priv;
 
-	/* Tell libsoup to cancel the upload */
-	soup_session_cancel_message (priv->session, priv->message, SOUP_STATUS_CANCELLED);
-
-	/* Set the error and signal that the write operation has finished */
+	/* Signal the gdata_upload_stream_write() function that it should stop blocking and cancel */
 	g_static_mutex_lock (&(priv->write_mutex));
 	*(data->cancelled) = TRUE;
-	priv->write_finished = TRUE;
 	g_cond_signal (priv->write_cond);
 	g_static_mutex_unlock (&(priv->write_mutex));
 }
@@ -396,9 +394,10 @@ static gssize
 gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error)
 {
 	GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (stream)->priv;
-	gsize length_written = count;
+	gssize length_written = -1;
 	gulong cancelled_signal = 0;
 	gboolean cancelled = FALSE;
+	gsize old_network_bytes_written;
 
 	/* Listen for cancellation events */
 	if (cancellable != NULL) {
@@ -423,8 +422,10 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
 		return -1;
 	}
 
-	/* Set write_finished so we know if the write operation has finished before we reach write_cond */
-	priv->write_finished = FALSE;
+	/* Increment the number of bytes outstanding for the new write, and keep a record of the old number written so we know if the write's
+	 * finished before we reach write_cond. */
+	old_network_bytes_written = priv->network_bytes_written;
+	priv->message_bytes_outstanding += count;
 
 	g_static_mutex_unlock (&(priv->write_mutex));
 
@@ -451,10 +452,14 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
 		soup_message_body_append (priv->message->request_body, SOUP_MEMORY_STATIC, first_part_header, strlen (first_part_header));
 		soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, entry_xml, strlen (entry_xml));
 		soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, second_part_header, strlen (second_part_header));
+
+		g_static_mutex_lock (&(priv->write_mutex));
+		priv->network_bytes_outstanding += priv->message->request_body->length;
+		g_static_mutex_unlock (&(priv->write_mutex));
 	}
 
-	/* Also write out the first chunk of data, so there's guaranteed to be something in the request body */
-	soup_message_body_append (priv->message->request_body, SOUP_MEMORY_COPY, buffer, count);
+	/* Also write out the first chunk of data, so there's guaranteed to be something in the buffer */
+	gdata_buffer_push_data (priv->buffer, buffer, count);
 
 	/* Create the thread and let the writing commence! */
 	create_network_thread (GDATA_UPLOAD_STREAM (stream), error);
@@ -467,8 +472,9 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
 write:
 	/* Wait for it to be written */
 	g_static_mutex_lock (&(priv->write_mutex));
-	if (priv->write_finished == FALSE && cancelled == FALSE)
+	while (priv->network_bytes_written - old_network_bytes_written < count && cancelled == FALSE)
 		g_cond_wait (priv->write_cond, g_static_mutex_get_mutex (&(priv->write_mutex)));
+	length_written = MIN (count, priv->network_bytes_written - old_network_bytes_written);
 	g_static_mutex_unlock (&(priv->write_mutex));
 
 	g_static_mutex_lock (&(priv->response_mutex));
@@ -478,7 +484,7 @@ write:
 		g_propagate_error (error, priv->response_error);
 		priv->response_error = NULL;
 		length_written = -1;
-	} else if (cancelled == TRUE) {
+	} else if (cancelled == TRUE && length_written < 1) {
 		g_assert (cancellable != NULL && g_cancellable_set_error_if_cancelled (cancellable, error) == TRUE);
 		length_written = -1;
 	}
@@ -490,6 +496,8 @@ write:
 	if (cancelled_signal != 0)
 		g_cancellable_disconnect (cancellable, cancelled_signal);
 
+	g_assert (cancelled == TRUE || length_written > 0);
+
 	return length_written;
 }
 
@@ -576,22 +584,48 @@ write_next_chunk (GDataUploadStream *self, SoupMessage *message)
 	gboolean reached_eof = FALSE;
 	guint8 next_buffer[CHUNK_SIZE];
 
+	g_static_mutex_lock (&(priv->write_mutex));
+
+	/* If there are still bytes in libsoup's buffer, don't block on getting new bytes into the stream */
+	if (priv->network_bytes_outstanding > 0) {
+		g_static_mutex_unlock (&(priv->write_mutex));
+		return;
+	}
+
+	g_static_mutex_unlock (&(priv->write_mutex));
+
 	/* Append the next chunk to the message body so it can join in the fun.
 	 * Note that this call isn't blocking, and can return less than the CHUNK_SIZE. This is because
 	 * we could deadlock if we block on getting CHUNK_SIZE bytes at the end of the stream. write() could
 	 * easily be called with fewer bytes, but has no way to notify us that we've reached the end of the
 	 * stream, so we'd happily block on receiving more bytes which weren't forthcoming. */
 	length = gdata_buffer_pop_data_limited (priv->buffer, next_buffer, CHUNK_SIZE, &reached_eof);
+
+	g_static_mutex_lock (&(priv->write_mutex));
+	priv->message_bytes_outstanding -= length;
+	priv->network_bytes_outstanding += length;
+	g_static_mutex_unlock (&(priv->write_mutex));
+
 	if (reached_eof == TRUE) {
 		/* We've reached the end of the stream, so append the footer (if appropriate) and stop */
 		g_static_mutex_lock (&(priv->response_mutex));
 		if (priv->entry != NULL && priv->response_error == NULL) {
 			const gchar *footer = "\n--" BOUNDARY_STRING "--";
-			soup_message_body_append (priv->message->request_body, SOUP_MEMORY_STATIC, footer, strlen (footer));
+			gsize footer_length = strlen (footer);
+
+			soup_message_body_append (priv->message->request_body, SOUP_MEMORY_STATIC, footer, footer_length);
+
+			g_static_mutex_lock (&(priv->write_mutex));
+			priv->network_bytes_outstanding += footer_length;
+			g_static_mutex_unlock (&(priv->write_mutex));
 		}
 		g_static_mutex_unlock (&(priv->response_mutex));
 
 		soup_message_body_complete (priv->message->request_body);
+
+		g_static_mutex_lock (&(priv->write_mutex));
+		g_assert (priv->message_bytes_outstanding == 0);
+		g_static_mutex_unlock (&(priv->write_mutex));
 	} else {
 		soup_message_body_append (priv->message->request_body, SOUP_MEMORY_COPY, next_buffer, length);
 	}
@@ -620,7 +654,9 @@ wrote_body_data_cb (SoupMessage *message, SoupBuffer *buffer, GDataUploadStream
 
 	/* Signal the main thread that the chunk has been written */
 	g_static_mutex_lock (&(priv->write_mutex));
-	priv->write_finished = TRUE;
+	g_assert (priv->network_bytes_outstanding > 0);
+	priv->network_bytes_outstanding -= buffer->length;
+	priv->network_bytes_written += buffer->length;
 	g_cond_signal (priv->write_cond);
 	g_static_mutex_unlock (&(priv->write_mutex));
 
@@ -643,8 +679,8 @@ upload_thread (GDataUploadStream *self)
 	/* Signal write_cond, just in case we errored out and finished sending in the middle of a write */
 	g_static_mutex_lock (&(priv->response_mutex));
 	g_static_mutex_lock (&(priv->write_mutex));
-	priv->write_finished = TRUE;
-	g_cond_signal (priv->write_cond);
+	if (priv->message_bytes_outstanding > 0 || priv->network_bytes_outstanding > 0)
+		g_cond_signal (priv->write_cond);
 	g_static_mutex_unlock (&(priv->write_mutex));
 
 	/* Deal with the response if it was unsuccessful */



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]