[aravis] gv_stream: try to implement the packet resend feature.



commit c11b2bce26c8c04f8e9071d69e137d1833b4eacc
Author: Emmanuel Pacaud <emmanuel gnome org>
Date:   Tue May 11 16:33:33 2010 +0200

    gv_stream: try to implement the packet resend feature.

 src/arvcameratest.c |    6 +-
 src/arvgvstream.c   |  163 +++++++++++++++++++++++++++++++++------------------
 src/arvstream.c     |   10 ++--
 src/arvstream.h     |    4 +-
 4 files changed, 116 insertions(+), 67 deletions(-)
---
diff --git a/src/arvcameratest.c b/src/arvcameratest.c
index c7a0449..856962e 100644
--- a/src/arvcameratest.c
+++ b/src/arvcameratest.c
@@ -80,7 +80,7 @@ main (int argc, char **argv)
 		gint x, y, width, height;
 		gint dx, dy;
 		double exposure;
-		guint64 n_processed_buffers;
+		guint64 n_completed_buffers;
 		guint64 n_failures;
 		guint64 n_underruns;
 
@@ -148,9 +148,9 @@ main (int argc, char **argv)
 
 		} while (!cancel);
 
-		arv_stream_get_statistics (stream, &n_processed_buffers, &n_failures, &n_underruns);
+		arv_stream_get_statistics (stream, &n_completed_buffers, &n_failures, &n_underruns);
 
