[aravis/timeout-resend] WIP.
- From: Emmanuel Pacaud <emmanuel src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [aravis/timeout-resend] WIP.
- Date: Tue, 1 Mar 2011 09:36:03 +0000 (UTC)
commit a4df7582ee89cd3eb3fb18464ef1e0e14b5582d9
Author: Emmanuel Pacaud <emmanuel gnome org>
Date: Tue Feb 22 21:43:06 2011 +0100
WIP.
src/arvbuffer.h | 7 ++-
src/arvgvstream.c | 174 ++++++++++++++++++++++-------------------------------
src/arvgvstream.h | 4 +-
3 files changed, 79 insertions(+), 106 deletions(-)
---
diff --git a/src/arvbuffer.h b/src/arvbuffer.h
index 1e616c6..1d168da 100644
--- a/src/arvbuffer.h
+++ b/src/arvbuffer.h
@@ -33,7 +33,8 @@ typedef void (*ArvFrameCallback) (ArvBuffer *buffer);
* ArvBufferStatus:
* @ARV_BUFFER_STATUS_SUCCESS: the buffer contains a valid image
* @ARV_BUFFER_STATUS_CLEARED: the buffer is cleared
- * @ARV_BUFFER_STATUS_MISSING_BLOCKS: image has missing blocks
+ * @ARV_BUFFER_STATUS_MISSING_PACKETS: stream has missing packets
+ * @ARV_BUFFER_STATUS_WRONG_PACKET_ID: stream has packet with wrong id
* @ARV_BUFFER_STATUS_SIZE_MISMATCH: the received image didn't fit in the buffer data space
* @ARV_BUFFER_STATUS_FILLING: the image is currently being filled
* @ARV_BUFFER_STATUS_ABORTED: the filling was aborted before completion
@@ -42,7 +43,9 @@ typedef void (*ArvFrameCallback) (ArvBuffer *buffer);
typedef enum {
ARV_BUFFER_STATUS_SUCCESS,
ARV_BUFFER_STATUS_CLEARED,
- ARV_BUFFER_STATUS_MISSING_BLOCKS,
+ ARV_BUFFER_STATUS_TIMEOUT,
+ ARV_BUFFER_STATUS_MISSING_PACKETS,
+ ARV_BUFFER_STATUS_WRONG_PACKET_ID,
ARV_BUFFER_STATUS_SIZE_MISMATCH,
ARV_BUFFER_STATUS_FILLING,
ARV_BUFFER_STATUS_ABORTED
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index 4ea8031..66d908a 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -63,13 +63,6 @@ typedef struct {
guint n_packets;
ArvGvStreamPacketData *packet_data;
- size_t read_data_size;
- gint32 n_missing_blocks;
- gint32 last_block_size;
- gint32 last_packet_id;
- gint64 last_time_us;
- guint64 last_timestamp_ns;
- guint32 statistic_count;
gint64 leader_time_us;
} ArvGvStreamFrameData;
@@ -85,7 +78,7 @@ typedef struct {
ArvGvStreamPacketResend packet_resend;
guint64 timestamp_tick_frequency;
- guint packet_size;
+ guint data_size;
gboolean cancel;
@@ -95,14 +88,17 @@ typedef struct {
guint n_completed_buffers;
guint n_failures;
+ guint n_timeouts;
guint n_underruns;
guint n_aborteds;
guint n_size_mismatch_errors;
- guint n_resent_blocks;
- guint n_missing_blocks;
+ guint n_resend_requests;
+ guint n_resent_packets;
+ guint n_missing_packets;
ArvStatistic *statistic;
+ guint32 statistic_count;
ArvGvStreamSocketBuffer socket_buffer_option;
int socket_buffer_size;
@@ -178,17 +174,16 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
if (!frame->packet_data[i].received)
n_missing_packets++;
- if (n_missing_packets != 0)
- arv_debug ("stream", "[GvStream::_close_buffer] n_missing_packets = %d", n_missing_packets);
+ thread_data->n_missing_packets += n_missing_packets;
- arv_debug ("stream", "[GvStream::_close_buffer] last_valid_packet = %d", frame->last_valid_packet);
+/* if (n_missing_packets != 0)*/
+/* arv_debug ("stream", "[GvStream::_close_buffer] n_missing_packets = %d", n_missing_packets);*/
if (frame->buffer->status == ARV_BUFFER_STATUS_FILLING) {
- if (frame->read_data_size == frame->buffer->size &&
- frame->n_missing_blocks == 0)
+ if (n_missing_packets == 0)
frame->buffer->status = ARV_BUFFER_STATUS_SUCCESS;
else
- frame->buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCKS;
+ frame->buffer->status = ARV_BUFFER_STATUS_MISSING_PACKETS;
}
if (frame->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
@@ -199,6 +194,9 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
if (frame->buffer->status == ARV_BUFFER_STATUS_ABORTED)
thread_data->n_aborteds++;
+ if (frame->buffer->status == ARV_BUFFER_STATUS_TIMEOUT)
+ thread_data->n_timeouts++;
+
if (frame->buffer->status == ARV_BUFFER_STATUS_SIZE_MISMATCH)
thread_data->n_size_mismatch_errors++;
@@ -209,21 +207,12 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
g_get_current_time (¤t_time);
current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
- if (frame->statistic_count > 5) {
- arv_statistic_fill (thread_data->statistic, 0,
- (frame->buffer->timestamp_ns - frame->last_timestamp_ns) /
- 1000, frame->buffer->frame_id);
+ if (thread_data->statistic_count > 5) {
arv_statistic_fill (thread_data->statistic, 1,
- current_time_us - frame->last_time_us,
- frame->buffer->frame_id);
- arv_statistic_fill (thread_data->statistic, 2,
current_time_us - frame->leader_time_us,
frame->buffer->frame_id);
} else
- frame->statistic_count++;
-
- frame->last_time_us = current_time_us;
- frame->last_timestamp_ns = frame->buffer->timestamp_ns;
+ thread_data->statistic_count++;
arv_stream_push_output_buffer (thread_data->stream, frame->buffer);
@@ -250,11 +239,6 @@ _process_data_leader (ArvGvStreamThreadData *thread_data,
frame->buffer->timestamp_ns = arv_gvsp_packet_get_timestamp (packet,
thread_data->timestamp_tick_frequency);
-
- if (thread_data->callback != NULL)
- thread_data->callback (thread_data->user_data,
- ARV_STREAM_CALLBACK_TYPE_START_BUFFER,
- NULL);
}
static void
@@ -273,58 +257,29 @@ _process_data_block (ArvGvStreamThreadData *thread_data,
return;
packet_id = arv_gvsp_packet_get_packet_id (packet);
- if (packet_id <= frame->last_packet_id) {
- arv_debug ("stream-thread", "[GvStream::thread] Receive resent packet (%d) frame %d",
- packet_id, frame->buffer->frame_id);
-
- block_size = arv_gvsp_packet_get_data_size (read_count);
- block_offset = frame->last_block_size * (packet_id - 1);
- block_end = block_size + block_offset;
-
- if (block_end > frame->buffer->size) {
- arv_gvsp_packet_debug (packet, read_count);
- frame->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
- return;
- }
- memcpy (frame->buffer->data + block_offset, &packet->data, block_size);
-
- frame->read_data_size += block_size;
- frame->n_missing_blocks--;
- thread_data->n_resent_blocks++;
- return;
- }
-
- if (packet_id != (frame->last_packet_id + 1)) {
- gint32 n_misses;
-
- n_misses = packet_id - frame->last_packet_id - 1;
-
+ if (packet_id > frame->n_packets - 2) {
arv_gvsp_packet_debug (packet, read_count);
- arv_debug ("stream-thread", "[GvStream::thread] Missing block (expected %d - %d) frame %d",
- frame->last_packet_id + 1,
- packet_id, frame->buffer->frame_id);
- thread_data->n_missing_blocks += n_misses;
- frame->n_missing_blocks += n_misses;
-
- if (thread_data->packet_resend != ARV_GV_STREAM_PACKET_RESEND_NEVER)
- _send_packet_request (thread_data, frame->buffer->frame_id,
- frame->last_packet_id + 1, packet_id - 1);
+ frame->buffer->status = ARV_BUFFER_STATUS_WRONG_PACKET_ID;
+ return;
}
block_size = arv_gvsp_packet_get_data_size (read_count);
- block_offset = frame->last_block_size * (packet_id - 1);
+ block_offset = (packet_id - 1) * thread_data->data_size;
block_end = block_size + block_offset;
- if (block_end > frame->buffer->size) {
+ if (block_end > frame->buffer->size) {
arv_gvsp_packet_debug (packet, read_count);
frame->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
return;
}
+
memcpy (frame->buffer->data + block_offset, &packet->data, block_size);
- frame->read_data_size += block_size;
- frame->last_block_size = block_size;
- frame->last_packet_id = packet_id;
+ if (frame->packet_data->n_requests > 0)
+ thread_data->n_resent_packets++;
+
+ if (frame->last_valid_packet == frame->n_packets - 1)
+ _close_buffer (thread_data, frame);
}
static void
@@ -333,10 +288,8 @@ _process_data_trailer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData
if (frame->buffer == NULL)
return;
- if (frame->read_data_size < frame->buffer->size)
- return;
-
- _close_buffer (thread_data, frame);
+ if (frame->last_valid_packet == frame->n_packets - 1)
+ _close_buffer (thread_data, frame);
}
static void
@@ -352,7 +305,7 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
frame_id = arv_gvsp_packet_get_frame_id (packet);
if (frame->buffer != NULL && frame->frame_id != frame_id) {
- frame->buffer->status = ARV_BUFFER_STATUS_ABORTED;
+ frame->buffer->status = ARV_BUFFER_STATUS_TIMEOUT;
_close_buffer (thread_data, frame);
}
@@ -367,15 +320,11 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
frame->frame_id = frame_id;
frame->last_valid_packet = -1;
frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
- frame->read_data_size = 0;
- frame->n_missing_blocks = 0;
- frame->last_block_size = 0;
- frame->last_packet_id = 0;
g_get_current_time (¤t_time);
frame->leader_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
- n_packets = (frame->buffer->size + thread_data->packet_size - 1) / thread_data->packet_size + 2;
+ n_packets = (frame->buffer->size + thread_data->data_size - 1) / thread_data->data_size + 2;
if (frame->n_packets != n_packets) {
g_free (frame->packet_data);
frame->packet_data = g_new (ArvGvStreamPacketData, n_packets);
@@ -385,17 +334,32 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
}
memset (frame->packet_data, 0, sizeof (ArvGvStreamPacketData) * frame->n_packets);
+
+ if (thread_data->callback != NULL)
+ thread_data->callback (thread_data->user_data,
+ ARV_STREAM_CALLBACK_TYPE_START_BUFFER,
+ NULL);
}
packet_id = arv_gvsp_packet_get_packet_id (packet);
- if (packet_id < frame->n_packets) {
+ if (packet_id < frame->n_packets)
frame->packet_data[packet_id].received = TRUE;
- for (i = frame->last_valid_packet; i < packet_id; i++)
- if (!frame->packet_data[i + 1].received)
- break;
+ for (i = frame->last_valid_packet + 1; i < frame->n_packets; i++)
+ if (!frame->packet_data[i].received)
+ break;
+ frame->last_valid_packet = i - 1;
- frame->last_valid_packet = i;
+ if (packet_id < frame->n_packets) {
+ for (i = frame->last_valid_packet + 1; i < packet_id; i++) {
+ if (!frame->packet_data[i].received &&
+ frame->packet_data[i].n_requests == 0) {
+ _send_packet_request (thread_data, frame->frame_id,
+ i, i);
+ frame->packet_data[i].n_requests = 1;
+ thread_data->n_resend_requests++;
+ }
+ }
}
}
@@ -492,7 +456,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
ArvGvStreamThreadData *thread_data;
g_return_val_if_fail (G_IS_INET_ADDRESS (device_address), NULL);
- g_return_val_if_fail (packet_size > 0, NULL);
+ g_return_val_if_fail (packet_size > (20 + 8 + 8) /* FIXME IP + UDP + GVSP headers */, NULL);
gv_stream = g_object_new (ARV_TYPE_GV_STREAM, NULL);
@@ -516,7 +480,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
thread_data->device_address = g_inet_socket_address_new (device_address, ARV_GVCP_PORT);
thread_data->packet_resend = ARV_GV_STREAM_PACKET_RESEND_ALWAYS;
thread_data->timestamp_tick_frequency = timestamp_tick_frequency;
- thread_data->packet_size = packet_size;
+ thread_data->data_size = packet_size - (20 + 8 + 8) /* FIXME IP + UDP + GVSP headers */;
thread_data->cancel = FALSE;
thread_data->packet_count = 1;
@@ -525,15 +489,17 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
thread_data->n_failures = 0;
thread_data->n_underruns = 0;
thread_data->n_size_mismatch_errors = 0;
- thread_data->n_missing_blocks = 0;
- thread_data->n_resent_blocks = 0;
+ thread_data->n_missing_packets = 0;
+ thread_data->n_resent_packets = 0;
+ thread_data->n_resend_requests = 0;
thread_data->n_aborteds = 0;
+ thread_data->n_timeouts = 0;
- thread_data->statistic = arv_statistic_new (3, 5000, 200, 0);
+ thread_data->statistic = arv_statistic_new (2, 5000, 200, 0);
+ thread_data->statistic_count = 0;
arv_statistic_set_name (thread_data->statistic, 0, "Timestamp delta");
- arv_statistic_set_name (thread_data->statistic, 1, "Local time delta");
- arv_statistic_set_name (thread_data->statistic, 2, "Buffer reception time");
+ arv_statistic_set_name (thread_data->statistic, 1, "Buffer reception time");
thread_data->socket_buffer_option = ARV_GV_STREAM_SOCKET_BUFFER_FIXED;
thread_data->socket_buffer_size = 0;
@@ -550,8 +516,8 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
void
arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
- guint64 *n_resent_blocks,
- guint64 *n_missing_blocks)
+ guint64 *n_resent_packets,
+ guint64 *n_missing_packets)
{
ArvGvStreamThreadData *thread_data;
@@ -560,10 +526,10 @@ arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
thread_data = gv_stream->thread_data;
- if (n_resent_blocks != NULL)
- *n_resent_blocks = thread_data->n_resent_blocks;
- if (n_missing_blocks != NULL)
- *n_missing_blocks = thread_data->n_missing_blocks;
+ if (n_resent_packets != NULL)
+ *n_resent_packets = thread_data->n_resent_packets;
+ if (n_missing_packets != NULL)
+ *n_missing_packets = thread_data->n_missing_packets;
}
static void
@@ -653,15 +619,19 @@ arv_gv_stream_finalize (GObject *object)
arv_debug ("stream",
"[GvStream::finalize] n_failures = %d", thread_data->n_failures);
arv_debug ("stream",
+ "[GvStream::finalize] n_timeouts = %d", thread_data->n_timeouts);
+ arv_debug ("stream",
"[GvStream::finalize] n_aborteds = %d", thread_data->n_aborteds);
arv_debug ("stream",
"[GvStream::finalize] n_underruns = %d", thread_data->n_underruns);
arv_debug ("stream",
"[GvStream::finalize] n_size_mismatch_errors = %d", thread_data->n_size_mismatch_errors);
arv_debug ("stream",
- "[GvStream::finalize] n_missing_blocks = %d", thread_data->n_missing_blocks);
+ "[GvStream::finalize] n_missing_packets = %d", thread_data->n_missing_packets);
+ arv_debug ("stream",
+ "[GvStream::finalize] n_resend_requests = %d", thread_data->n_resend_requests);
arv_debug ("stream",
- "[GvStream::finalize] n_resent_blocks = %d", thread_data->n_resent_blocks);
+ "[GvStream::finalize] n_resent_packets = %d", thread_data->n_resent_packets);
thread_data->cancel = TRUE;
g_thread_join (gv_stream->thread);
diff --git a/src/arvgvstream.h b/src/arvgvstream.h
index 77fba74..257c5c4 100644
--- a/src/arvgvstream.h
+++ b/src/arvgvstream.h
@@ -83,8 +83,8 @@ ArvStream * arv_gv_stream_new (GInetAddress *device_address, guint16 port,
guint16 arv_gv_stream_get_port (ArvGvStream *gv_stream);
void arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
- guint64 *n_resent_blocks,
- guint64 *n_missing_blocks);
+ guint64 *n_resent_packets,
+ guint64 *n_missing_packets);
G_END_DECLS
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]