[wing/wip/nacho/poll: 2/2] Add wing_poll



commit 1de9e035b0c3e849ff03c5aaddde457792f9b5b4
Author: Ignacio Casal Quinteiro <qignacio amazon com>
Date:   Fri Nov 30 17:02:00 2018 +0100

    Add wing_poll
    
    This is a new polling function, based on the GLib one, that can poll
    more than 64 handles.
    When there are more than 64 handles we split them into batches and poll
    each batch on its own thread; this way we overcome the limit.

 tests/meson.build |   5 +-
 tests/poll.c      | 619 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
 wing/meson.build  |   2 +
 wing/wing.h       |   1 +
 wing/wingpoll.c   | 403 +++++++++++++++++++++++++++++++++++
 wing/wingpoll.h   |  46 ++++
 6 files changed, 1075 insertions(+), 1 deletion(-)
---
diff --git a/tests/meson.build b/tests/meson.build
index fbbfc14..9079a7e 100644
--- a/tests/meson.build
+++ b/tests/meson.build
@@ -1,10 +1,13 @@
 unit_tests = [
   'named-pipe',
+  'poll',
 ]
 
+winsock2_dep = cc.find_library('ws2_32')
+
 foreach unit: unit_tests
   exe = executable(unit, unit + '.c',
-                   dependencies: wing_dep,
+                   dependencies: [ wing_dep, winsock2_dep ],
                    include_directories: wing_inc)
   test(unit, exe, args: [ '--tap', '-k' ])
 endforeach