-		g_print ("Processed buffers = %Ld\n", n_processed_buffers);
+		g_print ("Completed buffers = %Ld\n", n_completed_buffers);
 		g_print ("Failures          = %Ld\n", n_failures);
 		g_print ("Underruns         = %Ld\n", n_underruns);
 
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index e40618e..e6e1c97 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -52,9 +52,11 @@ typedef struct {
 
 	/* Statistics */
 
-	guint n_processed_buffers;
+	guint n_completed_buffers;
 	guint n_failures;
 	guint n_underruns;
+	guint n_aborteds;
+	guint n_resent_blocks;
 
 	guint n_size_mismatch_errors;
 	guint n_missing_blocks;
@@ -66,11 +68,11 @@ typedef struct {
 	int current_socket_buffer_size;
 } ArvGvStreamThreadData;
 
-void
-arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
-				   guint32 frame_id,
-				   guint32 first_block,
-				   guint32 last_block)
+static void
+_send_packet_request (ArvGvStreamThreadData *thread_data,
+		      guint32 frame_id,
+		      guint32 first_block,
+		      guint32 last_block)
 {
 	ArvGvcpPacket *packet;
 	guint32 packet_size;
@@ -78,7 +80,7 @@ arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
 	packet = arv_gvcp_packet_new_packet_resend_cmd (frame_id, first_block, last_block,
 							thread_data->packet_count++, &packet_size);
 
-	arv_debug ("stream", "[GvStream::send_packet_request] frame_id = %d (%d - %d)",
+	arv_debug ("stream-thread", "[GvStream::send_packet_request] frame_id = %d (%d - %d)",
 		   frame_id, first_block, last_block);
 
 	arv_gvcp_packet_debug (packet);
@@ -90,7 +92,7 @@ arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
 }
 
 static void
-arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
+_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
 {
 	int buffer_size, fd;
 
@@ -115,7 +117,7 @@ arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buff
 	if (buffer_size != thread_data->current_socket_buffer_size) {
 		setsockopt (fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof (buffer_size));
 		thread_data->current_socket_buffer_size = buffer_size;
-		arv_debug ("stream", "[GvStream::update_socket] Socket buffer size set to %d",
+		arv_debug ("stream-thread", "[GvStream::update_socket] Socket buffer size set to %d",
 			 buffer_size);
 	}
 }
@@ -123,7 +125,7 @@ arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buff
 typedef struct {
 	ArvGvspPacket *packet;
 	ArvBuffer *buffer;
-	gboolean is_missing_blocks;
+	gint32 n_missing_blocks;
 	gint32 last_block_size;
 	gint32 last_block_id;
 	gint64 last_time_us;
@@ -132,17 +134,62 @@ typedef struct {
 } ArvGvStreamThreadState;
 
 static void
+_close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
+{
+	GTimeVal current_time;
+	gint64 current_time_us;
+
+	if (state->buffer == NULL)
+		return;
+
+	if (state->buffer->status == ARV_BUFFER_STATUS_FILLING ||
+	    state->n_missing_blocks != 0)
+		state->buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCKS;
+	if (state->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
+		thread_data->n_completed_buffers++;
+	else
+		thread_data->n_failures++;
+	if (state->buffer->status == ARV_BUFFER_STATUS_ABORTED)
+		thread_data->n_aborteds++;
+
+	if (thread_data->callback != NULL)
+		thread_data->callback (thread_data->user_data,
+				       ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
+				       state->buffer);
+
+	g_get_current_time (&current_time);
+	current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+	if (state->statistic_count > 5) {
+		arv_statistic_fill (thread_data->statistic, 0,
+				    (current_time_us - state->last_time_us),
+				    state->buffer->frame_id);
+		arv_statistic_fill (thread_data->statistic, 1,
+				    (state->buffer->timestamp_ns - state->last_timestamp_ns) /
+				    1000, state->buffer->frame_id);
+	} else
+		state->statistic_count++;
+
+	state->last_time_us = current_time_us;
+	state->last_timestamp_ns = state->buffer->timestamp_ns;
+	g_async_queue_push (thread_data->output_queue, state->buffer);
+	state->buffer = NULL;
+}
+
+static void
 _process_data_leader (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
 {
-	if (state->buffer != NULL)
-		g_async_queue_push (thread_data->output_queue, state->buffer);
+	if (state->buffer != NULL) {
+		state->buffer->status = ARV_BUFFER_STATUS_ABORTED;
+		_close_buffer (thread_data, state);
+	}
+
 	state->buffer = g_async_queue_try_pop (thread_data->input_queue);
 	if (state->buffer == NULL) {
 		thread_data->n_underruns++;
 		return;
 	}
 
-	arv_gv_stream_update_socket (thread_data, state->buffer);
+	_update_socket (thread_data, state->buffer);
 
 	state->buffer->x_offset = arv_gvsp_packet_get_x_offset (state->packet);
 	state->buffer->y_offset = arv_gvsp_packet_get_y_offset (state->packet);
@@ -155,7 +202,7 @@ _process_data_leader (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState
 								     thread_data->timestamp_tick_frequency);
 
 	state->buffer->status = ARV_BUFFER_STATUS_FILLING;
-	state->is_missing_blocks = FALSE;
+	state->n_missing_blocks = 0;
 	state->last_block_size = 0;
 	state->last_block_id = 0;
 }
@@ -172,16 +219,40 @@ _process_data_block (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState
 	    state->buffer->status != ARV_BUFFER_STATUS_FILLING)
 		return;
 
+	if (state->buffer->frame_id != arv_gvsp_packet_get_frame_id (state->packet))
+		return;
+
 	block_id = arv_gvsp_packet_get_block_id (state->packet);
+	if (block_id <= state->last_block_id) {
+		arv_debug ("stream-thread", "[GvStream::thread] Receive resent block (%d) frame %d",
+			   block_id, state->buffer->frame_id);
+
+		block_size = arv_gvsp_packet_get_data_size (read_count);
+		block_offset = state->last_block_size * (block_id - 1);
+		block_end = block_size + block_offset;
+
+		if (block_end  <= state->buffer->size)
+			memcpy (state->buffer->data + block_offset, &state->packet->data, block_size);
+
+		state->n_missing_blocks--;
+		thread_data->n_resent_blocks++;
+		return;
+	}
+
 	if (block_id != (state->last_block_id + 1)) {
+		gint32 n_misses;
+
+		n_misses = block_id - state->last_block_id;
+
 		arv_gvsp_packet_debug (state->packet, read_count);
-		arv_debug ("stream", "[GvStream::thread] Missing block (expected %d - %d) frame %d",
+		arv_debug ("stream-thread", "[GvStream::thread] Missing block (expected %d - %d) frame %d",
 			   state->last_block_id + 1,
-			   block_id,
-			   arv_gvsp_packet_get_frame_id (state->packet));
-		thread_data->n_failures++;
-		thread_data->n_missing_blocks++;
-		state->is_missing_blocks = TRUE;
+			   block_id, state->buffer->frame_id);
+		thread_data->n_missing_blocks += n_misses;
+		state->n_missing_blocks += n_misses;
+
+		_send_packet_request (thread_data, state->buffer->frame_id,
+				      state->last_block_id + 1, block_id - 1);
 	}
 
 	block_size = arv_gvsp_packet_get_data_size (read_count);
@@ -191,7 +262,6 @@ _process_data_block (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState
 	if (block_end  > state->buffer->size) {
 		arv_gvsp_packet_debug (state->packet, read_count);
 		state->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
-		thread_data->n_failures++;
 		thread_data->n_size_mismatch_errors++;
 		return;
 	}
@@ -207,40 +277,13 @@ _process_data_block (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState
 static void
 _process_data_trailer (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
 {
-	GTimeVal current_time;
-	gint64 current_time_us;
-
 	if (state->buffer == NULL)
 		return;
 
-	if (state->buffer->status == ARV_BUFFER_STATUS_FILLING)
-		state->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
-	if (state->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
-		thread_data->n_processed_buffers++;
-	if (state->is_missing_blocks)
-		state->buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCKS;
-
-	if (thread_data->callback != NULL)
-		thread_data->callback (thread_data->user_data,
-				       ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
-				       state->buffer);
-
-	g_get_current_time (&current_time);
-	current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
-	if (state->statistic_count > 5) {
-		arv_statistic_fill (thread_data->statistic, 0,
-				    (current_time_us - state->last_time_us),
-				    state->buffer->frame_id);
-		arv_statistic_fill (thread_data->statistic, 1,
-				    (state->buffer->timestamp_ns - state->last_timestamp_ns) /
-				    1000, state->buffer->frame_id);
-	} else
-		state->statistic_count++;
+	if (state->n_missing_blocks != 0)
+		return;
 
-	state->last_time_us = current_time_us;
-	state->last_timestamp_ns = state->buffer->timestamp_ns;
-	g_async_queue_push (thread_data->output_queue, state->buffer);
-	state->buffer = NULL;
+	_close_buffer (thread_data, state);
 }
 
 static void *
@@ -262,7 +305,7 @@ arv_gv_stream_thread (void *data)
 
 	state.buffer = NULL;
 	state.packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
-	state.is_missing_blocks = FALSE;
+	state.n_missing_blocks = 0;
 	state.last_block_size = 0;
 	state.last_block_id = 0;
 	state.statistic_count = 0;
@@ -295,7 +338,7 @@ arv_gv_stream_thread (void *data)
 
 	if (state.buffer != NULL) {
 		state.buffer->status = ARV_BUFFER_STATUS_ABORTED;
-		g_async_queue_push (thread_data->output_queue, state.buffer);
+		_close_buffer (thread_data, &state);
 	}
 
 	if (thread_data->callback != NULL)
@@ -378,11 +421,13 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 
 	thread_data->packet_count = 1;
 
-	thread_data->n_processed_buffers = 0;
+	thread_data->n_completed_buffers = 0;
 	thread_data->n_failures = 0;
 	thread_data->n_underruns = 0;
 	thread_data->n_size_mismatch_errors = 0;
 	thread_data->n_missing_blocks = 0;
+	thread_data->n_resent_blocks = 0;
+	thread_data->n_aborteds = 0;
 
 	thread_data->statistic = arv_statistic_new (2, 5000, 200, 0);
 
@@ -404,7 +449,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
 
 static void
 arv_gv_stream_get_statistics (ArvStream *stream,
-			      guint64 *n_processed_buffers,
+			      guint64 *n_completed_buffers,
 			      guint64 *n_failures,
 			      guint64 *n_underruns)
 {
@@ -413,7 +458,7 @@ arv_gv_stream_get_statistics (ArvStream *stream,
 
 	thread_data = gv_stream->thread_data;
 
-	*n_processed_buffers = thread_data->n_processed_buffers;
+	*n_completed_buffers = thread_data->n_completed_buffers;
 	*n_failures = thread_data->n_failures;
 	*n_underruns = thread_data->n_underruns;
 }
@@ -435,15 +480,19 @@ arv_gv_stream_finalize (GObject *object)
 		thread_data = gv_stream->thread_data;
 
 		arv_debug ("stream",
-			   "[GvStream::finalize] n_processed_buffers    = %d", thread_data->n_processed_buffers);
+			   "[GvStream::finalize] n_completed_buffers    = %d", thread_data->n_completed_buffers);
 		arv_debug ("stream",
 			   "[GvStream::finalize] n_failures             = %d", thread_data->n_failures);
 		arv_debug ("stream",
+			   "[GvStream::finalize] n_aborteds             = %d", thread_data->n_aborteds);
+		arv_debug ("stream",
 			   "[GvStream::finalize] n_underruns            = %d", thread_data->n_underruns);
 		arv_debug ("stream",
 			   "[GvStream::finalize] n_size_mismatch_errors = %d", thread_data->n_size_mismatch_errors);
 		arv_debug ("stream",
 			   "[GvStream::finalize] n_missing_blocks       = %d", thread_data->n_missing_blocks);
+		arv_debug ("stream",
+			   "[GvStream::finalize] n_resent_blocks        = %d", thread_data->n_resent_blocks);
 
 		thread_data->cancel = TRUE;
 		g_thread_join (gv_stream->thread);
diff --git a/src/arvstream.c b/src/arvstream.c
index be01763..104414a 100644
--- a/src/arvstream.c
+++ b/src/arvstream.c
@@ -53,21 +53,21 @@ arv_stream_get_n_available_buffers (ArvStream *stream)
 
 void
 arv_stream_get_statistics (ArvStream *stream,
-			   guint64 *n_processed_buffers,
+			   guint64 *n_completed_buffers,
 			   guint64 *n_failures,
 			   guint64 *n_underruns)
 {
 	ArvStreamClass *stream_class;
 	guint64 dummy;
 
-	if (n_processed_buffers == NULL)
-		n_processed_buffers = &dummy;
+	if (n_completed_buffers == NULL)
+		n_completed_buffers = &dummy;
 	if (n_failures == NULL)
 		n_failures = &dummy;
 	if (n_underruns == NULL)
 		n_underruns = &dummy;
 
-	*n_processed_buffers = 0;
+	*n_completed_buffers = 0;
 	*n_failures = 0;
 	*n_underruns = 0;
 
@@ -75,7 +75,7 @@ arv_stream_get_statistics (ArvStream *stream,
 
 	stream_class = ARV_STREAM_GET_CLASS (stream);
 	if (stream_class->get_statistics != NULL)
-		stream_class->get_statistics (stream, n_processed_buffers, n_failures, n_underruns);
+		stream_class->get_statistics (stream, n_completed_buffers, n_failures, n_underruns);
 }
 
 static void
diff --git a/src/arvstream.h b/src/arvstream.h
index 2cb89ce..063ee0b 100644
--- a/src/arvstream.h
+++ b/src/arvstream.h
@@ -55,7 +55,7 @@ struct _ArvStream {
 struct _ArvStreamClass {
 	GObjectClass parent_class;
 
-	void		(*get_statistics)	(ArvStream *stream, guint64 *n_processed_buffers,
+	void		(*get_statistics)	(ArvStream *stream, guint64 *n_completed_buffers,
 						 guint64 *n_failures, guint64 *n_underruns);
 };
 
@@ -65,7 +65,7 @@ void			arv_stream_push_buffer 			(ArvStream *stream, ArvBuffer *buffer);
 ArvBuffer *		arv_stream_pop_buffer			(ArvStream *stream);
 int			arv_stream_get_n_available_buffers	(ArvStream *stream);
 void			arv_stream_get_statistics		(ArvStream *stream,
-								 guint64 *n_processed_buffers,
+								 guint64 *n_completed_buffers,
 								 guint64 *n_failures,
 								 guint64 *n_underruns);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]