[aravis/wip/emmanuel/af_packet] gv_stream: new ArvGvspSocket wrapper around socket API



commit da0292f9d3d84a5f97101373aabb354df235b180
Author: Emmanuel Pacaud <emmanuel gnome org>
Date:   Sat Dec 13 20:46:37 2014 +0100

    gv_stream: new ArvGvspSocket wrapper around socket API
    
    In preparation for AF_PACKET socket support.

 docs/reference/aravis/Makefile.am |    1 +
 src/Makefile.am                   |    3 +-
 src/arvgvspsocket.c               |  117 +++++++++++++++++++++++++++++++++++++
 src/arvgvspsocketprivate.h        |   17 +++++
 src/arvgvstream.c                 |   74 ++++++------------------
 5 files changed, 155 insertions(+), 57 deletions(-)
---
diff --git a/docs/reference/aravis/Makefile.am b/docs/reference/aravis/Makefile.am
index cdaa864..e2cf843 100644
--- a/docs/reference/aravis/Makefile.am
+++ b/docs/reference/aravis/Makefile.am
@@ -68,6 +68,7 @@ IGNORE_HFILES=\
        arvdeviceprivate.h              \
        arvstreamprivate.h              \
        arvrealtimeprivate.h            \
+       arvgvspsocketprivate.h          \
        arv.h
 
 # Images to copy into HTML directory.
diff --git a/src/Makefile.am b/src/Makefile.am
index c6626a9..6f11e94 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -83,7 +83,8 @@ ARAVIS_SRCS_NO_INTRO =                                \
        arvzip.c                                \
        arvstr.c                                \
        arvgvcp.c                               \
-       arvgvsp.c
+       arvgvsp.c                               \
+       arvgvspsocket.c
 
 ARAVIS_HDRS =                                  \
        arv.h                                   \
