[gnio] Implement condition check/wait and GSource for win32



commit 75cf6b45f3cdcedd4470b602c99a2145710bb67c
Author: Alexander Larsson <alexl redhat com>
Date:   Tue May 5 13:24:18 2009 +0200

    Implement condition check/wait and GSource for win32
---
 gio/gsocket.c |  534 ++++++++++++++++++++++++++++++++++++++++++++++++++++-----
 1 files changed, 494 insertions(+), 40 deletions(-)

diff --git a/gio/gsocket.c b/gio/gsocket.c
index 39f06bc..5150f29 100644
--- a/gio/gsocket.c
+++ b/gio/gsocket.c
@@ -70,14 +70,22 @@ struct _GSocketPrivate
   GSocketType     type;
   char           *protocol;
   gint            fd;
-  gboolean        blocking;
   gint            listen_backlog;
-  gboolean        reuse_address;
-  gboolean        keepalive;
   GError         *construct_error;
   GSocketAddress *local_address;
   GSocketAddress *remote_address;
-  gboolean        closed;
+  guint           blocking : 1;
+  guint           reuse_address : 1;
+  guint           keepalive : 1;
+  guint           closed : 1;
+  guint           blocking_mode_unknown : 1;
+#ifdef G_OS_WIN32
+  WSAEVENT        event;
+  int             current_events;
+  int             current_errors;
+  int             selected_events;
+  GList          *requested_conditions; /* list of requested GIOCondition * */
+#endif
 };
 
 static int
@@ -145,6 +153,18 @@ socket_strerror (int err)
 #endif
 }
 
+#ifdef G_OS_WIN32
+#define win32_unset_event_mask(_socket, _mask) _win32_unset_event_mask (_socket, _mask)
+static void
+_win32_unset_event_mask (GSocket *socket, int mask)
+{
+  socket->priv->current_events &= ~mask;
+  socket->priv->current_errors &= ~mask;
+}
+#else
+#define win32_unset_event_mask(_socket, _mask)
+#endif
+
 static gboolean
 check_socket (GSocket *socket,
 	      GError **error)
@@ -272,6 +292,7 @@ g_socket_details_from_fd (GSocket *socket)
 #else
   /* There doesn't seem to be a way to get this on win32... */
   socket->priv->blocking = FALSE;
+  socket->priv->blocking_mode_unknown = TRUE;
 #endif
 
   optlen = sizeof bool_val;
@@ -505,6 +526,10 @@ g_socket_finalize (GObject *object)
       !socket->priv->closed)
     g_socket_close (socket, NULL);
 
+#ifdef G_OS_WIN32
+  g_assert (socket->priv->requested_conditions == NULL);
+#endif
+
   if (G_OBJECT_CLASS (g_socket_parent_class)->finalize)
     (*G_OBJECT_CLASS (g_socket_parent_class)->finalize) (object);
 }
@@ -636,6 +661,9 @@ g_socket_init (GSocket *socket)
   socket->priv->construct_error = NULL;
   socket->priv->remote_address = NULL;
   socket->priv->local_address = NULL;
+#ifdef G_OS_WIN32
+  socket->priv->event = WSA_INVALID_EVENT;
+#endif
 }
 
 GSocket *
