[aravis/timeout-resend] WIP.
- From: Emmanuel Pacaud <emmanuel src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [aravis/timeout-resend] WIP.
- Date: Tue, 1 Mar 2011 09:35:58 +0000 (UTC)
commit 31b5789bdbba6da297ed82e5af9ede3465710fbb
Author: Emmanuel Pacaud <emmanuel gnome org>
Date: Tue Feb 22 15:50:17 2011 +0100
WIP.
src/arvgvstream.c | 155 ++++++++++++++++++++++++++++++++++++-----------------
src/arvgvstream.h | 3 +-
2 files changed, 107 insertions(+), 51 deletions(-)
---
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index 53e2d33..4ea8031 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -36,6 +36,7 @@
#include <sys/socket.h>
#define ARV_GV_STREAM_INCOMING_BUFFER_SIZE 65536
+#define ARV_GV_STREAM_THREAD_N_FRAMES 2
enum {
ARV_GV_STREAM_PROPERTY_0,
@@ -49,10 +50,21 @@ static GObjectClass *parent_class = NULL;
/* Acquisition thread */
typedef struct {
+ gboolean received;
+ guint n_requests;
+} ArvGvStreamPacketData;
+
+typedef struct {
ArvBuffer *buffer;
+ guint frame_id;
+
+ gint last_valid_packet;
+
+ guint n_packets;
+ ArvGvStreamPacketData *packet_data;
+
size_t read_data_size;
gint32 n_missing_blocks;
- gint32 n_late_blocks;
gint32 last_block_size;
gint32 last_packet_id;
gint64 last_time_us;
@@ -89,7 +101,6 @@ typedef struct {
guint n_resent_blocks;
guint n_missing_blocks;
- guint n_late_blocks;
ArvStatistic *statistic;
@@ -157,10 +168,21 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
{
GTimeVal current_time;
gint64 current_time_us;
+ int i;
+ guint n_missing_packets = 0;
if (frame->buffer == NULL)
return;
+ for (i = 0; i < frame->n_packets; i++)
+ if (!frame->packet_data[i].received)
+ n_missing_packets++;
+
+ if (n_missing_packets != 0)
+ arv_debug ("stream", "[GvStream::_close_buffer] n_missing_packets = %d", n_missing_packets);
+
+ arv_debug ("stream", "[GvStream::_close_buffer] last_valid_packet = %d", frame->last_valid_packet);
+
if (frame->buffer->status == ARV_BUFFER_STATUS_FILLING) {
if (frame->read_data_size == frame->buffer->size &&
frame->n_missing_blocks == 0)
@@ -206,6 +228,7 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
arv_stream_push_output_buffer (thread_data->stream, frame->buffer);
frame->buffer = NULL;
+ frame->frame_id = 0;
}
static void
@@ -213,16 +236,8 @@ _process_data_leader (ArvGvStreamThreadData *thread_data,
ArvGvStreamFrameData *frame,
ArvGvspPacket *packet)
{
- GTimeVal current_time;
-
- if (frame->buffer != NULL)
- _close_buffer (thread_data, frame);
-
- frame->buffer = arv_stream_pop_input_buffer (thread_data->stream);
- if (frame->buffer == NULL) {
- thread_data->n_underruns++;
+ if (frame->buffer == NULL)
return;
- }
_update_socket (thread_data, frame->buffer);
@@ -236,19 +251,10 @@ _process_data_leader (ArvGvStreamThreadData *thread_data,
frame->buffer->timestamp_ns = arv_gvsp_packet_get_timestamp (packet,
thread_data->timestamp_tick_frequency);
- frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
- frame->read_data_size = 0;
- frame->n_missing_blocks = 0;
- frame->last_block_size = 0;
- frame->last_packet_id = 0;
-
- g_get_current_time (&current_time);
- frame->leader_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
-
if (thread_data->callback != NULL)
thread_data->callback (thread_data->user_data,
ARV_STREAM_CALLBACK_TYPE_START_BUFFER,
- frame->buffer);
+ NULL);
}
static void
@@ -266,11 +272,6 @@ _process_data_block (ArvGvStreamThreadData *thread_data,
frame->buffer->status != ARV_BUFFER_STATUS_FILLING)
return;
- if (frame->buffer->frame_id != arv_gvsp_packet_get_frame_id (packet)) {
- thread_data->n_late_blocks ++;
- return;
- }
-
packet_id = arv_gvsp_packet_get_packet_id (packet);
if (packet_id <= frame->last_packet_id) {
arv_debug ("stream-thread", "[GvStream::thread] Receive resent packet (%d) frame %d",
@@ -338,16 +339,78 @@ _process_data_trailer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData
_close_buffer (thread_data, frame);
}
+static void
+_update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame,
+ ArvGvspPacket *packet, size_t read_count)
+{
+ GTimeVal current_time;
+ guint frame_id;
+ guint n_packets;
+ int packet_id;
+ int i;
+
+ frame_id = arv_gvsp_packet_get_frame_id (packet);
+
+ if (frame->buffer != NULL && frame->frame_id != frame_id) {
+ frame->buffer->status = ARV_BUFFER_STATUS_ABORTED;
+ _close_buffer (thread_data, frame);
+ }
+
+ if (frame->buffer == NULL) {
+
+ frame->buffer = arv_stream_pop_input_buffer (thread_data->stream);
+ if (frame->buffer == NULL) {
+ thread_data->n_underruns++;
+ return;
+ }
+
+ frame->frame_id = frame_id;
+ frame->last_valid_packet = -1;
+ frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
+ frame->read_data_size = 0;
+ frame->n_missing_blocks = 0;
+ frame->last_block_size = 0;
+ frame->last_packet_id = 0;
+
+ g_get_current_time (&current_time);
+ frame->leader_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+
+ n_packets = (frame->buffer->size + thread_data->packet_size - 1) / thread_data->packet_size + 2;
+ if (frame->n_packets != n_packets) {
+ g_free (frame->packet_data);
+ frame->packet_data = g_new (ArvGvStreamPacketData, n_packets);
+ frame->n_packets = n_packets;
+
+ arv_debug ("stream", "[GvStream::_update_frame_data] n_packets = %d", frame->n_packets);
+ }
+
+ memset (frame->packet_data, 0, sizeof (ArvGvStreamPacketData) * frame->n_packets);
+ }
+
+ packet_id = arv_gvsp_packet_get_packet_id (packet);
+ if (packet_id < frame->n_packets) {
+ frame->packet_data[packet_id].received = TRUE;
+
+ for (i = frame->last_valid_packet; i < packet_id; i++)
+ if (!frame->packet_data[i + 1].received)
+ break;
+
+ frame->last_valid_packet = i;
+ }
+}
+
static void *
arv_gv_stream_thread (void *data)
{
ArvGvStreamThreadData *thread_data = data;
- ArvGvStreamFrameData frame;
+ ArvGvStreamFrameData frames[ARV_GV_STREAM_THREAD_N_FRAMES];
+ ArvGvStreamFrameData *frame;
ArvGvspPacket *packet;
- GTimeVal current_time;
+ guint frame_id;
GPollFD poll_fd;
size_t read_count;
int n_events;
+ int i;
if (thread_data->callback != NULL)
thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_INIT, NULL);
@@ -358,14 +421,7 @@ arv_gv_stream_thread (void *data)
packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
- frame.buffer = NULL;
- frame.n_missing_blocks = 0;
- frame.last_block_size = 0;
- frame.last_packet_id = 0;
- frame.statistic_count = 0;
- g_get_current_time (&current_time);
- frame.last_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
- frame.last_timestamp_ns = 0;
+ memset (frames, 0, sizeof (ArvGvStreamFrameData) * ARV_GV_STREAM_THREAD_N_FRAMES);
do {
n_events = g_poll (&poll_fd, 1, 1000);
@@ -373,26 +429,33 @@ arv_gv_stream_thread (void *data)
if (n_events > 0) {
read_count = g_socket_receive (thread_data->socket, (char *) packet,
ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
+ frame_id = arv_gvsp_packet_get_frame_id (packet);
+ frame = &frames[frame_id % ARV_GV_STREAM_THREAD_N_FRAMES];
+
+ _update_frame_data (thread_data, frame, packet, read_count);
switch (arv_gvsp_packet_get_packet_type (packet)) {
case ARV_GVSP_PACKET_TYPE_DATA_LEADER:
- _process_data_leader (thread_data, &frame, packet);
+ _process_data_leader (thread_data, frame, packet);
break;
case ARV_GVSP_PACKET_TYPE_DATA_BLOCK:
- _process_data_block (thread_data, &frame, packet, read_count);
+ _process_data_block (thread_data, frame, packet, read_count);
break;
case ARV_GVSP_PACKET_TYPE_DATA_TRAILER:
- _process_data_trailer (thread_data, &frame);
+ _process_data_trailer (thread_data, frame);
break;
}
}
} while (!thread_data->cancel);
- if (frame.buffer != NULL) {
- frame.buffer->status = ARV_BUFFER_STATUS_ABORTED;
- _close_buffer (thread_data, &frame);
+ for (i = 0; i < ARV_GV_STREAM_THREAD_N_FRAMES; i++) {
+ if (frames[i].buffer != NULL) {
+ frames[i].buffer->status = ARV_BUFFER_STATUS_ABORTED;
+ _close_buffer (thread_data, &frames[i]);
+ }
+ g_free (frames[i].packet_data);
}
if (thread_data->callback != NULL)
@@ -464,7 +527,6 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
thread_data->n_size_mismatch_errors = 0;
thread_data->n_missing_blocks = 0;
thread_data->n_resent_blocks = 0;
- thread_data->n_late_blocks = 0;
thread_data->n_aborteds = 0;
thread_data->statistic = arv_statistic_new (3, 5000, 200, 0);
@@ -489,8 +551,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
void
arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
guint64 *n_resent_blocks,
- guint64 *n_missing_blocks,
- guint64 *n_late_blocks)
+ guint64 *n_missing_blocks)
{
ArvGvStreamThreadData *thread_data;
@@ -503,8 +564,6 @@ arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
*n_resent_blocks = thread_data->n_resent_blocks;
if (n_missing_blocks != NULL)
*n_missing_blocks = thread_data->n_missing_blocks;
- if (n_late_blocks != NULL)
- *n_late_blocks = thread_data->n_late_blocks;
}
static void
@@ -603,8 +662,6 @@ arv_gv_stream_finalize (GObject *object)
"[GvStream::finalize] n_missing_blocks = %d", thread_data->n_missing_blocks);
arv_debug ("stream",
"[GvStream::finalize] n_resent_blocks = %d", thread_data->n_resent_blocks);
- arv_debug ("stream",
- "[GvStream::finalize] n_late_blocks = %d", thread_data->n_late_blocks);
thread_data->cancel = TRUE;
g_thread_join (gv_stream->thread);
diff --git a/src/arvgvstream.h b/src/arvgvstream.h
index f614fb2..77fba74 100644
--- a/src/arvgvstream.h
+++ b/src/arvgvstream.h
@@ -84,8 +84,7 @@ guint16 arv_gv_stream_get_port (ArvGvStream *gv_stream);
void arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
guint64 *n_resent_blocks,
- guint64 *n_missing_blocks,
- guint64 *n_late_blocks);
+ guint64 *n_missing_blocks);
G_END_DECLS
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]