[glib] GSocket: Fix race conditions on Win32 if multiple threads are waiting on conditions for the same socket



commit 799f8dcd46fb40ea206d9f1b5468db62cc00a72e
Author: Sebastian Dröge <sebastian centricular com>
Date:   Mon Dec 5 12:28:57 2016 +0200

    GSocket: Fix race conditions on Win32 if multiple threads are waiting on conditions for the same socket
    
    WSAWaitForMultipleEvents() only returns for one of the waiting threads, and
    that one might not even be the one waiting for the condition that changed. As
    such, only let a single thread wait on the event and use a GCond for all other
    threads.
    
    With this it is possible to e.g. have a UDP socket that is written to from
    one thread and read from in another thread on Win32 too. On POSIX systems this
    was working before already.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=762283

 gio/gsocket.c |   94 +++++++++++++++++++++++++++++++++++++++++----------------
 1 files changed, 68 insertions(+), 26 deletions(-)
---
diff --git a/gio/gsocket.c b/gio/gsocket.c
index 5a2f332..c7555ba 100644
--- a/gio/gsocket.c
+++ b/gio/gsocket.c
@@ -249,11 +249,14 @@ struct _GSocketPrivate
   guint           connect_pending : 1;
 #ifdef G_OS_WIN32
   WSAEVENT        event;
+  gboolean        waiting;
+  DWORD           waiting_result;
   int             current_events;
   int             current_errors;
   int             selected_events;
   GList          *requested_conditions; /* list of requested GIOCondition * */
   GMutex          win32_source_lock;
+  GCond           win32_source_cond;
 #endif
 
   struct {
@@ -333,8 +336,10 @@ socket_strerror (int err)
 static void
 _win32_unset_event_mask (GSocket *socket, int mask)
 {
+  g_mutex_lock (&socket->priv->win32_source_lock);
   socket->priv->current_events &= ~mask;
   socket->priv->current_errors &= ~mask;
+  g_mutex_unlock (&socket->priv->win32_source_lock);
 }
 #else
 #define win32_unset_event_mask(_socket, _mask)
@@ -831,6 +836,7 @@ g_socket_finalize (GObject *object)
 
   g_assert (socket->priv->requested_conditions == NULL);
   g_mutex_clear (&socket->priv->win32_source_lock);
+  g_cond_clear (&socket->priv->win32_source_cond);
 #endif
 
   for (i = 0; i < RECV_ADDR_CACHE_SIZE; i++)
@@ -1058,6 +1064,7 @@ g_socket_init (GSocket *socket)
 #ifdef G_OS_WIN32
   socket->priv->event = WSA_INVALID_EVENT;
   g_mutex_init (&socket->priv->win32_source_lock);
+  g_cond_init (&socket->priv->win32_source_cond);
 #endif
 }
 
