[gvfs] sftp: Add support for multiple connections



commit d1e64f442068364b9352e96f8b7cc1cd83c66311
Author: Ross Lagerwall <rosslagerwall gmail com>
Date:   Sun Feb 8 10:31:02 2015 +0000

    sftp: Add support for multiple connections
    
    Multiple connections are needed since 7890d2801a7f ("sftp: Implement
    push support") and ed826fdf386c ("sftp: Implement pull support"). This
    is because up to 2 MiB of data may be in flight, so any "command" sent
    (e.g. refreshing the view in Nautilus) gets queued up which makes the
    mount unusable on a slow connection when copying at the same time.
    
    This changes the code to support multiple connections but still uses
    a single connection.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=747128

 daemon/gvfsbackendsftp.c |  513 +++++++++++++++++++++++++++++-----------------
 1 files changed, 327 insertions(+), 186 deletions(-)
---
diff --git a/daemon/gvfsbackendsftp.c b/daemon/gvfsbackendsftp.c
index 3044626..cbdd1da 100644
--- a/daemon/gvfsbackendsftp.c
+++ b/daemon/gvfsbackendsftp.c
@@ -160,6 +160,27 @@ typedef struct {
   gpointer user_data;
 } ExpectedReply;
 
+typedef struct {
+  GVfsBackendSftp *op_backend;
+
+  GOutputStream *command_stream;
+  GInputStream *reply_stream;
+  GDataInputStream *error_stream;
+
+  GCancellable *reply_stream_cancellable;
+
+  /* Output Queue */
+
+  gsize command_bytes_written;
+  GList *command_queue;
+
+  /* Reply reading: */
+  GHashTable *expected_replies;
+  guint32 reply_size;
+  guint32 reply_size_read;
+  guint8 *reply;
+} Connection;
+
 struct _GVfsBackendSftp
 {
   GVfsBackend parent_instance;
@@ -178,26 +199,11 @@ struct _GVfsBackendSftp
   
   int protocol_version;
   SFTPServerExtensions extensions;
-  
-  GOutputStream *command_stream;
-  GInputStream *reply_stream;
-  GDataInputStream *error_stream;
-
-  GCancellable *reply_stream_cancellable;
 
   guint32 current_id;
-  
-  /* Output Queue */
-  
-  gsize command_bytes_written;
-  GList *command_queue;
-  
-  /* Reply reading: */
-  GHashTable *expected_replies;
-  guint32 reply_size;
-  guint32 reply_size_read;
-  guint8 *reply;
-  
+
+  Connection command_connection;
+
   GMountSource *mount_source; /* Only used/set during mount */
   int mount_try;
   gboolean mount_try_again;
@@ -269,26 +275,24 @@ has_extension (GVfsBackendSftp *backend, SFTPServerExtensions extension)
 }
 
 static void
+destroy_connection (Connection *conn)
+{
+  g_hash_table_destroy (conn->expected_replies);
+
+  g_clear_object (&conn->command_stream);
+  g_clear_object (&conn->reply_stream_cancellable);
+  g_clear_object (&conn->reply_stream);
+  g_clear_object (&conn->error_stream);
+}
+
+static void
 g_vfs_backend_sftp_finalize (GObject *object)
 {
   GVfsBackendSftp *backend;
 
   backend = G_VFS_BACKEND_SFTP (object);
+  destroy_connection (&backend->command_connection);
 
-  g_hash_table_destroy (backend->expected_replies);
-  
-  if (backend->command_stream)
-    g_object_unref (backend->command_stream);
-  
-  if (backend->reply_stream_cancellable)
-    g_object_unref (backend->reply_stream_cancellable);
-
-  if (backend->reply_stream)
-    g_object_unref (backend->reply_stream);
-  
-  if (backend->error_stream)
-    g_object_unref (backend->error_stream);
-  
   if (G_OBJECT_CLASS (g_vfs_backend_sftp_parent_class)->finalize)
     (*G_OBJECT_CLASS (g_vfs_backend_sftp_parent_class)->finalize) (object);
 }
@@ -303,18 +307,20 @@ expected_reply_free (ExpectedReply *reply)
 static void
 g_vfs_backend_sftp_init (GVfsBackendSftp *backend)
 {
-  backend->expected_replies = g_hash_table_new_full (NULL, NULL, NULL, (GDestroyNotify)expected_reply_free);
+  backend->command_connection.expected_replies = g_hash_table_new_full (NULL,
+                                                                        NULL,
+                                                                        NULL,
+                                                                        (GDestroyNotify)expected_reply_free);
 }
 
 static void
-look_for_stderr_errors (GVfsBackend *backend, GError **error)
+look_for_stderr_errors (Connection *conn, GError **error)
 {
-  GVfsBackendSftp *op_backend = G_VFS_BACKEND_SFTP (backend);
   char *line;
 
   while (1)
     {
-      line = g_data_input_stream_read_line (op_backend->error_stream, NULL, NULL, NULL);
+      line = g_data_input_stream_read_line (conn->error_stream, NULL, NULL, NULL);
       
       if (line == NULL)
         {
@@ -603,7 +609,7 @@ get_data_from_command_stream (GDataOutputStream *command_stream, gsize *len)
 }
 
 static gboolean
-send_command_sync_and_unref_command (GVfsBackendSftp *backend,
+send_command_sync_and_unref_command (Connection *conn,
                                      GDataOutputStream *command_stream,
                                      GCancellable *cancellable,
                                      GError **error)
@@ -615,7 +621,7 @@ send_command_sync_and_unref_command (GVfsBackendSftp *backend,
   
   data = get_data_from_command_stream (command_stream, &len);
 
-  res = g_output_stream_write_all (backend->command_stream,
+  res = g_output_stream_write_all (conn->command_stream,
                                    data, len,
                                    &bytes_written,
                                    cancellable, error);
@@ -668,14 +674,14 @@ make_reply_stream (guint8 *data, gsize len)
 }
 
 static GDataInputStream *
-read_reply_sync (GVfsBackendSftp *backend, gsize *len_out, GError **error)
+read_reply_sync (Connection *conn, gsize *len_out, GError **error)
 {
   guint32 len;
   gsize bytes_read;
   GByteArray *array;
   guint8 *data;
   
-  if (!g_input_stream_read_all (backend->reply_stream,
+  if (!g_input_stream_read_all (conn->reply_stream,
                                &len, 4,
                                &bytes_read, NULL, error))
     return NULL;
@@ -694,7 +700,7 @@ read_reply_sync (GVfsBackendSftp *backend, gsize *len_out, GError **error)
   
   array = g_byte_array_sized_new (len);
 
-  if (!g_input_stream_read_all (backend->reply_stream,
+  if (!g_input_stream_read_all (conn->reply_stream,
                                array->data, len,
                                &bytes_read, NULL, error))
     {
@@ -1274,17 +1280,23 @@ handle_login (GVfsBackend *backend,
 }
 
 static void
-fail_jobs_and_unmount (GVfsBackendSftp *backend, GError *error)
+fail_jobs (Connection *conn, GError *error)
 {
   GHashTableIter iter;
   gpointer key, value;
 
-  g_hash_table_iter_init (&iter, backend->expected_replies);
+  g_hash_table_iter_init (&iter, conn->expected_replies);
   while (g_hash_table_iter_next (&iter, &key, &value))
     {
       ExpectedReply *expected_reply = (ExpectedReply *) value;
       g_vfs_job_failed_from_error (expected_reply->job, error);
     }
+}
+
+static void
+fail_jobs_and_unmount (GVfsBackendSftp *backend, GError *error)
+{
+  fail_jobs (&backend->command_connection, error);
 
   g_error_free (error);
 
@@ -1292,7 +1304,7 @@ fail_jobs_and_unmount (GVfsBackendSftp *backend, GError *error)
 }
 
 static int
-check_input_stream_read_result (GVfsBackendSftp *backend, gssize res, GError *error)
+check_input_stream_read_result (Connection *conn, gssize res, GError *error)
 {
   if (G_UNLIKELY (res <= 0))
     {
@@ -1304,21 +1316,21 @@ check_input_stream_read_result (GVfsBackendSftp *backend, gssize res, GError *er
                                 : _("Internal error: Unknown Error"));
         }
 
-      fail_jobs_and_unmount (backend, error);
+      fail_jobs_and_unmount (conn->op_backend, error);
       return -1;
     }
 
   return 0;
 }
 
-static void read_reply_async (GVfsBackendSftp *backend);
+static void read_reply_async (Connection *conn);
 
 static void
 read_reply_async_got_data  (GObject *source_object,
                             GAsyncResult *result,
                             gpointer user_data)
 {
-  GVfsBackendSftp *backend = user_data;
+  Connection *conn = user_data;
   gssize res;
   GDataInputStream *reply;
   ExpectedReply *expected_reply;
@@ -1331,39 +1343,39 @@ read_reply_async_got_data  (GObject *source_object,
 
   /* If we got an error, we've already called force_unmount so don't do
    * anything further. */
-  if (check_input_stream_read_result (backend, res, error) == -1)
+  if (check_input_stream_read_result (conn, res, error) == -1)
     return;
 
-  backend->reply_size_read += res;
+  conn->reply_size_read += res;
 
-  if (backend->reply_size_read < backend->reply_size)
+  if (conn->reply_size_read < conn->reply_size)
     {
-      g_input_stream_read_async (backend->reply_stream,
-                                backend->reply + backend->reply_size_read, backend->reply_size - 
backend->reply_size_read,
-                                0, NULL, read_reply_async_got_data, backend);
+      g_input_stream_read_async (conn->reply_stream,
+                                conn->reply + conn->reply_size_read, conn->reply_size - 
conn->reply_size_read,
+                                0, NULL, read_reply_async_got_data, conn);
       return;
     }
 
-  reply = make_reply_stream (backend->reply, backend->reply_size);
-  backend->reply = NULL;
+  reply = make_reply_stream (conn->reply, conn->reply_size);
+  conn->reply = NULL;
 
   type = g_data_input_stream_read_byte (reply, NULL, NULL);
   id = g_data_input_stream_read_uint32 (reply, NULL, NULL);
 
-  expected_reply = g_hash_table_lookup (backend->expected_replies, GINT_TO_POINTER (id));
+  expected_reply = g_hash_table_lookup (conn->expected_replies, GINT_TO_POINTER (id));
   if (expected_reply)
     {
       if (expected_reply->callback != NULL)
-        (expected_reply->callback) (backend, type, reply, backend->reply_size,
+        (expected_reply->callback) (conn->op_backend, type, reply, conn->reply_size,
                                     expected_reply->job, expected_reply->user_data);
-      g_hash_table_remove (backend->expected_replies, GINT_TO_POINTER (id));
+      g_hash_table_remove (conn->expected_replies, GINT_TO_POINTER (id));
     }
   else
-    g_warning ("Got unhandled reply of size %"G_GUINT32_FORMAT" for id %"G_GUINT32_FORMAT"\n", 
backend->reply_size, id);
+    g_warning ("Got unhandled reply of size %"G_GUINT32_FORMAT" for id %"G_GUINT32_FORMAT"\n", 
conn->reply_size, id);
 
   g_object_unref (reply);
 
-  read_reply_async (backend);
+  read_reply_async (conn);
   
 }
 
@@ -1372,7 +1384,7 @@ read_reply_async_got_len  (GObject *source_object,
                            GAsyncResult *result,
                            gpointer user_data)
 {
-  GVfsBackendSftp *backend = user_data;
+  Connection *conn = user_data;
   gssize res;
   GError *error;
 
@@ -1383,53 +1395,53 @@ read_reply_async_got_len  (GObject *source_object,
   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
     {
       g_error_free (error);
-      g_object_unref (backend);
+      g_object_unref (conn->op_backend);
       return;
     }
 
   /* If we got an error, we've already called force_unmount so don't do
    * anything further. */
-  if (check_input_stream_read_result (backend, res, error) == -1)
+  if (check_input_stream_read_result (conn, res, error) == -1)
     return;
 
-  backend->reply_size_read += res;
+  conn->reply_size_read += res;
 
-  if (backend->reply_size_read < 4)
+  if (conn->reply_size_read < 4)
     {
-      g_input_stream_read_async (backend->reply_stream,
-                                (char *)&backend->reply_size + backend->reply_size_read, 4 - 
backend->reply_size_read,
-                                0, backend->reply_stream_cancellable, read_reply_async_got_len,
-                                backend);
+      g_input_stream_read_async (conn->reply_stream,
+                                (char *)&conn->reply_size + conn->reply_size_read, 4 - conn->reply_size_read,
+                                0, conn->reply_stream_cancellable, read_reply_async_got_len,
+                                conn);
       return;
     }
-  backend->reply_size = GUINT32_FROM_BE (backend->reply_size);
+  conn->reply_size = GUINT32_FROM_BE (conn->reply_size);
 
-  backend->reply_size_read = 0;
-  backend->reply = g_malloc (backend->reply_size);
-  g_input_stream_read_async (backend->reply_stream,
-                            backend->reply, backend->reply_size,
-                            0, NULL, read_reply_async_got_data, backend);
+  conn->reply_size_read = 0;
+  conn->reply = g_malloc (conn->reply_size);
+  g_input_stream_read_async (conn->reply_stream,
+                            conn->reply, conn->reply_size,
+                            0, NULL, read_reply_async_got_data, conn);
 }
 
 static void
-read_reply_async (GVfsBackendSftp *backend)
+read_reply_async (Connection *conn)
 {
-  backend->reply_size_read = 0;
-  g_input_stream_read_async (backend->reply_stream,
-                             &backend->reply_size, 4,
-                             0, backend->reply_stream_cancellable,
+  conn->reply_size_read = 0;
+  g_input_stream_read_async (conn->reply_stream,
+                             &conn->reply_size, 4,
+                             0, conn->reply_stream_cancellable,
                              read_reply_async_got_len,
-                             backend);
+                             conn);
 }
 
-static void send_command (GVfsBackendSftp *backend);
+static void send_command (Connection *conn);
 
 static void
 send_command_data (GObject *source_object,
                    GAsyncResult *result,
                    gpointer user_data)
 {
-  GVfsBackendSftp *backend = user_data;
+  Connection *conn = user_data;
   gssize res;
   DataBuffer *buffer;
 
@@ -1438,53 +1450,53 @@ send_command_data (GObject *source_object,
   if (res <= 0)
     {
       g_warning ("Error sending command");
-      g_vfs_backend_force_unmount ((GVfsBackend*)backend);
+      g_vfs_backend_force_unmount ((GVfsBackend*)conn->op_backend);
       return;
     }
 
-  buffer = backend->command_queue->data;
+  buffer = conn->command_queue->data;
   
-  backend->command_bytes_written += res;
+  conn->command_bytes_written += res;
 
-  if (backend->command_bytes_written < buffer->size)
+  if (conn->command_bytes_written < buffer->size)
     {
-      g_output_stream_write_async (backend->command_stream,
-                                   buffer->data + backend->command_bytes_written,
-                                   buffer->size - backend->command_bytes_written,
+      g_output_stream_write_async (conn->command_stream,
+                                   buffer->data + conn->command_bytes_written,
+                                   buffer->size - conn->command_bytes_written,
                                    0,
                                    NULL,
                                    send_command_data,
-                                   backend);
+                                   conn);
       return;
     }
 
   data_buffer_free (buffer);
 
-  backend->command_queue = g_list_delete_link (backend->command_queue, backend->command_queue);
+  conn->command_queue = g_list_delete_link (conn->command_queue, conn->command_queue);
 
-  if (backend->command_queue != NULL)
-    send_command (backend);
+  if (conn->command_queue != NULL)
+    send_command (conn);
 }
 
 static void
-send_command (GVfsBackendSftp *backend)
+send_command (Connection *conn)
 {
   DataBuffer *buffer;
 
-  buffer = backend->command_queue->data;
+  buffer = conn->command_queue->data;
   
-  backend->command_bytes_written = 0;
-  g_output_stream_write_async (backend->command_stream,
+  conn->command_bytes_written = 0;
+  g_output_stream_write_async (conn->command_stream,
                                buffer->data,
                                buffer->size,
                                0,
                                NULL,
                                send_command_data,
-                               backend);
+                               conn);
 }
 
 static void
-expect_reply (GVfsBackendSftp *backend,
+expect_reply (Connection *conn,
               guint32 id,
               ReplyCallback callback,
               GVfsJob *job,
@@ -1497,7 +1509,7 @@ expect_reply (GVfsBackendSftp *backend,
   expected->job = g_object_ref (job);
   expected->user_data = user_data;
 
-  g_hash_table_replace (backend->expected_replies, GINT_TO_POINTER (id), expected);
+  g_hash_table_replace (conn->expected_replies, GINT_TO_POINTER (id), expected);
 }
 
 static DataBuffer *
@@ -1513,21 +1525,21 @@ data_buffer_new (guchar *data, gsize len)
 }
 
 static void
-queue_command_buffer (GVfsBackendSftp *backend,
+queue_command_buffer (Connection *conn,
                       DataBuffer *buffer)
 {
   gboolean first;
   
-  first = backend->command_queue == NULL;
+  first = conn->command_queue == NULL;
 
-  backend->command_queue = g_list_append (backend->command_queue, buffer);
+  conn->command_queue = g_list_append (conn->command_queue, buffer);
   
   if (first)
-    send_command (backend);
+    send_command (conn);
 }
 
 static void
-queue_command_stream_and_free (GVfsBackendSftp *backend,
+queue_command_stream_and_free (Connection *conn,
                                GDataOutputStream *command_stream,
                                ReplyCallback callback,
                                GVfsJob *job,
@@ -1544,8 +1556,8 @@ queue_command_stream_and_free (GVfsBackendSftp *backend,
   buffer = data_buffer_new (data, len);
   g_object_unref (command_stream);
 
-  expect_reply (backend, id, callback, job, user_data);
-  queue_command_buffer (backend, buffer);
+  expect_reply (conn, id, callback, job, user_data);
+  queue_command_buffer (conn, buffer);
 }
 
 
@@ -1594,7 +1606,7 @@ multi_request_cb (GVfsBackendSftp *backend,
 }
 
 static void
-queue_command_streams_and_free (GVfsBackendSftp *backend,
+queue_command_streams_and_free (Connection *conn,
                                 GDataOutputStream **commands,
                                 int n_commands,
                                 MultiReplyCallback callback,
@@ -1618,7 +1630,7 @@ queue_command_streams_and_free (GVfsBackendSftp *backend,
     {
       reply = &data->replies[i];
       reply->request = data;
-      queue_command_stream_and_free (backend,
+      queue_command_stream_and_free (conn,
                                      commands[i],
                                      multi_request_cb,
                                      job,
@@ -1635,9 +1647,11 @@ get_uid_sync (GVfsBackendSftp *backend)
   
   command = new_command_stream (backend, SSH_FXP_STAT);
   put_string (command, ".");
-  send_command_sync_and_unref_command (backend, command, NULL, NULL);
+  send_command_sync_and_unref_command (&backend->command_connection,
+                                       command,
+                                       NULL, NULL);
 
-  reply = read_reply_sync (backend, NULL, NULL);
+  reply = read_reply_sync (&backend->command_connection, NULL, NULL);
   if (reply == NULL)
     return FALSE;
   
@@ -1680,9 +1694,11 @@ get_home_sync (GVfsBackendSftp *backend)
 
   command = new_command_stream (backend, SSH_FXP_REALPATH);
   put_string (command, ".");
-  send_command_sync_and_unref_command (backend, command, NULL, NULL);
+  send_command_sync_and_unref_command (&backend->command_connection,
+                                       command,
+                                       NULL, NULL);
 
-  reply = read_reply_sync (backend, NULL, NULL);
+  reply = read_reply_sync (&backend->command_connection, NULL, NULL);
   if (reply == NULL)
     return FALSE;
 
@@ -1749,12 +1765,15 @@ do_mount (GVfsBackend *backend,
 
   g_strfreev (args);
 
-  op_backend->command_stream = g_unix_output_stream_new (stdin_fd, TRUE);
+  op_backend->command_connection.op_backend = op_backend;
+  op_backend->command_connection.command_stream = g_unix_output_stream_new (stdin_fd, TRUE);
 
   command = new_command_stream (op_backend, SSH_FXP_INIT);
   g_data_output_stream_put_int32 (command,
                                   SSH_FILEXFER_VERSION, NULL, NULL);
-  send_command_sync_and_unref_command (op_backend, command, NULL, NULL);
+  send_command_sync_and_unref_command (&op_backend->command_connection,
+                                       command,
+                                       NULL, NULL);
 
   if (tty_fd == -1)
     res = wait_for_reply (backend, stdout_fd, &error);
@@ -1786,18 +1805,18 @@ do_mount (GVfsBackend *backend,
       return;
     }
 
-  op_backend->reply_stream = g_unix_input_stream_new (stdout_fd, TRUE);
-  op_backend->reply_stream_cancellable = g_cancellable_new ();
+  op_backend->command_connection.reply_stream = g_unix_input_stream_new (stdout_fd, TRUE);
+  op_backend->command_connection.reply_stream_cancellable = g_cancellable_new ();
 
   make_fd_nonblocking (stderr_fd);
   is = g_unix_input_stream_new (stderr_fd, TRUE);
-  op_backend->error_stream = g_data_input_stream_new (is);
+  op_backend->command_connection.error_stream = g_data_input_stream_new (is);
   g_object_unref (is);
   
-  reply = read_reply_sync (op_backend, NULL, NULL);
+  reply = read_reply_sync (&op_backend->command_connection, NULL, NULL);
   if (reply == NULL)
     {
-      look_for_stderr_errors (backend, &error);
+      look_for_stderr_errors (&op_backend->command_connection, &error);
       g_vfs_job_failed_from_error (G_VFS_JOB (job), error);
       g_error_free (error);
       return;
@@ -1839,7 +1858,8 @@ do_mount (GVfsBackend *backend,
       return;
     }
 
-  read_reply_async (g_object_ref (op_backend));
+  g_object_ref (op_backend);
+  read_reply_async (&op_backend->command_connection);
 
   sftp_mount_spec = g_mount_spec_new ("sftp");
   if (op_backend->user_specified_in_uri)
@@ -1945,8 +1965,9 @@ try_unmount (GVfsBackend *backend,
 {
   GVfsBackendSftp *op_backend = G_VFS_BACKEND_SFTP (backend);
 
-  if (op_backend->reply_stream && op_backend->reply_stream_cancellable)
-    g_cancellable_cancel (op_backend->reply_stream_cancellable);
+  if (op_backend->command_connection.reply_stream &&
+      op_backend->command_connection.reply_stream_cancellable)
+    g_cancellable_cancel (op_backend->command_connection.reply_stream_cancellable);
   g_vfs_job_succeeded (G_VFS_JOB (job));
 
   return TRUE;
@@ -2207,7 +2228,8 @@ error_from_lstat (GVfsBackendSftp *backend,
   data->original_error = original_error;
   data->callback = callback;
   data->user_data = user_data;
-  queue_command_stream_and_free (op_backend, command, error_from_lstat_reply,
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 error_from_lstat_reply,
                                 G_VFS_JOB (job), data);
 }
 
@@ -2550,7 +2572,9 @@ open_for_read_reply (GVfsBackendSftp *backend,
           
           command = new_command_stream (backend, SSH_FXP_CLOSE);
           put_data_buffer (command, bhandle);
-          queue_command_stream_and_free (backend, command, NULL, G_VFS_JOB (job), NULL);
+          queue_command_stream_and_free (&backend->command_connection, command,
+                                         NULL,
+                                         G_VFS_JOB (job), NULL);
 
           data_buffer_free (bhandle);
         }
@@ -2600,7 +2624,9 @@ try_open_for_read (GVfsBackend *backend,
   command = new_command_stream (op_backend,
                                 SSH_FXP_STAT);
   put_string (command, filename);
-  queue_command_stream_and_free (op_backend, command, open_stat_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 open_stat_reply,
+                                 G_VFS_JOB (job), NULL);
 
   command = new_command_stream (op_backend,
                                 SSH_FXP_OPEN);
@@ -2608,7 +2634,9 @@ try_open_for_read (GVfsBackend *backend,
   g_data_output_stream_put_uint32 (command, SSH_FXF_READ, NULL, NULL); /* open flags */
   g_data_output_stream_put_uint32 (command, 0, NULL, NULL); /* Attr flags */
   
-  queue_command_stream_and_free (op_backend, command, open_for_read_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 open_for_read_reply,
+                                 G_VFS_JOB (job), NULL);
 
   return TRUE;
 }
@@ -2673,7 +2701,9 @@ try_read (GVfsBackend *backend,
   g_data_output_stream_put_uint64 (command, handle->offset, NULL, NULL);
   g_data_output_stream_put_uint32 (command, bytes_requested, NULL, NULL);
   
-  queue_command_stream_and_free (op_backend, command, read_reply, G_VFS_JOB (job), handle);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 read_reply,
+                                 G_VFS_JOB (job), handle);
 
   return TRUE;
 }
@@ -2746,7 +2776,9 @@ try_seek_on_read (GVfsBackend *backend,
       command = new_command_stream (op_backend,
                                     SSH_FXP_FSTAT);
       put_data_buffer (command, handle->raw_handle);
-      queue_command_stream_and_free (op_backend, command, seek_read_fstat_reply, G_VFS_JOB (job), handle);
+      queue_command_stream_and_free (&op_backend->command_connection, command,
+                                     seek_read_fstat_reply,
+                                     G_VFS_JOB (job), handle);
       return TRUE;
     }
 
@@ -2771,7 +2803,9 @@ delete_temp_file (GVfsBackendSftp *backend,
       command = new_command_stream (backend,
                                     SSH_FXP_REMOVE);
       put_string (command, handle->tempname);
-      queue_command_stream_and_free (backend, command, NULL, job, NULL);
+      queue_command_stream_and_free (&backend->command_connection, command,
+                                     NULL,
+                                     job, NULL);
     }
 }
 
@@ -2817,7 +2851,9 @@ close_restore_permissions (GVfsBackendSftp *backend,
                                 SSH_FXP_RENAME);
   put_string (command, handle->tempname);
   put_string (command, handle->filename);
-  queue_command_stream_and_free (backend, command, close_moved_tempfile, G_VFS_JOB (job), handle);
+  queue_command_stream_and_free (&backend->command_connection, command,
+                                 close_moved_tempfile,
+                                 G_VFS_JOB (job), handle);
 }
 
 static void
@@ -2852,7 +2888,9 @@ close_deleted_file (GVfsBackendSftp *backend,
           put_string (command, handle->tempname);
           g_data_output_stream_put_uint32 (command, SSH_FILEXFER_ATTR_PERMISSIONS, NULL, NULL);
           g_data_output_stream_put_uint32 (command, handle->permissions, NULL, NULL);
-          queue_command_stream_and_free (backend, command, close_restore_permissions, G_VFS_JOB (job), 
handle);
+          queue_command_stream_and_free (&backend->command_connection, command,
+                                         close_restore_permissions,
+                                         G_VFS_JOB (job), handle);
         }
       else
         {
@@ -2904,7 +2942,9 @@ close_moved_file (GVfsBackendSftp *backend,
                                     SSH_FXP_RENAME);
       put_string (command, handle->tempname);
       put_string (command, handle->filename);
-      queue_command_stream_and_free (backend, command, close_moved_tempfile, G_VFS_JOB (job), handle);
+      queue_command_stream_and_free (&backend->command_connection, command,
+                                     close_moved_tempfile,
+                                     G_VFS_JOB (job), handle);
     }
   else
     {
@@ -2944,7 +2984,9 @@ close_deleted_backup (GVfsBackendSftp *backend,
   put_string (command, handle->filename);
   put_string (command, backup_name);
   g_free (backup_name);
-  queue_command_stream_and_free (backend, command, close_moved_file, G_VFS_JOB (job), handle);
+  queue_command_stream_and_free (&backend->command_connection, command,
+                                 close_moved_file,
+                                 G_VFS_JOB (job), handle);
 }
 
 static void
@@ -2982,14 +3024,20 @@ close_write_reply (GVfsBackendSftp *backend,
               backup_name = g_strconcat (handle->filename, "~", NULL);
               put_string (command, backup_name);
               g_free (backup_name);
-              queue_command_stream_and_free (backend, command, close_deleted_backup, G_VFS_JOB (job), 
handle);
+              queue_command_stream_and_free (&backend->command_connection,
+                                             command,
+                                             close_deleted_backup,
+                                             G_VFS_JOB (job), handle);
             }
           else
             {
               command = new_command_stream (backend,
                                             SSH_FXP_REMOVE);
               put_string (command, handle->filename);
-              queue_command_stream_and_free (backend, command, close_deleted_file, G_VFS_JOB (job), handle);
+              queue_command_stream_and_free (&backend->command_connection,
+                                             command,
+                                             close_deleted_file,
+                                             G_VFS_JOB (job), handle);
             }
         }
       else
@@ -3039,7 +3087,9 @@ close_write_fstat_reply (GVfsBackendSftp *backend,
   command = new_command_stream (backend, SSH_FXP_CLOSE);
   put_data_buffer (command, handle->raw_handle);
 
-  queue_command_stream_and_free (backend, command, close_write_reply, G_VFS_JOB (job), handle);
+  queue_command_stream_and_free (&backend->command_connection, command,
+                                 close_write_reply,
+                                 G_VFS_JOB (job), handle);
 }
 
 static gboolean
@@ -3054,7 +3104,9 @@ try_close_write (GVfsBackend *backend,
   command = new_command_stream (op_backend, SSH_FXP_FSTAT);
   put_data_buffer (command, handle->raw_handle);
 
-  queue_command_stream_and_free (op_backend, command, close_write_fstat_reply, G_VFS_JOB (job), handle);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 close_write_fstat_reply,
+                                 G_VFS_JOB (job), handle);
 
   return TRUE;
 }
@@ -3092,7 +3144,9 @@ try_close_read (GVfsBackend *backend,
   command = new_command_stream (op_backend, SSH_FXP_CLOSE);
   put_data_buffer (command, handle->raw_handle);
 
-  queue_command_stream_and_free (op_backend, command, close_read_reply, G_VFS_JOB (job), handle);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 close_read_reply,
+                                 G_VFS_JOB (job), handle);
 
   return TRUE;
 }
@@ -3234,7 +3288,9 @@ try_create (GVfsBackend *backend,
   g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_EXCL,  NULL, NULL); /* open 
flags */
   put_mode (command, flags);
   
-  queue_command_stream_and_free (op_backend, command, create_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 create_reply,
+                                 G_VFS_JOB (job), NULL);
 
   return TRUE;
 }
@@ -3316,7 +3372,9 @@ try_append_to (GVfsBackend *backend,
   g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_APPEND,  NULL, NULL); /* 
open flags */
   put_mode (command, flags);
   
-  queue_command_stream_and_free (op_backend, command, append_to_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 append_to_reply,
+                                 G_VFS_JOB (job), NULL);
 
   return TRUE;
 }
@@ -3408,7 +3466,9 @@ replace_truncate_original (GVfsBackendSftp *backend,
   g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_TRUNC,  NULL, NULL); /* open 
flags */
   put_mode (command, op_job->flags);
   
-  queue_command_stream_and_free (backend, command, replace_truncate_original_reply, job, NULL);
+  queue_command_stream_and_free (&backend->command_connection, command,
+                                 replace_truncate_original_reply,
+                                 job, NULL);
 }
 
 static void
@@ -3537,7 +3597,9 @@ replace_create_temp (GVfsBackendSftp *backend,
   
   if (data->set_permissions)
     g_data_output_stream_put_uint32 (command, data->permissions, NULL, NULL);
-  queue_command_stream_and_free (op_backend, command, replace_create_temp_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 replace_create_temp_reply,
+                                 G_VFS_JOB (job), NULL);
 }
 
 static void
@@ -3680,7 +3742,9 @@ replace_exclusive_reply (GVfsBackendSftp *backend,
           command = new_command_stream (backend,
                                         SSH_FXP_LSTAT);
           put_string (command, op_job->filename);
-          queue_command_stream_and_free (backend, command, replace_stat_reply, G_VFS_JOB (job), NULL);
+          queue_command_stream_and_free (&backend->command_connection, command,
+                                         replace_stat_reply,
+                                         G_VFS_JOB (job), NULL);
         }
       else
         {
@@ -3723,7 +3787,9 @@ try_replace (GVfsBackend *backend,
   g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_EXCL,  NULL, NULL); /* open 
flags */
   put_mode (command, flags);
   
-  queue_command_stream_and_free (op_backend, command, replace_exclusive_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 replace_exclusive_reply,
+                                 G_VFS_JOB (job), NULL);
 
   return TRUE;
 }
@@ -3774,7 +3840,9 @@ try_write (GVfsBackend *backend,
                              buffer, buffer_size,
                              NULL, NULL, NULL);
   
-  queue_command_stream_and_free (op_backend, command, write_reply, G_VFS_JOB (job), handle);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 write_reply,
+                                 G_VFS_JOB (job), handle);
 
   /* We always write the full size (on success) */
   g_vfs_job_write_set_written_size (job, buffer_size);
@@ -3850,7 +3918,9 @@ try_seek_on_write (GVfsBackend *backend,
       command = new_command_stream (op_backend,
                                     SSH_FXP_FSTAT);
       put_data_buffer (command, handle->raw_handle);
-      queue_command_stream_and_free (op_backend, command, seek_write_fstat_reply, G_VFS_JOB (job), handle);
+      queue_command_stream_and_free (&op_backend->command_connection, command,
+                                     seek_write_fstat_reply,
+                                     G_VFS_JOB (job), handle);
       return TRUE;
     }
 
@@ -3892,7 +3962,9 @@ try_truncate (GVfsBackend *backend,
   put_data_buffer (command, handle->raw_handle);
   g_data_output_stream_put_uint32 (command, SSH_FILEXFER_ATTR_SIZE, NULL, NULL);
   g_data_output_stream_put_uint64 (command, size, NULL, NULL);
-  queue_command_stream_and_free (op_backend, command, truncate_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 truncate_reply,
+                                 G_VFS_JOB (job), NULL);
 
   return TRUE;
 }
@@ -3966,7 +4038,9 @@ read_dir_got_stat_info (GVfsBackendSftp *backend,
       abs_name = g_build_filename (enum_job->filename, g_file_info_get_name (info), NULL);
       put_string (command, abs_name);
       g_free (abs_name);
-      queue_command_stream_and_free (backend, command, read_dir_readlink_reply, G_VFS_JOB (job), 
g_object_ref (info));
+      queue_command_stream_and_free (&backend->command_connection, command,
+                                     read_dir_readlink_reply,
+                                     G_VFS_JOB (job), g_object_ref (info));
     }
   else
     g_vfs_job_enumerate_add_info (enum_job, info);
@@ -4038,7 +4112,9 @@ read_dir_reply (GVfsBackendSftp *backend,
       command = new_command_stream (backend,
                                     SSH_FXP_CLOSE);
       put_data_buffer (command, data->handle);
-      queue_command_stream_and_free (backend, command, NULL, G_VFS_JOB (job), NULL);
+      queue_command_stream_and_free (&backend->command_connection, command,
+                                     NULL,
+                                     G_VFS_JOB (job), NULL);
   
       if (--data->outstanding_requests == 0)
         g_vfs_job_enumerate_done (enum_job);
@@ -4074,7 +4150,9 @@ read_dir_reply (GVfsBackendSftp *backend,
           put_string (command, abs_name);
           g_free (abs_name);
           
-          queue_command_stream_and_free (backend, command, read_dir_symlink_reply, G_VFS_JOB (job), 
g_object_ref (info));
+          queue_command_stream_and_free (&backend->command_connection, command,
+                                         read_dir_symlink_reply,
+                                         G_VFS_JOB (job), g_object_ref (info));
           data->outstanding_requests ++;
         }
       else if (strcmp (".", name) != 0 &&
@@ -4088,7 +4166,9 @@ read_dir_reply (GVfsBackendSftp *backend,
   command = new_command_stream (backend,
                                 SSH_FXP_READDIR);
   put_data_buffer (command, data->handle);
-  queue_command_stream_and_free (backend, command, read_dir_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&backend->command_connection, command,
+                                 read_dir_reply,
+                                 G_VFS_JOB (job), NULL);
 }
 
 static void
@@ -4154,7 +4234,9 @@ open_dir_reply (GVfsBackendSftp *backend,
 
   data->outstanding_requests = 1;
   
-  queue_command_stream_and_free (op_backend, command, read_dir_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 read_dir_reply,
+                                 G_VFS_JOB (job), NULL);
 }
 
 static gboolean
@@ -4175,7 +4257,9 @@ try_enumerate (GVfsBackend *backend,
                                 SSH_FXP_OPENDIR);
   put_string (command, filename);
   
-  queue_command_stream_and_free (op_backend, command, open_dir_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 open_dir_reply,
+                                 G_VFS_JOB (job), NULL);
 
   return TRUE;
 }
@@ -4307,7 +4391,10 @@ try_query_info (GVfsBackend *backend,
       put_string (command, filename);
     }
 
-  queue_command_streams_and_free (op_backend, commands, n_commands, query_info_reply, G_VFS_JOB (job), NULL);
+  queue_command_streams_and_free (&op_backend->command_connection,
+                                  commands, n_commands,
+                                  query_info_reply,
+                                  G_VFS_JOB (job), NULL);
   
   return TRUE;
 }
@@ -4405,7 +4492,8 @@ try_query_fs_info (GVfsBackend *backend,
       put_string (command, "statvfs openssh com");
       put_string (command, filename);
 
-      queue_command_stream_and_free (op_backend, command, query_fs_info_reply,
+      queue_command_stream_and_free (&op_backend->command_connection, command,
+                                     query_fs_info_reply,
                                      G_VFS_JOB (job), info);
     }
   else
@@ -4475,7 +4563,9 @@ try_query_info_fstat (GVfsBackend *backend,
   data = g_slice_new (QueryInfoFStatData);
   data->info = info;
   data->attribute_matcher = attribute_matcher;
-  queue_command_stream_and_free (op_backend, command, query_info_fstat_reply, G_VFS_JOB (job), data);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 query_info_fstat_reply,
+                                 G_VFS_JOB (job), data);
 
   return TRUE;
 }
@@ -4521,7 +4611,9 @@ move_do_rename (GVfsBackendSftp *backend,
   put_string (command, op_job->source);
   put_string (command, op_job->destination);
 
-  queue_command_stream_and_free (backend, command, move_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&backend->command_connection, command,
+                                 move_reply,
+                                 G_VFS_JOB (job), NULL);
 }
 
 static void
@@ -4627,7 +4719,9 @@ move_lstat_reply (GVfsBackendSftp *backend,
       command = new_command_stream (backend,
                                     SSH_FXP_REMOVE);
       put_string (command, op_job->destination);
-      queue_command_stream_and_free (backend, command, move_delete_target_reply, G_VFS_JOB (job), NULL);
+      queue_command_stream_and_free (&backend->command_connection, command,
+                                     move_delete_target_reply,
+                                     G_VFS_JOB (job), NULL);
       return;
     }
 
@@ -4658,7 +4752,10 @@ try_move (GVfsBackend *backend,
                         SSH_FXP_LSTAT);
   put_string (command, destination);
 
-  queue_command_streams_and_free (op_backend, commands, 2, move_lstat_reply, G_VFS_JOB (job), NULL);
+  queue_command_streams_and_free (&op_backend->command_connection,
+                                  commands, 2,
+                                  move_lstat_reply,
+                                  G_VFS_JOB (job), NULL);
   
   return TRUE;
 }
@@ -4709,7 +4806,9 @@ try_set_display_name (GVfsBackend *backend,
   put_string (command, filename);
   put_string (command, new_name);
   
-  queue_command_stream_and_free (op_backend, command, set_display_name_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 set_display_name_reply,
+                                 G_VFS_JOB (job), NULL);
 
   g_free (new_name);
 
@@ -4748,7 +4847,9 @@ try_make_symlink (GVfsBackend *backend,
   put_string (command, symlink_value);
   put_string (command, filename);
   
-  queue_command_stream_and_free (op_backend, command, make_symlink_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 make_symlink_reply,
+                                 G_VFS_JOB (job), NULL);
 
   return TRUE;
 }
@@ -4795,7 +4896,9 @@ make_directory_reply (GVfsBackendSftp *backend,
           command = new_command_stream (backend,
                                         SSH_FXP_LSTAT);
           put_string (command, G_VFS_JOB_MAKE_DIRECTORY (job)->filename);
-          queue_command_stream_and_free (backend, command, mkdir_stat_reply, G_VFS_JOB (job), NULL);
+          queue_command_stream_and_free (&backend->command_connection, command,
+                                         mkdir_stat_reply,
+                                         G_VFS_JOB (job), NULL);
         }
       else
         result_from_status_code (job, stat_error, -1, -1);
@@ -4819,7 +4922,9 @@ try_make_directory (GVfsBackend *backend,
   /* No file info - flag 0 */
   g_data_output_stream_put_uint32 (command, 0, NULL, NULL);
 
-  queue_command_stream_and_free (op_backend, command, make_directory_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 make_directory_reply,
+                                 G_VFS_JOB (job), NULL);
 
   return TRUE;
 }
@@ -4880,14 +4985,18 @@ delete_lstat_reply (GVfsBackendSftp *backend,
           command = new_command_stream (backend,
                                         SSH_FXP_RMDIR);
           put_string (command, G_VFS_JOB_DELETE (job)->filename);
-          queue_command_stream_and_free (backend, command, delete_rmdir_reply, G_VFS_JOB (job), NULL);
+          queue_command_stream_and_free (&backend->command_connection, command,
+                                         delete_rmdir_reply,
+                                         G_VFS_JOB (job), NULL);
         }
       else
         {
           command = new_command_stream (backend,
                                         SSH_FXP_REMOVE);
           put_string (command, G_VFS_JOB_DELETE (job)->filename);
-          queue_command_stream_and_free (backend, command, delete_remove_reply, G_VFS_JOB (job), NULL);
+          queue_command_stream_and_free (&backend->command_connection, command,
+                                         delete_remove_reply,
+                                         G_VFS_JOB (job), NULL);
         }
 
       g_object_unref (info);
@@ -4905,7 +5014,9 @@ try_delete (GVfsBackend *backend,
   command = new_command_stream (op_backend,
                                 SSH_FXP_LSTAT);
   put_string (command, filename);
-  queue_command_stream_and_free (op_backend, command, delete_lstat_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 delete_lstat_reply,
+                                 G_VFS_JOB (job), NULL);
 
   return TRUE;
 }
@@ -4982,7 +5093,9 @@ try_set_attribute (GVfsBackend *backend,
   put_string (command, filename);
   g_data_output_stream_put_uint32 (command, SSH_FILEXFER_ATTR_PERMISSIONS, NULL, NULL);
   g_data_output_stream_put_uint32 (command, (*(guint32 *)value_p) & 0777, NULL, NULL);
-  queue_command_stream_and_free (op_backend, command, set_attribute_reply, G_VFS_JOB (job), NULL);
+  queue_command_stream_and_free (&op_backend->command_connection, command,
+                                 set_attribute_reply,
+                                 G_VFS_JOB (job), NULL);
   
   return TRUE;
 }
@@ -5045,7 +5158,10 @@ sftp_push_handle_free (SftpPushHandle *handle)
         {
           command = new_command_stream (handle->backend, SSH_FXP_CLOSE);
           put_data_buffer (command, handle->raw_handle);
-          queue_command_stream_and_free (handle->backend, command, NULL, handle->job, NULL);
+          queue_command_stream_and_free (&handle->backend->command_connection,
+                                         command,
+                                         NULL,
+                                         handle->job, NULL);
           data_buffer_free (handle->raw_handle);
         }
 
@@ -5055,7 +5171,10 @@ sftp_push_handle_free (SftpPushHandle *handle)
         {
           command = new_command_stream (handle->backend, SSH_FXP_REMOVE);
           put_string (command, handle->tempname);
-          queue_command_stream_and_free (handle->backend, command, NULL, handle->job, NULL);
+          queue_command_stream_and_free (&handle->backend->command_connection,
+                                         command,
+                                         NULL,
+                                         handle->job, NULL);
 
           g_free (handle->tempname);
         }
@@ -5130,7 +5249,9 @@ push_close_deleted_file (GVfsBackendSftp *backend,
           GDataOutputStream *command = new_command_stream (backend, SSH_FXP_RENAME);
           put_string (command, handle->tempname);
           put_string (command, handle->op_job->destination);
-          queue_command_stream_and_free (backend, command, push_close_moved_file, job, handle);
+          queue_command_stream_and_free (&backend->command_connection, command,
+                                         push_close_moved_file,
+                                         job, handle);
 
           g_free (handle->tempname);
           handle->tempname = NULL;
@@ -5154,7 +5275,9 @@ push_close_delete_or_succeed (SftpPushHandle *handle)
       /* If we wrote to a temp file, do delete then rename. */
       GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_REMOVE);
       put_string (command, handle->op_job->destination);
-      queue_command_stream_and_free (handle->backend, command, push_close_deleted_file, handle->job, handle);
+      queue_command_stream_and_free (&handle->backend->command_connection, command,
+                                     push_close_deleted_file,
+                                     handle->job, handle);
     }
   else
     {
@@ -5202,7 +5325,9 @@ push_close_write_reply (GVfsBackendSftp *backend,
               put_string (command, handle->tempname ? handle->tempname : handle->op_job->destination);
               g_data_output_stream_put_uint32 (command, SSH_FILEXFER_ATTR_PERMISSIONS, NULL, NULL);
               g_data_output_stream_put_uint32 (command, handle->permissions, NULL, NULL);
-              queue_command_stream_and_free (backend, command, push_close_restore_permissions, job, handle);
+              queue_command_stream_and_free (&backend->command_connection, command,
+                                             push_close_restore_permissions,
+                                             job, handle);
             }
           return;
         }
@@ -5221,7 +5346,9 @@ push_finish (SftpPushHandle *handle)
 {
   GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_CLOSE);
   put_data_buffer (command, handle->raw_handle);
-  queue_command_stream_and_free (handle->backend, command, push_close_write_reply, handle->job, handle);
+  queue_command_stream_and_free (&handle->backend->command_connection, command,
+                                 push_close_write_reply,
+                                 handle->job, handle);
 
   data_buffer_free (handle->raw_handle);
   handle->raw_handle = NULL;
@@ -5349,7 +5476,9 @@ push_read_cb (GObject *source, GAsyncResult *res, gpointer user_data)
   g_output_stream_write_all (G_OUTPUT_STREAM (command),
                              handle->buffer, count,
                              NULL, NULL, NULL);
-  queue_command_stream_and_free (handle->backend, command, push_write_reply, handle->job, request);
+  queue_command_stream_and_free (&handle->backend->command_connection, command,
+                                 push_write_reply,
+                                 handle->job, request);
   handle->offset += count;
 
   if (handle->num_req < PUSH_MAX_REQUESTS)
@@ -5407,7 +5536,9 @@ push_create_temp_reply (GVfsBackendSftp *backend,
           put_string (command, handle->op_job->destination);
           g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_TRUNC,  NULL, NULL);
           g_data_output_stream_put_uint32 (command, 0, NULL, NULL);
-          queue_command_stream_and_free (backend, command, push_truncate_original_reply, job, handle);
+          queue_command_stream_and_free (&backend->command_connection, command,
+                                         push_truncate_original_reply,
+                                         job, handle);
 
           return;
         }
@@ -5461,7 +5592,9 @@ push_create_temp (SftpPushHandle *handle)
   put_string (command, handle->tempname);
   g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_EXCL,  NULL, NULL);
   g_data_output_stream_put_uint32 (command, 0, NULL, NULL);
-  queue_command_stream_and_free (handle->backend, command, push_create_temp_reply, handle->job, handle);
+  queue_command_stream_and_free (&handle->backend->command_connection, command,
+                                 push_create_temp_reply,
+                                 handle->job, handle);
 }
 
 static void
@@ -5527,7 +5660,9 @@ push_open_reply (GVfsBackendSftp *backend,
                * it. */
               GDataOutputStream *command = new_command_stream (backend, SSH_FXP_LSTAT);
               put_string (command, handle->op_job->destination);
-              queue_command_stream_and_free (backend, command, push_open_stat_reply, job, handle);
+              queue_command_stream_and_free (&backend->command_connection, command,
+                                             push_open_stat_reply,
+                                             job, handle);
               return;
             }
           else
@@ -5568,7 +5703,9 @@ push_source_fstat_cb (GObject *source, GAsyncResult *res, gpointer user_data)
       put_string (command, handle->op_job->destination);
       g_data_output_stream_put_uint32 (command, SSH_FXF_WRITE|SSH_FXF_CREAT|SSH_FXF_EXCL, NULL, NULL);
       g_data_output_stream_put_uint32 (command, 0, NULL, NULL);
-      queue_command_stream_and_free (handle->backend, command, push_open_reply, handle->job, handle);
+      queue_command_stream_and_free (&handle->backend->command_connection, command,
+                                     push_open_reply,
+                                     handle->job, handle);
     }
   else
     {
@@ -5741,7 +5878,9 @@ sftp_pull_handle_free (SftpPullHandle *handle)
         {
           GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_CLOSE);
           put_data_buffer (command, handle->raw_handle);
-          queue_command_stream_and_free (handle->backend, command, NULL, handle->job, NULL);
+          queue_command_stream_and_free (&handle->backend->command_connection, command,
+                                         NULL,
+                                         handle->job, NULL);
           data_buffer_free (handle->raw_handle);
         }
       g_clear_object (&handle->output);
@@ -5777,7 +5916,7 @@ pull_set_perms_cb (GObject *source, GAsyncResult *res, gpointer user_data)
     {
       GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_REMOVE);
       put_string (command, handle->op_job->source);
-      queue_command_stream_and_free (handle->backend,
+      queue_command_stream_and_free (&handle->backend->command_connection,
                                      command,
                                      pull_remove_source_reply,
                                      handle->job,
@@ -5819,7 +5958,7 @@ pull_close_cb (GObject *source, GAsyncResult *res, gpointer user_data)
         {
           GDataOutputStream *command = new_command_stream (handle->backend, SSH_FXP_REMOVE);
           put_string (command, handle->op_job->source);
-          queue_command_stream_and_free (handle->backend,
+          queue_command_stream_and_free (&handle->backend->command_connection,
                                          command,
                                          pull_remove_source_reply,
                                          handle->job,
@@ -6017,7 +6156,9 @@ pull_enqueue_request (SftpPullHandle *handle, guint64 offset, guint32 len)
   put_data_buffer (command, handle->raw_handle);
   g_data_output_stream_put_uint64 (command, offset, NULL, NULL);
   g_data_output_stream_put_uint32 (command, len, NULL, NULL);
-  queue_command_stream_and_free (handle->backend, command, pull_read_reply, handle->job, request);
+  queue_command_stream_and_free (&handle->backend->command_connection, command,
+                                 pull_read_reply,
+                                 handle->job, request);
 
   handle->num_req++;
 }
@@ -6081,7 +6222,7 @@ pull_dest_open_cb (GObject *source, GAsyncResult *res, gpointer user_data)
       GDataOutputStream *command = new_command_stream (handle->backend,
                                                        SSH_FXP_FSTAT);
       put_data_buffer (command, handle->raw_handle);
-      queue_command_stream_and_free (handle->backend,
+      queue_command_stream_and_free (&handle->backend->command_connection,
                                      command,
                                      pull_fstat_reply,
                                      handle->job,
@@ -6196,7 +6337,7 @@ try_pull (GVfsBackend *backend,
   g_data_output_stream_put_uint32 (commands[1], SSH_FXF_READ, NULL, NULL);
   g_data_output_stream_put_uint32 (commands[1], 0, NULL, NULL);
 
-  queue_command_streams_and_free (op_backend,
+  queue_command_streams_and_free (&op_backend->command_connection,
                                   commands, 2,
                                   pull_open_reply,
                                   G_VFS_JOB(job),


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]