[gvfs] afp: avoid useless copy when writing



commit 76b50e1448a67c53282eec55619960c246efbad7
Author: Carl-Anton Ingmarsson <ca ingmarsson gmail com>
Date:   Tue Aug 9 15:44:32 2011 +0200

    afp: avoid useless copy when writing

 daemon/gvfsafpconnection.c |  272 ++++++++++++++++++++++++++++++++------------
 daemon/gvfsafpconnection.h |    2 +
 daemon/gvfsbackendafp.c    |    4 +-
 3 files changed, 201 insertions(+), 77 deletions(-)
---
diff --git a/daemon/gvfsafpconnection.c b/daemon/gvfsafpconnection.c
index 705f7d3..12100ed 100644
--- a/daemon/gvfsafpconnection.c
+++ b/daemon/gvfsafpconnection.c
@@ -77,12 +77,12 @@ g_vfs_afp_name_new (guint32 text_encoding, gchar *str, gsize len)
  */
 struct _GVfsAfpReplyClass
 {
-	GObjectClass parent_class;
+  GObjectClass parent_class;
 };
 
 struct _GVfsAfpReply
 {
-	GObject parent_instance;
+  GObject parent_instance;
 
   AfpResultCode result_code;
 
@@ -378,14 +378,17 @@ g_vfs_afp_reply_get_result_code (GVfsAfpReply *reply)
  */
 struct _GVfsAfpCommandClass
 {
-	GDataOutputStreamClass parent_class;
+  GDataOutputStreamClass parent_class;
 };
 
 struct _GVfsAfpCommand
 {
-	GDataOutputStream parent_instance;
-
+  GDataOutputStream parent_instance;
+  
   AfpCommandType type;
+
+  const char *buf;
+  gsize buf_size;
 };
 
 G_DEFINE_TYPE (GVfsAfpCommand, g_vfs_afp_command, G_TYPE_DATA_OUTPUT_STREAM);
@@ -511,6 +514,16 @@ g_vfs_afp_command_get_data (GVfsAfpCommand *comm)
   return g_memory_output_stream_get_data (mem_stream);
 }
 
+void
+g_vfs_afp_command_set_buffer (GVfsAfpCommand *comm, const char *buf, gsize size)
+{
+  g_return_if_fail (buf != NULL);
+  g_return_if_fail (size > 0);
+  /* Attach a payload buffer to the command: only the pointer and length are recorded, no bytes are copied here. */
+  comm->buf = buf; /* NOTE(review): buf presumably has to stay valid until the command has been sent - confirm with callers */
+  comm->buf_size = size;
+}
+
 /*
  * GVfsAfpConnection
  */
@@ -551,7 +564,6 @@ struct _GVfsAfpConnectionPrivate
 
   /* send loop */
   gboolean send_loop_running;
-  gsize bytes_written;
   DSIHeader write_dsi_header;
 
   /* read loop */
@@ -672,20 +684,20 @@ dispatch_reply (GVfsAfpConnection *afp_connection)
       RequestData *req_data;
       
       req_data = g_hash_table_lookup (priv->request_hash,
-                                      GUINT_TO_POINTER ((guint)priv->read_dsi_header.requestID));
+                                      GUINT_TO_POINTER ((guint)dsi_header->requestID));
       if (req_data)
       {
         GVfsAfpReply *reply;
 
-        reply = g_vfs_afp_reply_new (priv->read_dsi_header.errorCode, priv->data,
-                                     priv->read_dsi_header.totalDataLength);
+        reply = g_vfs_afp_reply_new (dsi_header->errorCode, priv->data,
+                                     dsi_header->totalDataLength);
 
         g_simple_async_result_set_op_res_gpointer (req_data->simple, reply,
                                                    g_object_unref);
         g_simple_async_result_complete (req_data->simple);
 
         g_hash_table_remove (priv->request_hash,
-                             GUINT_TO_POINTER ((guint)priv->read_dsi_header.requestID));
+                             GUINT_TO_POINTER ((guint)dsi_header->requestID));
       }
       else
         g_free (priv->data);
@@ -786,6 +798,116 @@ read_reply (GVfsAfpConnection *afp_connection)
                              read_dsi_header_cb, afp_connection);
 }
 
