[gvfs] Use a worker thread in GVfsAfpConnection



commit c1b3b8a3e3d979a4857587b57026d91f56ff1627
Author: Carl-Anton Ingmarsson <ca ingmarsson gmail com>
Date:   Sun Jan 29 23:24:05 2012 +0100

    Use a worker thread in GVfsAfpConnection

 daemon/gvfsafpconnection.c |  550 +++++++++++++++++++++++++++++++-------------
 daemon/gvfsafpconnection.h |    6 +-
 daemon/gvfsafpserver.c     |   71 ++-----
 daemon/gvfsafpvolume.c     |    8 +-
 4 files changed, 413 insertions(+), 222 deletions(-)
---
diff --git a/daemon/gvfsafpconnection.c b/daemon/gvfsafpconnection.c
index 26c42ba..aaeebc2 100644
--- a/daemon/gvfsafpconnection.c
+++ b/daemon/gvfsafpconnection.c
@@ -618,16 +618,30 @@ typedef struct {
   guint32 reserved;
 } DSIHeader;
 
+enum {
+  FLAG_INITIALIZED   = 1 << 0,  /* set by open_thread_func() once the open succeeded */
+  FLAG_PENDING_CLOSE = 1 << 1,  /* set by g_vfs_afp_connection_close_sync() */
+  FLAG_CLOSED        = 1 << 2   /* set by close_connection() */
+};
+
 struct _GVfsAfpConnectionPrivate
 {
   GSocketConnectable *addr;
-  GIOStream *conn;
+  GIOStream *stream;
 
+  /* Flags */
+  volatile gint atomic_flags;
+  
   guint16 request_id;
 
   guint32 kRequestQuanta;
   guint32 kServerReplayCacheSize;
 
+  GThread      *worker_thread;
+  GMainContext *worker_context;
+  GMainLoop    *worker_loop;
+  GMutex        mutex;
+  
   GQueue     *request_queue;
   GHashTable *request_hash;
 
@@ -636,10 +650,12 @@ struct _GVfsAfpConnectionPrivate
   DSIHeader write_dsi_header;
 
   /* read loop */
-  gboolean read_loop_running;
+  GCancellable *read_cancellable;
   DSIHeader read_dsi_header;
   char *reply_buf;
   gboolean free_reply_buf;
+
+  GSList *pending_closes;
 };
 
 typedef enum
@@ -667,8 +683,75 @@ typedef struct
   char           *reply_buf;
   GSimpleAsyncResult *simple;
   GCancellable *cancellable;
+
+  GVfsAfpConnection *conn;
 } RequestData;
 
+/* Rendezvous between a thread blocked in a *_sync() call and the thread that
+ * completes the work.  @done records that the signal was sent, so a signal
+ * that arrives before the wait starts is not lost; it is only accessed with
+ * @mutex held. */
+typedef struct
+{
+  GMutex mutex;
+  GCond  cond;
+  gboolean done;
+  GVfsAfpConnection *conn;
+  GCancellable *cancellable;
+  gboolean res;
+  GError **error;
+  void *data;
+} SyncData;
+
+/* Initialise @data for a single signal/wait round trip.  Pair with
+ * sync_data_clear(). */
+static void
+sync_data_init (SyncData *data, GVfsAfpConnection *conn, GError **error)
+{
+  g_mutex_init (&data->mutex);
+  g_cond_init (&data->cond);
+  data->done = FALSE;
+  data->conn = conn;
+  data->cancellable = NULL;
+  data->error = error;
+  data->res = FALSE;
+  data->data = NULL;
+}
+
+/* Release the mutex and condition variable set up by sync_data_init(). */
+static void
+sync_data_clear (SyncData *data)
+{
+  g_mutex_clear (&data->mutex);
+  g_cond_clear (&data->cond);
+}
+
+/* Mark @data as completed and wake the thread in sync_data_wait(), if any. */
+static void
+sync_data_signal (SyncData *data)
+{
+  g_mutex_lock (&data->mutex);
+  data->done = TRUE;
+  g_cond_signal (&data->cond);
+  g_mutex_unlock (&data->mutex);
+}
+
+/* Block until sync_data_signal() has been called on @data.  The predicate
+ * loop makes this immune to spurious wakeups and returns immediately when
+ * the signal was already sent. */
+static void
+sync_data_wait (SyncData *data)
+{
+  g_mutex_lock (&data->mutex);
+  while (!data->done)
+    g_cond_wait (&data->cond, &data->mutex);
+  g_mutex_unlock (&data->mutex);
+}
+
+static void send_request_unlocked (GVfsAfpConnection *afp_connection);
+static void close_connection (GVfsAfpConnection *conn);
+static void read_reply (GVfsAfpConnection *afp_connection);
+
 static void
 free_request_data (RequestData *req_data)
 {
@@ -682,6 +748,31 @@ free_request_data (RequestData *req_data)
   g_slice_free (RequestData, req_data);
 }
 
