[aravis] gv_stream: split thread function into smaller ones.
- From: Emmanuel Pacaud <emmanuel src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [aravis] gv_stream: split thread function into smaller ones.
- Date: Tue, 11 May 2010 09:20:38 +0000 (UTC)
commit d476317a432275c8d1622375a4cb16222ca850b8
Author: Emmanuel Pacaud <emmanuel gnome org>
Date: Tue May 11 11:19:58 2010 +0200
gv_stream: split thread function into smaller ones.
src/arvgvstream.c | 245 ++++++++++++++++++++++++++++++-----------------------
1 files changed, 138 insertions(+), 107 deletions(-)
---
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index 00a735b..04aec1d 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -29,6 +29,8 @@
#include <string.h>
#include <sys/socket.h>
+#define ARV_GV_STREAM_INCOMING_BUFFER_SIZE 65536
+
static GObjectClass *parent_class = NULL;
/* Acquisition thread */
@@ -64,8 +66,7 @@ typedef struct {
int current_socket_buffer_size;
} ArvGvStreamThreadData;
-#if 0
-static void
+void
arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
guint32 frame_id,
guint32 first_block,
@@ -87,7 +88,6 @@ arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
arv_gvcp_packet_free (packet);
}
-#endif
static void
arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
@@ -120,148 +120,179 @@ arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buff
}
}
+typedef struct {
+ ArvGvspPacket *packet;
+ ArvBuffer *buffer;
+ guint16 block_id;
+ size_t read_count;
+ ptrdiff_t offset;
+ gint64 last_time_us;
+ guint64 last_timestamp_ns;
+ guint32 statistic_count;
+} ArvGvStreamThreadState;
+
+static void
+_process_data_leader (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
+{
+ if (state->buffer != NULL)
+ g_async_queue_push (thread_data->output_queue, state->buffer);
+ state->buffer = g_async_queue_try_pop (thread_data->input_queue);
+ if (state->buffer == NULL) {
+ thread_data->n_underruns++;
+ return;
+ }
+
+ arv_gv_stream_update_socket (thread_data, state->buffer);
+
+ state->buffer->x_offset = arv_gvsp_packet_get_x_offset (state->packet);
+ state->buffer->y_offset = arv_gvsp_packet_get_y_offset (state->packet);
+ state->buffer->width = arv_gvsp_packet_get_width (state->packet);
+ state->buffer->height = arv_gvsp_packet_get_height (state->packet);
+ state->buffer->pixel_format = arv_gvsp_packet_get_pixel_format (state->packet);
+ state->buffer->frame_id = arv_gvsp_packet_get_frame_id (state->packet);
+
+ state->buffer->timestamp_ns = arv_gvsp_packet_get_timestamp (state->packet,
+ thread_data->timestamp_tick_frequency);
+
+ state->buffer->status = ARV_BUFFER_STATUS_FILLING;
+ state->block_id = 0;
+ state->offset = 0;
+}
+
+static void
+_process_data_block (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
+{
+ size_t block_size;
+
+ if (state->buffer == NULL ||
+ state->buffer->status != ARV_BUFFER_STATUS_FILLING)
+ return;
+
+ state->block_id++;
+ if (arv_gvsp_packet_get_block_id (state->packet) != state->block_id) {
+ arv_gvsp_packet_debug (state->packet, state->read_count);
+ arv_debug ("stream", "[GvStream::thread] Missing block (expected %d - %d) frame %d",
+ state->block_id,
+ arv_gvsp_packet_get_block_id (state->packet),
+ arv_gvsp_packet_get_frame_id (state->packet));
+ state->buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCK;
+ state->block_id = arv_gvsp_packet_get_block_id (state->packet);
+ thread_data->n_failures++;
+ thread_data->n_missing_blocks++;
+ return;
+ }
+
+ block_size = arv_gvsp_packet_get_data_size (state->read_count);
+ if (block_size + state->offset > state->buffer->size) {
+ arv_gvsp_packet_debug (state->packet, state->read_count);
+ state->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
+ thread_data->n_failures++;
+ thread_data->n_size_mismatch_errors++;
+ return;
+ }
+ memcpy (state->buffer->data + state->offset, &state->packet->data, block_size);
+ state->offset += block_size;
+
+ if (state->offset == state->buffer->size)
+ state->buffer->status = ARV_BUFFER_STATUS_SUCCESS;
+}
+
+static void
+_process_data_trailer (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
+{
+ GTimeVal current_time;
+ gint64 current_time_us;
+
+ if (state->buffer == NULL)
+ return;
+
+ if (state->buffer->status == ARV_BUFFER_STATUS_FILLING)
+ state->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
+ if (state->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
+ thread_data->n_processed_buffers++;
+
+ if (thread_data->callback != NULL)
+ thread_data->callback (thread_data->user_data,
+ ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
+ state->buffer);
+
+ g_get_current_time (&current_time);
+ current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+ if (state->statistic_count > 5) {
+ arv_statistic_fill (thread_data->statistic, 0,
+ (current_time_us - state->last_time_us),
+ state->buffer->frame_id);
+ arv_statistic_fill (thread_data->statistic, 1,
+ (state->buffer->timestamp_ns - state->last_timestamp_ns) /
+ 1000, state->buffer->frame_id);
+ } else
+ state->statistic_count++;
+
+ state->last_time_us = current_time_us;
+ state->last_timestamp_ns = state->buffer->timestamp_ns;
+ g_async_queue_push (thread_data->output_queue, state->buffer);
+ state->buffer = NULL;
+}
+
static void *
arv_gv_stream_thread (void *data)
{
ArvGvStreamThreadData *thread_data = data;
+ ArvGvStreamThreadState state;
+ GTimeVal current_time;
GPollFD poll_fd;
int n_events;
- char packet_buffer[65536];
- ArvGvspPacket *packet;
- ArvBuffer *buffer = NULL;
- guint16 block_id = 0;
- size_t read_count;
- size_t block_size;
- ptrdiff_t offset = 0;
- GTimeVal current_time;
- gint64 current_time_us;
- gint64 last_time_us;
- guint64 last_timestamp_ns;
- gboolean statistic_count = 0;
if (thread_data->callback != NULL)
thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_INIT, NULL);
- packet = (ArvGvspPacket *) packet_buffer;
-
poll_fd.fd = g_socket_get_fd (thread_data->socket);
poll_fd.events = G_IO_IN;
poll_fd.revents = 0;
+ state.buffer = NULL;
+ state.packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
+ state.block_id = 0;
+ state.read_count = 0;
+ state.offset = 0;
+ state.statistic_count = 0;
g_get_current_time (&current_time);
- last_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
- last_timestamp_ns = 0;
+ state.last_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+ state.last_timestamp_ns = 0;
do {
n_events = g_poll (&poll_fd, 1, 1000);
if (n_events > 0) {
- read_count = g_socket_receive (thread_data->socket, packet_buffer, 65536, NULL, NULL);
+ state.read_count = g_socket_receive (thread_data->socket, (char *) state.packet,
+ ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
- switch (arv_gvsp_packet_get_packet_type (packet)) {
+ switch (arv_gvsp_packet_get_packet_type (state.packet)) {
case ARV_GVSP_PACKET_TYPE_DATA_LEADER:
- if (buffer != NULL)
- g_async_queue_push (thread_data->output_queue, buffer);
- buffer = g_async_queue_try_pop (thread_data->input_queue);
- if (buffer == NULL) {
- thread_data->n_underruns++;
- break;
- }
-
- arv_gv_stream_update_socket (thread_data, buffer);
-
- buffer->x_offset = arv_gvsp_packet_get_x_offset (packet);
- buffer->y_offset = arv_gvsp_packet_get_y_offset (packet);
- buffer->width = arv_gvsp_packet_get_width (packet);
- buffer->height = arv_gvsp_packet_get_height (packet);
- buffer->pixel_format = arv_gvsp_packet_get_pixel_format (packet);
- buffer->frame_id = arv_gvsp_packet_get_frame_id (packet);
-
- buffer->timestamp_ns = arv_gvsp_packet_get_timestamp
- (packet, thread_data->timestamp_tick_frequency);
-
- buffer->status = ARV_BUFFER_STATUS_FILLING;
- block_id = 0;
- offset = 0;
+ _process_data_leader (thread_data, &state);
break;
case ARV_GVSP_PACKET_TYPE_DATA_BLOCK:
- if (buffer == NULL ||
- buffer->status != ARV_BUFFER_STATUS_FILLING)
- break;
- block_id++;
- if (arv_gvsp_packet_get_block_id (packet) != block_id) {
- arv_gvsp_packet_debug (packet, read_count);
- arv_debug ("stream",
- "[GvStream::thread] Missing block (expected %d - %d)"
- " frame %d",
- block_id,
- arv_gvsp_packet_get_block_id (packet),
- arv_gvsp_packet_get_frame_id (packet));
- buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCK;
- block_id = arv_gvsp_packet_get_block_id (packet);
- thread_data->n_failures++;
- thread_data->n_missing_blocks++;
- break;
- }
- block_size = arv_gvsp_packet_get_data_size (read_count);
- if (block_size + offset > buffer->size) {
- arv_gvsp_packet_debug (packet, read_count);
- buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
- thread_data->n_failures++;
- thread_data->n_size_mismatch_errors++;
- break;
- }
- memcpy (buffer->data + offset, &packet->data, block_size);
- offset += block_size;
-
- if (offset == buffer->size)
- buffer->status = ARV_BUFFER_STATUS_SUCCESS;
-
+ _process_data_block (thread_data, &state);
break;
case ARV_GVSP_PACKET_TYPE_DATA_TRAILER:
- if (buffer != NULL) {
- if (buffer->status == ARV_BUFFER_STATUS_FILLING)
- buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
- if (buffer->status == ARV_BUFFER_STATUS_SUCCESS)
- thread_data->n_processed_buffers++;
-
- if (thread_data->callback != NULL)
- thread_data->callback (thread_data->user_data,
- ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
- buffer);
-
- g_get_current_time (&current_time);
- current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
- statistic_count++;
- if (statistic_count > 5) {
- arv_statistic_fill (thread_data->statistic,
- 0, (current_time_us - last_time_us),
- buffer->frame_id);
- arv_statistic_fill (thread_data->statistic,
- 1,
- (buffer->timestamp_ns - last_timestamp_ns) /
- 1000, buffer->frame_id);
- }
- last_time_us = current_time_us;
- last_timestamp_ns = buffer->timestamp_ns;
- g_async_queue_push (thread_data->output_queue, buffer);
- buffer = NULL;
- }
+ _process_data_trailer (thread_data, &state);
break;
}
}
} while (!thread_data->cancel);
- if (buffer != NULL) {
- buffer->status = ARV_BUFFER_STATUS_ABORTED;
- g_async_queue_push (thread_data->output_queue, buffer);
+ if (state.buffer != NULL) {
+ state.buffer->status = ARV_BUFFER_STATUS_ABORTED;
+ g_async_queue_push (thread_data->output_queue, state.buffer);
}
if (thread_data->callback != NULL)
thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_EXIT, NULL);
+ g_free (state.packet);
+
return NULL;
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]