+typedef struct /* per-operation state for write_all_async() */
+{
+  const void *buffer;        /* data handed to write_all_async(); stored by pointer, not copied */
+  gsize count;               /* total number of bytes requested */
+  int io_priority;           /* passed to every g_output_stream_write_async() call */
+  GCancellable *cancellable; /* own reference taken in write_all_async(), or NULL */
+  gssize bytes_written;      /* running total, updated in write_all_cb() */
+} WriteAllData;
+/* Used as the destroy notify for WriteAllData: drops the cancellable reference, if any, and frees the slice. */
+inline static void
+free_write_all_data (WriteAllData *write_data)
+{
+  if (write_data->cancellable)
+    g_object_unref (write_data->cancellable);
+  
+  g_slice_free (WriteAllData, write_data);
+}
+/* Callback for every write issued on behalf of write_all_async(): adds up progress and re-issues the write until count bytes are out. */
+static void
+write_all_cb (GObject      *source_object,
+              GAsyncResult *res,
+              gpointer      user_data)
+{
+  GOutputStream *stream = G_OUTPUT_STREAM (source_object);
+  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
+
+  gssize bytes_written;
+  GError *err = NULL;
+  WriteAllData *write_data;
+  /* A result of -1 is treated as failure: err is handed over to the async result, which is then completed. */
+  bytes_written = g_output_stream_write_finish (stream, res, &err);
+  if (bytes_written == -1)
+  {
+    g_simple_async_result_take_error (simple, err);
+    g_simple_async_result_complete (simple);
+    return;
+  }
+  
+  write_data = g_simple_async_result_get_op_res_gpointer (simple);
+  /* Add this chunk to the total; if bytes remain, write again starting at the first unwritten byte. */
+  write_data->bytes_written += bytes_written;
+  if (write_data->bytes_written < write_data->count)
+  {
+    g_output_stream_write_async (stream,
+                                 (const guint8 *)write_data->buffer + write_data->bytes_written,
+                                 write_data->count - write_data->bytes_written,
+                                 write_data->io_priority, write_data->cancellable,
+                                 write_all_cb, simple);
+    return;
+  }
+  /* All bytes written. NOTE(review): no g_object_unref (simple) follows either completion in this function - check for a leak. */
+  g_simple_async_result_complete (simple);
+}
+/* Asynchronously writes count bytes of buffer to stream, re-issuing partial writes from write_all_cb(); finish with write_all_finish(). */
+static void
+write_all_async (GOutputStream      *stream,
+                 const void         *buffer,
+                 gsize               count,
+                 int                 io_priority,
+                 GCancellable       *cancellable,
+                 GAsyncReadyCallback callback,
+                 gpointer            user_data)
+{
+  GSimpleAsyncResult *simple;
+  WriteAllData *write_data;
+
+  write_data = g_slice_new0 (WriteAllData);
+  write_data->buffer = buffer;
+  write_data->count = count;
+  write_data->io_priority = io_priority;
+  if (cancellable)
+    write_data->cancellable = g_object_ref (cancellable);
+  /* write_data is attached to the result below, with free_write_all_data() as its destroy notify. */
+  simple = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
+                                      write_all_async);
+  g_simple_async_result_set_op_res_gpointer (simple, write_data,
+                                             (GDestroyNotify)free_write_all_data);
+  /* Issue the first write; write_all_cb() takes over from here. */
+  g_output_stream_write_async (stream, buffer, count, io_priority, cancellable,
+                               write_all_cb, simple);
+}
+/* Finishes write_all_async(): FALSE with error set on failure; otherwise TRUE and, if bytes_written is non-NULL, the total written. */
+static gboolean
+write_all_finish (GOutputStream *stream,
+                  GAsyncResult  *res,
+                  gsize         *bytes_written,
+                  GError       **error)
+{
+  GSimpleAsyncResult *simple;
+  
+  g_return_val_if_fail (g_simple_async_result_is_valid (res, G_OBJECT (stream),
+                                                        write_all_async),
+                        FALSE);
+
+  simple = (GSimpleAsyncResult *)res;
+  if (g_simple_async_result_propagate_error (simple, error))
+    return FALSE;
+  /* bytes_written is optional; it is only filled in when the caller passes a pointer. */
+  
+  if (bytes_written)
+  {
+    WriteAllData *write_data;
+  
+    write_data = g_simple_async_result_get_op_res_gpointer (simple);
+    *bytes_written = write_data->bytes_written;
+  }
+
+  return TRUE;
+}
+
 static void
 remove_first (GQueue *request_queue)
 {
@@ -795,54 +917,74 @@ remove_first (GQueue *request_queue)
   free_request_data (req_data);
 }
 
