[glib/wip/gmaincontext: 12/12] pollcore!



commit f8e2ac9319c7b9024888a63a4fa945a750811732
Author: Ryan Lortie <desrt desrt ca>
Date:   Sat Feb 15 18:06:59 2014 -0500

    pollcore!

 configure.ac            |   16 +++
 glib/Makefile.am        |   16 +++
 glib/gpollcore-epoll.c  |  158 +++++++++++++++++++++++++++
 glib/gpollcore-kqueue.c |  220 ++++++++++++++++++++++++++++++++++++++
 glib/gpollcore-poll.c   |  231 ++++++++++++++++++++++++++++++++++++++++
 glib/gpollcore-win32.c  |  270 +++++++++++++++++++++++++++++++++++++++++++++++
 glib/gpollcore.h        |  154 +++++++++++++++++++++++++++
 glib/tests/Makefile.am  |    3 +
 glib/tests/pollcore.c   |  184 ++++++++++++++++++++++++++++++++
 9 files changed, 1252 insertions(+), 0 deletions(-)
---
diff --git a/configure.ac b/configure.ac
index c38cd35..ed4ab49 100644
--- a/configure.ac
+++ b/configure.ac
@@ -115,6 +115,22 @@ AC_USE_SYSTEM_EXTENSIONS
 
 AM_CONDITIONAL(HAVE_GCC, [test "$GCC" = "yes"])
 
+dnl Select the GPollCore backend.  The value is assigned unconditionally
+dnl here; the AC_DEFINE below exposes the choice to C code via config.h.
+pollcore="kqueue"
+
+if test "$pollcore" = "kqueue"; then
+  AC_DEFINE(POLLCORE_KQUEUE, [1], [Defined if we use kqueue])
+elif test "$pollcore" = "epoll"; then
+  AC_DEFINE(POLLCORE_EPOLL,  [1], [Defined if we use epoll])
+elif test "$pollcore" = "win32"; then
+  AC_DEFINE(POLLCORE_WIN32,  [1], [Defined if we use win32 poll core])
+else
+  AC_DEFINE(POLLCORE_POLL,   [1], [Defined to use poll emulation])
+fi
+dnl Automake conditionals with the same names, tested with "if" in
+dnl glib/Makefile.am.
+dnl NOTE(review): any value other than kqueue/epoll/win32 defines
+dnl POLLCORE_POLL above, but only the literal "poll" enables the
+dnl POLLCORE_POLL conditional below -- confirm this is intended.
+AM_CONDITIONAL(POLLCORE_KQUEUE, [test "$pollcore" = "kqueue"])
+AM_CONDITIONAL(POLLCORE_EPOLL,  [test "$pollcore" = "epoll"])
+AM_CONDITIONAL(POLLCORE_WIN32,  [test "$pollcore" = "win32"])
+AM_CONDITIONAL(POLLCORE_POLL,   [test "$pollcore" = "poll"])
+
 AC_CANONICAL_HOST
 
 dnl
diff --git a/glib/Makefile.am b/glib/Makefile.am
index 15377ec..8cbfe21 100644
--- a/glib/Makefile.am
+++ b/glib/Makefile.am
@@ -208,6 +208,22 @@ if OS_UNIX
 libglib_2_0_la_SOURCES += glib-unix.c
 endif
 
+# Add the GPollCore backend source whose POLLCORE_* conditional is enabled.
+if POLLCORE_KQUEUE
+libglib_2_0_la_SOURCES += gpollcore-kqueue.c
+endif
+
+if POLLCORE_EPOLL
+libglib_2_0_la_SOURCES += gpollcore-epoll.c
+endif
+
+if POLLCORE_WIN32
+libglib_2_0_la_SOURCES += gpollcore-win32.c
+endif
+
+if POLLCORE_POLL
+libglib_2_0_la_SOURCES += gpollcore-poll.c
+endif
+
 if THREADS_WIN32
 libglib_2_0_la_SOURCES += gthread-win32.c
 else
