[gvfs] sftp: Implement pull support



commit ed826fdf386cd0891cbda5c9fc3904d2a5aba03f
Author: Ross Lagerwall <rosslagerwall gmail com>
Date:   Fri Nov 1 10:03:52 2013 +0200

    sftp: Implement pull support
    
    Implement pull support with a sliding window to improve the speed of
    sftp downloads.
    
    The implementation is based on the one from the OpenSSH sftp client.  It
    uses up to 64 outstanding read requests.  The limit of 64 is incremented
    gradually to prevent overwhelming the server.  The file is fstat()ed to
    determine the size.  When the expected size is reached, the maximum
    number of outstanding requests is reduced to 1.
    
    The implementation is complicated by the fact that reads can return
    short and they can also be serviced out of order.
    
    This patch results in substantial performance improvements, especially
    for high-latency links.  Compared to the fallback copy implementation,
    other performance improvements are achieved by performing the initial
    lstat()/stat() and open() in parallel, as well as performing the
    fstat() and initial read requests in parallel.
    
    Some benchmark figures:
    Old behavior:
    Copying from local server = 6.1MB/s
    Copying from local server with 250ms of RTT latency = 0.251MB/s
    Copying many small files with 250ms of RTT latency = 0.64 files per second
    
    New behavior:
    Copying from local server = 13MB/s
    Copying from local server with 250ms of RTT latency = 6.6MB/s
    Copying many small files with 250ms of RTT latency = 1.24 files per second
    
    OpenSSH sftp client:
    Copying from local server = 14.2MB/s
    Copying from local server with 250ms of RTT latency = 6.4MB/s
    Copying many small files with 250ms of RTT latency = 1.34 files per second
    
    https://bugzilla.gnome.org/show_bug.cgi?id=532951

 daemon/gvfsbackendsftp.c |  534 ++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 534 insertions(+), 0 deletions(-)
---
diff --git a/daemon/gvfsbackendsftp.c b/daemon/gvfsbackendsftp.c
index 36beade..17ba4da 100644
--- a/daemon/gvfsbackendsftp.c
+++ b/daemon/gvfsbackendsftp.c
@@ -63,6 +63,7 @@
 #include "gvfsjobmakedirectory.h"
 #include "gvfsjobprogress.h"
 #include "gvfsjobpush.h"
+#include "gvfsjobpull.h"
 #include "gvfsdaemonprotocol.h"
 #include "gvfskeyring.h"
 #include "sftp.h"
@@ -5396,6 +5397,538 @@ try_push (GVfsBackend *backend,
   return TRUE;
 }
 