+/* Report whether @conn is usable: opened (FLAG_INITIALIZED set) and not yet
+ * closed (FLAG_CLOSED clear).  On failure @error is set and FALSE returned. */
+static gboolean
+check_open (GVfsAfpConnection *conn, GError **error)
+{
+  /* The atomic read doubles as a memory barrier. */
+  const gint state = g_atomic_int_get (&conn->priv->atomic_flags);
+
+  if ((state & FLAG_INITIALIZED) == 0)
+  {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_INITIALIZED,
+                         _("The connection is not opened"));
+    return FALSE;
+  }
+
+  if ((state & FLAG_CLOSED) != 0)
+  {
+    g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+                         _("The connection is closed"));
+    return FALSE;
+  }
+
+  return TRUE;
+}
+
 static void
 g_vfs_afp_connection_init (GVfsAfpConnection *afp_connection)
 {
@@ -690,20 +781,17 @@ g_vfs_afp_connection_init (GVfsAfpConnection *afp_connection)
   afp_connection->priv = priv =  G_TYPE_INSTANCE_GET_PRIVATE (afp_connection,
                                                               G_VFS_TYPE_AFP_CONNECTION,
                                                               GVfsAfpConnectionPrivate);
-
-  priv->addr = NULL;
-  priv->conn = NULL;
-  priv->request_id = 0;
-
   priv->kRequestQuanta = -1;
   priv->kServerReplayCacheSize = -1;
 
+  g_mutex_init (&priv->mutex);
+
   priv->request_queue = g_queue_new ();
   priv->request_hash = g_hash_table_new_full (g_direct_hash, g_direct_equal,
                                               NULL, (GDestroyNotify)free_request_data);
+  priv->read_cancellable = g_cancellable_new ();
 
   priv->send_loop_running = FALSE;
-  priv->read_loop_running = FALSE;
 }
 
 static void
@@ -712,11 +800,11 @@ g_vfs_afp_connection_finalize (GObject *object)
   GVfsAfpConnection *afp_connection = (GVfsAfpConnection *)object;
   GVfsAfpConnectionPrivate *priv = afp_connection->priv;
 
-  if (priv->addr)
-    g_object_unref (priv->addr);
-  
-  if (priv->conn)
-    g_object_unref (priv->conn);
+  g_clear_object (&priv->addr);
+  g_clear_object (&priv->stream);
+  g_clear_object (&priv->read_cancellable);
+
+  g_mutex_clear (&priv->mutex);
 
   G_OBJECT_CLASS (g_vfs_afp_connection_parent_class)->finalize (object);
 }
@@ -738,9 +826,6 @@ g_vfs_afp_connection_class_init (GVfsAfpConnectionClass *klass)
                   G_TYPE_NONE, 1, G_TYPE_UINT);
 }
 
-static void read_reply (GVfsAfpConnection *afp_connection);
-static void send_request (GVfsAfpConnection *afp_connection);
-
 static guint16
 get_request_id (GVfsAfpConnection *afp_connection)
 {
@@ -749,23 +834,6 @@ get_request_id (GVfsAfpConnection *afp_connection)
   return priv->request_id++;
 }
 
-static void
-run_loop (GVfsAfpConnection *afp_connection)
-{
-  GVfsAfpConnectionPrivate *priv = afp_connection->priv;
-
-  if (!priv->send_loop_running)
-  {
-    priv->send_loop_running = TRUE;
-    send_request (afp_connection);
-  }
-  if (!priv->read_loop_running)
-  {
-    priv->read_loop_running = TRUE;
-    read_reply (afp_connection);
-  }
-}
-
 typedef struct
 {
   void         *buffer;
@@ -896,8 +964,16 @@ dispatch_reply (GVfsAfpConnection *afp_connection)
       req_data = g_slice_new0 (RequestData);
       req_data->type = REQUEST_TYPE_TICKLE;
 
+      /* take lock */
+      g_mutex_lock (&priv->mutex);
       g_queue_push_head (priv->request_queue, req_data);
-      run_loop (afp_connection);
+      if (!priv->send_loop_running) {
+        priv->send_loop_running = TRUE;
+        send_request_unlocked (afp_connection);
+      }
+      /* release lock */
+      g_mutex_unlock (&priv->mutex);
+
       break;
     }
 
@@ -928,7 +1004,7 @@ dispatch_reply (GVfsAfpConnection *afp_connection)
 
         g_simple_async_result_set_op_res_gpointer (req_data->simple, reply,
                                                    g_object_unref);
-        g_simple_async_result_complete (req_data->simple);
+        g_simple_async_result_complete_in_idle (req_data->simple);
 
         g_hash_table_remove (priv->request_hash,
                              GUINT_TO_POINTER ((guint)dsi_header->requestID));
@@ -951,6 +1027,13 @@ read_data_cb (GObject *object, GAsyncResult *res, gpointer user_data)
   gboolean result;
   GError *err = NULL;
 
+  if (g_atomic_int_get (&priv->atomic_flags) & FLAG_PENDING_CLOSE)
+  {
+    if (!priv->send_loop_running)
+      close_connection (afp_connection);
+    return;
+  }
+  
   result = read_all_finish (input, res, NULL, &err);
   if (!result)
   {
@@ -976,6 +1059,13 @@ read_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data)
   gboolean result;
   GError *err = NULL;
   DSIHeader *dsi_header;