diff --git a/glib/gpollcore-epoll.c b/glib/gpollcore-epoll.c
new file mode 100644
index 0000000..a9edb01
--- /dev/null
+++ b/glib/gpollcore-epoll.c
@@ -0,0 +1,158 @@
+/*
+ * Copyright © 2014 Canonical Limited
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the licence, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Ryan Lortie <desrt desrt ca>
+ */
+
+#include "config.h"
+
+#include "gpollcore.h"
+
+#include <sys/timerfd.h>
+#include <sys/epoll.h>
+#include <sys/poll.h>
+#include <unistd.h>
+#include <errno.h>
+
+/* Applies a change of watched events for @fd to the epoll set.
+ *
+ * If @old_events is 0 the fd is added; otherwise, if @new_events is 0
+ * it is removed; otherwise the existing registration is modified.
+ * @user_data is stored in the epoll event for @fd.
+ *
+ * A failing epoll_ctl() is reported with g_error().
+ */
+void
+g_poll_core_update (GPollCore *core,
+                    gint       fd,
+                    guint      old_events,
+                    guint      new_events,
+                    gpointer   user_data)
+{
+  struct epoll_event ev;
+  gint operation;
+
+  if (old_events == 0)
+    operation = EPOLL_CTL_ADD;
+  else
+    operation = (new_events == 0) ? EPOLL_CTL_DEL : EPOLL_CTL_MOD;
+
+  ev.events = new_events;
+  ev.data.ptr = user_data;
+
+  if (epoll_ctl (core->epollfd, operation, fd, &ev) != 0)
+    g_error ("gpollcore: epoll_ctl() fail: %s\n", g_strerror (errno));
+}
+
+/* Programs the timerfd with @ready_time, given in microseconds.
+ *
+ * A negative @ready_time disarms the timer.  Otherwise the timer is
+ * armed as a one-shot with an absolute expiry (TFD_TIMER_ABSTIME).
+ *
+ * A failing timerfd_settime() is reported with g_error().
+ */
+void
+g_poll_core_set_ready_time (GPollCore *core,
+                            gint64     ready_time)
+{
+  struct itimerspec spec;
+
+  /* No repeat interval */
+  spec.it_interval.tv_sec = 0;
+  spec.it_interval.tv_nsec = 0;
+
+  if (ready_time < 0)
+    {
+      /* An all-zero value disarms the timer */
+      spec.it_value.tv_sec = 0;
+      spec.it_value.tv_nsec = 0;
+    }
+  else
+    {
+      /* Split microseconds into seconds + nanoseconds */
+      spec.it_value.tv_sec = ready_time / G_TIME_SPAN_SECOND;
+      spec.it_value.tv_nsec = (ready_time % G_TIME_SPAN_SECOND) * 1000;
+
+      /* A ready_time of 0 must still arm the timer, so nudge it off zero */
+      if (spec.it_value.tv_sec == 0 && spec.it_value.tv_nsec == 0)
+        spec.it_value.tv_nsec = 1;
+    }
+
+  if (timerfd_settime (core->timerfd, TFD_TIMER_ABSTIME, &spec, NULL) != 0)
+    g_error ("gpollcore: timerfd_settime() fail: %s\n", g_strerror (errno));
+}
+
+/* Sleeps until the epoll fd becomes readable.
+ *
+ * The signature matches the prototype in gpollcore.h: the caller holds
+ * @mutex, which is released for the duration of the sleep and retaken
+ * before returning (the same scheme the kqueue backend uses).
+ *
+ * The result of poll() is not inspected; readiness is collected later
+ * by g_poll_core_update_and_collect().
+ */
+void
+g_poll_core_wait (GPollCore *core,
+                  GMutex    *mutex)
+{
+  struct pollfd pfd;
+
+  pfd.fd = core->epollfd;
+  pfd.events = POLLIN;
+
+  g_mutex_unlock (mutex);
+  poll (&pfd, 1, -1);
+  g_mutex_lock (mutex);
+}
+
+/* Applies pending changes and then collects ready events without blocking.
+ *
+ * @ready_time_update, if non-NULL, points to the new ready time.
+ * @updates, if non-NULL, maps fds (stored with GINT_TO_POINTER-style
+ * keys) to GPollUpdate records, each of which is applied with
+ * g_poll_core_update().
+ *
+ * Returns the result of epoll_wait() called with a zero timeout: the
+ * number of entries written to @events (at most @max_events).
+ */
+gint
+g_poll_core_update_and_collect (GPollCore  *core,
+                                GHashTable *updates,
+                                gint64     *ready_time_update,
+                                GPollEvent *events,
+                                gint        max_events)
+{
+  if (ready_time_update)
+    g_poll_core_set_ready_time (core, *ready_time_update);
+
+  if (updates)
+    {
+      GHashTableIter iter;
+      gpointer key, value;
+
+      g_hash_table_iter_init (&iter, updates);
+      while (g_hash_table_iter_next (&iter, &key, &value))
+        {
+          GPollUpdate *update = value;
+
+          g_poll_core_update (core, GPOINTER_TO_INT (key),
+                              update->old_events, update->new_events, update->user_data);
+        }
+    }
+
+  return epoll_wait (core->epollfd, events, max_events, 0);
+}
+
+/* Returns the epoll file descriptor owned by @core. */
+gint
+g_poll_core_get_unix_fd (GPollCore *core)
+{
+  gint fd = core->epollfd;
+
+  return fd;
+}
+
+/* Creates the epoll instance and the monotonic timerfd for @core and
+ * registers the timerfd with the epoll instance.
+ *
+ * Both descriptors are created close-on-exec.  Any failure is reported
+ * with g_error().
+ */
+void
+g_poll_core_init (GPollCore *core)
+{
+  struct epoll_event timer_event;
+
+  core->epollfd = epoll_create1 (EPOLL_CLOEXEC);
+  if (core->epollfd < 0)
+    g_error ("gpollcore: epoll_create1() fail: %s\n", g_strerror (errno));
+
+  core->timerfd = timerfd_create (CLOCK_MONOTONIC, TFD_CLOEXEC);
+  if (core->timerfd < 0)
+    g_error ("gpollcore: timerfd_create() fail: %s\n", g_strerror (errno));
+
+  /* The timer is registered for readability with NULL user data */
+  timer_event.events = EPOLLIN;
+  timer_event.data.ptr = NULL;
+
+  if (epoll_ctl (core->epollfd, EPOLL_CTL_ADD, core->timerfd, &timer_event) < 0)
+    g_error ("gpollcore: epoll_ctl() fail [init]: %s\n", g_strerror (errno));
+}
+
+/* Closes both descriptors created by g_poll_core_init(). */
+void
+g_poll_core_clear (GPollCore *core)
+{
+  gint epoll_fd = core->epollfd;
+  gint timer_fd = core->timerfd;
+
+  close (epoll_fd);
+  close (timer_fd);
+}
diff --git a/glib/gpollcore-kqueue.c b/glib/gpollcore-kqueue.c
new file mode 100644
index 0000000..274672a
--- /dev/null
+++ b/glib/gpollcore-kqueue.c
@@ -0,0 +1,220 @@
+/*
+ * Copyright © 2014 Canonical Limited
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the licence, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Ryan Lortie <desrt desrt ca>
+ */
+
+#include "config.h"
+
+#include "gpollcore.h"
+
+#include <sys/poll.h>
+#include <unistd.h>
+#include <errno.h>
+
+/* Writes one EVFILT_TIMER change record (ident 0) for @ready_time into
+ * events[*n_changes] and increments *n_changes.
+ *
+ * @events:     array that receives the change record
+ * @n_events:   capacity of @events
+ * @n_changes:  in/out count of records already stored in @events
+ * @ready_time: negative to delete the timer, otherwise the time in
+ *              microseconds at which it should fire
+ *
+ * Returns FALSE, storing nothing, if @events is already full; TRUE
+ * otherwise.
+ */
+static gboolean
+g_poll_core_create_ready_time_update (struct kevent *events,
+                                      gint           n_events,
+                                      gint          *n_changes,
+                                      gint64         ready_time)
+{
+  if (*n_changes == n_events)
+    return FALSE;
+
+  if (ready_time < 0)
+    {
+      EV_SET (&events[*n_changes], 0, EVFILT_TIMER, EV_DELETE, 0, 0, NULL);
+    }
+  else
+    {
+#if defined(NOTE_USECONDS) && defined(NOTE_ABSOLUTE)
+      /* MacOS has a more-capable kevent() than the BSDs.
+       *
+       * It allows us to set the timer as an absolute monotonic time and
+       * also allows for microsecond accuracy.
+       *
+       * NOTE(review): confirm which clock NOTE_ABSOLUTE is measured
+       * against; ready_time is compared with g_get_monotonic_time() in
+       * the other branch.
+       */
+      EV_SET (&events[*n_changes], 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT,
+              NOTE_ABSOLUTE | NOTE_USECONDS, ready_time, NULL);
+#else
+      /* Need to do calculations to get milliseconds of relative time */
+      /* NOTE(review): the millisecond delta is stored in a gint; a very
+       * distant ready_time would not fit -- confirm callers never pass one.
+       */
+      gint timeout;
+
+      if (ready_time > 0)
+        {
+          gint64 now = g_get_monotonic_time ();
+
+          /* Round up so that we never fire early */
+          if (now < ready_time)
+            timeout = (ready_time - now + 999) / 1000;
+          else
+            timeout = 0;
+        }
+      else /* ready_time == 0 */
+        timeout = 0;
+
+      EV_SET (&events[*n_changes], 0, EVFILT_TIMER, EV_ADD | EV_ONESHOT, 0, timeout, NULL);
+#endif
+    }
+
+  (*n_changes)++;
+
+  return TRUE;
+}
+
+/* Translates a change of watched events on @fd into kevent change
+ * records appended to @events.
+ *
+ * Only G_IO_IN (EVFILT_READ) and G_IO_OUT (EVFILT_WRITE) are considered,
+ * in that order.  A flag that is newly set produces an EV_ADD record
+ * carrying @user_data; a flag that is newly cleared produces an
+ * EV_DELETE record.  *n_changes is incremented for each record stored.
+ *
+ * Returns FALSE as soon as a record is needed but @events (capacity
+ * @n_events) is full; records stored before that point stay counted.
+ */
+static gboolean
+g_poll_core_create_fd_update (struct kevent *events,
+                              gint           n_events,
+                              gint          *n_changes,
+                              gint           fd,
+                              guint          old_events,
+                              guint          new_events,
+                              gpointer       user_data)
+{
+  static const struct
+  {
+    guint io_flag;
+    gint  filter;
+  } filter_map[] = {
+    { G_IO_IN,  EVFILT_READ  },
+    { G_IO_OUT, EVFILT_WRITE }
+  };
+  const guint changed = old_events ^ new_events;
+  guint n;
+
+  for (n = 0; n < G_N_ELEMENTS (filter_map); n++)
+    {
+      struct kevent *slot;
+
+      if (!(changed & filter_map[n].io_flag))
+        continue;
+
+      if (*n_changes == n_events)
+        return FALSE;
+
+      slot = &events[*n_changes];
+
+      if (new_events & filter_map[n].io_flag)
+        EV_SET (slot, fd, filter_map[n].filter, EV_ADD, 0, 0, user_data);
+      else
+        EV_SET (slot, fd, filter_map[n].filter, EV_DELETE, 0, 0, NULL);
+
+      (*n_changes)++;
+    }
+
+  return TRUE;
+}
+
+/* Immediately submits the change of watched events for @fd to the
+ * main kqueue.
+ *
+ * At most two change records are needed (one for reading, one for
+ * writing).  The result of kevent() is not checked.
+ */
+void
+g_poll_core_update (GPollCore *core,
+                    gint       fd,
+                    guint      old_events,
+                    guint      new_events,
+                    gpointer   user_data)
+{
+  struct kevent changes[2];
+  gint n_pending = 0;
+
+  g_poll_core_create_fd_update (changes, G_N_ELEMENTS (changes), &n_pending,
+                                fd, old_events, new_events, user_data);
+
+  kevent (core->kqueue_fd, changes, n_pending, NULL, 0, NULL);
+}
+
+/* Submits a timer change for @ready_time to the dedicated timer kqueue.
+ *
+ * The result of kevent() is not checked.
+ */
+void
+g_poll_core_set_ready_time (GPollCore *core,
+                            gint64     ready_time)
+{
+  struct kevent timer_change[1];
+  gint n_pending = 0;
+
+  g_poll_core_create_ready_time_update (timer_change, G_N_ELEMENTS (timer_change),
+                                        &n_pending, ready_time);
+
+  kevent (core->kqueue_timer, timer_change, n_pending, NULL, 0, NULL);
+}
+
+/* Sleeps until the main kqueue fd becomes readable.
+ *
+ * @mutex is held by the caller; it is dropped for the duration of the
+ * poll() call and retaken before returning.  The poll() result is not
+ * inspected.
+ */
+void
+g_poll_core_wait (GPollCore *core,
+                  GMutex    *mutex)
+{
+  struct pollfd kqueue_pfd;
+
+  kqueue_pfd.events = POLLIN;
+  kqueue_pfd.fd = core->kqueue_fd;
+
+  g_mutex_unlock (mutex);
+
+  poll (&kqueue_pfd, 1, -1);
+
+  g_mutex_lock (mutex);
+}
+
+/* Applies pending fd changes and the optional timer change, then
+ * collects ready events without blocking.
+ *
+ * @events is used twice: first as the changelist built from @updates,
+ * then as the output array of the same kevent() call.
+ *
+ * Returns the result of kevent() with a zero timeout, except when the
+ * changelist does not fit in @events (see the note below).
+ */
+gint
+g_poll_core_update_and_collect (GPollCore  *core,
+                                GHashTable *updates,
+                                gint64     *ready_time_update,
+                                GPollEvent *events,
+                                gint        max_events)
+{
+  struct timespec zero = { 0, 0 };
+  gint n_changes = 0;
+
+  if (updates)
+    {
+      GHashTableIter iter;
+      gpointer key, value;
+
+      g_hash_table_iter_init (&iter, updates);
+      while (g_hash_table_iter_next (&iter, &key, &value))
+        {
+          GPollUpdate *update = value;
+
+          /* NOTE(review): when @events is full this returns the number
+           * of change records built so far without submitting them to
+           * kevent(); the caller receives change records rather than
+           * ready events -- confirm how callers are meant to handle it.
+           */
+          if (!g_poll_core_create_fd_update (events, max_events, &n_changes, GPOINTER_TO_INT (key),
+                                             update->old_events, update->new_events, update->user_data))
+            return n_changes;
+        }
+    }
+
+  /* We convert absolute to relative time here, so try to do it as close
+   * as possible to the kevent() call.
+   */
+  /* The timer is currently set through its own kqueue by
+   * g_poll_core_set_ready_time(); the commented-out lines show the
+   * alternative of adding it to the shared changelist.
+   */
+  if (ready_time_update)
+    g_poll_core_set_ready_time (core, *ready_time_update);
+    //if (!g_poll_core_create_ready_time_update (events, max_events, &n_changes, *ready_time_update))
+      //return n_changes;
+
+  return kevent (core->kqueue_fd, events, n_changes, events, max_events, &zero);
+}
+
+/* Returns the main kqueue file descriptor owned by @core. */
+gint
+g_poll_core_get_unix_fd (GPollCore *core)
+{
+  gint fd = core->kqueue_fd;
+
+  return fd;
+}
+
+/* Creates the main kqueue and a second kqueue used for the timer, and
+ * watches the timer kqueue for readability from the main one.
+ *
+ * Any failure is reported with g_error().
+ */
+void
+g_poll_core_init (GPollCore *core)
+{
+  struct kevent timer_watch;
+
+  core->kqueue_fd = kqueue ();
+  if (core->kqueue_fd < 0)
+    g_error ("gpollcore: kqueue() fail: %s", g_strerror (errno));
+
+  core->kqueue_timer = kqueue ();
+  if (core->kqueue_timer < 0)
+    g_error ("gpollcore: kqueue() fail [timer]: %s", g_strerror (errno));
+
+  /* The timer kqueue is registered with NULL user data */
+  EV_SET (&timer_watch, core->kqueue_timer, EVFILT_READ, EV_ADD, 0, 0, NULL);
+
+  if (kevent (core->kqueue_fd, &timer_watch, 1, NULL, 0, NULL) < 0)
+    g_error ("gpollcore: kevent() fail [init]: %s", g_strerror (errno));
+}
+
+/* Releases both kqueue descriptors created by g_poll_core_init().
+ *
+ * The timer kqueue is closed as well as the main one; closing only the
+ * main kqueue would leak the timer descriptor.
+ */
+void
+g_poll_core_clear (GPollCore *core)
+{
+  close (core->kqueue_timer);
+  close (core->kqueue_fd);
+}
diff --git a/glib/gpollcore-poll.c b/glib/gpollcore-poll.c
new file mode 100644
index 0000000..9575316
--- /dev/null
+++ b/glib/gpollcore-poll.c
@@ -0,0 +1,231 @@
+/*
+ * Copyright © 2014 Canonical Limited
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the licence, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Ryan Lortie <desrt desrt ca>
+ */
+
+#include "config.h"
+
+#include "gpollcore.h"
+
+#include <sys/poll.h>
+#include <unistd.h>
+#include <string.h>
+#include <errno.h>
+
+/* Updates the pollfd table for @fd.
+ *
+ * @old_events: events previously watched; non-zero means @fd must
+ *              already be present in the table (asserted)
+ * @new_events: events to watch from now on; zero removes the entry
+ * @user_data:  stored alongside the entry when it is added or modified
+ *
+ * The caller is responsible for whatever locking protects @core.
+ */
+static void
+g_poll_core_update_locked (GPollCore *core,
+                           gint       fd,
+                           guint      old_events,
+                           guint      new_events,
+                           gpointer   user_data)
+{
+  gint position;
+
+  if (old_events)
+    {
+      for (position = 0; position < core->n_pfds; position++)
+        if (core->pfds[position].fd == fd)
+          break;
+      g_assert (position < core->n_pfds);
+    }
+  else
+    position = core->n_pfds;
+
+  if (new_events)
+    {
+      if (position == core->n_allocated_pfds)
+        {
+          /* Doubling a capacity of zero would never grow the arrays,
+           * so start from a small non-zero size in that case.
+           */
+          core->n_allocated_pfds = core->n_allocated_pfds > 0 ? core->n_allocated_pfds * 2 : 8;
+          core->pfds = g_renew (struct pollfd, core->pfds, core->n_allocated_pfds);
+          core->user_data = g_renew (gpointer, core->user_data, core->n_allocated_pfds);
+        }
+
+      core->pfds[position].fd = fd;
+      core->pfds[position].events = new_events;
+      core->user_data[position] = user_data;
+
+      if (position == core->n_pfds)
+        core->n_pfds++;
+    }
+  else if (old_events)
+    {
+      /* Removal: move the last entry into the vacated slot.  If the
+       * removed entry was the last one this is a self-assignment.
+       */
+      core->n_pfds--;
+      core->pfds[position] = core->pfds[core->n_pfds];
+      core->user_data[position] = core->user_data[core->n_pfds];
+    }
+}
+
+/* Updates the pollfd table for @fd under the core's own mutex and, if
+ * a thread is currently waiting, writes one byte to the wakeup pipe.
+ *
+ * The write is retried while it fails with EINTR; any other outcome
+ * ends the retry loop.
+ */
+void
+g_poll_core_update (GPollCore *core,
+                    gint       fd,
+                    guint      old_events,
+                    guint      new_events,
+                    gpointer   user_data)
+{
+  g_mutex_lock (&core->mutex);
+
+  g_poll_core_update_locked (core, fd, old_events, new_events, user_data);
+
+  if (core->waiting)
+    {
+      for (;;)
+        {
+          gint written = write (core->pipes[1], "x", 1);
+
+          if (written != -1 || errno != EINTR)
+            break;
+        }
+    }
+
+  g_mutex_unlock (&core->mutex);
+}
+
+/* Records @ready_time (microseconds, negative for "no timeout") and
+ * wakes a waiting thread when its current sleep would last too long.
+ *
+ * The waiter computes its poll() timeout from core->ready_time before
+ * sleeping, so it must be woken whenever the new time is earlier than
+ * the one it used -- including the case where it is sleeping with no
+ * timeout at all (a negative stored ready_time).
+ */
+void
+g_poll_core_set_ready_time (GPollCore *core,
+                            gint64     ready_time)
+{
+  g_mutex_lock (&core->mutex);
+
+  /* We want to wake the owner thread if it is sleeping and if the
+   * current timeout is greater than the new one.  A negative current
+   * value means "sleep forever", which is greater than any real time.
+   */
+  if (core->waiting && ready_time >= 0 &&
+      (core->ready_time < 0 || ready_time < core->ready_time))
+    {
+      gint ret;
+
+      do
+        ret = write (core->pipes[1], "x", 1);
+      while (ret == -1 && errno == EINTR);
+    }
+
+  core->ready_time = ready_time;
+
+  g_mutex_unlock (&core->mutex);
+}
+
+/* Sleeps until a watched fd is ready or the ready time is reached.
+ *
+ * A snapshot of the pollfd table is taken under the core's mutex, with
+ * the read end of the wakeup pipe in slot 0.  The mutex is released
+ * while sleeping.  If the wakeup pipe was written to, the pipe is
+ * drained and the wait restarts with a fresh snapshot and timeout.
+ *
+ * A poll() failure other than EINTR is reported with g_error().
+ *
+ * NOTE(review): the prototype in gpollcore.h also takes a GMutex
+ * argument; this definition does not -- confirm which is intended.
+ * NOTE(review): the drain loop relies on read() not blocking once the
+ * pipe is empty; confirm the pipe is created non-blocking.
+ */
+void
+g_poll_core_wait (GPollCore *core)
+{
+  struct pollfd *pfds;
+  gint n_pfds;
+  gint timeout;
+  gint result;
+
+  g_mutex_lock (&core->mutex);
+
+again:
+  pfds = g_new (struct pollfd, core->n_pfds + 1);
+  /* Slot 0 is the wakeup pipe; the watched fds follow from slot 1. */
+  pfds[0].fd = core->pipes[0];
+  pfds[0].events = POLLIN;
+  memcpy (pfds + 1, core->pfds, sizeof (struct pollfd) * core->n_pfds);
+  n_pfds = core->n_pfds + 1;
+
+  if (core->ready_time > 0)
+    {
+      gint64 now = g_get_monotonic_time ();
+
+      /* Microseconds to milliseconds, rounded up so we never wake early */
+      if (now < core->ready_time)
+        timeout = (core->ready_time - now + 999) / 1000;
+      else
+        timeout = 0;
+    }
+  else if (core->ready_time == 0)
+    timeout = 0;
+  else
+    timeout = -1;
+
+  core->waiting = TRUE;
+
+  g_mutex_unlock (&core->mutex);
+
+  do
+    result = poll (pfds, n_pfds, timeout);
+  while (result < 0 && errno == EINTR);
+
+  if (result < 0)
+    g_error ("gpollcore: poll() fail [wait]: %s", g_strerror (errno));
+
+  g_mutex_lock (&core->mutex);
+
+  core->waiting = FALSE;
+
+  if (pfds[0].revents & POLLIN)
+    {
+      char buffer[20];
+
+      /* Another thread changed the table or the ready time: drain the
+       * pipe and start over.
+       */
+      while (read (core->pipes[0], buffer, sizeof buffer) > 0)
+        ;
+      g_free (pfds);
+      goto again;
+    }
+
+  g_free (pfds);
+
+  g_mutex_unlock (&core->mutex);
+}
+
+/* Applies pending changes and collects ready events without blocking.
+ *
+ * @updates:           optional table of fd -> GPollUpdate changes
+ * @ready_time_update: optional pointer to a new ready time
+ * @events:            output array
+ * @max_events:        capacity of @events; must be at least 1
+ *
+ * An expired ready time is reported as an entry with NULL user_data and
+ * zero revents.  Each ready fd is reported with its revents and user
+ * data.  At most @max_events entries are written; fds that do not fit
+ * are left for a later call.
+ *
+ * Returns the number of entries written to @events.
+ */
+gint
+g_poll_core_update_and_collect (GPollCore  *core,
+                                GHashTable *updates,
+                                gint64     *ready_time_update,
+                                GPollEvent *events,
+                                gint        max_events)
+{
+  gint n_collected = 0;
+  gint n_ready;
+  gint i;
+
+  /* We are protected by the GMainContext lock here, so no need to use
+   * our own...
+   */
+
+  /* Make sure there is room for timeout */
+  g_assert (max_events >= 1);
+
+  if (ready_time_update)
+    core->ready_time = *ready_time_update;
+
+  if (updates)
+    {
+      GHashTableIter iter;
+      gpointer key, value;
+
+      g_hash_table_iter_init (&iter, updates);
+      while (g_hash_table_iter_next (&iter, &key, &value))
+        {
+          GPollUpdate *update = value;
+
+          g_poll_core_update_locked (core, GPOINTER_TO_INT (key),
+                                     update->old_events, update->new_events, update->user_data);
+        }
+    }
+
+  /* Check for timeout.  A negative ready_time means "no timeout" (it
+   * makes g_poll_core_wait() sleep forever) and must not be reported
+   * as expired.
+   */
+  if (core->ready_time >= 0 && core->ready_time < g_get_monotonic_time ())
+    {
+      events[n_collected].revents = 0;
+      events[n_collected].user_data = NULL;
+      n_collected++;
+    }
+
+  /* Check the file descriptors */
+  do
+    n_ready = poll (core->pfds, core->n_pfds, 0);
+  while (n_ready < 0 && errno == EINTR);
+
+  if (n_ready < 0)
+    g_error ("gpollcore: poll() fail [collect]: %s", g_strerror (errno));
+
+  /* Stop early once every ready fd was seen or @events is full */
+  for (i = 0; n_ready && i < core->n_pfds && n_collected < max_events; i++)
+    if (core->pfds[i].revents)
+      {
+        events[n_collected].revents = core->pfds[i].revents;
+        events[n_collected].user_data = core->user_data[i];
+        n_collected++;
+        n_ready--;
+      }
+
+  return n_collected;
+}
diff --git a/glib/gpollcore-win32.c b/glib/gpollcore-win32.c
new file mode 100644
index 0000000..4732d62
--- /dev/null
+++ b/glib/gpollcore-win32.c
@@ -0,0 +1,270 @@
+/*
+ * Copyright © 2014 Canonical Limited
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the licence, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Ryan Lortie <desrt desrt ca>
+ */
+
+#include "config.h"
+
+#include "gpollcore.h"
+
+#include <windows.h>
+
+/* GDestroyNotify-compatible wrapper around CloseHandle().
+ *
+ * CloseHandle() is declared WINAPI and returns BOOL, so it must not be
+ * called through a GDestroyNotify pointer obtained by a cast.
+ */
+static void
+close_thread_handle (gpointer data)
+{
+  CloseHandle ((HANDLE) data);
+}
+
+/* Returns a real handle for the calling thread.
+ *
+ * The handle is created once per thread by duplicating the
+ * pseudo-handle from GetCurrentThread() and is cached in thread-private
+ * storage; the GPrivate destroy notify closes it.
+ *
+ * A DuplicateHandle() failure is reported with g_error().
+ */
+static HANDLE
+get_thread_handle (void)
+{
+  static GPrivate this_thread = G_PRIVATE_INIT (close_thread_handle);
+  HANDLE thread;
+
+  thread = g_private_get (&this_thread);
+  if (thread == NULL)
+    {
+      gboolean success;
+
+      /* The desired-access argument is ignored with DUPLICATE_SAME_ACCESS */
+      success = DuplicateHandle (GetCurrentProcess (), GetCurrentThread (), GetCurrentProcess (),
+                                 &thread, 0, FALSE, DUPLICATE_SAME_ACCESS);
+      if (!success)
+        g_error ("gpollcore: DuplicateHandle() fail: %u", (guint) GetLastError ());
+
+      g_private_set (&this_thread, thread);
+    }
+
+  return thread;
+}
+
+/* Updates the set of watched handles for a change of events on @handle.
+ *
+ * Only a change of the G_IO_IN flag matters.  The special message-queue
+ * handle (G_WIN32_MSG_HANDLE) toggles message polling; any other handle
+ * is added to or removed from the handle array.
+ *
+ * Returns TRUE if the watched set changed, FALSE if nothing changed or
+ * the handle could not be added because the array is full.
+ *
+ * The caller is responsible for whatever locking protects @core.
+ */
+static gboolean
+g_poll_core_update_locked (GPollCore *core,
+                           HANDLE     handle,
+                           guint      old_events,
+                           guint      new_events,
+                           gpointer   user_data)
+{
+  gboolean enabled;
+  gint i;
+
+  /* We only care about this one flag */
+  if (!((old_events ^ new_events) & G_IO_IN))
+    return FALSE;
+
+  enabled = !!(new_events & G_IO_IN);
+
+  if (handle == GUINT_TO_POINTER (G_WIN32_MSG_HANDLE))
+    {
+      core->polling_msgs = enabled;
+      core->msgs_user_data = enabled ? user_data : NULL;
+
+      return TRUE;
+    }
+
+  if (enabled)
+    {
+      /* Add an entry */
+
+      /* paranoid checking... */
+      for (i = 0; i < core->n_handles; i++)
+        g_assert (core->handles[i] != handle);
+
+      /* NOTE(review): the Msg variants of the wait functions may accept
+       * one handle fewer than MAXIMUM_WAIT_OBJECTS -- confirm the limit.
+       */
+      if (core->n_handles >= MAXIMUM_WAIT_OBJECTS)
+        {
+          g_warning ("Windows can only wait on 64 handles per thread.  "
+                     "Ignoring request to add new handle.");
+          return FALSE;
+        }
+
+      core->handles[core->n_handles] = handle;
+      core->user_data[core->n_handles] = user_data;
+      core->n_handles++;
+
+      return TRUE;
+    }
+
+  /* Remove an entry */
+  for (i = 0; i < core->n_handles; i++)
+    if (core->handles[i] == handle)
+      {
+        core->n_handles--;
+
+        /* Maybe i == core->n_handles now, but in that
+         * case this is just a no-op...
+         */
+        core->handles[i] = core->handles[core->n_handles];
+        core->user_data[i] = core->user_data[core->n_handles];
+
+        return TRUE;
+      }
+
+  g_assert_not_reached ();
+
+  /* Keep a defined result even when assertions are compiled out */
+  return FALSE;
+}
+
+/* APC routine queued to a waiting thread.
+ *
+ * It has no work of its own: delivering it is what interrupts the
+ * alertable wait.
+ */
+static void CALLBACK
+user_apc (ULONG_PTR data)
+{
+  (void) data;
+}
+
+/* Updates the watched handle set under the core's own mutex and, if
+ * the set changed while a thread is waiting, queues an APC to that
+ * thread so that its wait is interrupted.
+ */
+void
+g_poll_core_update (GPollCore *core,
+                    HANDLE     handle,
+                    guint      old_events,
+                    guint      new_events,
+                    gpointer   user_data)
+{
+  g_mutex_lock (&core->mutex);
+
+  if (g_poll_core_update_locked (core, handle, old_events, new_events, user_data))
+    {
+      if (core->waiting_thread)
+        QueueUserAPC (user_apc, core->waiting_thread, 0);
+    }
+
+  g_mutex_unlock (&core->mutex);
+}
+
+/* Records @ready_time (microseconds, negative for "no timeout") and
+ * interrupts a waiting thread when its current sleep would last too
+ * long.
+ *
+ * The waiter computes its timeout from core->ready_time before
+ * sleeping, so it must be interrupted whenever the new time is earlier
+ * than the one it used -- including the case where it is sleeping with
+ * an INFINITE timeout (a negative stored ready_time).
+ */
+void
+g_poll_core_set_ready_time (GPollCore *core,
+                            gint64     ready_time)
+{
+  g_mutex_lock (&core->mutex);
+
+  /* We want to wake the owner thread if it is sleeping and if the
+   * current timeout is greater than the new one.  A negative current
+   * value means "sleep forever", which is greater than any real time.
+   */
+  if (core->waiting_thread && ready_time >= 0 &&
+      (core->ready_time < 0 || ready_time < core->ready_time))
+    QueueUserAPC (user_apc, core->waiting_thread, 0);
+
+  core->ready_time = ready_time;
+
+  g_mutex_unlock (&core->mutex);
+}
+
+/* Sleeps until a watched handle is signalled, input arrives in the
+ * message queue, the ready time is reached or an APC is delivered.
+ *
+ * A snapshot of the handle array and the timeout are taken under the
+ * core's mutex, which is released while sleeping.  If the wait ends
+ * because of an APC, the snapshot and timeout are recomputed and the
+ * wait restarts.  The wait result is otherwise not interpreted here.
+ *
+ * NOTE(review): the prototype in gpollcore.h also takes a GMutex
+ * argument; this definition does not -- confirm which is intended.
+ */
+void
+g_poll_core_wait (GPollCore *core)
+{
+  HANDLE handles[MAXIMUM_WAIT_OBJECTS];
+  gint n_handles;
+  DWORD timeout;
+  DWORD result;
+
+  g_mutex_lock (&core->mutex);
+
+again:
+  memcpy (handles, core->handles, sizeof (HANDLE) * core->n_handles);
+  n_handles = core->n_handles;
+
+  if (core->ready_time > 0)
+    {
+      gint64 now = g_get_monotonic_time ();
+
+      /* Microseconds to milliseconds, rounded up */
+      if (now < core->ready_time)
+        timeout = (core->ready_time - now + 999) / 1000;
+      else
+        timeout = 0;
+    }
+  else if (core->ready_time == 0)
+    timeout = 0;
+  else
+    timeout = INFINITE;
+
+  /* Published under the lock so that other threads know whom to wake */
+  core->waiting_thread = get_thread_handle ();
+
+  g_mutex_unlock (&core->mutex);
+
+  /* Wait on all of the objects, ignoring any results.
+   * We will collect the results once we have retaken the lock.
+   *
+   * Set waiting_thread so that we can signal ourselves to wake up
+   * if we make changes.
+   */
+  result = MsgWaitForMultipleObjectsEx (n_handles, handles, timeout, QS_ALLEVENTS, MWMO_ALERTABLE);
+
+  g_mutex_lock (&core->mutex);
+  core->waiting_thread = NULL;
+
+  /* We allow APC in case the user wants to do it, but also because
+   * this is how we alert ourselves if the timeout or list of
+   * handles changes from another thread while we're waiting.
+   */
+  if (result == WAIT_IO_COMPLETION)
+   goto again;
+
+  g_mutex_unlock (&core->mutex);
+}
+
+/* Applies pending changes and collects ready events without blocking.
+ *
+ * @updates:           optional table of handle -> GPollUpdate changes
+ * @ready_time_update: optional pointer to a new ready time
+ * @events:            output array of user-data pointers
+ * @max_events:        capacity of @events; must be at least 2
+ *
+ * An expired ready time is reported as a NULL entry, a ready message
+ * queue as its registered user data, and each signalled handle as its
+ * user data.  At most @max_events entries are written; handles that do
+ * not fit are left for a later call.
+ *
+ * Returns the number of entries written to @events.
+ */
+gint
+g_poll_core_update_and_collect (GPollCore  *core,
+                                GHashTable *updates,
+                                gint64     *ready_time_update,
+                                GPollEvent *events,
+                                gint        max_events)
+{
+  gint n_collected = 0;
+  gint i;
+
+  /* We are protected by the GMainContext lock here, so no need to use
+   * our own...
+   */
+
+  /* Make sure there is room for timeout and msgs */
+  g_assert (max_events >= 2);
+
+  if (ready_time_update)
+    core->ready_time = *ready_time_update;
+
+  if (updates)
+    {
+      GHashTableIter iter;
+      gpointer key, value;
+
+      g_hash_table_iter_init (&iter, updates);
+      while (g_hash_table_iter_next (&iter, &key, &value))
+        {
+          GPollUpdate *update = value;
+
+          g_poll_core_update_locked (core, key, update->old_events, update->new_events, update->user_data);
+        }
+    }
+
+  /* Check for timeout.  A negative ready_time means "no timeout" (it
+   * makes g_poll_core_wait() sleep with INFINITE) and must not be
+   * reported as expired.
+   */
+  if (core->ready_time >= 0 && core->ready_time < g_get_monotonic_time ())
+    events[n_collected++] = NULL;
+
+  /* Check the ready status of the message queue, if we're watching that. */
+  if (core->polling_msgs && MsgWaitForMultipleObjects (0, NULL, FALSE, 0, QS_ALLEVENTS) == WAIT_OBJECT_0)
+    events[n_collected++] = core->msgs_user_data;
+
+  /* Check the ready statuses of all of the handles we're watching.
+   * There are 64 of them at most (and typically a good deal fewer), so
+   * this shouldn't be too awful...
+   *
+   * The capacity is checked before each store: the two entries above
+   * may already have filled @events.  Stopping early will cause us to
+   * be re-run.
+   */
+  for (i = 0; i < core->n_handles && n_collected < max_events; i++)
+    if (WaitForSingleObject (core->handles[i], 0) == WAIT_OBJECT_0)
+      events[n_collected++] = core->user_data[i];
+
+  return n_collected;
+}
diff --git a/glib/gpollcore.h b/glib/gpollcore.h
new file mode 100644
index 0000000..cb86eb4
--- /dev/null
+++ b/glib/gpollcore.h
@@ -0,0 +1,154 @@
+/*
+ * Copyright © 2014 Canonical Limited
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the licence, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Ryan Lortie <desrt desrt ca>
+ */
+
+#ifndef __gpollcore_h__
+#define __gpollcore_h__
+
+#include <glib.h>
+
+/* Per-backend definitions.  Each branch provides:
+ *
+ *   ghandle                        - type used to identify a watched object
+ *   GPollEvent                     - element type of the collected-events array
+ *   g_poll_event_get_user_data()   - user data of a collected event
+ *   g_poll_event_get_revents()     - ready conditions of a collected event
+ *   GPollCore                      - backend state
+ */
+#if defined(POLLCORE_KQUEUE)
+
+  #include <sys/types.h>
+  #include <sys/event.h>
+  #include <sys/time.h>
+
+  typedef gint ghandle;
+
+  typedef struct kevent GPollEvent;
+
+  /* Any filter other than EVFILT_WRITE is reported as POLLIN */
+  #define g_poll_event_get_user_data(gpe)       ((gpe).udata)
+  #define g_poll_event_get_revents(gpe)         (((gpe).filter == EVFILT_WRITE) ? POLLOUT : POLLIN)
+
+  typedef struct
+  {
+    gint kqueue_fd;     /* main kqueue */
+    gint kqueue_timer;  /* second kqueue, used for the timer */
+  } GPollCore;
+
+#elif defined(POLLCORE_EPOLL)
+
+  #include <sys/epoll.h>
+
+  typedef gint ghandle;
+
+  typedef struct epoll_event GPollEvent;
+
+  #define g_poll_event_get_user_data(gpe)       ((gpe).data.ptr)
+  #define g_poll_event_get_revents(gpe)         ((gpe).events)
+
+  typedef struct
+  {
+    gint epollfd;  /* epoll instance */
+    gint timerfd;  /* timerfd registered with the epoll instance */
+  } GPollCore;
+
+#elif defined(POLLCORE_WIN32)
+
+  #include <windows.h>
+
+  typedef HANDLE ghandle;
+
+  /* A collected event is just the user data pointer */
+  typedef gpointer GPollEvent;
+
+  #define g_poll_event_get_user_data(gpe)       (gpe)
+  #define g_poll_event_get_revents(gpe)         (G_IO_IN)
+
+  typedef struct
+  {
+    gboolean polling_msgs;                     /* whether the message queue is watched */
+    gpointer msgs_user_data;                   /* user data reported for the message queue */
+    HANDLE   handles[MAXIMUM_WAIT_OBJECTS];    /* watched handles */
+    gpointer user_data[MAXIMUM_WAIT_OBJECTS];  /* user data, parallel to handles */
+    gint     n_handles;                        /* used entries in the two arrays */
+    gint64   ready_time;                       /* microseconds; negative = no timeout */
+    HANDLE   waiting_thread;                   /* thread currently in g_poll_core_wait(), or NULL */
+    GMutex   mutex;
+  } GPollCore;
+
+#elif defined(POLLCORE_POLL)
+
+  typedef gint ghandle;
+
+  typedef struct
+  {
+    struct pollfd *pfds;              /* watched fds */
+    gpointer      *user_data;         /* user data, parallel to pfds */
+    gint           n_pfds;            /* used entries in the two arrays */
+    gint           n_allocated_pfds;  /* allocated entries in the two arrays */
+    gint64         ready_time;        /* microseconds; negative = no timeout */
+    gboolean       waiting;           /* TRUE while a thread sleeps in g_poll_core_wait() */
+    gint           pipes[2];          /* wakeup pipe: [0] is read, [1] is written */
+    GMutex         mutex;
+  } GPollCore;
+
+  typedef struct
+  {
+    guint    revents;
+    gpointer user_data;
+  } GPollEvent;
+
+  #define g_poll_event_get_user_data(gpe)       ((gpe).user_data)
+  #define g_poll_event_get_revents(gpe)         ((gpe).revents)
+
+#else
+  #error This should not be possible.  Check your configuration...
+#endif
+
+/* One pending change of watched events, stored as a value in the
+ * updates table passed to g_poll_core_update_and_collect().
+ */
+typedef struct
+{
+  gpointer user_data;   /* user data to associate with the handle */
+  gushort old_events;   /* events watched before the change */
+  gushort new_events;   /* events to watch after the change */
+} GPollUpdate;
+
+/* Set up / tear down the backend state in @core */
+GLIB_AVAILABLE_IN_ALL
+void            g_poll_core_init                (GPollCore  *core);
+GLIB_AVAILABLE_IN_ALL
+void            g_poll_core_clear               (GPollCore  *core);
+
+/* Called from owner thread with lock held */
+/* Applies @updates and @ready_time_update (either may be NULL), then
+ * stores up to @max_events ready events in @events and returns how
+ * many were stored.
+ */
+GLIB_AVAILABLE_IN_ALL
+gint            g_poll_core_update_and_collect  (GPollCore  *core,
+                                                 GHashTable *updates,
+                                                 gint64     *ready_time_update,
+                                                 GPollEvent *events,
+                                                 gint        max_events);
+
+/* Called with lock held and must release it before sleeping */
+GLIB_AVAILABLE_IN_ALL
+void            g_poll_core_wait                (GPollCore  *core,
+                                                 GMutex     *mutex);
+
+/* Called from another thread with context lock held */
+GLIB_AVAILABLE_IN_ALL
+void            g_poll_core_update              (GPollCore  *core,
+                                                 ghandle     handle,
+                                                 guint       old_events,
+                                                 guint       new_events,
+                                                 gpointer    user_data);
+
+/* @ready_time is in microseconds; a negative value means no timeout */
+GLIB_AVAILABLE_IN_ALL
+void            g_poll_core_set_ready_time      (GPollCore  *core,
+                                                 gint64      ready_time);
+
+/* Only on UNIX */
+GLIB_AVAILABLE_IN_ALL
+gint            g_poll_core_get_unix_fd         (GPollCore  *core);
+
+#endif /* __gpollcore_h__ */
diff --git a/glib/tests/Makefile.am b/glib/tests/Makefile.am
index daf4229..1de080c 100644
--- a/glib/tests/Makefile.am
+++ b/glib/tests/Makefile.am
@@ -12,6 +12,8 @@ spawn_multithreaded_LDFLAGS = $(patsubst -lgcov,,$(LDFLAGS))
 
 # -----------------------------------------------------------------------------
 
