[gvfs] dav: Implement push support



commit eb11ec725f5d2850597fc88635609474c857f6aa
Author: Ross Lagerwall <rosslagerwall gmail com>
Date:   Sat Apr 5 20:20:49 2014 +0100

    dav: Implement push support
    
    Implement push support for the webdav backend.  This allows large files
    to be uploaded properly without consuming large amounts of memory and
    also makes the progress bar work when uploading.
    
    Data is provided to libsoup in chunks; each read is triggered by the
    wrote_headers and wrote_chunk signals.  Note that this uses
    content-length encoding rather than chunked encoding, since the latter
    is not supported by many servers.  The CAN_REBUILD flag is set on the
    SoupMessage and accumulate is set to FALSE so that libsoup does not
    buffer all the data in memory at once.  This does mean that the
    restarted signal needs to be handled correctly by seeking to the
    beginning of the file.
    
    The code is written in an asynchronous fashion so that other operations
    are not blocked since the webdav backend is single-threaded.
    Unfortunately, this does complicate the code, especially with regard to
    having reads in flight and handling the restarted signal from libsoup.
    
    A quick benchmark writing to a tmpfs via Apache's mod_dav achieved
    just over 1GB/s.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=570772

 daemon/gvfsbackenddav.c |  376 +++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 376 insertions(+), 0 deletions(-)
---
diff --git a/daemon/gvfsbackenddav.c b/daemon/gvfsbackenddav.c
index e74fb25..3e9f00a 100644
--- a/daemon/gvfsbackenddav.c
+++ b/daemon/gvfsbackenddav.c
@@ -56,6 +56,7 @@
 #include "gvfsjobqueryfsinfo.h"
 #include "gvfsjobqueryattributes.h"
 #include "gvfsjobenumerate.h"
+#include "gvfsjobpush.h"
 #include "gvfsdaemonprotocol.h"
 
 #ifdef HAVE_AVAHI
@@ -2828,6 +2829,380 @@ do_move (GVfsBackend *backend,
   soup_uri_free (target_uri);
 }
 
