[glib] gsocket: add g_socket_send_messages()



commit fff5c7cd631f7eefeb93412b1d9d90517c4b895e
Author: Tim-Philipp Müller <tim centricular com>
Date:   Thu Jun 12 18:16:45 2014 +0100

    gsocket: add g_socket_send_messages()
    
    Allows sending of multiple messages (packets, datagrams)
    in one go using sendmmsg(), thus drastically reducing the
    number of syscalls when sending out a lot of data, or when
    sending out the same data to multiple recipients.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=719646

 configure.ac   |    2 +-
 gio/giotypes.h |   35 +++++++
 gio/gsocket.c  |  275 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 gio/gsocket.h  |    9 ++
 4 files changed, 320 insertions(+), 1 deletions(-)
---
diff --git a/configure.ac b/configure.ac
index ced92b2..aad3fa9 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1030,7 +1030,7 @@ if $glib_failed ; then
   AC_MSG_ERROR([Could not determine values for MSG_* constants])
 fi
 
-AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname)
+AC_CHECK_FUNCS(getprotobyname_r endservent if_nametoindex if_indextoname sendmmsg)
 
 AS_IF([test $glib_native_win32 = yes], [
   # <wspiapi.h> in the Windows SDK and in mingw-w64 has wrappers for
diff --git a/gio/giotypes.h b/gio/giotypes.h
index de62cef..7f93145 100644
--- a/gio/giotypes.h
+++ b/gio/giotypes.h
@@ -425,6 +425,41 @@ struct _GOutputVector {
   gsize size;
 };
 
+/**
+ * GOutputMessage:
+ * @address: (allow-none): a #GSocketAddress, or %NULL
+ * @vectors: pointer to an array of output vectors
+ * @num_vectors: the number of output vectors pointed to by @vectors.
+ * @bytes_sent: initialize to 0. Will be set to the number of bytes
+ *     that have been sent
+ * @control_messages: (array length=num_control_messages) (allow-none): a pointer
+ *   to an array of #GSocketControlMessages, or %NULL.
+ * @num_control_messages: number of elements in @control_messages.
+ *
+ * Structure used for scatter/gather data output when sending multiple
+ * messages or packets in one go. You generally pass in an array of
+ * #GOutputVectors and the operation will use all the buffers as if they
+ * were one buffer.
+ *
+ * If @address is %NULL then the message is sent to the default receiver
+ * (as previously set by g_socket_connect()).
+ *
+ * Since: 2.44
+ */
+typedef struct _GOutputMessage GOutputMessage;
+
+struct _GOutputMessage {
+  GSocketAddress         *address;
+
+  GOutputVector          *vectors;
+  guint                   num_vectors;
+
+  guint                   bytes_sent;
+
+  GSocketControlMessage **control_messages;
+  guint                   num_control_messages;
+};
+
 typedef struct _GCredentials                  GCredentials;
 typedef struct _GUnixCredentialsMessage       GUnixCredentialsMessage;
 typedef struct _GUnixFDList                   GUnixFDList;
diff --git a/gio/gsocket.c b/gio/gsocket.c
index 4d863ce..d9e135d 100644
--- a/gio/gsocket.c
+++ b/gio/gsocket.c
@@ -3987,6 +3987,281 @@ g_socket_send_message (GSocket                *socket,
 #endif
 }
 
+/**
+ * g_socket_send_messages:
+ * @socket: a #GSocket
+ * @messages: (array length=num_messages): an array of #GOutputMessage structs
+ * @num_messages: the number of elements in @messages
+ * @flags: an int containing #GSocketMsgFlags flags
+ * @cancellable: (allow-none): a %GCancellable or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Send multiple data messages from @socket in one go.  This is the most
+ * complicated and fully-featured version of this call. For easier use, see
+ * g_socket_send(), g_socket_send_to(), and g_socket_send_message().
+ *
+ * @messages must point to an array of #GOutputMessage structs and
+ * @num_messages must be the length of this array. Each #GOutputMessage
+ * contains an address to send the data to, and a pointer to an array of
+ * #GOutputVector structs to describe the buffers that the data to be sent
+ * for each message will be gathered from. Using multiple #GOutputVectors is
+ * more memory-efficient than manually copying data from multiple sources
+ * into a single buffer, and more network-efficient than making multiple
+ * calls to g_socket_send(). Sending multiple messages in one go avoids the
+ * overhead of making a lot of syscalls in scenarios where a lot of data
+ * packets need to be sent (e.g. high-bandwidth video streaming over RTP/UDP),
+ * or where the same data needs to be sent to multiple recipients.
+ *
+ * @flags modify how the message is sent. The commonly available arguments
+ * for this are available in the #GSocketMsgFlags enum, but the
+ * values there are the same as the system values, and the flags
+ * are passed in as-is, so you can pass in system-specific flags too.
+ *
+ * If the socket is in blocking mode the call will block until there is
+ * space for all the data in the socket queue. If there is no space available
+ * and the socket is in non-blocking mode a %G_IO_ERROR_WOULD_BLOCK error
+ * will be returned if no data was written at all, otherwise the number of
+ * messages sent will be returned. To be notified when space is available,
+ * wait for the %G_IO_OUT condition. Note though that you may still receive
+ * %G_IO_ERROR_WOULD_BLOCK from g_socket_send() even if you were previously
+ * notified of a %G_IO_OUT condition. (On Windows in particular, this is
+ * very common due to the way the underlying APIs work.)
+ *
+ * On error -1 is returned and @error is set accordingly.
+ *
+ * Returns: number of messages sent, or -1 on error. Note that the number of
+ *     messages sent may be smaller than @num_messages if the socket is
+ *     non-blocking or if @num_messages was larger than UIO_MAXIOV (1024),
+ *     in which case the caller may re-try to send the remaining messages.
+ *
+ * Since: 2.44
+ */
+gint
+g_socket_send_messages (GSocket        *socket,
+                       GOutputMessage *messages,
+                       guint           num_messages,
+                       gint            flags,
+                       GCancellable   *cancellable,
+                       GError        **error)
+{
+  g_return_val_if_fail (G_IS_SOCKET (socket), -1);
+  g_return_val_if_fail (num_messages == 0 || messages != NULL, -1);
+  g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), -1);
+  g_return_val_if_fail (error == NULL || *error == NULL, -1);
+
+  if (!check_socket (socket, error))
+    return -1;
+
+  if (!check_timeout (socket, error))
+    return -1;
+
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+
+  if (num_messages == 0)
+    return 0;
+
+#if !defined (G_OS_WIN32) && defined (HAVE_SENDMMSG)
+  {
+    struct mmsghdr *msgvec;
+    gint i, num_sent, result, max_sent;
+
+#ifdef UIO_MAXIOV
+#define MAX_NUM_MESSAGES UIO_MAXIOV
+#else
+#define MAX_NUM_MESSAGES 1024
+#endif
+
+    if (num_messages > MAX_NUM_MESSAGES)
+      num_messages = MAX_NUM_MESSAGES;
+
+    msgvec = g_newa (struct mmsghdr, num_messages);
+
+    for (i = 0; i < num_messages; ++i)
+      {
+        GOutputMessage *msg = &messages[i];
+        struct msghdr *msg_hdr = &msgvec[i].msg_hdr;
+
+        msgvec[i].msg_len = 0;
+
+        msg_hdr->msg_flags = 0;
+
+        /* name */
+        if (i > 0 && msg->address == messages[i-1].address)
+          {
+            msg_hdr->msg_name = msgvec[i-1].msg_hdr.msg_name;
+            msg_hdr->msg_namelen = msgvec[i-1].msg_hdr.msg_namelen;
+          }
+        else if (msg->address)
+          {
+            msg_hdr->msg_namelen = g_socket_address_get_native_size (msg->address);
+            msg_hdr->msg_name = g_alloca (msg_hdr->msg_namelen);
+            if (!g_socket_address_to_native (msg->address, msg_hdr->msg_name, msg_hdr->msg_namelen, error))
+              return -1;
+          }
+        else
+          {
+            msg_hdr->msg_name = NULL;
+            msg_hdr->msg_namelen = 0;
+          }
+
+        /* iov */
+        {
+          /* this entire expression will be evaluated at compile time */
+          if (sizeof (struct iovec) == sizeof (GOutputVector) &&
+              sizeof msg_hdr->msg_iov->iov_base == sizeof msg->vectors->buffer &&
+              G_STRUCT_OFFSET (struct iovec, iov_base) ==
+              G_STRUCT_OFFSET (GOutputVector, buffer) &&
+              sizeof msg_hdr->msg_iov->iov_len == sizeof msg->vectors->size &&
+              G_STRUCT_OFFSET (struct iovec, iov_len) ==
+              G_STRUCT_OFFSET (GOutputVector, size))
+            /* ABI is compatible */
+            {
+              msg_hdr->msg_iov = (struct iovec *) msg->vectors;
+              msg_hdr->msg_iovlen = msg->num_vectors;
+            }
+          else
+            /* ABI is incompatible */
+            {
+              gint j;
+
+              msg_hdr->msg_iov = g_newa (struct iovec, msg->num_vectors);
+              for (j = 0; j < msg->num_vectors; j++)
+                {
+                  msg_hdr->msg_iov[j].iov_base = (void *) msg->vectors[j].buffer;
+                  msg_hdr->msg_iov[j].iov_len = msg->vectors[j].size;
+                }
+              msg_hdr->msg_iovlen = msg->num_vectors;
+            }
+        }
+
+        /* control */
+        {
+          struct cmsghdr *cmsg;
+          gint j;
+
+          msg_hdr->msg_controllen = 0;
+          for (j = 0; j < msg->num_control_messages; j++)
+            msg_hdr->msg_controllen += CMSG_SPACE (g_socket_control_message_get_size 
(msg->control_messages[j]));
+
+          if (msg_hdr->msg_controllen == 0)
+            msg_hdr->msg_control = NULL;
+          else
+            {
+              msg_hdr->msg_control = g_alloca (msg_hdr->msg_controllen);
+              memset (msg_hdr->msg_control, '\0', msg_hdr->msg_controllen);
+            }
+
+          cmsg = CMSG_FIRSTHDR (msg_hdr);
+          for (j = 0; j < msg->num_control_messages; j++)
+            {
+              GSocketControlMessage *cm = msg->control_messages[j];
+
+              cmsg->cmsg_level = g_socket_control_message_get_level (cm);
+              cmsg->cmsg_type = g_socket_control_message_get_msg_type (cm);
+              cmsg->cmsg_len = CMSG_LEN (g_socket_control_message_get_size (cm));
+              g_socket_control_message_serialize (cm, CMSG_DATA (cmsg));
+              cmsg = CMSG_NXTHDR (msg_hdr, cmsg);
+            }
+          g_assert (cmsg == NULL);
+        }
+      }
+
+    num_sent = result = 0;
+    max_sent = num_messages;
+    while (num_sent < num_messages)
+      {
+        gint ret;
+
+        if (socket->priv->blocking &&
+            !g_socket_condition_wait (socket,
+                                      G_IO_OUT, cancellable, error))
+          return -1;
+
+        ret = sendmmsg (socket->priv->fd, msgvec + num_sent, num_messages - num_sent,
+                        flags | G_SOCKET_DEFAULT_SEND_FLAGS);
+
+        if (ret < 0)
+          {
+            int errsv = get_socket_errno ();
+
+            if (errsv == EINTR)
+              continue;
+
+            if (socket->priv->blocking &&
+                (errsv == EWOULDBLOCK ||
+                 errsv == EAGAIN))
+              continue;
+
+            if (num_sent > 0 &&
+                (errsv == EWOULDBLOCK ||
+                 errsv == EAGAIN))
+              {
+                max_sent = num_sent;
+                break;
+              }
+
+            g_set_error (error, G_IO_ERROR,
+                         socket_io_error_from_errno (errsv),
+                         _("Error sending message: %s"), socket_strerror (errsv));
+
+            /* we have to iterate over all messages below now, because we don't
+             * know where between num_sent and num_messages the error occured */
+            max_sent = num_messages;
+
+            result = -1;
+            break;
+          }
+
+        num_sent += ret;
+        result = num_sent;
+      }
+
+    for (i = 0; i < max_sent; ++i)
+      messages[i].bytes_sent = msgvec[i].msg_len;
+
+    return result;
+  }
+#else
+  {
+    gssize result;
+    gint i;
+
+    for (i = 0; i < num_messages; ++i)
+      {
+        GOutputMessage *msg = &messages[i];
+        GError *msg_error = NULL;
+
+        result = g_socket_send_message (socket, msg->address,
+                                        msg->vectors, msg->num_vectors,
+                                        msg->control_messages,
+                                        msg->num_control_messages,
+                                        flags, cancellable, &msg_error);
+
+        if (result < 0)
+          {
+            /* if we couldn't send all messages, just return how many we did
+             * manage to send, provided we managed to send at least one */
+            if (msg_error->code == G_IO_ERROR_WOULD_BLOCK && i > 0)
+              {
+                g_error_free (msg_error);
+                return i;
+              }
+            else
+              {
+                g_propagate_error (error, msg_error);
+                return -1;
+              }
+          }
+
+        msg->bytes_sent = result;
+      }
+
+    return i;
+  }
+#endif
+}
+
 static GSocketAddress *
 cache_recv_address (GSocket *socket, struct sockaddr *native, int native_len)
 {
diff --git a/gio/gsocket.h b/gio/gsocket.h
index 8b49cc1..dd866e7 100644
--- a/gio/gsocket.h
+++ b/gio/gsocket.h
@@ -236,6 +236,15 @@ gssize                 g_socket_send_message            (GSocket
                                                         gint                     flags,
                                                         GCancellable            *cancellable,
                                                         GError                 **error);
+
+GLIB_AVAILABLE_IN_2_44
+gint                   g_socket_send_messages           (GSocket                 *socket,
+                                                        GOutputMessage          *messages,
+                                                        guint                    num_messages,
+                                                        gint                     flags,
+                                                        GCancellable            *cancellable,
+                                                        GError                 **error);
+
 GLIB_AVAILABLE_IN_ALL
 gboolean               g_socket_close                   (GSocket                 *socket,
                                                         GError                 **error);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]