gvfs r1221 - in trunk: . client daemon



Author: alexl
Date: Mon Feb  4 09:51:48 2008
New Revision: 1221
URL: http://svn.gnome.org/viewvc/gvfs?rev=1221&view=rev

Log:
2008-02-04  Alexander Larsson  <alexl redhat com>

        * client/gdaemonfileinputstream.c:
        * client/gdaemonfileoutputstream.c:
	Init seq_nr to 1 so that seq_nr 0 is special
	(used for e.g. readahead ops)
	
        * daemon/gvfschannel.[ch]:
        * daemon/gvfsreadchannel.c:
	Implement readahead.



Modified:
   trunk/ChangeLog
   trunk/client/gdaemonfileinputstream.c
   trunk/client/gdaemonfileoutputstream.c
   trunk/daemon/gvfschannel.c
   trunk/daemon/gvfschannel.h
   trunk/daemon/gvfsreadchannel.c

Modified: trunk/client/gdaemonfileinputstream.c
==============================================================================
--- trunk/client/gdaemonfileinputstream.c	(original)
+++ trunk/client/gdaemonfileinputstream.c	Mon Feb  4 09:51:48 2008
@@ -294,6 +294,7 @@
 {
   info->output_buffer = g_string_new ("");
   info->input_buffer = g_string_new ("");
+  info->seq_nr = 1;
 }
 
 GFileInputStream *

Modified: trunk/client/gdaemonfileoutputstream.c
==============================================================================
--- trunk/client/gdaemonfileoutputstream.c	(original)
+++ trunk/client/gdaemonfileoutputstream.c	Mon Feb  4 09:51:48 2008
@@ -242,6 +242,7 @@
 {
   info->output_buffer = g_string_new ("");
   info->input_buffer = g_string_new ("");
+  info->seq_nr = 1;
 }
 
 GFileOutputStream *

Modified: trunk/daemon/gvfschannel.c
==============================================================================
--- trunk/daemon/gvfschannel.c	(original)
+++ trunk/daemon/gvfschannel.c	Mon Feb  4 09:51:48 2008
@@ -67,6 +67,17 @@
   gsize data_pos;
 } RequestReader;
 
+typedef struct {
+  guint32 command;
+  guint32 arg1;
+  guint32 arg2;
+  guint32 seq_nr;
+
+  gpointer data;
+  gsize data_len;
+  gboolean cancelled;
+} Request;
+
 struct _GVfsChannelPrivate
 {
   GVfsBackend *backend;
@@ -79,6 +90,8 @@
   GVfsJob *current_job;
   guint32 current_job_seq_nr;
 
+  GList *queued_requests;
+  
   RequestReader *request_reader;
   
   char reply_buffer[G_VFS_DAEMON_SOCKET_PROTOCOL_REPLY_SIZE];
@@ -254,61 +267,121 @@
   g_free (reader);
 }
 
-/* Ownership of data is passed here to avoid copying it */
-static void
-got_request (GVfsChannel *channel,
-	     GVfsDaemonSocketProtocolRequest *request,
-	     gpointer data, gsize data_len)
+static gboolean
+start_queued_request (GVfsChannel *channel)
 {
   GVfsChannelClass *class;
+  Request *req;
   GVfsJob *job;
   GError *error;
-  guint32 seq_nr, command, arg1, arg2;
-  
-  command = g_ntohl (request->command);
-  arg1 = g_ntohl (request->arg1);
-  arg2 = g_ntohl (request->arg2);
-  seq_nr = g_ntohl (request->seq_nr);
+  gboolean started_job;
 
-  job = NULL;
+  started_job = FALSE;
   
-  if (channel->priv->current_job != NULL)
+  class = G_VFS_CHANNEL_GET_CLASS (channel);
+  
+  while (channel->priv->current_job == NULL &&
+	 channel->priv->queued_requests != NULL)
     {
-      if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL)
+      req = channel->priv->queued_requests->data;
+
+      channel->priv->queued_requests =
+	g_list_delete_link (channel->priv->queued_requests,
+			    channel->priv->queued_requests);
+      
+      error = NULL;
+      job = NULL;
+      if (req->cancelled)
 	{
-	  g_warning ("Ignored non-cancel request with outstanding request");
-	  /* Can't send an error reply now, that would confuse the reply
-	     to the outstanding request */
+	  error =
+	    g_error_new_literal (G_IO_ERROR, G_IO_ERROR_CANCELLED,
+				 _("Operation was cancelled"));
+	  g_free (req->data); /* Did not pass ownership */
+	}
+      else
+	{
+	  /* This passes on ownership of req->data */
+	  job = class->handle_request (channel,
+				       req->command, req->seq_nr,
+				       req->arg1, req->arg2,
+				       req->data, req->data_len, 
+				       &error);
 	}
-      else if (arg1 == channel->priv->current_job_seq_nr)
-	g_vfs_job_cancel (channel->priv->current_job);
       
-      g_free (data);
-      return;
-    }
-  /* Ignore cancel with no outstanding job */
-  else if (command != G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL)
-    {
-      class = G_VFS_CHANNEL_GET_CLASS (channel);
-
-      error = NULL;
-      job = class->handle_request (channel,
-				   command, seq_nr,
-				   arg1, arg2,
-				   data, data_len, 
-				   &error);
       if (job)
 	{
 	  channel->priv->current_job = job;
-	  channel->priv->current_job_seq_nr = seq_nr;
+	  channel->priv->current_job_seq_nr = req->seq_nr;
 	  g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job);
+	  started_job = TRUE;
 	}
       else
 	{
 	  g_vfs_channel_send_error (channel, error);
 	  g_error_free (error);
 	}
