[aravis/timeout-resend] WIP.



commit 73ca418dcfcbcd5c7d6491882161e25b7787ae40
Author: Emmanuel Pacaud <emmanuel gnome org>
Date:   Thu Feb 24 14:59:52 2011 +0100

    WIP.

 src/arvgvstream.c |  115 +++++++++++++++++++++++++++++++++++++++-------------
 1 files changed, 86 insertions(+), 29 deletions(-)
---
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index 66d908a..5738faf 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -1,6 +1,6 @@
 /* Aravis - Digital camera library
  *
- * Copyright © 2009-2010 Emmanuel Pacaud
+ * Copyright © 2009-2011 Emmanuel Pacaud
  *
  * This library is free software; you can redistribute it and/or
  * modify it under the terms of the GNU Lesser General Public
@@ -96,6 +96,7 @@ typedef struct {
 	guint n_resend_requests;
 	guint n_resent_packets;
 	guint n_missing_packets;
+	guint n_duplicated_packets;
 
 	ArvStatistic *statistic;
 	guint32 statistic_count;
@@ -117,7 +118,7 @@ _send_packet_request (ArvGvStreamThreadData *thread_data,
 	packet = arv_gvcp_packet_new_packet_resend_cmd (frame_id, first_block, last_block,
 							thread_data->packet_count++, &packet_size);
 
-	arv_debug ("stream-thread", "[GvStream::send_packet_request] frame_id = %d (%d - %d)",
+	arv_debug ("stream-thread", "[GvStream::send_packet_request] frame_id = %u (%d - %d)",
 		   frame_id, first_block, last_block);
 
 	arv_gvcp_packet_debug (packet);
@@ -170,20 +171,17 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
 	if (frame->buffer == NULL)
 		return;
 
-	for (i = 0; i < frame->n_packets; i++)
-		if (!frame->packet_data[i].received)
-			n_missing_packets++;
-
-	thread_data->n_missing_packets += n_missing_packets;
+	if (frame->buffer->status == ARV_BUFFER_STATUS_FILLING) {
+		for (i = 0; i < frame->n_packets; i++)
+			if (!frame->packet_data[i].received)
+				n_missing_packets++;
 
-/*        if (n_missing_packets != 0)*/
-/*                arv_debug ("stream", "[GvStream::_close_buffer] n_missing_packets = %d", n_missing_packets);*/
+		thread_data->n_missing_packets += n_missing_packets;
 