@@ -2441,6 +2448,8 @@ g_socket_accept (GSocket       *socket,
 
   while (TRUE)
     {
+      win32_unset_event_mask (socket, FD_ACCEPT);
+
       if ((ret = accept (socket->priv->fd, NULL, 0)) < 0)
        {
          int errsv = get_socket_errno ();
@@ -2455,8 +2464,6 @@ g_socket_accept (GSocket       *socket,
               errsv == EAGAIN)
 #endif
             {
-              win32_unset_event_mask (socket, FD_ACCEPT);
-
               if (socket->priv->blocking)
                 {
                   if (!g_socket_condition_wait (socket,
@@ -2473,8 +2480,6 @@ g_socket_accept (GSocket       *socket,
       break;
     }
 
-  win32_unset_event_mask (socket, FD_ACCEPT);
-
 #ifdef G_OS_WIN32
   {
     /* The socket inherits the accepting sockets event mask and even object,
@@ -2563,6 +2568,8 @@ g_socket_connect (GSocket         *socket,
 
   while (1)
     {
+      win32_unset_event_mask (socket, FD_CONNECT);
+
       if (connect (socket->priv->fd, (struct sockaddr *) &buffer,
                   g_socket_address_get_native_size (address)) < 0)
        {
@@ -2577,8 +2584,6 @@ g_socket_connect (GSocket         *socket,
          if (errsv == WSAEWOULDBLOCK)
 #endif
            {
-              win32_unset_event_mask (socket, FD_CONNECT);
-
              if (socket->priv->blocking)
                {
                  if (g_socket_condition_wait (socket, G_IO_OUT, cancellable, error))
@@ -2604,8 +2609,6 @@ g_socket_connect (GSocket         *socket,
       break;
     }
 
-  win32_unset_event_mask (socket, FD_CONNECT);
-
   socket->priv->connected_read = TRUE;
   socket->priv->connected_write = TRUE;
 
@@ -2785,6 +2788,8 @@ g_socket_receive_with_timeout (GSocket       *socket,
 
   while (1)
     {
+      win32_unset_event_mask (socket, FD_READ);
+
       if ((ret = recv (socket->priv->fd, buffer, size, 0)) < 0)
        {
          int errsv = get_socket_errno ();
@@ -2799,8 +2804,6 @@ g_socket_receive_with_timeout (GSocket       *socket,
               errsv == EAGAIN)
 #endif
             {
-              win32_unset_event_mask (socket, FD_READ);
-
               if (timeout != 0)
                 {
                   if (!block_on_timeout (socket, G_IO_IN, timeout, start_time,
@@ -2811,14 +2814,10 @@ g_socket_receive_with_timeout (GSocket       *socket,
                 }
             }
 
-         win32_unset_event_mask (socket, FD_READ);
-
          socket_set_error_lazy (error, errsv, _("Error receiving data: %s"));
          return -1;
        }
 
-      win32_unset_event_mask (socket, FD_READ);
-
       break;
     }
 
@@ -2984,6 +2983,8 @@ g_socket_send_with_timeout (GSocket       *socket,
 
   while (1)
     {
+      win32_unset_event_mask (socket, FD_WRITE);
+
       if ((ret = send (socket->priv->fd, buffer, size, G_SOCKET_DEFAULT_SEND_FLAGS)) < 0)
        {
          int errsv = get_socket_errno ();
@@ -2998,8 +2999,6 @@ g_socket_send_with_timeout (GSocket       *socket,
               errsv == EAGAIN)
 #endif
             {
-              win32_unset_event_mask (socket, FD_WRITE);
-
               if (timeout != 0)
                 {
                   if (!block_on_timeout (socket, G_IO_OUT, timeout, start_time,
@@ -3414,7 +3413,7 @@ remove_condition_watch (GSocket      *socket,
 }
 
 static GIOCondition
-update_condition (GSocket *socket)
+update_condition_unlocked (GSocket *socket)
 {
   WSANETWORKEVENTS events;
   GIOCondition condition;
@@ -3481,6 +3480,16 @@ update_condition (GSocket *socket)
 
   return condition;
 }
+
+static GIOCondition
+update_condition (GSocket *socket)
+{
+  GIOCondition res;
+  g_mutex_lock (&socket->priv->win32_source_lock);
+  res = update_condition_unlocked (socket);
+  g_mutex_unlock (&socket->priv->win32_source_lock);
+  return res;
+}
 #endif
 
 typedef struct {
@@ -3876,11 +3885,44 @@ g_socket_condition_timed_wait (GSocket       *socket,
     if (timeout == -1)
       timeout = WSA_INFINITE;
 
-    current_condition = update_condition (socket);
+    g_mutex_lock (&socket->priv->win32_source_lock);
+    current_condition = update_condition_unlocked (socket);
     while ((condition & current_condition) == 0)
       {
-       res = WSAWaitForMultipleEvents (num_events, events,
-                                       FALSE, timeout, FALSE);
+        if (!socket->priv->waiting)
+          {
+            socket->priv->waiting = TRUE;
+            socket->priv->waiting_result = 0;
+            g_mutex_unlock (&socket->priv->win32_source_lock);
+
+            res = WSAWaitForMultipleEvents (num_events, events, FALSE, timeout, FALSE);
+
+            g_mutex_lock (&socket->priv->win32_source_lock);
+            socket->priv->waiting = FALSE;
+            socket->priv->waiting_result = res;
+            g_cond_broadcast (&socket->priv->win32_source_cond);
+          }
+        else
+          {
+            if (timeout != WSA_INFINITE)
+              {
+                if (!g_cond_wait_until (&socket->priv->win32_source_cond, &socket->priv->win32_source_lock, timeout))
+                  {
+                    res = WSA_WAIT_TIMEOUT;
+                    break;
+                  }
+                else
+                  {
+                    res = socket->priv->waiting_result;
+                  }
+              }
+            else
+              {
+                g_cond_wait (&socket->priv->win32_source_cond, &socket->priv->win32_source_lock);
+                res = socket->priv->waiting_result;
+              }
+          }
+
        if (res == WSA_WAIT_FAILED)
          {
            int errsv = get_socket_errno ();
@@ -3901,7 +3943,7 @@ g_socket_condition_timed_wait (GSocket       *socket,
        if (g_cancellable_set_error_if_cancelled (cancellable, error))
          break;
 
-       current_condition = update_condition (socket);
+        current_condition = update_condition_unlocked (socket);
 
        if (timeout != WSA_INFINITE)
          {
@@ -3910,6 +3952,7 @@ g_socket_condition_timed_wait (GSocket       *socket,
              timeout = 0;
          }
       }
+    g_mutex_unlock (&socket->priv->win32_source_lock);
     remove_condition_watch (socket, &condition);
     if (num_events > 1)
       g_cancellable_release_fd (cancellable);
@@ -4405,6 +4448,8 @@ g_socket_send_message_with_timeout (GSocket                *socket,
 
     while (1)
       {
+        win32_unset_event_mask (socket, FD_WRITE);
+
        if (address)
          result = WSASendTo (socket->priv->fd,
                              bufs, num_vectors,
@@ -4426,8 +4471,6 @@ g_socket_send_message_with_timeout (GSocket                *socket,
 
            if (errsv == WSAEWOULDBLOCK)
               {
-                win32_unset_event_mask (socket, FD_WRITE);
-
                 if (timeout != 0)
                   {
                     if (!block_on_timeout (socket, G_IO_OUT, timeout,
@@ -4875,6 +4918,8 @@ g_socket_receive_message_with_timeout (GSocket                 *socket,
     /* do it */
     while (1)
       {
+        win32_unset_event_mask (socket, FD_READ);
+
        addrlen = sizeof addr;
        if (address)
          result = WSARecvFrom (socket->priv->fd,
@@ -4896,8 +4941,6 @@ g_socket_receive_message_with_timeout (GSocket                 *socket,
 
             if (errsv == WSAEWOULDBLOCK)
               {
-                win32_unset_event_mask (socket, FD_READ);
-
                 if (timeout != 0)
                   {
                     if (!block_on_timeout (socket, G_IO_IN, timeout,
@@ -4911,7 +4954,6 @@ g_socket_receive_message_with_timeout (GSocket                 *socket,
            socket_set_error_lazy (error, errsv, _("Error receiving message: %s"));
            return -1;
          }
-       win32_unset_event_mask (socket, FD_READ);
        break;
       }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]