+/* The pull sliding window mechanism is based on the one from the OpenSSH sftp
+ * client. It is complicated because requests can be returned out of order. */
+
+#define PULL_MAX_REQUESTS 64  /* Never have more than this many read requests outstanding */
+#define PULL_BLOCKSIZE 32768  /* Number of bytes asked for in each read request */
+#define PULL_SIZE_INCOMPLETE -1  /* Value of handle->size while the reply to the fstat() request is still pending */
+#define PULL_SIZE_INVALID -2  /* Value of handle->size when no fstat() request is in progress and no size is known */
+
+typedef struct {
+  /* initial job information */
+  GVfsBackendSftp *backend; /* reference taken in try_pull(), dropped in sftp_pull_handle_free() */
+  GVfsJob *job; /* op_job cast to GVfsJob; no separate reference is taken for it */
+  GVfsJobPull *op_job; /* reference taken in try_pull(), dropped in sftp_pull_handle_free() */
+  GFile *dest; /* local destination, built from local_path in try_pull() */
+
+  /* Open files */
+  DataBuffer *raw_handle; /* remote file handle read from the SSH_FXP_OPEN reply; NULL until then */
+  GOutputStream *output; /* stream on dest, from g_file_replace_finish() or g_file_create_finish() */
+
+  /* fstat information */
+  goffset size; /* size from the fstat reply, or PULL_SIZE_INCOMPLETE / PULL_SIZE_INVALID */
+  guint32 mode; /* G_FILE_ATTRIBUTE_UNIX_MODE from the fstat reply */
+
+  /* state */
+  goffset offset; /* offset at which the next PULL_BLOCKSIZE read will be requested */
+  goffset n_written; /* total number of bytes written to output so far */
+  int num_req; /* Number of outstanding read requests */
+  int max_req; /* Current maximum number of outstanding read requests; set to 0 once EOF is received */
+  GList *queued_writes; /* PullRequests whose data has arrived and is waiting to be written */
+} SftpPullHandle;
+
+typedef struct {
+  SftpPullHandle *handle; /* transfer this read request belongs to */
+  guint32 request_len;    /* number of bytes requested */
+  guint64 request_offset; /* offset of requested bytes */
+  gssize response_len;     /* number of bytes returned */
+  gssize write_offset;     /* offset in buffer of bytes written so far */
+  char *buffer; /* returned data, allocated with g_slice_alloc (response_len); NULL until a data reply is read */
+} PullRequest;
+
+static void
+pull_enqueue_next_request (SftpPullHandle *handle);
+
+static void
+pull_enqueue_request (SftpPullHandle *handle, guint64 offset, guint32 len);
+
+static void
+pull_try_start_write (SftpPullHandle *handle);
+
+static void
+pull_request_free (PullRequest *request)
+{ /* Releases a PullRequest and, if one was allocated, its response buffer. */
+  if (request->buffer)
+      g_slice_free1 (request->response_len, request->buffer); /* same size that pull_read_reply() passed to g_slice_alloc() */
+  g_slice_free (PullRequest, request);
+}
+
+static void
+sftp_pull_handle_free (SftpPullHandle *handle)
+{ /* Tears down the handle, but only when no fstat, write or read is still in flight; otherwise returns without freeing anything. */
+  if (handle->size != PULL_SIZE_INCOMPLETE && /* fstat complete */
+      (!handle->output || !g_output_stream_has_pending (handle->output)) && /* no writes outstanding */
+      handle->num_req == 0) /* no reads outstanding */
+    {
+      if (handle->raw_handle)
+        { /* A remote file handle was obtained: send SSH_FXP_CLOSE for it. */
+          GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_CLOSE);
+          put_data_buffer (command, handle->raw_handle);
+          queue_command_stream_and_free (handle->backend, command, NULL, handle->job, NULL); /* NULL callback: the reply to the close is not examined here */
+          data_buffer_free (handle->raw_handle);
+        }
+      g_clear_object (&handle->output);
+      g_object_unref(handle->backend);
+      g_object_unref(handle->op_job);
+      g_object_unref(handle->dest);
+      g_list_free_full (handle->queued_writes, (GDestroyNotify)pull_request_free); /* drop replies that were never written out */
+      g_slice_free (SftpPullHandle, handle);
+    }
+}
+
+static void
+pull_remove_source_reply (GVfsBackendSftp *backend,
+                          int reply_type,
+                          GDataInputStream *reply,
+                          guint32 len,
+                          GVfsJob *job,
+                          gpointer user_data)
+{ /* Reply handler for the SSH_FXP_REMOVE request that the pull code queues when remove_source is set. */
+  if (reply_type == SSH_FXP_STATUS)
+    result_from_status (job, reply, -1, -1); /* presumably maps the status reply onto job success or failure; helper is defined elsewhere */
+  else
+    g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED,
+                      _("Invalid reply received"));
+}
+
+static void
+pull_set_perms_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{ /* Completion callback for the g_file_set_attributes_async() call started in pull_close_cb(); res is never inspected, so a failure to set the mode does not fail the job. */
+  SftpPullHandle *handle = user_data;
+
+  if (handle->op_job->remove_source)
+    { /* Ask the server to remove the source; pull_remove_source_reply() handles the reply. */
+      GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_REMOVE);
+      put_string (command, handle->op_job->source);
+      queue_command_stream_and_free (handle->backend,
+                                     command,
+                                     pull_remove_source_reply,
+                                     handle->job,
+                                     NULL);
+    }
+  else
+    g_vfs_job_succeeded (handle->job);
+
+  sftp_pull_handle_free (handle);
+}
+
+static void
+pull_close_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{ /* Completion callback for closing the destination stream: on success optionally copies the mode, then removes the source or marks the job as succeeded. */
+  SftpPullHandle *handle = user_data;
+  GError *error = NULL;
+
+  if (g_output_stream_close_finish(handle->output, res, &error))
+    {
+      g_vfs_job_progress_callback (handle->n_written, handle->n_written, handle->job); /* final report: current and total are both the bytes written */
+
+      if (handle->size >= 0 && !(handle->op_job->flags & G_FILE_COPY_TARGET_DEFAULT_PERMS)) /* fstat attributes were received and default permissions were not requested */
+        {
+          GFileInfo *info = g_file_info_new ();
+          g_file_info_set_attribute_uint32 (info,
+                                            G_FILE_ATTRIBUTE_UNIX_MODE,
+                                            handle->mode);
+          g_file_set_attributes_async (handle->dest,
+                                       info,
+                                       G_FILE_QUERY_INFO_NONE,
+                                       G_PRIORITY_DEFAULT,
+                                       NULL,
+                                       pull_set_perms_cb, handle);
+          g_object_unref (info);
+          return; /* pull_set_perms_cb() finishes the job and frees the handle */
+        }
+
+      if (handle->op_job->remove_source)
+        { /* Ask the server to remove the source; pull_remove_source_reply() handles the reply. */
+          GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_REMOVE);
+          put_string (command, handle->op_job->source);
+          queue_command_stream_and_free (handle->backend,
+                                         command,
+                                         pull_remove_source_reply,
+                                         handle->job,
+                                         NULL);
+        }
+      else
+        g_vfs_job_succeeded (handle->job);
+    }
+  else
+    { /* Closing the destination failed: report that error on the job. */
+      g_vfs_job_failed_from_error (handle->job, error);
+      g_error_free (error);
+    }
+
+  sftp_pull_handle_free (handle);
+}
+
+static void
+pull_try_finish (SftpPullHandle *handle)
+{ /* Closes the destination stream once EOF has been seen and nothing is in flight; pull_close_cb() takes over from there. Does nothing otherwise. */
+  if (handle->max_req == 0 && /* received EOF */
+      handle->size != PULL_SIZE_INCOMPLETE && /* fstat complete */
+      !g_output_stream_has_pending (handle->output) && /* no writes outstanding */
+      handle->num_req == 0) /* no reads outstanding */
+    {
+      g_output_stream_close_async (handle->output,
+                                   G_PRIORITY_DEFAULT,
+                                   NULL,
+                                   pull_close_cb, handle);
+    }
+}
+
+static void
+pull_write_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{ /* Completion callback for writing one reply's data to the destination; continues partial writes, then schedules further writes and reads. */
+  PullRequest *request = user_data;
+  SftpPullHandle *handle = request->handle;
+  GError *error = NULL;
+  gssize n_written;
+
+  n_written = g_output_stream_write_finish (handle->output, res, &error);
+  if (n_written == -1)
+    { /* Write failed: fail the job and release what can be released. */
+      g_vfs_job_failed_from_error (handle->job, error);
+      g_error_free (error);
+      pull_request_free (request);
+      sftp_pull_handle_free (handle);
+      return;
+    }
+
+  handle->n_written += n_written;
+  request->write_offset += n_written;
+
+  /* If we didn't write everything, do another write */
+  if (request->write_offset < request->response_len)
+    {
+      g_output_stream_write_async (handle->output,
+                                   request->buffer + request->write_offset,
+                                   request->response_len - request->write_offset,
+                                   G_PRIORITY_DEFAULT,
+                                   NULL,
+                                   pull_write_cb, request);
+      return;
+    }
+
+  if (handle->size >= 0) /* only report progress when the fstat reply supplied a total size */
+    g_vfs_job_progress_callback (handle->n_written, handle->size, handle->job);
+
+  pull_try_start_write (handle); /* start the next queued write, if any. NOTE(review): on a seek failure this calls sftp_pull_handle_free(); confirm handle cannot already be freed when it is used below */
+
+  /* If we read short, issue another request for the remaining data. */
+  if (request->response_len < request->request_len)
+    pull_enqueue_request (handle,
+                          request->request_offset + request->response_len,
+                          request->request_len - request->response_len);
+  else if (handle->max_req == 0) /* EOF already received: issue no new reads, just check whether the transfer is done */
+    pull_try_finish (handle);
+  else
+    {
+      /* Once we have requested past the estimated EOF, request one at a
+       * time.  Otherwise try to increase the number of concurrent requests. */
+      if (handle->offset > handle->size) /* NOTE(review): also true while size is negative (PULL_SIZE_INCOMPLETE / PULL_SIZE_INVALID), which keeps max_req at 1 */
+        handle->max_req = 1;
+      else if (handle->max_req < PULL_MAX_REQUESTS)
+        handle->max_req++;
+
+      while (handle->num_req < handle->max_req)
+        pull_enqueue_next_request (handle);
+    }
+
+  pull_request_free (request); /* this reply has been written out completely */
+}
+
+static void
+pull_try_start_write (SftpPullHandle *handle)
+{ /* Starts writing the first queued reply to the destination, unless a write is already pending or nothing is queued. */
+  GError *error = NULL;
+  PullRequest *request;
+
+  if (g_output_stream_has_pending (handle->output))
+    return;
+
+  if (!handle->queued_writes)
+    return;
+
+  request = handle->queued_writes->data; /* head of the list; pull_read_reply() appends, so this is the reply queued earliest */
+  handle->queued_writes = g_list_delete_link (handle->queued_writes, handle->queued_writes);
+
+  if (!g_seekable_seek (G_SEEKABLE (handle->output), /* position the stream at the offset this request asked for before writing its data */
+                        request->request_offset, G_SEEK_SET,
+                        NULL, &error))
+    {
+      g_vfs_job_failed_from_error (handle->job, error);
+      g_error_free (error);
+      pull_request_free (request);
+      sftp_pull_handle_free (handle);
+      return;
+    }
+
+  g_output_stream_write_async (handle->output,
+                               request->buffer, request->response_len,
+                               G_PRIORITY_DEFAULT,
+                               NULL,
+                               pull_write_cb, request); /* pull_write_cb() receives the request as user_data */
+}
+
+static void
+pull_read_reply (GVfsBackendSftp *backend,
+                 int reply_type,
+                 GDataInputStream *reply,
+                 guint32 len,
+                 GVfsJob *job,
+                 gpointer user_data)
+{ /* Reply handler for one SSH_FXP_READ: queues returned data for writing, treats an EOF status as end of file, and fails the job on any other reply. */
+  PullRequest *request = user_data;
+  SftpPullHandle *handle = request->handle;
+
+  handle->num_req--; /* this request is no longer outstanding, whatever the reply was */
+
+  if (g_vfs_job_is_finished (job) || g_vfs_job_is_cancelled (job))
+    { /* The reply is not used; fall through to the cleanup at the bottom. */
+    }
+  else if (reply_type == SSH_FXP_STATUS)
+    {
+      guint32 code = read_status_code (reply);
+      if (code == SSH_FX_EOF)
+        {
+          pull_request_free (request);
+          handle->max_req = 0; /* records that EOF was received; pull_try_finish() tests for this */
+          pull_try_finish (handle);
+          return;
+        }
+      else
+        result_from_status_code (job, code, -1, -1);
+    }
+  else if (reply_type != SSH_FXP_DATA)
+    {
+      g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED,
+                        _("Invalid reply received"));
+    }
+  else
+    {
+      request->response_len = g_data_input_stream_read_uint32 (reply, NULL, NULL); /* NOTE(review): length comes from the server; read errors are ignored and it is not checked against request_len or len - confirm this is safe */
+      request->buffer = g_slice_alloc (request->response_len);
+
+      if (g_input_stream_read_all (G_INPUT_STREAM (reply),
+                                   request->buffer, request->response_len,
+                                   NULL, NULL, NULL))
+        {
+          handle->queued_writes = g_list_append (handle->queued_writes, request);
+          pull_try_start_write (handle);
+          return; /* the request is now handed to the write path, so it is not freed here */
+        }
+      else
+        g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED,
+                          _("Invalid reply received"));
+    }
+
+  pull_request_free (request); /* reached for failed replies and for finished or cancelled jobs */
+  sftp_pull_handle_free (handle);
+}
+
+static void
+pull_enqueue_request (SftpPullHandle *handle, guint64 offset, guint32 len)
+{ /* Queues an SSH_FXP_READ for len bytes at offset on the remote handle and counts it as outstanding; pull_read_reply() handles the reply. */
+  PullRequest *request;
+  GDataOutputStream *command;
+
+  request = g_slice_new0 (PullRequest); /* zero-filled, so buffer starts out NULL */
+  request->handle = handle;
+  request->request_len = len;
+  request->request_offset = offset;
+
+  command = new_command_stream (handle->backend, SSH_FXP_READ);
+  put_data_buffer (command, handle->raw_handle);
+  g_data_output_stream_put_uint64 (command, offset, NULL, NULL);
+  g_data_output_stream_put_uint32 (command, len, NULL, NULL);
+  queue_command_stream_and_free (handle->backend, command, pull_read_reply, handle->job, request);
+
+  handle->num_req++;
+}
+
+static void
+pull_enqueue_next_request (SftpPullHandle *handle)
+{ /* Requests the next PULL_BLOCKSIZE bytes at handle->offset and advances that offset by the same amount. */
+  pull_enqueue_request (handle, handle->offset, PULL_BLOCKSIZE);
+  handle->offset += PULL_BLOCKSIZE;
+}
+
+static void
+pull_fstat_reply (GVfsBackendSftp *backend,
+                  int reply_type,
+                  GDataInputStream *reply,
+                  guint32 len,
+                  GVfsJob *job,
+                  gpointer user_data)
+{ /* Reply handler for the SSH_FXP_FSTAT request: records size and mode from the attributes, or marks the size as invalid. */
+  SftpPullHandle *handle = user_data;
+
+  if (g_vfs_job_is_finished (job) || g_vfs_job_is_cancelled (job))
+    {
+      handle->size = PULL_SIZE_INVALID; /* the fstat is no longer pending, so it no longer blocks sftp_pull_handle_free() */
+      sftp_pull_handle_free (handle);
+      return;
+    }
+
+  if (reply_type == SSH_FXP_ATTRS)
+    {
+      GFileInfo *info = g_file_info_new ();
+      parse_attributes (backend, info, NULL, reply, NULL);
+      handle->size = g_file_info_get_size (info);
+      handle->mode = g_file_info_get_attribute_uint32 (info,
+                                                       G_FILE_ATTRIBUTE_UNIX_MODE);
+      g_object_unref (info);
+    }
+  else
+    handle->size = PULL_SIZE_INVALID; /* any other reply type: continue without a size; the job is not failed here */
+
+  pull_try_finish (handle); /* the pending fstat is one of the conditions pull_try_finish() waits on */
+}
+
+static void
+pull_dest_open_cb (GObject *source, GAsyncResult *res, gpointer user_data)
+{ /* Completion callback for opening the local destination: on success queues the fstat and the initial read requests, on failure fails the job. */
+  SftpPullHandle *handle = user_data;
+  GError *error = NULL;
+
+  if (handle->op_job->flags & G_FILE_COPY_OVERWRITE) /* same flag test pull_open_reply() used to choose between replace and create */
+    handle->output = G_OUTPUT_STREAM (g_file_replace_finish (handle->dest,
+                                                             res,
+                                                             &error));
+  else
+    handle->output = G_OUTPUT_STREAM (g_file_create_finish (handle->dest,
+                                                            res,
+                                                            &error));
+  if (handle->output)
+    {
+      /* Do an fstat() to find out the size and mode of the file. */
+      GDataOutputStream *command = new_command_stream (handle->backend,
+                                                       SSH_FXP_FSTAT);
+      put_data_buffer (command, handle->raw_handle);
+      queue_command_stream_and_free (handle->backend,
+                                     command,
+                                     pull_fstat_reply,
+                                     handle->job,
+                                     handle);
+      handle->size = PULL_SIZE_INCOMPLETE; /* fstat reply now pending */
+
+      while (handle->num_req < handle->max_req) /* queue the initial reads without waiting for the fstat reply; try_pull() set max_req to 1 */
+        pull_enqueue_next_request (handle);
+    }
+  else
+    { /* The destination could not be created or replaced. */
+      g_vfs_job_failed_from_error (handle->job, error);
+      g_error_free (error);
+      sftp_pull_handle_free (handle);
+    }
+}
+
+static void
+pull_open_reply (GVfsBackendSftp *backend,
+                 MultiReply *replies,
+                 int n_replies,
+                 GVfsJob *job,
+                 gpointer user_data)
+{ /* Handles the two replies to the commands queued by try_pull(): replies[0] is treated as the stat/lstat reply and replies[1] as the open reply. */
+  SftpPullHandle *handle = user_data;
+
+  if (replies[0].type == SSH_FXP_ATTRS)
+    {
+      GFileType type;
+      GFileInfo *info = g_file_info_new ();
+
+      parse_attributes (backend, info, NULL, replies[0].data, NULL);
+      type = g_file_info_get_file_type (info);
+      g_object_unref (info);
+
+      if (type != G_FILE_TYPE_REGULAR)
+        {
+          /* Fall back to default implementation to copy non-regular files */
+          g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+                            _("Not supported"));
+        }
+      else if (replies[1].type == SSH_FXP_STATUS)
+        result_from_status (job, replies[1].data, -1, -1);
+      else if (replies[1].type != SSH_FXP_HANDLE)
+        g_vfs_job_failed (job, G_IO_ERROR, G_IO_ERROR_FAILED,
+                          _("Invalid reply received"));
+      else
+        {
+          /* We got a valid file handle. */
+          handle->raw_handle = read_data_buffer (replies[1].data);
+
+          if (handle->op_job->flags & G_FILE_COPY_OVERWRITE)
+            g_file_replace_async (handle->dest,
+                                  NULL,
+                                  handle->op_job->flags & G_FILE_COPY_BACKUP ? TRUE : FALSE,
+                                  G_FILE_CREATE_REPLACE_DESTINATION,
+                                  G_PRIORITY_DEFAULT,
+                                  NULL,
+                                  pull_dest_open_cb, handle);
+          else
+            g_file_create_async (handle->dest,
+                                 G_FILE_CREATE_NONE,
+                                 G_PRIORITY_DEFAULT,
+                                 NULL,
+                                 pull_dest_open_cb, handle);
+          return; /* pull_dest_open_cb() continues the transfer, so the handle is kept */
+        }
+    }
+  else if (replies[0].type == SSH_FXP_STATUS)
+    result_from_status (job, replies[0].data, -1, -1);
+  else
+    g_vfs_job_failed (job,
+                      G_IO_ERROR, G_IO_ERROR_FAILED,
+                      "%s", _("Invalid reply received"));
+
+  /* If we got a file handle, store it.  It will be closed when the
+   * SftpPullHandle is freed. */
+  if (replies[1].type == SSH_FXP_HANDLE && !handle->raw_handle)
+    handle->raw_handle = read_data_buffer (replies[1].data);
+
+  sftp_pull_handle_free (handle);
+}
+
+static gboolean
+try_pull (GVfsBackend *backend,
+          GVfsJobPull *job,
+          const char *source,
+          const char *local_path,
+          GFileCopyFlags flags,
+          gboolean remove_source,
+          GFileProgressCallback progress_callback,
+          gpointer progress_callback_data)
+{ /* Pull entry point installed as backend_class->try_pull: sets up an SftpPullHandle and queues a stat/lstat and an open of the source together. Always returns TRUE. */
+  GVfsBackendSftp *op_backend = G_VFS_BACKEND_SFTP (backend);
+  SftpPullHandle *handle;
+  GDataOutputStream *commands[2];
+
+  handle = g_slice_new0 (SftpPullHandle);
+  handle->backend = g_object_ref (op_backend);
+  handle->op_job = g_object_ref (job);
+  handle->job = G_VFS_JOB (job);
+  handle->dest = g_file_new_for_path (local_path);
+  handle->size = PULL_SIZE_INVALID; /* no fstat has been issued yet */
+  handle->max_req = 1; /* start with one outstanding read; pull_write_cb() raises the limit later */
+
+  commands[0] = new_command_stream (op_backend,
+                                    flags & G_FILE_COPY_NOFOLLOW_SYMLINKS ? SSH_FXP_LSTAT : SSH_FXP_STAT);
+  put_string (commands[0], source);
+
+  commands[1] = new_command_stream (op_backend, SSH_FXP_OPEN);
+  put_string (commands[1], source);
+  g_data_output_stream_put_uint32 (commands[1], SSH_FXF_READ, NULL, NULL); /* open for reading */
+  g_data_output_stream_put_uint32 (commands[1], 0, NULL, NULL); /* presumably the empty attribute flags of the OPEN request; TODO confirm against the SFTP draft */
+
+  queue_command_streams_and_free (op_backend,
+                                  commands, 2,
+                                  pull_open_reply,
+                                  G_VFS_JOB(job),
+                                  handle); /* pull_open_reply() receives both replies and the handle */
+
+  return TRUE;
+}
+
 static void
 g_vfs_backend_sftp_class_init (GVfsBackendSftpClass *klass)
 {
@@ -5431,4 +5964,5 @@ g_vfs_backend_sftp_class_init (GVfsBackendSftpClass *klass)
   backend_class->try_query_settable_attributes = try_query_settable_attributes;
   backend_class->try_set_attribute = try_set_attribute;
   backend_class->try_push = try_push;
+  backend_class->try_pull = try_pull;
 }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]