diff --git a/tests/poll.c b/tests/poll.c
new file mode 100644
index 0000000..2e372da
--- /dev/null
+++ b/tests/poll.c
@@ -0,0 +1,619 @@
+/* Unit test for W32 version of wing_poll()
+ *
+ * Copyright © 2017 Руслан Ижбулатов <lrn1986 gmail com>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+#define _WIN32_WINNT 0x0600
+
+#include <Winsock2.h>
+#include <wing/wing.h>
+#include <glib.h>
+
+#define NUM_POLLEES 999
+#define NUM_POLLFDS 1000
+
+#define ASYNC_CONNECT_OK(r) (r == 0 || (r < 0 && GetLastError () == WSAEWOULDBLOCK))
+
+#define REPEAT 1
+
+/* Start Winsock for this process, asking for version 2.0.  Nothing in this
+ * test can work without it, so a failure is reported through g_error().
+ */
+static void
+init_networking (void)
+{
+  WSADATA winsock_info;
+  int status;
+
+  status = WSAStartup (MAKEWORD (2, 0), &winsock_info);
+  if (status != 0)
+    g_error ("Windows Sockets could not be initialized");
+}
+
+/* Create one WSA event object per socket, store its handle in the matching
+ * fds[] slot and associate it with the FD_READ and FD_CLOSE notifications
+ * of that socket.
+ *
+ * The Winsock calls are made outside of the assertion macros so that they
+ * are still executed when g_assert() is compiled out (G_DISABLE_ASSERT).
+ */
+static void
+prepare_fds (SOCKET  sockets[],
+             GPollFD fds[],
+             int     num_pollees)
+{
+  gint i;
+
+  for (i = 0; i < num_pollees; i++)
+    {
+      WSAEVENT event;
+      int rc;
+
+      event = WSACreateEvent ();
+      g_assert_true (event != WSA_INVALID_EVENT);
+      fds[i].fd = (gintptr) event;
+
+      rc = WSAEventSelect (sockets[i], event, FD_READ | FD_CLOSE);
+      g_assert_cmpint (rc, ==, 0);
+    }
+}
+
+/* Put the first num_pollees entries back into their pre-poll state: the
+ * event object behind each entry is reset, the entry asks for
+ * G_IO_IN | G_IO_OUT | G_IO_ERR and its revents are cleared.
+ */
+static void
+reset_fds (GPollFD fds[],
+           int     num_pollees)
+{
+  GPollFD *entry = fds;
+  gint left;
+
+  for (left = num_pollees; left > 0; left--, entry++)
+    {
+      WSAResetEvent ((HANDLE) entry->fd);
+      entry->events = G_IO_IN | G_IO_OUT | G_IO_ERR;
+      entry->revents = 0;
+    }
+}
+
+/* Make the last of the num_pollfds entries the one for the
+ * G_WIN32_MSG_HANDLE pseudo-handle: it asks for G_IO_IN and starts with
+ * empty revents.
+ */
+static void
+reset_fds_msg (GPollFD fds[],
+               int     num_pollfds)
+{
+  GPollFD *msg_entry = &fds[num_pollfds - 1];
+
+  msg_entry->fd = G_WIN32_MSG_HANDLE;
+  msg_entry->events = G_IO_IN;
+  msg_entry->revents = 0;
+}
+
+/* For every entry that the poll flagged (revents != 0), ask Winsock which
+ * network events actually occurred on the matching socket and rewrite
+ * revents accordingly:
+ *   FD_READ / FD_ACCEPT             -> G_IO_IN
+ *   FD_WRITE                        -> G_IO_OUT
+ *   FD_CONNECT without error        -> G_IO_OUT
+ *   FD_CONNECT with error           -> G_IO_HUP | G_IO_ERR
+ *   FD_CLOSE, if nothing else set   -> G_IO_HUP
+ *
+ * WSAEnumNetworkEvents() is called outside of the assertion macro so that
+ * it is still executed when g_assert() is compiled out (G_DISABLE_ASSERT).
+ */
+static void
+check_fds (SOCKET  sockets[],
+           GPollFD fds[],
+           int     num_pollees)
+{
+  gint i;
+
+  for (i = 0; i < num_pollees; i++)
+    {
+      if (fds[i].revents != 0)
+        {
+          WSANETWORKEVENTS events;
+          int rc;
+
+          rc = WSAEnumNetworkEvents (sockets[i], 0, &events);
+          g_assert_cmpint (rc, ==, 0);
+
+          fds[i].revents = 0;
+          if (events.lNetworkEvents & (FD_READ | FD_ACCEPT))
+            fds[i].revents |= G_IO_IN;
+
+          if (events.lNetworkEvents & FD_WRITE)
+            fds[i].revents |= G_IO_OUT;
+          else
+            {
+              /* We have called WSAEnumNetworkEvents() above but it didn't
+               * set FD_WRITE.
+               */
+              if (events.lNetworkEvents & FD_CONNECT)
+                {
+                  if (events.iErrorCode[FD_CONNECT_BIT] == 0)
+                    fds[i].revents |= G_IO_OUT;
+                  else
+                    fds[i].revents |= (G_IO_HUP | G_IO_ERR);
+                }
+              if (fds[i].revents == 0 && (events.lNetworkEvents & (FD_CLOSE)))
+                fds[i].revents |= G_IO_HUP;
+            }
+        }
+    }
+}
+
+/* Build num_pollees connected TCP socket pairs over the loopback interface.
+ *
+ * A temporary listening socket is bound to an ephemeral port (sin_port 0;
+ * the real address is read back with getsockname()).  For each pair a
+ * non-blocking client socket is connected and stored in opp_sockets[i], the
+ * accepted peer is made non-blocking and stored in sockets[i].  The
+ * listening socket is closed before returning.
+ *
+ * The fds parameter is not used here.
+ *
+ * All socket calls are made outside of the assertion macros so that they
+ * are still executed when g_assert() is compiled out (G_DISABLE_ASSERT).
+ */
+static void
+prepare_sockets (SOCKET  sockets[],
+                 SOCKET  opp_sockets[],
+                 GPollFD fds[],
+                 int     num_pollees)
+{
+  gint i;
+  SOCKET server;
+  struct sockaddr_in sa;
+  unsigned long ul = 1;
+  int sa_size;
+  int r;
+
+  server = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+  g_assert_true (server != INVALID_SOCKET);
+
+  memset (&sa, 0, sizeof sa);
+
+  sa.sin_family = AF_INET;
+  sa.sin_port = 0;
+  sa.sin_addr.s_addr = htonl (INADDR_LOOPBACK);
+  sa_size = sizeof (sa);
+
+  r = bind (server, (const struct sockaddr *) &sa, sa_size);
+  g_assert_cmpint (r, ==, 0);
+  r = getsockname (server, (struct sockaddr *) &sa, &sa_size);
+  g_assert_cmpint (r, ==, 0);
+  r = listen (server, 1);
+  g_assert_cmpint (r, ==, 0);
+
+  for (i = 0; i < num_pollees; i++)
+    {
+      opp_sockets[i] = socket (AF_INET, SOCK_STREAM, IPPROTO_TCP);
+      g_assert_true (opp_sockets[i] != INVALID_SOCKET);
+      r = ioctlsocket (opp_sockets[i], FIONBIO, &ul);
+      g_assert_cmpint (r, ==, 0);
+
+      /* The client socket is non-blocking, so "would block" is a success */
+      r = connect (opp_sockets[i], (const struct sockaddr *) &sa, sizeof (sa));
+      g_assert (ASYNC_CONNECT_OK (r));
+
+      sockets[i] = accept (server, NULL, NULL);
+      g_assert_true (sockets[i] != INVALID_SOCKET);
+      r = ioctlsocket (sockets[i], FIONBIO, &ul);
+      g_assert_cmpint (r, ==, 0);
+    }
+
+  closesocket (server);
+}
+
+/* Close both ends of each of the num_pollees socket pairs. */
+static void
+cleanup_sockets (SOCKET sockets[],
+                 SOCKET opp_sockets[],
+                 int    num_pollees)
+{
+  gint idx = 0;
+
+  while (idx < num_pollees)
+    {
+      closesocket (sockets[idx]);
+      closesocket (opp_sockets[idx]);
+      idx++;
+    }
+}
+
+/* Add one sample to the histogram.
+ *
+ * @val:           the sample
+ * @buckets:       @count counters, one per bucket
+ * @bucket_limits: @count ascending limits
+ * @count:         number of buckets
+ *
+ * buckets[i] for i > 0 counts samples in [bucket_limits[i - 1],
+ * bucket_limits[i]); the last bucket additionally takes every sample that
+ * is >= the last limit, and buckets[0] takes whatever matches no range
+ * (i.e. samples below bucket_limits[0]).
+ */
+static void
+bucketize (gint64 val,
+           gint   buckets[],
+           gint64 bucket_limits[],
+           gint   count)
+{
+  gint i;
+
+  /* ">=" rather than ">": a sample equal to the last limit matches none of
+   * the half-open ranges below and would otherwise be counted in bucket 0.
+   */
+  if (val >= bucket_limits[count - 1])
+    {
+      buckets[count - 1] += 1;
+      return;
+    }
+
+  for (i = count - 1; i > 0; i--)
+    if (val < bucket_limits[i] && val >= bucket_limits[i - 1])
+      {
+        buckets[i] += 1;
+        return;
+      }
+
+  buckets[0] += 1;
+}
+
+/* Print the histogram as two rows: a header row with the range covered by
+ * each bucket, then a row with the counters.  Each counter is placed inside
+ * a 9-character column followed by one separator blank; a zero counter is
+ * printed as a blank.
+ */
+static void
+print_buckets (gint   buckets[],
+               gint64 bucket_limits[],
+               gint   count)
+{
+  gint col;
+
+  /* Header row: every column but the last shows "from-to", the last one
+   * shows an open-ended ">= from".
+   */
+  for (col = 0; col < count; col++)
+    {
+      if (col < count - 1)
+        g_print ("%-4lld-%4lld|", col == 0 ? 0 : bucket_limits[col - 1], bucket_limits[col] - 1);
+      else
+        g_print ("  >= %-4lld|", bucket_limits[col - 1]);
+    }
+
+  g_print ("\n");
+
+  /* Counter row */
+  for (col = 0; col < count; col++)
+    {
+      gint value = buckets[col];
+      gint width;
+      gint pad;
+      gint k;
+
+      /* Number of characters the counter is printed with (at most 4) */
+      if (value >= 1000)
+        width = 4;
+      else if (value >= 100)
+        width = 3;
+      else if (value >= 10)
+        width = 2;
+      else
+        width = 1;
+
+      pad = 9 - width;
+
+      /* Blanks to the left of the counter */
+      k = 0;
+      while (k < pad / 2)
+        {
+          g_print (" ");
+          k++;
+        }
+
+      if (value == 0)
+        g_print (" ");
+      else
+        g_print ("%*d", width, value);
+
+      /* Remaining blanks to the right, then the column separator */
+      k = pad / 2;
+      while (k < pad)
+        {
+          g_print (" ");
+          k++;
+        }
+      g_print (" ");
+    }
+
+  g_print ("\n\n");
+}
+
+/* Exercise wing_poll() on NUM_POLLEES connected sockets plus one entry for
+ * the Windows message queue (NUM_POLLFDS entries in total).
+ *
+ * Scenarios, each repeated REPEAT times:
+ *   1. nothing is ready, zero timeout;
+ *   2. one socket readable and one posted message;
+ *   3. one socket readable;
+ *   4. half of the sockets readable;
+ *   5. half of the sockets readable and one posted message;
+ *   6. all sockets readable;
+ *   7. a varying number of sockets readable and one posted message.
+ *
+ * After every timed poll the pending data/messages are consumed and a
+ * second zero-timeout poll checks that nothing is reported any more.  The
+ * duration of each timed call is taken with g_get_monotonic_time(), which
+ * counts microseconds, and summarised as min/max/average plus a histogram.
+ */
+static void
+test_wing_poll (void)
+{
+  SOCKET sockets[NUM_POLLEES];
+  GPollFD fds[NUM_POLLFDS];
+  SOCKET opp_sockets[NUM_POLLEES];
+  gint i;
+  gint activatable;
+  gint64 times[REPEAT][2];
+#define BUCKET_COUNT 25
+  gint64 bucket_limits[BUCKET_COUNT] = {3, 5, 10, 15, 20, 25, 30, 35, 40, 50, 60, 70, 80, 90, 100, 120, 150, 180, 220, 280, 350, 450, 600, 800, 1000};
+  gint   buckets[BUCKET_COUNT];
+  gint64 times_avg = 0, times_min = G_MAXINT64, times_max = 0;
+
+  prepare_sockets (sockets, opp_sockets, fds, NUM_POLLEES);
+  prepare_fds (sockets, fds, NUM_POLLEES);
+
+  /* Scenario 1: nothing is ready, zero timeout */
+  times_avg = 0;
+  times_min = G_MAXINT64;
+  times_max = 0;
+  memset (buckets, 0, sizeof (gint) * BUCKET_COUNT);
+
+  for (i = 0; i < REPEAT; i++)
+    {
+      gint r;
+      gint64 diff;
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      times[i][0] = g_get_monotonic_time ();
+      r = wing_poll (fds, NUM_POLLFDS, 0);
+      times[i][1] = g_get_monotonic_time ();
+      g_assert (r == 0);
+      diff = times[i][1] - times[i][0];
+      if (times_min > diff)
+        times_min = diff;
+      if (times_max < diff)
+        times_max = diff;
+      times_avg += diff;
+      bucketize (diff, buckets, bucket_limits, BUCKET_COUNT);
+    }
+
+  /* REPEAT samples were accumulated, so that is the divisor of the mean */
+  times_avg /= REPEAT;
+  g_print ("\nempty poll time:\n%4lldus - %4lldus, average %4lldus\n", times_min, times_max, times_avg);
+  print_buckets (buckets, bucket_limits, BUCKET_COUNT);
+
+  /* Scenario 2: one readable socket plus one posted message */
+  times_avg = 0;
+  times_min = G_MAXINT64;
+  times_max = 0;
+  memset (buckets, 0, sizeof (gint) * BUCKET_COUNT);
+
+  activatable = 0;
+
+  for (i = 0; i < REPEAT; i++)
+    {
+      gint r, s, v;
+      gint t = 0; /* payload byte; initialised so no indeterminate data is sent */
+      gint64 diff;
+      MSG msg;
+      gboolean found_app;
+      BOOL posted;
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      s = send (opp_sockets[activatable], (const char *) &t, 1, 0);
+      /* Post outside of the assertion so it happens even with G_DISABLE_ASSERT */
+      posted = PostMessage (NULL, WM_APP, 1, 2);
+      g_assert_true (posted);
+      /* This is to ensure that all sockets catch up, otherwise some might not poll active */
+      g_usleep (G_USEC_PER_SEC / 1000);
+
+      times[i][0] = g_get_monotonic_time ();
+      r = wing_poll (fds, NUM_POLLFDS, 1000);
+      times[i][1] = g_get_monotonic_time ();
+
+      check_fds (sockets, fds, NUM_POLLEES);
+      v = recv (sockets[activatable], (char *) &t, 1, 0);
+      found_app = FALSE;
+      while (!found_app && PeekMessage (&msg, NULL, 0, 0, PM_REMOVE))
+        if (msg.message == WM_APP && msg.wParam == 1 && msg.lParam == 2)
+          found_app = TRUE;
+      g_assert (s == 1);
+      g_assert (r == 2);
+      g_assert (v == 1);
+      g_assert (found_app);
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      r = wing_poll (fds, NUM_POLLFDS, 0);
+      check_fds (sockets, fds, NUM_POLLEES);
+      g_assert (r == 0);
+      diff = times[i][1] - times[i][0];
+      if (times_min > diff)
+        times_min = diff;
+      if (times_max < diff)
+        times_max = diff;
+      times_avg += diff;
+      activatable = (activatable + 1) % NUM_POLLEES;
+      bucketize (diff, buckets, bucket_limits, BUCKET_COUNT);
+    }
+
+  times_avg /= REPEAT;
+  g_print ("1-socket + msg poll time:\n%4lldus - %4lldus, average %4lldus\n", times_min, times_max, times_avg);
+  print_buckets (buckets, bucket_limits, BUCKET_COUNT);
+
+  /* Scenario 3: one readable socket */
+  times_avg = 0;
+  times_min = G_MAXINT64;
+  times_max = 0;
+  memset (buckets, 0, sizeof (gint) * BUCKET_COUNT);
+
+  activatable = 0;
+
+  for (i = 0; i < REPEAT; i++)
+    {
+      gint r, s, v;
+      gint t = 0;
+      gint64 diff;
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      s = send (opp_sockets[activatable], (const char *) &t, 1, 0);
+
+      g_usleep (G_USEC_PER_SEC / 1000);
+
+      times[i][0] = g_get_monotonic_time ();
+      r = wing_poll (fds, NUM_POLLFDS, 1000);
+      times[i][1] = g_get_monotonic_time ();
+
+      check_fds (sockets, fds, NUM_POLLEES);
+      v = recv (sockets[activatable], (char *) &t, 1, 0);
+      g_assert (s == 1);
+      g_assert (r == 1);
+      g_assert (v == 1);
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      r = wing_poll (fds, NUM_POLLFDS, 0);
+      check_fds (sockets, fds, NUM_POLLEES);
+      g_assert (r == 0);
+
+      diff = times[i][1] - times[i][0];
+      if (times_min > diff)
+        times_min = diff;
+      if (times_max < diff)
+        times_max = diff;
+      times_avg += diff;
+      activatable = (activatable + 1) % NUM_POLLEES;
+      bucketize (diff, buckets, bucket_limits, BUCKET_COUNT);
+    }
+
+  times_avg /= REPEAT;
+  g_print ("1-socket poll time:\n%4lldus - %4lldus, average %4lldus\n", times_min, times_max, times_avg);
+  print_buckets (buckets, bucket_limits, BUCKET_COUNT);
+
+  /* Scenario 4: half of the sockets readable */
+  times_avg = 0;
+  times_min = G_MAXINT64;
+  times_max = 0;
+  memset (buckets, 0, sizeof (gint) * BUCKET_COUNT);
+
+  for (i = 0; i < REPEAT; i++)
+    {
+      gint r, s, v;
+      gint t = 0;
+      gint64 diff;
+      gint j;
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      s = v = 0;
+
+      for (j = 0; j < NUM_POLLEES / 2; j++)
+        s += send (opp_sockets[j], (const char *) &t, 1, 0) == 1 ? 1 : 0;
+
+      g_usleep (G_USEC_PER_SEC / 1000);
+
+      times[i][0] = g_get_monotonic_time ();
+      r = wing_poll (fds, NUM_POLLFDS, 1000);
+      times[i][1] = g_get_monotonic_time ();
+      check_fds (sockets, fds, NUM_POLLEES);
+      for (j = 0; j < NUM_POLLEES / 2; j++)
+        v += recv (sockets[j], (char *) &t, 1, 0) == 1 ? 1 : 0;
+      g_assert (s == NUM_POLLEES / 2);
+      g_assert (r == NUM_POLLEES / 2);
+      g_assert (v == NUM_POLLEES / 2);
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      r = wing_poll (fds, NUM_POLLFDS, 0);
+      check_fds (sockets, fds, NUM_POLLEES);
+      g_assert (r == 0);
+
+      diff = times[i][1] - times[i][0];
+      if (times_min > diff)
+        times_min = diff;
+      if (times_max < diff)
+        times_max = diff;
+      times_avg += diff;
+      bucketize (diff, buckets, bucket_limits, BUCKET_COUNT);
+    }
+
+  times_avg /= REPEAT;
+  g_print ("half-socket poll time:\n%4lldus - %4lldus, average %4lldus\n", times_min, times_max, times_avg);
+  print_buckets (buckets, bucket_limits, BUCKET_COUNT);
+
+  /* Scenario 5: half of the sockets readable plus one posted message */
+  times_avg = 0;
+  times_min = G_MAXINT64;
+  times_max = 0;
+  memset (buckets, 0, sizeof (gint) * BUCKET_COUNT);
+
+  for (i = 0; i < REPEAT; i++)
+    {
+      gint r, s, v;
+      gint t = 0;
+      gint64 diff;
+      gint j;
+      MSG msg;
+      gboolean found_app;
+      BOOL posted;
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      s = v = 0;
+
+      for (j = 0; j < NUM_POLLEES / 2; j++)
+        s += send (opp_sockets[j], (const char *) &t, 1, 0) == 1 ? 1 : 0;
+      posted = PostMessage (NULL, WM_APP, 1, 2);
+      g_assert_true (posted);
+
+      /* This is to ensure that all sockets catch up, otherwise some might not poll active */
+      g_usleep (G_USEC_PER_SEC / 1000);
+
+      times[i][0] = g_get_monotonic_time ();
+      r = wing_poll (fds, NUM_POLLFDS, 1000);
+      times[i][1] = g_get_monotonic_time ();
+      check_fds (sockets, fds, NUM_POLLEES);
+      for (j = 0; j < NUM_POLLEES / 2; j++)
+        v += recv (sockets[j], (char *) &t, 1, 0) == 1 ? 1 : 0;
+      found_app = FALSE;
+      while (!found_app && PeekMessage (&msg, NULL, 0, 0, PM_REMOVE))
+        if (msg.message == WM_APP && msg.wParam == 1 && msg.lParam == 2)
+          found_app = TRUE;
+      g_assert (s == NUM_POLLEES / 2);
+      g_assert (r == NUM_POLLEES / 2 + 1);
+      g_assert (v == NUM_POLLEES / 2);
+      g_assert (found_app);
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      r = wing_poll (fds, NUM_POLLFDS, 0);
+      check_fds (sockets, fds, NUM_POLLEES);
+      g_assert (r == 0);
+
+      diff = times[i][1] - times[i][0];
+      if (times_min > diff)
+        times_min = diff;
+      if (times_max < diff)
+        times_max = diff;
+      times_avg += diff;
+      bucketize (diff, buckets, bucket_limits, BUCKET_COUNT);
+    }
+
+  times_avg /= REPEAT;
+  g_print ("half-socket + msg poll time:\n%4lldus - %4lldus, average %4lldus\n", times_min, times_max, times_avg);
+  print_buckets (buckets, bucket_limits, BUCKET_COUNT);
+
+  /* Scenario 6: all sockets readable */
+  times_avg = 0;
+  times_min = G_MAXINT64;
+  times_max = 0;
+  memset (buckets, 0, sizeof (gint) * BUCKET_COUNT);
+
+  for (i = 0; i < REPEAT; i++)
+    {
+      gint r, s, v;
+      gint t = 0;
+      gint64 diff;
+      gint j;
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      s = v = 0;
+
+      for (j = 0; j < NUM_POLLEES; j++)
+        s += send (opp_sockets[j], (const char *) &t, 1, 0) == 1 ? 1 : 0;
+
+      g_usleep (G_USEC_PER_SEC / 1000);
+
+      times[i][0] = g_get_monotonic_time ();
+      r = wing_poll (fds, NUM_POLLFDS, 1000);
+      times[i][1] = g_get_monotonic_time ();
+      check_fds (sockets, fds, NUM_POLLEES);
+      for (j = 0; j < NUM_POLLEES; j++)
+        v += recv (sockets[j], (char *) &t, 1, 0) == 1 ? 1 : 0;
+      g_assert (s == NUM_POLLEES);
+      g_assert (r == NUM_POLLEES);
+      g_assert (v == NUM_POLLEES);
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      r = wing_poll (fds, NUM_POLLFDS, 0);
+      check_fds (sockets, fds, NUM_POLLEES);
+      g_assert (r == 0);
+
+      diff = times[i][1] - times[i][0];
+      if (times_min > diff)
+        times_min = diff;
+      if (times_max < diff)
+        times_max = diff;
+      times_avg += diff;
+      bucketize (diff, buckets, bucket_limits, BUCKET_COUNT);
+    }
+
+  times_avg /= REPEAT;
+  g_print ("%d-socket poll time: \n%4lldus - %4lldus, average %4lldus\n", NUM_POLLEES, times_min, times_max, times_avg);
+  print_buckets (buckets, bucket_limits, BUCKET_COUNT);
+
+  /* Scenario 7: a growing number of readable sockets plus one posted message */
+  activatable = 0;
+  times_avg = 0;
+  times_min = G_MAXINT64;
+  times_max = 0;
+  memset (buckets, 0, sizeof (gint) * BUCKET_COUNT);
+
+  for (i = 0; i < REPEAT; i++)
+    {
+      gint r, s, v;
+      gint t = 0;
+      gint64 diff;
+      gint j;
+      MSG msg;
+      gboolean found_app;
+      BOOL posted;
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      s = v = 0;
+
+      for (j = 0; j < activatable; j++)
+        s += send (opp_sockets[j], (const char *) &t, 1, 0) == 1 ? 1 : 0;
+      posted = PostMessage (NULL, WM_APP, 1, 2);
+      g_assert_true (posted);
+
+      g_usleep (G_USEC_PER_SEC / 1000);
+
+      times[i][0] = g_get_monotonic_time ();
+      r = wing_poll (fds, NUM_POLLFDS, 1000);
+      times[i][1] = g_get_monotonic_time ();
+      check_fds (sockets, fds, NUM_POLLEES);
+      for (j = 0; j < activatable; j++)
+        v += recv (sockets[j], (char *) &t, 1, 0) == 1 ? 1 : 0;
+      found_app = FALSE;
+      while (!found_app && PeekMessage (&msg, NULL, 0, 0, PM_REMOVE))
+        if (msg.message == WM_APP && msg.wParam == 1 && msg.lParam == 2)
+          found_app = TRUE;
+      g_assert (s == activatable);
+      g_assert (r == activatable + 1);
+      g_assert (v == activatable);
+      g_assert (found_app);
+
+      reset_fds (fds, NUM_POLLEES);
+      reset_fds_msg (fds, NUM_POLLFDS);
+      r = wing_poll (fds, NUM_POLLFDS, 0);
+      check_fds (sockets, fds, NUM_POLLEES);
+      g_assert (r == 0);
+
+      diff = times[i][1] - times[i][0];
+      if (times_min > diff)
+        times_min = diff;
+      if (times_max < diff)
+        times_max = diff;
+      times_avg += diff;
+      bucketize (diff, buckets, bucket_limits, BUCKET_COUNT);
+      activatable = (activatable + 1) % NUM_POLLEES;
+    }
+
+  times_avg /= REPEAT;
+  g_print ("variable socket number + msg poll time: \n%4lldus - %4lldus, average %4lldus\n", times_min, times_max, times_avg);
+  print_buckets (buckets, bucket_limits, BUCKET_COUNT);
+
+  cleanup_sockets (sockets, opp_sockets, NUM_POLLEES);
+
+  /* Release the event objects created by prepare_fds() */
+  for (i = 0; i < NUM_POLLEES; i++)
+    WSACloseEvent ((HANDLE) fds[i].fd);
+}
+
+/* Test entry point: set up the GLib test framework and Winsock, register
+ * the single test case and return the result of running it.
+ */
+int
+main (int   argc,
+      char *argv[])
+{
+  int status;
+
+  g_test_init (&argc, &argv, NULL);
+  init_networking ();
+
+  g_test_add_func ("/poll/wing-poll", test_wing_poll);
+
+  status = g_test_run ();
+  return status;
+}
diff --git a/wing/meson.build b/wing/meson.build
index 596e283..8da554b 100644
--- a/wing/meson.build
+++ b/wing/meson.build
@@ -10,6 +10,7 @@ headers = [
   'wingnamedpipeconnection.h',
   'wingnamedpipelistener.h',
   'wingoutputstream.h',
+  'wingpoll.h',
   'wingservice.h',
   'wingservicemanager.h',
   'wingsource.h',
@@ -27,6 +28,7 @@ sources = [
   'wingnamedpipeconnection.c',
   'wingnamedpipelistener.c',
   'wingoutputstream.c',
+  'wingpoll.c',
   'wingservice.c',
   'wingservice-private.h',
   'wingservicemanager.c',
diff --git a/wing/wing.h b/wing/wing.h
index 7113f06..ce3badd 100644
--- a/wing/wing.h
+++ b/wing/wing.h
@@ -29,6 +29,7 @@
 #include <wing/wingnamedpipelistener.h>
 #include <wing/wingnamedpipeconnection.h>
 #include <wing/wingoutputstream.h>
+#include <wing/wingpoll.h>
 #include <wing/wingservice.h>
 #include <wing/wingservicemanager.h>
 #include <wing/wingsource.h>
diff --git a/wing/wingpoll.c b/wing/wingpoll.c
new file mode 100644
index 0000000..0b63aa6
--- /dev/null
+++ b/wing/wingpoll.c
@@ -0,0 +1,403 @@
+/*
+ * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
+ *
+ * gpoll.c: poll(2) abstraction
+ * Copyright 1998 Owen Taylor
+ * Copyright 2008 Red Hat, Inc.
+ * Copyright (C) 2018 NICE s.r.l.
+ * 
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.         See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+/*
+ * Modified by the GLib Team and others 1997-2000.  See the AUTHORS
+ * file for a list of people on the GLib Team.  See the ChangeLog
+ * files for a list of changes.  These files are distributed with
+ * GLib at ftp://ftp.gtk.org/pub/gtk/.
+ */
+
+#include "wingpoll.h"
+
+#ifdef G_OS_WIN32
+#define STRICT
+#include <windows.h>
+#include <process.h>
+#endif /* G_OS_WIN32 */
+
+#ifdef _WIN32
+/* Always enable debugging printout on Windows, as it is more often
+ * needed there...
+ */
+#define G_MAIN_POLL_DEBUG
+#endif
+
+#ifdef G_MAIN_POLL_DEBUG
+static gboolean _wing_main_poll_debug = FALSE;
+#endif
+
+/* Perform one wait on the given handles (and on the thread's message queue
+ * when msg_fd is not NULL) and record what was found in the GPollFD entries.
+ *
+ * msg_fd:       entry to flag with G_IO_IN when messages are waiting, or NULL
+ * stop_fd:      entry that is flagged like any other but not counted
+ * handles:      handles to wait on
+ * handle_to_fd: for each index of handles, the GPollFD it belongs to
+ * nhandles:     number of elements in handles / handle_to_fd
+ * timeout:      passed to the wait functions as the timeout in milliseconds
+ *
+ * Returns -1 if the wait failed, otherwise the number of entries found
+ * signalled (0 on timeout, on WAIT_IO_COMPLETION and on any other wait
+ * result).  With a non-zero timeout at most one entry is reported; with a
+ * zero timeout the function recurses to collect the remaining ones too.
+ */
+static int
+poll_rest (GPollFD *msg_fd,
+           GPollFD *stop_fd,
+           HANDLE  *handles,
+           GPollFD *handle_to_fd[],
+           gint     nhandles,
+           gint     timeout)
+{
+  DWORD ready;
+  GPollFD *f;
+  int recursed_result;
+
+  if (msg_fd != NULL)
+    {
+      /* Wait for either messages or handles
+       * -> Use MsgWaitForMultipleObjectsEx
+       */
+      if (_wing_main_poll_debug)
+        g_print ("  MsgWaitForMultipleObjectsEx(%d, %d)\n", nhandles, timeout);
+
+      ready = MsgWaitForMultipleObjectsEx (nhandles, handles, timeout,
+                                           QS_ALLINPUT, MWMO_ALERTABLE);
+
+      if (ready == WAIT_FAILED)
+        {
+          gchar *emsg;
+
+          emsg = g_win32_error_message (GetLastError ());
+          g_warning ("MsgWaitForMultipleObjectsEx failed: %s", emsg);
+          g_free (emsg);
+        }
+    }
+  else if (nhandles == 0)
+    {
+      /* No handles to wait for, just the timeout */
+      if (timeout == INFINITE)
+        ready = WAIT_FAILED; /* nothing could ever wake us up: reported as -1 below */
+      else
+        {
+          /* Wait for the current process to die, more efficient than SleepEx(). */
+          WaitForSingleObjectEx (GetCurrentProcess (), timeout, TRUE);
+          ready = WAIT_TIMEOUT;
+        }
+    }
+  else
+    {
+      /* Wait for just handles
+       * -> Use WaitForMultipleObjectsEx
+       */
+      if (_wing_main_poll_debug)
+        g_print ("  WaitForMultipleObjectsEx(%d, %d)\n", nhandles, timeout);
+
+      ready = WaitForMultipleObjectsEx (nhandles, handles, FALSE, timeout, TRUE);
+      if (ready == WAIT_FAILED)
+        {
+          gchar *emsg;
+
+          emsg = g_win32_error_message (GetLastError ());
+          g_warning ("WaitForMultipleObjectsEx failed: %s", emsg);
+          g_free (emsg);
+        }
+    }
+
+  if (_wing_main_poll_debug)
+    g_print ("  wait returns %ld%s\n",
+             ready,
+             (ready == WAIT_FAILED ? " (WAIT_FAILED)" :
+              (ready == WAIT_TIMEOUT ? " (WAIT_TIMEOUT)" :
+               (msg_fd != NULL && ready == WAIT_OBJECT_0 + nhandles ? " (msg)" : ""))));
+
+  if (ready == WAIT_FAILED)
+    return -1;
+  else if (ready == WAIT_TIMEOUT ||
+           ready == WAIT_IO_COMPLETION)
+    return 0;
+  else if (msg_fd != NULL && ready == WAIT_OBJECT_0 + nhandles)
+    {
+      /* Index nhandles (one past the last handle) stands for the message queue */
+      msg_fd->revents |= G_IO_IN;
+
+      /* If we have a timeout, or no handles to poll, be satisfied
+       * with just noticing we have messages waiting.
+       */
+      if (timeout != 0 || nhandles == 0)
+        return 1;
+
+      /* If no timeout and handles to poll, recurse to poll them,
+       * too.
+       */
+      recursed_result = poll_rest (NULL, stop_fd, handles, handle_to_fd, nhandles, 0);
+      return (recursed_result == -1) ? -1 : 1 + recursed_result;
+    }
+  else if (ready >= WAIT_OBJECT_0 && ready < WAIT_OBJECT_0 + nhandles)
+    {
+      int retval;
+
+      /* The signalled entry reports every condition it asked for */
+      f = handle_to_fd[ready - WAIT_OBJECT_0];
+      f->revents = f->events;
+      if (_wing_main_poll_debug)
+        g_print ("  got event %p\n", (HANDLE) f->fd);
+
+      /* Do not count the stop_fd */
+      retval = (f != stop_fd) ? 1 : 0;
+
+      /* If no timeout and polling several handles, recurse to poll
+       * the rest of them.
+       */
+      if (timeout == 0 && nhandles > 1)
+        {
+          /* Poll the handles with index > ready */
+          HANDLE *shorter_handles;
+          GPollFD **shorter_handle_to_fd;
+          gint shorter_nhandles;
+
+          shorter_handles = &handles[ready - WAIT_OBJECT_0 + 1];
+          shorter_handle_to_fd = &handle_to_fd[ready - WAIT_OBJECT_0 + 1];
+          shorter_nhandles = nhandles - (ready - WAIT_OBJECT_0 + 1);
+
+          recursed_result = poll_rest (NULL, stop_fd, shorter_handles, shorter_handle_to_fd, shorter_nhandles, 0);
+          return (recursed_result == -1) ? -1 : retval + recursed_result;
+        }
+      return retval;
+    }
+
+  /* Any other wait result is treated as "nothing signalled" */
+  return 0;
+}
+
+/* Everything one poll_single_thread() call needs: a batch of handles to
+ * wait on and the GPollFD entries in which to report the results.
+ */
+typedef struct
+{
+  HANDLE handles[MAXIMUM_WAIT_OBJECTS];         /* handles handed to the wait functions */
+  GPollFD *handle_to_fd[MAXIMUM_WAIT_OBJECTS];  /* GPollFD owning handles[i], same index */
+  GPollFD *msg_fd;                              /* entry for G_WIN32_MSG_HANDLE, or NULL */
+  GPollFD *stop_fd;                             /* stop entry (not counted in results), or NULL */
+  gint nhandles;                                /* number of used slots in the two arrays */
+  gint timeout;                                 /* timeout handed to the wait functions */
+} GWin32PollThreadData;
+
+/* Poll the batch described by data on the calling thread and return what
+ * poll_rest() reports: the number of entries found signalled, or -1 on
+ * failure.
+ */
+static gint
+poll_single_thread (GWin32PollThreadData *data)
+{
+  gboolean several_things;
+  int found;
+
+  several_things = data->nhandles > 1 || (data->nhandles > 0 && data->msg_fd != NULL);
+
+  /* Just polling for one thing, so no need to check first if
+   * available immediately
+   */
+  if (!several_things)
+    return poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, data->timeout);
+
+  /* Polling for several things: first check if one or several of them
+   * are immediately available
+   */
+  found = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, 0);
+  if (found != 0)
+    return found;
+
+  /* If not, and we have a significant timeout, poll again with
+   * timeout then. Note that this will return indication for only
+   * one event, or only for messages.
+   */
+  if (data->timeout == INFINITE || data->timeout > 0)
+    return poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, data->timeout);
+
+  return found;
+}
+
+/* Append the pollable entries of fds[0..nfds) to data.
+ *
+ * The timeout is stored; when stop_fd is given it is registered first, both
+ * as data->stop_fd and as a handle to wait on.  An entry for
+ * G_WIN32_MSG_HANDLE that asks for G_IO_IN becomes data->msg_fd, any other
+ * entry with fd > 0 is added to the handle arrays.  revents of every visited
+ * entry is cleared.  Filling stops with a warning once the handle arrays are
+ * full (one slot less is usable when a message entry is present).
+ *
+ * data is appended to, never reset, so the caller must hand in a zeroed
+ * structure for a fresh batch.
+ */
+static void
+fill_poll_thread_data (GPollFD              *fds,
+                       guint                 nfds,
+                       gint                  timeout,
+                       GPollFD              *stop_fd,
+                       GWin32PollThreadData *data)
+{
+  guint idx;
+
+  data->timeout = timeout;
+
+  if (stop_fd != NULL)
+    {
+      if (_wing_main_poll_debug)
+        g_print (" Stop FD: %p", (HANDLE) stop_fd->fd);
+
+      data->stop_fd = stop_fd;
+      data->handle_to_fd[data->nhandles] = stop_fd;
+      data->handles[data->nhandles] = (HANDLE) stop_fd->fd;
+      data->nhandles++;
+    }
+
+  for (idx = 0; idx < nfds; idx++)
+    {
+      GPollFD *entry = &fds[idx];
+      gboolean full;
+
+      full = data->nhandles == MAXIMUM_WAIT_OBJECTS;
+      if (!full && data->msg_fd != NULL)
+        full = data->nhandles == MAXIMUM_WAIT_OBJECTS - 1;
+
+      if (full)
+        {
+          g_warning ("Too many handles to wait for!");
+          break;
+        }
+
+      if (entry->fd == G_WIN32_MSG_HANDLE && (entry->events & G_IO_IN))
+        {
+          if (_wing_main_poll_debug && data->msg_fd == NULL)
+            g_print (" MSG");
+          data->msg_fd = entry;
+        }
+      else if (entry->fd > 0)
+        {
+          if (_wing_main_poll_debug)
+            g_print (" %p", (HANDLE) entry->fd);
+          data->handle_to_fd[data->nhandles] = entry;
+          data->handles[data->nhandles] = (HANDLE) entry->fd;
+          data->nhandles++;
+        }
+
+      entry->revents = 0;
+    }
+}
+
+/* Thread entry point: poll the batch passed as user_data and hand the
+ * result back to the creator as the thread's exit code.
+ */
+static guint __stdcall
+poll_thread_run (gpointer user_data)
+{
+  GWin32PollThreadData *batch = user_data;
+  guint exit_code;
+
+  exit_code = poll_single_thread (batch);
+
+  /* Docs say that it is safer to call _endthreadex by our own */
+  _endthreadex (exit_code);
+
+  g_assert_not_reached ();
+
+  return 0;
+}
+
+/* One slot for a possible msg object and another for the stop event */
+#define MAXIMUM_WAIT_OBJECTS_PER_THREAD (MAXIMUM_WAIT_OBJECTS - 2)
+
+/**
+ * wing_poll:
+ * @fds: array of #GPollFD entries to wait on
+ * @nfds: number of entries in @fds
+ * @timeout: maximum time to wait in milliseconds; 0 returns right away,
+ *   -1 waits without limit
+ *
+ * Polls @fds. Up to MAXIMUM_WAIT_OBJECTS entries are polled directly on
+ * the calling thread. Larger arrays are split into batches of
+ * MAXIMUM_WAIT_OBJECTS_PER_THREAD entries; every batch is polled on a
+ * thread of its own, together with a shared stop event that is signalled
+ * as soon as the calling thread stops waiting. An entry for
+ * G_WIN32_MSG_HANDLE is always waited for on the calling thread.
+ *
+ * Returns: the number of entries found signalled (0 if none), or -1 if a
+ *   wait, the stop event or a polling thread could not be set up; in that
+ *   case revents of every entry is cleared.
+ */
+gint
+wing_poll (GPollFD *fds,
+           guint    nfds,
+           gint     timeout)
+{
+  guint nthreads, threads_remain;
+  guint nstarted = 0;
+  gboolean spawn_failed = FALSE;
+  HANDLE thread_handles[MAXIMUM_WAIT_OBJECTS];
+  HANDLE stop_handle;
+  GWin32PollThreadData *threads_data;
+  GPollFD stop_event = { 0, };
+  GPollFD *f;
+  guint i, fds_idx = 0;
+  DWORD ready = WAIT_FAILED;
+  DWORD thread_retval;
+  int retval;
+  GPollFD *msg_fd = NULL;
+
+  if (timeout == -1)
+    timeout = INFINITE;
+
+  /* Simple case without extra threads */
+  if (nfds <= MAXIMUM_WAIT_OBJECTS)
+    {
+      GWin32PollThreadData data = { 0, };
+
+      if (_wing_main_poll_debug)
+        g_print ("wing_poll: waiting for");
+
+      fill_poll_thread_data (fds, nfds, timeout, NULL, &data);
+
+      if (_wing_main_poll_debug)
+        g_print ("\n");
+
+      retval = poll_single_thread (&data);
+      if (retval == -1)
+        for (f = fds; f < &fds[nfds]; ++f)
+          f->revents = 0;
+
+      return retval;
+    }
+
+  if (_wing_main_poll_debug)
+    g_print ("wing_poll: polling with threads\n");
+
+  nthreads = nfds / MAXIMUM_WAIT_OBJECTS_PER_THREAD;
+  threads_remain = nfds % MAXIMUM_WAIT_OBJECTS_PER_THREAD;
+  if (threads_remain > 0)
+    nthreads++;
+
+  if (nthreads > MAXIMUM_WAIT_OBJECTS)
+    {
+      g_warning ("Too many handles to wait for in threads!");
+      nthreads = MAXIMUM_WAIT_OBJECTS;
+      /* The last thread we keep is no longer the one holding the
+       * remainder, so it takes a full batch like all the others.
+       */
+      threads_remain = 0;
+      /* NOTE(review): when a message entry is present the wait below
+       * accepts one handle less than MAXIMUM_WAIT_OBJECTS - confirm
+       * whether nthreads has to be clamped further in that case.
+       */
+    }
+
+  stop_handle = CreateEventW (NULL, TRUE, FALSE, NULL);
+  if (stop_handle == NULL)
+    {
+      gchar *emsg;
+
+      emsg = g_win32_error_message (GetLastError ());
+      g_warning ("CreateEventW failed: %s", emsg);
+      g_free (emsg);
+
+      for (f = fds; f < &fds[nfds]; ++f)
+        f->revents = 0;
+
+      return -1;
+    }
+
+  stop_event.fd = (gintptr) stop_handle;
+  stop_event.events = G_IO_IN;
+
+  threads_data = g_new0 (GWin32PollThreadData, nthreads);
+  for (i = 0; i < nthreads; i++)
+    {
+      guint thread_fds;
+      guint ignore;
+      HANDLE thread;
+
+      if (i == (nthreads - 1) && threads_remain > 0)
+        thread_fds = threads_remain;
+      else
+        thread_fds = MAXIMUM_WAIT_OBJECTS_PER_THREAD;
+
+      fill_poll_thread_data (fds + fds_idx, thread_fds, timeout, &stop_event, &threads_data[i]);
+      fds_idx += thread_fds;
+
+      /* We must poll for messages from the same thread, so poll it along with the threads */
+      if (threads_data[i].msg_fd != NULL)
+        {
+          msg_fd = threads_data[i].msg_fd;
+          threads_data[i].msg_fd = NULL;
+        }
+
+      /* _beginthreadex() returns 0 when the thread could not be created */
+      thread = (HANDLE) _beginthreadex (NULL, 0, poll_thread_run, &threads_data[i], 0, &ignore);
+      if (thread == NULL)
+        {
+          g_warning ("Could not create a polling thread");
+          spawn_failed = TRUE;
+          break;
+        }
+
+      thread_handles[nstarted++] = thread;
+    }
+
+  /* Wait for at least one thread to return.  Both branches wake up on the
+   * first finished thread; the stop event below then releases the others.
+   */
+  if (spawn_failed)
+    ready = WAIT_FAILED;
+  else if (msg_fd != NULL)
+    ready = MsgWaitForMultipleObjectsEx (nstarted, thread_handles, timeout,
+                                         QS_ALLINPUT, MWMO_ALERTABLE);
+  else
+    ready = WaitForMultipleObjects (nstarted, thread_handles, FALSE, timeout);
+
+  /* Signal the stop in case any of the threads did not stop yet */
+  SetEvent (stop_handle);
+
+  /* Wait for the rest of the threads to finish */
+  if (nstarted > 0)
+    WaitForMultipleObjects (nstarted, thread_handles, TRUE, INFINITE);
+
+  /* The return value of all the threads give us all the fds that changed state */
+  retval = spawn_failed ? -1 : 0;
+  if (retval == 0 && msg_fd != NULL && ready == WAIT_OBJECT_0 + nstarted)
+    {
+      msg_fd->revents |= G_IO_IN;
+      retval = 1;
+    }
+
+  for (i = 0; i < nstarted; i++)
+    {
+      if (GetExitCodeThread (thread_handles[i], &thread_retval) && retval != -1)
+        {
+          /* A thread reports failure with an exit code of -1 */
+          if (thread_retval == (DWORD) -1)
+            retval = -1;
+          else
+            retval += (int) thread_retval;
+        }
+
+      CloseHandle (thread_handles[i]);
+    }
+
+  if (retval == -1)
+    for (f = fds; f < &fds[nfds]; ++f)
+      f->revents = 0;
+
+  g_free (threads_data);
+  CloseHandle (stop_handle);
+
+  return retval;
+}
diff --git a/wing/wingpoll.h b/wing/wingpoll.h
new file mode 100644
index 0000000..111cbb6
--- /dev/null
+++ b/wing/wingpoll.h
@@ -0,0 +1,46 @@
+/*
+ * Copyright (C) 1995-1997  Peter Mattis, Spencer Kimball and Josh MacDonald
+ *
+ * gpoll.c: poll(2) abstraction
+ * Copyright 1998 Owen Taylor
+ * Copyright 2008 Red Hat, Inc.
+ * Copyright (C) 2018 NICE s.r.l.
+ * 
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.         See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ */
+
+/*
+ * Modified by the GLib Team and others 1997-2000.  See the AUTHORS
+ * file for a list of people on the GLib Team.  See the ChangeLog
+ * files for a list of changes.  These files are distributed with
+ * GLib at ftp://ftp.gtk.org/pub/gtk/.
+ */
+
+#ifndef WING_POLL_H
+#define WING_POLL_H
+
+#include <glib.h>
+#include <wing/wingversionmacros.h>
+
+G_BEGIN_DECLS
+
+WING_AVAILABLE_IN_ALL
+gint wing_poll (GPollFD *fds,
+                guint    nfds,
+                gint     timeout);
+
+G_END_DECLS
+
+#endif /* WING_POLL_H */


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]