+
+  if (g_atomic_int_get (&priv->atomic_flags) & FLAG_PENDING_CLOSE)
+  {
+    if (!priv->send_loop_running)
+      close_connection (afp_conn);
+    return;
+  }
   
   result = read_all_finish (input, res, NULL, &err);
   if (!result)
@@ -1008,7 +1098,7 @@ read_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data)
     }
     
     read_all_async (input, priv->reply_buf, dsi_header->totalDataLength,
-                    0, NULL, read_data_cb, afp_conn);
+                    0, priv->read_cancellable, read_data_cb, afp_conn);
     
     return;
   }
@@ -1024,10 +1114,17 @@ read_reply (GVfsAfpConnection *afp_connection)
   
   GInputStream *input;
 
-  input = g_io_stream_get_input_stream (priv->conn);
+  if (g_atomic_int_get (&priv->atomic_flags) & FLAG_PENDING_CLOSE)
+  {
+    if (!priv->send_loop_running)
+      close_connection (afp_connection);
+    return;
+  }
   
-  read_all_async (input, &priv->read_dsi_header, sizeof (DSIHeader), 0, NULL,
-                  read_dsi_header_cb, afp_connection);
+  input = g_io_stream_get_input_stream (priv->stream);
+  
+  read_all_async (input, &priv->read_dsi_header, sizeof (DSIHeader), 0,
+                  priv->read_cancellable, read_dsi_header_cb, afp_connection);
 }
 
 typedef struct
@@ -1141,15 +1238,6 @@ write_all_finish (GOutputStream *stream,
   return TRUE;
 }
 
-static void
-remove_first (GQueue *request_queue)
-{
-  RequestData *req_data;
-
-  req_data = g_queue_pop_head (request_queue);
-  free_request_data (req_data);
-}
-
 #define HANDLE_RES() { \
   gboolean result; \
   GError *err = NULL; \
@@ -1160,99 +1248,95 @@ remove_first (GQueue *request_queue)
     if (req_data->simple) \
     { \
       g_simple_async_result_set_from_error (req_data->simple, err); \
-      g_simple_async_result_complete (req_data->simple); \
+      g_simple_async_result_complete_in_idle (req_data->simple); \
     } \
 \
-    remove_first (priv->request_queue); \
     g_error_free (err); \
+    free_request_data (req_data); \
 \
-    send_request (afp_conn); \
+    g_mutex_lock (&priv->mutex); \
+    send_request_unlocked (afp_conn); \
+    g_mutex_unlock (&priv->mutex); \
     return; \
   } \
 }
-    
 
 static void
 write_buf_cb (GObject *object, GAsyncResult *res, gpointer user_data)
 {
   GOutputStream *output = G_OUTPUT_STREAM (object);
-  GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data);
+  RequestData *req_data = user_data;
+  GVfsAfpConnection *afp_conn = req_data->conn;
   GVfsAfpConnectionPrivate *priv = afp_conn->priv;
-
-  RequestData *req_data;
-
-  req_data = g_queue_peek_head (priv->request_queue);
   
   HANDLE_RES ();
 
   g_hash_table_insert (priv->request_hash,
                        GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)),
                        req_data);
-  g_queue_pop_head (priv->request_queue);
 
-  send_request (afp_conn);
+
+  g_mutex_lock (&priv->mutex);
+  send_request_unlocked (afp_conn);
+  g_mutex_unlock (&priv->mutex);
 }
 
 static void
 write_command_cb (GObject *object, GAsyncResult *res, gpointer user_data)
 {
   GOutputStream *output = G_OUTPUT_STREAM (object);
-  GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data);
+  RequestData *req_data = user_data;
+  GVfsAfpConnection *afp_conn = req_data->conn;
   GVfsAfpConnectionPrivate *priv = afp_conn->priv;
 
-  RequestData *req_data;
-
-  req_data = g_queue_peek_head (priv->request_queue);
-  
   HANDLE_RES ();
 
   if (priv->write_dsi_header.command == DSI_WRITE &&
      req_data->command->buf)
   {
     write_all_async (output, req_data->command->buf, req_data->command->buf_size,
-                     0, NULL, write_buf_cb, afp_conn);
+                     0, NULL, write_buf_cb, req_data);
     return;
   }
   
   g_hash_table_insert (priv->request_hash,
                        GUINT_TO_POINTER ((guint)GUINT16_FROM_BE (priv->write_dsi_header.requestID)),
                        req_data);
-  g_queue_pop_head (priv->request_queue);
 