-	if (frame->buffer->status == ARV_BUFFER_STATUS_FILLING) {
-	       if (n_missing_packets == 0)
-		       frame->buffer->status = ARV_BUFFER_STATUS_SUCCESS;
-	       else
-		       frame->buffer->status = ARV_BUFFER_STATUS_MISSING_PACKETS;
+		if (n_missing_packets == 0)
+			frame->buffer->status = ARV_BUFFER_STATUS_SUCCESS;
+		else
+			frame->buffer->status = ARV_BUFFER_STATUS_MISSING_PACKETS;
 	}
 
 	if (frame->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
@@ -225,10 +223,17 @@ _process_data_leader (ArvGvStreamThreadData *thread_data,
 		      ArvGvStreamFrameData *frame,
 		      ArvGvspPacket *packet)
 {
+	guint32 packet_id;
+
 	if (frame->buffer == NULL)
 		return;
 
-	_update_socket (thread_data, frame->buffer);
+	packet_id = arv_gvsp_packet_get_packet_id (packet);
+	if (packet_id != 0) {
+		frame->buffer->status = ARV_BUFFER_STATUS_WRONG_PACKET_ID;
+		_close_buffer (thread_data, frame);
+		return;
+	}
 
 	frame->buffer->x_offset = arv_gvsp_packet_get_x_offset (packet);
 	frame->buffer->y_offset = arv_gvsp_packet_get_y_offset (packet);
@@ -239,6 +244,12 @@ _process_data_leader (ArvGvStreamThreadData *thread_data,
 
 	frame->buffer->timestamp_ns = arv_gvsp_packet_get_timestamp (packet,
 								     thread_data->timestamp_tick_frequency);
+
+	if (frame->packet_data[packet_id].n_requests > 0)
+		thread_data->n_resent_packets++;
+
+	if (frame->last_valid_packet == frame->n_packets - 1)
+		_close_buffer (thread_data, frame);
 }
 
 static void
@@ -260,6 +271,7 @@ _process_data_block (ArvGvStreamThreadData *thread_data,
 	if (packet_id > frame->n_packets - 2) {
 		arv_gvsp_packet_debug (packet, read_count);
 		frame->buffer->status = ARV_BUFFER_STATUS_WRONG_PACKET_ID;
+		_close_buffer (thread_data, frame);
 		return;
 	}
 
@@ -270,12 +282,13 @@ _process_data_block (ArvGvStreamThreadData *thread_data,
 	if (block_end > frame->buffer->size) {
 		arv_gvsp_packet_debug (packet, read_count);
 		frame->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
+		_close_buffer (thread_data, frame);
 		return;
 	}
 
 	memcpy (frame->buffer->data + block_offset, &packet->data, block_size);
 
-	if (frame->packet_data->n_requests > 0)
+	if (frame->packet_data[packet_id].n_requests > 0)
 		thread_data->n_resent_packets++;
 
 	if (frame->last_valid_packet == frame->n_packets - 1)
@@ -283,11 +296,25 @@ _process_data_block (ArvGvStreamThreadData *thread_data,
 }
 
 static void
-_process_data_trailer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
+_process_data_trailer (ArvGvStreamThreadData *thread_data,
+		       ArvGvStreamFrameData *frame,
+		       ArvGvspPacket *packet)
 {
+	guint32 packet_id;
+
 	if (frame->buffer == NULL)
 		return;
 
+	packet_id = arv_gvsp_packet_get_packet_id (packet);
+	if (packet_id != frame->n_packets - 1) {
+		frame->buffer->status = ARV_BUFFER_STATUS_WRONG_PACKET_ID;
+		_close_buffer (thread_data, frame);
+		return;
+	}
+
+	if (frame->packet_data[packet_id].n_requests > 0)
+		thread_data->n_resent_packets++;
+
 	if (frame->last_valid_packet == frame->n_packets - 1)
 		_close_buffer (thread_data, frame);
 }
@@ -310,13 +337,14 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 	}
 
 	if (frame->buffer == NULL) {
-
 		frame->buffer = arv_stream_pop_input_buffer (thread_data->stream);
 		if (frame->buffer == NULL) {
 			thread_data->n_underruns++;
 			return;
 		}
 
+		_update_socket (thread_data, frame->buffer);
+
 		frame->frame_id = frame_id;
 		frame->last_valid_packet = -1;
 		frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
@@ -342,8 +370,12 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 	}
 
 	packet_id = arv_gvsp_packet_get_packet_id (packet);
-	if (packet_id < frame->n_packets)
-		frame->packet_data[packet_id].received = TRUE;
+	if (packet_id < frame->n_packets) {
+		if (frame->packet_data[packet_id].received)
+			thread_data->n_duplicated_packets++;
+		else
+			frame->packet_data[packet_id].received = TRUE;
+	}
 
 	for (i = frame->last_valid_packet + 1; i < frame->n_packets; i++)
 		if (!frame->packet_data[i].received)
@@ -351,14 +383,35 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 	frame->last_valid_packet = i - 1;
 
 	if (packet_id < frame->n_packets) {
+		int first_missing = -1;
+
 		for (i = frame->last_valid_packet + 1; i < packet_id; i++) {
 			if (!frame->packet_data[i].received &&
 			    frame->packet_data[i].n_requests == 0) {
-				_send_packet_request (thread_data, frame->frame_id,
-						      i, i);
-				frame->packet_data[i].n_requests = 1;
-				thread_data->n_resend_requests++;
-			}
+				if (first_missing < 0)
+					first_missing = i;
+			} else
+				if (first_missing >= 0) {
+					int j;
+
+					_send_packet_request (thread_data, frame->frame_id,
+							      first_missing, i - 1);
+					for (j = first_missing; j < i; j++)
+						frame->packet_data[j].n_requests = 1;
+					thread_data->n_resend_requests += (i - first_missing);
+
+					first_missing = -1;
+				}
+		}
+
+		if (first_missing >= 0) {
+			int j;
+
+			_send_packet_request (thread_data, frame->frame_id,
+					      first_missing, i - 1);
+			for (j = first_missing; j < i; j++)
+				frame->packet_data[j].n_requests = 1;
+			thread_data->n_resend_requests += (i - first_missing);
 		}
 	}
 }
@@ -393,6 +446,7 @@ arv_gv_stream_thread (void *data)
 		if (n_events > 0) {
 			read_count = g_socket_receive (thread_data->socket, (char *) packet,
 						       ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
+
 			frame_id = arv_gvsp_packet_get_frame_id (packet);
 			frame = &frames[frame_id % ARV_GV_STREAM_THREAD_N_FRAMES];
 
@@ -408,7 +462,7 @@ arv_gv_stream_thread (void *data)
 					break;
 
 				case ARV_GVSP_PACKET_TYPE_DATA_TRAILER:
-					_process_data_trailer (thread_data, frame);
+					_process_data_trailer (thread_data, frame, packet);
 					break;
 			}
 		}
@@ -492,6 +546,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 	thread_data->n_missing_packets = 0;
 	thread_data->n_resent_packets = 0;
 	thread_data->n_resend_requests = 0;
+	thread_data->n_duplicated_packets = 0;
 	thread_data->n_aborteds = 0;
 	thread_data->n_timeouts = 0;
 
@@ -614,6 +669,9 @@ arv_gv_stream_finalize (GObject *object)
 
 		thread_data = gv_stream->thread_data;
 
+		thread_data->cancel = TRUE;
+		g_thread_join (gv_stream->thread);
+
 		arv_debug ("stream",
 			   "[GvStream::finalize] n_completed_buffers    = %d", thread_data->n_completed_buffers);
 		arv_debug ("stream",
@@ -632,9 +690,8 @@ arv_gv_stream_finalize (GObject *object)
 			   "[GvStream::finalize] n_resend_requests      = %d", thread_data->n_resend_requests);
 		arv_debug ("stream",
 			   "[GvStream::finalize] n_resent_packets       = %d", thread_data->n_resent_packets);
-
-		thread_data->cancel = TRUE;
-		g_thread_join (gv_stream->thread);
+		arv_debug ("stream",
+			   "[GvStream::finalize] n_duplicated_packets   = %d", thread_data->n_duplicated_packets);
 
 		g_object_unref (thread_data->device_address);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]