@@ -666,7 +694,8 @@ g_socket_set_blocking (GSocket  *socket,
 
   blocking = !!blocking;
 
-  if (socket->priv->blocking == blocking)
+  if (socket->priv->blocking == blocking &&
+      !socket->priv->blocking_mode_unknown)
     return;
 
 #ifndef G_OS_WIN32
@@ -917,6 +946,7 @@ GSocket *
 g_socket_accept (GSocket       *socket,
                  GError       **error)
 {
+  GSocket *new_socket;
   gint ret;
 
   g_return_val_if_fail (G_IS_SOCKET (socket), NULL);
@@ -933,6 +963,8 @@ g_socket_accept (GSocket       *socket,
 	  if (errsv == EINTR)
 	    continue;
 
+	  win32_unset_event_mask (socket, FD_ACCEPT);
+
 	  g_set_error (error, G_IO_ERROR,
 		       socket_io_error_from_errno (errsv),
 		       "error accepting connection: %s", socket_strerror (errsv));
@@ -941,7 +973,30 @@ g_socket_accept (GSocket       *socket,
       break;
     }
 
-  return g_socket_new_from_fd (ret);
+  win32_unset_event_mask (socket, FD_ACCEPT);
+
+  new_socket = g_socket_new_from_fd (ret);
+
+#ifdef G_OS_WIN32
+  {
+    gulong arg;
+
+    /* The socket inherits the accepting socket's event mask and event object;
+       we need to remove that */
+    WSAEventSelect (ret, NULL, 0);
+
+    /* It also inherits the blocking mode, but on unix newly accepted
+       sockets are blocking, so disable non-blocking mode to get the same
+       behaviour as on unix. */
+    arg = FALSE;
+    if (ioctlsocket (new_socket->priv->fd, FIONBIO, &arg) == SOCKET_ERROR)
+      g_warning ("Unable to set newly allocated socket to blocking mode");
+    new_socket->priv->blocking = TRUE;
+    new_socket->priv->blocking_mode_unknown = FALSE;
+  }
+#endif
+
+  return new_socket;
 }
 
 gboolean
@@ -971,20 +1026,22 @@ g_socket_connect (GSocket         *socket,
 #ifndef G_OS_WIN32
 	  if (errsv == EINPROGRESS)
 #else
-	    if (errsv == WSAEINPROGRESS)
+	  if (errsv == WSAEINPROGRESS)
 #endif
-	      g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
-			   "connection in progress");
-	    else
-	      g_set_error (error, G_IO_ERROR,
-			   socket_io_error_from_errno (errsv),
-			   "error connecting: %s", socket_strerror (errsv));
+	    g_set_error (error, G_IO_ERROR, G_IO_ERROR_PENDING,
+			 "connection in progress");
+	  else
+	    g_set_error (error, G_IO_ERROR,
+			 socket_io_error_from_errno (errsv),
+			 "error connecting: %s", socket_strerror (errsv));
 
 	  return FALSE;
 	}
       break;
     }
 
+  win32_unset_event_mask (socket, FD_CONNECT);
+
   socket->priv->remote_address = g_object_ref (address);
 
   return TRUE;
@@ -1012,11 +1069,16 @@ g_socket_receive (GSocket       *socket,
 	  if (errsv == EINTR)
 	    continue;
 
+	  win32_unset_event_mask (socket, FD_READ);
+
 	  g_set_error (error, G_IO_ERROR,
 		       socket_io_error_from_errno (errsv),
 		       "error receiving data: %s", socket_strerror (errsv));
 	  return -1;
 	}
+
+      win32_unset_event_mask (socket, FD_READ);
+
       break;
     }
 
@@ -1045,6 +1107,11 @@ g_socket_send (GSocket      *socket,
 	  if (errsv == EINTR)
 	    continue;
 
+#ifdef WSAEWOULDBLOCK
+	  if (errsv == WSAEWOULDBLOCK)
+	    win32_unset_event_mask (socket, FD_WRITE);
+#endif
+
 	  g_set_error (error, G_IO_ERROR,
 		       socket_io_error_from_errno (errsv),
 		       "error sending data: %s", socket_strerror (errsv));
@@ -1092,6 +1159,14 @@ g_socket_close (GSocket *socket,
       break;
     }
 
+#ifdef G_OS_WIN32
+  if (socket->priv->event != WSA_INVALID_EVENT)
+    {
+      WSACloseEvent (socket->priv->event);
+      socket->priv->event = WSA_INVALID_EVENT;
+    }
+#endif
+
   socket->priv->closed = TRUE;
 
   return res != -1;
@@ -1103,14 +1178,320 @@ g_socket_is_closed (GSocket *socket)
   return socket->priv->closed;
 }
 
