[aravis/timeout-resend] gv_stream: discard old frames.



commit bb30f6b001bd3ca93e510d75eeeab3f23396bf53
Author: Emmanuel Pacaud <emmanuel gnome org>
Date:   Wed Mar 2 11:42:33 2011 +0100

    gv_stream: discard old frames.

 src/arvgvstream.c |   69 +++++++++++++++++++++++++++++++++-------------------
 1 files changed, 44 insertions(+), 25 deletions(-)
---
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index cf33694..1508893 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -37,6 +37,7 @@
 
 #define ARV_GV_STREAM_INCOMING_BUFFER_SIZE	65536
 #define ARV_GV_STREAM_THREAD_N_FRAMES		2
+#define ARV_GV_STREAM_THREAD_DISCARD_WINDOW	65536
 
 enum {
 	ARV_GV_STREAM_PROPERTY_0,
@@ -59,11 +60,11 @@ typedef struct {
 	guint frame_id;
 
 	gint last_valid_packet;
+	guint64 first_packet_timestamp_us;
+	guint64 last_packet_timestamp_us;
 
 	guint n_packets;
 	ArvGvStreamPacketData *packet_data;
-
-	gint64 leader_time_us;
 } ArvGvStreamFrameData;
 
 typedef struct {
@@ -84,6 +85,8 @@ typedef struct {
 
 	guint32 packet_count;
 
+	guint32 last_frame_id;
+
 	/* Statistics */
 
 	guint n_completed_buffers;
@@ -207,7 +210,7 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
 	current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
 	if (thread_data->statistic_count > 5) {
 		arv_statistic_fill (thread_data->statistic, 1,
-				    current_time_us - frame->leader_time_us,
+				    current_time_us - frame->first_packet_timestamp_us,
 				    frame->buffer->frame_id);
 	} else
 		thread_data->statistic_count++;
@@ -319,11 +322,10 @@ _process_data_trailer (ArvGvStreamThreadData *thread_data,
 		_close_buffer (thread_data, frame);
 }
 
-static void
+static gboolean
 _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame,
-		    ArvGvspPacket *packet, size_t read_count)
+		    ArvGvspPacket *packet, size_t read_count, guint64 timestamp_us)
 {
-	GTimeVal current_time;
 	guint frame_id;
 	guint n_packets;
 	int packet_id;
@@ -331,16 +333,25 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 
 	frame_id = arv_gvsp_packet_get_frame_id (packet);
 
-	if (frame->buffer != NULL && frame->frame_id != frame_id) {
+	if (frame->buffer != NULL &&
+	    frame->frame_id != frame_id) {
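+		/* discard old frames: the unsigned difference wraps to a huge value when frame_id is 1..65535 frames behind last_frame_id */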
+		if (frame_id - thread_data->last_frame_id > - ARV_GV_STREAM_THREAD_DISCARD_WINDOW) {
+			arv_debug ("stream", "[GvStream::_update_frame_data] Discard late frame %u (last frame is %u)",
+				   frame_id, thread_data->last_frame_id);
+			return FALSE;
+		}
+
 		frame->buffer->status = ARV_BUFFER_STATUS_TIMEOUT;
 		_close_buffer (thread_data, frame);
 	}
 
 	if (frame->buffer == NULL) {
+
 		frame->buffer = arv_stream_pop_input_buffer (thread_data->stream);
 		if (frame->buffer == NULL) {
 			thread_data->n_underruns++;
-			return;
+			return FALSE;
 		}
 
 		_update_socket (thread_data, frame->buffer);
@@ -349,8 +360,9 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 		frame->last_valid_packet = -1;
 		frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
 
-		g_get_current_time (&current_time);
-		frame->leader_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+		thread_data->last_frame_id = frame_id;
+
+		frame->first_packet_timestamp_us = timestamp_us;
 
 		n_packets = (frame->buffer->size + thread_data->data_size - 1) / thread_data->data_size + 2;
 		if (frame->n_packets != n_packets) {
@@ -369,6 +381,8 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 					       NULL);
 	}
 
+	frame->last_packet_timestamp_us = timestamp_us;
+
 	packet_id = arv_gvsp_packet_get_packet_id (packet);
 	if (packet_id < frame->n_packets) {
 		if (frame->packet_data[packet_id].received)
@@ -414,6 +428,8 @@ _update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *fr
 			thread_data->n_resend_requests += (i - first_missing);
 		}
 	}
+
+	return TRUE;
 }
 
 static void *
@@ -423,6 +439,8 @@ arv_gv_stream_thread (void *data)
 	ArvGvStreamFrameData frames[ARV_GV_STREAM_THREAD_N_FRAMES];
 	ArvGvStreamFrameData *frame;
 	ArvGvspPacket *packet;
+	GTimeVal current_time;
+	guint64 timestamp_us;
 	guint frame_id;
 	GPollFD poll_fd;
 	size_t read_count;
@@ -444,27 +462,27 @@ arv_gv_stream_thread (void *data)
 		n_events = g_poll (&poll_fd, 1, 1000);
 
 		if (n_events > 0) {
+			g_get_current_time (&current_time);
+			timestamp_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+
 			read_count = g_socket_receive (thread_data->socket, (char *) packet,
 						       ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
 
 			frame_id = arv_gvsp_packet_get_frame_id (packet);
 			frame = &frames[frame_id % ARV_GV_STREAM_THREAD_N_FRAMES];
 
-			_update_frame_data (thread_data, frame, packet, read_count);
-
-			switch (arv_gvsp_packet_get_packet_type (packet)) {
-				case ARV_GVSP_PACKET_TYPE_DATA_LEADER:
-					_process_data_leader (thread_data, frame, packet);
-					break;
-
-				case ARV_GVSP_PACKET_TYPE_DATA_BLOCK:
-					_process_data_block (thread_data, frame, packet, read_count);
-					break;
-
-				case ARV_GVSP_PACKET_TYPE_DATA_TRAILER:
-					_process_data_trailer (thread_data, frame, packet);
-					break;
-			}
+			if (_update_frame_data (thread_data, frame, packet, read_count, timestamp_us))
+				switch (arv_gvsp_packet_get_packet_type (packet)) {
+					case ARV_GVSP_PACKET_TYPE_DATA_LEADER:
+						_process_data_leader (thread_data, frame, packet);
+						break;
+					case ARV_GVSP_PACKET_TYPE_DATA_BLOCK:
+						_process_data_block (thread_data, frame, packet, read_count);
+						break;
+					case ARV_GVSP_PACKET_TYPE_DATA_TRAILER:
+						_process_data_trailer (thread_data, frame, packet);
+						break;
+				}
 		}
 	} while (!thread_data->cancel);
 
@@ -538,6 +556,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 	thread_data->cancel = FALSE;
 
 	thread_data->packet_count = 1;
+	thread_data->last_frame_id = 0;
 
 	thread_data->n_completed_buffers = 0;
 	thread_data->n_failures = 0;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]