-  send_request (afp_conn);
+  g_mutex_lock (&priv->mutex);
+  send_request_unlocked (afp_conn);
+  g_mutex_unlock (&priv->mutex);
 }
 
 static void
 write_dsi_header_cb (GObject *object, GAsyncResult *res, gpointer user_data)
 {
   GOutputStream *output = G_OUTPUT_STREAM (object);
-  GVfsAfpConnection *afp_conn = G_VFS_AFP_CONNECTION (user_data);
+  RequestData *req_data = user_data;
+  GVfsAfpConnection *afp_conn = req_data->conn;
   GVfsAfpConnectionPrivate *priv = afp_conn->priv;
   
-  RequestData *req_data;
-
   char *data;
   gsize size;
 
-  req_data = g_queue_peek_head (priv->request_queue);
-  
   HANDLE_RES ();
 
   if (req_data->type == REQUEST_TYPE_TICKLE)
   {
-    remove_first (priv->request_queue);
-    send_request (afp_conn);
+    g_mutex_lock (&priv->mutex);
+    send_request_unlocked (afp_conn);
+    g_mutex_unlock (&priv->mutex);
     return;
   }
 
   data = g_vfs_afp_command_get_data (req_data->command);
   size = g_vfs_afp_command_get_size (req_data->command);
 
-  write_all_async (output, data, size, 0, NULL, write_command_cb, afp_conn);
+  write_all_async (output, data, size, 0, NULL, write_command_cb, req_data);
 }
 
 static void
-send_request (GVfsAfpConnection *afp_connection)
+send_request_unlocked (GVfsAfpConnection *afp_connection)
 {
   GVfsAfpConnectionPrivate *priv = afp_connection->priv;
 
@@ -1260,7 +1344,7 @@ send_request (GVfsAfpConnection *afp_connection)
   guint32 writeOffset;
   guint8 dsi_command;
 
-  while ((req_data = g_queue_peek_head (priv->request_queue)))
+  while ((req_data = g_queue_pop_head (priv->request_queue)))
   {
     if (req_data->cancellable && g_cancellable_is_cancelled (req_data->cancellable))
     {
@@ -1270,9 +1354,9 @@ send_request (GVfsAfpConnection *afp_connection)
 
         g_cancellable_set_error_if_cancelled (req_data->cancellable, &err);
         g_simple_async_result_take_error (req_data->simple, err);
-        g_simple_async_result_complete (req_data->simple);
+        g_simple_async_result_complete_in_idle (req_data->simple);
       }
-      remove_first (priv->request_queue);
+      free_request_data (req_data); /* already popped; remove_first() used to free it */
     }
     else
       break;
@@ -1335,9 +1418,28 @@ send_request (GVfsAfpConnection *afp_connection)
   }
 
 
-  write_all_async (g_io_stream_get_output_stream (priv->conn),
+  write_all_async (g_io_stream_get_output_stream (priv->stream),
                    &priv->write_dsi_header, sizeof (DSIHeader), 0,
-                   NULL, write_dsi_header_cb, afp_connection);
+                   NULL, write_dsi_header_cb, req_data);
+}
+
+/* Callback invoked on priv->worker_context: start the send loop unless it
+ * is already running.  Always returns G_SOURCE_REMOVE (one-shot). */
+static gboolean
+start_send_loop_func (gpointer data)
+{
+  GVfsAfpConnection *afp_conn = data;
+  GVfsAfpConnectionPrivate *priv = afp_conn->priv;
+
+  g_mutex_lock (&priv->mutex);
+  if (!priv->send_loop_running)
+  {
+    priv->send_loop_running = TRUE;
+    send_request_unlocked (afp_conn);
+  }
+  g_mutex_unlock (&priv->mutex);
+
+  return G_SOURCE_REMOVE;
 }
 
 void