+#ifdef G_OS_WIN32
+/* Broken source, used on errors */
+static gboolean
+broken_prepare  (GSource  *source,
+		 gint     *timeout)
+{
+  return FALSE;
+}
+
+static gboolean
+broken_check    (GSource  *source)
+{
+  return FALSE;
+}
+
+static gboolean
+broken_dispatch (GSource    *source, 
+		 GSourceFunc callback,
+		 gpointer    user_data)
+{
+  return TRUE;
+}
+
+static GSourceFuncs broken_funcs =
+{
+  broken_prepare,
+  broken_check,
+  broken_dispatch,
+  NULL
+};
+
+static gint
+network_events_for_condition (GIOCondition condition)
+{
+  int event_mask = 0;
+
+  if (condition & G_IO_IN)
+    event_mask |= (FD_READ | FD_ACCEPT);
+  if (condition & G_IO_OUT)
+    event_mask |= (FD_WRITE | FD_CONNECT);
+  event_mask |= FD_CLOSE;
+
+  return event_mask;
+}
+
+static void
+ensure_event (GSocket *socket)
+{
+  if (socket->priv->event == WSA_INVALID_EVENT)
+    socket->priv->event = WSACreateEvent();
+}
+
+static void
+update_select_events (GSocket *socket)
+{
+  int event_mask;
+  GIOCondition *ptr;
+  GList *l;
+  WSAEVENT event;
+
+  ensure_event (socket);
+
+  event_mask = 0;
+  for (l = socket->priv->requested_conditions; l != NULL; l = l->next)
+    {
+      ptr = l->data;
+      event_mask |= network_events_for_condition (*ptr);
+    }
+
+  if (event_mask != socket->priv->selected_events)
+    {
+      /* If no events selected, disable event so we can unset
+	 nonblocking mode */
+
+      if (event_mask == 0)
+	event = NULL;
+      else
+	event = socket->priv->event;
+
+      if (WSAEventSelect (socket->priv->fd, event, event_mask) == 0)
+	{
+	  socket->priv->selected_events = event_mask;
+
+	  /* This automatically enables nonblocking mode */
+	  if (socket->priv->blocking)
+	    {
+	      socket->priv->blocking = FALSE;
+	      g_object_notify (G_OBJECT (socket), "blocking");
+	    }
+	}
+    }
+}
+
+static void
+add_condition_watch (GSocket *socket,
+		     GIOCondition *condition)
+{
+  g_assert (g_list_find (socket->priv->requested_conditions, condition) == NULL);
+
+  socket->priv->requested_conditions =
+    g_list_prepend (socket->priv->requested_conditions, condition);
+
+  update_select_events (socket);
+}
+
+static void
+remove_condition_watch (GSocket *socket,
+			GIOCondition *condition)
+{
+  g_assert (g_list_find (socket->priv->requested_conditions, condition) != NULL);
+
+  socket->priv->requested_conditions =
+    g_list_remove (socket->priv->requested_conditions, condition);
+
+  update_select_events (socket);
+}
+
+static GIOCondition
+update_condition (GSocket *socket)
+{
+  WSANETWORKEVENTS events;
+  GIOCondition condition;
+
+  if (WSAEnumNetworkEvents (socket->priv->fd,
+			    socket->priv->event,
+			    &events) == 0)
+    {
+      socket->priv->current_events |= events.lNetworkEvents;
+      if (events.lNetworkEvents & FD_WRITE &&
+	  events.iErrorCode[FD_WRITE_BIT] != 0)
+	socket->priv->current_errors |= FD_WRITE;
+      if (events.lNetworkEvents & FD_CONNECT &&
+	  events.iErrorCode[FD_CONNECT_BIT] != 0)
+	socket->priv->current_errors |= FD_CONNECT;
+    }
+
+  condition = 0;
+  if (socket->priv->current_events & (FD_READ | FD_ACCEPT))
+    condition |= G_IO_IN;
+
+  if (socket->priv->current_events & FD_CLOSE ||
+      socket->priv->closed)
+    condition |= G_IO_HUP;
+
+  /* Never report both G_IO_OUT and HUP, these are
+     mutually exclusive (can't write to a closed socket) */
+  if ((condition & G_IO_HUP) == 0 &&
+      socket->priv->current_events & FD_WRITE)
+    {
+      if (socket->priv->current_errors & FD_WRITE)
+	condition |= G_IO_ERR;
+      else
+	condition |= G_IO_OUT;
+    }
+  else
+    {
+      if (socket->priv->current_events & FD_CONNECT)
+	{
+	  if (socket->priv->current_errors & FD_CONNECT)
+	    condition |= (G_IO_HUP | G_IO_ERR);
+	  else
+	    condition |= G_IO_OUT;
+	}
+    }
+
+  return condition;
+}
+
+typedef struct {
+  GSource       source;
+  GPollFD       pollfd;
+  GSocket      *socket;
+  GIOCondition  condition;
+  GCancellable *cancellable;
+  GPollFD       cancel_pollfd;
+  GIOCondition  result_condition;
+} GWinsockSource;
+
+static gboolean
+winsock_prepare (GSource  *source,
+		 gint     *timeout)
+{
+  GWinsockSource *winsock_source = (GWinsockSource *)source;
+  GIOCondition current_condition;
+
+  current_condition = update_condition (winsock_source->socket);
+
+  if (g_cancellable_is_cancelled (winsock_source->cancellable))
+    {
+      winsock_source->result_condition = current_condition;
+      return TRUE;
+    }
+
+  if ((winsock_source->condition & current_condition) != 0)
+    {
+      winsock_source->result_condition = current_condition;
+      return TRUE;
+    }
+
+  return FALSE;
+}
+
+static gboolean
+winsock_check (GSource  *source)
+{
+  GWinsockSource *winsock_source = (GWinsockSource *)source;
+  GIOCondition current_condition;
+
+  current_condition = update_condition (winsock_source->socket);
+
+  if (g_cancellable_is_cancelled (winsock_source->cancellable))
+    {
+      winsock_source->result_condition = current_condition;
+      return TRUE;
+    }
+
+  if ((winsock_source->condition & current_condition) != 0)
+    {
+      winsock_source->result_condition = current_condition;
+      return TRUE;
+    }
+
+  return FALSE;
+}
+
+static gboolean
+winsock_dispatch (GSource    *source,
+		  GSourceFunc callback,
+		  gpointer    user_data)
+{
+  GSocketSourceFunc func = (GSocketSourceFunc)callback;
+  GWinsockSource *winsock_source = (GWinsockSource *)source;
+
+  return (*func) (user_data, winsock_source->result_condition);
+}
+
+static void
+winsock_finalize (GSource *source)
+{
+  GWinsockSource *winsock_source = (GWinsockSource *)source;
+  GSocket *socket;
+
+  socket = winsock_source->socket;
+
+  remove_condition_watch (socket, &winsock_source->condition);
+  g_object_unref (socket);
+
+  if (winsock_source->cancellable)
+    g_object_unref (winsock_source->cancellable);
+}
+
+static GSourceFuncs winsock_funcs =
+{
+  winsock_prepare,
+  winsock_check,
+  winsock_dispatch,
+  winsock_finalize
+};
+
+static GSource *
+winsock_source_new (GSocket      *socket,
+		    GIOCondition  condition,
+		    GCancellable *cancellable)
+{
+  GSource *source;
+  GWinsockSource *winsock_source;
+
+  ensure_event (socket);
+
+  if (socket->priv->event == WSA_INVALID_EVENT)
+    {
+      g_warning ("Failed to create WSAEvent");
+      return g_source_new (&broken_funcs, sizeof (GSource));
+    }
+
+  condition |= G_IO_HUP | G_IO_ERR;
+
+  source = g_source_new (&winsock_funcs, sizeof (GWinsockSource));
+  winsock_source = (GWinsockSource *)source;
+
+  winsock_source->socket = g_object_ref (socket);
+  winsock_source->condition = condition;
+  add_condition_watch (socket, &winsock_source->condition);
+
+  if (cancellable)
+    {
+      winsock_source->cancellable = g_object_ref (cancellable);
+      g_cancellable_make_pollfd (cancellable,
+				 &winsock_source->cancel_pollfd);
+      g_source_add_poll (source, &winsock_source->cancel_pollfd);
+    }
+
+  winsock_source->pollfd.fd = (gintptr) socket->priv->event;
+  winsock_source->pollfd.events = condition;
+  g_source_add_poll (source, &winsock_source->pollfd);
+
+  return source;
+}
+#endif
+
 GSource *
 g_socket_create_source (GSocket      *socket,
                         GIOCondition  condition,
                         GCancellable *cancellable)
 {
+  GSource *source;
   g_return_val_if_fail (G_IS_SOCKET (socket) && (cancellable == NULL || G_IS_CANCELLABLE (cancellable)), NULL);
 
-  return _g_fd_source_new (socket->priv->fd, condition, cancellable);
+#ifdef G_OS_WIN32
+  source = winsock_source_new (socket, condition, cancellable);
+#else
+  source =_g_fd_source_new (socket->priv->fd, condition, cancellable);
+#endif
+  return source;
 }
 
 /**
@@ -1133,20 +1514,34 @@ GIOCondition
 g_socket_condition_check (GSocket       *socket,
                           GIOCondition   condition)
 {
-  GPollFD poll_fd;
-  gint result;
-
   if (!check_socket (socket, NULL))
     return 0;
 
-  poll_fd.fd = socket->priv->fd;
-  poll_fd.events = condition;
+#ifdef G_OS_WIN32
+  {
+    GIOCondition current_condition;
 
-  do
-    result = g_poll (&poll_fd, 1, 0);
-  while (result == -1 && get_socket_errno () == EINTR);
+    condition |= G_IO_ERR | G_IO_HUP;
 
-  return poll_fd.revents;
+    add_condition_watch (socket, &condition);
+    current_condition = update_condition (socket);
+    remove_condition_watch (socket, &condition);
+    return condition & current_condition;
+  }
+#else
+  {
+    GPollFD poll_fd;
+    gint result;
+    poll_fd.fd = socket->priv->fd;
+    poll_fd.events = condition;
+
+    do
+      result = g_poll (&poll_fd, 1, 0);
+    while (result == -1 && get_socket_errno () == EINTR);
+
+    return poll_fd.revents;
+  }
+#endif
 }
 
 /**
@@ -1169,29 +1564,83 @@ g_socket_condition_wait (GSocket       *socket,
                          GCancellable  *cancellable,
                          GError       **error)
 {
-  GPollFD poll_fd[2];
-  gint result;
-  gint num;
-
   if (!check_socket (socket, error))
     return FALSE;
 
-  poll_fd[0].fd = socket->priv->fd;
-  poll_fd[0].events = condition;
-  num = 1;
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return FALSE;
 
-  if (cancellable)
-    {
-      g_cancellable_make_pollfd (cancellable, &poll_fd[1]);
-      num++;
-    }
+#ifdef G_OS_WIN32
+  {
+    GIOCondition current_condition;
+    WSAEVENT events[2];
+    DWORD res;
+    GPollFD cancel_fd;
+    int num_events;
+
+    /* Always check these */
+    condition |=  G_IO_ERR | G_IO_HUP;
 