+#define HANDLE_RES() { \
+  gboolean result; \
+  GError *err = NULL; \
+  /* Checks one write_all_async() step; needs output, res, req_data, priv and afp_conn in scope at the expansion site. */ \
+ result = write_all_finish (output, res, NULL, &err); \
+ if (!result) \
+  { \
+    if (req_data->simple) \
+    { \
+      g_simple_async_result_set_from_error (req_data->simple, err); \
+      g_simple_async_result_complete (req_data->simple); \
+    } \
+    /* The failed request is discarded through remove_first(). */ \
+    remove_first (priv->request_queue); \
+    g_error_free (err); \
+    /* Calls send_request() again, then returns from the function that expanded the macro. */ \
+    send_request (afp_conn); \
+    return; \
+  } \
+}
+    
+
 static void
-write_command_cb (GObject *object, GAsyncResult *res, gpointer user_data)
+write_buf_cb (GObject *object, GAsyncResult *res, gpointer user_data)
 {
   GOutputStream *output = G_OUTPUT_STREAM (object);
   GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data);
   GVfsAfpConnectionPrivate *priv = afp_conn->priv;
 
   RequestData *req_data;
-  gssize bytes_written;
-  GError *err = NULL;
-  gsize size;
 
-  req_data = g_queue_pop_head (priv->request_queue);
+  req_data = g_queue_peek_head (priv->request_queue);
   
-  bytes_written = g_output_stream_write_finish (output, res, &err);
-  if (bytes_written == -1)
-  {
-    if (req_data->simple)
-    {
-      g_simple_async_result_set_from_error (req_data->simple, err);
-      g_simple_async_result_complete (req_data->simple);
-    }
+  HANDLE_RES ();
 
-    free_request_data (req_data);
-    g_error_free (err);
+  g_hash_table_insert (priv->request_hash,
+                       GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)),
+                       req_data);
+  g_queue_pop_head (priv->request_queue);
 
-    send_request (afp_conn);
-    return;
-  }
+  send_request (afp_conn);
+}
 
-  size = g_vfs_afp_command_get_size (req_data->command);
+static void
+write_command_cb (GObject *object, GAsyncResult *res, gpointer user_data)
+{
+  GOutputStream *output = G_OUTPUT_STREAM (object);
+  GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data);
+  GVfsAfpConnectionPrivate *priv = afp_conn->priv;
+
+  RequestData *req_data;
+
+  req_data = g_queue_peek_head (priv->request_queue);
   
-  priv->bytes_written += bytes_written;
-  if (priv->bytes_written < size)
+  HANDLE_RES ();
+
+  if (priv->write_dsi_header.command == DSI_WRITE &&
+     req_data->command->buf)
   {
-    char *data;
-    
-    data = g_vfs_afp_command_get_data (req_data->command);
-    
-    g_output_stream_write_async (output, data + priv->bytes_written,
-                                 size - priv->bytes_written, 0, NULL,
-                                 write_command_cb, afp_conn);
+    write_all_async (output, req_data->command->buf, req_data->command->buf_size,
+                     0, NULL, write_buf_cb, afp_conn);
     return;
   }
-
+  
   g_hash_table_insert (priv->request_hash,
                        GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)),
                        req_data);
+  g_queue_pop_head (priv->request_queue);
 
   send_request (afp_conn);
 }
@@ -855,38 +997,13 @@ write_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data)
   GVfsAfpConnectionPrivate *priv = afp_conn->priv;
   
   RequestData *req_data;
-  gssize bytes_written;
-  GError *err = NULL;
 
   char *data;
   gsize size;
 
   req_data = g_queue_peek_head (priv->request_queue);
   