@@ -1350,7 +1452,15 @@ g_vfs_afp_connection_send_command (GVfsAfpConnection   *afp_connection,
 {
   GVfsAfpConnectionPrivate *priv = afp_connection->priv;
 
+  GError *err = NULL;
   RequestData *req_data;
+
+  if (!check_open (afp_connection, &err))
+  {
+    g_simple_async_report_take_gerror_in_idle (G_OBJECT(afp_connection), callback,
+                                               user_data, err);
+    return;
+  }
   
   req_data = g_slice_new0 (RequestData);
   req_data->type = REQUEST_TYPE_COMMAND;
@@ -1360,12 +1470,23 @@ g_vfs_afp_connection_send_command (GVfsAfpConnection   *afp_connection,
   req_data->simple = g_simple_async_result_new (G_OBJECT (afp_connection), callback,
                                                 user_data,
                                                 g_vfs_afp_connection_send_command);
+  req_data->conn = afp_connection;
+  
   if (cancellable)
     req_data->cancellable = g_object_ref (cancellable);
 
+  /* Take lock */
+  g_mutex_lock (&priv->mutex);
+  
   g_queue_push_tail (priv->request_queue, req_data);
+  if (!priv->send_loop_running)
+  {
+    g_main_context_invoke (priv->worker_context, start_send_loop_func,
+                           afp_connection);
+  }
 
-  run_loop (afp_connection);
+  /* Release lock */
+  g_mutex_unlock (&priv->mutex);
 }
 
 GVfsAfpReply *
@@ -1443,25 +1564,6 @@ read_reply_sync (GInputStream      *input,
   return TRUE;
 }
 
-GVfsAfpReply *
-g_vfs_afp_connection_read_reply_sync (GVfsAfpConnection *afp_connection,
-                                      GCancellable *cancellable,
-                                      GError **error)
-{
-  GVfsAfpConnectionPrivate *priv = afp_connection->priv;
-  
-  gboolean res;
-  char *data;
-  DSIHeader dsi_header;
-
-  res = read_reply_sync (g_io_stream_get_input_stream (priv->conn), &dsi_header,
-                         &data, cancellable, error);
-  if (!res)
-    return NULL;
-
-  return g_vfs_afp_reply_new (dsi_header.errorCode, data, dsi_header.totalDataLength, TRUE);
-}
-
 static gboolean
 send_request_sync (GOutputStream     *output,
                    DsiCommand        command,
@@ -1501,79 +1603,162 @@ send_request_sync (GOutputStream     *output,
   return TRUE;
 }
 
-gboolean
+/* GAsyncReadyCallback of the sync wrapper: keep @res alive, wake the caller. */
+static void
+send_command_sync_cb (GObject *source_object, GAsyncResult *res, gpointer user_data)
+{
+  SyncData *waiter = user_data;
+  waiter->data = g_object_ref (res);
+  sync_data_signal (waiter);
+}
+
+/* Blocking wrapper around g_vfs_afp_connection_send_command(): returns the
+ * reply, or NULL (with @error set) if the connection is not open. */
+GVfsAfpReply *
 g_vfs_afp_connection_send_command_sync (GVfsAfpConnection *afp_connection,
-                                        GVfsAfpCommand    *afp_command,
+                                        GVfsAfpCommand    *command,
                                         GCancellable      *cancellable,
                                         GError            **error)
 {
-  GVfsAfpConnectionPrivate *priv = afp_connection->priv;
+  SyncData sync_data;
+  GVfsAfpReply *reply;
   
-  DsiCommand dsi_command;
-  guint16 req_id;
-  guint32 writeOffset;
+  if (!check_open (afp_connection, error))
+    return NULL;
 
-  /* set dsi_command */
-  switch (afp_command->type)
-  {
-    case AFP_COMMAND_WRITE:
-      writeOffset = 8;
-      dsi_command = DSI_WRITE;
-      break;
-    case AFP_COMMAND_WRITE_EXT:
-      writeOffset = 20;
-      dsi_command = DSI_WRITE;
-      break;
+  sync_data_init (&sync_data, afp_connection, NULL);
 
-    default:
-      writeOffset = 0;
-      dsi_command = DSI_COMMAND;
-      break;
-  }
+  g_vfs_afp_connection_send_command (afp_connection, command, NULL,
+                                     send_command_sync_cb, cancellable, &sync_data);
 
-  req_id = get_request_id (afp_connection);
-  return send_request_sync (g_io_stream_get_output_stream (priv->conn),
-                            dsi_command, req_id, writeOffset,
-                            g_vfs_afp_command_get_size (afp_command),
-                            g_vfs_afp_command_get_data (afp_command),
-                            cancellable, error);
+  sync_data_wait (&sync_data);
+  reply = g_vfs_afp_connection_send_command_finish (afp_connection, sync_data.data,
+                                                    error);
+  g_object_unref (sync_data.data);
+  sync_data_clear (&sync_data);
+  return reply;
 }
 
-gboolean
-g_vfs_afp_connection_close_sync (GVfsAfpConnection *afp_connection,
-                                 GCancellable      *cancellable,
-                                 GError            **error)
+/* Tear down the connection: send DSI_CLOSE_SESSION, close the stream, fail
+ * every queued or sent request, wake close_sync() waiters and quit the loop. */
+static void
+close_connection (GVfsAfpConnection *conn)
 {
-  GVfsAfpConnectionPrivate *priv = afp_connection->priv;
+  GVfsAfpConnectionPrivate *priv = conn->priv;
   
   guint16 req_id;
   gboolean res;
+  GError *err = NULL;
+
+  GQueue *request_queue;
+  GSList *pending_closes, *siter;
+  GHashTable *request_hash;
+  GHashTableIter iter;
+  RequestData *req_data;
+
+  /* Flag the connection closed and detach its work lists under the lock. */
+  g_mutex_lock (&priv->mutex);
+  g_atomic_int_or (&priv->atomic_flags, FLAG_CLOSED);
+  request_queue = priv->request_queue;
+  priv->request_queue = NULL;
+  request_hash = priv->request_hash;
+  priv->request_hash = NULL;
+  pending_closes = priv->pending_closes;
+  priv->pending_closes = NULL;
+  g_mutex_unlock (&priv->mutex);
   
   /* close DSI session */
-  req_id = get_request_id (afp_connection);
-  res = send_request_sync (g_io_stream_get_output_stream (priv->conn),
+  req_id = get_request_id (conn);
+  res = send_request_sync (g_io_stream_get_output_stream (priv->stream),
                            DSI_CLOSE_SESSION, req_id, 0, 0, NULL,
-                           cancellable, error);
+                           NULL, &err);
   if (!res)
+    g_io_stream_close (priv->stream, NULL, NULL);
+  else
+    res = g_io_stream_close (priv->stream, NULL, &err);
+  /* Nothing reports this error to a caller, so free it rather than leak it. */
+  g_clear_error (&err);
+  g_clear_object (&priv->stream);
+
+/* Fail the async result of @request_data with G_IO_ERROR_CLOSED.  Tickle
+ * requests are queued without a result object, hence the NULL check. */
+#define REQUEST_DATA_CLOSED(request_data) { \
+  if ((request_data)->simple) \
+  { \
+    g_simple_async_result_take_error ((request_data)->simple, \
+      g_error_new_literal (G_IO_ERROR, G_IO_ERROR_CLOSED, "Connection was closed")); \
+    g_simple_async_result_complete_in_idle ((request_data)->simple); \
+  } \
+}
+
+  while ((req_data = g_queue_pop_head (request_queue)))
   {
-    g_io_stream_close (priv->conn, cancellable, NULL);
-    g_object_unref (priv->conn);
-    return FALSE;
+    REQUEST_DATA_CLOSED (req_data);
+    free_request_data (req_data);
   }
 
-  res = g_io_stream_close (priv->conn, cancellable, error);
-  g_object_unref (priv->conn);
+  /* free_request_data() is the table's value destroy notify, so entries are
+   * only failed here; destroying the table is what frees them. */
+  g_hash_table_iter_init (&iter, request_hash);
+  while (g_hash_table_iter_next (&iter, NULL, (void **)&req_data))
+    REQUEST_DATA_CLOSED (req_data);
+  g_hash_table_destroy (request_hash);
+  g_queue_free (request_queue);
   
-  return res;
+#undef REQUEST_DATA_CLOSED
+
+  for (siter = pending_closes; siter != NULL; siter = siter->next)
+  {
+    SyncData *close_data = siter->data;
+    close_data->res = TRUE;
+    sync_data_signal (close_data);
+  }
+  g_slist_free (pending_closes);
+
+  g_main_loop_quit (priv->worker_loop);
+  g_main_loop_unref (priv->worker_loop);
+  g_main_context_unref (priv->worker_context);
 }
 
 gboolean
-g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection,
-                                GCancellable      *cancellable,
-                                GError            **error)
+g_vfs_afp_connection_close_sync (GVfsAfpConnection *afp_connection,
+                                 GCancellable      *cancellable,
+                                 GError            **error)
 {
   GVfsAfpConnectionPrivate *priv = afp_connection->priv;
 
+  SyncData close_data;
+
+  /* Take lock */
+  g_mutex_lock (&priv->mutex);
+
+  if (!check_open (afp_connection, error)) {
+    g_mutex_unlock (&priv->mutex);
+    return FALSE;
+  }
+
+  sync_data_init (&close_data, afp_connection, error);
+  priv->pending_closes = g_slist_prepend (priv->pending_closes, &close_data);
+
+  /* Release lock */
+  g_mutex_unlock (&priv->mutex);
+
+  g_atomic_int_or (&priv->atomic_flags, FLAG_PENDING_CLOSE);
+  g_cancellable_cancel (priv->read_cancellable);
+  
+  sync_data_wait (&close_data);
+
+  
+  return close_data.res;
+}
+
+static gpointer
+open_thread_func (gpointer user_data)
+{
+  SyncData *data = user_data;
+  GVfsAfpConnection *conn = data->conn;
+  GVfsAfpConnectionPrivate *priv = conn->priv;
+
   GSocketClient *client;
 
   guint16 req_id;
@@ -1583,23 +1768,24 @@ g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection,
   guint pos;
 
   client = g_socket_client_new ();
-  priv->conn = G_IO_STREAM (g_socket_client_connect (client, priv->addr, cancellable, error));
+  priv->stream = G_IO_STREAM (g_socket_client_connect (client, priv->addr, data->cancellable,
+                                                       data->error));
   g_object_unref (client);
 
-  if (!priv->conn)
-    return FALSE;
+  if (!priv->stream)
+    goto out;
 
-  req_id = get_request_id (afp_connection);
-  res = send_request_sync (g_io_stream_get_output_stream (priv->conn),
+  req_id = get_request_id (conn);
+  res = send_request_sync (g_io_stream_get_output_stream (priv->stream),
                            DSI_OPEN_SESSION, req_id, 0,  0, NULL,
-                           cancellable, error);
+                           data->cancellable, data->error);
   if (!res)
-    return FALSE;
+    goto out;
 
-  res = read_reply_sync (g_io_stream_get_input_stream (priv->conn),
-                         &dsi_header, &reply, cancellable, error);
+  res = read_reply_sync (g_io_stream_get_input_stream (priv->stream),
+                         &dsi_header, &reply, data->cancellable, data->error);
   if (!res)
-    return FALSE;
+    goto out;
 
   pos = 0;
   while ((dsi_header.totalDataLength - pos) > 2)
@@ -1634,7 +1820,57 @@ g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection,
   }
   g_free (reply);
 
-  return TRUE;
+out:
+  if (res)
+    g_atomic_int_or (&priv->atomic_flags, FLAG_INITIALIZED);
+  
+  /* Signal sync call thread */
+  data->res = res;
+  sync_data_signal (data);
+
+  /* Return from thread on failure */
+  if (!res)
+  {
+    g_clear_object (&priv->stream);
+    return NULL;
+  }
+  
+  /* Create MainLoop */
+  priv->worker_context = g_main_context_new ();
+  priv->worker_loop = g_main_loop_new (priv->worker_context, TRUE);
+
+  read_reply (conn);
+  
+  /* Run mainloop */
+  g_main_loop_run (priv->worker_loop);
+
+  return NULL;
+}
+
+/* Spawn the worker thread running open_thread_func() and block until it
+ * reports whether the open succeeded.  Returns that result. */
+gboolean
+g_vfs_afp_connection_open_sync (GVfsAfpConnection *afp_connection,
+                                GCancellable      *cancellable,
+                                GError            **error)
+{
+  GVfsAfpConnectionPrivate *priv = afp_connection->priv;
+  SyncData handshake;
+  gboolean opened;
+
+  sync_data_init (&handshake, afp_connection, error);
+  handshake.cancellable = cancellable;
+
+  priv->worker_thread = g_thread_create (open_thread_func, &handshake,
+                                         FALSE, error);
+  /* No thread means nobody will ever signal, so only wait on success. */
+  if (priv->worker_thread != NULL)
+    sync_data_wait (&handshake);
+
+  opened = handshake.res;
+  sync_data_clear (&handshake);
+
+  return opened;
 }
 
 GVfsAfpConnection *