diff --git a/src/arvgvspsocket.c b/src/arvgvspsocket.c
new file mode 100644
index 0000000..046049c
--- /dev/null
+++ b/src/arvgvspsocket.c
@@ -0,0 +1,117 @@
+#include <arvgvspsocketprivate.h>
+#include <arvgvcp.h>
+#include <sys/socket.h>
+
+/* Backend implementing an ArvGvspSocket. */
+typedef enum {
+       ARV_GVSP_SOCKET_TYPE_GLIB,      /* GSocket based; the type set by arv_gvsp_socket_new () */
+       ARV_GVSP_SOCKET_TYPE_PACKET     /* not used in this file yet; presumably for the planned AF_PACKET support */
+} ArvSocketType;
+
+/* State of a GVSP stream socket. Backend specific members live in an anonymous union. */
+struct _ArvGvspSocket {
+       ArvSocketType type;     /* backend in use */
+       int fd;                 /* file descriptor of the socket, passed to setsockopt () */
+       union {
+               struct {
+                       GSocket *socket;                /* UDP socket; unreferenced in arv_gvsp_socket_free () */
+                       GPollFD poll_fd;                /* watches fd for G_IO_IN in arv_gvsp_socket_poll () */
+                       int current_buffer_size;        /* last size stored by arv_gvsp_socket_set_buffer_size (), 0 initially */
+                       GSocketAddress *device_address; /* destination of arv_gvsp_socket_send_to (); unreferenced on free */
+               } glib;
+               /* NOTE(review): empty placeholder; a struct without members is a GNU C extension, not ISO C. */
+               struct {
+               } packet;
+       };
+};
+
+/*
+ * Creates a GVSP socket wrapper backed by an IPv4 UDP GSocket.
+ *
+ * The socket is bound to the IPv4 "any" address with port 0; the port in use
+ * can be read back with arv_gvsp_socket_get_port (). device_address combined
+ * with ARV_GVCP_PORT becomes the destination of arv_gvsp_socket_send_to ().
+ *
+ * Failures of g_socket_new () and g_socket_bind () are not reported.
+ *
+ * Returns: a newly allocated ArvGvspSocket, to be released with
+ * arv_gvsp_socket_free ().
+ */
+ArvGvspSocket *
+arv_gvsp_socket_new (GInetAddress *device_address)
+{
+       ArvGvspSocket *self = g_new0 (ArvGvspSocket, 1);
+       GInetAddress *any_address = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
+       GSocketAddress *bind_address = g_inet_socket_address_new (any_address, 0);
+
+       /* The socket address holds what it needs; the bare inet address can go. */
+       g_object_unref (any_address);
+
+       self->type = ARV_GVSP_SOCKET_TYPE_GLIB;
+       self->glib.socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
+                                         G_SOCKET_TYPE_DATAGRAM,
+                                         G_SOCKET_PROTOCOL_UDP, NULL);
+       self->glib.device_address = g_inet_socket_address_new (device_address, ARV_GVCP_PORT);
+       self->glib.current_buffer_size = 0;
+
+       g_socket_bind (self->glib.socket, bind_address, TRUE, NULL);
+       g_object_unref (bind_address);
+
+       /* Cache the descriptor: it is used both for polling and for setsockopt (). */
+       self->fd = g_socket_get_fd (self->glib.socket);
+       self->glib.poll_fd.fd = self->fd;
+       self->glib.poll_fd.events = G_IO_IN;
+       self->glib.poll_fd.revents = 0;
+
+       return self;
+}
+
+/*
+ * Releases the GSocket and the device address held by gvsp_socket, then the
+ * structure itself. A NULL gvsp_socket is rejected by g_return_if_fail ().
+ */
+void
+arv_gvsp_socket_free (ArvGvspSocket *gvsp_socket)
+{
+       g_return_if_fail (gvsp_socket != NULL);
+
+       g_object_unref (gvsp_socket->glib.device_address);
+       g_object_unref (gvsp_socket->glib.socket);
+
+       g_free (gvsp_socket);
+}
+
+/*
+ * Returns the local port the socket is bound to, or 0 when gvsp_socket is NULL
+ * or the local address cannot be retrieved.
+ *
+ * NOTE(review): the return type is gint16 while a port is a guint16, so ports
+ * above 32767 do not fit the declared type. Fixing it requires changing the
+ * prototype in arvgvspsocketprivate.h as well.
+ */
+gint16
+arv_gvsp_socket_get_port (ArvGvspSocket *gvsp_socket)
+{
+       GSocketAddress *local_address;
+       guint16 port;
+
+       g_return_val_if_fail (gvsp_socket != NULL, 0);
+
+       local_address = g_socket_get_local_address (gvsp_socket->glib.socket, NULL);
+       /* Lookup can fail; do not hand NULL to g_object_unref (). */
+       if (local_address == NULL)
+               return 0;
+
+       port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (local_address));
+       g_object_unref (local_address);
+
+       return port;
+}
+
+/*
+ * Polls the socket descriptor for G_IO_IN through g_poll (), passing
+ * timeout_ms unchanged.
+ *
+ * Returns: the g_poll () result, or 0 when gvsp_socket is NULL.
+ */
+int
+arv_gvsp_socket_poll (ArvGvspSocket *gvsp_socket, int timeout_ms)
+{
+       GPollFD *watched;
+
+       g_return_val_if_fail (gvsp_socket != NULL, 0);
+
+       watched = &gvsp_socket->glib.poll_fd;
+
+       return g_poll (watched, 1, timeout_ms);
+}
+
+/*
+ * Reads up to size bytes from the socket into buffer with g_socket_receive ().
+ *
+ * Returns: the g_socket_receive () result converted to gsize, or 0 when
+ * gvsp_socket is NULL. Because of that conversion a negative (error) result
+ * reaches the caller as a very large value.
+ */
+gsize
+arv_gvsp_socket_receive (ArvGvspSocket *gvsp_socket, char *buffer, gsize size)
+{
+       gssize n_bytes;
+
+       g_return_val_if_fail (gvsp_socket != NULL, 0);
+
+       n_bytes = g_socket_receive (gvsp_socket->glib.socket, buffer, size, NULL, NULL);
+
+       return n_bytes;
+}
+
+/*
+ * Sends size bytes of data to the device address stored at creation time.
+ *
+ * Returns: TRUE when g_socket_send_to () returns a non-negative value, FALSE
+ * otherwise or when gvsp_socket is NULL.
+ */
+gboolean
+arv_gvsp_socket_send_to (ArvGvspSocket *gvsp_socket, const char *data, gsize size)
+{
+       gssize n_sent;
+
+       g_return_val_if_fail (gvsp_socket != NULL, FALSE);
+
+       n_sent = g_socket_send_to (gvsp_socket->glib.socket, gvsp_socket->glib.device_address,
+                                  data, size, NULL, NULL);
+
+       return n_sent >= 0;
+}
+
+/*
+ * Sets the receive buffer size (SO_RCVBUF) of the socket when it differs from
+ * the last size successfully applied.
+ *
+ * SO_RCVBUF takes an int option value, so buffer_size is converted to an int
+ * (clamped to G_MAXINT) before being handed to setsockopt (); passing the
+ * gsize object directly is wrong wherever gsize is wider than int.
+ *
+ * Returns: TRUE when the requested size is in effect (already set or newly
+ * applied), FALSE when gvsp_socket is NULL or setsockopt () fails. On failure
+ * the cached size is left untouched so a later call tries again.
+ */
+gboolean
+arv_gvsp_socket_set_buffer_size        (ArvGvspSocket *gvsp_socket, gsize buffer_size)
+{
+       int requested_size;
+
+       g_return_val_if_fail (gvsp_socket != NULL, FALSE);
+
+       requested_size = buffer_size > (gsize) G_MAXINT ? G_MAXINT : (int) buffer_size;
+
+       if (requested_size == gvsp_socket->glib.current_buffer_size)
+               return TRUE;
+
+       if (setsockopt (gvsp_socket->fd, SOL_SOCKET, SO_RCVBUF,
+                       &requested_size, sizeof (requested_size)) != 0)
+               return FALSE;
+
+       gvsp_socket->glib.current_buffer_size = requested_size;
+
+       return TRUE;
+}
diff --git a/src/arvgvspsocketprivate.h b/src/arvgvspsocketprivate.h
new file mode 100644
index 0000000..bed9163
--- /dev/null
+++ b/src/arvgvspsocketprivate.h
@@ -0,0 +1,17 @@
+#ifndef ARV_GVSP_SOCKET_PRIVATE_H
+#define ARV_GVSP_SOCKET_PRIVATE_H
+
+#include <arvtypes.h>
+#include <gio/gio.h>
+
+/* Opaque wrapper around the socket used for GVSP streaming; defined in arvgvspsocket.c. */
+typedef struct _ArvGvspSocket          ArvGvspSocket;
+
+/* Creation and destruction. */
+ArvGvspSocket *                arv_gvsp_socket_new             (GInetAddress *device_address);
+void                           arv_gvsp_socket_free            (ArvGvspSocket *gvsp_socket);
+/* Waiting for, reading and sending data. */
+int                            arv_gvsp_socket_poll            (ArvGvspSocket *gvsp_socket, int timeout_ms);
+gsize                          arv_gvsp_socket_receive         (ArvGvspSocket *gvsp_socket, char *buffer, gsize size);
+gboolean                       arv_gvsp_socket_send_to         (ArvGvspSocket *gvsp_socket, const char *data, gsize size);
+/* Socket configuration and properties. */
+gboolean                       arv_gvsp_socket_set_buffer_size (ArvGvspSocket *gvsp_socket, gsize size);
+gint16                                 arv_gvsp_socket_get_port        (ArvGvspSocket *gvsp_socket);
+
+#endif
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index fbd7c23..93955eb 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -28,6 +28,7 @@
 #include <arvgvstream.h>
 #include <arvstreamprivate.h>
 #include <arvbufferprivate.h>