-  bytes_written = g_output_stream_write_finish (output, res, &err);
-  if (bytes_written == -1)
-  {
-    if (req_data->simple)
-    {
-      g_simple_async_result_set_from_error (req_data->simple, err);
-      g_simple_async_result_complete (req_data->simple);
-    }
-
-    remove_first (priv->request_queue);
-    g_error_free (err);
-
-    send_request (afp_conn);
-    return;
-  }
-
-  priv->bytes_written += bytes_written;
-  if (priv->bytes_written < sizeof (DSIHeader))
-  {
-    g_output_stream_write_async (output, &priv->write_dsi_header + priv->bytes_written,
-                                 sizeof (DSIHeader) - priv->bytes_written, 0,
-                                 NULL, write_dsi_header_cb, afp_conn);
-    return;
-  }
+  HANDLE_RES ();
 
   if (req_data->type == REQUEST_TYPE_TICKLE)
   {
@@ -898,9 +1015,7 @@ write_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data)
   data = g_vfs_afp_command_get_data (req_data->command);
   size = g_vfs_afp_command_get_size (req_data->command);
 
-  priv->bytes_written = 0;
-  g_output_stream_write_async (output, data, size, 0,
-                               NULL, write_command_cb, afp_conn);
+  write_all_async (output, data, size, 0, NULL, write_command_cb, afp_conn);
 }
 
 static void
@@ -947,6 +1062,9 @@ send_request (GVfsAfpConnection *afp_connection)
       break;
 
     case REQUEST_TYPE_COMMAND:
+    {
+      gsize size;
+      
       switch (req_data->command->type)
       {
         case AFP_COMMAND_WRITE:
@@ -968,19 +1086,25 @@ send_request (GVfsAfpConnection *afp_connection)
       priv->write_dsi_header.command = dsi_command;
       priv->write_dsi_header.requestID = GUINT16_TO_BE (get_request_id (afp_connection));
       priv->write_dsi_header.writeOffset = GUINT32_TO_BE (writeOffset);
-      priv->write_dsi_header.totalDataLength = GUINT32_TO_BE (g_vfs_afp_command_get_size (req_data->command));
+
+      /* totalDataLength */
+      size = g_vfs_afp_command_get_size (req_data->command);
+      if (dsi_command == DSI_WRITE && req_data->command->buf)
+        size += req_data->command->buf_size;
+      priv->write_dsi_header.totalDataLength = GUINT32_TO_BE (size);
+      
       priv->write_dsi_header.reserved = 0;
       break;
+    }
 
     default:
       g_assert_not_reached ();
   }
 
-  
-  priv->bytes_written = 0;
-  g_output_stream_write_async (g_io_stream_get_output_stream (priv->conn),
-                               &priv->write_dsi_header, sizeof (DSIHeader), 0,
-                               NULL, write_dsi_header_cb, afp_connection); 
+
+  write_all_async (g_io_stream_get_output_stream (priv->conn),
+                   &priv->write_dsi_header, sizeof (DSIHeader), 0,
+                   NULL, write_dsi_header_cb, afp_connection);
 }
 
 void
diff --git a/daemon/gvfsafpconnection.h b/daemon/gvfsafpconnection.h
index 9520e0b..6494bc9 100644
--- a/daemon/gvfsafpconnection.h
+++ b/daemon/gvfsafpconnection.h
@@ -335,6 +335,8 @@ void            g_vfs_afp_command_pad_to_even  (GVfsAfpCommand *comm);
 gsize           g_vfs_afp_command_get_size     (GVfsAfpCommand *comm);
 char*           g_vfs_afp_command_get_data     (GVfsAfpCommand *comm);
 
+void            g_vfs_afp_command_set_buffer   (GVfsAfpCommand *comm, const char *buf, gsize size);
+
 GType           g_vfs_afp_command_get_type (void) G_GNUC_CONST;
 
 
diff --git a/daemon/gvfsbackendafp.c b/daemon/gvfsbackendafp.c
index ceaa76b..a2dd9f5 100644
--- a/daemon/gvfsbackendafp.c
+++ b/daemon/gvfsbackendafp.c
@@ -1873,9 +1873,7 @@ try_write (GVfsBackend *backend,
   req_count = MIN (buffer_size, G_MAXUINT32);
   g_vfs_afp_command_put_int64 (comm, req_count);
 
-  /* TODO: don't copy buffer here  */
-  g_output_stream_write_all (G_OUTPUT_STREAM (comm), buffer, req_count, NULL,
-                             NULL, NULL);
+  g_vfs_afp_command_set_buffer (comm, buffer, buffer_size);
 
   g_vfs_afp_connection_send_command (afp_backend->server->conn, comm,
                                      write_ext_cb, G_VFS_JOB (job)->cancellable,



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]