+/* Number of bytes requested by each read of the local file; every
+ * successful read becomes one chunk of the request body. */
+#define CHUNK_SIZE (64 * 1024)
+
+/* What to do with the read that may be in flight when libsoup emits
+ * "restarted" on the PUT message:
+ *   NONE     - no restart pending; read results are appended to the body.
+ *   RESET    - restarted; the next push_write_next_chunk () rewinds the
+ *              file to offset 0 before reading.
+ *   DEFERRED - restarted while a read was still in flight and
+ *              push_write_next_chunk () has already been called; the read
+ *              callback issues the next read once the stale one finishes. */
+typedef enum {
+  PUSH_READ_STATUS_NONE,
+  PUSH_READ_STATUS_RESET,
+  PUSH_READ_STATUS_DEFERRED,
+} PushReadStatus;
+
+/* State shared by every asynchronous step of a single push operation. */
+typedef struct {
+  /* The backend and the job (both referenced by try_push ()); op_job is
+   * the same job seen as a GVfsJobPush and holds no extra reference. */
+  GVfsBackend *backend;
+  GVfsJob *job;
+  GVfsJobPush *op_job;
+
+  /* Source side: the opened local stream, the buffer of the read that is
+   * in flight (NULL when there is none), the size reported when the file
+   * was opened, the number of bytes read so far and the restart state. */
+  GInputStream *in;
+  unsigned char *buf;
+  goffset size;
+  goffset n_read;
+  PushReadStatus read_status;
+
+  /* Destination side: target URI, the PUT message, and the number of body
+   * bytes libsoup has reported as written (used for progress). */
+  SoupURI *uri;
+  SoupMessage *msg;
+  goffset n_written;
+} PushHandle;
+
+/* Declared ahead because push_read_cb () calls it after a deferred restart. */
+static void
+push_write_next_chunk (SoupMessage *msg, gpointer user_data);
+
+/* Releases the resources owned by a PushHandle and the handle itself.
+ * If the local stream was opened, it is closed asynchronously without
+ * waiting for (or checking) the result.
+ * NOTE(review): handle->msg and handle->buf are not released here; msg is
+ * presumably owned by the session once queued — confirm. */
+static void
+push_handle_free (PushHandle *handle)
+{
+  GInputStream *stream = handle->in;
+
+  if (stream != NULL)
+    {
+      g_input_stream_close_async (stream, 0, NULL, NULL, NULL);
+      g_object_unref (stream);
+    }
+
+  g_object_unref (handle->backend);
+  g_object_unref (handle->job);
+  soup_uri_free (handle->uri);
+
+  g_slice_free (PushHandle, handle);
+}
+
+/* Completion callback for the asynchronous read issued by
+ * push_write_next_chunk ().
+ *
+ * If the message was restarted while this read was in flight, its data (or
+ * error) is stale and is discarded; if push_write_next_chunk () was called
+ * meanwhile (DEFERRED), a fresh read is started from the beginning of the
+ * file.  Otherwise a chunk that fits within handle->size is appended to the
+ * request body and the paused message is resumed.  A read error, or a file
+ * that shrank or grew relative to handle->size, fails the job and cancels
+ * the message.  A clean end of file does nothing. */
+static void
+push_read_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+  PushHandle *handle = user_data;
+  GError *error = NULL;
+  gssize n;
+
+  n = g_input_stream_read_finish (handle->in, res, &error);
+
+  /* Ignore this read if we've subsequently been restarted. */
+  if (handle->read_status != PUSH_READ_STATUS_NONE)
+    {
+      g_free (handle->buf);
+      handle->buf = NULL;
+      /* A stale read's error is irrelevant, but it must still be freed. */
+      g_clear_error (&error);
+
+      /* Queue another read if we've been subsequently restarted and
+       * push_write_next_chunk () was called in the meantime.  The status
+       * must go back to RESET first: push_write_next_chunk () only rewinds
+       * the file (and clears the counters and the status) when it sees
+       * RESET.  Left at DEFERRED, the new read would continue from the
+       * stale offset and its result would be discarded here again, looping
+       * without ever feeding libsoup. */
+      if (handle->read_status == PUSH_READ_STATUS_DEFERRED)
+        {
+          handle->read_status = PUSH_READ_STATUS_RESET;
+          push_write_next_chunk (handle->msg, handle);
+        }
+
+      return;
+    }
+
+  if (n > 0 && handle->n_read + n <= handle->size)
+    {
+      /* Ownership of the buffer passes to the request body. */
+      soup_message_body_append_take (handle->msg->request_body, handle->buf, n);
+      handle->buf = NULL;
+      handle->n_read += n;
+      soup_session_unpause_message (G_VFS_BACKEND_HTTP (handle->backend)->session_async,
+                                    handle->msg);
+      return;
+    }
+
+  g_free (handle->buf);
+  handle->buf = NULL;
+
+  if (n < 0)
+    {
+      g_vfs_job_failed_from_error (handle->job, error);
+      g_error_free (error);
+    }
+  else if (n == 0 && handle->n_read == handle->size)
+    {
+      /* Clean end of file: every byte has been handed to libsoup. */
+      return;
+    }
+  else
+    {
+      /* The file shrank (EOF before handle->size bytes) or grew (more data
+       * than the Content-Length set from handle->size in
+       * push_setup_message ()). */
+      g_vfs_job_failed_literal (handle->job,
+                                G_IO_ERROR,
+                                G_IO_ERROR_FAILED,
+                                _("File length changed during transfer"));
+    }
+
+  soup_session_cancel_message (G_VFS_BACKEND_HTTP (handle->backend)->session_async,
+                               handle->msg,
+                               SOUP_STATUS_CANCELLED);
+}
+
+/* Prepares a restarted upload to be sent again from the start: clears the
+ * counters and restart state, then seeks the local stream back to offset 0.
+ * On seek failure the job is failed, the message cancelled and FALSE is
+ * returned. */
+static gboolean
+push_rewind (PushHandle *handle)
+{
+  GError *error = NULL;
+
+  handle->n_read = 0;
+  handle->n_written = 0;
+  handle->read_status = PUSH_READ_STATUS_NONE;
+
+  if (g_seekable_seek (G_SEEKABLE (handle->in), 0, G_SEEK_SET,
+                       handle->job->cancellable, &error))
+    return TRUE;
+
+  g_vfs_job_failed_from_error (handle->job, error);
+  g_error_free (error);
+  soup_session_cancel_message (G_VFS_BACKEND_HTTP (handle->backend)->session_async,
+                               handle->msg,
+                               SOUP_STATUS_CANCELLED);
+  return FALSE;
+}
+
+/* Handler for the message's "wrote_headers" and "wrote_chunk" signals:
+ * starts reading the next CHUNK_SIZE bytes of the local file, finished by
+ * push_read_cb ().  After a restart (RESET) the file is first rewound; if a
+ * read is still in flight at that point, the work is postponed by marking
+ * the state DEFERRED and push_read_cb () calls back here later. */
+static void
+push_write_next_chunk (SoupMessage *msg, gpointer user_data)
+{
+  PushHandle *handle = user_data;
+
+  if (handle->read_status == PUSH_READ_STATUS_RESET)
+    {
+      /* A read is still outstanding; its callback will call us again. */
+      if (handle->buf != NULL)
+        {
+          handle->read_status = PUSH_READ_STATUS_DEFERRED;
+          return;
+        }
+
+      if (!push_rewind (handle))
+        return;
+    }
+
+  handle->buf = g_malloc (CHUNK_SIZE);
+  g_input_stream_read_async (handle->in,
+                             handle->buf, CHUNK_SIZE,
+                             0, handle->job->cancellable,
+                             push_read_cb, handle);
+}
+
+/* Configures handle->msg as a streamed PUT of handle->size bytes.  Used
+ * when the message is created and again from the "restarted" handler.
+ * CAN_REBUILD plus a non-accumulating request body keeps libsoup from
+ * holding the whole file in memory, at the cost of having to regenerate
+ * the body after a restart. */
+static void
+push_setup_message (PushHandle *handle)
+{
+  SoupMessage *msg = handle->msg;
+  SoupMessageHeaders *headers = msg->request_headers;
+  gboolean overwrite = (handle->op_job->flags & G_FILE_COPY_OVERWRITE) != 0;
+
+  soup_message_set_flags (msg, SOUP_MESSAGE_CAN_REBUILD);
+  soup_message_body_set_accumulate (msg->request_body, FALSE);
+  message_add_overwrite_header (msg, overwrite);
+
+  /* Announce the full size up front instead of using chunked encoding. */
+  soup_message_headers_set_encoding (headers, SOUP_ENCODING_CONTENT_LENGTH);
+  soup_message_headers_set_content_length (headers, handle->size);
+  soup_message_headers_set_content_type (headers, "application/octet-stream", NULL);
+}
+
+/* Handler for the message's "restarted" signal: the body must be produced
+ * again from the start of the file, so the read state is set to RESET for
+ * push_write_next_chunk ().  The method is forced back to PUT and the
+ * headers re-applied, presumably because a restart (e.g. a redirect) may
+ * have altered them — confirm against libsoup's restart paths. */
+static void
+push_restarted (SoupMessage *msg, gpointer user_data)
+{
+  PushHandle *handle = user_data;
+
+  handle->read_status = PUSH_READ_STATUS_RESET;
+
+  msg->method = SOUP_METHOD_PUT;
+  push_setup_message (handle);
+}
+
+/* Handler for the message's "wrote-body-data" signal: adds the bytes just
+ * sent to the running total and reports it as job progress. */
+static void
+push_wrote_body_data (SoupMessage *msg, SoupBuffer *chunk, gpointer user_data)
+{
+  PushHandle *handle = user_data;
+  goffset written = handle->n_written + chunk->length;
+
+  handle->n_written = written;
+  g_vfs_job_progress_callback (written, handle->size, handle->job);
+}
+
+/* Completion callback for the queued PUT.  If the job is already finished
+ * (an earlier error failed it and cancelled the message), nothing more is
+ * reported.  Otherwise an unsuccessful HTTP status fails the job, and a
+ * successful one removes the source file when requested (best effort; the
+ * g_unlink () result is ignored) and completes the job.  The handle is
+ * freed in every case. */
+static void
+push_done (SoupSession *session, SoupMessage *msg, gpointer user_data)
+{
+  PushHandle *handle = user_data;
+
+  if (!g_vfs_job_is_finished (handle->job))
+    {
+      if (SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
+        {
+          if (handle->op_job->remove_source)
+            g_unlink (handle->op_job->local_path);
+
+          g_vfs_job_succeeded (handle->job);
+        }
+      else
+        {
+          http_job_failed (handle->job, msg);
+        }
+    }
+
+  push_handle_free (handle);
+}
+
+/* Completion callback for the stat of the destination.  An existing target
+ * is rejected unless G_FILE_COPY_OVERWRITE was requested, and an existing
+ * directory is always rejected.  Otherwise the PUT message is created,
+ * configured, wired to the streaming handlers and queued with push_done ()
+ * as its completion callback. */
+static void
+push_stat_dest_cb (SoupSession *session, SoupMessage *msg, gpointer user_data)
+{
+  PushHandle *handle = user_data;
+  SoupMessage *put;
+  GFileType type;
+
+  if (stat_location_finish (msg, &type, NULL))
+    {
+      if ((handle->op_job->flags & G_FILE_COPY_OVERWRITE) == 0)
+        {
+          g_vfs_job_failed (handle->job, G_IO_ERROR, G_IO_ERROR_EXISTS,
+                            _("Target file already exists"));
+          push_handle_free (handle);
+          return;
+        }
+
+      if (type == G_FILE_TYPE_DIRECTORY)
+        {
+          g_vfs_job_failed (handle->job, G_IO_ERROR, G_IO_ERROR_IS_DIRECTORY,
+                            _("File is directory"));
+          push_handle_free (handle);
+          return;
+        }
+    }
+
+  put = soup_message_new_from_uri (SOUP_METHOD_PUT, handle->uri);
+  handle->msg = put;
+  push_setup_message (handle);
+
+  /* The body is streamed: each time the headers or a chunk have been
+   * written, the next chunk is read from the local file. */
+  g_signal_connect (put, "restarted", G_CALLBACK (push_restarted), handle);
+  g_signal_connect (put, "wrote_headers", G_CALLBACK (push_write_next_chunk), handle);
+  g_signal_connect (put, "wrote_chunk", G_CALLBACK (push_write_next_chunk), handle);
+  g_signal_connect (put, "wrote-body-data", G_CALLBACK (push_wrote_body_data), handle);
+
+  soup_session_queue_message (G_VFS_BACKEND_HTTP (handle->backend)->session_async,
+                              put, push_done, handle);
+}
+
+/* Completion callback for querying the size of the opened local file.  The
+ * size is stored in handle->size (later sent as the Content-Length), then
+ * the destination is stat'ed and push_stat_dest_cb () continues. */
+static void
+push_source_fstat_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+  PushHandle *handle = user_data;
+  GError *error = NULL;
+  GFileInfo *info;
+  SoupMessage *stat_msg;
+
+  info = g_file_input_stream_query_info_finish (G_FILE_INPUT_STREAM (source),
+                                                res, &error);
+  if (info == NULL)
+    {
+      g_vfs_job_failed_from_error (handle->job, error);
+      g_error_free (error);
+      push_handle_free (handle);
+      return;
+    }
+
+  handle->size = g_file_info_get_size (info);
+  g_object_unref (info);
+
+  stat_msg = stat_location_begin (handle->uri, FALSE);
+  http_backend_queue_message (handle->backend, stat_msg,
+                              push_stat_dest_cb, handle);
+}
+
+/* Completion callback for opening the local file.  On success the stream
+ * is kept in handle->in and its size is queried next.  On failure the job
+ * is failed; opening a directory is reported as NOT_SUPPORTED so that the
+ * generic fallback produces a better error message. */
+static void
+push_source_open_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+  PushHandle *handle = user_data;
+  GError *error = NULL;
+  GFileInputStream *fin;
+
+  fin = g_file_read_finish (G_FILE (source), res, &error);
+  if (fin != NULL)
+    {
+      handle->in = G_INPUT_STREAM (fin);
+      g_file_input_stream_query_info_async (fin,
+                                            G_FILE_ATTRIBUTE_STANDARD_SIZE,
+                                            0, handle->job->cancellable,
+                                            push_source_fstat_cb, handle);
+      return;
+    }
+
+  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_IS_DIRECTORY))
+    {
+      /* Fall back to default implementation to improve the error message */
+      g_vfs_job_failed (handle->job, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+                        _("Not supported"));
+    }
+  else
+    {
+      g_vfs_job_failed_from_error (handle->job, error);
+    }
+
+  g_error_free (error);
+  push_handle_free (handle);
+}
+
+/* Completion callback for the no-follow query of the source's file type.
+ * When G_FILE_COPY_NOFOLLOW_SYMLINKS is set and the source is a symlink,
+ * the job fails with NOT_SUPPORTED so the default implementation copies
+ * the link itself; otherwise the file is opened for reading. */
+static void
+push_source_lstat_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{
+  GFile *source_file = G_FILE (source);
+  PushHandle *handle = user_data;
+  GError *error = NULL;
+  GFileInfo *info;
+  gboolean is_symlink;
+
+  info = g_file_query_info_finish (source_file, res, &error);
+  if (info == NULL)
+    {
+      g_vfs_job_failed_from_error (handle->job, error);
+      g_error_free (error);
+      push_handle_free (handle);
+      return;
+    }
+
+  is_symlink = g_file_info_get_file_type (info) == G_FILE_TYPE_SYMBOLIC_LINK;
+  g_object_unref (info);
+
+  if (is_symlink && (handle->op_job->flags & G_FILE_COPY_NOFOLLOW_SYMLINKS))
+    {
+      /* Fall back to default implementation to copy symlink */
+      g_vfs_job_failed (handle->job, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+                        _("Not supported"));
+      push_handle_free (handle);
+      return;
+    }
+
+  g_file_read_async (source_file,
+                     0, handle->job->cancellable,
+                     push_source_open_cb, handle);
+}
+
+/* try_push implementation: uploads local_path to destination on the
+ * server.  Everything runs asynchronously, starting with a no-follow query
+ * of the source (continued in push_source_lstat_cb ()), so the job is
+ * always accepted and TRUE is returned.  flags and remove_source are read
+ * later through the job; progress is reported with
+ * g_vfs_job_progress_callback () rather than the progress_callback
+ * argument. */
+static gboolean
+try_push (GVfsBackend *backend,
+          GVfsJobPush *job,
+          const char *destination,
+          const char *local_path,
+          GFileCopyFlags flags,
+          gboolean remove_source,
+          GFileProgressCallback progress_callback,
+          gpointer progress_callback_data)
+{
+  PushHandle *handle = g_slice_new0 (PushHandle);
+  GFile *source;
+
+  handle->backend = g_object_ref (backend);
+  handle->job = g_object_ref (G_VFS_JOB (job));
+  handle->op_job = job;
+  handle->uri = g_vfs_backend_dav_uri_for_path (backend, destination, FALSE);
+
+  /* Query without following links so NOFOLLOW_SYMLINKS can be honoured. */
+  source = g_file_new_for_path (local_path);
+  g_file_query_info_async (source, G_FILE_ATTRIBUTE_STANDARD_TYPE,
+                           G_FILE_QUERY_INFO_NOFOLLOW_SYMLINKS, 0,
+                           handle->job->cancellable,
+                           push_source_lstat_cb, handle);
+  g_object_unref (source);
+
+  return TRUE;
+}
+
 /* ************************************************************************* */
 /*  */
 static void
@@ -2858,4 +3233,5 @@ g_vfs_backend_dav_class_init (GVfsBackendDavClass *klass)
   backend_class->delete            = do_delete;
   backend_class->set_display_name  = do_set_display_name;
   backend_class->move              = do_move;
+  backend_class->try_push          = try_push;
 }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]