+noinst_PROGRAMS = pollcore
+
 dist_test_data = \
        keyfiletest.ini                 \
        pages.ini                       \
@@ -71,6 +73,7 @@ test_programs = \
        option-context                  \
        option-argv0                    \
        pattern                         \
+       pollcore                        \
        private                         \
        protocol                        \
        queue                           \
diff --git a/glib/tests/pollcore.c b/glib/tests/pollcore.c
new file mode 100644
index 0000000..20b4ab3
--- /dev/null
+++ b/glib/tests/pollcore.c
@@ -0,0 +1,184 @@
+/*
+ * Copyright © 2014 Canonical Limited
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the licence, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, see <http://www.gnu.org/licenses/>.
+ *
+ * Author: Ryan Lortie <desrt desrt ca>
+ */
+
+#include "config.h"
+
+#include <glib.h>
+
+#include "gpollcore.h"
+
+#include <sys/poll.h>
+#include <unistd.h>
+
+/* Value-destroy callback for the updates table: gives one GPollUpdate
+ * back to the slice allocator.
+ */
+static void
+update_free (gpointer data)
+{
+  g_slice_free (GPollUpdate, (GPollUpdate *) data);
+}
+
+/*
+ * add_update:
+ * @table: pending updates, keyed by file descriptor
+ * @fd: descriptor whose event mask is changing
+ * @old_events: mask the caller believes is currently in effect
+ * @new_events: mask to switch to
+ * @user_data: pointer recorded with the entry
+ *
+ * Records a mask change for @fd in @table.  A second change for the same
+ * @fd is folded into the existing entry (after checking that it chains on
+ * from the previous one), and an entry whose net effect is "no change" is
+ * dropped from the table again.
+ */
+static void
+add_update (GHashTable *table,
+            gint        fd,
+            guint       old_events,
+            guint       new_events,
+            gpointer    user_data)
+{
+  gpointer key = GINT_TO_POINTER (fd);
+  GPollUpdate *update = g_hash_table_lookup (table, key);
+
+  if (update != NULL)
+    {
+      /* Folding into an existing entry: it must pick up where that left off */
+      g_assert_cmpint (update->new_events, ==, old_events);
+      g_assert (update->user_data == user_data);
+    }
+  else
+    {
+      /* First change seen for this fd: remember the starting mask */
+      update = g_slice_new (GPollUpdate);
+      update->user_data = user_data;
+      update->old_events = old_events;
+
+      g_hash_table_insert (table, key, update);
+    }
+
+  update->new_events = new_events;
+
+  /* XXX: what to do if only user_data changed? */
+  if (update->old_events == update->new_events)
+    g_hash_table_remove (table, key);
+}
+
+/*
+ * is_ready:
+ * @fd: file descriptor to probe
+ *
+ * Performs a zero-timeout poll() on @fd asking for POLLIN.
+ *
+ * Returns: %TRUE if poll() reported @fd, %FALSE if it reported nothing
+ */
+static gboolean
+is_ready (gint fd)
+{
+  struct pollfd pfd;
+  gint result;
+
+  pfd.fd = fd;
+  pfd.events = POLLIN;
+  pfd.revents = 0;
+
+  result = poll (&pfd, 1, 0);
+
+  /* poll() returns -1 on failure; as a gboolean that would read as
+   * "ready", so fail loudly instead of returning it.
+   */
+  g_assert_cmpint (result, >=, 0);
+
+  return result > 0;
+}
+
+/* Thread function: sleeps for a tenth of a second, then sets the ready
+ * time of the GPollCore passed as @user_data to 0.  Always returns NULL.
+ */
+static gpointer
+kick_core (gpointer user_data)
+{
+  g_usleep (G_TIME_SPAN_SECOND * 0.1);
+  g_poll_core_set_ready_time ((GPollCore *) user_data, 0);
+
+  return NULL;
+}
+
+/*
+ * test_pollcore:
+ *
+ * Drives a GPollCore through a fixed sequence of updates on the two ends
+ * of a pipe.  After each step it checks the count returned by
+ * g_poll_core_update_and_collect() and whether the core's own unix fd
+ * polls as readable.
+ */
+static void
+test_pollcore (void)
+{
+  GPollEvent events[10];
+  GHashTable *updates;
+  GPollCore core;
+  GMutex lock;
+  gint pipes[2];
+  gint64 ready_time;
+  gchar b;
+  gint fd;
+  gint r;
+
+  /* The lock is taken once here and handed to g_poll_core_wait() below */
+  g_mutex_init (&lock);
+  g_mutex_lock (&lock);
+
+  updates = g_hash_table_new_full (NULL, NULL, NULL, update_free);
+
+  r = pipe (pipes);
+  g_assert_cmpint (r, ==, 0);
+
+  g_poll_core_init (&core);
+
+  fd = g_poll_core_get_unix_fd (&core);
+
+  /* Nothing registered yet: no events, core fd not readable */
+  r = g_poll_core_update_and_collect (&core, NULL, NULL, events, G_N_ELEMENTS (events));
+  g_assert_cmpint (r, ==, 0);
+  g_assert (!is_ready (fd));
+
+  /* Two chained changes for the write end collapse into one update
+   * (0 -> G_IO_OUT) before the core sees them.
+   */
+  add_update (updates, pipes[1], 0, G_IO_IN, test_pollcore);
+  add_update (updates, pipes[1], G_IO_IN, G_IO_OUT, test_pollcore);
+  r = g_poll_core_update_and_collect (&core, updates, NULL, events, G_N_ELEMENTS (events));
+  g_assert_cmpint (r, ==, 1);
+  g_hash_table_remove_all (updates);
+  g_assert (is_ready (fd));
+
+  /* Also watch the read end; the pipe is still empty at this point */
+  add_update (updates, pipes[0], 0, G_IO_IN, test_pollcore);
+  r = g_poll_core_update_and_collect (&core, updates, NULL, events, G_N_ELEMENTS (events));
+  g_assert_cmpint (r, ==, 1);
+  g_hash_table_remove_all (updates);
+  g_assert (is_ready (fd));
+
+  /* Put one byte in the pipe: two events are now expected */
+  r = write (pipes[1], "x", 1);
+  g_assert_cmpint (r, ==, 1);
+
+  r = g_poll_core_update_and_collect (&core, updates, NULL, events, G_N_ELEMENTS (events));
+  g_assert_cmpint (r, ==, 2);
+  g_assert (is_ready (fd));
+
+  /* Drain the pipe again */
+  r = read (pipes[0], &b, 1);
+  g_assert_cmpint (r, ==, 1);
+
+  ready_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND * 0.1;
+
+  /* Stop watching the write end and pass a ready time 0.1s from now */
+  add_update (updates, pipes[1], G_IO_OUT, 0, NULL);
+  r = g_poll_core_update_and_collect (&core, updates, &ready_time, events, G_N_ELEMENTS (events));
+  g_hash_table_remove_all (updates);
+  g_assert_cmpint (r, ==, 0);
+  g_assert (!is_ready (fd));
+
+  /* After waiting, the core fd is expected to be readable (presumably
+   * because the ready time passed) and one event is collected.
+   */
+  g_poll_core_wait (&core, &lock);
+  g_assert (is_ready (fd));
+
+  r = g_poll_core_update_and_collect (&core, NULL, NULL, events, G_N_ELEMENTS (events));
+  g_assert_cmpint (r, ==, 1);
+  g_assert (is_ready (fd));
+
+  /* Passing a ready time of -1 is expected to leave nothing pending */
+  ready_time = -1;
+  r = g_poll_core_update_and_collect (&core, updates, &ready_time, events, G_N_ELEMENTS (events));
+  g_assert_cmpint (r, ==, 0);
+  g_assert (!is_ready (fd));
+
+  /* Have another thread set the ready time to 0 while this one waits */
+  g_thread_unref (g_thread_new ("kicker", kick_core, &core));
+
+  g_poll_core_wait (&core, &lock);
+  r = g_poll_core_update_and_collect (&core, NULL, NULL, events, G_N_ELEMENTS (events));
+  g_assert_cmpint (r, ==, 1);
+  g_assert (is_ready (fd));
+
+  /* ready_time is still -1 here */
+  r = g_poll_core_update_and_collect (&core, NULL, &ready_time, events, G_N_ELEMENTS (events));
+  g_assert_cmpint (r, ==, 0);
+  g_assert (!is_ready (fd));
+
+  g_poll_core_clear (&core);
+
+  /* Release what the test itself created */
+  g_hash_table_unref (updates);
+  close (pipes[0]);
+  close (pipes[1]);
+
+  g_mutex_unlock (&lock);
+  g_mutex_clear (&lock);
+}
+
+/* Test program entry point: registers the single pollcore test case with
+ * the GLib test framework and returns the result of running it.
+ */
+int
+main (int argc, char **argv)
+{
+  int status;
+
+  g_test_init (&argc, &argv, NULL);
+  g_test_add_func ("/glib/pollcore", test_pollcore);
+  status = g_test_run ();
+
+  return status;
+}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]