[aravis/timeout-resend] gv_stream: timeout based packet resend.
- From: Emmanuel Pacaud <emmanuel src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [aravis/timeout-resend] gv_stream: timeout based packet resend.
- Date: Thu, 7 Apr 2011 14:57:59 +0000 (UTC)
commit 318644d2d606118f32559f3ca743dabf9f9f32ba
Author: Emmanuel Pacaud <emmanuel gnome org>
Date: Thu Apr 7 16:57:38 2011 +0200
gv_stream: timeout based packet resend.
src/arvgvstream.c | 433 ++++++++++++++++++++++++++++++-----------------------
1 files changed, 244 insertions(+), 189 deletions(-)
---
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index e201dda..587f334 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -36,11 +36,9 @@
#include <sys/socket.h>
#define ARV_GV_STREAM_INCOMING_BUFFER_SIZE 65536
-#define ARV_GV_STREAM_THREAD_N_FRAMES 8
-#define ARV_GV_STREAM_THREAD_DISCARD_WINDOW 65536
-#define ARV_GV_STREAM_PACKET_TIMEOUT_MS 40
-#define ARV_GV_STREAM_FRAME_RETENTION_MS 200
+#define ARV_GV_STREAM_PACKET_TIMEOUT_US 40000
+#define ARV_GV_STREAM_FRAME_RETENTION_US 200000
enum {
ARV_GV_STREAM_PROPERTY_0,
@@ -55,7 +53,7 @@ static GObjectClass *parent_class = NULL;
typedef struct {
gboolean received;
- guint n_requests;
+ guint64 time_us;
} ArvGvStreamPacketData;
typedef struct {
@@ -88,6 +86,7 @@ typedef struct {
guint32 packet_count;
+ GSList *frames;
guint32 last_frame_id;
/* Statistics */
@@ -169,85 +168,16 @@ _update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
}
static void
-_close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
-{
- GTimeVal current_time;
- gint64 current_time_us;
- int i;
- guint n_missing_packets = 0;
-
- if (frame->buffer == NULL)
- return;
-
- if (frame->buffer->status == ARV_BUFFER_STATUS_FILLING) {
- for (i = 0; i < frame->n_packets; i++)
- if (!frame->packet_data[i].received)
- n_missing_packets++;
-
- thread_data->n_missing_packets += n_missing_packets;
-
- if (n_missing_packets == 0)
- frame->buffer->status = ARV_BUFFER_STATUS_SUCCESS;
- else
- frame->buffer->status = ARV_BUFFER_STATUS_MISSING_PACKETS;
- }
-
- if (frame->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
- thread_data->n_completed_buffers++;
- else
- thread_data->n_failures++;
-
- if (frame->buffer->status == ARV_BUFFER_STATUS_ABORTED)
- thread_data->n_aborteds++;
-
- if (frame->buffer->status == ARV_BUFFER_STATUS_TIMEOUT) {
- guint32 i;
-
- thread_data->n_timeouts++;
- for (i = 0; i < frame->n_packets; i++) {
- if (!frame->packet_data[i].received)
- arv_debug ("stream-thread", "[GvStream::_close_buffer] Missing packet %u for frame %u",
- i, frame->frame_id);
- }
- }
-
- if (frame->buffer->status == ARV_BUFFER_STATUS_SIZE_MISMATCH)
- thread_data->n_size_mismatch_errors++;
-
- if (thread_data->callback != NULL)
- thread_data->callback (thread_data->user_data,
- ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
- frame->buffer);
-
- g_get_current_time (&current_time);
- current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
- if (thread_data->statistic_count > 5) {
- arv_statistic_fill (thread_data->statistic, 1,
- current_time_us - frame->first_packet_timestamp_us,
- frame->buffer->frame_id);
- } else
- thread_data->statistic_count++;
-
- arv_stream_push_output_buffer (thread_data->stream, frame->buffer);
-
- frame->buffer = NULL;
- frame->frame_id = 0;
-}
-
-static void
_process_data_leader (ArvGvStreamThreadData *thread_data,
ArvGvStreamFrameData *frame,
- ArvGvspPacket *packet)
+ ArvGvspPacket *packet,
+ guint32 packet_id)
{
- guint32 packet_id;
-
- if (frame->buffer == NULL)
+ if (frame->buffer->status != ARV_BUFFER_STATUS_FILLING)
return;
- packet_id = arv_gvsp_packet_get_packet_id (packet);
if (packet_id != 0) {
frame->buffer->status = ARV_BUFFER_STATUS_WRONG_PACKET_ID;
- _close_buffer (thread_data, frame);
return;
}
@@ -261,36 +191,30 @@ _process_data_leader (ArvGvStreamThreadData *thread_data,
frame->buffer->timestamp_ns = arv_gvsp_packet_get_timestamp (packet,
thread_data->timestamp_tick_frequency);
- if (frame->packet_data[packet_id].n_requests > 0) {
+ if (frame->packet_data[packet_id].time_us > 0) {
thread_data->n_resent_packets++;
arv_debug ("stream-thread", "[GvStream::_process_data_leader] Received resent packet %u for frame %u",
packet_id, frame->frame_id);
}
-
- if (frame->last_valid_packet == frame->n_packets - 1)
- _close_buffer (thread_data, frame);
}
static void
_process_data_block (ArvGvStreamThreadData *thread_data,
ArvGvStreamFrameData *frame,
ArvGvspPacket *packet,
+ guint32 packet_id,
size_t read_count)
{
size_t block_size;
ptrdiff_t block_offset;
ptrdiff_t block_end;
- guint32 packet_id;
- if (frame->buffer == NULL ||
- frame->buffer->status != ARV_BUFFER_STATUS_FILLING)
+ if (frame->buffer->status != ARV_BUFFER_STATUS_FILLING)
return;
- packet_id = arv_gvsp_packet_get_packet_id (packet);
if (packet_id > frame->n_packets - 2) {
arv_gvsp_packet_debug (packet, read_count);
frame->buffer->status = ARV_BUFFER_STATUS_WRONG_PACKET_ID;
- _close_buffer (thread_data, frame);
return;
}
@@ -301,140 +225,130 @@ _process_data_block (ArvGvStreamThreadData *thread_data,
if (block_end > frame->buffer->size) {
arv_gvsp_packet_debug (packet, read_count);
frame->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
- _close_buffer (thread_data, frame);
return;
}
memcpy (frame->buffer->data + block_offset, &packet->data, block_size);
- if (frame->packet_data[packet_id].n_requests > 0) {
+ if (frame->packet_data[packet_id].time_us > 0) {
thread_data->n_resent_packets++;
- arv_debug ("stream-thread", "[GvStream::_process_data_leader] Received resent packet %u for frame %u",
+ arv_debug ("stream-thread", "[GvStream::_process_data_block] Received resent packet %u for frame %u",
packet_id, frame->frame_id);
}
-
- if (frame->last_valid_packet == frame->n_packets - 1)
- _close_buffer (thread_data, frame);
}
static void
_process_data_trailer (ArvGvStreamThreadData *thread_data,
ArvGvStreamFrameData *frame,
- ArvGvspPacket *packet)
+ ArvGvspPacket *packet,
+ guint32 packet_id)
{
- guint32 packet_id;
-
- if (frame->buffer == NULL)
+ if (frame->buffer->status != ARV_BUFFER_STATUS_FILLING)
return;
- packet_id = arv_gvsp_packet_get_packet_id (packet);
if (packet_id != frame->n_packets - 1) {
frame->buffer->status = ARV_BUFFER_STATUS_WRONG_PACKET_ID;
- _close_buffer (thread_data, frame);
return;
}
- if (frame->packet_data[packet_id].n_requests > 0) {
+ if (frame->packet_data[packet_id].time_us > 0) {
thread_data->n_resent_packets++;
- arv_debug ("stream-thread", "[GvStream::_process_data_leader] Received resent packet %u for frame %u",
+ arv_debug ("stream-thread", "[GvStream::_process_data_trailer] Received resent packet %u for frame %u",
packet_id, frame->frame_id);
}
-
- if (frame->last_valid_packet == frame->n_packets - 1)
- _close_buffer (thread_data, frame);
}
-static gboolean
-_update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame,
- ArvGvspPacket *packet, size_t read_count, guint64 timestamp_us)
+static ArvGvStreamFrameData *
+_find_frame_data (ArvGvStreamThreadData *thread_data,
+ guint32 frame_id,
+ ArvGvspPacket *packet,
+ guint32 packet_id,
+ size_t read_count,
+ guint64 timestamp_us)
{
- guint32 frame_id;
- guint n_packets;
- int packet_id;
- int i;
+ ArvGvStreamFrameData *frame = NULL;
+ ArvBuffer *buffer;
+ GSList *iter;
+ guint n_packets = 0;
+
+ for (iter = thread_data->frames; iter != NULL; iter = iter->next) {
+ frame = iter->data;
+ if (frame->frame_id == frame_id) {
+ frame->last_packet_timestamp_us = timestamp_us;
- frame_id = arv_gvsp_packet_get_frame_id (packet);
- packet_id = arv_gvsp_packet_get_packet_id (packet);
-
- if (frame->buffer != NULL &&
- frame->frame_id != frame_id) {
- /* discard old frames */
- if (frame_id - thread_data->last_frame_id > - ARV_GV_STREAM_THREAD_DISCARD_WINDOW) {
- arv_debug ("stream", "[GvStream::_update_frame_data] Discard late frame %u"
- " - packet %u (last frame is %u)",
- frame_id, packet_id, thread_data->last_frame_id);
- thread_data->n_late_packets++;
- return FALSE;
+ return frame;
}
+ }
- frame->buffer->status = ARV_BUFFER_STATUS_TIMEOUT;
- _close_buffer (thread_data, frame);
+ buffer = arv_stream_pop_input_buffer (thread_data->stream);
+ if (buffer == NULL) {
+ thread_data->n_underruns++;
+
+ return NULL;
}
- if (frame->buffer == NULL) {
- gint32 frame_id_inc;
+ frame = g_new0 (ArvGvStreamFrameData, 1);
- frame->buffer = arv_stream_pop_input_buffer (thread_data->stream);
- if (frame->buffer == NULL) {
- thread_data->n_underruns++;
- return FALSE;
- }
+ frame->last_packet_timestamp_us = timestamp_us;
+ frame->frame_id = frame_id;
+ frame->last_valid_packet = -1;
+
+ frame->buffer = buffer;
+ _update_socket (thread_data, frame->buffer);
+ frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
+ n_packets = (frame->buffer->size + thread_data->data_size - 1) / thread_data->data_size + 2;
- _update_socket (thread_data, frame->buffer);
+ frame->first_packet_timestamp_us = timestamp_us;
- frame->frame_id = frame_id;
- frame->last_valid_packet = -1;
- frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
+ frame->packet_data = g_new0 (ArvGvStreamPacketData, n_packets);
+ frame->n_packets = n_packets;
+
+ if (thread_data->callback != NULL &&
+ frame->buffer != NULL)
+ thread_data->callback (thread_data->user_data,
+ ARV_STREAM_CALLBACK_TYPE_START_BUFFER,
+ NULL);
+
+ {
+ gint32 frame_id_inc;
frame_id_inc = (gint )frame_id - (gint) thread_data->last_frame_id;
if (frame_id_inc > 0) {
thread_data->last_frame_id = frame_id;
if (frame_id_inc != 1) {
thread_data->n_missing_frames++;
- arv_debug ("stream", "[GvStream::_update_frame_data] Missed %d frame(s) before %u",
+ arv_debug ("stream", "[GvStream::_find_frame_data]"
+ " Missed %d frame(s) before %u",
frame_id_inc - 1, frame_id);
}
}
+ }
- frame->first_packet_timestamp_us = timestamp_us;
-
- n_packets = (frame->buffer->size + thread_data->data_size - 1) / thread_data->data_size + 2;
- if (frame->n_packets != n_packets) {
- g_free (frame->packet_data);
- frame->packet_data = g_new (ArvGvStreamPacketData, n_packets);
- frame->n_packets = n_packets;
-
- arv_debug ("stream", "[GvStream::_update_frame_data] n_packets = %d", frame->n_packets);
- }
-
- memset (frame->packet_data, 0, sizeof (ArvGvStreamPacketData) * frame->n_packets);
+ thread_data->frames = g_slist_append (thread_data->frames, frame);
- if (thread_data->callback != NULL)
- thread_data->callback (thread_data->user_data,
- ARV_STREAM_CALLBACK_TYPE_START_BUFFER,
- NULL);
- }
+ arv_debug ("stream", "[GvStream::_find_frame_data] Start frame %u", frame_id);
- frame->last_packet_timestamp_us = timestamp_us;
+ return frame;
+}
- if (packet_id < frame->n_packets) {
- if (frame->packet_data[packet_id].received)
- thread_data->n_duplicated_packets++;
- else
- frame->packet_data[packet_id].received = TRUE;
- }
+static void
+_missing_packet_check (ArvGvStreamThreadData *thread_data,
+ ArvGvStreamFrameData *frame,
+ guint32 packet_id,
+ guint64 time_us)
+{
+ int i;
- for (i = frame->last_valid_packet + 1; i < frame->n_packets; i++)
- if (!frame->packet_data[i].received)
- break;
- frame->last_valid_packet = i - 1;
+ if (thread_data->packet_resend != ARV_GV_STREAM_PACKET_RESEND_ALWAYS)
+ return;
if (packet_id < frame->n_packets) {
int first_missing = -1;
- for (i = frame->last_valid_packet + 1; i < packet_id; i++) {
+ for (i = frame->last_valid_packet + 1; i <= packet_id; i++) {
if (!frame->packet_data[i].received &&
- frame->packet_data[i].n_requests == 0) {
+ (frame->packet_data[i].time_us == 0 ||
+ (time_us - frame->packet_data[i].time_us > ARV_GV_STREAM_PACKET_TIMEOUT_US))) {
if (first_missing < 0)
first_missing = i;
} else
@@ -444,7 +358,7 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
_send_packet_request (thread_data, frame->frame_id,
first_missing, i - 1);
for (j = first_missing; j < i; j++)
- frame->packet_data[j].n_requests = 1;
+ frame->packet_data[j].time_us = time_us;
thread_data->n_resend_requests += (i - first_missing);
first_missing = -1;
@@ -457,29 +371,158 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
_send_packet_request (thread_data, frame->frame_id,
first_missing, i - 1);
for (j = first_missing; j < i; j++)
- frame->packet_data[j].n_requests = 1;
+ frame->packet_data[j].time_us = time_us;
thread_data->n_resend_requests += (i - first_missing);
}
}
+}
+
+static void
+_close_frame (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
+{
+ GTimeVal current_time;
+ gint64 current_time_us;
+ int i;
+ guint n_missing_packets = 0;
+
+ if (frame->buffer->status == ARV_BUFFER_STATUS_FILLING) {
+ for (i = 0; i < frame->n_packets; i++)
+ if (!frame->packet_data[i].received)
+ n_missing_packets++;
+
+ thread_data->n_missing_packets += n_missing_packets;
+
+ if (n_missing_packets == 0)
+ frame->buffer->status = ARV_BUFFER_STATUS_SUCCESS;
+ else
+ frame->buffer->status = ARV_BUFFER_STATUS_MISSING_PACKETS;
+ }
+
+ if (frame->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
+ thread_data->n_completed_buffers++;
+ else
+ thread_data->n_failures++;
+
+ if (frame->buffer->status == ARV_BUFFER_STATUS_ABORTED)
+ thread_data->n_aborteds++;
+
+ if (frame->buffer->status == ARV_BUFFER_STATUS_TIMEOUT) {
+ guint32 i;
+
+ thread_data->n_timeouts++;
+ for (i = 0; i < frame->n_packets; i++) {
+ if (!frame->packet_data[i].received)
+ arv_debug ("stream-thread",
+ "[GvStream::_close_frame] Missing packet %u for frame %u",
+ i, frame->frame_id);
+ }
+ }
- return TRUE;
+ if (frame->buffer->status == ARV_BUFFER_STATUS_SIZE_MISMATCH)
+ thread_data->n_size_mismatch_errors++;
+
+ if (thread_data->callback != NULL)
+ thread_data->callback (thread_data->user_data,
+ ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
+ frame->buffer);
+
+ g_get_current_time (&current_time);
+ current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+ if (thread_data->statistic_count > 5) {
+ arv_statistic_fill (thread_data->statistic, 1,
+ current_time_us - frame->first_packet_timestamp_us,
+ frame->buffer->frame_id);
+ } else
+ thread_data->statistic_count++;
+
+ arv_stream_push_output_buffer (thread_data->stream, frame->buffer);
+
+ arv_debug ("stream", "[GvStream::_close_frame] Close frame %u", frame->frame_id);
+
+ frame->buffer = NULL;
+ frame->frame_id = 0;
+
+ g_free (frame->packet_data);
+ g_free (frame);
+}
+
+static void
+_packet_timeout_check (ArvGvStreamThreadData *thread_data,
+ guint64 time_us)
+{
+ GSList *iter;
+ ArvGvStreamFrameData *frame;
+ gboolean can_close_frame = TRUE;
+
+ for (iter = thread_data->frames; iter != NULL;) {
+ frame = iter->data;
+
+ if (can_close_frame &&
+ frame->last_valid_packet == frame->n_packets - 1) {
+ arv_debug ("stream", "[GvStream::_packet_timeout_check] Completed frame %u", frame->frame_id);
+ _close_frame (thread_data, frame);
+ thread_data->frames = iter->next;
+ g_slist_free_1 (iter);
+ iter = thread_data->frames;
+ continue;
+ }
+
+ if (can_close_frame &&
+ time_us - frame->last_packet_timestamp_us > ARV_GV_STREAM_FRAME_RETENTION_US) {
+ frame->buffer->status = ARV_BUFFER_STATUS_TIMEOUT;
+ arv_debug ("stream", "[GvStream::_packet_timeout_check] Timeout for frame %u", frame->frame_id);
+ _close_frame (thread_data, frame);
+ thread_data->frames = iter->next;
+ g_slist_free_1 (iter);
+ iter = thread_data->frames;
+ continue;
+ }
+
+ can_close_frame = FALSE;
+
+ if (time_us - frame->last_packet_timestamp_us > ARV_GV_STREAM_PACKET_TIMEOUT_US) {
+ _missing_packet_check (thread_data, frame, frame->n_packets - 1, time_us);
+ iter = iter->next;
+ continue;
+ }
+
+ iter = iter->next;
+ }
+}
+
+static void
+_flush_frames (ArvGvStreamThreadData *thread_data)
+{
+ GSList *iter;
+ ArvGvStreamFrameData *frame;
+
+ for (iter = thread_data->frames; iter != NULL; iter = iter->next) {
+ frame = iter->data;
+ frame->buffer->status = ARV_BUFFER_STATUS_TIMEOUT;
+ _close_frame (thread_data, frame);
+ }
+
+ g_slist_free (thread_data->frames);
+ thread_data->frames = NULL;
}
static void *
arv_gv_stream_thread (void *data)
{
ArvGvStreamThreadData *thread_data = data;
- ArvGvStreamFrameData frames[ARV_GV_STREAM_THREAD_N_FRAMES];
ArvGvStreamFrameData *frame;
ArvGvspPacket *packet;
- GTimeVal current_time;
- guint64 timestamp_us;
+ guint32 packet_id;
guint32 frame_id;
+ GTimeVal current_time;
+ guint64 time_us;
GPollFD poll_fd;
size_t read_count;
int n_events;
int i;
+ thread_data->frames = NULL;
+
if (thread_data->callback != NULL)
thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_INIT, NULL);
@@ -489,43 +532,55 @@ arv_gv_stream_thread (void *data)
packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
- memset (frames, 0, sizeof (ArvGvStreamFrameData) * ARV_GV_STREAM_THREAD_N_FRAMES);
-
do {
- n_events = g_poll (&poll_fd, 1, ARV_GV_STREAM_PACKET_TIMEOUT_MS);
+ n_events = g_poll (&poll_fd, 1, ARV_GV_STREAM_PACKET_TIMEOUT_US / 1000);
+
+ g_get_current_time (&current_time);
+ time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
if (n_events > 0) {
- g_get_current_time (&current_time);
- timestamp_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
read_count = g_socket_receive (thread_data->socket, (char *) packet,
ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
frame_id = arv_gvsp_packet_get_frame_id (packet);
- frame = &frames[frame_id % ARV_GV_STREAM_THREAD_N_FRAMES];
+ packet_id = arv_gvsp_packet_get_packet_id (packet);
+
+ frame = _find_frame_data (thread_data, frame_id, packet, packet_id, read_count, time_us);
+
+ if (frame != NULL) {
+
+ if (packet_id < frame->n_packets) {
+ if (frame->packet_data[packet_id].received)
+ thread_data->n_duplicated_packets++;
+ else
+ frame->packet_data[packet_id].received = TRUE;
+ }
+
+ for (i = frame->last_valid_packet + 1; i < frame->n_packets; i++)
+ if (!frame->packet_data[i].received)
+ break;
+ frame->last_valid_packet = i - 1;
- if (_update_frame_data (thread_data, frame, packet, read_count, timestamp_us))
switch (arv_gvsp_packet_get_packet_type (packet)) {
case ARV_GVSP_PACKET_TYPE_DATA_LEADER:
- _process_data_leader (thread_data, frame, packet);
+ _process_data_leader (thread_data, frame, packet, packet_id);
break;
case ARV_GVSP_PACKET_TYPE_DATA_BLOCK:
- _process_data_block (thread_data, frame, packet, read_count);
+ _process_data_block (thread_data, frame, packet, packet_id, read_count);
break;
case ARV_GVSP_PACKET_TYPE_DATA_TRAILER:
- _process_data_trailer (thread_data, frame, packet);
+ _process_data_trailer (thread_data, frame, packet, packet_id);
break;
}
+ }
+ _missing_packet_check (thread_data, frame, packet_id, time_us);
}
+
+ _packet_timeout_check (thread_data, time_us);
} while (!thread_data->cancel);
- for (i = 0; i < ARV_GV_STREAM_THREAD_N_FRAMES; i++) {
- if (frames[i].buffer != NULL) {
- frames[i].buffer->status = ARV_BUFFER_STATUS_ABORTED;
- _close_buffer (thread_data, &frames[i]);
- }
- g_free (frames[i].packet_data);
- }
+ _flush_frames (thread_data);
if (thread_data->callback != NULL)
thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_EXIT, NULL);
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]