[glib] gsocket: Add g_socket_receive_messages()
- From: Philip Withnall <pwithnall src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib] gsocket: Add g_socket_receive_messages()
- Date: Thu, 1 Oct 2015 13:17:17 +0000 (UTC)
commit f62cbfc02230f160e0fd0947d74c4a012eb6232c
Author: Philip Withnall <philip withnall collabora co uk>
Date: Fri Jun 12 08:47:37 2015 +0100
gsocket: Add g_socket_receive_messages()
Add support for receiving multiple messages with a single system call,
using recvmmsg() if available. Otherwise, fall back to looping over
g_socket_receive_message().
This adds new API, g_socket_receive_messages(), and corresponding unit
tests.
https://bugzilla.gnome.org/show_bug.cgi?id=751924
config.h.win32.in | 3 +
configure.ac | 2 +-
docs/reference/gio/gio-sections.txt | 1 +
gio/gsocket.c | 294 ++++++++++++++++++++++++++++++++++-
gio/gsocket.h | 7 +
gio/tests/socket.c | 163 +++++++++++++++++++
6 files changed, 467 insertions(+), 3 deletions(-)
---
diff --git a/config.h.win32.in b/config.h.win32.in
index 2fd650f..6c909ca 100644
--- a/config.h.win32.in
+++ b/config.h.win32.in
@@ -401,6 +401,9 @@
/* Define to 1 if you have the `sendmmsg` function */
/* #undef HAVE_SENDMMSG */
+/* Define to 1 if you have the `recvmmsg` function */
+/* #undef HAVE_RECVMMSG */
+
/* Define to 1 if you have the <selinux/selinux.h> header file. */
/* #undef HAVE_SELINUX_SELINUX_H */
diff --git a/configure.ac b/configure.ac
index 38dcbf6..10a001c 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1029,7 +1029,7 @@ if $glib_failed ; then
AC_MSG_ERROR([Could not determine values for MSG_* constants])
fi
-AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname sendmmsg)
+AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname sendmmsg recvmmsg)
AS_IF([test $glib_native_win32 = yes], [
# <wspiapi.h> in the Windows SDK and in mingw-w64 has wrappers for
diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt
index 54bb966..f63ddd3 100644
--- a/docs/reference/gio/gio-sections.txt
+++ b/docs/reference/gio/gio-sections.txt
@@ -2017,6 +2017,7 @@ g_socket_check_connect_result
g_socket_receive
g_socket_receive_from
g_socket_receive_message
+g_socket_receive_messages
g_socket_receive_with_blocking
g_socket_send
g_socket_send_to
diff --git a/gio/gsocket.c b/gio/gsocket.c
index 362fa5b..a0759d4 100644
--- a/gio/gsocket.c
+++ b/gio/gsocket.c
@@ -3,6 +3,7 @@
* Copyright (C) 2008 Christian Kellner, Samuel Cormier-Iijima
* Copyright © 2009 Codethink Limited
* Copyright © 2009 Red Hat, Inc
+ * Copyright © 2015 Collabora, Ltd.
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
@@ -21,6 +22,7 @@
* Samuel Cormier-Iijima <sciyoshi gmail com>
* Ryan Lortie <desrt desrt ca>
* Alexander Larsson <alexl redhat com>
+ * Philip Withnall <philip withnall collabora co uk>
*/
#include "config.h"
@@ -150,6 +152,14 @@ g_socket_receive_message_with_timeout (GSocket *socket,
GCancellable *cancellable,
GError **error);
static gint
+g_socket_receive_messages_with_timeout (GSocket *socket,
+ GInputMessage *messages,
+ guint num_messages,
+ gint flags,
+ gint64 timeout,
+ GCancellable *cancellable,
+ GError **error);
+static gint
g_socket_send_messages_with_timeout (GSocket *socket,
GOutputMessage *messages,
guint num_messages,
@@ -4744,6 +4754,286 @@ g_socket_receive_message_with_timeout (GSocket *socket,
}
/**
+ * g_socket_receive_messages:
+ * @socket: a #GSocket
+ * @messages: (array length=num_messages): an array of #GInputMessage structs
+ * @num_messages: the number of elements in @messages
+ * @flags: an int containing #GSocketMsgFlags flags for the overall operation
+ * @cancellable: (allow-none): a #GCancellable or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore
+ *
+ * Receive multiple data messages from @socket in one go. This is the most
+ * complicated and fully-featured version of this call. For easier use, see
+ * g_socket_receive(), g_socket_receive_from(), and g_socket_receive_message().
+ *
+ * @messages must point to an array of #GInputMessage structs and
+ * @num_messages must be the length of this array. Each #GInputMessage
+ * contains a pointer to an array of #GInputVector structs describing the
+ * buffers that the data received in each message will be written to. Using
+ * multiple #GInputVectors is more memory-efficient than manually copying data
+ * out of a single buffer to multiple destinations, and more system-call-efficient
+ * than making multiple calls to g_socket_receive(), such as in scenarios where
+ * a lot of data packets need to be received (e.g. high-bandwidth video
+ * streaming over RTP/UDP).
+ *
+ * @flags modify how all messages are received. The commonly available
+ * arguments for this are available in the #GSocketMsgFlags enum, but the
+ * values there are the same as the system values, and the flags
+ * are passed in as-is, so you can pass in system-specific flags too. These
+ * flags affect the overall receive operation. Flags affecting individual
+ * messages are returned in #GInputMessage.flags.
+ *
+ * The other members of #GInputMessage are treated as described in its
+ * documentation.
+ *
+ * If #GSocket:blocking is %TRUE the call will block until @num_messages have
+ * been received, or the end of the stream is reached.
+ *
+ * If #GSocket:blocking is %FALSE the call will return up to @num_messages
+ * without blocking, or %G_IO_ERROR_WOULD_BLOCK if no messages are queued in the
+ * operating system to be received.
+ *
+ * In blocking mode, if #GSocket:timeout is positive and is reached before any
+ * messages are received, %G_IO_ERROR_TIMED_OUT is returned, otherwise up to
+ * @num_messages are returned. (Note: This is effectively the
+ * behaviour of `MSG_WAITFORONE` with recvmmsg().)
+ *
+ * To be notified when messages are available, wait for the
+ * %G_IO_IN condition. Note though that you may still receive
+ * %G_IO_ERROR_WOULD_BLOCK from g_socket_receive_messages() even if you were
+ * previously notified of a %G_IO_IN condition.
+ *
+ * If the remote peer closes the connection, any messages queued in the
+ * operating system will be returned, and subsequent calls to
+ * g_socket_receive_messages() will return 0 (with no error set).
+ *
+ * On error -1 is returned and @error is set accordingly. An error will only
+ * be returned if zero messages could be received; otherwise the number of
+ * messages successfully received before the error will be returned.
+ *
+ * Returns: number of messages received, or -1 on error. Note that the number
+ * of messages received may be smaller than @num_messages if in non-blocking
+ * mode, if the peer closed the connection, or if @num_messages
+ * was larger than `UIO_MAXIOV` (1024), in which case the caller may re-try
+ * to receive the remaining messages.
+ *
+ * Since: 2.48
+ */
+gint
+g_socket_receive_messages (GSocket        *socket,
+                           GInputMessage  *messages,
+                           guint           num_messages,
+                           gint            flags,
+                           GCancellable   *cancellable,
+                           GError        **error)
+{
+  /* Validate @socket up front: it is dereferenced below (socket->priv), which
+   * happens before the equivalent guard at the top of
+   * g_socket_receive_messages_with_timeout() could ever run. */
+  g_return_val_if_fail (G_IS_SOCKET (socket), -1);
+
+  /* Fail early, with @error set by the check helpers, before touching the
+   * socket's blocking state. */
+  if (!check_socket (socket, error) ||
+      !check_timeout (socket, error))
+    return -1;
+
+  /* Translate the socket's blocking flag into the helper's timeout argument:
+   * -1 for a blocking socket, 0 for a non-blocking one (the helper only waits
+   * for data when the timeout is non-zero). */
+  return g_socket_receive_messages_with_timeout (socket, messages, num_messages,
+                                                 flags,
+                                                 socket->priv->blocking ? -1 : 0,
+                                                 cancellable, error);
+}
+
+/*
+ * g_socket_receive_messages_with_timeout:
+ *
+ * Receive up to @num_messages messages from @socket into @messages.
+ *
+ * @timeout selects the waiting behaviour: with 0 a would-block condition is
+ * reported straight away; with any other value the recvmmsg() path waits for
+ * %G_IO_IN through block_on_timeout(), and the fallback path hands the
+ * (remaining) timeout to g_socket_receive_message_with_timeout().
+ *
+ * Where recvmmsg() is available (and not on Windows) it is used to fetch
+ * several messages per system call; otherwise the function loops over
+ * g_socket_receive_message_with_timeout().
+ *
+ * In both paths an error is only reported when no message at all has been
+ * received. Once at least one message has been stored in @messages, the
+ * number of messages received so far is returned instead, so that data
+ * already taken off the socket is never hidden behind a -1 return.
+ *
+ * Returns: the number of messages received (which may be less than
+ *   @num_messages, and is 0 if @num_messages is 0), or -1 with @error set.
+ */
+static gint
+g_socket_receive_messages_with_timeout (GSocket        *socket,
+                                        GInputMessage  *messages,
+                                        guint           num_messages,
+                                        gint            flags,
+                                        gint64          timeout,
+                                        GCancellable   *cancellable,
+                                        GError        **error)
+{
+  gint64 start_time;
+
+  g_return_val_if_fail (G_IS_SOCKET (socket), -1);
+  g_return_val_if_fail (num_messages == 0 || messages != NULL, -1);
+  g_return_val_if_fail (cancellable == NULL ||
+                        G_IS_CANCELLABLE (cancellable), -1);
+  g_return_val_if_fail (error == NULL || *error == NULL, -1);
+
+  /* Reference point for all timeout accounting below. */
+  start_time = g_get_monotonic_time ();
+
+  if (!check_socket (socket, error))
+    return -1;
+
+  if (!check_timeout (socket, error))
+    return -1;
+
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+
+  /* Nothing requested: succeed without touching the socket. */
+  if (num_messages == 0)
+    return 0;
+
+#if !defined (G_OS_WIN32) && defined (HAVE_RECVMMSG)
+  {
+    struct mmsghdr *msgvec;
+    guint i, num_received;
+
+#ifdef UIO_MAXIOV
+#define MAX_NUM_MESSAGES UIO_MAXIOV
+#else
+#define MAX_NUM_MESSAGES 1024
+#endif
+
+    /* Clamp the batch size; the caller may call again for the remainder. */
+    if (num_messages > MAX_NUM_MESSAGES)
+      num_messages = MAX_NUM_MESSAGES;
+
+    msgvec = g_newa (struct mmsghdr, num_messages);
+
+    /* Describe every caller-supplied message to the kernel. */
+    for (i = 0; i < num_messages; ++i)
+      {
+        GInputMessage *msg = &messages[i];
+        struct msghdr *msg_hdr = &msgvec[i].msg_hdr;
+
+        input_message_to_msghdr (msg, msg_hdr);
+        msgvec[i].msg_len = 0;
+      }
+
+    /* We always set the close-on-exec flag so we don't leak file
+     * descriptors into child processes.  Note that gunixfdmessage.c
+     * will later call fcntl (fd, FD_CLOEXEC), but that isn't atomic.
+     */
+#ifdef MSG_CMSG_CLOEXEC
+    flags |= MSG_CMSG_CLOEXEC;
+#endif
+
+    for (num_received = 0; num_received < num_messages;)
+      {
+        gint ret;
+
+        /* We operate in non-blocking mode and handle the timeout ourselves.
+         * NOTE(review): the *send* default-flags macro is OR-ed into a
+         * receive call here; confirm that this is intended. */
+        ret = recvmmsg (socket->priv->fd,
+                        msgvec + num_received,
+                        num_messages - num_received,
+                        flags | G_SOCKET_DEFAULT_SEND_FLAGS, NULL);
+#ifdef MSG_CMSG_CLOEXEC
+        if (ret < 0 && get_socket_errno () == EINVAL)
+          {
+            /* We must be running on an old kernel. Call without the flag. */
+            flags &= ~(MSG_CMSG_CLOEXEC);
+            ret = recvmmsg (socket->priv->fd,
+                            msgvec + num_received,
+                            num_messages - num_received,
+                            flags | G_SOCKET_DEFAULT_SEND_FLAGS, NULL);
+          }
+#endif
+
+        if (ret < 0)
+          {
+            int errsv = get_socket_errno ();
+
+            /* Interrupted by a signal: simply retry. */
+            if (errsv == EINTR)
+              continue;
+
+            /* No data queued and we are allowed to wait: block until the
+             * socket becomes readable, the timeout expires or the operation
+             * is cancelled. */
+            if (timeout != 0 &&
+                (errsv == EWOULDBLOCK ||
+                 errsv == EAGAIN))
+              {
+                if (!block_on_timeout (socket, G_IO_IN, timeout, start_time,
+                                       cancellable, error))
+                  {
+                    /* Waiting failed, but earlier messages were received:
+                     * report those instead of the error. */
+                    if (num_received > 0)
+                      {
+                        g_clear_error (error);
+                        break;
+                      }
+
+                    return -1;
+                  }
+
+                continue;
+              }
+
+            /* If any messages were successfully received, do not error. */
+            if (num_received > 0)
+              break;
+
+            socket_set_error_lazy (error, errsv,
+                                   _("Error receiving message: %s"));
+
+            return -1;
+          }
+        else if (ret == 0)
+          {
+            /* EOS. */
+            break;
+          }
+
+        num_received += ret;
+      }
+
+    /* Copy the kernel's results back into the caller's structures. */
+    for (i = 0; i < num_received; ++i)
+      {
+        input_message_from_msghdr (&msgvec[i].msg_hdr, &messages[i], socket);
+        messages[i].bytes_received = msgvec[i].msg_len;
+      }
+
+    return num_received;
+  }
+#else
+  {
+    guint i;
+    gint64 wait_timeout;
+
+    wait_timeout = timeout;
+
+    for (i = 0; i < num_messages; i++)
+      {
+        GInputMessage *msg = &messages[i];
+        gssize len;
+        GError *msg_error = NULL;
+
+        msg->flags = flags;  /* in-out parameter */
+
+        len = g_socket_receive_message_with_timeout (socket,
+                                                     msg->address,
+                                                     msg->vectors,
+                                                     msg->num_vectors,
+                                                     msg->control_messages,
+                                                     (gint *) msg->num_control_messages,
+                                                     &msg->flags,
+                                                     wait_timeout,
+                                                     cancellable,
+                                                     &msg_error);
+
+        /* check if we've timed out or how much time to wait at most */
+        if (timeout > 0)
+          {
+            gint64 elapsed = g_get_monotonic_time () - start_time;
+            wait_timeout = MAX (timeout - elapsed, 1);
+          }
+
+        if (len >= 0)
+          msg->bytes_received = len;
+
+        if (msg_error != NULL)
+          {
+            /* Messages 0..i-1 have already been taken off the socket and
+             * stored in @messages.  Report them rather than the error, for
+             * any kind of failure, so the caller does not lose them; this
+             * matches the recvmmsg() code path and the documented contract
+             * of g_socket_receive_messages(). */
+            if (i != 0)
+              {
+                g_clear_error (&msg_error);
+                break;
+              }
+
+            g_propagate_error (error, msg_error);
+            return -1;
+          }
+
+        /* A zero-length read ends the batch (treated as end of stream). */
+        if (len == 0)
+          break;
+      }
+
+    return i;
+  }
+#endif
+}
+
+/**
* g_socket_receive_message:
* @socket: a #GSocket
* @address: (out) (allow-none): a pointer to a #GSocketAddress
@@ -4758,8 +5048,8 @@ g_socket_receive_message_with_timeout (GSocket *socket,
* @cancellable: (allow-none): a %GCancellable or %NULL
* @error: a #GError pointer, or %NULL
*
- * Receive data from a socket. This is the most complicated and
- * fully-featured version of this call. For easier use, see
+ * Receive data from a socket. For receiving multiple messages, see
+ * g_socket_receive_messages(); for easier use, see
* g_socket_receive() and g_socket_receive_from().
*
* If @address is non-%NULL then @address will be set equal to the
diff --git a/gio/gsocket.h b/gio/gsocket.h
index dd866e7..4e3fb6a 100644
--- a/gio/gsocket.h
+++ b/gio/gsocket.h
@@ -237,6 +237,13 @@ gssize g_socket_send_message (GSocket
GCancellable *cancellable,
GError **error);
+GLIB_AVAILABLE_IN_2_48
+gint g_socket_receive_messages (GSocket *socket,
+ GInputMessage *messages,
+ guint num_messages,
+ gint flags,
+ GCancellable *cancellable,
+ GError **error);
GLIB_AVAILABLE_IN_2_44
gint g_socket_send_messages (GSocket *socket,
GOutputMessage *messages,
diff --git a/gio/tests/socket.c b/gio/tests/socket.c
index 3d8e642..15fb61a 100644
--- a/gio/tests/socket.c
+++ b/gio/tests/socket.c
@@ -588,7 +588,9 @@ test_ip_sync_dgram (GSocketFamily family)
{
GOutputMessage m[3] = { { NULL, }, };
+ GInputMessage im[3] = { { NULL, }, };
GOutputVector v[7] = { { NULL, }, };
+ GInputVector iv[7] = { { NULL, }, };
v[0].buffer = testbuf2 + 0;
v[0].size = 3;
@@ -605,6 +607,21 @@ test_ip_sync_dgram (GSocketFamily family)
v[6].buffer = testbuf2 + 3 + 5 + 6 + 2 + 1;
v[6].size = strlen (testbuf2) - (3 + 5 + 6 + 2 + 1);
+ iv[0].buffer = buf + 0;
+ iv[0].size = 3;
+ iv[1].buffer = buf + 3;
+ iv[1].size = 5;
+ iv[2].buffer = buf + 3 + 5;
+ iv[2].size = 0;
+ iv[3].buffer = buf + 3 + 5;
+ iv[3].size = 6;
+ iv[4].buffer = buf + 3 + 5 + 6;
+ iv[4].size = 2;
+ iv[5].buffer = buf + 3 + 5 + 6 + 2;
+ iv[5].size = 1;
+ iv[6].buffer = buf + 3 + 5 + 6 + 2 + 1;
+ iv[6].size = sizeof (buf) - (3 + 5 + 6 + 2 + 1);
+
len = g_socket_send_message (client, dest_addr, v, G_N_ELEMENTS (v), NULL, 0, 0, NULL, &error);
g_assert_no_error (error);
g_assert_cmpint (len, ==, strlen (testbuf2));
@@ -655,6 +672,37 @@ test_ip_sync_dgram (GSocketFamily family)
m[1].bytes_sent = 0;
m[2].bytes_sent = 0;
+ /* now try receiving multiple messages */
+ len = g_socket_send_messages (client, m, G_N_ELEMENTS (m), 0, NULL, &error);
+ g_assert_no_error (error);
+ g_assert_cmpint (len, ==, G_N_ELEMENTS (m));
+ g_assert_cmpint (m[0].bytes_sent, ==, 3);
+ g_assert_cmpint (m[1].bytes_sent, ==, 17);
+ g_assert_cmpint (m[2].bytes_sent, ==, v[6].size);
+
+ im[0].vectors = &iv[0];
+ im[0].num_vectors = 1;
+ im[1].vectors = &iv[0];
+ im[1].num_vectors = 6;
+ im[2].vectors = &iv[6];
+ im[2].num_vectors = 1;
+
+ memset (buf, 0, sizeof (buf));
+ len = g_socket_receive_messages (client, im, G_N_ELEMENTS (im), 0,
+ NULL, &error);
+ g_assert_no_error (error);
+ g_assert_cmpint (len, ==, G_N_ELEMENTS (im));
+
+ g_assert_cmpuint (im[0].bytes_received, ==, 3);
+ /* v[0].size + v[1].size + v[2].size + v[3].size + v[4].size + v[5].size */
+ g_assert_cmpuint (im[1].bytes_received, ==, 17);
+ g_assert_cmpuint (im[2].bytes_received, ==, v[6].size);
+
+ /* reset since we're re-using the message structs */
+ m[0].bytes_sent = 0;
+ m[1].bytes_sent = 0;
+ m[2].bytes_sent = 0;
+
/* now try to generate an error by omitting the destination address on [1] */
m[1].address = NULL;
len = g_socket_send_messages (client, m, G_N_ELEMENTS (m), 0, NULL, &error);
@@ -706,6 +754,119 @@ test_ipv6_sync_dgram (void)
}
static gpointer
+cancellable_thread_cb (gpointer data)
+{
+  /* Thread body: @data is a GCancellable reference handed over by the
+   * spawning code; this thread cancels it after a short delay and then
+   * drops that reference. */
+  GCancellable *to_cancel = data;
+
+  /* A tenth of a second, giving the spawning thread time to get into its
+   * blocking call before the cancellation fires. */
+  g_usleep (G_USEC_PER_SEC / 10);
+
+  g_cancellable_cancel (to_cancel);
+  g_object_unref (to_cancel);
+
+  return NULL;
+}
+
+static void
+test_ip_sync_dgram_timeouts (GSocketFamily family)
+{
+  /* Exercises g_socket_receive_messages() on a datagram socket that nobody
+   * sends to, checking the three ways the call can give up: would-block in
+   * non-blocking mode, timing out in blocking mode, and cancellation from
+   * another thread. */
+  GError *error = NULL;
+  GSocket *client = NULL;
+  GCancellable *cancel_token = NULL;
+  GThread *canceller = NULL;
+  GInputMessage message = { NULL, };
+  GInputVector vector = { NULL, };
+  guint8 scratch[128];
+  gint64 start_time;
+  gssize len;
+
+  client = g_socket_new (family,
+                         G_SOCKET_TYPE_DATAGRAM,
+                         G_SOCKET_PROTOCOL_DEFAULT,
+                         &error);
+  g_assert_no_error (error);
+
+  g_assert_cmpint (g_socket_get_family (client), ==, family);
+  g_assert_cmpint (g_socket_get_socket_type (client), ==, G_SOCKET_TYPE_DATAGRAM);
+  g_assert_cmpint (g_socket_get_protocol (client), ==, G_SOCKET_PROTOCOL_DEFAULT);
+
+  /* Start without a socket-level timeout; one is set further down for the
+   * timed-out case only. */
+  g_socket_set_timeout (client, 0);
+
+  cancel_token = g_cancellable_new ();
+
+  /* One message backed by a single receive buffer is enough: no data is
+   * ever expected to arrive, as no server is running. */
+  vector.buffer = scratch;
+  vector.size = sizeof (scratch);
+
+  message.vectors = &vector;
+  message.num_vectors = 1;
+
+  memset (scratch, 0, sizeof (scratch));
+
+  /* Case 1: non-blocking read must fail immediately with WOULD_BLOCK. */
+  g_socket_set_blocking (client, FALSE);
+  len = g_socket_receive_messages (client, &message, 1, 0 /* flags */,
+                                   NULL, &error);
+  g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+  g_assert_cmpint (len, ==, -1);
+  g_clear_error (&error);
+
+  /* Case 2: blocking read with a socket timeout must fail with TIMED_OUT.
+   * The elapsed time cannot be validated beyond being positive. */
+  g_socket_set_timeout (client, 1);
+  g_socket_set_blocking (client, TRUE);
+  start_time = g_get_monotonic_time ();
+  len = g_socket_receive_messages (client, &message, 1, 0 /* flags */,
+                                   NULL, &error);
+  g_assert_error (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT);
+  g_assert_cmpint (len, ==, -1);
+  g_assert_cmpint (g_get_monotonic_time () - start_time, >, 0);
+  g_clear_error (&error);
+
+  /* Case 3: blocking read without timeout, cancelled from a helper thread.
+   * The helper owns the extra reference taken here. */
+  g_socket_set_timeout (client, 0);
+  canceller = g_thread_new ("cancellable",
+                            cancellable_thread_cb,
+                            g_object_ref (cancel_token));
+
+  start_time = g_get_monotonic_time ();
+  len = g_socket_receive_messages (client, &message, 1, 0 /* flags */,
+                                   cancel_token, &error);
+  g_assert_error (error, G_IO_ERROR, G_IO_ERROR_CANCELLED);
+  g_assert_cmpint (len, ==, -1);
+  g_assert_cmpint (g_get_monotonic_time () - start_time, >, 0);
+  g_clear_error (&error);
+
+  g_thread_join (canceller);
+
+  /* Tear down. */
+  g_socket_close (client, &error);
+  g_assert_no_error (error);
+
+  g_object_unref (client);
+  g_object_unref (cancel_token);
+}
+
+static void
+test_ipv4_sync_dgram_timeouts (void)
+{
+  /* IPv4 flavour of the datagram receive-timeout test. */
+  test_ip_sync_dgram_timeouts (G_SOCKET_FAMILY_IPV4);
+}
+
+static void
+test_ipv6_sync_dgram_timeouts (void)
+{
+  /* IPv6 flavour of the datagram receive-timeout test; marked as skipped
+   * when the ipv6_supported flag is not set. */
+  if (ipv6_supported)
+    {
+      test_ip_sync_dgram_timeouts (G_SOCKET_FAMILY_IPV6);
+      return;
+    }
+
+  g_test_skip ("No support for IPv6");
+}
+
+static gpointer
graceful_server_thread (gpointer user_data)
{
IPTestData *data = user_data;
@@ -1447,7 +1608,9 @@ main (int argc,
g_test_add_func ("/socket/ipv6_sync", test_ipv6_sync);
g_test_add_func ("/socket/ipv6_async", test_ipv6_async);
g_test_add_func ("/socket/ipv4_sync/datagram", test_ipv4_sync_dgram);
+ g_test_add_func ("/socket/ipv4_sync/datagram/timeouts", test_ipv4_sync_dgram_timeouts);
g_test_add_func ("/socket/ipv6_sync/datagram", test_ipv6_sync_dgram);
+ g_test_add_func ("/socket/ipv6_sync/datagram/timeouts", test_ipv6_sync_dgram_timeouts);
#if defined (IPPROTO_IPV6) && defined (IPV6_V6ONLY)
g_test_add_func ("/socket/ipv6_v4mapped", test_ipv6_v4mapped);
#endif
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]