diff --git a/daemon/gvfsafpconnection.h b/daemon/gvfsafpconnection.h
index 57db862..924a47c 100644
--- a/daemon/gvfsafpconnection.h
+++ b/daemon/gvfsafpconnection.h
@@ -360,15 +360,11 @@ gboolean           g_vfs_afp_connection_close_sync        (GVfsAfpConnection *af
                                                            GCancellable      *cancellable,
                                                            GError            **error);
 
-gboolean           g_vfs_afp_connection_send_command_sync (GVfsAfpConnection *afp_connection,
+GVfsAfpReply*      g_vfs_afp_connection_send_command_sync (GVfsAfpConnection *afp_connection,
                                                            GVfsAfpCommand    *afp_command,
                                                            GCancellable      *cancellable,
                                                            GError            **error);
 
-GVfsAfpReply*      g_vfs_afp_connection_read_reply_sync   (GVfsAfpConnection *afp_connection,
-                                                           GCancellable *cancellable,
-                                                           GError **error);
-
 GVfsAfpReply*      g_vfs_afp_connection_send_command_finish (GVfsAfpConnection *afp_connnection,
                                                              GAsyncResult      *res,
                                                              GError           **error);
diff --git a/daemon/gvfsafpserver.c b/daemon/gvfsafpserver.c
index e405917..9acc2b1 100644
--- a/daemon/gvfsafpserver.c
+++ b/daemon/gvfsafpserver.c
@@ -48,7 +48,6 @@ g_vfs_afp_server_new (GNetworkAddress *addr)
   afp_serv = g_object_new (G_VFS_TYPE_AFP_SERVER, NULL);
 
   afp_serv->addr = addr;
-  afp_serv->conn = g_vfs_afp_connection_new (G_SOCKET_CONNECTABLE (addr));
   
   return afp_serv;
 }
