[glib/wip/nacho/gpoll] win32 gpoll: overcome the 64 handles limit
- From: Ignacio Casal Quinteiro <icq src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib/wip/nacho/gpoll] win32 gpoll: overcome the 64 handles limit
- Date: Fri, 14 Dec 2018 11:52:22 +0000 (UTC)
commit a24915a16407fb3384c1dd4f6aba4bd9db988c56
Author: Ignacio Casal Quinteiro <qignacio amazon com>
Date: Fri Dec 14 12:25:11 2018 +0100
win32 gpoll: overcome the 64 handles limit
This adds a new polling method, based on the existing GLib one, that
allows polling more than 64 handles.
When we reach the limit of 64 handles, we create threads and poll a batch
of handles on each of them; this way we overcome the limit.
https://gitlab.gnome.org/GNOME/glib/issues/1071
glib/gpoll.c | 303 +++++++++++++++++++++++++++++++++++++++++++++--------------
1 file changed, 233 insertions(+), 70 deletions(-)
---
diff --git a/glib/gpoll.c b/glib/gpoll.c
index 5f46906ba..4b5c92b80 100644
--- a/glib/gpoll.c
+++ b/glib/gpoll.c
@@ -72,6 +72,7 @@
#ifdef G_OS_WIN32
#define STRICT
#include <windows.h>
+#include <process.h>
#endif /* G_OS_WIN32 */
#include "gpoll.h"
@@ -130,6 +131,7 @@ g_poll (GPollFD *fds,
static int
poll_rest (GPollFD *msg_fd,
+ GPollFD *stop_fd,
HANDLE *handles,
GPollFD *handle_to_fd[],
gint nhandles,
@@ -145,23 +147,25 @@ poll_rest (GPollFD *msg_fd,
* -> Use MsgWaitForMultipleObjectsEx
*/
if (_g_main_poll_debug)
- g_print (" MsgWaitForMultipleObjectsEx(%d, %d)\n", nhandles, timeout);
+ g_print (" MsgWaitForMultipleObjectsEx(%d, %d)\n", nhandles, timeout);
ready = MsgWaitForMultipleObjectsEx (nhandles, handles, timeout,
- QS_ALLINPUT, MWMO_ALERTABLE);
+ QS_ALLINPUT, MWMO_ALERTABLE);
if (ready == WAIT_FAILED)
- {
- gchar *emsg = g_win32_error_message (GetLastError ());
- g_warning ("MsgWaitForMultipleObjectsEx failed: %s", emsg);
- g_free (emsg);
- }
+ {
+ gchar *emsg;
+
+ emsg = g_win32_error_message (GetLastError ());
+ g_warning ("MsgWaitForMultipleObjectsEx failed: %s", emsg);
+ g_free (emsg);
+ }
}
else if (nhandles == 0)
{
/* No handles to wait for, just the timeout */
if (timeout == INFINITE)
- ready = WAIT_FAILED;
+ ready = WAIT_FAILED;
else
{
/* Wait for the current process to die, more efficient than SleepEx(). */
@@ -175,28 +179,30 @@ poll_rest (GPollFD *msg_fd,
* -> Use WaitForMultipleObjectsEx
*/
if (_g_main_poll_debug)
- g_print (" WaitForMultipleObjectsEx(%d, %d)\n", nhandles, timeout);
+ g_print (" WaitForMultipleObjectsEx(%d, %d)\n", nhandles, timeout);
ready = WaitForMultipleObjectsEx (nhandles, handles, FALSE, timeout, TRUE);
if (ready == WAIT_FAILED)
- {
- gchar *emsg = g_win32_error_message (GetLastError ());
- g_warning ("WaitForMultipleObjectsEx failed: %s", emsg);
- g_free (emsg);
- }
+ {
+ gchar *emsg;
+
+ emsg = g_win32_error_message (GetLastError ());
+ g_warning ("WaitForMultipleObjectsEx failed: %s", emsg);
+ g_free (emsg);
+ }
}
if (_g_main_poll_debug)
g_print (" wait returns %ld%s\n",
- ready,
- (ready == WAIT_FAILED ? " (WAIT_FAILED)" :
- (ready == WAIT_TIMEOUT ? " (WAIT_TIMEOUT)" :
- (msg_fd != NULL && ready == WAIT_OBJECT_0 + nhandles ? " (msg)" : ""))));
+ ready,
+ (ready == WAIT_FAILED ? " (WAIT_FAILED)" :
+ (ready == WAIT_TIMEOUT ? " (WAIT_TIMEOUT)" :
+ (msg_fd != NULL && ready == WAIT_OBJECT_0 + nhandles ? " (msg)" : ""))));
if (ready == WAIT_FAILED)
return -1;
else if (ready == WAIT_TIMEOUT ||
- ready == WAIT_IO_COMPLETION)
+ ready == WAIT_IO_COMPLETION)
return 0;
else if (msg_fd != NULL && ready == WAIT_OBJECT_0 + nhandles)
{
@@ -206,118 +212,275 @@ poll_rest (GPollFD *msg_fd,
* with just noticing we have messages waiting.
*/
if (timeout != 0 || nhandles == 0)
- return 1;
+ return 1;
/* If no timeout and handles to poll, recurse to poll them,
* too.
*/
- recursed_result = poll_rest (NULL, handles, handle_to_fd, nhandles, 0);
+ recursed_result = poll_rest (NULL, stop_fd, handles, handle_to_fd, nhandles, 0);
return (recursed_result == -1) ? -1 : 1 + recursed_result;
}
else if (ready >= WAIT_OBJECT_0 && ready < WAIT_OBJECT_0 + nhandles)
{
+ int retval;
+
f = handle_to_fd[ready - WAIT_OBJECT_0];
f->revents = f->events;
if (_g_main_poll_debug)
g_print (" got event %p\n", (HANDLE) f->fd);
+ /* Do not count the stop_fd */
+ retval = (f != stop_fd) ? 1 : 0;
+
/* If no timeout and polling several handles, recurse to poll
* the rest of them.
*/
if (timeout == 0 && nhandles > 1)
- {
- /* Poll the handles with index > ready */
- HANDLE *shorter_handles;
+ {
+ /* Poll the handles with index > ready */
+ HANDLE *shorter_handles;
GPollFD **shorter_handle_to_fd;
- gint shorter_nhandles;
+ gint shorter_nhandles;
shorter_handles = &handles[ready - WAIT_OBJECT_0 + 1];
shorter_handle_to_fd = &handle_to_fd[ready - WAIT_OBJECT_0 + 1];
shorter_nhandles = nhandles - (ready - WAIT_OBJECT_0 + 1);
- recursed_result = poll_rest (NULL, shorter_handles, shorter_handle_to_fd, shorter_nhandles, 0);
- return (recursed_result == -1) ? -1 : 1 + recursed_result;
- }
- return 1;
+ recursed_result = poll_rest (NULL, stop_fd, shorter_handles, shorter_handle_to_fd,
shorter_nhandles, 0);
+ return (recursed_result == -1) ? -1 : retval + recursed_result;
+ }
+ return retval;
}
return 0;
}
-gint
-g_poll (GPollFD *fds,
- guint nfds,
- gint timeout)
+typedef struct
{
HANDLE handles[MAXIMUM_WAIT_OBJECTS];
GPollFD *handle_to_fd[MAXIMUM_WAIT_OBJECTS];
- GPollFD *msg_fd = NULL;
- GPollFD *f;
- gint nhandles = 0;
+ GPollFD *msg_fd;
+ GPollFD *stop_fd;
+ gint nhandles;
+ gint timeout;
+} GWin32PollThreadData;
+
+static gint
+poll_single_thread (GWin32PollThreadData *data)
+{
int retval;
- if (_g_main_poll_debug)
- g_print ("g_poll: waiting for");
+ /* Polling for several things? */
+ if (data->nhandles > 1 || (data->nhandles > 0 && data->msg_fd != NULL))
+ {
+ /* First check if one or several of them are immediately
+ * available
+ */
+ retval = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles, 0);
+
+ /* If not, and we have a significant timeout, poll again with
+ * timeout then. Note that this will return indication for only
+ * one event, or only for messages.
+ */
+ if (retval == 0 && (data->timeout == INFINITE || data->timeout > 0))
+ retval = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles,
data->timeout);
+ }
+ else
+ {
+ /* Just polling for one thing, so no need to check first if
+ * available immediately
+ */
+ retval = poll_rest (data->msg_fd, data->stop_fd, data->handles, data->handle_to_fd, data->nhandles,
data->timeout);
+ }
+
+ return retval;
+}
+
+static void
+fill_poll_thread_data (GPollFD *fds,
+ guint nfds,
+ gint timeout,
+ GPollFD *stop_fd,
+ GWin32PollThreadData *data)
+{
+ GPollFD *f;
+
+ data->timeout = timeout;
+
+ if (stop_fd != NULL)
+ {
+ if (_g_main_poll_debug)
+ g_print (" Stop FD: %p", (HANDLE) stop_fd->fd);
+
+ data->stop_fd = stop_fd;
+ data->handle_to_fd[data->nhandles] = stop_fd;
+ data->handles[data->nhandles++] = (HANDLE) stop_fd->fd;
+ }
for (f = fds; f < &fds[nfds]; ++f)
{
+ if ((data->nhandles == MAXIMUM_WAIT_OBJECTS) ||
+ (data->msg_fd != NULL && (data->nhandles == MAXIMUM_WAIT_OBJECTS - 1)))
+ {
+ g_warning ("Too many handles to wait for!");
+ break;
+ }
+
if (f->fd == G_WIN32_MSG_HANDLE && (f->events & G_IO_IN))
{
- if (_g_main_poll_debug && msg_fd == NULL)
+ if (_g_main_poll_debug && data->msg_fd == NULL)
g_print (" MSG");
- msg_fd = f;
+ data->msg_fd = f;
}
else if (f->fd > 0)
{
- if (nhandles == MAXIMUM_WAIT_OBJECTS)
- {
- g_warning ("Too many handles to wait for!");
- break;
- }
- else
- {
- if (_g_main_poll_debug)
- g_print (" %p", (HANDLE) f->fd);
- handle_to_fd[nhandles] = f;
- handles[nhandles++] = (HANDLE) f->fd;
- }
+ if (_g_main_poll_debug)
+ g_print (" %p", (HANDLE) f->fd);
+ data->handle_to_fd[data->nhandles] = f;
+ data->handles[data->nhandles++] = (HANDLE) f->fd;
}
+
f->revents = 0;
}
+}
- if (_g_main_poll_debug)
- g_print ("\n");
+static guint __stdcall
+poll_thread_run (gpointer user_data)
+{
+ GWin32PollThreadData *data = user_data;
+
+ /* The docs say it is safer to call _endthreadex() ourselves; the result of poll_single_thread() is passed to it as the exit code, and it is not expected to return */
+ _endthreadex (poll_single_thread (data));
+
+ g_assert_not_reached ();
+
+ return 0;
+}
+
+/* One slot for a possible msg object and another for the stop event */
+#define MAXIMUM_WAIT_OBJECTS_PER_THREAD (MAXIMUM_WAIT_OBJECTS - 2)
+
+gint
+g_poll (GPollFD *fds,
+ guint nfds,
+ gint timeout)
+{
+ guint nthreads, threads_remain;
+ HANDLE thread_handles[MAXIMUM_WAIT_OBJECTS];
+ GWin32PollThreadData *threads_data;
+ GPollFD stop_event = { 0, };
+ GPollFD *f;
+ guint i, fds_idx = 0;
+ DWORD ready;
+ DWORD thread_retval;
+ int retval;
+ GPollFD *msg_fd = NULL;
if (timeout == -1)
timeout = INFINITE;
- /* Polling for several things? */
- if (nhandles > 1 || (nhandles > 0 && msg_fd != NULL))
+ /* Simple case without extra threads */
+ if (nfds <= MAXIMUM_WAIT_OBJECTS)
{
- /* First check if one or several of them are immediately
- * available
- */
- retval = poll_rest (msg_fd, handles, handle_to_fd, nhandles, 0);
+ GWin32PollThreadData data = { 0, };
- /* If not, and we have a significant timeout, poll again with
- * timeout then. Note that this will return indication for only
- * one event, or only for messages.
- */
- if (retval == 0 && (timeout == INFINITE || timeout > 0))
- retval = poll_rest (msg_fd, handles, handle_to_fd, nhandles, timeout);
+ if (_g_main_poll_debug)
+ g_print ("g_poll: waiting for");
+
+ fill_poll_thread_data (fds, nfds, timeout, NULL, &data);
+
+ if (_g_main_poll_debug)
+ g_print ("\n");
+
+ retval = poll_single_thread (&data);
+ if (retval == -1)
+ for (f = fds; f < &fds[nfds]; ++f)
+ f->revents = 0;
+
+ return retval;
+ }
+
+ if (_g_main_poll_debug)
+ g_print ("g_poll: polling with threads\n");
+
+ nthreads = nfds / MAXIMUM_WAIT_OBJECTS_PER_THREAD;
+ threads_remain = nfds % MAXIMUM_WAIT_OBJECTS_PER_THREAD;
+ if (threads_remain > 0)
+ nthreads++;
+
+ if (nthreads > MAXIMUM_WAIT_OBJECTS)
+ {
+ g_warning ("Too many handles to wait for in threads!");
+ nthreads = MAXIMUM_WAIT_OBJECTS;
+ }
+
+#if GLIB_SIZEOF_VOID_P == 8
+ stop_event.fd = (gint64)CreateEventW (NULL, TRUE, FALSE, NULL);
+#else
+ stop_event.fd = (gint)CreateEventW (NULL, TRUE, FALSE, NULL);
+#endif
+ stop_event.events = G_IO_IN;
+
+ threads_data = g_new0 (GWin32PollThreadData, nthreads);
+ for (i = 0; i < nthreads; i++)
+ {
+ guint thread_fds;
+ guint ignore;
+
+ if (i == (nthreads - 1) && threads_remain > 0)
+ thread_fds = threads_remain;
+ else
+ thread_fds = MAXIMUM_WAIT_OBJECTS_PER_THREAD;
+
+ fill_poll_thread_data (fds + fds_idx, thread_fds, timeout, &stop_event, &threads_data[i]);
+ fds_idx += thread_fds;
+
+ /* We must poll for messages from the same thread, so poll it along with the threads */
+ if (threads_data[i].msg_fd != NULL)
+ {
+ msg_fd = threads_data[i].msg_fd;
+ threads_data[i].msg_fd = NULL;
+ }
+
+ thread_handles[i] = (HANDLE) _beginthreadex (NULL, 0, poll_thread_run, &threads_data[i], 0, &ignore);
}
+
+ /* Wait for at least one thread to return */
+ if (msg_fd != NULL)
+ ready = MsgWaitForMultipleObjectsEx (nthreads, thread_handles, timeout,
+ QS_ALLINPUT, MWMO_ALERTABLE);
else
+ ready = WaitForMultipleObjects (nthreads, thread_handles, timeout > 0, timeout);
+
+ /* Signal the stop in case any of the threads did not stop yet */
+ SetEvent ((HANDLE)stop_event.fd);
+
+ /* Wait for the rest of the threads to finish */
+ WaitForMultipleObjects (nthreads, thread_handles, TRUE, INFINITE);
+
+ /* The return value of all the threads give us all the fds that changed state */
+ retval = 0;
+ if (msg_fd != NULL && ready == WAIT_OBJECT_0 + nthreads)
{
- /* Just polling for one thing, so no need to check first if
- * available immediately
- */
- retval = poll_rest (msg_fd, handles, handle_to_fd, nhandles, timeout);
+ msg_fd->revents |= G_IO_IN;
+ retval = 1;
+ }
+
+ for (i = 0; i < nthreads; i++)
+ {
+ if (GetExitCodeThread (thread_handles[i], &thread_retval))
+ retval = retval == -1 ? -1 : thread_retval == -1 ? -1 : retval + thread_retval;
+
+ CloseHandle (thread_handles[i]);
}
if (retval == -1)
for (f = fds; f < &fds[nfds]; ++f)
f->revents = 0;
+ g_free (threads_data);
+ CloseHandle ((HANDLE)stop_event.fd);
+
return retval;
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]