[gvfs] dav: Implement push support
- From: Ross Lagerwall <rossl src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gvfs] dav: Implement push support
- Date: Wed, 30 Apr 2014 19:06:34 +0000 (UTC)
commit eb11ec725f5d2850597fc88635609474c857f6aa
Author: Ross Lagerwall <rosslagerwall gmail com>
Date: Sat Apr 5 20:20:49 2014 +0100
dav: Implement push support
Implement push support for the webdav backend. This allows large files
to be uploaded properly without consuming large amounts of memory and
also makes the progress bar work when uploading.
Data is provided to libsoup in chunks, activated via the wrote_chunk
signal. Note that this uses content-length encoding rather than chunked
encoding, which is not supported by many servers. The CAN_REBUILD flag
is set on the SoupMessage and accumulate is set to FALSE so that Libsoup
does not buffer all the data in memory at once. This does mean that the
restarted signal needs to be handled correctly by seeking to the
beginning of the file.
The code is written in an asynchronous fashion so that other operations
are not blocked since the webdav backend is single-threaded.
Unfortunately, this does complicate the code, especially with regard to
having reads in flight and handling the restarted signal from libsoup.
A quick benchmark writing to a tmpfs via Apache's mod_dav achieved
just over 1GB/s.
https://bugzilla.gnome.org/show_bug.cgi?id=570772
daemon/gvfsbackenddav.c | 376 +++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 376 insertions(+), 0 deletions(-)
---
diff --git a/daemon/gvfsbackenddav.c b/daemon/gvfsbackenddav.c
index e74fb25..3e9f00a 100644
--- a/daemon/gvfsbackenddav.c
+++ b/daemon/gvfsbackenddav.c
@@ -56,6 +56,7 @@
#include "gvfsjobqueryfsinfo.h"
#include "gvfsjobqueryattributes.h"
#include "gvfsjobenumerate.h"
+#include "gvfsjobpush.h"
#include "gvfsdaemonprotocol.h"
#ifdef HAVE_AVAHI
@@ -2828,6 +2829,380 @@ do_move (GVfsBackend *backend,
soup_uri_free (target_uri);
}
+#define CHUNK_SIZE 65536 /* Size in bytes of the buffer allocated for, and the maximum length requested by, each asynchronous read of the local file. */
+
+/* Used to keep track of the state of reads in flight when the restarted signal
+ * is received.
+ *
+ * A PushHandle starts out as PUSH_READ_STATUS_NONE: it is allocated
+ * zero-filled in try_push () and NONE is the first enumerator. */
+typedef enum {
+ PUSH_READ_STATUS_NONE, /* no restart pending; push_read_cb () appends what it read to the request body */
+ PUSH_READ_STATUS_RESET, /* set by push_restarted (); the next push_write_next_chunk () rewinds the local file */
+ PUSH_READ_STATUS_DEFERRED, /* push_write_next_chunk () ran after a restart while a read was still in flight; push_read_cb () calls it again once that read completes */
+} PushReadStatus;
+
+typedef struct { /* State shared by all callbacks of one push operation; allocated zero-filled in try_push (), released by push_handle_free (). */
+ /* Job details */
+ GVfsBackend *backend; /* referenced in try_push (), unreferenced in push_handle_free () */
+ GVfsJob *job; /* the push job as a GVfsJob; referenced in try_push (), unreferenced in push_handle_free () */
+ GVfsJobPush *op_job; /* same object as job; no separate reference is taken */
+
+ /* Local file */
+ GInputStream *in; /* source stream; NULL until push_source_open_cb () has opened the file */
+ unsigned char *buf; /* CHUNK_SIZE buffer of the read in flight; NULL when no read is pending */
+ goffset size; /* source size from the stream's file info; sent as the request content length */
+ goffset n_read; /* bytes appended to the request body since the last rewind */
+ PushReadStatus read_status; /* restart bookkeeping, see PushReadStatus */
+
+ /* Remote file */
+ SoupURI *uri; /* destination URI built in try_push (); freed in push_handle_free () */
+ SoupMessage *msg; /* the PUT message; NULL until push_stat_dest_cb () creates it */
+ goffset n_written; /* body bytes reported by "wrote-body-data" since the last rewind; used for progress */
+} PushHandle;
+
+static void
+push_write_next_chunk (SoupMessage *msg, gpointer user_data); /* forward declaration: push_read_cb () calls this before its definition */
+
+/* Releases a PushHandle together with the references it holds.
+ *
+ * handle: the handle to free; it must not be used afterwards.
+ *
+ * Closes and unreferences the input stream (if one was opened), drops the
+ * backend and job references, frees the destination URI and finally the
+ * handle itself.  handle->buf and handle->msg are not released here.
+ *
+ * NOTE(review): nothing here checks for a read still in flight
+ * (handle->buf != NULL); confirm push_read_cb () cannot run after the handle
+ * has been freed.
+ */ static void
+push_handle_free (PushHandle *handle)
+{
+ if (handle->in) /* NULL when the operation failed before the source file was opened */
+ {
+ g_input_stream_close_async (handle->in, 0, NULL, NULL, NULL); /* no callback is passed, so the result of the close is never examined */
+ g_object_unref (handle->in);
+ }
+ g_object_unref (handle->backend);
+ g_object_unref (handle->job);
+ soup_uri_free (handle->uri);
+ g_slice_free (PushHandle, handle);
+}
+
+/* Completion callback for the asynchronous read started by
+ * push_write_next_chunk ().
+ *
+ * source:    unused; the stream is taken from the handle.
+ * res:       result handed to g_input_stream_read_finish ().
+ * user_data: the PushHandle of this transfer.
+ *
+ * Outcomes:
+ *  - The message was restarted while the read was in flight
+ *    (read_status != PUSH_READ_STATUS_NONE): the data, and any read error,
+ *    is discarded.  If push_write_next_chunk () had already been asked for
+ *    the next chunk (PUSH_READ_STATUS_DEFERRED), the status is put back to
+ *    PUSH_READ_STATUS_RESET and push_write_next_chunk () is called again so
+ *    that it rewinds the file and starts reading from the beginning.
+ *  - n > 0: ownership of the buffer passes to the request body and the
+ *    message is unpaused.
+ *  - n == 0: end of file; the job fails and the message is cancelled if
+ *    fewer or more bytes were read than the size announced to the server.
+ *  - n < 0: the job fails with the read error and the message is cancelled.
+ */
+static void
+push_read_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+  PushHandle *handle = user_data;
+  GError *error = NULL;
+  gssize n;
+
+  n = g_input_stream_read_finish (handle->in, res, &error);
+
+  /* Ignore this read if we've subsequently been restarted. */
+  if (handle->read_status != PUSH_READ_STATUS_NONE)
+    {
+      /* The result of a stale read is not reported, so release the error
+       * of a failed one instead of leaking it. */
+      g_clear_error (&error);
+
+      g_free (handle->buf);
+      handle->buf = NULL;
+
+      /* Queue another read if we've been subsequently restarted and
+       * push_write_next_chunk () was called in the meantime. */
+      if (handle->read_status == PUSH_READ_STATUS_DEFERRED)
+        {
+          /* push_write_next_chunk () only rewinds the file when the status
+           * is RESET.  Without restoring it here the deferred call would
+           * read on from the old offset and the status would stay DEFERRED
+           * forever, so every later read would be thrown away. */
+          handle->read_status = PUSH_READ_STATUS_RESET;
+          push_write_next_chunk (handle->msg, handle);
+        }
+
+      return;
+    }
+
+  if (n > 0)
+    {
+      /* The request body takes ownership of the buffer. */
+      soup_message_body_append_take (handle->msg->request_body, handle->buf, n);
+      handle->buf = NULL;
+      handle->n_read += n;
+      soup_session_unpause_message (G_VFS_BACKEND_HTTP (handle->backend)->session_async,
+                                    handle->msg);
+    }
+  else if (n == 0)
+    {
+      g_free (handle->buf);
+      handle->buf = NULL;
+
+      /* End of file: the byte count must match the content length that was
+       * announced in the request headers. */
+      if (handle->n_read != handle->size)
+        {
+          g_vfs_job_failed_literal (handle->job,
+                                    G_IO_ERROR,
+                                    G_IO_ERROR_FAILED,
+                                    _("File length changed during transfer"));
+
+          soup_session_cancel_message (G_VFS_BACKEND_HTTP (handle->backend)->session_async,
+                                       handle->msg,
+                                       SOUP_STATUS_CANCELLED);
+        }
+    }
+  else
+    {
+      g_free (handle->buf);
+      handle->buf = NULL;
+      g_vfs_job_failed_from_error (handle->job, error);
+      g_error_free (error);
+      soup_session_cancel_message (G_VFS_BACKEND_HTTP (handle->backend)->session_async,
+                                   handle->msg,
+                                   SOUP_STATUS_CANCELLED);
+    }
+}
+
+/* Starts an asynchronous read of the next chunk of the local file.
+ *
+ * Connected to the "wrote_headers" and "wrote_chunk" signals of the PUT
+ * message and also called directly from push_read_cb ().
+ *
+ * msg:       unused; handle->msg is used instead.
+ * user_data: the PushHandle of this transfer.
+ *
+ * When the status is PUSH_READ_STATUS_RESET the file is first rewound, unless
+ * a read is still in flight; in that case the status becomes
+ * PUSH_READ_STATUS_DEFERRED and nothing else is done.  A call made while the
+ * status is PUSH_READ_STATUS_DEFERRED skips the rewind branch entirely.
+ */ static void
+push_write_next_chunk (SoupMessage *msg, gpointer user_data)
+{
+ PushHandle *handle = user_data;
+
+ /* If we've been restarted, seek to the beginning of the file. */
+ if (handle->read_status == PUSH_READ_STATUS_RESET)
+ {
+ GError *error = NULL;
+
+ /* We've been restarted but there's still a read in flight, so defer. */
+ if (handle->buf) /* buf is non-NULL only between starting a read and its callback */
+ {
+ handle->read_status = PUSH_READ_STATUS_DEFERRED;
+ return;
+ }
+
+ handle->n_read = 0; /* both counters restart with the rewound file */
+ handle->n_written = 0;
+ handle->read_status = PUSH_READ_STATUS_NONE;
+
+ if (!g_seekable_seek (G_SEEKABLE (handle->in),
+ 0, G_SEEK_SET,
+ handle->job->cancellable, &error))
+ {
+ g_vfs_job_failed_from_error (handle->job, error);
+ g_error_free (error);
+ soup_session_cancel_message (G_VFS_BACKEND_HTTP (handle->backend)->session_async,
+ handle->msg,
+ SOUP_STATUS_CANCELLED);
+ return;
+ }
+ }
+
+ handle->buf = g_malloc (CHUNK_SIZE); /* released in push_read_cb (): freed, or handed over to the request body */
+ g_input_stream_read_async (handle->in,
+ handle->buf, CHUNK_SIZE,
+ 0, handle->job->cancellable,
+ push_read_cb, handle);
+}
+
+/* Applies the flags and request headers of the upload to handle->msg.
+ *
+ * handle: the PushHandle of this transfer; handle->msg, handle->size and
+ *         handle->op_job must already be set.
+ *
+ * Called once when the message is created in push_stat_dest_cb () and again
+ * from push_restarted ().
+ */ static void
+push_setup_message (PushHandle *handle)
+{
+ soup_message_set_flags (handle->msg, SOUP_MESSAGE_CAN_REBUILD);
+ soup_message_body_set_accumulate (handle->msg->request_body, FALSE); /* per the commit message: keeps libsoup from buffering the whole body in memory */
+ message_add_overwrite_header (handle->msg,
+ handle->op_job->flags & G_FILE_COPY_OVERWRITE);
+ soup_message_headers_set_encoding (handle->msg->request_headers,
+ SOUP_ENCODING_CONTENT_LENGTH); /* content-length rather than chunked encoding */
+ soup_message_headers_set_content_length (handle->msg->request_headers,
+ handle->size);
+ soup_message_headers_set_content_type (handle->msg->request_headers,
+ "application/octet-stream",
+ NULL);
+}
+
+/* Handler for the "restarted" signal of the PUT message.
+ *
+ * msg:       the message being restarted.
+ * user_data: the PushHandle of this transfer.
+ *
+ * Marks the transfer as reset, so that the next push_write_next_chunk ()
+ * rewinds the local file, and sets the method, flags and headers again.
+ */ static void
+push_restarted (SoupMessage *msg, gpointer user_data)
+{
+ PushHandle *handle = user_data;
+
+ handle->read_status = PUSH_READ_STATUS_RESET;
+ msg->method = SOUP_METHOD_PUT; /* NOTE(review): presumably the method can have been changed by whatever caused the restart - confirm */
+ push_setup_message (handle);
+}
+
+/* Handler for the "wrote-body-data" signal of the PUT message.
+ *
+ * msg:       unused.
+ * chunk:     the buffer whose length is added to the running total.
+ * user_data: the PushHandle of this transfer.
+ *
+ * Adds the chunk length to handle->n_written and reports progress on the job.
+ */ static void
+push_wrote_body_data (SoupMessage *msg, SoupBuffer *chunk, gpointer user_data)
+{
+ PushHandle *handle = user_data;
+
+ handle->n_written += chunk->length;
+ g_vfs_job_progress_callback (handle->n_written, handle->size, handle->job);
+}
+
+/* Completion callback of the PUT message queued in push_stat_dest_cb ().
+ *
+ * session:   unused.
+ * msg:       the finished message; its status code decides the outcome.
+ * user_data: the PushHandle of this transfer; it is freed before returning.
+ *
+ * Does nothing to the job if it has already been finished, fails it through
+ * http_job_failed () for an unsuccessful status, and otherwise marks it as
+ * succeeded, first removing the local source when the job asks for that.
+ */ static void
+push_done (SoupSession *session, SoupMessage *msg, gpointer user_data)
+{
+ PushHandle *handle = user_data;
+
+ if (g_vfs_job_is_finished (handle->job))
+ ; /* We got an error so we finished the job and cancelled msg. */
+ else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
+ http_job_failed (handle->job, msg);
+ else
+ {
+ if (handle->op_job->remove_source)
+ g_unlink (handle->op_job->local_path); /* the result of g_unlink () is not checked */
+
+ g_vfs_job_succeeded (handle->job);
+ }
+
+ push_handle_free (handle);
+}
+
+/* Completion callback of the stat request on the destination URI that was
+ * queued by push_source_fstat_cb ().
+ *
+ * session:   unused.
+ * msg:       the finished stat message, passed to stat_location_finish ().
+ * user_data: the PushHandle of this transfer.
+ *
+ * When the stat succeeds, the job fails with G_IO_ERROR_EXISTS unless
+ * G_FILE_COPY_OVERWRITE is set, and with G_IO_ERROR_IS_DIRECTORY when the
+ * destination is a directory; the handle is freed in both cases.  Otherwise,
+ * including when the stat does not succeed for any reason, the PUT message
+ * is created, set up, connected to its signal handlers and queued.
+ */ static void
+push_stat_dest_cb (SoupSession *session, SoupMessage *msg, gpointer user_data)
+{
+ PushHandle *handle = user_data;
+ GFileType type;
+
+ if (stat_location_finish (msg, &type, NULL)) /* TRUE is treated as "the destination exists" */
+ {
+ if (!(handle->op_job->flags & G_FILE_COPY_OVERWRITE))
+ {
+ g_vfs_job_failed (handle->job,
+ G_IO_ERROR,
+ G_IO_ERROR_EXISTS,
+ _("Target file already exists"));
+ push_handle_free (handle);
+ return;
+ }
+ if (type == G_FILE_TYPE_DIRECTORY) /* an existing directory is rejected even when overwriting is allowed */
+ {
+ g_vfs_job_failed (handle->job,
+ G_IO_ERROR,
+ G_IO_ERROR_IS_DIRECTORY,
+ _("File is directory"));
+ push_handle_free (handle);
+ return;
+ }
+ }
+
+ handle->msg = soup_message_new_from_uri (SOUP_METHOD_PUT, handle->uri);
+ push_setup_message (handle);
+
+ g_signal_connect (handle->msg, "restarted",
+ G_CALLBACK (push_restarted), handle);
+ g_signal_connect (handle->msg, "wrote_headers",
+ G_CALLBACK (push_write_next_chunk), handle); /* starts the first read */
+ g_signal_connect (handle->msg, "wrote_chunk",
+ G_CALLBACK (push_write_next_chunk), handle); /* starts each following read */
+ g_signal_connect (handle->msg, "wrote-body-data",
+ G_CALLBACK (push_wrote_body_data), handle); /* progress reporting */
+
+ soup_session_queue_message (G_VFS_BACKEND_HTTP (handle->backend)->session_async,
+ handle->msg,
+ push_done, handle); /* push_done () frees the handle */
+}
+
+/* Completion callback of the query-info request on the opened source stream,
+ * started by push_source_open_cb ().
+ *
+ * source:    the GFileInputStream that was queried.
+ * res:       result handed to g_file_input_stream_query_info_finish ().
+ * user_data: the PushHandle of this transfer.
+ *
+ * On success the file size is stored in the handle and a stat request for the
+ * destination is queued, continuing in push_stat_dest_cb ().  On failure the
+ * job fails with the reported error and the handle is freed.
+ */ static void
+push_source_fstat_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+ GFileInputStream *fin = G_FILE_INPUT_STREAM (source);
+ PushHandle *handle = user_data;
+ GError *error = NULL;
+ GFileInfo *info;
+
+ info = g_file_input_stream_query_info_finish (fin, res, &error);
+ if (info)
+ {
+ SoupMessage *msg;
+
+ handle->size = g_file_info_get_size (info); /* later sent as the request content length and compared with n_read at end of file */
+ g_object_unref (info);
+
+ msg = stat_location_begin (handle->uri, FALSE);
+ http_backend_queue_message (handle->backend,
+ msg,
+ push_stat_dest_cb, handle);
+ }
+ else
+ {
+ g_vfs_job_failed_from_error (handle->job, error);
+ g_error_free (error);
+ push_handle_free (handle);
+ }
+}
+
+/* Completion callback of g_file_read_async () on the local source file,
+ * started by push_source_lstat_cb ().
+ *
+ * source:    the GFile that was opened.
+ * res:       result handed to g_file_read_finish ().
+ * user_data: the PushHandle of this transfer.
+ *
+ * On success the stream is stored in the handle and its size is queried,
+ * continuing in push_source_fstat_cb ().  On failure the job fails and the
+ * handle is freed; a G_IO_ERROR_IS_DIRECTORY error is reported as
+ * G_IO_ERROR_NOT_SUPPORTED instead of being passed through.
+ */ static void
+push_source_open_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+ GFile *source_file = G_FILE (source);
+ PushHandle *handle = user_data;
+ GError *error = NULL;
+ GFileInputStream *fin;
+
+ fin = g_file_read_finish (source_file, res, &error);
+ if (fin)
+ {
+ handle->in = G_INPUT_STREAM (fin); /* the handle now owns the stream; push_handle_free () closes and unreferences it */
+
+ g_file_input_stream_query_info_async (fin,
+ G_FILE_ATTRIBUTE_STANDARD_SIZE,
+ 0, handle->job->cancellable,
+ push_source_fstat_cb, handle);
+ }
+ else
+ {
+ if (error->domain == G_IO_ERROR && error->code == G_IO_ERROR_IS_DIRECTORY)
+ {
+ /* Fall back to default implementation to improve the error message */
+ g_vfs_job_failed (handle->job, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+ _("Not supported"));
+ }
+ else
+ g_vfs_job_failed_from_error (handle->job, error);
+
+ g_error_free (error);
+ push_handle_free (handle);
+ }
+}
+
+/* Completion callback of the query-info request on the local source file
+ * that try_push () starts with G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS.
+ *
+ * source:    the GFile that was queried.
+ * res:       result handed to g_file_query_info_finish ().
+ * user_data: the PushHandle of this transfer.
+ *
+ * Fails the job and frees the handle when the query fails, or, with
+ * G_IO_ERROR_NOT_SUPPORTED, when the source is a symbolic link and
+ * G_FILE_COPY_NOFOLLOW_SYMLINKS is set.  Otherwise opens the file for
+ * reading, continuing in push_source_open_cb ().
+ */ static void
+push_source_lstat_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+ GFile *source_file = G_FILE (source);
+ PushHandle *handle = user_data;
+ GError *error = NULL;
+ GFileInfo *info;
+
+ info = g_file_query_info_finish (source_file, res, &error);
+ if (!info)
+ {
+ g_vfs_job_failed_from_error (handle->job, error);
+ g_error_free (error);
+ push_handle_free (handle);
+ return;
+ }
+
+ if ((handle->op_job->flags & G_FILE_COPY_NOFOLLOW_SYMLINKS) &&
+ g_file_info_get_file_type (info) == G_FILE_TYPE_SYMBOLIC_LINK)
+ {
+ /* Fall back to default implementation to copy symlink */
+ g_vfs_job_failed (handle->job, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+ _("Not supported"));
+ push_handle_free (handle);
+ g_object_unref (info);
+ return;
+ }
+
+ g_file_read_async (source_file,
+ 0, handle->job->cancellable,
+ push_source_open_cb, handle);
+ g_object_unref (info);
+}
+
+/* "try_push" implementation of the DAV backend: starts the upload of a local
+ * file to the server.
+ *
+ * backend:     the backend; a reference is kept in the PushHandle.
+ * job:         the push job; a reference is kept in the PushHandle.
+ * destination: path from which the target URI is built with
+ *              g_vfs_backend_dav_uri_for_path ().
+ * local_path:  path of the local file to upload.
+ * flags, remove_source, progress_callback, progress_callback_data:
+ *              not read in this function; the later callbacks read flags,
+ *              remove_source and local_path from the job object and report
+ *              progress through g_vfs_job_progress_callback ().
+ *
+ * Only the first asynchronous step, a query of the source file type that
+ * does not follow symbolic links, is started here; the remaining steps run
+ * from push_source_lstat_cb () onwards.
+ *
+ * Returns: TRUE unconditionally; the outcome is reported later on the job.
+ */ static gboolean
+try_push (GVfsBackend *backend,
+ GVfsJobPush *job,
+ const char *destination,
+ const char *local_path,
+ GFileCopyFlags flags,
+ gboolean remove_source,
+ GFileProgressCallback progress_callback,
+ gpointer progress_callback_data)
+{
+ GFile *source;
+ PushHandle *handle;
+
+ handle = g_slice_new0 (PushHandle); /* zero-filled: in, buf and msg start as NULL, read_status as PUSH_READ_STATUS_NONE */
+ handle->backend = g_object_ref (backend);
+ handle->job = g_object_ref (G_VFS_JOB (job));
+ handle->op_job = job;
+ handle->uri = g_vfs_backend_dav_uri_for_path (backend, destination, FALSE);
+
+ source = g_file_new_for_path (local_path);
+ g_file_query_info_async (source,
+ G_FILE_ATTRIBUTE_STANDARD_TYPE,
+ G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS,
+ 0, handle->job->cancellable,
+ push_source_lstat_cb, handle);
+ g_object_unref (source); /* the callback receives the GFile as its source object */
+
+ return TRUE;
+}
+
/* ************************************************************************* */
/* */
static void
@@ -2858,4 +3233,5 @@ g_vfs_backend_dav_class_init (GVfsBackendDavClass *klass)
backend_class->delete = do_delete;
backend_class->set_display_name = do_set_display_name;
backend_class->move = do_move;
+ backend_class->try_push = try_push;
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]