@@ -180,13 +179,9 @@ dhx2_login (GVfsAfpServer *afp_serv,
   g_vfs_afp_command_put_pascal (comm, username);
   g_vfs_afp_command_pad_to_even (comm);
 
-  res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
-                                                cancellable, error);
+  reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+                                                  cancellable, error);
   g_object_unref (comm);
-  if (!res)
-    goto error;
-
-  reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
   if (!reply)
     goto error;
 
@@ -288,17 +283,11 @@ dhx2_login (GVfsAfpServer *afp_serv,
   /* clientNonce */
   g_output_stream_write_all (G_OUTPUT_STREAM (comm), clientNonce_buf, 16, NULL, NULL, NULL);
 
-  res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+  reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
                                                 cancellable, error);
   g_object_unref (comm);
-  if (!res)
-    goto error;
-
-  
-  reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
   if (!reply)
     goto error;
-
   
   res_code = g_vfs_afp_reply_get_result_code (reply);
   if (res_code != AFP_RESULT_AUTH_CONTINUE)
@@ -359,13 +348,9 @@ dhx2_login (GVfsAfpServer *afp_serv,
                              G_N_ELEMENTS (answer_buf), NULL, NULL, NULL);
 
 
-  res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
-                                                cancellable, error);
+  reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+                                                  cancellable, error);
   g_object_unref (comm);
-  if (!res)
-    goto error;
-  
-  reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
   if (!reply)
     goto error;
 
@@ -493,15 +478,11 @@ dhx_login (GVfsAfpServer *afp_serv,
   g_output_stream_write_all (G_OUTPUT_STREAM(comm), ma_buf, G_N_ELEMENTS (ma_buf),
                              NULL, NULL, NULL);
 
-  res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
-                                                cancellable, error);
+  reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+                                                  cancellable, error);
   g_object_unref (comm);
