[aravis] gv_stream: split thread function into smaller ones.



commit d476317a432275c8d1622375a4cb16222ca850b8
Author: Emmanuel Pacaud <emmanuel gnome org>
Date:   Tue May 11 11:19:58 2010 +0200

    gv_stream: split thread function into smaller ones.

 src/arvgvstream.c |  245 ++++++++++++++++++++++++++++++-----------------------
 1 files changed, 138 insertions(+), 107 deletions(-)
---
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index 00a735b..04aec1d 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -29,6 +29,8 @@
 #include <string.h>
 #include <sys/socket.h>
 
+#define ARV_GV_STREAM_INCOMING_BUFFER_SIZE	65536
+
 static GObjectClass *parent_class = NULL;
 
 /* Acquisition thread */
@@ -64,8 +66,7 @@ typedef struct {
 	int current_socket_buffer_size;
 } ArvGvStreamThreadData;
 
-#if 0
-static void
+void
 arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
 				   guint32 frame_id,
 				   guint32 first_block,
@@ -87,7 +88,6 @@ arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
 
 	arv_gvcp_packet_free (packet);
 }
-#endif
 
 static void
 arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
@@ -120,148 +120,179 @@ arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buff
 	}
 }
 
+typedef struct {
+	ArvGvspPacket *packet;
+	ArvBuffer *buffer;
+	guint16 block_id;
+	size_t read_count;
+	ptrdiff_t offset;
+	gint64 last_time_us;
+	guint64 last_timestamp_ns;
+	guint32 statistic_count;
+} ArvGvStreamThreadState;
+
+static void
+_process_data_leader (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
+{
+	if (state->buffer != NULL)
+		g_async_queue_push (thread_data->output_queue, state->buffer);
+	state->buffer = g_async_queue_try_pop (thread_data->input_queue);
+	if (state->buffer == NULL) {
+		thread_data->n_underruns++;
+		return;
+	}
+
+	arv_gv_stream_update_socket (thread_data, state->buffer);
+
+	state->buffer->x_offset = arv_gvsp_packet_get_x_offset (state->packet);
+	state->buffer->y_offset = arv_gvsp_packet_get_y_offset (state->packet);
+	state->buffer->width = arv_gvsp_packet_get_width (state->packet);
+	state->buffer->height = arv_gvsp_packet_get_height (state->packet);
+	state->buffer->pixel_format = arv_gvsp_packet_get_pixel_format (state->packet);
+	state->buffer->frame_id = arv_gvsp_packet_get_frame_id (state->packet);
+
+	state->buffer->timestamp_ns = arv_gvsp_packet_get_timestamp (state->packet,
+								     thread_data->timestamp_tick_frequency);
+
+	state->buffer->status = ARV_BUFFER_STATUS_FILLING;
+	state->block_id = 0;
+	state->offset = 0;
+}
+
+static void
+_process_data_block (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
+{
+	size_t block_size;
+
+	if (state->buffer == NULL ||
+	    state->buffer->status != ARV_BUFFER_STATUS_FILLING)
+		return;
+
+	state->block_id++;
+	if (arv_gvsp_packet_get_block_id (state->packet) != state->block_id) {
+		arv_gvsp_packet_debug (state->packet, state->read_count);
+		arv_debug ("stream", "[GvStream::thread] Missing block (expected %d - %d) frame %d",
+			   state->block_id,
+			   arv_gvsp_packet_get_block_id (state->packet),
+			   arv_gvsp_packet_get_frame_id (state->packet));
+		state->buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCK;
+		state->block_id =  arv_gvsp_packet_get_block_id (state->packet);
+		thread_data->n_failures++;
+		thread_data->n_missing_blocks++;
+		return;
+	}
+
+	block_size = arv_gvsp_packet_get_data_size (state->read_count);
+	if (block_size + state->offset > state->buffer->size) {
+		arv_gvsp_packet_debug (state->packet, state->read_count);
+		state->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
+		thread_data->n_failures++;
+		thread_data->n_size_mismatch_errors++;
+		return;
+	}
+	memcpy (state->buffer->data + state->offset, &state->packet->data, block_size);
+	state->offset += block_size;
+
+	if (state->offset == state->buffer->size)
+		state->buffer->status = ARV_BUFFER_STATUS_SUCCESS;
+}
+
+static void
+_process_data_trailer (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
+{
+	GTimeVal current_time;
+	gint64 current_time_us;
+
+	if (state->buffer == NULL)
+		return;
+
+	if (state->buffer->status == ARV_BUFFER_STATUS_FILLING)
+		state->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
+	if (state->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
+		thread_data->n_processed_buffers++;
+
+	if (thread_data->callback != NULL)
+		thread_data->callback (thread_data->user_data,
+				       ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
+				       state->buffer);
+
+	g_get_current_time (&current_time);
+	current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+	if (state->statistic_count > 5) {
+		arv_statistic_fill (thread_data->statistic, 0,
+				    (current_time_us - state->last_time_us),
+				    state->buffer->frame_id);
+		arv_statistic_fill (thread_data->statistic, 1,
+				    (state->buffer->timestamp_ns - state->last_timestamp_ns) /
+				    1000, state->buffer->frame_id);
+	} else
+		state->statistic_count++;
+
+	state->last_time_us = current_time_us;
+	state->last_timestamp_ns = state->buffer->timestamp_ns;
+	g_async_queue_push (thread_data->output_queue, state->buffer);
+	state->buffer = NULL;
+}
+
 static void *
 arv_gv_stream_thread (void *data)
 {
 	ArvGvStreamThreadData *thread_data = data;
+	ArvGvStreamThreadState state;
+	GTimeVal current_time;
 	GPollFD poll_fd;
 	int n_events;
-	char packet_buffer[65536];
-	ArvGvspPacket *packet;
-	ArvBuffer *buffer = NULL;
-	guint16 block_id = 0;
-	size_t read_count;
-	size_t block_size;
-	ptrdiff_t offset = 0;
-	GTimeVal current_time;
-	gint64 current_time_us;
-	gint64 last_time_us;
-	guint64 last_timestamp_ns;
-	gboolean statistic_count = 0;
 
 	if (thread_data->callback != NULL)
 		thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_INIT, NULL);
 
-	packet = (ArvGvspPacket *) packet_buffer;
-
 	poll_fd.fd = g_socket_get_fd (thread_data->socket);
 	poll_fd.events =  G_IO_IN;
 	poll_fd.revents = 0;
 
+	state.buffer = NULL;
+	state.packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
+	state.block_id = 0;
+	state.read_count = 0;
+	state.offset = 0;
+	state.statistic_count = 0;
 	g_get_current_time (&current_time);
-	last_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
-	last_timestamp_ns = 0;
+	state.last_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+	state.last_timestamp_ns = 0;
 
 	do {
 		n_events = g_poll (&poll_fd, 1, 1000);
 
 		if (n_events > 0) {
-			read_count = g_socket_receive (thread_data->socket, packet_buffer, 65536, NULL, NULL);
+			state.read_count = g_socket_receive (thread_data->socket, (char *) state.packet,
+							     ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
 
-			switch (arv_gvsp_packet_get_packet_type (packet)) {
+			switch (arv_gvsp_packet_get_packet_type (state.packet)) {
 				case ARV_GVSP_PACKET_TYPE_DATA_LEADER:
-					if (buffer != NULL)
-						g_async_queue_push (thread_data->output_queue, buffer);
-					buffer = g_async_queue_try_pop (thread_data->input_queue);
-					if (buffer == NULL) {
-						thread_data->n_underruns++;
-						break;
-					}
-
-					arv_gv_stream_update_socket (thread_data, buffer);
-
-					buffer->x_offset = arv_gvsp_packet_get_x_offset (packet);
-					buffer->y_offset = arv_gvsp_packet_get_y_offset (packet);
-					buffer->width = arv_gvsp_packet_get_width (packet);
-					buffer->height = arv_gvsp_packet_get_height (packet);
-					buffer->pixel_format = arv_gvsp_packet_get_pixel_format (packet);
-					buffer->frame_id = arv_gvsp_packet_get_frame_id (packet);
-
-					buffer->timestamp_ns = arv_gvsp_packet_get_timestamp
-						(packet, thread_data->timestamp_tick_frequency);
-
-					buffer->status = ARV_BUFFER_STATUS_FILLING;
-					block_id = 0;
-					offset = 0;
+					_process_data_leader (thread_data, &state);
 					break;
 
 				case ARV_GVSP_PACKET_TYPE_DATA_BLOCK:
-					if (buffer == NULL ||
-					    buffer->status != ARV_BUFFER_STATUS_FILLING)
-						break;
-					block_id++;
-					if (arv_gvsp_packet_get_block_id (packet) != block_id) {
-						arv_gvsp_packet_debug (packet, read_count);
-						arv_debug ("stream",
-							   "[GvStream::thread] Missing block (expected %d - %d)"
-							   " frame %d",
-							   block_id,
-							   arv_gvsp_packet_get_block_id (packet),
-							   arv_gvsp_packet_get_frame_id (packet));
-						buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCK;
-						block_id =  arv_gvsp_packet_get_block_id (packet);
-						thread_data->n_failures++;
-						thread_data->n_missing_blocks++;
-						break;
-					}
-					block_size = arv_gvsp_packet_get_data_size (read_count);
-					if (block_size + offset > buffer->size) {
-						arv_gvsp_packet_debug (packet, read_count);
-						buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
-						thread_data->n_failures++;
-						thread_data->n_size_mismatch_errors++;
-						break;
-					}
-					memcpy (buffer->data + offset, &packet->data, block_size);
-					offset += block_size;
-
-					if (offset == buffer->size)
-						buffer->status = ARV_BUFFER_STATUS_SUCCESS;
-
+					_process_data_block (thread_data, &state);
 					break;
 
 				case ARV_GVSP_PACKET_TYPE_DATA_TRAILER:
-					if (buffer != NULL) {
-						if (buffer->status == ARV_BUFFER_STATUS_FILLING)
-							buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
-						if (buffer->status == ARV_BUFFER_STATUS_SUCCESS)
-							thread_data->n_processed_buffers++;
-
-						if (thread_data->callback != NULL)
-							thread_data->callback (thread_data->user_data,
-									       ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
-									       buffer);
-
-						g_get_current_time (&current_time);
-						current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
-						statistic_count++;
-						if (statistic_count > 5) {
-							arv_statistic_fill (thread_data->statistic,
-									    0, (current_time_us - last_time_us),
-									    buffer->frame_id);
-							arv_statistic_fill (thread_data->statistic,
-									    1,
-									    (buffer->timestamp_ns - last_timestamp_ns) /
-									    1000, buffer->frame_id);
-						}
-						last_time_us = current_time_us;
-						last_timestamp_ns = buffer->timestamp_ns;
-						g_async_queue_push (thread_data->output_queue, buffer);
-						buffer = NULL;
-					}
+					_process_data_trailer (thread_data, &state);
 					break;
 			}
 		}
 	} while (!thread_data->cancel);
 
-	if (buffer != NULL) {
-		buffer->status = ARV_BUFFER_STATUS_ABORTED;
-		g_async_queue_push (thread_data->output_queue, buffer);
+	if (state.buffer != NULL) {
+		state.buffer->status = ARV_BUFFER_STATUS_ABORTED;
+		g_async_queue_push (thread_data->output_queue, state.buffer);
 	}
 
 	if (thread_data->callback != NULL)
 		thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_EXIT, NULL);
 
+	g_free (state.packet);
+
 	return NULL;
 }
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]