[libgdata] core: Extend GDataUploadStream with support for resumable uploads



commit f91c994f0f6f5f96ea7765e75ecc1ce393f9b61b
Author: Philip Withnall <philip tecnocode co uk>
Date:   Tue Dec 20 20:40:16 2011 +0000

    core: Extend GDataUploadStream with support for resumable uploads
    
    This adds the following API:
     â gdata_upload_stream_new_resumable()
     â GDataUploadStream:content-length, gdata_upload_stream_get_content_length()
    
    Helps: https://bugzilla.gnome.org/show_bug.cgi?id=607272

 docs/reference/gdata-sections.txt |    4 +
 gdata/gdata-upload-stream.c       |  515 ++++++++++++++++++++++++++++++-------
 gdata/gdata-upload-stream.h       |   30 +++
 gdata/gdata.symbols               |    2 +
 gdata/tests/common.c              |   19 +-
 gdata/tests/common.h              |    1 +
 gdata/tests/streams.c             |  443 +++++++++++++++++++++++++++++++
 7 files changed, 920 insertions(+), 94 deletions(-)
---
diff --git a/docs/reference/gdata-sections.txt b/docs/reference/gdata-sections.txt
index d086729..d671218 100644
--- a/docs/reference/gdata-sections.txt
+++ b/docs/reference/gdata-sections.txt
@@ -1802,9 +1802,12 @@ GDataDownloadStreamPrivate
 <SECTION>
 <FILE>gdata-upload-stream</FILE>
 <TITLE>GDataUploadStream</TITLE>
+GDATA_LINK_RESUMABLE_CREATE_MEDIA
+GDATA_LINK_RESUMABLE_EDIT_MEDIA
 GDataUploadStream
 GDataUploadStreamClass
 gdata_upload_stream_new
+gdata_upload_stream_new_resumable
 gdata_upload_stream_get_response
 gdata_upload_stream_get_service
 gdata_upload_stream_get_authorization_domain
@@ -1814,6 +1817,7 @@ gdata_upload_stream_get_upload_uri
 gdata_upload_stream_get_entry
 gdata_upload_stream_get_slug
 gdata_upload_stream_get_content_type
+gdata_upload_stream_get_content_length
 <SUBSECTION Standard>
 gdata_upload_stream_get_type
 GDATA_UPLOAD_STREAM
diff --git a/gdata/gdata-upload-stream.c b/gdata/gdata-upload-stream.c
index f4cbbd6..04fe783 100644
--- a/gdata/gdata-upload-stream.c
+++ b/gdata/gdata-upload-stream.c
@@ -78,7 +78,8 @@
  *
  *	/<!-- -->* Get the file to upload *<!-- -->/
  *	file = get_file_to_upload ();