-  if (!res)
-    goto done;
-
-  reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
   if (!reply)
-    goto error;
+    goto error;  
 
   res_code = g_vfs_afp_reply_get_result_code (reply);
   if (res_code != AFP_RESULT_AUTH_CONTINUE)
@@ -587,13 +568,9 @@ dhx_login (GVfsAfpServer *afp_serv,
                              G_N_ELEMENTS (answer_buf), NULL, NULL, NULL);
 
 
-  res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
-                                                cancellable, error);
+  reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+                                                  cancellable, error);
   g_object_unref (comm);
-  if (!res)
-    goto done;
-
-  reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
   if (!reply)
     goto error;
 
@@ -643,7 +620,6 @@ do_login (GVfsAfpServer *afp_serv,
   if (anonymous)
   {
     GVfsAfpCommand *comm;
-    gboolean res;
     GVfsAfpReply *reply;
     AfpResultCode res_code;
     
@@ -659,13 +635,9 @@ do_login (GVfsAfpServer *afp_serv,
 
     g_vfs_afp_command_put_pascal (comm, afp_version_to_string (afp_serv->version));
     g_vfs_afp_command_put_pascal (comm, AFP_UAM_NO_USER);
-    res = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
-                                                  cancellable, error);
+    reply = g_vfs_afp_connection_send_command_sync (afp_serv->conn, comm,
+                                                    cancellable, error);
     g_object_unref (comm);
-    if (!res)
-      return FALSE;
-
-    reply = g_vfs_afp_connection_read_reply_sync (afp_serv->conn, cancellable, error);
     if (!reply)
       return FALSE;
 
@@ -805,7 +777,6 @@ get_server_parms (GVfsAfpServer *server,
 {
   GVfsAfpCommand *comm;
   GVfsAfpReply   *reply;
-  gboolean        res;
   AfpResultCode   res_code;
   gint32          server_time;
   
@@ -814,13 +785,9 @@ get_server_parms (GVfsAfpServer *server,
   /* pad byte */
   g_vfs_afp_command_put_byte (comm, 0);
 
-  res = g_vfs_afp_connection_send_command_sync (server->conn, comm, cancellable,
-                                                error);
+  reply = g_vfs_afp_connection_send_command_sync (server->conn, comm, cancellable,
+                                                  error);
   g_object_unref (comm);
-  if (!res)
-    return FALSE;
-
-  reply = g_vfs_afp_connection_read_reply_sync (server->conn, cancellable, error);
   if (!reply)
     return FALSE;
 
@@ -849,7 +816,6 @@ get_userinfo (GVfsAfpServer *server,
 {
   GVfsAfpCommand *comm;
   guint16 bitmap;
-  gboolean res;
 
   GVfsAfpReply *reply;
   AfpResultCode res_code;
@@ -863,14 +829,9 @@ get_userinfo (GVfsAfpServer *server,
   bitmap = AFP_GET_USER_INFO_BITMAP_GET_UID_BIT | AFP_GET_USER_INFO_BITMAP_GET_GID_BIT;
   g_vfs_afp_command_put_uint16 (comm, bitmap);
 
-  res = g_vfs_afp_connection_send_command_sync (server->conn,
+  reply = g_vfs_afp_connection_send_command_sync (server->conn,
                                                 comm, cancellable, error);
   g_object_unref (comm);
-  if (!res)
-    return FALSE;
-
-  reply = g_vfs_afp_connection_read_reply_sync (server->conn,
-                                                cancellable, error);
   if (!reply)
     return FALSE;
 
@@ -1038,6 +999,7 @@ g_vfs_afp_server_login (GVfsAfpServer *server,
 try_login:
 
     /* Open connection */
+    server->conn = g_vfs_afp_connection_new (G_SOCKET_CONNECTABLE (server->addr));
     res = g_vfs_afp_connection_open_sync (server->conn, cancellable, &err);
     if (!res)
       break;
@@ -1047,6 +1009,7 @@ try_login:
     if (!res)
     {
       g_vfs_afp_connection_close_sync (server->conn, cancellable, NULL);
+      g_clear_object (&server->conn);
 
       if (!g_error_matches (err, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED))
         break;
diff --git a/daemon/gvfsafpvolume.c b/daemon/gvfsafpvolume.c
index 589b21d..e257b01 100644
--- a/daemon/gvfsafpvolume.c
+++ b/daemon/gvfsafpvolume.c
@@ -112,13 +112,9 @@ g_vfs_afp_volume_mount_sync (GVfsAfpVolume *volume,
 
   /* TODO: password? */
 
-  res = g_vfs_afp_connection_send_command_sync (priv->server->conn, comm, cancellable,
-                                                error);
+  reply = g_vfs_afp_connection_send_command_sync (priv->server->conn, comm, cancellable,
+                                                  error);
   g_object_unref (comm);
-  if (!res)
-    return FALSE;
-
-  reply = g_vfs_afp_connection_read_reply_sync (priv->server->conn, cancellable, error);
   if (!reply)
     return FALSE;
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]