[glib] gsocket: add g_socket_send_messages()
- From: Tim-Philipp Müller <tpm src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib] gsocket: add g_socket_send_messages()
- Date: Thu, 11 Dec 2014 15:23:32 +0000 (UTC)
commit fff5c7cd631f7eefeb93412b1d9d90517c4b895e
Author: Tim-Philipp Müller <tim centricular com>
Date: Thu Jun 12 18:16:45 2014 +0100
gsocket: add g_socket_send_messages()
Allows sending of multiple messages (packets, datagrams)
in one go using sendmmsg(), thus drastically reducing the
number of syscalls when sending out a lot of data, or when
sending out the same data to multiple recipients.
https://bugzilla.gnome.org/show_bug.cgi?id=719646
configure.ac | 2 +-
gio/giotypes.h | 35 +++++++
gio/gsocket.c | 275 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
gio/gsocket.h | 9 ++
4 files changed, 320 insertions(+), 1 deletions(-)
---
diff --git a/configure.ac b/configure.ac
index ced92b2..aad3fa9 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1030,7 +1030,7 @@ if $glib_failed ; then
AC_MSG_ERROR([Could not determine values for MSG_* constants])
fi
-AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname)
+AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname sendmmsg)
AS_IF([test $glib_native_win32 = yes], [
# <wspiapi.h> in the Windows SDK and in mingw-w64 has wrappers for
diff --git a/gio/giotypes.h b/gio/giotypes.h
index de62cef..7f93145 100644
--- a/gio/giotypes.h
+++ b/gio/giotypes.h
@@ -425,6 +425,41 @@ struct _GOutputVector {
gsize size;
};
+/**
+ * GOutputMessage:
+ * @address: (allow-none): a #GSocketAddress, or %NULL
+ * @vectors: pointer to an array of output vectors
+ * @num_vectors: the number of output vectors pointed to by @vectors.
+ * @bytes_sent: initialize to 0. Will be set to the number of bytes
+ * that have been sent
+ * @control_messages: (array length=num_control_messages) (allow-none): a pointer
+ * to an array of #GSocketControlMessages, or %NULL.
+ * @num_control_messages: number of elements in @control_messages.
+ *
+ * Structure used for scatter/gather data output when sending multiple
+ * messages or packets in one go. You generally pass in an array of
+ * #GOutputVectors and the operation will use all the buffers as if they
+ * were one buffer.
+ *
+ * If @address is %NULL then the message is sent to the default receiver
+ * (as previously set by g_socket_connect()).
+ *
+ * Since: 2.44
+ */
+typedef struct _GOutputMessage GOutputMessage;
+
+struct _GOutputMessage {
+ GSocketAddress *address;
+
+ GOutputVector *vectors;
+ guint num_vectors;
+
+ guint bytes_sent;
+
+ GSocketControlMessage **control_messages;
+ guint num_control_messages;
+};
+
typedef struct _GCredentials GCredentials;
typedef struct _GUnixCredentialsMessage GUnixCredentialsMessage;
typedef struct _GUnixFDList GUnixFDList;
diff --git a/gio/gsocket.c b/gio/gsocket.c
index 4d863ce..d9e135d 100644
--- a/gio/gsocket.c
+++ b/gio/gsocket.c
@@ -3987,6 +3987,281 @@ g_socket_send_message (GSocket *socket,
#endif
}
+/**
+ * g_socket_send_messages:
+ * @socket: a #GSocket
+ * @messages: (array length=num_messages): an array of #GOutputMessage structs
+ * @num_messages: the number of elements in @messages
+ * @flags: an int containing #GSocketMsgFlags flags
+ * @cancellable: (allow-none): a %GCancellable or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Send multiple data messages from @socket in one go. This is the most
+ * complicated and fully-featured version of this call. For easier use, see
+ * g_socket_send(), g_socket_send_to(), and g_socket_send_message().
+ *
+ * @messages must point to an array of #GOutputMessage structs and
+ * @num_messages must be the length of this array. Each #GOutputMessage
+ * contains an address to send the data to, and a pointer to an array of
+ * #GOutputVector structs to describe the buffers that the data to be sent
+ * for each message will be gathered from. Using multiple #GOutputVectors is
+ * more memory-efficient than manually copying data from multiple sources
+ * into a single buffer, and more network-efficient than making multiple
+ * calls to g_socket_send(). Sending multiple messages in one go avoids the
+ * overhead of making a lot of syscalls in scenarios where a lot of data
+ * packets need to be sent (e.g. high-bandwidth video streaming over RTP/UDP),
+ * or where the same data needs to be sent to multiple recipients.
+ *
+ * @flags modify how the message is sent. The commonly available arguments
+ * for this are available in the #GSocketMsgFlags enum, but the
+ * values there are the same as the system values, and the flags
+ * are passed in as-is, so you can pass in system-specific flags too.
+ *
+ * If the socket is in blocking mode the call will block until there is
+ * space for all the data in the socket queue. If there is no space available
+ * and the socket is in non-blocking mode a %G_IO_ERROR_WOULD_BLOCK error
+ * will be returned if no data was written at all, otherwise the number of
+ * messages sent will be returned. To be notified when space is available,
+ * wait for the %G_IO_OUT condition. Note though that you may still receive
+ * %G_IO_ERROR_WOULD_BLOCK from g_socket_send() even if you were previously
+ * notified of a %G_IO_OUT condition. (On Windows in particular, this is
+ * very common due to the way the underlying APIs work.)
+ *
+ * On error -1 is returned and @error is set accordingly.
+ *
+ * Returns: number of messages sent, or -1 on error. Note that the number of
+ * messages sent may be smaller than @num_messages if the socket is
+ * non-blocking or if @num_messages was larger than UIO_MAXIOV (1024),
+ * in which case the caller may re-try to send the remaining messages.
+ *
+ * Since: 2.44
+ */
+gint
+g_socket_send_messages (GSocket *socket,
+ GOutputMessage *messages,
+ guint num_messages,
+ gint flags,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_return_val_if_fail (G_IS_SOCKET (socket), -1);
+ g_return_val_if_fail (num_messages == 0 || messages != NULL, -1);
+ g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
+ g_return_val_if_fail (error == NULL || *error == NULL, -1);
+
+ if (!check_socket (socket, error))
+ return -1;
+
+ if (!check_timeout (socket, error))
+ return -1;
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return -1;
+
+ if (num_messages == 0)
+ return 0;
+
+#if !defined (G_OS_WIN32) && defined (HAVE_SENDMMSG)
+ {
+ struct mmsghdr *msgvec;
+ gint i, num_sent, result, max_sent;
+
+#ifdef UIO_MAXIOV
+#define MAX_NUM_MESSAGES UIO_MAXIOV
+#else
+#define MAX_NUM_MESSAGES 1024
+#endif
+
+ if (num_messages > MAX_NUM_MESSAGES)
+ num_messages = MAX_NUM_MESSAGES;
+
+ msgvec = g_newa (struct mmsghdr, num_messages);
+
+ for (i = 0; i < num_messages; ++i)
+ {
+ GOutputMessage *msg = &messages[i];
+ struct msghdr *msg_hdr = &msgvec[i].msg_hdr;
+
+ msgvec[i].msg_len = 0;
+
+ msg_hdr->msg_flags = 0;
+
+ /* name */
+ if (i > 0 && msg->address == messages[i-1].address)
+ {
+ msg_hdr->msg_name = msgvec[i-1].msg_hdr.msg_name;
+ msg_hdr->msg_namelen = msgvec[i-1].msg_hdr.msg_namelen;
+ }
+ else if (msg->address)
+ {
+ msg_hdr->msg_namelen = g_socket_address_get_native_size (msg->address);
+ msg_hdr->msg_name = g_alloca (msg_hdr->msg_namelen);
+ if (!g_socket_address_to_native (msg->address, msg_hdr->msg_name, msg_hdr->msg_namelen, error))
+ return -1;
+ }
+ else
+ {
+ msg_hdr->msg_name = NULL;
+ msg_hdr->msg_namelen = 0;
+ }
+
+ /* iov */
+ {
+ /* this entire expression will be evaluated at compile time */
+ if (sizeof (struct iovec) == sizeof (GOutputVector) &&
+ sizeof msg_hdr->msg_iov->iov_base == sizeof msg->vectors->buffer &&
+ G_STRUCT_OFFSET (struct iovec, iov_base) ==
+ G_STRUCT_OFFSET (GOutputVector, buffer) &&
+ sizeof msg_hdr->msg_iov->iov_len == sizeof msg->vectors->size &&
+ G_STRUCT_OFFSET (struct iovec, iov_len) ==
+ G_STRUCT_OFFSET (GOutputVector, size))
+ /* ABI is compatible */
+ {
+ msg_hdr->msg_iov = (struct iovec *) msg->vectors;
+ msg_hdr->msg_iovlen = msg->num_vectors;
+ }
+ else
+ /* ABI is incompatible */
+ {
+ gint j;
+
+ msg_hdr->msg_iov = g_newa (struct iovec, msg->num_vectors);
+ for (j = 0; j < msg->num_vectors; j++)
+ {
+ msg_hdr->msg_iov[j].iov_base = (void *) msg->vectors[j].buffer;
+ msg_hdr->msg_iov[j].iov_len = msg->vectors[j].size;
+ }
+ msg_hdr->msg_iovlen = msg->num_vectors;
+ }
+ }
+
+ /* control */
+ {
+ struct cmsghdr *cmsg;
+ gint j;
+
+ msg_hdr->msg_controllen = 0;
+ for (j = 0; j < msg->num_control_messages; j++)
+ msg_hdr->msg_controllen += CMSG_SPACE (g_socket_control_message_get_size
(msg->control_messages[j]));
+
+ if (msg_hdr->msg_controllen == 0)
+ msg_hdr->msg_control = NULL;
+ else
+ {
+ msg_hdr->msg_control = g_alloca (msg_hdr->msg_controllen);
+ memset (msg_hdr->msg_control, '\0', msg_hdr->msg_controllen);
+ }
+
+ cmsg = CMSG_FIRSTHDR (msg_hdr);
+ for (j = 0; j < msg->num_control_messages; j++)
+ {
+ GSocketControlMessage *cm = msg->control_messages[j];
+
+ cmsg->cmsg_level = g_socket_control_message_get_level (cm);
+ cmsg->cmsg_type = g_socket_control_message_get_msg_type (cm);
+ cmsg->cmsg_len = CMSG_LEN (g_socket_control_message_get_size (cm));
+ g_socket_control_message_serialize (cm, CMSG_DATA (cmsg));
+ cmsg = CMSG_NXTHDR (msg_hdr, cmsg);
+ }
+ g_assert (cmsg == NULL);
+ }
+ }
+
+ num_sent = result = 0;
+ max_sent = num_messages;
+ while (num_sent < num_messages)
+ {
+ gint ret;
+
+ if (socket->priv->blocking &&
+ !g_socket_condition_wait (socket,
+ G_IO_OUT, cancellable, error))
+ return -1;
+
+ ret = sendmmsg (socket->priv->fd, msgvec + num_sent, num_messages - num_sent,
+ flags | G_SOCKET_DEFAULT_SEND_FLAGS);
+
+ if (ret < 0)
+ {
+ int errsv = get_socket_errno ();
+
+ if (errsv == EINTR)
+ continue;
+
+ if (socket->priv->blocking &&
+ (errsv == EWOULDBLOCK ||
+ errsv == EAGAIN))
+ continue;
+
+ if (num_sent > 0 &&
+ (errsv == EWOULDBLOCK ||
+ errsv == EAGAIN))
+ {
+ max_sent = num_sent;
+ break;
+ }
+
+ g_set_error (error, G_IO_ERROR,
+ socket_io_error_from_errno (errsv),
+ _("Error sending message: %s"), socket_strerror (errsv));
+
+ /* we have to iterate over all messages below now, because we don't
+ * know where between num_sent and num_messages the error occured */
+ max_sent = num_messages;
+
+ result = -1;
+ break;
+ }
+
+ num_sent += ret;
+ result = num_sent;
+ }
+
+ for (i = 0; i < max_sent; ++i)
+ messages[i].bytes_sent = msgvec[i].msg_len;
+
+ return result;
+ }
+#else
+ {
+ gssize result;
+ gint i;
+
+ for (i = 0; i < num_messages; ++i)
+ {
+ GOutputMessage *msg = &messages[i];
+ GError *msg_error = NULL;
+
+ result = g_socket_send_message (socket, msg->address,
+ msg->vectors, msg->num_vectors,
+ msg->control_messages,
+ msg->num_control_messages,
+ flags, cancellable, &msg_error);
+
+ if (result < 0)
+ {
+ /* if we couldn't send all messages, just return how many we did
+ * manage to send, provided we managed to send at least one */
+ if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0)
+ {
+ g_error_free (msg_error);
+ return i;
+ }
+ else
+ {
+ g_propagate_error (error, msg_error);
+ return -1;
+ }
+ }
+
+ msg->bytes_sent = result;
+ }
+
+ return i;
+ }
+#endif
+}
+
static GSocketAddress *
cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len)
{
diff --git a/gio/gsocket.h b/gio/gsocket.h
index 8b49cc1..dd866e7 100644
--- a/gio/gsocket.h
+++ b/gio/gsocket.h
@@ -236,6 +236,15 @@ gssize g_socket_send_message (GSocket
gint flags,
GCancellable *cancellable,
GError **error);
+
+GLIB_AVAILABLE_IN_2_44
+gint g_socket_send_messages (GSocket *socket,
+ GOutputMessage *messages,
+ guint num_messages,
+ gint flags,
+ GCancellable *cancellable,
+ GError **error);
+
GLIB_AVAILABLE_IN_ALL
gboolean g_socket_close (GSocket *socket,
GError **error);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]