[aravis/af_packet: 5/5] gv_stream: new ArvGvspSocket wrapper around socket API
- From: Emmanuel Pacaud <emmanuel src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [aravis/af_packet: 5/5] gv_stream: new ArvGvspSocket wrapper around socket API
- Date: Sat, 13 Dec 2014 19:48:13 +0000 (UTC)
commit 5245bef0849949907435ce26a1f138e414b0336d
Author: Emmanuel Pacaud <emmanuel gnome org>
Date: Sat Dec 13 20:46:37 2014 +0100
gv_stream: new ArvGvspSocket wrapper around socket API
In preparation to AF_PACKET socket support.
docs/reference/aravis/Makefile.am | 1 +
src/Makefile.am | 3 +-
src/arvgvspsocket.c | 117 +++++++++++++++++++++++++++++++++++++
src/arvgvspsocketprivate.h | 17 +++++
src/arvgvstream.c | 74 ++++++------------------
5 files changed, 155 insertions(+), 57 deletions(-)
---
diff --git a/docs/reference/aravis/Makefile.am b/docs/reference/aravis/Makefile.am
index cdaa864..e2cf843 100644
--- a/docs/reference/aravis/Makefile.am
+++ b/docs/reference/aravis/Makefile.am
@@ -68,6 +68,7 @@ IGNORE_HFILES=\
arvdeviceprivate.h \
arvstreamprivate.h \
arvrealtimeprivate.h \
+ arvgvspsocketprivate.h \
arv.h
# Images to copy into HTML directory.
diff --git a/src/Makefile.am b/src/Makefile.am
index c6626a9..6f11e94 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -83,7 +83,8 @@ ARAVIS_SRCS_NO_INTRO = \
arvzip.c \
arvstr.c \
arvgvcp.c \
- arvgvsp.c
+ arvgvsp.c \
+ arvgvspsocket.c
ARAVIS_HDRS = \
arv.h \
diff --git a/src/arvgvspsocket.c b/src/arvgvspsocket.c
new file mode 100644
index 0000000..046049c
--- /dev/null
+++ b/src/arvgvspsocket.c
@@ -0,0 +1,117 @@
+#include <arvgvspsocketprivate.h>
+#include <arvgvcp.h>
+#include <sys/socket.h>
+
+typedef enum { /* Backend selector stored in the `type` field of struct _ArvGvspSocket. */
+ ARV_GVSP_SOCKET_TYPE_GLIB, /* GSocket-backed implementation; the value arv_gvsp_socket_new() assigns. */
+ ARV_GVSP_SOCKET_TYPE_PACKET /* Not assigned anywhere in this file; presumably reserved for the AF_PACKET support the commit message mentions. */
+} ArvSocketType;
+
+struct _ArvGvspSocket { /* State behind the opaque ArvGvspSocket handle. */
+ ArvSocketType type; /* Set to ARV_GVSP_SOCKET_TYPE_GLIB by arv_gvsp_socket_new(). */
+ int fd; /* Descriptor returned by g_socket_get_fd(); used for setsockopt(). */
+ union { /* Anonymous union: members are reached as gvsp_socket->glib.xxx. */
+ struct {
+ GSocket *socket; /* Datagram socket used by the receive and send_to wrappers. */
+ GPollFD poll_fd; /* Polled for G_IO_IN by arv_gvsp_socket_poll(). */
+ int current_buffer_size; /* Last size applied through SO_RCVBUF; 0 until the first call. */
+ GSocketAddress *device_address; /* Destination used by arv_gvsp_socket_send_to(). */
+ } glib;
+ struct {
+ } packet; /* Empty placeholder. NOTE(review): an empty struct is not ISO C - confirm the targeted compilers accept it. */
+ };
+};
+
+/**
+ * arv_gvsp_socket_new:
+ * @device_address: address of the device; paired with ARV_GVCP_PORT to build
+ *   the destination used by arv_gvsp_socket_send_to()
+ *
+ * Creates an IPv4 UDP socket, binds it to the wildcard address with port 0
+ * (so the system picks the port) and fills in the poll descriptor used by
+ * arv_gvsp_socket_poll().
+ *
+ * Socket creation and bind failures are reported with g_warning() rather than
+ * being silently discarded. The wrapper is still returned in that case, with
+ * fd set to -1, so that callers which do not check for failure keep working
+ * on a best-effort basis.
+ *
+ * Returns: a newly allocated wrapper, to be released with
+ *   arv_gvsp_socket_free().
+ */
+ArvGvspSocket *
+arv_gvsp_socket_new (GInetAddress *device_address)
+{
+	ArvGvspSocket *gvsp_socket;
+	GError *error = NULL;
+
+	gvsp_socket = g_new0 (ArvGvspSocket, 1);
+	gvsp_socket->type = ARV_GVSP_SOCKET_TYPE_GLIB;
+	gvsp_socket->fd = -1;
+
+	gvsp_socket->glib.socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
+						 G_SOCKET_TYPE_DATAGRAM,
+						 G_SOCKET_PROTOCOL_UDP, &error);
+
+	if (gvsp_socket->glib.socket == NULL) {
+		g_warning ("[GvspSocket::new] Failed to create socket: %s",
+			   error != NULL ? error->message : "unknown error");
+		g_clear_error (&error);
+	} else {
+		GInetAddress *incoming_inet_address;
+		GSocketAddress *incoming_address;
+
+		incoming_inet_address = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
+		incoming_address = g_inet_socket_address_new (incoming_inet_address, 0);
+		g_object_unref (incoming_inet_address);
+
+		if (!g_socket_bind (gvsp_socket->glib.socket, incoming_address, TRUE, &error)) {
+			g_warning ("[GvspSocket::new] Failed to bind socket: %s",
+				   error != NULL ? error->message : "unknown error");
+			g_clear_error (&error);
+		}
+
+		g_object_unref (incoming_address);
+
+		gvsp_socket->fd = g_socket_get_fd (gvsp_socket->glib.socket);
+	}
+
+	gvsp_socket->glib.poll_fd.fd = gvsp_socket->fd;
+	gvsp_socket->glib.poll_fd.events = G_IO_IN;
+	gvsp_socket->glib.poll_fd.revents = 0;
+	gvsp_socket->glib.device_address = g_inet_socket_address_new (device_address, ARV_GVCP_PORT);
+	gvsp_socket->glib.current_buffer_size = 0;
+
+	return gvsp_socket;
+}
+
+/**
+ * arv_gvsp_socket_free:
+ * @gvsp_socket: wrapper returned by arv_gvsp_socket_new()
+ *
+ * Drops the references held on the GSocket and on the device address, then
+ * frees the wrapper itself.
+ *
+ * g_clear_object() is used so that a member left NULL (for instance after a
+ * failed socket creation) is skipped instead of triggering a GLib critical.
+ */
+void
+arv_gvsp_socket_free (ArvGvspSocket *gvsp_socket)
+{
+	g_return_if_fail (gvsp_socket != NULL);
+
+	g_clear_object (&gvsp_socket->glib.socket);
+	g_clear_object (&gvsp_socket->glib.device_address);
+	g_free (gvsp_socket);
+}
+
+/**
+ * arv_gvsp_socket_get_port:
+ * @gvsp_socket: wrapper returned by arv_gvsp_socket_new()
+ *
+ * Looks up the local address the socket is bound to and extracts its port.
+ *
+ * NOTE(review): the return type is gint16 while ports are guint16, so a port
+ * above 32767 does not fit the declared type. Changing it requires updating
+ * the prototype in arvgvspsocketprivate.h at the same time.
+ *
+ * Returns: the local port, or 0 if @gvsp_socket is NULL or the local address
+ *   cannot be retrieved.
+ */
+gint16
+arv_gvsp_socket_get_port (ArvGvspSocket *gvsp_socket)
+{
+	GSocketAddress *local_address;
+	guint16 port;
+
+	g_return_val_if_fail (gvsp_socket != NULL, 0);
+
+	local_address = g_socket_get_local_address (gvsp_socket->glib.socket, NULL);
+	/* Lookup failed: report "no port" rather than handing NULL to the
+	 * accessor and to g_object_unref(). */
+	if (local_address == NULL)
+		return 0;
+
+	port = g_inet_socket_address_get_port (G_INET_SOCKET_ADDRESS (local_address));
+	g_object_unref (local_address);
+
+	return port;
+}
+
+/**
+ * arv_gvsp_socket_poll:
+ * @gvsp_socket: wrapper returned by arv_gvsp_socket_new()
+ * @timeout_ms: timeout handed unchanged to g_poll()
+ *
+ * Polls the wrapper's single descriptor, which was set up for G_IO_IN.
+ *
+ * Returns: the value returned by g_poll(), or 0 if @gvsp_socket is NULL.
+ */
+int
+arv_gvsp_socket_poll (ArvGvspSocket *gvsp_socket, int timeout_ms)
+{
+	GPollFD *descriptor;
+	int n_ready;
+
+	g_return_val_if_fail (gvsp_socket != NULL, 0);
+
+	descriptor = &gvsp_socket->glib.poll_fd;
+	n_ready = g_poll (descriptor, 1, timeout_ms);
+
+	return n_ready;
+}
+
+/**
+ * arv_gvsp_socket_receive:
+ * @gvsp_socket: wrapper returned by arv_gvsp_socket_new()
+ * @buffer: destination for the received datagram
+ * @size: capacity of @buffer, in bytes
+ *
+ * Reads one datagram from the socket.
+ *
+ * g_socket_receive() returns a signed gssize and reports errors as -1.
+ * Returning that through the unsigned gsize return type would turn an error
+ * into a huge byte count, so failures are mapped to 0 here.
+ *
+ * Returns: the number of bytes stored in @buffer, or 0 on error or if
+ *   @gvsp_socket is NULL.
+ */
+gsize
+arv_gvsp_socket_receive (ArvGvspSocket *gvsp_socket, char *buffer, gsize size)
+{
+	gssize n_read;
+
+	g_return_val_if_fail (gvsp_socket != NULL, 0);
+
+	n_read = g_socket_receive (gvsp_socket->glib.socket, buffer, size, NULL, NULL);
+	if (n_read < 0)
+		return 0;
+
+	return (gsize) n_read;
+}
+
+/**
+ * arv_gvsp_socket_send_to:
+ * @gvsp_socket: wrapper returned by arv_gvsp_socket_new()
+ * @data: bytes to send
+ * @size: number of bytes in @data
+ *
+ * Sends @data to the device address stored in the wrapper at creation time.
+ *
+ * Returns: TRUE when g_socket_send_to() reports a non-negative result, FALSE
+ *   otherwise or if @gvsp_socket is NULL.
+ */
+gboolean
+arv_gvsp_socket_send_to (ArvGvspSocket *gvsp_socket, const char *data, gsize size)
+{
+	gssize n_sent;
+
+	g_return_val_if_fail (gvsp_socket != NULL, FALSE);
+
+	n_sent = g_socket_send_to (gvsp_socket->glib.socket,
+				   gvsp_socket->glib.device_address,
+				   data, size,
+				   NULL, NULL);
+
+	return n_sent >= 0;
+}
+
+/**
+ * arv_gvsp_socket_set_buffer_size:
+ * @gvsp_socket: wrapper returned by arv_gvsp_socket_new()
+ * @buffer_size: requested receive buffer size, in bytes
+ *
+ * Applies @buffer_size to the socket through SO_RCVBUF, skipping the system
+ * call when the value matches the one applied last.
+ *
+ * SO_RCVBUF takes an int. Passing a gsize with sizeof (gsize) only works by
+ * accident where gsize is wider than int and the machine is little-endian,
+ * so the request is converted to an int first (saturating at G_MAXINT).
+ *
+ * Returns: TRUE if the size is already in effect or was applied, FALSE if
+ *   setsockopt() failed or @gvsp_socket is NULL. On failure the cached size
+ *   is left untouched so that a later call tries again.
+ */
+gboolean
+arv_gvsp_socket_set_buffer_size (ArvGvspSocket *gvsp_socket, gsize buffer_size)
+{
+	int requested_size;
+
+	g_return_val_if_fail (gvsp_socket != NULL, FALSE);
+
+	requested_size = buffer_size > (gsize) G_MAXINT ? G_MAXINT : (int) buffer_size;
+
+	if (requested_size == gvsp_socket->glib.current_buffer_size)
+		return TRUE;
+
+	if (setsockopt (gvsp_socket->fd, SOL_SOCKET, SO_RCVBUF,
+			&requested_size, sizeof (requested_size)) != 0)
+		return FALSE;
+
+	gvsp_socket->glib.current_buffer_size = requested_size;
+
+	return TRUE;
+}
diff --git a/src/arvgvspsocketprivate.h b/src/arvgvspsocketprivate.h
new file mode 100644
index 0000000..bed9163
--- /dev/null
+++ b/src/arvgvspsocketprivate.h
@@ -0,0 +1,17 @@
+#ifndef ARV_GVSP_SOCKET_PRIVATE_H
+#define ARV_GVSP_SOCKET_PRIVATE_H
+
+#include <arvtypes.h>
+#include <gio/gio.h>
+
+typedef struct _ArvGvspSocket ArvGvspSocket;
+
+ArvGvspSocket * arv_gvsp_socket_new (GInetAddress *device_address);
+void arv_gvsp_socket_free (ArvGvspSocket *gvsp_socket);
+int arv_gvsp_socket_poll (ArvGvspSocket *gvsp_socket, int timeout_ms);
+gsize arv_gvsp_socket_receive (ArvGvspSocket *gvsp_socket, char *buffer, gsize size);
+gboolean arv_gvsp_socket_send_to (ArvGvspSocket *gvsp_socket, const char *data, gsize size);
+gboolean arv_gvsp_socket_set_buffer_size (ArvGvspSocket *gvsp_socket, gsize size);
+gint16 arv_gvsp_socket_get_port (ArvGvspSocket *gvsp_socket);
+
+#endif
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index fbd7c23..93955eb 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -28,6 +28,7 @@
#include <arvgvstream.h>
#include <arvstreamprivate.h>
#include <arvbufferprivate.h>
+#include <arvgvspsocketprivate.h>
#include <arvgvsp.h>
#include <arvgvcp.h>
#include <arvdebug.h>
@@ -57,8 +58,7 @@ enum {
static GObjectClass *parent_class = NULL;
struct _ArvGvStreamPrivate {
- GSocket *socket;
- GSocketAddress *incoming_address;
+ gint gvsp_incoming_port;
GThread *thread;
void *thread_data;
@@ -91,8 +91,7 @@ typedef struct {
ArvStreamCallback callback;
void *user_data;
- GSocket *socket;
- GSocketAddress *device_address;
+ ArvGvspSocket *socket;
ArvGvStreamPacketResend packet_resend;
guint packet_timeout_us;
@@ -132,7 +131,6 @@ typedef struct {
ArvGvStreamSocketBuffer socket_buffer_option;
int socket_buffer_size;
- int current_socket_buffer_size;
} ArvGvStreamThreadData;
static void
@@ -154,8 +152,7 @@ _send_packet_request (ArvGvStreamThreadData *thread_data,
arv_gvcp_packet_debug (packet, ARV_DEBUG_LEVEL_LOG);
- g_socket_send_to (thread_data->socket, thread_data->device_address, (const char *) packet, packet_size,
- NULL, NULL);
+ arv_gvsp_socket_send_to (thread_data->socket, (const char *) packet, packet_size);
arv_gvcp_packet_free (packet);
}
@@ -163,14 +160,12 @@ _send_packet_request (ArvGvStreamThreadData *thread_data,
static void
_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
{
- int buffer_size, fd;
+ int buffer_size;
if (thread_data->socket_buffer_option == ARV_GV_STREAM_SOCKET_BUFFER_FIXED &&
thread_data->socket_buffer_size <= 0)
return;
- fd = g_socket_get_fd (thread_data->socket);
-
switch (thread_data->socket_buffer_option) {
case ARV_GV_STREAM_SOCKET_BUFFER_FIXED:
buffer_size = thread_data->socket_buffer_size;
@@ -183,11 +178,7 @@ _update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
break;
}
- if (buffer_size != thread_data->current_socket_buffer_size) {
- setsockopt (fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof (buffer_size));
- thread_data->current_socket_buffer_size = buffer_size;
- arv_debug_stream_thread ("[GvStream::update_socket] Socket buffer size set to %d", buffer_size);
- }
+ arv_gvsp_socket_set_buffer_size (thread_data->socket, buffer_size);
}
static void
@@ -584,7 +575,6 @@ arv_gv_stream_thread (void *data)
guint32 frame_id;
GTimeVal current_time;
guint64 time_us;
- GPollFD poll_fd;
size_t read_count;
int timeout_ms;
int n_events;
@@ -601,10 +591,6 @@ arv_gv_stream_thread (void *data)
if (thread_data->callback != NULL)
thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_INIT, NULL);
- poll_fd.fd = g_socket_get_fd (thread_data->socket);
- poll_fd.events = G_IO_IN;
- poll_fd.revents = 0;
-
packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
do {
@@ -613,7 +599,7 @@ arv_gv_stream_thread (void *data)
else
timeout_ms = ARV_GV_STREAM_POLL_TIMEOUT_US / 1000;
- n_events = g_poll (&poll_fd, 1, timeout_ms);
+ n_events = arv_gvsp_socket_poll (thread_data->socket, timeout_ms);
g_get_current_time (&current_time);
time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
@@ -621,8 +607,8 @@ arv_gv_stream_thread (void *data)
if (n_events > 0) {
thread_data->n_received_packets++;
- read_count = g_socket_receive (thread_data->socket, (char *) packet,
- ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
+ read_count = arv_gvsp_socket_receive (thread_data->socket, (char *) packet,
+ ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
frame_id = arv_gvsp_packet_get_frame_id (packet);
packet_id = arv_gvsp_packet_get_packet_id (packet);
@@ -704,16 +690,9 @@ arv_gv_stream_thread (void *data)
guint16
arv_gv_stream_get_port (ArvGvStream *gv_stream)
{
- GInetSocketAddress *local_address;
- guint16 port;
-
g_return_val_if_fail (ARV_IS_GV_STREAM (gv_stream), 0);
- local_address = G_INET_SOCKET_ADDRESS (g_socket_get_local_address (gv_stream->priv->socket, NULL));
- port = g_inet_socket_address_get_port (local_address);
- g_object_unref (local_address);
-
- return port;
+ return gv_stream->priv->gvsp_incoming_port;
}
/**
@@ -736,7 +715,6 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
{
ArvGvStream *gv_stream;
ArvStream *stream;
- GInetAddress *incoming_inet_address;
ArvGvStreamThreadData *thread_data;
g_return_val_if_fail (G_IS_INET_ADDRESS (device_address), NULL);
@@ -746,22 +724,11 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
stream = ARV_STREAM (gv_stream);
- gv_stream->priv->socket = g_socket_new (G_SOCKET_FAMILY_IPV4,
- G_SOCKET_TYPE_DATAGRAM,
- G_SOCKET_PROTOCOL_UDP, NULL);
-
- incoming_inet_address = g_inet_address_new_any (G_SOCKET_FAMILY_IPV4);
- gv_stream->priv->incoming_address = g_inet_socket_address_new (incoming_inet_address, port);
- g_object_unref (incoming_inet_address);
-
- g_socket_bind (gv_stream->priv->socket, gv_stream->priv->incoming_address, TRUE, NULL);
-
thread_data = g_new (ArvGvStreamThreadData, 1);
thread_data->stream = stream;
thread_data->callback = callback;
thread_data->user_data = user_data;
- thread_data->socket = gv_stream->priv->socket;
- thread_data->device_address = g_inet_socket_address_new (device_address, ARV_GVCP_PORT);
+ thread_data->socket = arv_gvsp_socket_new (device_address);
thread_data->packet_resend = ARV_GV_STREAM_PACKET_RESEND_ALWAYS;
thread_data->packet_timeout_us = ARV_GV_STREAM_PACKET_TIMEOUT_US_DEFAULT;
thread_data->frame_retention_us = ARV_GV_STREAM_FRAME_RETENTION_US_DEFAULT;
@@ -796,11 +763,10 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
thread_data->socket_buffer_option = ARV_GV_STREAM_SOCKET_BUFFER_FIXED;
thread_data->socket_buffer_size = 0;
- thread_data->current_socket_buffer_size = 0;
gv_stream->priv->thread_data = thread_data;
-
gv_stream->priv->thread = arv_g_thread_new ("arv_gv_stream", arv_gv_stream_thread, gv_stream->priv->thread_data);
+ gv_stream->priv->gvsp_incoming_port = arv_gvsp_socket_get_port (thread_data->socket);
return ARV_STREAM (gv_stream);
}
@@ -913,18 +879,16 @@ static void
arv_gv_stream_finalize (GObject *object)
{
ArvGvStream *gv_stream = ARV_GV_STREAM (object);
+ ArvGvStreamThreadData *thread_data;
+
+ thread_data = gv_stream->priv->thread_data;
if (gv_stream->priv->thread != NULL) {
- ArvGvStreamThreadData *thread_data;
char *statistic_string;
- thread_data = gv_stream->priv->thread_data;
-
thread_data->cancel = TRUE;
g_thread_join (gv_stream->priv->thread);
- g_object_unref (thread_data->device_address);
-
statistic_string = arv_statistic_to_string (thread_data->statistic);
arv_debug_stream (statistic_string);
g_free (statistic_string);
@@ -962,14 +926,12 @@ arv_gv_stream_finalize (GObject *object)
arv_debug_stream ("[GvStream::finalize] n_duplicated_packets = %u",
thread_data->n_duplicated_packets);
- g_free (thread_data);
-
- gv_stream->priv->thread_data = NULL;
gv_stream->priv->thread = NULL;
}
- g_clear_object (&gv_stream->priv->incoming_address);
- g_clear_object (&gv_stream->priv->socket);
+ arv_gvsp_socket_free (thread_data->socket);
+
+ g_clear_pointer (&gv_stream->priv->thread_data, g_free);
parent_class->finalize (object);
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]