[aravis/timeout-resend] WIP.



commit 31b5789bdbba6da297ed82e5af9ede3465710fbb
Author: Emmanuel Pacaud <emmanuel gnome org>
Date:   Tue Feb 22 15:50:17 2011 +0100

    WIP.

 src/arvgvstream.c |  155 ++++++++++++++++++++++++++++++++++++-----------------
 src/arvgvstream.h |    3 +-
 2 files changed, 107 insertions(+), 51 deletions(-)
---
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index 53e2d33..4ea8031 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -36,6 +36,7 @@
 #include <sys/socket.h>
 
 #define ARV_GV_STREAM_INCOMING_BUFFER_SIZE	65536
+#define ARV_GV_STREAM_THREAD_N_FRAMES		2
 
 enum {
 	ARV_GV_STREAM_PROPERTY_0,
@@ -49,10 +50,21 @@ static GObjectClass *parent_class = NULL;
 /* Acquisition thread */
 
 typedef struct {
+	gboolean received;
+	guint	n_requests;
+} ArvGvStreamPacketData;
+
+typedef struct {
 	ArvBuffer *buffer;
+	guint frame_id;
+
+	gint last_valid_packet;
+
+	guint n_packets;
+	ArvGvStreamPacketData *packet_data;
+
 	size_t read_data_size;
 	gint32 n_missing_blocks;
-	gint32 n_late_blocks;
 	gint32 last_block_size;
 	gint32 last_packet_id;
 	gint64 last_time_us;
@@ -89,7 +101,6 @@ typedef struct {
 
 	guint n_resent_blocks;
 	guint n_missing_blocks;
-	guint n_late_blocks;
 
 	ArvStatistic *statistic;
 
@@ -157,10 +168,21 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
 {
 	GTimeVal current_time;
 	gint64 current_time_us;
+	int i;
+	guint n_missing_packets = 0;
 
 	if (frame->buffer == NULL)
 		return;
 
+	for (i = 0; i < frame->n_packets; i++)
+		if (!frame->packet_data[i].received)
+			n_missing_packets++;
+
+	if (n_missing_packets != 0)
+		arv_debug ("stream", "[GvStream::_close_buffer] n_missing_packets = %d", n_missing_packets);
+
+	arv_debug ("stream", "[GvStream::_close_buffer] last_valid_packet = %d", frame->last_valid_packet);
+
 	if (frame->buffer->status == ARV_BUFFER_STATUS_FILLING) {
 	       if (frame->read_data_size == frame->buffer->size &&
 		   frame->n_missing_blocks == 0)
@@ -206,6 +228,7 @@ _close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame)
 	arv_stream_push_output_buffer (thread_data->stream, frame->buffer);
 
 	frame->buffer = NULL;
+	frame->frame_id = 0;
 }
 
 static void
@@ -213,16 +236,8 @@ _process_data_leader (ArvGvStreamThreadData *thread_data,
 		      ArvGvStreamFrameData *frame,
 		      ArvGvspPacket *packet)
 {
-	GTimeVal current_time;
-
-	if (frame->buffer != NULL)
-		_close_buffer (thread_data, frame);
-
-	frame->buffer = arv_stream_pop_input_buffer (thread_data->stream);
-	if (frame->buffer == NULL) {
-		thread_data->n_underruns++;
+	if (frame->buffer == NULL)
 		return;
-	}
 
 	_update_socket (thread_data, frame->buffer);
 
@@ -236,19 +251,10 @@ _process_data_leader (ArvGvStreamThreadData *thread_data,
 	frame->buffer->timestamp_ns = arv_gvsp_packet_get_timestamp (packet,
 								     thread_data->timestamp_tick_frequency);
 
-	frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
-	frame->read_data_size = 0;
-	frame->n_missing_blocks = 0;
-	frame->last_block_size = 0;
-	frame->last_packet_id = 0;
-
-	g_get_current_time (&current_time);
-	frame->leader_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
-
 	if (thread_data->callback != NULL)
 		thread_data->callback (thread_data->user_data,
 				       ARV_STREAM_CALLBACK_TYPE_START_BUFFER,
-				       frame->buffer);
+				       NULL);
 }
 
 static void
@@ -266,11 +272,6 @@ _process_data_block (ArvGvStreamThreadData *thread_data,
 	    frame->buffer->status != ARV_BUFFER_STATUS_FILLING)
 		return;
 
-	if (frame->buffer->frame_id != arv_gvsp_packet_get_frame_id (packet)) {
-		thread_data->n_late_blocks ++;
-		return;
-	}
-
 	packet_id = arv_gvsp_packet_get_packet_id (packet);
 	if (packet_id <= frame->last_packet_id) {
 		arv_debug ("stream-thread", "[GvStream::thread] Receive resent packet (%d) frame %d",
@@ -338,16 +339,78 @@ _process_data_trailer (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData
 	_close_buffer (thread_data, frame);
 }
 
+static void
+_update_frame_data (ArvGvStreamThreadData *thread_data, ArvGvStreamFrameData *frame,
+		    ArvGvspPacket *packet, size_t read_count)
+{
+	GTimeVal current_time;
+	guint frame_id;
+	guint n_packets;
+	int packet_id;
+	int i;
+
+	frame_id = arv_gvsp_packet_get_frame_id (packet);
+
+	if (frame->buffer != NULL && frame->frame_id != frame_id) {
+		frame->buffer->status = ARV_BUFFER_STATUS_ABORTED;
+		_close_buffer (thread_data, frame);
+	}
+
+	if (frame->buffer == NULL) {
+
+		frame->buffer = arv_stream_pop_input_buffer (thread_data->stream);
+		if (frame->buffer == NULL) {
+			thread_data->n_underruns++;
+			return;
+		}
+
+		frame->frame_id = frame_id;
+		frame->last_valid_packet = -1;
+		frame->buffer->status = ARV_BUFFER_STATUS_FILLING;
+		frame->read_data_size = 0;
+		frame->n_missing_blocks = 0;
+		frame->last_block_size = 0;
+		frame->last_packet_id = 0;
+
+		g_get_current_time (&current_time);
+		frame->leader_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+
+		n_packets = (frame->buffer->size + thread_data->packet_size - 1) / thread_data->packet_size + 2;
+		if (frame->n_packets != n_packets) {
+			g_free (frame->packet_data);
+			frame->packet_data = g_new (ArvGvStreamPacketData, n_packets);
+			frame->n_packets = n_packets;
+
+			arv_debug ("stream", "[GvStream::_update_frame_data] n_packets = %d", frame->n_packets);
+		}
+
+		memset (frame->packet_data, 0, sizeof (ArvGvStreamPacketData) * frame->n_packets);
+	}
+
+	packet_id = arv_gvsp_packet_get_packet_id (packet);
+	if (packet_id < frame->n_packets) {
+		frame->packet_data[packet_id].received = TRUE;
+
+		for (i = frame->last_valid_packet; i < packet_id; i++)
+			if (!frame->packet_data[i + 1].received)
+				break;
+
+		frame->last_valid_packet = i;
+	}
+}
+
 static void *
 arv_gv_stream_thread (void *data)
 {
 	ArvGvStreamThreadData *thread_data = data;
-	ArvGvStreamFrameData frame;
+	ArvGvStreamFrameData frames[ARV_GV_STREAM_THREAD_N_FRAMES];
+	ArvGvStreamFrameData *frame;
 	ArvGvspPacket *packet;
-	GTimeVal current_time;
+	guint frame_id;
 	GPollFD poll_fd;
 	size_t read_count;
 	int n_events;
+	int i;
 
 	if (thread_data->callback != NULL)
 		thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_INIT, NULL);
@@ -358,14 +421,7 @@ arv_gv_stream_thread (void *data)
 
 	packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
 
-	frame.buffer = NULL;
-	frame.n_missing_blocks = 0;
-	frame.last_block_size = 0;
-	frame.last_packet_id = 0;
-	frame.statistic_count = 0;
-	g_get_current_time (&current_time);
-	frame.last_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
-	frame.last_timestamp_ns = 0;
+	memset (frames, 0, sizeof (ArvGvStreamFrameData) * ARV_GV_STREAM_THREAD_N_FRAMES);
 
 	do {
 		n_events = g_poll (&poll_fd, 1, 1000);
@@ -373,26 +429,33 @@ arv_gv_stream_thread (void *data)
 		if (n_events > 0) {
 			read_count = g_socket_receive (thread_data->socket, (char *) packet,
 						       ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
+			frame_id = arv_gvsp_packet_get_frame_id (packet);
+			frame = &frames[frame_id % ARV_GV_STREAM_THREAD_N_FRAMES];
+
+			_update_frame_data (thread_data, frame, packet, read_count);
 
 			switch (arv_gvsp_packet_get_packet_type (packet)) {
 				case ARV_GVSP_PACKET_TYPE_DATA_LEADER:
-					_process_data_leader (thread_data, &frame, packet);
+					_process_data_leader (thread_data, frame, packet);
 					break;
 
 				case ARV_GVSP_PACKET_TYPE_DATA_BLOCK:
-					_process_data_block (thread_data, &frame, packet, read_count);
+					_process_data_block (thread_data, frame, packet, read_count);
 					break;
 
 				case ARV_GVSP_PACKET_TYPE_DATA_TRAILER:
-					_process_data_trailer (thread_data, &frame);
+					_process_data_trailer (thread_data, frame);
 					break;
 			}
 		}
 	} while (!thread_data->cancel);
 
-	if (frame.buffer != NULL) {
-		frame.buffer->status = ARV_BUFFER_STATUS_ABORTED;
-		_close_buffer (thread_data, &frame);
+	for (i = 0; i < ARV_GV_STREAM_THREAD_N_FRAMES; i++) {
+		if (frames[i].buffer != NULL) {
+			frames[i].buffer->status = ARV_BUFFER_STATUS_ABORTED;
+			_close_buffer (thread_data, &frames[i]);
+		}
+		g_free (frames[i].packet_data);
 	}
 
 	if (thread_data->callback != NULL)
@@ -464,7 +527,6 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 	thread_data->n_size_mismatch_errors = 0;
 	thread_data->n_missing_blocks = 0;
 	thread_data->n_resent_blocks = 0;
-	thread_data->n_late_blocks = 0;
 	thread_data->n_aborteds = 0;
 
 	thread_data->statistic = arv_statistic_new (3, 5000, 200, 0);
@@ -489,8 +551,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 void
 arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
 			      guint64 *n_resent_blocks,
-			      guint64 *n_missing_blocks,
-			      guint64 *n_late_blocks)
+			      guint64 *n_missing_blocks)
 
 {
 	ArvGvStreamThreadData *thread_data;
@@ -503,8 +564,6 @@ arv_gv_stream_get_statistics (ArvGvStream *gv_stream,
 		*n_resent_blocks = thread_data->n_resent_blocks;
 	if (n_missing_blocks != NULL)
 		*n_missing_blocks = thread_data->n_missing_blocks;
-	if (n_late_blocks != NULL)
-		*n_late_blocks = thread_data->n_late_blocks;
 }
 
 static void
@@ -603,8 +662,6 @@ arv_gv_stream_finalize (GObject *object)
 			   "[GvStream::finalize] n_missing_blocks       = %d", thread_data->n_missing_blocks);
 		arv_debug ("stream",
 			   "[GvStream::finalize] n_resent_blocks        = %d", thread_data->n_resent_blocks);
-		arv_debug ("stream",
-			   "[GvStream::finalize] n_late_blocks          = %d", thread_data->n_late_blocks);
 
 		thread_data->cancel = TRUE;
 		g_thread_join (gv_stream->thread);
diff --git a/src/arvgvstream.h b/src/arvgvstream.h
index f614fb2..77fba74 100644
--- a/src/arvgvstream.h
+++ b/src/arvgvstream.h
@@ -84,8 +84,7 @@ guint16 	arv_gv_stream_get_port			(ArvGvStream *gv_stream);
 
 void		arv_gv_stream_get_statistics		(ArvGvStream *gv_stream,
 							 guint64 *n_resent_blocks,
-							 guint64 *n_missing_blocks,
-							 guint64 *n_late_blocks);
+							 guint64 *n_missing_blocks);
 
 G_END_DECLS
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]