[libgdata] core: Extend GDataUploadStream with support for resumable uploads
- From: Philip Withnall <pwithnall src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libgdata] core: Extend GDataUploadStream with support for resumable uploads
- Date: Mon, 2 Apr 2012 14:33:09 +0000 (UTC)
commit f91c994f0f6f5f96ea7765e75ecc1ce393f9b61b
Author: Philip Withnall <philip tecnocode co uk>
Date: Tue Dec 20 20:40:16 2011 +0000
core: Extend GDataUploadStream with support for resumable uploads
This adds the following API:
â gdata_upload_stream_new_resumable()
â GDataUploadStream:content-length, gdata_upload_stream_get_content_length()
Helps: https://bugzilla.gnome.org/show_bug.cgi?id=607272
docs/reference/gdata-sections.txt | 4 +
gdata/gdata-upload-stream.c | 515 ++++++++++++++++++++++++++++++-------
gdata/gdata-upload-stream.h | 30 +++
gdata/gdata.symbols | 2 +
gdata/tests/common.c | 19 +-
gdata/tests/common.h | 1 +
gdata/tests/streams.c | 443 +++++++++++++++++++++++++++++++
7 files changed, 920 insertions(+), 94 deletions(-)
---
diff --git a/docs/reference/gdata-sections.txt b/docs/reference/gdata-sections.txt
index d086729..d671218 100644
--- a/docs/reference/gdata-sections.txt
+++ b/docs/reference/gdata-sections.txt
@@ -1802,9 +1802,12 @@ GDataDownloadStreamPrivate
<SECTION>
<FILE>gdata-upload-stream</FILE>
<TITLE>GDataUploadStream</TITLE>
+GDATA_LINK_RESUMABLE_CREATE_MEDIA
+GDATA_LINK_RESUMABLE_EDIT_MEDIA
GDataUploadStream
GDataUploadStreamClass
gdata_upload_stream_new
+gdata_upload_stream_new_resumable
gdata_upload_stream_get_response
gdata_upload_stream_get_service
gdata_upload_stream_get_authorization_domain
@@ -1814,6 +1817,7 @@ gdata_upload_stream_get_upload_uri
gdata_upload_stream_get_entry
gdata_upload_stream_get_slug
gdata_upload_stream_get_content_type
+gdata_upload_stream_get_content_length
<SUBSECTION Standard>
gdata_upload_stream_get_type
GDATA_UPLOAD_STREAM
diff --git a/gdata/gdata-upload-stream.c b/gdata/gdata-upload-stream.c
index f4cbbd6..04fe783 100644
--- a/gdata/gdata-upload-stream.c
+++ b/gdata/gdata-upload-stream.c
@@ -78,7 +78,8 @@
*
* /<!-- -->* Get the file to upload *<!-- -->/
* file = get_file_to_upload ();
- * file_info = g_file_query_info (file, G_FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME "," G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE,
+ * file_info = g_file_query_info (file, G_FILE_ATTRIBUTE_STANDARD_DISPLAY_NAME "," G_FILE_ATTRIBUTE_STANDARD_CONTENT_TYPE ","
+ * G_FILE_ATTRIBUTE_STANDARD_SIZE,
* G_FILE_QUERY_INFO_NONE, NULL, &error);
*
* if (file_info == NULL) {
@@ -102,8 +103,9 @@
* service = create_my_service ();
* domain = get_my_authorization_domain_from_service (service);
* cancellable = g_cancellable_new (); /<!-- -->* cancel this to cancel the entire upload operation *<!-- -->/
- * upload_stream = gdata_upload_stream_new (service, domain, SOUP_METHOD_POST, upload_uri, NULL, g_file_info_get_display_name (file_info),
- * g_file_info_get_content_type (file_info), cancellable);
+ * upload_stream = gdata_upload_stream_new_resumable (service, domain, SOUP_METHOD_POST, upload_uri, NULL,
+ * g_file_info_get_display_name (file_info), g_file_info_get_content_type (file_info),
+ * g_file_info_get_size (file_info), cancellable);
* g_object_unref (file_info);
*
* /<!-- -->* Perform the upload asynchronously *<!-- -->/
@@ -174,6 +176,7 @@
#include "gdata-private.h"
#define BOUNDARY_STRING "0003Z5W789deadbeefRTE456KlemsnoZV"
+#define MAX_RESUMABLE_CHUNK_SIZE (512 * 1024) /* bytes = 512 KiB */
static GObject *gdata_upload_stream_constructor (GType type, guint n_construct_params, GObjectConstructParam *construct_params);
static void gdata_upload_stream_dispose (GObject *object);
@@ -187,6 +190,12 @@ static gboolean gdata_upload_stream_close (GOutputStream *stream, GCancellable *
static void create_network_thread (GDataUploadStream *self, GError **error);
+typedef enum {
+ STATE_INITIAL_REQUEST, /* initial POST request to the resumable-create-media link (unused for non-resumable uploads) */
+ STATE_DATA_REQUESTS, /* one or more subsequent PUT requests (only state used for non-resumable uploads) */
+ STATE_FINISHED, /* finished successfully or in error */
+} UploadState;
+
struct _GDataUploadStreamPrivate {
gchar *method;
gchar *upload_uri;
@@ -195,6 +204,7 @@ struct _GDataUploadStreamPrivate {
GDataEntry *entry;
gchar *slug;
gchar *content_type;
+ goffset content_length; /* -1 for non-resumable uploads; 0 or greater for resumable ones */
SoupSession *session;
SoupMessage *message;
GDataBuffer *buffer;
@@ -202,15 +212,20 @@ struct _GDataUploadStreamPrivate {
GCancellable *cancellable;
GThread *network_thread;
+ UploadState state; /* protected by write_mutex */
GMutex write_mutex; /* mutex for write operations (specifically, write_finished) */
+ /* This persists across all resumable upload chunks. Note that it doesn't count bytes from the entry XML. */
+ gsize total_network_bytes_written; /* the number of bytes which have been written to the network in STATE_DATA_REQUESTS */
+
+ /* All of the following apply only to the current resumable upload chunk. */
gsize message_bytes_outstanding; /* the number of bytes which have been written to the buffer but not libsoup (signalled by write_cond) */
gsize network_bytes_outstanding; /* the number of bytes which have been written to libsoup but not the network (signalled by write_cond) */
gsize network_bytes_written; /* the number of bytes which have been written to the network (signalled by write_cond) */
+ gsize chunk_size; /* the size of the current chunk (in bytes); 0 iff content_length <= 0; must be <= MAX_RESUMABLE_CHUNK_SIZE */
GCond write_cond; /* signalled when a chunk has been written (protected by write_mutex) */
- gboolean finished; /* set once the upload thread has finished (protected by response_mutex) */
- guint response_status; /* set once we finish receiving the response (SOUP_STATUS_NONE otherwise) (protected by response_mutex) */
GCond finished_cond; /* signalled when sending the message (and receiving the response) is finished (protected by response_mutex) */
+ guint response_status; /* set once we finish receiving the response (SOUP_STATUS_NONE otherwise) (protected by response_mutex) */
GError *response_error; /* error asynchronously set by the network thread, and picked up by the main thread when appropriate */
GMutex response_mutex; /* mutex for ->response_error, ->response_status and ->finished_cond */
};
@@ -224,6 +239,7 @@ enum {
PROP_METHOD,
PROP_CANCELLABLE,
PROP_AUTHORIZATION_DOMAIN,
+ PROP_CONTENT_LENGTH,
};
G_DEFINE_TYPE (GDataUploadStream, gdata_upload_stream, G_TYPE_OUTPUT_STREAM)
@@ -328,6 +344,22 @@ gdata_upload_stream_class_init (GDataUploadStreamClass *klass)
G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
+ * GDataUploadStream:content-length:
+ *
+ * The content length (in bytes) of the file being uploaded (i.e. as returned by g_file_info_get_size()). Note that this does not include the
+ * length of the XML serialisation of #GDataUploadStream:entry, if set.
+ *
+ * If this is <code class="literal">-1</code> the upload will be non-resumable; if it is non-negative, the upload will be resumable.
+ *
+ * Since: 0.11.2
+ */
+ g_object_class_install_property (gobject_class, PROP_CONTENT_LENGTH,
+ g_param_spec_int64 ("content-length",
+ "Content length", "The content length (in bytes) of the file being uploaded.",
+ -1, G_MAXINT64, -1,
+ G_PARAM_CONSTRUCT_ONLY | G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
* GDataUploadStream:content-type:
*
* The content type of the file being uploaded (i.e. as returned by g_file_info_get_content_type()).
@@ -373,11 +405,34 @@ gdata_upload_stream_init (GDataUploadStream *self)
g_mutex_init (&(self->priv->response_mutex));
}
+static SoupMessage *
+build_message (GDataUploadStream *self, const gchar *method, const gchar *upload_uri)
+{
+ GDataUploadStreamPrivate *priv;
+ GDataServiceClass *klass;
+ SoupMessage *new_message;
+
+ priv = self->priv;
+
+ /* Build the message */
+ new_message = soup_message_new (method, upload_uri);
+
+ /* Make sure the headers are set */
+ klass = GDATA_SERVICE_GET_CLASS (priv->service);
+ if (klass->append_query_headers != NULL) {
+ klass->append_query_headers (priv->service, priv->authorization_domain, new_message);
+ }
+
+ /* We don't want to accumulate chunks */
+ soup_message_body_set_accumulate (new_message->request_body, FALSE);
+
+ return new_message;
+}
+
static GObject *
gdata_upload_stream_constructor (GType type, guint n_construct_params, GObjectConstructParam *construct_params)
{
GDataUploadStreamPrivate *priv;
- GDataServiceClass *klass;
GObject *object;
/* Chain up to the parent class */
@@ -389,28 +444,70 @@ gdata_upload_stream_constructor (GType type, guint n_construct_params, GObjectCo
priv->cancellable = g_cancellable_new ();
/* Build the message */
- priv->message = soup_message_new (priv->method, priv->upload_uri);
-
- /* Make sure the headers are set */
- klass = GDATA_SERVICE_GET_CLASS (priv->service);
- if (klass->append_query_headers != NULL) {
- klass->append_query_headers (priv->service, priv->authorization_domain, priv->message);
- }
+ priv->message = build_message (GDATA_UPLOAD_STREAM (object), priv->method, priv->upload_uri);
if (priv->slug != NULL)
soup_message_headers_append (priv->message->request_headers, "Slug", priv->slug);
- /* We don't want to accumulate chunks */
- soup_message_body_set_accumulate (priv->message->request_body, FALSE);
- soup_message_headers_set_encoding (priv->message->request_headers, SOUP_ENCODING_CHUNKED);
+ if (priv->content_length == -1) {
+ /* Non-resumable upload */
+ soup_message_headers_set_encoding (priv->message->request_headers, SOUP_ENCODING_CHUNKED);
+
+ /* The Content-Type should be multipart/related if we're also uploading the metadata (entry != NULL),
+ * and the given content_type otherwise. */
+ if (priv->entry != NULL) {
+ const gchar *first_part_header;
+ gchar *entry_xml, *second_part_header;
+
+ soup_message_headers_set_content_type (priv->message->request_headers, "multipart/related; boundary=" BOUNDARY_STRING, NULL);
+
+ /* Start by writing out the entry; then the thread has something to write to the network when it's created */
+ first_part_header = "--" BOUNDARY_STRING "\nContent-Type: application/atom+xml; charset=UTF-8\n\n";
+ entry_xml = gdata_parsable_get_xml (GDATA_PARSABLE (priv->entry));
+ second_part_header = g_strdup_printf ("\n--" BOUNDARY_STRING "\nContent-Type: %s\nContent-Transfer-Encoding: binary\n\n",
+ priv->content_type);
+
+ /* Push the message parts onto the message body; we can skip the buffer, since the network thread hasn't yet been created,
+ * so we're the sole thread accessing the SoupMessage. */
+ soup_message_body_append (priv->message->request_body, SOUP_MEMORY_STATIC, first_part_header, strlen (first_part_header));
+ soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, entry_xml, strlen (entry_xml));
+ soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, second_part_header, strlen (second_part_header));
+
+ priv->network_bytes_outstanding = priv->message->request_body->length;
+ } else {
+ soup_message_headers_set_content_type (priv->message->request_headers, priv->content_type, NULL);
+ }
- /* The Content-Type should be multipart/related if we're also uploading the metadata (entry != NULL),
- * and the given content_type otherwise.
- * Note that the Content-Length header is set when we first start writing to the network. */
- if (priv->entry != NULL)
- soup_message_headers_set_content_type (priv->message->request_headers, "multipart/related; boundary=" BOUNDARY_STRING, NULL);
- else
- soup_message_headers_set_content_type (priv->message->request_headers, priv->content_type, NULL);
+ /* Non-resumable uploads start with the data requests immediately. */
+ priv->state = STATE_DATA_REQUESTS;
+ } else {
+ gchar *content_length_str;
+
+ /* Resumable upload's initial request */
+ soup_message_headers_set_encoding (priv->message->request_headers, SOUP_ENCODING_CONTENT_LENGTH);
+ soup_message_headers_replace (priv->message->request_headers, "X-Upload-Content-Type", priv->content_type);
+
+ content_length_str = g_strdup_printf ("%" G_GOFFSET_FORMAT, priv->content_length);
+ soup_message_headers_replace (priv->message->request_headers, "X-Upload-Content-Length", content_length_str);
+ g_free (content_length_str);
+
+ if (priv->entry != NULL) {
+ const gchar *entry_xml;
+
+ soup_message_headers_set_content_type (priv->message->request_headers, "application/atom+xml; charset=UTF-8", NULL);
+
+ entry_xml = gdata_parsable_get_xml (GDATA_PARSABLE (priv->entry));
+ soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, entry_xml, strlen (entry_xml));
+
+ priv->network_bytes_outstanding = priv->message->request_body->length;
+ } else {
+ soup_message_headers_set_content_length (priv->message->request_headers, 0);
+ }
+
+ /* Resumable uploads always start with an initial request, which either contains the XML or is empty. */
+ priv->state = STATE_INITIAL_REQUEST;
+ priv->chunk_size = MIN (priv->content_length, MAX_RESUMABLE_CHUNK_SIZE);
+ }
/* If the entry exists and has an ETag, we assume we're updating the entry, so we can set the If-Match header */
if (priv->entry != NULL && gdata_entry_get_etag (priv->entry) != NULL)
@@ -501,6 +598,9 @@ gdata_upload_stream_get_property (GObject *object, guint property_id, GValue *va
case PROP_CONTENT_TYPE:
g_value_set_string (value, priv->content_type);
break;
+ case PROP_CONTENT_LENGTH:
+ g_value_set_int64 (value, priv->content_length);
+ break;
case PROP_CANCELLABLE:
g_value_set_object (value, priv->cancellable);
break;
@@ -539,6 +639,9 @@ gdata_upload_stream_set_property (GObject *object, guint property_id, const GVal
case PROP_CONTENT_TYPE:
priv->content_type = g_value_dup_string (value);
break;
+ case PROP_CONTENT_LENGTH:
+ priv->content_length = g_value_get_int64 (value);
+ break;
case PROP_CANCELLABLE:
/* Construction only */
priv->cancellable = g_value_dup_object (value);
@@ -568,14 +671,15 @@ write_cancelled_cb (GCancellable *cancellable, CancelledData *data)
}
static gssize
-gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error)
+gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize count, GCancellable *cancellable, GError **error_out)
{
GDataUploadStreamPrivate *priv = GDATA_UPLOAD_STREAM (stream)->priv;
gssize length_written = -1;
gulong cancelled_signal = 0, global_cancelled_signal = 0;
gboolean cancelled = FALSE; /* must only be touched with ->write_mutex held */
- gsize old_network_bytes_written;
+ gsize old_total_network_bytes_written;
CancelledData data;
+ GError *error = NULL;
/* Listen for cancellation events */
data.upload_stream = GDATA_UPLOAD_STREAM (stream);
@@ -590,21 +694,21 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
g_mutex_lock (&(priv->write_mutex));
if (cancelled == TRUE) {
- g_assert (g_cancellable_set_error_if_cancelled (cancellable, error) == TRUE ||
- g_cancellable_set_error_if_cancelled (priv->cancellable, error) == TRUE);
+ g_assert (g_cancellable_set_error_if_cancelled (cancellable, &error) == TRUE ||
+ g_cancellable_set_error_if_cancelled (priv->cancellable, &error) == TRUE);
g_mutex_unlock (&(priv->write_mutex));
length_written = -1;
goto done;
}
+ g_mutex_unlock (&(priv->write_mutex));
+
/* Increment the number of bytes outstanding for the new write, and keep a record of the old number written so we know if the write's
* finished before we reach write_cond. */
- old_network_bytes_written = priv->network_bytes_written;
+ old_total_network_bytes_written = priv->total_network_bytes_written;
priv->message_bytes_outstanding += count;
- g_mutex_unlock (&(priv->write_mutex));
-
/* Handle the more common case of the network thread already having been created first */
if (priv->network_thread != NULL) {
/* Push the new data into the buffer */
@@ -612,33 +716,11 @@ gdata_upload_stream_write (GOutputStream *stream, const void *buffer, gsize coun
goto write;
}
- /* We're lazy about starting the network operation so we don't time out before we've even started */
- if (priv->entry != NULL) {
- /* Start by writing out the entry; then the thread has something to write to the network when it's created */
- const gchar *first_part_header;
- gchar *entry_xml, *second_part_header;
-
- first_part_header = "--" BOUNDARY_STRING "\nContent-Type: application/atom+xml; charset=UTF-8\n\n";
- entry_xml = gdata_parsable_get_xml (GDATA_PARSABLE (priv->entry));
- second_part_header = g_strdup_printf ("\n--" BOUNDARY_STRING "\nContent-Type: %s\nContent-Transfer-Encoding: binary\n\n",
- priv->content_type);
-
- /* Push the message parts onto the message body; we can skip the buffer, since the network thread hasn't yet been created,
- * so we're the sole thread accessing the SoupMessage. */
- soup_message_body_append (priv->message->request_body, SOUP_MEMORY_STATIC, first_part_header, strlen (first_part_header));
- soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, entry_xml, strlen (entry_xml));
- soup_message_body_append (priv->message->request_body, SOUP_MEMORY_TAKE, second_part_header, strlen (second_part_header));
-
- g_mutex_lock (&(priv->write_mutex));
- priv->network_bytes_outstanding += priv->message->request_body->length;
- g_mutex_unlock (&(priv->write_mutex));
- }
-
- /* Also write out the first chunk of data, so there's guaranteed to be something in the buffer */
+ /* Write out the first chunk of data, so there's guaranteed to be something in the buffer */
gdata_buffer_push_data (priv->buffer, buffer, count);
/* Create the thread and let the writing commence! */
- create_network_thread (GDATA_UPLOAD_STREAM (stream), error);
+ create_network_thread (GDATA_UPLOAD_STREAM (stream), &error);
if (priv->network_thread == NULL) {
length_written = -1;
goto done;
@@ -648,15 +730,20 @@ write:
g_mutex_lock (&(priv->write_mutex));
/* Wait for it to be written */
- while (priv->network_bytes_written - old_network_bytes_written < count && cancelled == FALSE) {
+ while (priv->total_network_bytes_written - old_total_network_bytes_written < count && cancelled == FALSE && priv->state != STATE_FINISHED) {
g_cond_wait (&(priv->write_cond), &(priv->write_mutex));
}
- length_written = MIN (count, priv->network_bytes_written - old_network_bytes_written);
+ length_written = MIN (count, priv->total_network_bytes_written - old_total_network_bytes_written);
/* Check for an error and return if necessary */
if (cancelled == TRUE && length_written < 1) {
- g_assert (g_cancellable_set_error_if_cancelled (cancellable, error) == TRUE ||
- g_cancellable_set_error_if_cancelled (priv->cancellable, error) == TRUE);
+ /* Cancellation. */
+ g_assert (g_cancellable_set_error_if_cancelled (cancellable, &error) == TRUE ||
+ g_cancellable_set_error_if_cancelled (priv->cancellable, &error) == TRUE);
+ length_written = -1;
+ } else if (priv->state == STATE_FINISHED && (length_written < 0 || (gsize) length_written < count)) {
+ /* Resumable upload error. */
+ g_set_error (&error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error received from server after uploading a resumable upload chunk."));
length_written = -1;
}
@@ -670,7 +757,11 @@ done:
if (global_cancelled_signal != 0)
g_cancellable_disconnect (priv->cancellable, global_cancelled_signal);
- g_assert (cancelled == TRUE || length_written > 0);
+ g_assert (error != NULL || length_written > 0);
+
+ if (error != NULL) {
+ g_propagate_error (error_out, error);
+ }
return length_written;
}
@@ -710,11 +801,20 @@ gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GEr
if (cancellable != NULL)
cancelled_signal = g_cancellable_connect (cancellable, (GCallback) flush_cancelled_cb, &data, NULL);
+ /* Create the thread if it hasn't been created already. This can happen if flush() is called immediately after creating the stream. */
+ if (priv->network_thread == NULL) {
+ create_network_thread (GDATA_UPLOAD_STREAM (stream), error);
+ if (priv->network_thread == NULL) {
+ success = FALSE;
+ goto done;
+ }
+ }
+
/* Start the flush operation proper */
g_mutex_lock (&(priv->write_mutex));
/* Wait for all outstanding bytes to be written to the network */
- while (priv->network_bytes_outstanding > 0 && cancelled == FALSE) {
+ while (priv->network_bytes_outstanding > 0 && cancelled == FALSE && priv->state != STATE_FINISHED) {
g_cond_wait (&(priv->write_cond), &(priv->write_mutex));
}
@@ -723,10 +823,15 @@ gdata_upload_stream_flush (GOutputStream *stream, GCancellable *cancellable, GEr
g_assert (g_cancellable_set_error_if_cancelled (cancellable, error) == TRUE ||
g_cancellable_set_error_if_cancelled (priv->cancellable, error) == TRUE);
success = FALSE;
+ } else if (priv->state == STATE_FINISHED && priv->network_bytes_outstanding > 0) {
+ /* Resumable upload error. */
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_FAILED, _("Error received from server after uploading a resumable upload chunk."));
+ success = FALSE;
}
g_mutex_unlock (&(priv->write_mutex));
+done:
/* Disconnect from the cancelled signals. Note that we have to do this without @write_mutex held, as g_cancellable_disconnect() blocks
* until any outstanding cancellation callbacks return, and they will block on @write_mutex. */
if (cancelled_signal != 0)
@@ -774,6 +879,7 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
gboolean cancelled = FALSE; /* must only be touched with ->response_mutex held */
gulong cancelled_signal = 0, global_cancelled_signal = 0;
CancelledData data;
+ gboolean is_finished;
GError *child_error = NULL;
/* If the operation was never started, return successfully immediately */
@@ -802,8 +908,12 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
g_mutex_lock (&(priv->response_mutex));
+ g_mutex_lock (&(priv->write_mutex));
+ is_finished = (priv->state == STATE_FINISHED);
+ g_mutex_unlock (&(priv->write_mutex));
+
/* If an operation is still in progress, the upload thread hasn't finished yetâ */
- if (priv->finished == FALSE) {
+ if (!is_finished) {
/* We've reached the end of the stream, so append the footer if the entire operation hasn't been cancelled. */
if (priv->entry != NULL && g_cancellable_is_cancelled (priv->cancellable) == FALSE) {
const gchar *footer = "\n--" BOUNDARY_STRING "--";
@@ -830,10 +940,14 @@ gdata_upload_stream_close (GOutputStream *stream, GCancellable *cancellable, GEr
g_assert (priv->response_status == SOUP_STATUS_NONE);
g_assert (priv->response_error == NULL);
+ g_mutex_lock (&(priv->write_mutex));
+ is_finished = (priv->state == STATE_FINISHED);
+ g_mutex_unlock (&(priv->write_mutex));
+
/* Error handling */
- if (priv->finished == FALSE && cancelled == TRUE) {
- /* Cancelled? If ->finished == TRUE, the network activity finished before the gdata_upload_stream_close() operation was cancelled, so
- * we don't need to return an error. */
+ if (!is_finished && cancelled == TRUE) {
+ /* Cancelled? If ->state == STATE_FINISHED, the network activity finished before the gdata_upload_stream_close() operation was
+ * cancelled, so we don't need to return an error. */
g_assert (g_cancellable_set_error_if_cancelled (cancellable, &child_error) == TRUE ||
g_cancellable_set_error_if_cancelled (priv->cancellable, &child_error) == TRUE);
priv->response_status = SOUP_STATUS_CANCELLED;
@@ -885,45 +999,65 @@ write_next_chunk (GDataUploadStream *self, SoupMessage *message)
#define CHUNK_SIZE 8192 /* 8KiB */
GDataUploadStreamPrivate *priv = self->priv;
+ gboolean has_network_bytes_outstanding, is_complete;
gsize length;
gboolean reached_eof = FALSE;
guint8 next_buffer[CHUNK_SIZE];
g_mutex_lock (&(priv->write_mutex));
+ has_network_bytes_outstanding = (priv->network_bytes_outstanding > 0);
+ is_complete = (priv->state == STATE_INITIAL_REQUEST ||
+ (priv->content_length != -1 && priv->network_bytes_written + priv->network_bytes_outstanding == priv->chunk_size));
+ g_mutex_unlock (&(priv->write_mutex));
- /* If there are still bytes in libsoup's buffer, don't block on getting new bytes into the stream */
- if (priv->network_bytes_outstanding > 0) {
- g_mutex_unlock (&(priv->write_mutex));
+ /* If there are still bytes in libsoup's buffer, don't block on getting new bytes into the stream. Also, if we're making the initial request
+ * of a resumable upload, don't push new data onto the network, since all of the XML was pushed into the buffer when we started. */
+ if (has_network_bytes_outstanding) {
return;
- }
+ } else if (is_complete) {
+ soup_message_body_complete (priv->message->request_body);
- g_mutex_unlock (&(priv->write_mutex));
+ return;
+ }
/* Append the next chunk to the message body so it can join in the fun.
- * Note that this call isn't blocking, and can return less than the CHUNK_SIZE. This is because
+ * Note that this call isn't necessarily blocking, and can return less than the CHUNK_SIZE. This is because
* we could deadlock if we block on getting CHUNK_SIZE bytes at the end of the stream. write() could
* easily be called with fewer bytes, but has no way to notify us that we've reached the end of the
- * stream, so we'd happily block on receiving more bytes which weren't forthcoming. */
- length = gdata_buffer_pop_data_limited (priv->buffer, next_buffer, CHUNK_SIZE, &reached_eof);
+ * stream, so we'd happily block on receiving more bytes which weren't forthcoming.
+ *
+ * Note also that we can't block on this call with write_mutex locked, or we could get into a deadlock if the stream is flushed at the same
+ * time (in the case that we don't know the content length ahead of time). */
+ if (priv->content_length == -1) {
+ /* Non-resumable upload. */
+ length = gdata_buffer_pop_data_limited (priv->buffer, next_buffer, CHUNK_SIZE, &reached_eof);
+ } else {
+ /* Resumable upload. Ensure we don't exceed the chunk size. */
+ length = gdata_buffer_pop_data_limited (priv->buffer, next_buffer,
+ MIN (CHUNK_SIZE, priv->chunk_size - (priv->network_bytes_written +
+ priv->network_bytes_outstanding)), &reached_eof);
+ }
g_mutex_lock (&(priv->write_mutex));
+
priv->message_bytes_outstanding -= length;
priv->network_bytes_outstanding += length;
- g_mutex_unlock (&(priv->write_mutex));
/* Append whatever data was returned */
if (length > 0)
soup_message_body_append (priv->message->request_body, SOUP_MEMORY_COPY, next_buffer, length);
- /* Finish off the request body if we've reached EOF (i.e. the stream has been closed) */
- if (reached_eof == TRUE) {
- g_mutex_lock (&(priv->write_mutex));
- g_assert (priv->message_bytes_outstanding == 0);
- g_mutex_unlock (&(priv->write_mutex));
+ /* Finish off the request body if we've reached EOF (i.e. the stream has been closed), or if we're doing a resumable upload and we reach
+ * the maximum chunk size. */
+ if (reached_eof == TRUE ||
+ (priv->content_length != -1 && priv->network_bytes_written + priv->network_bytes_outstanding == priv->chunk_size)) {
+ g_assert (reached_eof == FALSE || priv->message_bytes_outstanding == 0);
soup_message_body_complete (priv->message->request_body);
}
+ g_mutex_unlock (&(priv->write_mutex));
+
soup_session_unpause_message (priv->session, priv->message);
}
@@ -951,6 +1085,11 @@ wrote_body_data_cb (SoupMessage *message, SoupBuffer *buffer, GDataUploadStream
g_assert (priv->network_bytes_outstanding > 0);
priv->network_bytes_outstanding -= buffer->length;
priv->network_bytes_written += buffer->length;
+
+ if (priv->state == STATE_DATA_REQUESTS) {
+ priv->total_network_bytes_written += buffer->length;
+ }
+
g_cond_signal (&(priv->write_cond));
g_mutex_unlock (&(priv->write_mutex));
@@ -967,22 +1106,131 @@ upload_thread (GDataUploadStream *self)
g_assert (priv->cancellable != NULL);
- /* Connect to the wrote-* signals so we can prepare the next chunk for transmission */
- g_signal_connect (priv->message, "wrote-headers", (GCallback) wrote_headers_cb, self);
- g_signal_connect (priv->message, "wrote-body-data", (GCallback) wrote_body_data_cb, self);
+ while (TRUE) {
+ gulong wrote_headers_signal, wrote_body_data_signal;
+ gchar *new_uri;
+ SoupMessage *new_message;
+ gsize next_chunk_length;
- _gdata_service_actually_send_message (priv->session, priv->message, priv->cancellable, NULL);
+ /* Connect to the wrote-* signals so we can prepare the next chunk for transmission */
+ wrote_headers_signal = g_signal_connect (priv->message, "wrote-headers", (GCallback) wrote_headers_cb, self);
+ wrote_body_data_signal = g_signal_connect (priv->message, "wrote-body-data", (GCallback) wrote_body_data_cb, self);
- /* Signal write_cond, just in case we errored out and finished sending in the middle of a write */
- g_mutex_lock (&(priv->response_mutex));
- g_mutex_lock (&(priv->write_mutex));
- if (priv->message_bytes_outstanding > 0 || priv->network_bytes_outstanding > 0) {
- g_cond_signal (&(priv->write_cond));
+ _gdata_service_actually_send_message (priv->session, priv->message, priv->cancellable, NULL);
+
+ g_mutex_lock (&(priv->write_mutex));
+
+ /* If this is a resumable upload, continue to the next chunk. If it's a non-resumable upload, we're done. We have several cases:
+ * â Non-resumable upload:
+ * - Content only: STATE_DATA_REQUESTS â STATE_FINISHED
+ * - Metadata only: not supported
+ * - Content and metadata: STATE_DATA_REQUESTS â STATE_FINISHED
+ * â Resumable upload:
+ * - Content only:
+ * * STATE_INITIAL_REQUEST â STATE_DATA_REQUESTS
+ * * STATE_DATA_REQUESTS â STATE_DATA_REQUESTS
+ * * STATE_DATA_REQUESTS â STATE_FINISHED
+ * - Metadata only: STATE_INITIAL_REQUEST â STATE_FINISHED
+ * - Content and metadata:
+ * * STATE_INITIAL_REQUEST â STATE_DATA_REQUESTS
+ * * STATE_DATA_REQUESTS â STATE_DATA_REQUESTS
+ * * STATE_DATA_REQUESTS â STATE_FINISHED
+ */
+ switch (priv->state) {
+ case STATE_INITIAL_REQUEST:
+ /* We're either a content-only or a content-and-metadata resumable upload. */
+ priv->state = STATE_DATA_REQUESTS;
+
+ /* Check the response. On success it should be empty, status 200, with a Location header telling us where to upload
+ * next. If it's an error response, bail out and let the code in gdata_upload_stream_close() parse the error..*/
+ if (!SOUP_STATUS_IS_SUCCESSFUL (priv->message->status_code)) {
+ goto finished;
+ } else if (priv->content_length == 0 && priv->message->status_code == SOUP_STATUS_CREATED) {
+ /* If this was a metadata-only upload, we're done. */
+ goto finished;
+ }
+
+ /* Fall out and prepare the next message */
+ g_assert (priv->total_network_bytes_written == 0); /* haven't written any data yet */
+
+ break;
+ case STATE_DATA_REQUESTS:
+ /* Check the response. On completion it should contain the resulting entry's XML, status 201. On continuation it should
+ * be empty, status 308, with a Range header and potentially a Location header telling us what/where to upload next.
+ * If it's an error response, bail out and let the code in gdata_upload_stream_close() parse the error..*/
+ if (priv->message->status_code == 308) {
+ /* Continuation: fall out and prepare the next message */
+ g_assert (priv->content_length == -1 || priv->total_network_bytes_written < (gsize) priv->content_length);
+ } else if (SOUP_STATUS_IS_SUCCESSFUL (priv->message->status_code)) {
+ /* Completion. Check the server isn't misbehaving. */
+ g_assert (priv->content_length == -1 || priv->total_network_bytes_written == (gsize) priv->content_length);
+
+ goto finished;
+ } else {
+ /* Error */
+ goto finished;
+ }
+
+ /* Fall out and prepare the next message */
+ g_assert (priv->total_network_bytes_written > 0);
+
+ break;
+ case STATE_FINISHED:
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Prepare the next message. */
+ g_assert (priv->content_length != -1);
+
+ next_chunk_length = MIN (priv->content_length - priv->total_network_bytes_written, MAX_RESUMABLE_CHUNK_SIZE);
+
+ new_uri = g_strdup (soup_message_headers_get_one (priv->message->response_headers, "Location"));
+ if (new_uri == NULL) {
+ new_uri = soup_uri_to_string (soup_message_get_uri (priv->message), FALSE);
+ }
+
+ new_message = build_message (self, SOUP_METHOD_PUT, new_uri);
+
+ g_free (new_uri);
+
+ soup_message_headers_set_encoding (new_message->request_headers, SOUP_ENCODING_CONTENT_LENGTH);
+ soup_message_headers_set_content_type (new_message->request_headers, priv->content_type, NULL);
+ soup_message_headers_set_content_length (new_message->request_headers, next_chunk_length);
+ soup_message_headers_set_content_range (new_message->request_headers, priv->total_network_bytes_written,
+ priv->total_network_bytes_written + next_chunk_length - 1, priv->content_length);
+
+ g_signal_handler_disconnect (priv->message, wrote_body_data_signal);
+ g_signal_handler_disconnect (priv->message, wrote_headers_signal);
+
+ g_object_unref (priv->message);
+ priv->message = new_message;
+
+ /* Reset various counters for the next upload. Note that message_bytes_outstanding may be > 0 at this point, since the client may
+ * have pushed some content into the buffer while we were waiting for the response to this request. */
+ g_assert (priv->network_bytes_outstanding == 0);
+ priv->chunk_size = next_chunk_length;
+ priv->network_bytes_written = 0;
+
+ /* Loop round and upload this chunk now. */
+ g_mutex_unlock (&(priv->write_mutex));
+
+ continue;
+
+finished:
+ g_mutex_unlock (&(priv->write_mutex));
+
+ goto finished_outer;
}
+
+finished_outer:
+ /* Signal that the operation has finished (either successfully or in error).
+ * Also signal write_cond, just in case we errored out and finished sending in the middle of a write. */
+ g_mutex_lock (&(priv->write_mutex));
+ priv->state = STATE_FINISHED;
+ g_cond_signal (&(priv->write_cond));
g_mutex_unlock (&(priv->write_mutex));
- /* Signal that the operation has finished */
- priv->finished = TRUE;
g_cond_signal (&(priv->finished_cond));
g_mutex_unlock (&(priv->response_mutex));
@@ -1065,6 +1313,77 @@ gdata_upload_stream_new (GDataService *service, GDataAuthorizationDomain *domain
}
/**
+ * gdata_upload_stream_new_resumable:
+ * @service: a #GDataService
+ * @domain: (allow-none): the #GDataAuthorizationDomain to authorize the upload, or %NULL
+ * @method: the HTTP method to use
+ * @upload_uri: the URI to upload
+ * @entry: (allow-none): the entry to upload as metadata, or %NULL
+ * @slug: the file's slug (filename)
+ * @content_type: the content type of the file being uploaded
+ * @content_length: the size (in bytes) of the file being uploaded
+ * @cancellable: (allow-none): a #GCancellable for the entire upload stream, or %NULL
+ *
+ * Creates a new resumable #GDataUploadStream, allowing a file to be uploaded from a GData service using standard #GOutputStream API. The upload will
+ * use GData's resumable upload API, so should be more reliable than a normal upload (especially if the file is large). See the
+ * <ulink type="http" url="http://code.google.com/apis/gdata/docs/resumable_upload.html">GData documentation on resumable uploads</ulink> for more
+ * information.
+ *
+ * The HTTP method to use should be specified in @method, and will typically be either %SOUP_METHOD_POST (for insertions) or %SOUP_METHOD_PUT
+ * (for updates), according to the server and the @upload_uri.
+ *
+ * If @entry is specified, it will be attached to the upload as the entry to which the file being uploaded belongs. Otherwise, just the file
+ * written to the stream will be uploaded, and given a default entry as determined by the server.
+ *
+ * @slug, @content_type and @content_length must be specified before the upload begins, as they describe the file being streamed. @slug is the filename
+ * given to the file, which will typically be stored on the server and made available when downloading the file again. @content_type must be the
+ * correct content type for the file, and should be in the service's list of acceptable content types. @content_length must be the size of the file
+ * being uploaded (not including the XML for any associated #GDataEntry) in bytes. Zero is accepted if a metadata-only upload is being performed.
+ *
+ * As well as the standard GIO errors, calls to the #GOutputStream API on a #GDataUploadStream can also return any relevant specific error from
+ * #GDataServiceError, or %GDATA_SERVICE_ERROR_PROTOCOL_ERROR in the general case.
+ *
+ * If a #GCancellable is provided in @cancellable, the upload operation may be cancelled at any time from another thread using g_cancellable_cancel().
+ * In this case, any ongoing network activity will be stopped, and any pending or future calls to #GOutputStream API on the #GDataUploadStream will
+ * return %G_IO_ERROR_CANCELLED. Note that the #GCancellable objects which can be passed to individual #GOutputStream operations will not cancel the
+ * upload operation proper if cancelled â they will merely cancel that API call. The only way to cancel the upload operation completely is using this
+ * @cancellable.
+ *
+ * Note that network communication won't begin until the first call to g_output_stream_write() on the #GDataUploadStream.
+ *
+ * Return value: a new #GOutputStream, or %NULL; unref with g_object_unref()
+ *
+ * Since: 0.11.2
+ */
+GOutputStream *
+gdata_upload_stream_new_resumable (GDataService *service, GDataAuthorizationDomain *domain, const gchar *method, const gchar *upload_uri,
+ GDataEntry *entry, const gchar *slug, const gchar *content_type, goffset content_length, GCancellable *cancellable)
+{
+ g_return_val_if_fail (GDATA_IS_SERVICE (service), NULL);
+ g_return_val_if_fail (domain == NULL || GDATA_IS_AUTHORIZATION_DOMAIN (domain), NULL);
+ g_return_val_if_fail (method != NULL, NULL);
+ g_return_val_if_fail (upload_uri != NULL, NULL);
+ g_return_val_if_fail (entry == NULL || GDATA_IS_ENTRY (entry), NULL);
+ g_return_val_if_fail (slug != NULL, NULL);
+ g_return_val_if_fail (content_type != NULL, NULL);
+ g_return_val_if_fail (content_length >= 0, NULL);
+ g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), NULL);
+
+ /* Create the upload stream */
+ return G_OUTPUT_STREAM (g_object_new (GDATA_TYPE_UPLOAD_STREAM,
+ "method", method,
+ "upload-uri", upload_uri,
+ "service", service,
+ "authorization-domain", domain,
+ "entry", entry,
+ "slug", slug,
+ "content-type", content_type,
+ "content-length", content_length,
+ "cancellable", cancellable,
+ NULL));
+}
+
+/**
* gdata_upload_stream_get_response:
* @self: a #GDataUploadStream
* @length: (allow-none) (out caller-allocates): return location for the length of the response, or %NULL
@@ -1238,6 +1557,24 @@ gdata_upload_stream_get_content_type (GDataUploadStream *self)
}
/**
+ * gdata_upload_stream_get_content_length:
+ * @self: a #GDataUploadStream
+ *
+ * Gets the size (in bytes) of the file being uploaded. This will be <code class="literal">-1</code> for a non-resumable upload, and zero or greater
+ * for a resumable upload.
+ *
+ * Return value: the size of the file being uploaded
+ *
+ * Since: 0.11.2
+ */
+goffset
+gdata_upload_stream_get_content_length (GDataUploadStream *self)
+{
+ g_return_val_if_fail (GDATA_IS_UPLOAD_STREAM (self), -1);
+ return self->priv->content_length;
+}
+
+/**
* gdata_upload_stream_get_cancellable:
* @self: a #GDataUploadStream
*
diff --git a/gdata/gdata-upload-stream.h b/gdata/gdata-upload-stream.h
index b7e5c69..8f45e9d 100644
--- a/gdata/gdata-upload-stream.h
+++ b/gdata/gdata-upload-stream.h
@@ -29,6 +29,32 @@
G_BEGIN_DECLS
+/**
+ * GDATA_LINK_RESUMABLE_CREATE_MEDIA:
+ *
+ * The relation type URI of the resumable upload location for resources attached to this resource.
+ *
+ * For more information, see the
+ * <ulink type="http" url="http://code.google.com/apis/gdata/docs/resumable_upload.html#ResumableUploadInitiate">GData resumable upload protocol
+ * specification</ulink>.
+ *
+ * Since: 0.11.2
+ */
+#define GDATA_LINK_RESUMABLE_CREATE_MEDIA "http://schemas.google.com/g/2005#resumable-create-media"
+
+/**
+ * GDATA_LINK_RESUMABLE_EDIT_MEDIA:
+ *
+ * The relation type URI of the resumable update location for resources attached to this resource.
+ *
+ * For more information, see the
+ * <ulink type="http" url="http://code.google.com/apis/gdata/docs/resumable_upload.html#ResumableUploadInitiate">GData resumable upload protocol
+ * specification</ulink>.
+ *
+ * Since: 0.11.2
+ */
+#define GDATA_LINK_RESUMABLE_EDIT_MEDIA "http://schemas.google.com/g/2005#resumable-edit-media"
+
#define GDATA_TYPE_UPLOAD_STREAM (gdata_upload_stream_get_type ())
#define GDATA_UPLOAD_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), GDATA_TYPE_UPLOAD_STREAM, GDataUploadStream))
#define GDATA_UPLOAD_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), GDATA_TYPE_UPLOAD_STREAM, GDataUploadStreamClass))
@@ -67,6 +93,9 @@ GType gdata_upload_stream_get_type (void) G_GNUC_CONST;
GOutputStream *gdata_upload_stream_new (GDataService *service, GDataAuthorizationDomain *domain, const gchar *method, const gchar *upload_uri,
GDataEntry *entry, const gchar *slug, const gchar *content_type,
GCancellable *cancellable) G_GNUC_WARN_UNUSED_RESULT G_GNUC_MALLOC;
+GOutputStream *gdata_upload_stream_new_resumable (GDataService *service, GDataAuthorizationDomain *domain, const gchar *method, const gchar *upload_uri,
+ GDataEntry *entry, const gchar *slug, const gchar *content_type, goffset content_length,
+ GCancellable *cancellable) G_GNUC_WARN_UNUSED_RESULT G_GNUC_MALLOC;
const gchar *gdata_upload_stream_get_response (GDataUploadStream *self, gssize *length);
@@ -77,6 +106,7 @@ const gchar *gdata_upload_stream_get_upload_uri (GDataUploadStream *self) G_GNUC
GDataEntry *gdata_upload_stream_get_entry (GDataUploadStream *self) G_GNUC_PURE;
const gchar *gdata_upload_stream_get_slug (GDataUploadStream *self) G_GNUC_PURE;
const gchar *gdata_upload_stream_get_content_type (GDataUploadStream *self) G_GNUC_PURE;
+goffset gdata_upload_stream_get_content_length (GDataUploadStream *self) G_GNUC_PURE;
GCancellable *gdata_upload_stream_get_cancellable (GDataUploadStream *self) G_GNUC_PURE;
G_END_DECLS
diff --git a/gdata/gdata.symbols b/gdata/gdata.symbols
index 6305844..e75fa49 100644
--- a/gdata/gdata.symbols
+++ b/gdata/gdata.symbols
@@ -937,3 +937,5 @@ gdata_youtube_query_get_license
gdata_youtube_query_set_license
gdata_contacts_contact_get_file_as
gdata_contacts_contact_set_file_as
+gdata_upload_stream_new_resumable
+gdata_upload_stream_get_content_length
diff --git a/gdata/tests/common.c b/gdata/tests/common.c
index a94f90b..34d89c0 100644
--- a/gdata/tests/common.c
+++ b/gdata/tests/common.c
@@ -538,15 +538,11 @@ compare_xml_nodes (xmlNode *node1, xmlNode *node2)
}
gboolean
-gdata_test_compare_xml (GDataParsable *parsable, const gchar *expected_xml, gboolean print_error)
+gdata_test_compare_xml_strings (const gchar *parsable_xml, const gchar *expected_xml, gboolean print_error)
{
gboolean success;
- gchar *parsable_xml;
xmlDoc *parsable_doc, *expected_doc;
- /* Get an XML string for the GDataParsable */
- parsable_xml = gdata_parsable_get_xml (parsable);
-
/* Parse both the XML strings */
parsable_doc = xmlReadMemory (parsable_xml, strlen (parsable_xml), "/dev/null", NULL, 0);
expected_doc = xmlReadMemory (expected_xml, strlen (expected_xml), "/dev/null", NULL, 0);
@@ -562,6 +558,19 @@ gdata_test_compare_xml (GDataParsable *parsable, const gchar *expected_xml, gboo
xmlFreeDoc (expected_doc);
xmlFreeDoc (parsable_doc);
+
+ return success;
+}
+
+gboolean
+gdata_test_compare_xml (GDataParsable *parsable, const gchar *expected_xml, gboolean print_error)
+{
+ gboolean success;
+ gchar *parsable_xml;
+
+ /* Get an XML string for the GDataParsable */
+ parsable_xml = gdata_parsable_get_xml (parsable);
+ success = gdata_test_compare_xml_strings (parsable_xml, expected_xml, print_error);
g_free (parsable_xml);
return success;
diff --git a/gdata/tests/common.h b/gdata/tests/common.h
index 7f16ee2..3a50b76 100644
--- a/gdata/tests/common.h
+++ b/gdata/tests/common.h
@@ -53,6 +53,7 @@ guint gdata_test_batch_operation_deletion (GDataBatchOperation *operation, GData
gboolean gdata_test_batch_operation_run (GDataBatchOperation *operation, GCancellable *cancellable, GError **error);
gboolean gdata_test_batch_operation_run_finish (GDataBatchOperation *operation, GAsyncResult *async_result, GError **error);
+gboolean gdata_test_compare_xml_strings (const gchar *parsable_xml, const gchar *expected_xml, gboolean print_error);
gboolean gdata_test_compare_xml (GDataParsable *parsable, const gchar *expected_xml, gboolean print_error);
/* Convenience macro */
diff --git a/gdata/tests/streams.c b/gdata/tests/streams.c
index b4eb679..6c79fbb 100644
--- a/gdata/tests/streams.c
+++ b/gdata/tests/streams.c
@@ -581,11 +581,407 @@ test_upload_stream_upload_no_entry_content_length (void)
g_main_context_unref (async_context);
}
+/* Test parameters for a run of test_upload_stream_resumable(). */
+typedef struct {
+ enum {
+ CONTENT_ONLY = 0,
+ CONTENT_AND_METADATA = 1,
+ METADATA_ONLY = 2,
+ } content_type;
+#define UPLOAD_STREAM_RESUMABLE_MAX_CONTENT_TYPE 2
+ gsize file_size;
+ enum {
+ ERROR_ON_INITIAL_REQUEST = 0,
+ ERROR_ON_SUBSEQUENT_REQUEST = 1,
+ ERROR_ON_FINAL_REQUEST = 2,
+ NO_ERROR = 3,
+ } error_type;
+#define UPLOAD_STREAM_RESUMABLE_MAX_ERROR_TYPE 3
+} UploadStreamResumableTestParams;
+
+static const gchar *upload_stream_resumable_content_type_names[] = {
+ "content-only",
+ "content-and-metadata",
+ "metadata-only",
+};
+
+static const gchar *upload_stream_resumable_error_type_names[] = {
+ "initial-error",
+ "subsequent-error",
+ "final-error",
+ "success",
+};
+
+typedef struct {
+ UploadStreamResumableTestParams *test_params;
+ gsize next_range_start;
+ gsize next_range_end;
+ guint next_path_index;
+ const gchar *test_string;
+} UploadStreamResumableServerData;
+
+static void
+test_upload_stream_resumable_server_handler_cb (SoupServer *server, SoupMessage *message, const char *path, GHashTable *query,
+ SoupClientContext *client, UploadStreamResumableServerData *server_data)
+{
+ UploadStreamResumableTestParams *test_params;
+
+ test_params = server_data->test_params;
+
+ /* Are we handling the initial request, or a subsequent one? */
+ if (strcmp (path, "/") == 0) {
+ /* Initial request. */
+ gchar *file_size_str;
+
+ /* Check the Slug and X-Upload-* headers. */
+ g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "Slug"), ==, "slug");
+
+ file_size_str = g_strdup_printf ("%" G_GSIZE_FORMAT, test_params->file_size);
+ g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "X-Upload-Content-Type"), ==, "text/plain");
+ g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "X-Upload-Content-Length"), ==, file_size_str);
+ g_free (file_size_str);
+
+ /* Check the Content-Type and content. */
+ switch (test_params->content_type) {
+ case CONTENT_ONLY:
+ /* Check nothing was sent. */
+ g_assert_cmpstr (soup_message_headers_get_content_type (message->request_headers, NULL), ==, NULL);
+ g_assert_cmpint (message->request_body->length, ==, 0);
+
+ break;
+ case CONTENT_AND_METADATA:
+ case METADATA_ONLY:
+ /* Check the XML sent by the client. */
+ g_assert_cmpstr (soup_message_headers_get_content_type (message->request_headers, NULL), ==, "application/atom+xml");
+
+ g_assert (message->request_body->data[message->request_body->length] == '\0');
+ g_assert (gdata_test_compare_xml_strings (message->request_body->data,
+ "<?xml version='1.0' encoding='UTF-8'?>"
+ "<entry xmlns='http://www.w3.org/2005/Atom' "
+ "xmlns:app='http://www.w3.org/2007/app' "
+ "xmlns:georss='http://www.georss.org/georss' "
+ "xmlns:gml='http://www.opengis.net/gml' "
+ "xmlns:gd='http://schemas.google.com/g/2005' "
+ "xmlns:yt='http://gdata.youtube.com/schemas/2007' "
+ "xmlns:media='http://search.yahoo.com/mrss/'>"
+ "<title type='text'>Test title!</title>"
+ "<category term='http://gdata.youtube.com/schemas/2007#video' "
+ "scheme='http://schemas.google.com/g/2005#kind'/>"
+ "<media:group>"
+ "<media:title type='plain'>Test title!</media:title>"
+ "</media:group>"
+ "<app:control>"
+ "<app:draft>no</app:draft>"
+ "</app:control>"
+ "</entry>", TRUE) == TRUE);
+
+ break;
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Send a response. */
+ switch (test_params->error_type) {
+ case ERROR_ON_INITIAL_REQUEST:
+ /* Error. */
+ goto error;
+ case ERROR_ON_SUBSEQUENT_REQUEST:
+ case ERROR_ON_FINAL_REQUEST:
+ case NO_ERROR:
+ /* Success. */
+ if (test_params->file_size == 0) {
+ goto completion;
+ } else {
+ goto continuation;
+ }
+
+ break;
+ default:
+ g_assert_not_reached ();
+ }
+ } else if (*path == '/' && g_ascii_strtoull (path + 1, NULL, 10) == server_data->next_path_index) {
+ /* Subsequent request. */
+
+ /* Check the Slug and X-Upload-* headers. */
+ g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "Slug"), ==, NULL);
+ g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "X-Upload-Content-Type"), ==, NULL);
+ g_assert_cmpstr (soup_message_headers_get_one (message->request_headers, "X-Upload-Content-Length"), ==, NULL);
+
+ /* Check the Content-Type and content. */
+ switch (test_params->content_type) {
+ case CONTENT_ONLY:
+ case CONTENT_AND_METADATA: {
+ goffset range_start, range_end, range_length;
+
+ /* Check the headers. */
+ g_assert_cmpstr (soup_message_headers_get_content_type (message->request_headers, NULL), ==, "text/plain");
+ g_assert_cmpint (soup_message_headers_get_content_length (message->request_headers), ==, message->request_body->length);
+ g_assert_cmpint (message->request_body->length, >, 0);
+ g_assert_cmpint (message->request_body->length, <=, 512 * 1024 /* 512 KiB */);
+ g_assert (soup_message_headers_get_content_range (message->request_headers, &range_start, &range_end,
+ &range_length) == TRUE);
+ g_assert_cmpint (range_start, ==, server_data->next_range_start);
+ g_assert_cmpint (range_end, ==, server_data->next_range_end);
+ g_assert_cmpint (range_length, ==, test_params->file_size);
+
+ /* Check the content. */
+ g_assert (memcmp (server_data->test_string + range_start, message->request_body->data,
+ message->request_body->length) == 0);
+
+ /* Update the expected values. */
+ server_data->next_range_start = range_end + 1;
+ server_data->next_range_end = MIN (server_data->next_range_start + 512 * 1024, test_params->file_size) - 1;
+ server_data->next_path_index++;
+
+ break;
+ }
+ case METADATA_ONLY:
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Send a response. */
+ switch (test_params->error_type) {
+ case ERROR_ON_INITIAL_REQUEST:
+ g_assert_not_reached ();
+ case ERROR_ON_SUBSEQUENT_REQUEST:
+ case ERROR_ON_FINAL_REQUEST:
+ /* Skip the error if this isn't the final request. */
+ if (test_params->error_type == ERROR_ON_SUBSEQUENT_REQUEST ||
+ (test_params->error_type == ERROR_ON_FINAL_REQUEST && server_data->next_range_start == test_params->file_size)) {
+ goto error;
+ }
+
+ /* Fall through */
+ case NO_ERROR:
+ /* Success. */
+ if (server_data->next_range_start == test_params->file_size) {
+ goto completion;
+ } else {
+ goto continuation;
+ }
+
+ break;
+ default:
+ g_assert_not_reached ();
+ }
+ } else {
+ g_assert_not_reached ();
+ }
+
+ return;
+
+error: {
+ const gchar *error_response =
+ "<?xml version='1.0' encoding='UTF-8'?>"
+ "<errors>"
+ "<error>"
+ "<domain>yt:authentication</domain>"
+ "<code>InvalidToken</code>"
+ "<location type='header'>Authorization: GoogleLogin</location>"
+ "</error>"
+ "</errors>";
+
+ /* Error. */
+ soup_message_set_status (message, SOUP_STATUS_UNAUTHORIZED); /* arbitrary error status code */
+ soup_message_body_append (message->response_body, SOUP_MEMORY_STATIC, error_response, strlen (error_response));
+ }
+
+ return;
+
+continuation: {
+ gchar *upload_uri;
+
+ /* Continuation. */
+ if (server_data->next_path_index == 0) {
+ soup_message_set_status (message, SOUP_STATUS_OK);
+ } else {
+ soup_message_set_status (message, 308);
+ }
+
+ upload_uri = g_strdup_printf ("http://127.0.0.1:%u/%u", soup_server_get_port (server), ++server_data->next_path_index);
+ soup_message_headers_replace (message->response_headers, "Location", upload_uri);
+ g_free (upload_uri);
+ }
+
+ return;
+
+completion: {
+ const gchar *completion_response =
+ "<?xml version='1.0' encoding='UTF-8'?>"
+ "<entry xmlns='http://www.w3.org/2005/Atom' "
+ "xmlns:media='http://search.yahoo.com/mrss/' "
+ "xmlns:gd='http://schemas.google.com/g/2005' "
+ "xmlns:yt='http://gdata.youtube.com/schemas/2007' "
+ "xmlns:app='http://www.w3.org/2007/app' "
+ "xmlns:georss='http://www.georss.org/georss' "
+ "xmlns:gml='http://www.opengis.net/gml' "
+ "gd:etag='W/\"testfulness.\"'>"
+ "<title type='text'>Test title!</title>"
+ "<id>tag:youtube.com,2008:video:fooishbar</id>"
+ "<updated>2009-03-23T12:46:58Z</updated>"
+ "<published>2006-05-16T14:06:37Z</published>"
+ "<category term='http://gdata.youtube.com/schemas/2007#video' scheme='http://schemas.google.com/g/2005#kind'/>"
+ "<link href='http://www.youtube.com/watch?v=fooishbar' rel='http://www.iana.org/assignments/relation/alternate' type='text/html'/>"
+ "<link href='http://gdata.youtube.com/feeds/api/videos/fooishbar' rel='http://www.iana.org/assignments/relation/self' type='application/atom+xml'/>"
+ "<author>"
+ "<name>Brian</name>"
+ "<uri>http://gdata.youtube.com/feeds/api/users/brian</uri>"
+ "</author>"
+ "<media:group>"
+ "<media:category scheme='http://gdata.youtube.com/schemas/2007/categories.cat' label='Music'>Music</media:category>"
+ "<media:title type='plain'>Test title!</media:title>"
+ "</media:group>"
+ "<yt:recorded>2005-10-02</yt:recorded>"
+ "<app:control>"
+ "<app:draft>no</app:draft>"
+ "</app:control>"
+ "</entry>";
+
+ /* Completion. */
+ soup_message_set_status (message, SOUP_STATUS_CREATED);
+ soup_message_headers_set_content_type (message->response_headers, "application/atom+xml", NULL);
+ soup_message_body_append (message->response_body, SOUP_MEMORY_STATIC, completion_response, strlen (completion_response));
+ }
+}
+
+static void
+test_upload_stream_resumable (gconstpointer user_data)
+{
+ UploadStreamResumableTestParams *test_params;
+ UploadStreamResumableServerData server_data;
+ SoupServer *server;
+ GMainContext *async_context;
+ SoupAddress *addr;
+ GThread *thread;
+ gchar *upload_uri;
+ GDataService *service;
+ GDataEntry *entry = NULL;
+ GOutputStream *upload_stream;
+ gssize length_written;
+ gsize total_length_written = 0;
+ gchar *test_string;
+ goffset test_string_length;
+ gboolean success;
+ GError *error = NULL;
+
+ test_params = (UploadStreamResumableTestParams*) user_data;
+
+ /* Build the test string. */
+ if (test_params->file_size > 0) {
+ test_string = get_test_string (1, test_params->file_size / 4 /* arbitrary number which should generate enough data */);
+ g_assert (strlen (test_string) + 1 >= test_params->file_size);
+ test_string[test_params->file_size - 1] = '\0'; /* trim the string to the right length */
+ } else {
+ test_string = NULL;
+ }
+
+ test_string_length = test_params->file_size;
+
+ /* Create the server */
+ server_data.test_params = test_params;
+ server_data.next_range_start = 0;
+ server_data.next_range_end = MIN (test_params->file_size, 512 * 1024 /* 512 KiB */) - 1;
+ server_data.next_path_index = 0;
+ server_data.test_string = test_string;
+
+ async_context = g_main_context_new ();
+ addr = soup_address_new ("127.0.0.1", SOUP_ADDRESS_ANY_PORT);
+ soup_address_resolve_sync (addr, NULL);
+
+ server = soup_server_new (SOUP_SERVER_INTERFACE, addr,
+ SOUP_SERVER_ASYNC_CONTEXT, async_context,
+ NULL);
+ soup_server_add_handler (server, NULL, (SoupServerCallback) test_upload_stream_resumable_server_handler_cb, &server_data, NULL);
+
+ g_object_unref (addr);
+
+ g_assert (server != NULL);
+
+ /* Create a thread for the server */
+ thread = run_server (server);
+
+ /* Create a new upload stream uploading to the server */
+ if (test_params->content_type == CONTENT_AND_METADATA || test_params->content_type == METADATA_ONLY) {
+ /* Build a test entry. */
+ entry = GDATA_ENTRY (gdata_youtube_video_new (NULL));
+ gdata_entry_set_title (entry, "Test title!");
+ }
+
+ upload_uri = g_strdup_printf ("http://127.0.0.1:%u/", soup_server_get_port (server));
+ service = GDATA_SERVICE (gdata_youtube_service_new ("developer-key", NULL));
+ upload_stream = gdata_upload_stream_new_resumable (service, NULL, SOUP_METHOD_POST, upload_uri, entry, "slug", "text/plain",
+ test_params->file_size, NULL);
+ g_object_unref (service);
+ g_free (upload_uri);
+ g_clear_object (&entry);
+
+ if (test_params->file_size > 0) {
+ while ((length_written = g_output_stream_write (upload_stream, test_string + total_length_written,
+ test_string_length - total_length_written, NULL, &error)) > 0) {
+ g_assert_cmpint (length_written, <=, test_string_length - total_length_written);
+
+ total_length_written += length_written;
+ }
+ } else {
+ /* Do an empty write to poke things into action. */
+ if ((length_written = g_output_stream_write (upload_stream, "", 0, NULL, &error)) > 0) {
+ total_length_written += length_written;
+ }
+ }
+
+ /* Check the return value. */
+ switch (test_params->error_type) {
+ case ERROR_ON_INITIAL_REQUEST:
+ case ERROR_ON_SUBSEQUENT_REQUEST:
+ case ERROR_ON_FINAL_REQUEST:
+ /* We can't check the write() call for errors, since whether it throws an error depends on whether the range it's writing
+ * overlaps a resumable upload chunk, which is entirely arbitrary and unpredictable. */
+ g_assert_cmpint (length_written, <=, 0);
+ g_assert_cmpint (total_length_written, <=, test_string_length);
+ g_clear_error (&error);
+
+ /* Close the stream */
+ success = g_output_stream_close (upload_stream, NULL, &error);
+ g_assert_error (error, GDATA_SERVICE_ERROR, GDATA_SERVICE_ERROR_AUTHENTICATION_REQUIRED);
+ g_assert (success == FALSE);
+ g_clear_error (&error);
+
+ break;
+ case NO_ERROR:
+ /* Check we've had a successful return value */
+ g_assert_no_error (error);
+ g_assert_cmpint (length_written, ==, 0);
+ g_assert_cmpint (total_length_written, ==, test_string_length);
+
+ /* Close the stream */
+ success = g_output_stream_close (upload_stream, NULL, &error);
+ g_assert_no_error (error);
+ g_assert (success == TRUE);
+
+ break;
+ default:
+ g_assert_not_reached ();
+ }
+
+ /* Kill the server and wait for it to die */
+ soup_add_completion (async_context, (GSourceFunc) quit_server_cb, server);
+ g_thread_join (thread);
+
+ g_free (test_string);
+ g_object_unref (upload_stream);
+ g_object_unref (server);
+ g_main_context_unref (async_context);
+}
+
int
main (int argc, char *argv[])
{
gdata_test_init (argc, argv);
+ /* Only print out headers, since we're sending a lot of data. */
+ g_setenv ("LIBGDATA_DEBUG", "2" /* GDATA_LOG_HEADERS */, TRUE);
+
g_test_add_func ("/download-stream/download_content_length", test_download_stream_download_content_length);
g_test_add_func ("/download-stream/download_seek/before_start", test_download_stream_download_seek_before_start);
g_test_add_func ("/download-stream/download_seek/after_start_forwards", test_download_stream_download_seek_after_start_forwards);
@@ -593,5 +989,52 @@ main (int argc, char *argv[])
g_test_add_func ("/upload-stream/upload_no_entry_content_length", test_upload_stream_upload_no_entry_content_length);
+ /* Test all possible combinations of conditions for resumable uploads. */
+ {
+ guint i, j, k;
+
+ const gsize file_sizes[] = { /* all in bytes */
+ 407 * 1024, /* < 512 KiB */
+ 512 * 1024, /* 512 KiB */
+ 666 * 1024, /* > 512 KiB, < 1024 KiB */
+ 1024 * 1024, /* 1024 KiB */
+ 1025 * 1024, /* > 1024 KiB */
+ };
+
+ for (i = 0; i < UPLOAD_STREAM_RESUMABLE_MAX_CONTENT_TYPE + 1; i++) {
+ for (j = 0; j < G_N_ELEMENTS (file_sizes); j++) {
+ for (k = 0; k < UPLOAD_STREAM_RESUMABLE_MAX_ERROR_TYPE + 1; k++) {
+ UploadStreamResumableTestParams *test_params;
+ gchar *test_name;
+ gsize file_size;
+
+ /* Ignore combinations of METADATA_ONLY with file_sizes or non-initial errors. */
+ if (i == METADATA_ONLY && (j != 0 || k != 0)) {
+ continue;
+ } else if (i == METADATA_ONLY) {
+ file_size = 0 /* bytes */;
+ } else {
+ file_size = file_sizes[j];
+ }
+
+ test_name = g_strdup_printf ("/upload-stream/resumable/%s/%s/%" G_GSIZE_FORMAT,
+ upload_stream_resumable_content_type_names[i],
+ upload_stream_resumable_error_type_names[k],
+ file_size);
+
+ /* Allocate a new struct. We leak this. */
+ test_params = g_slice_new (UploadStreamResumableTestParams);
+ test_params->content_type = i;
+ test_params->file_size = file_size;
+ test_params->error_type = k;
+
+ g_test_add_data_func (test_name, test_params, test_upload_stream_resumable);
+
+ g_free (test_name);
+ }
+ }
+ }
+ }
+
return g_test_run ();
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]