- *	file_info = g_file_query_info (file, G_FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME "," G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE,
+ *	file_info = g_file_query_info (file, G_FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME "," G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE ","
+ *	                               G_FILE_ATTRIBUTE_STANDARD_SIZE,
  *	                               G_FILE_QUERY_INFO_NONE, NULL, &error);
  *
  *	if (file_info == NULL) {
@@ -102,8 +103,9 @@
  *	service = create_my_service ();
  *	domain = get_my_authorization_domain_from_service (service);
  *	cancellable = g_cancellable_new (); /<!-- -->* cancel this to cancel the entire upload operation *<!-- -->/
- *	upload_stream = gdata_upload_stream_new (service, domain, SOUP_METHOD_POST, upload_uri, NULL, g_file_info_get_display_name (file_info),
- *	                                         g_file_info_get_content_type (file_info), cancellable);
+ *	upload_stream = gdata_upload_stream_new_resumable (service, domain, SOUP_METHOD_POST, upload_uri, NULL,
+ *	                                                   g_file_info_get_display_name (file_info), g_file_info_get_content_type (file_info),
+ *	                                                   g_file_info_get_size (file_info), cancellable);
  *	g_object_unref (file_info);
  *
  *	/<!-- -->* Perform the upload asynchronously *<!-- -->/
@@ -174,6 +176,7 @@
 #include "gdata-private.h"
 
 #define BOUNDARY_STRING "0003Z5W789deadbeefRTE456KlemsnoZV"
+#define MAX_RESUMABLE_CHUNK_SIZE (512 * 1024) /* bytes = 512 KiB */
 
 static GObject *gdata_upload_stream_constructor (GType type, guint n_construct_params, GObjectConstructParam *construct_params);
 static void gdata_upload_stream_dispose (GObject *object);
@@ -187,6 +190,12 @@ static gboolean gdata_upload_stream_close (GOutputStream *stream, GCancellable *
 
 static void create_network_thread (GDataUploadStream *self, GError **error);
 
+typedef enum {
+	STATE_INITIAL_REQUEST, /* initial POST request to the resumable-create-media link (unused for non-resumable uploads) */
+	STATE_DATA_REQUESTS, /* one or more subsequent PUT requests (only state used for non-resumable uploads) */
+	STATE_FINISHED, /* finished successfully or in error */
+} UploadState;
+
 struct _GDataUploadStreamPrivate {
 	gchar *method;
 	gchar *upload_uri;
@@ -195,6 +204,7 @@ struct _GDataUploadStreamPrivate {
 	GDataEntry *entry;
 	gchar *slug;
 	gchar *content_type;
+	goffset content_length; /* -1 for non-resumable uploads; 0 or greater for resumable ones */
 	SoupSession *session;
 	SoupMessage *message;
 	GDataBuffer *buffer;
@@ -202,15 +212,20 @@ struct _GDataUploadStreamPrivate {
 	GCancellable *cancellable;
 	GThread *network_thread;
 
+	UploadState state; /* protected by write_mutex */
 	GMutex write_mutex; /* mutex for write operations (specifically, write_finished) */
+	/* This persists across all resumable upload chunks. Note that it doesn't count bytes from the entry XML. */
+	gsize total_network_bytes_written; /* the number of bytes which have been written to the network in STATE_DATA_REQUESTS */
+
+	/* All of the following apply only to the current resumable upload chunk. */
 	gsize message_bytes_outstanding; /* the number of bytes which have been written to the buffer but not libsoup (signalled by write_cond) */
 	gsize network_bytes_outstanding; /* the number of bytes which have been written to libsoup but not the network (signalled by write_cond) */
 	gsize network_bytes_written; /* the number of bytes which have been written to the network (signalled by write_cond) */
+	gsize chunk_size; /* the size of the current chunk (in bytes); 0 iff content_length <= 0; must be <= MAX_RESUMABLE_CHUNK_SIZE */
 	GCond write_cond; /* signalled when a chunk has been written (protected by write_mutex) */
 
-	gboolean finished; /* set once the upload thread has finished (protected by response_mutex) */
-	guint response_status; /* set once we finish receiving the response (SOUP_STATUS_NONE otherwise) (protected by response_mutex) */
 	GCond finished_cond; /* signalled when sending the message (and receiving the response) is finished (protected by response_mutex) */
+	guint response_status; /* set once we finish receiving the response (SOUP_STATUS_NONE otherwise) (protected by response_mutex) */
 	GError *response_error; /* error asynchronously set by the network thread, and picked up by the main thread when appropriate */
 	GMutex response_mutex; /* mutex for ->response_error, ->response_status and ->finished_cond */
 };
@@ -224,6 +239,7 @@ enum {
 	PROP_METHOD,
 	PROP_CANCELLABLE,
 	PROP_AUTHORIZATION_DOMAIN,
+	PROP_CONTENT_LENGTH,
 };
 
 G_DEFINE_TYPE (GDataUploadStream, gdata_upload_stream, G_TYPE_OUTPUT_STREAM)
@@ -328,6 +344,22 @@ gdata_upload_stream_class_init (GDataUploadStreamClass *klass)
 	                                                      G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
 
 	/**
+	 * GDataUploadStream:content-length:
+	 *
+	 * The content length (in bytes) of the file being uploaded (i.e. as returned by g_file_info_get_size()). Note that this does not include the
+	 * length of the XML serialisation of #GDataUploadStream:entry, if set.
+	 *
+	 * If this is <code class="literal">-1</code> the upload will be non-resumable; if it is non-negative, the upload will be resumable.
+	 *
+	 * Since: 0.11.2
+	 */
+	g_object_class_install_property (gobject_class, PROP_CONTENT_LENGTH,
+	                                 g_param_spec_int64 ("content-length",
+	                                                     "Content length", "The content length (in bytes) of the file being uploaded.",
+	                                                     -1, G_MAXINT64, -1,
+	                                                     G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+	/**
 	 * GDataUploadStream:content-type:
 	 *
 	 * The content type of the file being uploaded (i.e. as returned by g_file_info_get_content_type()).
@@ -373,11 +405,34 @@ gdata_upload_stream_init (GDataUploadStream *self)
 	g_mutex_init (&(self->priv->response_mutex));
 }
 
+static SoupMessage *
+build_message (GDataUploadStream *self, const gchar *method, const gchar *upload_uri)
+{
+	GDataUploadStreamPrivate *priv;
+	GDataServiceClass *klass;
+	SoupMessage *new_message;
+
+	priv = self->priv;
+
+	/* Build the message */
+	new_message = soup_message_new (method, upload_uri);
+
+	/* Make sure the headers are set */
+	klass = GDATA_SERVICE_GET_CLASS (priv->service);
+	if (klass->append_query_headers != NULL) {
+		klass->append_query_headers (priv->service, priv->authorization_domain, new_message);
+	}
+
+	/* We don't want to accumulate chunks */
+	soup_message_body_set_accumulate (new_message->request_body, FALSE);
+
+	return new_message;
+}
+
 static GObject *
 gdata_upload_stream_constructor (GType type, guint n_construct_params, GObjectConstructParam *construct_params)
 {
 	GDataUploadStreamPrivate *priv;
-	GDataServiceClass *klass;
 	GObject *object;
 
 	/* Chain up to the parent class */
@@ -389,28 +444,70 @@ gdata_upload_stream_constructor (GType type, guint n_construct_params, GObjectCo
 		priv->cancellable = g_cancellable_new ();
 
 	/* Build the message */
-	priv->message = soup_message_new (priv->method, priv->upload_uri);
-
-	/* Make sure the headers are set */
-	klass = GDATA_SERVICE_GET_CLASS (priv->service);
-	if (klass->append_query_headers != NULL) {
-		klass->append_query_headers (priv->service, priv->authorization_domain, priv->message);
-	}
+	priv->message = build_message (GDATA_UPLOAD_STREAM (object), priv->method, priv->upload_uri);
 
 	if (priv->slug != NULL)
 		soup_message_headers_append (priv->message->request_headers, "Slug", priv->slug);
 
-	/* We don't want to accumulate chunks */
-	soup_message_body_set_accumulate (priv->message->request_body, FALSE);
-	soup_message_headers_set_encoding (priv->message->request_headers, SOUP_ENCODING_CHUNKED);
+	if (priv->content_length == -1) {
+		/* Non-resumable upload */
+		soup_message_headers_set_encoding (priv->message->request_headers, SOUP_ENCODING_CHUNKED);
+
+		/* The Content-Type should be multipart/related if we're also uploading the metadata (entry != NULL),
+		 * and the given content_type otherwise. */
+		if (priv->entry != NULL) {
+			const gchar *first_part_header;
+			gchar *entry_xml, *second_part_header;
+
+			soup_message_headers_set_content_type (priv->message->request_headers, "multipart/related; boundary=" BOUNDARY_STRING, NULL);
+
+			/* Start by writing out the entry; then the thread has something to write to the network when it's created */
+			first_part_header = "--" BOUNDARY_STRING "\nContent-Type: application/atom+xml; charset=UTF-8\n\n";
+			entry_xml = gdata_parsable_get_xml (GDATA_PARSABLE (priv->entry));
+			second_part_header = g_strdup_printf ("\n--" BOUNDARY_STRING "\nContent-Type: %s\nContent-Transfer-Encoding: binary\n\n",
+			                                      priv->content_type);
+
+			/* Push the message parts onto the message body; we can skip the buffer, since the network thread hasn't yet been created,
+			 * so we're the sole thread accessing the SoupMessage. */
+			soup_message_body_append (priv->message->request_body, SOUP_MEMORY_STATIC, first_part_header, strlen (first_part_header));
+			soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, entry_xml, strlen (entry_xml));
+			soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, second_part_header, strlen (second_part_header));
+
+			priv->network_bytes_outstanding = priv->message->request_body->length;
+		} else {
+			soup_message_headers_set_content_type (priv->message->request_headers, priv->content_type, NULL);
+		}
 
-	/* The Content-Type should be multipart/related if we're also uploading the metadata (entry != NULL),
-	 * and the given content_type otherwise.
-	 * Note that the Content-Length header is set when we first start writing to the network. */
-	if (priv->entry != NULL)
-		soup_message_headers_set_content_type (priv->message->request_headers, "multipart/related; boundary=" BOUNDARY_STRING, NULL);
-	else
-		soup_message_headers_set_content_type (priv->message->request_headers, priv->content_type, NULL);
+		/* Non-resumable uploads start with the data requests immediately. */
+		priv->state = STATE_DATA_REQUESTS;
+	} else {
+		gchar *content_length_str;
+
+		/* Resumable upload's initial request */
+		soup_message_headers_set_encoding (priv->message->request_headers, SOUP_ENCODING_CONTENT_LENGTH);
+		soup_message_headers_replace (priv->message->request_headers, "X-Upload-Content-Type", priv->content_type);
+
+		content_length_str = g_strdup_printf ("%" G_GOFFSET_FORMAT, priv->content_length);
+		soup_message_headers_replace (priv->message->request_headers, "X-Upload-Content-Length", content_length_str);
+		g_free (content_length_str);
+
+		if (priv->entry != NULL) {
+			const gchar *entry_xml;
+
+			soup_message_headers_set_content_type (priv->message->request_headers, "application/atom+xml; charset=UTF-8", NULL);
+
+			entry_xml = gdata_parsable_get_xml (GDATA_PARSABLE (priv->entry));
+			soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, entry_xml, strlen (entry_xml));
+
+			priv->network_bytes_outstanding = priv->message->request_body->length;
+		} else {
+			soup_message_headers_set_content_length (priv->message->request_headers, 0);
+		}
+
+		/* Resumable uploads always start with an initial request, which either contains the XML or is empty. */
+		priv->state = STATE_INITIAL_REQUEST;
+		priv->chunk_size = MIN (priv->content_length, MAX_RESUMABLE_CHUNK_SIZE);
+	}
 
 	/* If the entry exists and has an ETag, we assume we're updating the entry, so we can set the If-Match header */
 	if (priv->entry != NULL && gdata_entry_get_etag (priv->entry) != NULL)
@@ -501,6 +598,9 @@ gdata_upload_stream_get_property (GObject *object, guint property_id, GValue *va
 		case PROP_CONTENT_TYPE:
 			g_value_set_string (value, priv->content_type);
 			break;
+		case PROP_CONTENT_LENGTH:
+			g_value_set_int64 (value, priv->content_length);
+			break;
 		case PROP_CANCELLABLE:
 			g_value_set_object (value, priv->cancellable);
 			break;
@@ -539,6 +639,9 @@ gdata_upload_stream_set_property (GObject *object, guint property_id, const GVal
 		case PROP_CONTENT_TYPE:
 			priv->content_type = g_value_dup_string (value);
 			break;
+		case PROP_CONTENT_LENGTH:
+			priv->content_length = g_value_get_int64 (value);
+			break;
 		case PROP_CANCELLABLE:
 			/* Construction only */
 			priv->cancellable = g_value_dup_object (value);
@@ -568,14 +671,15 @@ write_cancelled_cb (GCancellable *cancellable, CancelledData *data)
 }
 
 static gssize
-gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error)
+gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error_out)
 {
 	GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (stream)->priv;
 	gssize length_written = -1;
 	gulong cancelled_signal = 0, global_cancelled_signal = 0;
 	gboolean cancelled = FALSE; /* must only be touched with ->write_mutex held */
-	gsize old_network_bytes_written;
+	gsize old_total_network_bytes_written;
 	CancelledData data;
+	GError *error = NULL;
 
 	/* Listen for cancellation events */
 	data.upload_stream = GDATA_UPLOAD_STREAM (stream);
@@ -590,21 +694,21 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
 	g_mutex_lock (&(priv->write_mutex));
 
 	if (cancelled == TRUE) {
-		g_assert (g_cancellable_set_error_if_cancelled (cancellable, error) == TRUE ||
-		          g_cancellable_set_error_if_cancelled (priv->cancellable, error) == TRUE);
+		g_assert (g_cancellable_set_error_if_cancelled (cancellable, &error) == TRUE ||
+		          g_cancellable_set_error_if_cancelled (priv->cancellable, &error) == TRUE);
 		g_mutex_unlock (&(priv->write_mutex));
 
 		length_written = -1;
 		goto done;
 	}
 
+	g_mutex_unlock (&(priv->write_mutex));
+
 	/* Increment the number of bytes outstanding for the new write, and keep a record of the old number written so we know if the write's
 	 * finished before we reach write_cond. */
-	old_network_bytes_written = priv->network_bytes_written;
+	old_total_network_bytes_written = priv->total_network_bytes_written;
 	priv->message_bytes_outstanding += count;
 
-	g_mutex_unlock (&(priv->write_mutex));
-
 	/* Handle the more common case of the network thread already having been created first */
 	if (priv->network_thread != NULL) {
 		/* Push the new data into the buffer */
@@ -612,33 +716,11 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
 		goto write;
 	}
 
-	/* We're lazy about starting the network operation so we don't time out before we've even started */
-	if (priv->entry != NULL) {
-		/* Start by writing out the entry; then the thread has something to write to the network when it's created */
-		const gchar *first_part_header;
-		gchar *entry_xml, *second_part_header;
-
-		first_part_header = "--" BOUNDARY_STRING "\nContent-Type: application/atom+xml; charset=UTF-8\n\n";
-		entry_xml = gdata_parsable_get_xml (GDATA_PARSABLE (priv->entry));
-		second_part_header = g_strdup_printf ("\n--" BOUNDARY_STRING "\nContent-Type: %s\nContent-Transfer-Encoding: binary\n\n",
-		                                      priv->content_type);
-
-		/* Push the message parts onto the message body; we can skip the buffer, since the network thread hasn't yet been created,
-		 * so we're the sole thread accessing the SoupMessage. */
-		soup_message_body_append (priv->message->request_body, SOUP_MEMORY_STATIC, first_part_header, strlen (first_part_header));
-		soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, entry_xml, strlen (entry_xml));
-		soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, second_part_header, strlen (second_part_header));
-
-		g_mutex_lock (&(priv->write_mutex));
-		priv->network_bytes_outstanding += priv->message->request_body->length;
-		g_mutex_unlock (&(priv->write_mutex));
-	}
-
-	/* Also write out the first chunk of data, so there's guaranteed to be something in the buffer */
+	/* Write out the first chunk of data, so there's guaranteed to be something in the buffer */
 	gdata_buffer_push_data (priv->buffer, buffer, count);
 
 	/* Create the thread and let the writing commence! */
-	create_network_thread (GDATA_UPLOAD_STREAM (stream), error);
+	create_network_thread (GDATA_UPLOAD_STREAM (stream), &error);
 	if (priv->network_thread == NULL) {
 		length_written = -1;
 		goto done;
@@ -648,15 +730,20 @@ write:
 	g_mutex_lock (&(priv->write_mutex));
 
 	/* Wait for it to be written */
-	while (priv->network_bytes_written - old_network_bytes_written < count && cancelled == FALSE) {
+	while (priv->total_network_bytes_written - old_total_network_bytes_written < count && cancelled == FALSE && priv->state != STATE_FINISHED) {
 		g_cond_wait (&(priv->write_cond), &(priv->write_mutex));
 	}
-	length_written = MIN (count, priv->network_bytes_written - old_network_bytes_written);
+	length_written = MIN (count, priv->total_network_bytes_written - old_total_network_bytes_written);
 
 	/* Check for an error and return if necessary */
 	if (cancelled == TRUE && length_written < 1) {
-		g_assert (g_cancellable_set_error_if_cancelled (cancellable, error) == TRUE ||
-		          g_cancellable_set_error_if_cancelled (priv->cancellable, error) == TRUE);
+		/* Cancellation. */
+		g_assert (g_cancellable_set_error_if_cancelled (cancellable, &error) == TRUE ||
+		          g_cancellable_set_error_if_cancelled (priv->cancellable, &error) == TRUE);
+		length_written = -1;
+	} else if (priv->state == STATE_FINISHED && (length_written < 0 || (gsize) length_written < count)) {
+		/* Resumable upload error. */
+		g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error received from server after uploading a resumable upload chunk."));
 		length_written = -1;
 	}
 
@@ -670,7 +757,11 @@ done:
 	if (global_cancelled_signal != 0)
 		g_cancellable_disconnect (priv->cancellable, global_cancelled_signal);
 
-	g_assert (cancelled == TRUE || length_written > 0);
+	g_assert (error != NULL || length_written > 0);
+
+	if (error != NULL) {
+		g_propagate_error (error_out, error);
+	}
 
 	return length_written;
 }
@@ -710,11 +801,20 @@ gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GEr
 	if (cancellable != NULL)
 		cancelled_signal = g_cancellable_connect (cancellable, (GCallback) flush_cancelled_cb, &data, NULL);
 
+	/* Create the thread if it hasn't been created already. This can happen if flush() is called immediately after creating the stream. */
+	if (priv->network_thread == NULL) {
+		create_network_thread (GDATA_UPLOAD_STREAM (stream), error);
+		if (priv->network_thread == NULL) {
+			success = FALSE;
+			goto done;
+		}
+	}
+
 	/* Start the flush operation proper */
 	g_mutex_lock (&(priv->write_mutex));
 
 	/* Wait for all outstanding bytes to be written to the network */
-	while (priv->network_bytes_outstanding > 0 && cancelled == FALSE) {
+	while (priv->network_bytes_outstanding > 0 && cancelled == FALSE && priv->state != STATE_FINISHED) {
 		g_cond_wait (&(priv->write_cond), &(priv->write_mutex));
 	}
 
@@ -723,10 +823,15 @@ gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GEr
 		g_assert (g_cancellable_set_error_if_cancelled (cancellable, error) == TRUE ||
 		          g_cancellable_set_error_if_cancelled (priv->cancellable, error) == TRUE);
 		success = FALSE;
+	} else if (priv->state == STATE_FINISHED && priv->network_bytes_outstanding > 0) {
+		/* Resumable upload error. */
+		g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error received from server after uploading a resumable upload chunk."));
+		success = FALSE;
 	}
 
 	g_mutex_unlock (&(priv->write_mutex));
 
+done:
 	/* Disconnect from the cancelled signals. Note that we have to do this without @write_mutex held, as g_cancellable_disconnect() blocks
 	 * until any outstanding cancellation callbacks return, and they will block on @write_mutex. */
 	if (cancelled_signal != 0)
@@ -774,6 +879,7 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
 	gboolean cancelled = FALSE; /* must only be touched with ->response_mutex held */
 	gulong cancelled_signal = 0, global_cancelled_signal = 0;
 	CancelledData data;
+	gboolean is_finished;
 	GError *child_error = NULL;
 
 	/* If the operation was never started, return successfully immediately */
@@ -802,8 +908,12 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
 
 	g_mutex_lock (&(priv->response_mutex));
 
+	g_mutex_lock (&(priv->write_mutex));
+	is_finished = (priv->state == STATE_FINISHED);
+	g_mutex_unlock (&(priv->write_mutex));
+
 	/* If an operation is still in progress, the upload thread hasn't finished yetâ */
-	if (priv->finished == FALSE) {
+	if (!is_finished) {
 		/* We've reached the end of the stream, so append the footer if the entire operation hasn't been cancelled. */
 		if (priv->entry != NULL && g_cancellable_is_cancelled (priv->cancellable) == FALSE) {
 			const gchar *footer = "\n--" BOUNDARY_STRING "--";
@@ -830,10 +940,14 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
 	g_assert (priv->response_status == SOUP_STATUS_NONE);
 	g_assert (priv->response_error == NULL);
 
+	g_mutex_lock (&(priv->write_mutex));
+	is_finished = (priv->state == STATE_FINISHED);
+	g_mutex_unlock (&(priv->write_mutex));
+
 	/* Error handling */
-	if (priv->finished == FALSE && cancelled == TRUE) {
-		/* Cancelled? If ->finished == TRUE, the network activity finished before the gdata_upload_stream_close() operation was cancelled, so
-		 * we don't need to return an error. */
+	if (!is_finished && cancelled == TRUE) {
+		/* Cancelled? If ->state == STATE_FINISHED, the network activity finished before the gdata_upload_stream_close() operation was
+		 * cancelled, so we don't need to return an error. */
 		g_assert (g_cancellable_set_error_if_cancelled (cancellable, &child_error) == TRUE ||
 		          g_cancellable_set_error_if_cancelled (priv->cancellable, &child_error) == TRUE);
 		priv->response_status = SOUP_STATUS_CANCELLED;
@@ -885,45 +999,65 @@ write_next_chunk (GDataUploadStream *self, SoupMessage *message)
 	#define CHUNK_SIZE 8192 /* 8KiB */
 
 	GDataUploadStreamPrivate *priv = self->priv;
+	gboolean has_network_bytes_outstanding, is_complete;
 	gsize length;
 	gboolean reached_eof = FALSE;
 	guint8 next_buffer[CHUNK_SIZE];
 
 	g_mutex_lock (&(priv->write_mutex));
+	has_network_bytes_outstanding = (priv->network_bytes_outstanding > 0);
+	is_complete = (priv->state == STATE_INITIAL_REQUEST ||
+	               (priv->content_length != -1 && priv->network_bytes_written + priv->network_bytes_outstanding == priv->chunk_size));
+	g_mutex_unlock (&(priv->write_mutex));
 
-	/* If there are still bytes in libsoup's buffer, don't block on getting new bytes into the stream */
-	if (priv->network_bytes_outstanding > 0) {
-		g_mutex_unlock (&(priv->write_mutex));
+	/* If there are still bytes in libsoup's buffer, don't block on getting new bytes into the stream. Also, if we're making the initial request
+	 * of a resumable upload, don't push new data onto the network, since all of the XML was pushed into the buffer when we started. */
+	if (has_network_bytes_outstanding) {
 		return;
-	}
+	} else if (is_complete) {
+		soup_message_body_complete (priv->message->request_body);
 
-	g_mutex_unlock (&(priv->write_mutex));
+		return;
+	}
 
 	/* Append the next chunk to the message body so it can join in the fun.
-	 * Note that this call isn't blocking, and can return less than the CHUNK_SIZE. This is because
+	 * Note that this call isn't necessarily blocking, and can return less than the CHUNK_SIZE. This is because
 	 * we could deadlock if we block on getting CHUNK_SIZE bytes at the end of the stream. write() could
 	 * easily be called with fewer bytes, but has no way to notify us that we've reached the end of the
-	 * stream, so we'd happily block on receiving more bytes which weren't forthcoming. */
-	length = gdata_buffer_pop_data_limited (priv->buffer, next_buffer, CHUNK_SIZE, &reached_eof);
+	 * stream, so we'd happily block on receiving more bytes which weren't forthcoming.
+	 *
+	 * Note also that we can't block on this call with write_mutex locked, or we could get into a deadlock if the stream is flushed at the same
+	 * time (in the case that we don't know the content length ahead of time). */
+	if (priv->content_length == -1) {
+		/* Non-resumable upload. */
+		length = gdata_buffer_pop_data_limited (priv->buffer, next_buffer, CHUNK_SIZE, &reached_eof);
+	} else {
+		/* Resumable upload. Ensure we don't exceed the chunk size. */
+		length = gdata_buffer_pop_data_limited (priv->buffer, next_buffer,
+		                                        MIN (CHUNK_SIZE, priv->chunk_size - (priv->network_bytes_written +
+		                                                                             priv->network_bytes_outstanding)), &reached_eof);
+	}
 
 	g_mutex_lock (&(priv->write_mutex));
+
 	priv->message_bytes_outstanding -= length;
 	priv->network_bytes_outstanding += length;
-	g_mutex_unlock (&(priv->write_mutex));
 
 	/* Append whatever data was returned */
 	if (length > 0)
 		soup_message_body_append (priv->message->request_body, SOUP_MEMORY_COPY, next_buffer, length);
 
-	/* Finish off the request body if we've reached EOF (i.e. the stream has been closed) */
-	if (reached_eof == TRUE) {
-		g_mutex_lock (&(priv->write_mutex));
-		g_assert (priv->message_bytes_outstanding == 0);
-		g_mutex_unlock (&(priv->write_mutex));
+	/* Finish off the request body if we've reached EOF (i.e. the stream has been closed), or if we're doing a resumable upload and we reach
+	 * the maximum chunk size. */
+	if (reached_eof == TRUE ||
+	    (priv->content_length != -1 && priv->network_bytes_written + priv->network_bytes_outstanding == priv->chunk_size)) {
+		g_assert (reached_eof == FALSE || priv->message_bytes_outstanding == 0);
 
 		soup_message_body_complete (priv->message->request_body);
 	}
 
+	g_mutex_unlock (&(priv->write_mutex));
+
 	soup_session_unpause_message (priv->session, priv->message);
 }
 
@@ -951,6 +1085,11 @@ wrote_body_data_cb (SoupMessage *message, SoupBuffer *buffer, GDataUploadStream
 	g_assert (priv->network_bytes_outstanding > 0);
 	priv->network_bytes_outstanding -= buffer->length;
 	priv->network_bytes_written += buffer->length;
+
+	if (priv->state == STATE_DATA_REQUESTS) {
+		priv->total_network_bytes_written += buffer->length;
+	}
+
 	g_cond_signal (&(priv->write_cond));
 	g_mutex_unlock (&(priv->write_mutex));
 
@@ -967,22 +1106,131 @@ upload_thread (GDataUploadStream *self)
 
 	g_assert (priv->cancellable != NULL);
 
-	/* Connect to the wrote-* signals so we can prepare the next chunk for transmission */
-	g_signal_connect (priv->message, "wrote-headers", (GCallback) wrote_headers_cb, self);
-	g_signal_connect (priv->message, "wrote-body-data", (GCallback) wrote_body_data_cb, self);
+	while (TRUE) {
+		gulong wrote_headers_signal, wrote_body_data_signal;
+		gchar *new_uri;
+		SoupMessage *new_message;
+		gsize next_chunk_length;
 
-	_gdata_service_actually_send_message (priv->session, priv->message, priv->cancellable, NULL);
+		/* Connect to the wrote-* signals so we can prepare the next chunk for transmission */
+		wrote_headers_signal = g_signal_connect (priv->message, "wrote-headers", (GCallback) wrote_headers_cb, self);
+		wrote_body_data_signal = g_signal_connect (priv->message, "wrote-body-data", (GCallback) wrote_body_data_cb, self);
 
-	/* Signal write_cond, just in case we errored out and finished sending in the middle of a write */
-	g_mutex_lock (&(priv->response_mutex));
-	g_mutex_lock (&(priv->write_mutex));
-	if (priv->message_bytes_outstanding > 0 || priv->network_bytes_outstanding > 0) {
-		g_cond_signal (&(priv->write_cond));
+		_gdata_service_actually_send_message (priv->session, priv->message, priv->cancellable, NULL);
+
+		g_mutex_lock (&(priv->write_mutex));
+
+		/* If this is a resumable upload, continue to the next chunk. If it's a non-resumable upload, we're done. We have several cases:
+		 *  â Non-resumable upload:
+		 *     - Content only: STATE_DATA_REQUESTS â STATE_FINISHED
+		 *     - Metadata only: not supported
+		 *     - Content and metadata: STATE_DATA_REQUESTS â STATE_FINISHED
+		 *  â Resumable upload:
+		 *     - Content only:
+		 *        * STATE_INITIAL_REQUEST â STATE_DATA_REQUESTS
+		 *        * STATE_DATA_REQUESTS â STATE_DATA_REQUESTS
+		 *        * STATE_DATA_REQUESTS â STATE_FINISHED
+		 *     - Metadata only: STATE_INITIAL_REQUEST â STATE_FINISHED
+		 *     - Content and metadata:
+		 *        * STATE_INITIAL_REQUEST â STATE_DATA_REQUESTS
+		 *        * STATE_DATA_REQUESTS â STATE_DATA_REQUESTS
+		 *        * STATE_DATA_REQUESTS â STATE_FINISHED
+		 */
+		switch (priv->state) {
+			case STATE_INITIAL_REQUEST:
+				/* We're either a content-only or a content-and-metadata resumable upload. */
+				priv->state = STATE_DATA_REQUESTS;
+
+				/* Check the response. On success it should be empty, status 200, with a Location header telling us where to upload
+				 * next. If it's an error response, bail out and let the code in gdata_upload_stream_close() parse the error..*/
+				if (!SOUP_STATUS_IS_SUCCESSFUL (priv->message->status_code)) {
+					goto finished;
+				} else if (priv->content_length == 0 && priv->message->status_code == SOUP_STATUS_CREATED) {
+					/* If this was a metadata-only upload, we're done. */
+					goto finished;
+				}
+
+				/* Fall out and prepare the next message */
+				g_assert (priv->total_network_bytes_written == 0); /* haven't written any data yet */
+
+				break;
+			case STATE_DATA_REQUESTS:
+				/* Check the response. On completion it should contain the resulting entry's XML, status 201. On continuation it should
+				 * be empty, status 308, with a Range header and potentially a Location header telling us what/where to upload next.
+				 * If it's an error response, bail out and let the code in gdata_upload_stream_close() parse the error..*/
+				if (priv->message->status_code == 308) {
+					/* Continuation: fall out and prepare the next message */
+					g_assert (priv->content_length == -1 || priv->total_network_bytes_written < (gsize) priv->content_length);
+				} else if (SOUP_STATUS_IS_SUCCESSFUL (priv->message->status_code)) {
+					/* Completion. Check the server isn't misbehaving. */
+					g_assert (priv->content_length == -1 || priv->total_network_bytes_written == (gsize) priv->content_length);
+
+					goto finished;
+				} else {
+					/* Error */
+					goto finished;
+				}
+
+				/* Fall out and prepare the next message */
+				g_assert (priv->total_network_bytes_written > 0);
+
+				break;
+			case STATE_FINISHED:
+			default:
+				g_assert_not_reached ();
+		}
+
+		/* Prepare the next message. */
+		g_assert (priv->content_length != -1);
+
+		next_chunk_length = MIN (priv->content_length - priv->total_network_bytes_written, MAX_RESUMABLE_CHUNK_SIZE);
+
+		new_uri = g_strdup (soup_message_headers_get_one (priv->message->response_headers, "Location"));
+		if (new_uri == NULL) {
+			new_uri = soup_uri_to_string (soup_message_get_uri (priv->message), FALSE);
+		}
+
+		new_message = build_message (self, SOUP_METHOD_PUT, new_uri);
+
+		g_free (new_uri);
+
+		soup_message_headers_set_encoding (new_message->request_headers, SOUP_ENCODING_CONTENT_LENGTH);
+		soup_message_headers_set_content_type (new_message->request_headers, priv->content_type, NULL);
+		soup_message_headers_set_content_length (new_message->request_headers, next_chunk_length);
+		soup_message_headers_set_content_range (new_message->request_headers, priv->total_network_bytes_written,
+		                                        priv->total_network_bytes_written + next_chunk_length - 1, priv->content_length);
+
+		g_signal_handler_disconnect (priv->message, wrote_body_data_signal);
+		g_signal_handler_disconnect (priv->message, wrote_headers_signal);
+
+		g_object_unref (priv->message);
+		priv->message = new_message;
+
+		/* Reset various counters for the next upload. Note that message_bytes_outstanding may be > 0 at this point, since the client may
+		 * have pushed some content into the buffer while we were waiting for the response to this request. */
+		g_assert (priv->network_bytes_outstanding == 0);
+		priv->chunk_size = next_chunk_length;
+		priv->network_bytes_written = 0;
+
+		/* Loop round and upload this chunk now. */
+		g_mutex_unlock (&(priv->write_mutex));
+
+		continue;
+
+finished:
+		g_mutex_unlock (&(priv->write_mutex));
+
+		goto finished_outer;
 	}
+
+finished_outer:
+	/* Signal that the operation has finished (either successfully or in error).
+	 * Also signal write_cond, just in case we errored out and finished sending in the middle of a write. */
+	g_mutex_lock (&(priv->write_mutex));
+	priv->state = STATE_FINISHED;
+	g_cond_signal (&(priv->write_cond));
 	g_mutex_unlock (&(priv->write_mutex));
 
-	/* Signal that the operation has finished */
-	priv->finished = TRUE;
 	g_cond_signal (&(priv->finished_cond));
 	g_mutex_unlock (&(priv->response_mutex));
 
@@ -1065,6 +1313,77 @@ gdata_upload_stream_new (GDataService *service, GDataAuthorizationDomain *domain
 }
 
 /**
+ * gdata_upload_stream_new_resumable:
+ * @service: a #GDataService
+ * @domain: (allow-none): the #GDataAuthorizationDomain to authorize the upload, or %NULL
+ * @method: the HTTP method to use
+ * @upload_uri: the URI to upload
+ * @entry: (allow-none): the entry to upload as metadata, or %NULL
+ * @slug: the file's slug (filename)
+ * @content_type: the content type of the file being uploaded
+ * @content_length: the size (in bytes) of the file being uploaded
+ * @cancellable: (allow-none): a #GCancellable for the entire upload stream, or %NULL
+ *
+ * Creates a new resumable #GDataUploadStream, allowing a file to be uploaded from a GData service using standard #GOutputStream API. The upload will
+ * use GData's resumable upload API, so should be more reliable than a normal upload (especially if the file is large). See the
+ * <ulink type="http" url="http://code.google.com/apis/gdata/docs/resumable_upload.html";>GData documentation on resumable uploads</ulink> for more
+ * information.
+ *
+ * The HTTP method to use should be specified in @method, and will typically be either %SOUP_METHOD_POST (for insertions) or %SOUP_METHOD_PUT
+ * (for updates), according to the server and the @upload_uri.
+ *
+ * If @entry is specified, it will be attached to the upload as the entry to which the file being uploaded belongs. Otherwise, just the file
+ * written to the stream will be uploaded, and given a default entry as determined by the server.
+ *
+ * @slug, @content_type and @content_length must be specified before the upload begins, as they describe the file being streamed. @slug is the filename
+ * given to the file, which will typically be stored on the server and made available when downloading the file again. @content_type must be the
+ * correct content type for the file, and should be in the service's list of acceptable content types. @content_length must be the size of the file
+ * being uploaded (not including the XML for any associated #GDataEntry) in bytes. Zero is accepted if a metadata-only upload is being performed.
+ *
+ * As well as the standard GIO errors, calls to the #GOutputStream API on a #GDataUploadStream can also return any relevant specific error from
+ * #GDataServiceError, or %GDATA_SERVICE_ERROR_PROTOCOL_ERROR in the general case.
+ *
+ * If a #GCancellable is provided in @cancellable, the upload operation may be cancelled at any time from another thread using g_cancellable_cancel().
+ * In this case, any ongoing network activity will be stopped, and any pending or future calls to #GOutputStream API on the #GDataUploadStream will
+ * return %G_IO_ERROR_CANCELLED. Note that the #GCancellable objects which can be passed to individual #GOutputStream operations will not cancel the
+ * upload operation proper if cancelled â they will merely cancel that API call. The only way to cancel the upload operation completely is using this
+ * @cancellable.
+ *
+ * Note that network communication won't begin until the first call to g_output_stream_write() on the #GDataUploadStream.
+ *
+ * Return value: a new #GOutputStream, or %NULL; unref with g_object_unref()
+ *
+ * Since: 0.11.2
+ */
+GOutputStream *
+gdata_upload_stream_new_resumable (GDataService *service, GDataAuthorizationDomain *domain, const gchar *method, const gchar *upload_uri,
+                                   GDataEntry *entry, const gchar *slug, const gchar *content_type, goffset content_length, GCancellable *cancellable)
+{
+	g_return_val_if_fail (GDATA_IS_SERVICE (service), NULL);
+	g_return_val_if_fail (domain == NULL || GDATA_IS_AUTHORIZATION_DOMAIN (domain), NULL);
+	g_return_val_if_fail (method != NULL, NULL);
+	g_return_val_if_fail (upload_uri != NULL, NULL);
+	g_return_val_if_fail (entry == NULL || GDATA_IS_ENTRY (entry), NULL);
+	g_return_val_if_fail (slug != NULL, NULL);
+	g_return_val_if_fail (content_type != NULL, NULL);
+	g_return_val_if_fail (content_length >= 0, NULL);
+	g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), NULL);
+
+	/* Create the upload stream */
+	return G_OUTPUT_STREAM (g_object_new (GDATA_TYPE_UPLOAD_STREAM,
+	                                      "method", method,
+	                                      "upload-uri", upload_uri,
+	                                      "service", service,
+	                                      "authorization-domain", domain,
+	                                      "entry", entry,
+	                                      "slug", slug,
+	                                      "content-type", content_type,
+	                                      "content-length", content_length,
+	                                      "cancellable", cancellable,
+	                                      NULL));
+}
+
+/**
  * gdata_upload_stream_get_response:
  * @self: a #GDataUploadStream
  * @length: (allow-none) (out caller-allocates): return location for the length of the response, or %NULL
@@ -1238,6 +1557,24 @@ gdata_upload_stream_get_content_type (GDataUploadStream *self)
 }
 
 /**
+ * gdata_upload_stream_get_content_length:
+ * @self: a #GDataUploadStream
+ *
+ * Gets the size (in bytes) of the file being uploaded. This will be <code class="literal">-1</code> for a non-resumable upload, and zero or greater
+ * for a resumable upload.
+ *
+ * Return value: the size of the file being uploaded
+ *
+ * Since: 0.11.2
+ */
+goffset
+gdata_upload_stream_get_content_length (GDataUploadStream *self)
+{
+	g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), -1);
+	return self->priv->content_length;
+}
+
+/**
  * gdata_upload_stream_get_cancellable:
  * @self: a #GDataUploadStream
  *
diff --git a/gdata/gdata-upload-stream.h b/gdata/gdata-upload-stream.h
index b7e5c69..8f45e9d 100644
--- a/gdata/gdata-upload-stream.h
+++ b/gdata/gdata-upload-stream.h
@@ -29,6 +29,32 @@
 
 G_BEGIN_DECLS
 
+/**
+ * GDATA_LINK_RESUMABLE_CREATE_MEDIA:
+ *
+ * The relation type URI of the resumable upload location for resources attached to this resource.
+ *
+ * For more information, see the
+ * <ulink type="http" url="http://code.google.com/apis/gdata/docs/resumable_upload.html#ResumableUploadInitiate";>GData resumable upload protocol
+ * specification</ulink>.
+ *
+ * Since: 0.11.2
+ */
+#define GDATA_LINK_RESUMABLE_CREATE_MEDIA "http://schemas.google.com/g/2005#resumable-create-media";
+
+/**
+ * GDATA_LINK_RESUMABLE_EDIT_MEDIA:
+ *
+ * The relation type URI of the resumable update location for resources attached to this resource.
+ *
+ * For more information, see the
+ * <ulink type="http" url="http://code.google.com/apis/gdata/docs/resumable_upload.html#ResumableUploadInitiate";>GData resumable upload protocol
+ * specification</ulink>.
+ *
+ * Since: 0.11.2
+ */
+#define GDATA_LINK_RESUMABLE_EDIT_MEDIA "http://schemas.google.com/g/2005#resumable-edit-media";
+
 #define GDATA_TYPE_UPLOAD_STREAM		(gdata_upload_stream_get_type ())
 #define GDATA_UPLOAD_STREAM(o)			(G_TYPE_CHECK_INSTANCE_CAST ((o), GDATA_TYPE_UPLOAD_STREAM, GDataUploadStream))
 #define GDATA_UPLOAD_STREAM_CLASS(k)		(G_TYPE_CHECK_CLASS_CAST((k), GDATA_TYPE_UPLOAD_STREAM, GDataUploadStreamClass))
@@ -67,6 +93,9 @@ GType gdata_upload_stream_get_type (void) G_GNUC_CONST;
 GOutputStream *gdata_upload_stream_new (GDataService *service, GDataAuthorizationDomain *domain, const gchar *method, const gchar *upload_uri,
                                         GDataEntry *entry, const gchar *slug, const gchar *content_type,
                                         GCancellable *cancellable) G_GNUC_WARN_UNUSED_RESULT G_GNUC_MALLOC;
+GOutputStream *gdata_upload_stream_new_resumable (GDataService *service, GDataAuthorizationDomain *domain, const gchar *method, const gchar *upload_uri,
+                                                  GDataEntry *entry, const gchar *slug, const gchar *content_type, goffset content_length,
+                                                  GCancellable *cancellable) G_GNUC_WARN_UNUSED_RESULT G_GNUC_MALLOC;
 
 const gchar *gdata_upload_stream_get_response (GDataUploadStream *self, gssize *length);
 
@@ -77,6 +106,7 @@ const gchar *gdata_upload_stream_get_upload_uri (GDataUploadStream *self) G_GNUC
 GDataEntry *gdata_upload_stream_get_entry (GDataUploadStream *self) G_GNUC_PURE;
 const gchar *gdata_upload_stream_get_slug (GDataUploadStream *self) G_GNUC_PURE;
 const gchar *gdata_upload_stream_get_content_type (GDataUploadStream *self) G_GNUC_PURE;
+goffset gdata_upload_stream_get_content_length (GDataUploadStream *self) G_GNUC_PURE;
 GCancellable *gdata_upload_stream_get_cancellable (GDataUploadStream *self) G_GNUC_PURE;
 
 G_END_DECLS
diff --git a/gdata/gdata.symbols b/gdata/gdata.symbols
index 6305844..e75fa49 100644
--- a/gdata/gdata.symbols
+++ b/gdata/gdata.symbols
@@ -937,3 +937,5 @@ gdata_youtube_query_get_license
 gdata_youtube_query_set_license
 gdata_contacts_contact_get_file_as
 gdata_contacts_contact_set_file_as
+gdata_upload_stream_new_resumable
+gdata_upload_stream_get_content_length
diff --git a/gdata/tests/common.c b/gdata/tests/common.c
index a94f90b..34d89c0 100644
--- a/gdata/tests/common.c
+++ b/gdata/tests/common.c
@@ -538,15 +538,11 @@ compare_xml_nodes (xmlNode *node1, xmlNode *node2)
 }
 
 gboolean
-gdata_test_compare_xml (GDataParsable *parsable, const gchar *expected_xml, gboolean print_error)
+gdata_test_compare_xml_strings (const gchar *parsable_xml, const gchar *expected_xml, gboolean print_error)
 {
 	gboolean success;
-	gchar *parsable_xml;
 	xmlDoc *parsable_doc, *expected_doc;
 
-	/* Get an XML string for the GDataParsable */
-	parsable_xml = gdata_parsable_get_xml (parsable);
-
 	/* Parse both the XML strings */
 	parsable_doc = xmlReadMemory (parsable_xml, strlen (parsable_xml), "/dev/null", NULL, 0);
 	expected_doc = xmlReadMemory (expected_xml, strlen (expected_xml), "/dev/null", NULL, 0);
@@ -562,6 +558,19 @@ gdata_test_compare_xml (GDataParsable *parsable, const gchar *expected_xml, gboo
 
 	xmlFreeDoc (expected_doc);
 	xmlFreeDoc (parsable_doc);
+
+	return success;
+}
+
+gboolean
+gdata_test_compare_xml (GDataParsable *parsable, const gchar *expected_xml, gboolean print_error)
+{
+	gboolean success;
+	gchar *parsable_xml;
+
+	/* Get an XML string for the GDataParsable */
+	parsable_xml = gdata_parsable_get_xml (parsable);
+	success = gdata_test_compare_xml_strings (parsable_xml, expected_xml, print_error);
 	g_free (parsable_xml);
 
 	return success;
diff --git a/gdata/tests/common.h b/gdata/tests/common.h
index 7f16ee2..3a50b76 100644
--- a/gdata/tests/common.h
+++ b/gdata/tests/common.h
@@ -53,6 +53,7 @@ guint gdata_test_batch_operation_deletion (GDataBatchOperation *operation, GData
 gboolean gdata_test_batch_operation_run (GDataBatchOperation *operation, GCancellable *cancellable, GError **error);
 gboolean gdata_test_batch_operation_run_finish (GDataBatchOperation *operation, GAsyncResult *async_result, GError **error);
 
+gboolean gdata_test_compare_xml_strings (const gchar *parsable_xml, const gchar *expected_xml, gboolean print_error);
 gboolean gdata_test_compare_xml (GDataParsable *parsable, const gchar *expected_xml, gboolean print_error);
 
 /* Convenience macro */
diff --git a/gdata/tests/streams.c b/gdata/tests/streams.c
index b4eb679..6c79fbb 100644
--- a/gdata/tests/streams.c
+++ b/gdata/tests/streams.c
@@ -581,11 +581,407 @@ test_upload_stream_upload_no_entry_content_length (void)
 	g_main_context_unref (async_context);
 }
 
+/* Test parameters for a run of test_upload_stream_resumable(). */
+typedef struct {
+	enum {
+		CONTENT_ONLY = 0,
+		CONTENT_AND_METADATA = 1,
+		METADATA_ONLY = 2,
+	} content_type;
+#define UPLOAD_STREAM_RESUMABLE_MAX_CONTENT_TYPE 2
+	gsize file_size;
+	enum {
+		ERROR_ON_INITIAL_REQUEST = 0,
+		ERROR_ON_SUBSEQUENT_REQUEST = 1,
+		ERROR_ON_FINAL_REQUEST = 2,
+		NO_ERROR = 3,
+	} error_type;
+#define UPLOAD_STREAM_RESUMABLE_MAX_ERROR_TYPE 3
+} UploadStreamResumableTestParams;
+
+static const gchar *upload_stream_resumable_content_type_names[] = {
+	"content-only",
+	"content-and-metadata",
+	"metadata-only",
+};
+
+static const gchar *upload_stream_resumable_error_type_names[] = {
+	"initial-error",
+	"subsequent-error",
+	"final-error",
+	"success",
+};
+
+typedef struct {
+	UploadStreamResumableTestParams *test_params;
+	gsize next_range_start;
+	gsize next_range_end;
+	guint next_path_index;
+	const gchar *test_string;
+} UploadStreamResumableServerData;
+
+static void
+test_upload_stream_resumable_server_handler_cb (SoupServer *server, SoupMessage *message, const char *path, GHashTable *query,
+                                                SoupClientContext *client, UploadStreamResumableServerData *server_data)
+{
+	UploadStreamResumableTestParams *test_params;
+
+	test_params = server_data->test_params;
+
+	/* Are we handling the initial request, or a subsequent one? */
+	if (strcmp (path, "/") == 0) {
+		/* Initial request. */
+		gchar *file_size_str;
+
+		/* Check the Slug and X-Upload-* headers. */
+		g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "Slug"), ==, "slug");
+
+		file_size_str = g_strdup_printf ("%" G_GSIZE_FORMAT, test_params->file_size);
+		g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "X-Upload-Content-Type"), ==, "text/plain");
+		g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "X-Upload-Content-Length"), ==, file_size_str);
+		g_free (file_size_str);
+
+		/* Check the Content-Type and content. */
+		switch (test_params->content_type) {
+			case CONTENT_ONLY:
+				/* Check nothing was sent. */
+				g_assert_cmpstr (soup_message_headers_get_content_type (message->request_headers, NULL), ==, NULL);
+				g_assert_cmpint (message->request_body->length, ==, 0);
+
+				break;
+			case CONTENT_AND_METADATA:
+			case METADATA_ONLY:
+				/* Check the XML sent by the client. */
+				g_assert_cmpstr (soup_message_headers_get_content_type (message->request_headers, NULL), ==, "application/atom+xml");
+
+				g_assert (message->request_body->data[message->request_body->length] == '\0');
+				g_assert (gdata_test_compare_xml_strings (message->request_body->data,
+					"<?xml version='1.0' encoding='UTF-8'?>"
+					"<entry xmlns='http://www.w3.org/2005/Atom' "
+					       "xmlns:app='http://www.w3.org/2007/app' "
+					       "xmlns:georss='http://www.georss.org/georss' "
+					       "xmlns:gml='http://www.opengis.net/gml' "
+					       "xmlns:gd='http://schemas.google.com/g/2005' "
+					       "xmlns:yt='http://gdata.youtube.com/schemas/2007' "
+					       "xmlns:media='http://search.yahoo.com/mrss/'>"
+						"<title type='text'>Test title!</title>"
+						"<category term='http://gdata.youtube.com/schemas/2007#video' "
+						          "scheme='http://schemas.google.com/g/2005#kind'/>"
+						"<media:group>"
+							"<media:title type='plain'>Test title!</media:title>"
+						"</media:group>"
+						"<app:control>"
+							"<app:draft>no</app:draft>"
+						"</app:control>"
+					"</entry>", TRUE) == TRUE);
+
+				break;
+			default:
+				g_assert_not_reached ();
+		}
+
+		/* Send a response. */
+		switch (test_params->error_type) {
+			case ERROR_ON_INITIAL_REQUEST:
+				/* Error. */
+				goto error;
+			case ERROR_ON_SUBSEQUENT_REQUEST:
+			case ERROR_ON_FINAL_REQUEST:
+			case NO_ERROR:
+				/* Success. */
+				if (test_params->file_size == 0) {
+					goto completion;
+				} else {
+					goto continuation;
+				}
+
+				break;
+			default:
+				g_assert_not_reached ();
+		}
+	} else if (*path == '/' && g_ascii_strtoull (path + 1, NULL, 10) == server_data->next_path_index) {
+		/* Subsequent request. */
+
+		/* Check the Slug and X-Upload-* headers. */
+		g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "Slug"), ==, NULL);
+		g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "X-Upload-Content-Type"), ==, NULL);
+		g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "X-Upload-Content-Length"), ==, NULL);
+
+		/* Check the Content-Type and content. */
+		switch (test_params->content_type) {
+			case CONTENT_ONLY:
+			case CONTENT_AND_METADATA: {
+				goffset range_start, range_end, range_length;
+
+				/* Check the headers. */
+				g_assert_cmpstr (soup_message_headers_get_content_type (message->request_headers, NULL), ==, "text/plain");
+				g_assert_cmpint (soup_message_headers_get_content_length (message->request_headers), ==, message->request_body->length);
+				g_assert_cmpint (message->request_body->length, >, 0);
+				g_assert_cmpint (message->request_body->length, <=, 512 * 1024 /* 512 KiB */);
+				g_assert (soup_message_headers_get_content_range (message->request_headers, &range_start, &range_end,
+				                                                  &range_length) == TRUE);
+				g_assert_cmpint (range_start, ==, server_data->next_range_start);
+				g_assert_cmpint (range_end, ==, server_data->next_range_end);
+				g_assert_cmpint (range_length, ==, test_params->file_size);
+
+				/* Check the content. */
+				g_assert (memcmp (server_data->test_string + range_start, message->request_body->data,
+				                  message->request_body->length) == 0);
+
+				/* Update the expected values. */
+				server_data->next_range_start = range_end + 1;
+				server_data->next_range_end = MIN (server_data->next_range_start + 512 * 1024, test_params->file_size) - 1;
+				server_data->next_path_index++;
+
+				break;
+			}
+			case METADATA_ONLY:
+			default:
+				g_assert_not_reached ();
+		}
+
+		/* Send a response. */
+		switch (test_params->error_type) {
+			case ERROR_ON_INITIAL_REQUEST:
+				g_assert_not_reached ();
+			case ERROR_ON_SUBSEQUENT_REQUEST:
+			case ERROR_ON_FINAL_REQUEST:
+				/* Skip the error if this isn't the final request. */
+				if (test_params->error_type == ERROR_ON_SUBSEQUENT_REQUEST ||
+				    (test_params->error_type == ERROR_ON_FINAL_REQUEST && server_data->next_range_start == test_params->file_size)) {
+					goto error;
+				}
+
+				/* Fall through */
+			case NO_ERROR:
+				/* Success. */
+				if (server_data->next_range_start == test_params->file_size) {
+					goto completion;
+				} else {
+					goto continuation;
+				}
+
+				break;
+			default:
+				g_assert_not_reached ();
+		}
+	} else {
+		g_assert_not_reached ();
+	}
+
+	return;
+
+error: {
+		const gchar *error_response =
+			"<?xml version='1.0' encoding='UTF-8'?>"
+			"<errors>"
+				"<error>"
+					"<domain>yt:authentication</domain>"
+					"<code>InvalidToken</code>"
+					"<location type='header'>Authorization: GoogleLogin</location>"
+				"</error>"
+			"</errors>";
+
+		/* Error. */
+		soup_message_set_status (message, SOUP_STATUS_UNAUTHORIZED); /* arbitrary error status code */
+		soup_message_body_append (message->response_body, SOUP_MEMORY_STATIC, error_response, strlen (error_response));
+	}
+
+	return;
+
+continuation: {
+		gchar *upload_uri;
+
+		/* Continuation. */
+		if (server_data->next_path_index == 0) {
+			soup_message_set_status (message, SOUP_STATUS_OK);
+		} else {
+			soup_message_set_status (message, 308);
+		}
+
+		upload_uri = g_strdup_printf ("http://127.0.0.1:%u/%u";, soup_server_get_port (server), ++server_data->next_path_index);
+		soup_message_headers_replace (message->response_headers, "Location", upload_uri);
+		g_free (upload_uri);
+	}
+
+	return;
+
+completion: {
+		const gchar *completion_response =
+			"<?xml version='1.0' encoding='UTF-8'?>"
+			"<entry xmlns='http://www.w3.org/2005/Atom' "
+			       "xmlns:media='http://search.yahoo.com/mrss/' "
+			       "xmlns:gd='http://schemas.google.com/g/2005' "
+			       "xmlns:yt='http://gdata.youtube.com/schemas/2007' "
+			       "xmlns:app='http://www.w3.org/2007/app' "
+			       "xmlns:georss='http://www.georss.org/georss' "
+			       "xmlns:gml='http://www.opengis.net/gml' "
+			       "gd:etag='W/\"testfulness.\"'>"
+				"<title type='text'>Test title!</title>"
+				"<id>tag:youtube.com,2008:video:fooishbar</id>"
+				"<updated>2009-03-23T12:46:58Z</updated>"
+				"<published>2006-05-16T14:06:37Z</published>"
+				"<category term='http://gdata.youtube.com/schemas/2007#video' scheme='http://schemas.google.com/g/2005#kind'/>"
+				"<link href='http://www.youtube.com/watch?v=fooishbar' rel='http://www.iana.org/assignments/relation/alternate' type='text/html'/>"
+				"<link href='http://gdata.youtube.com/feeds/api/videos/fooishbar' rel='http://www.iana.org/assignments/relation/self' type='application/atom+xml'/>"
+				"<author>"
+					"<name>Brian</name>"
+					"<uri>http://gdata.youtube.com/feeds/api/users/brian</uri>"
+				"</author>"
+				"<media:group>"
+					"<media:category scheme='http://gdata.youtube.com/schemas/2007/categories.cat' label='Music'>Music</media:category>"
+					"<media:title type='plain'>Test title!</media:title>"
+				"</media:group>"
+				"<yt:recorded>2005-10-02</yt:recorded>"
+				"<app:control>"
+					"<app:draft>no</app:draft>"
+				"</app:control>"
+			"</entry>";
+
+		/* Completion. */
+		soup_message_set_status (message, SOUP_STATUS_CREATED);
+		soup_message_headers_set_content_type (message->response_headers, "application/atom+xml", NULL);
+		soup_message_body_append (message->response_body, SOUP_MEMORY_STATIC, completion_response, strlen (completion_response));
+	}
+}
+
+static void
+test_upload_stream_resumable (gconstpointer user_data)
+{
+	UploadStreamResumableTestParams *test_params;
+	UploadStreamResumableServerData server_data;
+	SoupServer *server;
+	GMainContext *async_context;
+	SoupAddress *addr;
+	GThread *thread;
+	gchar *upload_uri;
+	GDataService *service;
+	GDataEntry *entry = NULL;
+	GOutputStream *upload_stream;
+	gssize length_written;
+	gsize total_length_written = 0;
+	gchar *test_string;
+	goffset test_string_length;
+	gboolean success;
+	GError *error = NULL;
+
+	test_params = (UploadStreamResumableTestParams*) user_data;
+
+	/* Build the test string. */
+	if (test_params->file_size > 0) {
+		test_string = get_test_string (1, test_params->file_size / 4 /* arbitrary number which should generate enough data */);
+		g_assert (strlen (test_string) + 1 >= test_params->file_size);
+		test_string[test_params->file_size - 1] = '\0'; /* trim the string to the right length */
+	} else {
+		test_string = NULL;
+	}
+
+	test_string_length = test_params->file_size;
+
+	/* Create the server */
+	server_data.test_params = test_params;
+	server_data.next_range_start = 0;
+	server_data.next_range_end = MIN (test_params->file_size, 512 * 1024 /* 512 KiB */) - 1;
+	server_data.next_path_index = 0;
+	server_data.test_string = test_string;
+
+	async_context = g_main_context_new ();
+	addr = soup_address_new ("127.0.0.1", SOUP_ADDRESS_ANY_PORT);
+	soup_address_resolve_sync (addr, NULL);
+
+	server = soup_server_new (SOUP_SERVER_INTERFACE, addr,
+	                          SOUP_SERVER_ASYNC_CONTEXT, async_context,
+	                          NULL);
+	soup_server_add_handler (server, NULL, (SoupServerCallback) test_upload_stream_resumable_server_handler_cb, &server_data, NULL);
+
+	g_object_unref (addr);
+
+	g_assert (server != NULL);
+
+	/* Create a thread for the server */
+	thread = run_server (server);
+
+	/* Create a new upload stream uploading to the server */
+	if (test_params->content_type == CONTENT_AND_METADATA || test_params->content_type == METADATA_ONLY) {
+		/* Build a test entry. */
+		entry = GDATA_ENTRY (gdata_youtube_video_new (NULL));
+		gdata_entry_set_title (entry, "Test title!");
+	}
+
+	upload_uri = g_strdup_printf ("http://127.0.0.1:%u/";, soup_server_get_port (server));
+	service = GDATA_SERVICE (gdata_youtube_service_new ("developer-key", NULL));
+	upload_stream = gdata_upload_stream_new_resumable (service, NULL, SOUP_METHOD_POST, upload_uri, entry, "slug", "text/plain",
+	                                                   test_params->file_size, NULL);
+	g_object_unref (service);
+	g_free (upload_uri);
+	g_clear_object (&entry);
+
+	if (test_params->file_size > 0) {
+		while ((length_written = g_output_stream_write (upload_stream, test_string + total_length_written,
+		                                                test_string_length - total_length_written, NULL, &error)) > 0) {
+			g_assert_cmpint (length_written, <=, test_string_length - total_length_written);
+
+			total_length_written += length_written;
+		}
+	} else {
+		/* Do an empty write to poke things into action. */
+		if ((length_written = g_output_stream_write (upload_stream, "", 0, NULL, &error)) > 0) {
+			total_length_written += length_written;
+		}
+	}
+
+	/* Check the return value. */
+	switch (test_params->error_type) {
+		case ERROR_ON_INITIAL_REQUEST:
+		case ERROR_ON_SUBSEQUENT_REQUEST:
+		case ERROR_ON_FINAL_REQUEST:
+			/* We can't check the write() call for errors, since whether it throws an error depends on whether the range it's writing
+			 * overlaps a resumable upload chunk, which is entirely arbitrary and unpredictable. */
+			g_assert_cmpint (length_written, <=, 0);
+			g_assert_cmpint (total_length_written, <=, test_string_length);
+			g_clear_error (&error);
+
+			/* Close the stream */
+			success = g_output_stream_close (upload_stream, NULL, &error);
+			g_assert_error (error, GDATA_SERVICE_ERROR, GDATA_SERVICE_ERROR_AUTHENTICATION_REQUIRED);
+			g_assert (success == FALSE);
+			g_clear_error (&error);
+
+			break;
+		case NO_ERROR:
+			/* Check we've had a successful return value */
+			g_assert_no_error (error);
+			g_assert_cmpint (length_written, ==, 0);
+			g_assert_cmpint (total_length_written, ==, test_string_length);
+
+			/* Close the stream */
+			success = g_output_stream_close (upload_stream, NULL, &error);
+			g_assert_no_error (error);
+			g_assert (success == TRUE);
+
+			break;
+		default:
+			g_assert_not_reached ();
+	}
+
+	/* Kill the server and wait for it to die */
+	soup_add_completion (async_context, (GSourceFunc) quit_server_cb, server);
+	g_thread_join (thread);
+
+	g_free (test_string);
+	g_object_unref (upload_stream);
+	g_object_unref (server);
+	g_main_context_unref (async_context);
+}
+
 int
 main (int argc, char *argv[])
 {
 	gdata_test_init (argc, argv);
 
+	/* Only print out headers, since we're sending a lot of data. */
+	g_setenv ("LIBGDATA_DEBUG", "2" /* GDATA_LOG_HEADERS */, TRUE);
+
 	g_test_add_func ("/download-stream/download_content_length", test_download_stream_download_content_length);
 	g_test_add_func ("/download-stream/download_seek/before_start", test_download_stream_download_seek_before_start);
 	g_test_add_func ("/download-stream/download_seek/after_start_forwards", test_download_stream_download_seek_after_start_forwards);
@@ -593,5 +989,52 @@ main (int argc, char *argv[])
 
 	g_test_add_func ("/upload-stream/upload_no_entry_content_length", test_upload_stream_upload_no_entry_content_length);
 
+	/* Test all possible combinations of conditions for resumable uploads. */
+	{
+		guint i, j, k;
+
+		const gsize file_sizes[] = { /* all in bytes */
+			407 * 1024, /* < 512 KiB */
+			512 * 1024, /* 512 KiB */
+			666 * 1024, /* > 512 KiB, < 1024 KiB */
+			1024 * 1024, /* 1024 KiB */
+			1025 * 1024, /* > 1024 KiB */
+		};
+
+		for (i = 0; i < UPLOAD_STREAM_RESUMABLE_MAX_CONTENT_TYPE + 1; i++) {
+			for (j = 0; j < G_N_ELEMENTS (file_sizes); j++) {
+				for (k = 0; k < UPLOAD_STREAM_RESUMABLE_MAX_ERROR_TYPE + 1; k++) {
+					UploadStreamResumableTestParams *test_params;
+					gchar *test_name;
+					gsize file_size;
+
+					/* Ignore combinations of METADATA_ONLY with file_sizes or non-initial errors. */
+					if (i == METADATA_ONLY && (j != 0 || k != 0)) {
+						continue;
+					} else if (i == METADATA_ONLY) {
+						file_size = 0 /* bytes */;
+					} else {
+						file_size = file_sizes[j];
+					}
+
+					test_name = g_strdup_printf ("/upload-stream/resumable/%s/%s/%" G_GSIZE_FORMAT,
+					                             upload_stream_resumable_content_type_names[i],
+					                             upload_stream_resumable_error_type_names[k],
+					                             file_size);
+
+					/* Allocate a new struct. We leak this. */
+					test_params = g_slice_new (UploadStreamResumableTestParams);
+					test_params->content_type = i;
+					test_params->file_size = file_size;
+					test_params->error_type = k;
+
+					g_test_add_data_func (test_name, test_params, test_upload_stream_resumable);
+
+					g_free (test_name);
+				}
+			}
+		}
+	}
+
 	return g_test_run ();
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]