+      
+      g_free (req);
+    }
+
+  return started_job;
+}
+
+
+/* Ownership of data is passed here to avoid copying it */
+static void
+got_request (GVfsChannel *channel,
+	     GVfsDaemonSocketProtocolRequest *request,
+	     gpointer data, gsize data_len)
+{
+  Request *req;
+  guint32 command, arg1;
+  GList *l;
+
+  command = g_ntohl (request->command);
+  arg1 = g_ntohl (request->arg1);
+
+  if (command == G_VFS_DAEMON_SOCKET_PROTOCOL_REQUEST_CANCEL)
+    {
+      if (arg1 == channel->priv->current_job_seq_nr)
+	g_vfs_job_cancel (channel->priv->current_job);
+      else
+	{
+	  for (l = channel->priv->queued_requests; l != NULL; l = l->next)
+	    {
+	      req = l->data;
+
+	      if (req->seq_nr == 0)
+		/* We're cancelling something later but this readahead might
+		   be the actual operation that's replacing it */
+		req->cancelled = TRUE;
+		  
+	      if (req->seq_nr == arg1)
+		{
+		  req->cancelled = TRUE;
+		  break;
+		}
+	    }
+	}
+
+      /* Cancel ops get no return */
+      g_free (data);
+      return;
     }
+  
+  req = g_new0 (Request, 1);
+  req->command = command;
+  req->arg1 = arg1;
+  req->arg2 = g_ntohl (request->arg2);
+  req->seq_nr = g_ntohl (request->seq_nr);
+  req->data_len = data_len;
+  req->data = data;
+
+  channel->priv->queued_requests =
+    g_list_append (channel->priv->queued_requests,
+		   req);
+  
+  start_queued_request (channel);
 }
 
 static void command_read_cb (GObject *source_object,
@@ -515,6 +588,8 @@
   channel->priv->current_job = NULL;
   g_vfs_job_emit_finished (job);
 
+  class = G_VFS_CHANNEL_GET_CLASS (channel);
+  
   if (G_VFS_IS_JOB_CLOSE_READ (job) ||
       G_VFS_IS_JOB_CLOSE_WRITE (job))
     {
@@ -523,15 +598,23 @@
     }
   else if (channel->priv->connection_closed)
     {
-      class = G_VFS_CHANNEL_GET_CLASS (channel);
 
       channel->priv->current_job = class->close (channel);
       channel->priv->current_job_seq_nr = 0;
       g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job);
     }
+  /* Start queued request or readahead */
+  else if (!start_queued_request (channel) &&
+	   class->readahead)
+    {
+      /* No queued requests, maybe we want to do a readahead call */
+      channel->priv->current_job = class->readahead (channel, job);
+      channel->priv->current_job_seq_nr = 0;
+      if (channel->priv->current_job)
+	g_vfs_job_source_new_job (G_VFS_JOB_SOURCE (channel), channel->priv->current_job);
+    }
 
   g_object_unref (job);
-  g_print ("Sent reply\n");
 }
 
 /* Might be called on an i/o thread */

Modified: trunk/daemon/gvfschannel.h
==============================================================================
--- trunk/daemon/gvfschannel.h	(original)
+++ trunk/daemon/gvfschannel.h	Mon Feb  4 09:51:48 2008
@@ -61,6 +61,8 @@
 			      gpointer data,
 			      gsize data_len,
 			      GError **error);
+  GVfsJob *(*readahead)      (GVfsChannel *channel,
+			      GVfsJob *job);
 };
 
 GType g_vfs_channel_get_type (void) G_GNUC_CONST;

Modified: trunk/daemon/gvfsreadchannel.c
==============================================================================
--- trunk/daemon/gvfsreadchannel.c	(original)
+++ trunk/daemon/gvfsreadchannel.c	Mon Feb  4 09:51:48 2008
@@ -59,6 +59,8 @@
 					     gpointer      data,
 					     gsize         data_len,
 					     GError      **error);
+static GVfsJob *read_channel_readahead      (GVfsChannel  *channel,
+					     GVfsJob       *job);
   
 static void
 g_vfs_read_channel_finalize (GObject *object)
@@ -80,6 +82,7 @@
   gobject_class->finalize = g_vfs_read_channel_finalize;
   channel_class->close = read_channel_close;
   channel_class->handle_request = read_channel_handle_request;
+  channel_class->readahead = read_channel_readahead;
 }
 
 static void
@@ -186,6 +189,35 @@
   return job;
 }
 
+static GVfsJob *
+read_channel_readahead (GVfsChannel  *channel,
+			GVfsJob       *job)
+{
+  GVfsJob *readahead_job;
+  GVfsReadChannel *read_channel;
+  GVfsJobRead *read_job;
+
+  readahead_job = NULL;
+  if (!job->failed &&
+      G_VFS_IS_JOB_READ (job))
+    {
+      read_job = G_VFS_JOB_READ (job);
+      read_channel = G_VFS_READ_CHANNEL (channel);
+
+      if (read_job->data_count != 0)
+	{
+	  read_channel->read_count++;
+	  readahead_job = g_vfs_job_read_new (read_channel,
+					      g_vfs_channel_get_backend_handle (channel),
+					      modify_read_size (read_channel, 8192),
+					      g_vfs_channel_get_backend (channel));
+	}
+    }
+  
+  return readahead_job;
+}
+
+
 /* Might be called on an i/o thread
  */
 void



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]