-  do
-    result = g_poll (poll_fd, num, -1);
-  while (result == -1 && get_socket_errno () == EINTR);
+    add_condition_watch (socket, &condition);
 
-  return cancellable == NULL ||
-         !g_cancellable_set_error_if_cancelled (cancellable, error);
+    num_events = 0;
+    events[num_events++] = socket->priv->event;
+
+    if (cancellable)
+      {
+	g_cancellable_make_pollfd (cancellable, &cancel_fd);
+	events[num_events++] = (WSAEVENT)cancel_fd.fd;
+      }
+
+    current_condition = update_condition (socket);
+    while ((condition & current_condition) == 0)
+      {
+	res = WSAWaitForMultipleEvents(num_events, events,
+				       FALSE, WSA_INFINITE, FALSE);
+	if (res == WSA_WAIT_FAILED)
+	  {
+	    int errsv = get_socket_errno ();
+
+	    g_set_error (error, G_IO_ERROR,
+			 socket_io_error_from_errno (errsv),
+			 "waiting for socket condition: %s",
+			 socket_strerror (errsv));
+	    break;
+	  }
+
+	if (g_cancellable_set_error_if_cancelled (cancellable, error))
+	  break;
+
+	current_condition = update_condition (socket);
+      }
+    remove_condition_watch (socket, &condition);
+
+    return (condition & current_condition) != 0;
+  }
+#else
+  {
+    GPollFD poll_fd[2];
+    gint result;
+    gint num;
+
+    poll_fd[0].fd = socket->priv->fd;
+    poll_fd[0].events = condition;
+    num = 1;
+
+    if (cancellable)
+      {
+	g_cancellable_make_pollfd (cancellable, &poll_fd[1]);
+	num++;
+      }
+
+    do
+      result = g_poll (poll_fd, num, -1);
+    while (result == -1 && get_socket_errno () == EINTR);
+
+    return cancellable == NULL ||
+      !g_cancellable_set_error_if_cancelled (cancellable, error);
+  }
+  #endif
 }
 
 gssize
@@ -1377,6 +1826,9 @@ g_socket_send_message (GSocket                *socket,
 	    if (errsv == WSAEINTR)
 	      continue;
 
+	    if (errsv == WSAEWOULDBLOCK)
+	      win32_unset_event_mask (socket, FD_WRITE);
+
 	    g_set_error (error, G_IO_ERROR,
 			 socket_io_error_from_errno (errsv),
 			 "WSASendTo: %s", socket_strerror (errsv));
@@ -1665,12 +2117,14 @@ g_socket_receive_message (GSocket                 *socket,
 	    if (errsv == WSAEINTR)
 	      continue;
 
+	    win32_unset_event_mask (socket, FD_READ);
 	    g_set_error (error, G_IO_ERROR,
 			 socket_io_error_from_errno (errsv),
 			 "WSARecvFrom: %s", socket_strerror (errsv));
 
 	    return -1;
 	  }
+	win32_unset_event_mask (socket, FD_READ);
 	break;
       }
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]