[aravis/timeout-resend] WIP.



commit a4df7582ee89cd3eb3fb18464ef1e0e14b5582d9
Author: Emmanuel Pacaud <emmanuel gnome org>
Date:   Tue Feb 22 21:43:06 2011 +0100

    WIP.

 src/arvbuffer.h   |    7 ++-
 src/arvgvstream.c |  174 ++++++++++++++++++++++-------------------------------
 src/arvgvstream.h |    4 +-
 3 files changed, 79 insertions(+), 106 deletions(-)
---
diff --git a/src/arvbuffer.h b/src/arvbuffer.h
index 1e616c6..1d168da 100644
--- a/src/arvbuffer.h
+++ b/src/arvbuffer.h
@@ -33,7 +33,8 @@ typedef void (*ArvFrameCallback)	(ArvBuffer *buffer);
  * ArvBufferStatus:
  * @ARV_BUFFER_STATUS_SUCCESS: the buffer contains a valid image
  * @ARV_BUFFER_STATUS_CLEARED: the buffer is cleared
- * @ARV_BUFFER_STATUS_MISSING_BLOCKS: image has missing blocks
+ * @ARV_BUFFER_STATUS_MISSING_PACKETS: stream has missing packets
+ * @ARV_BUFFER_STATUS_WRONG_PACKET_ID: stream has packet with wrong id
  * @ARV_BUFFER_STATUS_SIZE_MISMATCH: the received image didn't fit in the buffer data space
  * @ARV_BUFFER_STATUS_FILLING: the image is currently being filled
  * @ARV_BUFFER_STATUS_ABORTED: the filling was aborted before completion