+#include <arvgvspsocketprivate.h>
 #include <arvgvsp.h>
 #include <arvgvcp.h>
 #include <arvdebug.h>
@@ -57,8 +58,7 @@ enum {
 static GObjectClass *parent_class = NULL;
 
 struct _ArvGvStreamPrivate {
-       GSocket *socket;
-       GSocketAddress *incoming_address;
+       gint gvsp_incoming_port;
 
        GThread *thread;
        void *thread_data;
@@ -91,8 +91,7 @@ typedef struct {
        ArvStreamCallback callback;
        void *user_data;
 
-       GSocket *socket;
-       GSocketAddress *device_address;
+       ArvGvspSocket *socket;
 
        ArvGvStreamPacketResend packet_resend;
        guint packet_timeout_us;
@@ -132,7 +131,6 @@ typedef struct {
 
        ArvGvStreamSocketBuffer socket_buffer_option;
        int socket_buffer_size;
-       int current_socket_buffer_size;
 } ArvGvStreamThreadData;
 
 static void
@@ -154,8 +152,7 @@ _send_packet_request (ArvGvStreamThreadData *thread_data,
 
        arv_gvcp_packet_debug (packet, ARV_DEBUG_LEVEL_LOG);
 
-       g_socket_send_to (thread_data->socket, thread_data->device_address, (const char *) packet, packet_size,
-                         NULL, NULL);
+       arv_gvsp_socket_send_to (thread_data->socket, (const char *) packet, packet_size);
 
        arv_gvcp_packet_free (packet);
 }
@@ -163,14 +160,12 @@ _send_packet_request (ArvGvStreamThreadData *thread_data,
 static void
 _update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
 {
-       int buffer_size, fd;
+       int buffer_size;
 
        if (thread_data->socket_buffer_option == ARV_GV_STREAM_SOCKET_BUFFER_FIXED &&
            thread_data->socket_buffer_size <= 0)
                return;
 
-       fd = g_socket_get_fd (thread_data->socket);
-
        switch (thread_data->socket_buffer_option) {
                case ARV_GV_STREAM_SOCKET_BUFFER_FIXED:
                        buffer_size = thread_data->socket_buffer_size;
@@ -183,11 +178,7 @@ _update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
                        break;
        }
 
-       if (buffer_size != thread_data->current_socket_buffer_size) {
-               setsockopt (fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof (buffer_size));
-               thread_data->current_socket_buffer_size = buffer_size;
-               arv_debug_stream_thread ("[GvStream::update_socket] Socket buffer size set to %d", buffer_size);
-       }
+       arv_gvsp_socket_set_buffer_size (thread_data->socket, buffer_size);
 }
 
 static void
@@ -584,7 +575,6 @@ arv_gv_stream_thread (void *data)
        guint32 frame_id;
        GTimeVal current_time;
        guint64 time_us;
-       GPollFD poll_fd;
        size_t read_count;
        int timeout_ms;
        int n_events;
@@ -601,10 +591,6 @@ arv_gv_stream_thread (void *data)
        if (thread_data->callback != NULL)
                thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_INIT, NULL);
 
-       poll_fd.fd = g_socket_get_fd (thread_data->socket);
-       poll_fd.events =  G_IO_IN;
-       poll_fd.revents = 0;
-
        packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
 
        do {
@@ -613,7 +599,7 @@ arv_gv_stream_thread (void *data)
                else
                        timeout_ms = ARV_GV_STREAM_POLL_TIMEOUT_US / 1000;
 
-               n_events = g_poll (&poll_fd, 1, timeout_ms);
+               n_events = arv_gvsp_socket_poll (thread_data->socket, timeout_ms);
 
                g_get_current_time (&current_time);
                time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
@@ -621,8 +607,8 @@ arv_gv_stream_thread (void *data)
                if (n_events > 0) {
                        thread_data->n_received_packets++;
 
-                       read_count = g_socket_receive (thread_data->socket, (char *) packet,
-                                                      ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
+                       read_count = arv_gvsp_socket_receive (thread_data->socket, (char *) packet,
+                                                             ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
 
                        frame_id = arv_gvsp_packet_get_frame_id (packet);
                        packet_id = arv_gvsp_packet_get_packet_id (packet);
@@ -704,16 +690,9 @@ arv_gv_stream_thread (void *data)
 guint16
 arv_gv_stream_get_port (ArvGvStream *gv_stream)
 {
-       GInetSocketAddress *local_address;
-       guint16 port;
-
        g_return_val_if_fail (ARV_IS_GV_STREAM (gv_stream), 0);
 
-       local_address = G_INET_SOCKET_ADDRESS (g_socket_get_local_address (gv_stream->priv->socket, NULL));
-       port = g_inet_socket_address_get_port (local_address);
-       g_object_unref (local_address);
-
-       return port;
+       return gv_stream->priv->gvsp_incoming_port;
 }
 
 /**
@@ -736,7 +715,6 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 {
        ArvGvStream *gv_stream;
        ArvStream *stream;
-       GInetAddress *incoming_inet_address;
        ArvGvStreamThreadData *thread_data;
 
        g_return_val_if_fail (G_IS_INET_ADDRESS (device_address), NULL);
@@ -746,22 +724,11 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 
        stream = ARV_STREAM (gv_stream);
 
-       gv_stream->priv->socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
-                                         G_SOCKET_TYPE_DATAGRAM,
-                                         G_SOCKET_PROTOCOL_UDP, NULL);
-
-       incoming_inet_address = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
-       gv_stream->priv->incoming_address = g_inet_socket_address_new (incoming_inet_address, port);
-       g_object_unref (incoming_inet_address);
-
-       g_socket_bind (gv_stream->priv->socket, gv_stream->priv->incoming_address, TRUE, NULL);
-
        thread_data = g_new (ArvGvStreamThreadData, 1);
        thread_data->stream = stream;
        thread_data->callback = callback;
        thread_data->user_data = user_data;
-       thread_data->socket = gv_stream->priv->socket;
-       thread_data->device_address = g_inet_socket_address_new (device_address, ARV_GVCP_PORT);
+       thread_data->socket = arv_gvsp_socket_new (device_address);
        thread_data->packet_resend = ARV_GV_STREAM_PACKET_RESEND_ALWAYS;
        thread_data->packet_timeout_us = ARV_GV_STREAM_PACKET_TIMEOUT_US_DEFAULT;
        thread_data->frame_retention_us = ARV_GV_STREAM_FRAME_RETENTION_US_DEFAULT;
@@ -796,11 +763,10 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 
        thread_data->socket_buffer_option = ARV_GV_STREAM_SOCKET_BUFFER_FIXED;
        thread_data->socket_buffer_size = 0;
-       thread_data->current_socket_buffer_size = 0;
 
        gv_stream->priv->thread_data = thread_data;
-
        gv_stream->priv->thread = arv_g_thread_new ("arv_gv_stream", arv_gv_stream_thread, gv_stream->priv->thread_data);
+       gv_stream->priv->gvsp_incoming_port = arv_gvsp_socket_get_port (thread_data->socket);
 
        return ARV_STREAM (gv_stream);
 }
@@ -913,18 +879,16 @@ static void
 arv_gv_stream_finalize (GObject *object)
 {
        ArvGvStream *gv_stream = ARV_GV_STREAM (object);
+       ArvGvStreamThreadData *thread_data;
+
+       thread_data = gv_stream->priv->thread_data;
 
        if (gv_stream->priv->thread != NULL) {
-               ArvGvStreamThreadData *thread_data;
                char *statistic_string;
 
-               thread_data = gv_stream->priv->thread_data;
-
                thread_data->cancel = TRUE;
                g_thread_join (gv_stream->priv->thread);
 
-               g_object_unref (thread_data->device_address);
-
                statistic_string = arv_statistic_to_string (thread_data->statistic);
                arv_debug_stream (statistic_string);
                g_free (statistic_string);
@@ -962,14 +926,12 @@ arv_gv_stream_finalize (GObject *object)
                arv_debug_stream ("[GvStream::finalize] n_duplicated_packets   = %u",
                                  thread_data->n_duplicated_packets);
 
-               g_free (thread_data);
-
-               gv_stream->priv->thread_data = NULL;
                gv_stream->priv->thread = NULL;
        }
 
-       g_clear_object (&gv_stream->priv->incoming_address);
-       g_clear_object (&gv_stream->priv->socket);
+       arv_gvsp_socket_free (thread_data->socket);
+
+       g_clear_pointer (&gv_stream->priv->thread_data, g_free);
 
        parent_class->finalize (object);
 }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]