@@ -42,7 +43,9 @@ typedef void (*ArvFrameCallback)	(ArvBuffer *buffer);
 typedef enum {
 	ARV_BUFFER_STATUS_SUCCESS,
 	ARV_BUFFER_STATUS_CLEARED,
-	ARV_BUFFER_STATUS_MISSING_BLOCKS,
+	ARV_BUFFER_STATUS_TIMEOUT,
+	ARV_BUFFER_STATUS_MISSING_PACKETS,
+	ARV_BUFFER_STATUS_WRONG_PACKET_ID,
 	ARV_BUFFER_STATUS_SIZE_MISMATCH,
 	ARV_BUFFER_STATUS_FILLING,
 	ARV_BUFFER_STATUS_ABORTED
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index 4ea8031..66d908a 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -63,13 +63,6 @@ typedef struct {
 	guint n_packets;
 	ArvGvStreamPacketData *packet_data;
 
-	size_t read_data_size;
-	gint32 n_missing_blocks;
-	gint32 last_block_size;
-	gint32 last_packet_id;
-	gint64 last_time_us;
-	guint64 last_timestamp_ns;
-	guint32 statistic_count;
 	gint64 leader_time_us;
 } ArvGvStreamFrameData;
 
@@ -85,7 +78,7 @@ typedef struct {
 	ArvGvStreamPacketResend packet_resend;
 
 	guint64 timestamp_tick_frequency;
-	guint packet_size;
+	guint data_size;
 
 	gboolean cancel;
 
@@ -95,14 +88,17 @@ typedef struct {
 
 	guint n_completed_buffers;
 	guint n_failures;
+	guint n_timeouts;
 	guint n_underruns;
 	guint n_aborteds;
 	guint n_size_mismatch_errors;
 
-	guint n_resent_blocks;
-	guint n_missing_blocks;
+	guint n_resend_requests;
+	guint n_resent_packets;
+	guint n_missing_packets;
 
 	ArvStatistic *statistic;
+	guint32 statistic_count;
 
 	ArvGvStreamSocketBuffer socket_buffer_option;
 	int socket_buffer_size;
@@ -178,17 +174,16 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
 		if (!frame->packet_data[i].received)
 			n_missing_packets++;
 
-	if (n_missing_packets != 0)
-		arv_debug ("stream", "[GvStream::_close_buffer] n_missing_packets = %d", n_missing_packets);
+	thread_data->n_missing_packets += n_missing_packets;
 
-	arv_debug ("stream", "[GvStream::_close_buffer] last_valid_packet = %d", frame->last_valid_packet);
+/*        if (n_missing_packets != 0)*/
+/*                arv_debug ("stream", "[GvStream::_close_buffer] n_missing_packets = %d", n_missing_packets);*/
 
 	if (frame->buffer->status == ARV_BUFFER_STATUS_FILLING) {
-	       if (frame->read_data_size == frame->buffer->size &&
-		   frame->n_missing_blocks == 0)
+	       if (n_missing_packets == 0)
 		       frame->buffer->status = ARV_BUFFER_STATUS_SUCCESS;
 	       else
-		       frame->buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCKS;
+		       frame->buffer->status = ARV_BUFFER_STATUS_MISSING_PACKETS;
 	}
 
 	if (frame->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
@@ -199,6 +194,9 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
 	if (frame->buffer->status == ARV_BUFFER_STATUS_ABORTED)
 		thread_data->n_aborteds++;
 
+	if (frame->buffer->status == ARV_BUFFER_STATUS_TIMEOUT)
+		thread_data->n_timeouts++;
+
 	if (frame->buffer->status == ARV_BUFFER_STATUS_SIZE_MISMATCH)
 		thread_data->n_size_mismatch_errors++;
 
@@ -209,21 +207,12 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
 
 	g_get_current_time (&current_time);
 	current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
-	if (frame->statistic_count > 5) {
-		arv_statistic_fill (thread_data->statistic, 0,
-				    (frame->buffer->timestamp_ns - frame->last_timestamp_ns) /
-				    1000, frame->buffer->frame_id);
+	if (thread_data->statistic_count > 5) {
 		arv_statistic_fill (thread_data->statistic, 1,
-				    current_time_us - frame->last_time_us,
-				    frame->buffer->frame_id);
-		arv_statistic_fill (thread_data->statistic, 2,
 				    current_time_us - frame->leader_time_us,
 				    frame->buffer->frame_id);
 	} else
-		frame->statistic_count++;
-
-	frame->last_time_us = current_time_us;
-	frame->last_timestamp_ns = frame->buffer->timestamp_ns;
+		thread_data->statistic_count++;
 
 	arv_stream_push_output_buffer (thread_data->stream, frame->buffer);
 
@@ -250,11 +239,6 @@ _process_data_leader (ArvGvStreamThreadData *thread_data,
 
 	frame->buffer->timestamp_ns = arv_gvsp_packet_get_timestamp (packet,
 								     thread_data->timestamp_tick_frequency);
-
-	if (thread_data->callback != NULL)
-		thread_data->callback (thread_data->user_data,
-				       ARV_STREAM_CALLBACK_TYPE_START_BUFFER,
-				       NULL);
 }
 
 static void
@@ -273,58 +257,29 @@ _process_data_block (ArvGvStreamThreadData *thread_data,
 		return;
 
 	packet_id = arv_gvsp_packet_get_packet_id (packet);
-	if (packet_id <= frame->last_packet_id) {
-		arv_debug ("stream-thread", "[GvStream::thread] Receive resent packet (%d) frame %d",
-			   packet_id, frame->buffer->frame_id);
-
-		block_size = arv_gvsp_packet_get_data_size (read_count);
-		block_offset = frame->last_block_size * (packet_id - 1);
-		block_end = block_size + block_offset;
-
-		if (block_end  > frame->buffer->size) {
-			arv_gvsp_packet_debug (packet, read_count);
-			frame->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
-			return;
-		}
-		memcpy (frame->buffer->data + block_offset, &packet->data, block_size);
-
-		frame->read_data_size += block_size;
-		frame->n_missing_blocks--;
-		thread_data->n_resent_blocks++;
-		return;
-	}
-
-	if (packet_id != (frame->last_packet_id + 1)) {
-		gint32 n_misses;
-
-		n_misses = packet_id - frame->last_packet_id - 1;
-
+	if (packet_id > frame->n_packets - 2) {
 		arv_gvsp_packet_debug (packet, read_count);
-		arv_debug ("stream-thread", "[GvStream::thread] Missing block (expected %d - %d) frame %d",
-			   frame->last_packet_id + 1,
-			   packet_id, frame->buffer->frame_id);
-		thread_data->n_missing_blocks += n_misses;
-		frame->n_missing_blocks += n_misses;
-
-		if (thread_data->packet_resend != ARV_GV_STREAM_PACKET_RESEND_NEVER)
-			_send_packet_request (thread_data, frame->buffer->frame_id,
-					      frame->last_packet_id + 1, packet_id - 1);
+		frame->buffer->status = ARV_BUFFER_STATUS_WRONG_PACKET_ID;
+		return;
 	}
 
 	block_size = arv_gvsp_packet_get_data_size (read_count);
-	block_offset = frame->last_block_size * (packet_id - 1);
+	block_offset = (packet_id - 1) * thread_data->data_size;
 	block_end = block_size + block_offset;
 
-	if (block_end  > frame->buffer->size) {
+	if (block_end > frame->buffer->size) {
 		arv_gvsp_packet_debug (packet, read_count);
 		frame->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
 		return;
 	}
+
 	memcpy (frame->buffer->data + block_offset, &packet->data, block_size);
 
-	frame->read_data_size += block_size;
-	frame->last_block_size = block_size;
-	frame->last_packet_id =  packet_id;
+	if (frame->packet_data->n_requests > 0)
+		thread_data->n_resent_packets++;
+
+	if (frame->last_valid_packet == frame->n_packets - 1)
+		_close_buffer (thread_data, frame);
 }
 
 static void
@@ -333,10 +288,8 @@ _process_data_trailer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData
 	if (frame->buffer == NULL)
 		return;
 
-	if (frame->read_data_size < frame->buffer->size)
-		return;
-
-	_close_buffer (thread_data, frame);
+	if (frame->last_valid_packet == frame->n_packets - 1)
+		_close_buffer (thread_data, frame);
 }
 
 static void
@@ -352,7 +305,7 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 	frame_id = arv_gvsp_packet_get_frame_id (packet);
 
 	if (frame->buffer != NULL && frame->frame_id != frame_id) {
-		frame->buffer->status = ARV_BUFFER_STATUS_ABORTED;
+		frame->buffer->status = ARV_BUFFER_STATUS_TIMEOUT;
 		_close_buffer (thread_data, frame);
 	}
 
@@ -367,15 +320,11 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 		frame->frame_id = frame_id;
 		frame->last_valid_packet = -1;
 		frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
-		frame->read_data_size = 0;
-		frame->n_missing_blocks = 0;
-		frame->last_block_size = 0;
-		frame->last_packet_id = 0;
 
 		g_get_current_time (&current_time);
 		frame->leader_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
 
-		n_packets = (frame->buffer->size + thread_data->packet_size - 1) / thread_data->packet_size + 2;
+		n_packets = (frame->buffer->size + thread_data->data_size - 1) / thread_data->data_size + 2;
 		if (frame->n_packets != n_packets) {
 			g_free (frame->packet_data);
 			frame->packet_data = g_new (ArvGvStreamPacketData, n_packets);
@@ -385,17 +334,32 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 		}
 
 		memset (frame->packet_data, 0, sizeof (ArvGvStreamPacketData) * frame->n_packets);
+
+		if (thread_data->callback != NULL)
+			thread_data->callback (thread_data->user_data,
+					       ARV_STREAM_CALLBACK_TYPE_START_BUFFER,
+					       NULL);
 	}
 
 	packet_id = arv_gvsp_packet_get_packet_id (packet);
-	if (packet_id < frame->n_packets) {
+	if (packet_id < frame->n_packets)
 		frame->packet_data[packet_id].received = TRUE;
 
-		for (i = frame->last_valid_packet; i < packet_id; i++)
-			if (!frame->packet_data[i + 1].received)
-				break;
+	for (i = frame->last_valid_packet + 1; i < frame->n_packets; i++)
+		if (!frame->packet_data[i].received)
+			break;
+	frame->last_valid_packet = i - 1;
 
-		frame->last_valid_packet = i;
+	if (packet_id < frame->n_packets) {
+		for (i = frame->last_valid_packet + 1; i < packet_id; i++) {
+			if (!frame->packet_data[i].received &&
+			    frame->packet_data[i].n_requests == 0) {
+				_send_packet_request (thread_data, frame->frame_id,
+						      i, i);
+				frame->packet_data[i].n_requests = 1;
+				thread_data->n_resend_requests++;
+			}
+		}
 	}
 }
 
@@ -492,7 +456,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 	ArvGvStreamThreadData *thread_data;
 
 	g_return_val_if_fail (G_IS_INET_ADDRESS (device_address), NULL);
-	g_return_val_if_fail (packet_size > 0, NULL);
+	g_return_val_if_fail (packet_size > (20 + 8 + 8) /* FIXME IP + UDP + GVSP headers */, NULL);
 
 	gv_stream = g_object_new (ARV_TYPE_GV_STREAM, NULL);
 
@@ -516,7 +480,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 	thread_data->device_address = g_inet_socket_address_new (device_address, ARV_GVCP_PORT);
 	thread_data->packet_resend = ARV_GV_STREAM_PACKET_RESEND_ALWAYS;
 	thread_data->timestamp_tick_frequency = timestamp_tick_frequency;
-	thread_data->packet_size = packet_size;
+	thread_data->data_size = packet_size - (20 + 8 + 8) /* FIXME IP + UDP + GVSP headers */;
 	thread_data->cancel = FALSE;
 
 	thread_data->packet_count = 1;
@@ -525,15 +489,17 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 	thread_data->n_failures = 0;
 	thread_data->n_underruns = 0;
 	thread_data->n_size_mismatch_errors = 0;
-	thread_data->n_missing_blocks = 0;
-	thread_data->n_resent_blocks = 0;
+	thread_data->n_missing_packets = 0;
+	thread_data->n_resent_packets = 0;
+	thread_data->n_resend_requests = 0;
 	thread_data->n_aborteds = 0;
+	thread_data->n_timeouts = 0;
 
-	thread_data->statistic = arv_statistic_new (3, 5000, 200, 0);
+	thread_data->statistic = arv_statistic_new (2, 5000, 200, 0);
+	thread_data->statistic_count = 0;
 
 	arv_statistic_set_name (thread_data->statistic, 0, "Timestamp delta");
-	arv_statistic_set_name (thread_data->statistic, 1, "Local time delta");
-	arv_statistic_set_name (thread_data->statistic, 2, "Buffer reception time");
+	arv_statistic_set_name (thread_data->statistic, 1, "Buffer reception time");
 
 	thread_data->socket_buffer_option = ARV_GV_STREAM_SOCKET_BUFFER_FIXED;
 	thread_data->socket_buffer_size = 0;
@@ -550,8 +516,8 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 
 void
 arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
-			      guint64 *n_resent_blocks,
-			      guint64 *n_missing_blocks)
+			      guint64 *n_resent_packets,
+			      guint64 *n_missing_packets)
 
 {
 	ArvGvStreamThreadData *thread_data;
@@ -560,10 +526,10 @@ arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
 
 	thread_data = gv_stream->thread_data;
 
-	if (n_resent_blocks != NULL)
-		*n_resent_blocks = thread_data->n_resent_blocks;
-	if (n_missing_blocks != NULL)
-		*n_missing_blocks = thread_data->n_missing_blocks;
+	if (n_resent_packets != NULL)
+		*n_resent_packets = thread_data->n_resent_packets;
+	if (n_missing_packets != NULL)
+		*n_missing_packets = thread_data->n_missing_packets;
 }
 
 static void
@@ -653,15 +619,19 @@ arv_gv_stream_finalize (GObject *object)
 		arv_debug ("stream",
 			   "[GvStream::finalize] n_failures             = %d", thread_data->n_failures);
 		arv_debug ("stream",
+			   "[GvStream::finalize] n_timeouts             = %d", thread_data->n_timeouts);
+		arv_debug ("stream",
 			   "[GvStream::finalize] n_aborteds             = %d", thread_data->n_aborteds);
 		arv_debug ("stream",
 			   "[GvStream::finalize] n_underruns            = %d", thread_data->n_underruns);
 		arv_debug ("stream",
 			   "[GvStream::finalize] n_size_mismatch_errors = %d", thread_data->n_size_mismatch_errors);
 		arv_debug ("stream",
-			   "[GvStream::finalize] n_missing_blocks       = %d", thread_data->n_missing_blocks);
+			   "[GvStream::finalize] n_missing_packets      = %d", thread_data->n_missing_packets);
+		arv_debug ("stream",
+			   "[GvStream::finalize] n_resend_requests      = %d", thread_data->n_resend_requests);
 		arv_debug ("stream",
-			   "[GvStream::finalize] n_resent_blocks        = %d", thread_data->n_resent_blocks);
+			   "[GvStream::finalize] n_resent_packets       = %d", thread_data->n_resent_packets);
 
 		thread_data->cancel = TRUE;
 		g_thread_join (gv_stream->thread);
diff --git a/src/arvgvstream.h b/src/arvgvstream.h
index 77fba74..257c5c4 100644
--- a/src/arvgvstream.h
+++ b/src/arvgvstream.h
@@ -83,8 +83,8 @@ ArvStream * 	arv_gv_stream_new			(GInetAddress *device_address, guint16 port,
 guint16 	arv_gv_stream_get_port			(ArvGvStream *gv_stream);
 
 void		arv_gv_stream_get_statistics		(ArvGvStream *gv_stream,
-							 guint64 *n_resent_blocks,
-							 guint64 *n_missing_blocks);
+							 guint64 *n_resent_packets,
+							 guint64 *n_missing_packets);
 
 G_END_DECLS
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]