[aravis] gv_stream: try to implement the packet resend feature.
- From: Emmanuel Pacaud <emmanuel src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [aravis] gv_stream: try to implement the packet resend feature.
- Date: Tue, 11 May 2010 14:34:05 +0000 (UTC)
commit c11b2bce26c8c04f8e9071d69e137d1833b4eacc
Author: Emmanuel Pacaud <emmanuel gnome org>
Date: Tue May 11 16:33:33 2010 +0200
gv_stream: try to implement the packet resend feature.
src/arvcameratest.c | 6 +-
src/arvgvstream.c | 163 +++++++++++++++++++++++++++++++++------------------
src/arvstream.c | 10 ++--
src/arvstream.h | 4 +-
4 files changed, 116 insertions(+), 67 deletions(-)
---
diff --git a/src/arvcameratest.c b/src/arvcameratest.c
index c7a0449..856962e 100644
--- a/src/arvcameratest.c
+++ b/src/arvcameratest.c
@@ -80,7 +80,7 @@ main (int argc, char **argv)
gint x, y, width, height;
gint dx, dy;
double exposure;
- guint64 n_processed_buffers;
+ guint64 n_completed_buffers;
guint64 n_failures;
guint64 n_underruns;
@@ -148,9 +148,9 @@ main (int argc, char **argv)
} while (!cancel);
- arv_stream_get_statistics (stream, &n_processed_buffers, &n_failures, &n_underruns);
+ arv_stream_get_statistics (stream, &n_completed_buffers, &n_failures, &n_underruns);
- g_print ("Processed buffers = %Ld\n", n_processed_buffers);
+ g_print ("Completed buffers = %Ld\n", n_completed_buffers);
g_print ("Failures = %Ld\n", n_failures);
g_print ("Underruns = %Ld\n", n_underruns);
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index e40618e..e6e1c97 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -52,9 +52,11 @@ typedef struct {
/* Statistics */
- guint n_processed_buffers;
+ guint n_completed_buffers;
guint n_failures;
guint n_underruns;
+ guint n_aborteds;
+ guint n_resent_blocks;
guint n_size_mismatch_errors;
guint n_missing_blocks;
@@ -66,11 +68,11 @@ typedef struct {
int current_socket_buffer_size;
} ArvGvStreamThreadData;
-void
-arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
- guint32 frame_id,
- guint32 first_block,
- guint32 last_block)
+static void
+_send_packet_request (ArvGvStreamThreadData *thread_data,
+ guint32 frame_id,
+ guint32 first_block,
+ guint32 last_block)
{
ArvGvcpPacket *packet;
guint32 packet_size;
@@ -78,7 +80,7 @@ arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
packet = arv_gvcp_packet_new_packet_resend_cmd (frame_id, first_block, last_block,
thread_data->packet_count++, &packet_size);
- arv_debug ("stream", "[GvStream::send_packet_request] frame_id = %d (%d - %d)",
+ arv_debug ("stream-thread", "[GvStream::send_packet_request] frame_id = %d (%d - %d)",
frame_id, first_block, last_block);
arv_gvcp_packet_debug (packet);
@@ -90,7 +92,7 @@ arv_gv_stream_send_packet_request (ArvGvStreamThreadData *thread_data,
}
static void
-arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
+_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buffer)
{
int buffer_size, fd;
@@ -115,7 +117,7 @@ arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buff
if (buffer_size != thread_data->current_socket_buffer_size) {
setsockopt (fd, SOL_SOCKET, SO_RCVBUF, &buffer_size, sizeof (buffer_size));
thread_data->current_socket_buffer_size = buffer_size;
- arv_debug ("stream", "[GvStream::update_socket] Socket buffer size set to %d",
+ arv_debug ("stream-thread", "[GvStream::update_socket] Socket buffer size set to %d",
buffer_size);
}
}
@@ -123,7 +125,7 @@ arv_gv_stream_update_socket (ArvGvStreamThreadData *thread_data, ArvBuffer *buff
typedef struct {
ArvGvspPacket *packet;
ArvBuffer *buffer;
- gboolean is_missing_blocks;
+ gint32 n_missing_blocks;
gint32 last_block_size;
gint32 last_block_id;
gint64 last_time_us;
@@ -132,17 +134,62 @@ typedef struct {
} ArvGvStreamThreadState;
static void
+_close_buffer (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
+{
+ GTimeVal current_time;
+ gint64 current_time_us;
+
+ if (state->buffer == NULL)
+ return;
+
+ if (state->buffer->status == ARV_BUFFER_STATUS_FILLING ||
+ state->n_missing_blocks != 0)
+ state->buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCKS;
+ if (state->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
+ thread_data->n_completed_buffers++;
+ else
+ thread_data->n_failures++;
+ if (state->buffer->status == ARV_BUFFER_STATUS_ABORTED)
+ thread_data->n_aborteds++;
+
+ if (thread_data->callback != NULL)
+ thread_data->callback (thread_data->user_data,
+ ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
+ state->buffer);
+
+ g_get_current_time (¤t_time);
+ current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
+ if (state->statistic_count > 5) {
+ arv_statistic_fill (thread_data->statistic, 0,
+ (current_time_us - state->last_time_us),
+ state->buffer->frame_id);
+ arv_statistic_fill (thread_data->statistic, 1,
+ (state->buffer->timestamp_ns - state->last_timestamp_ns) /
+ 1000, state->buffer->frame_id);
+ } else
+ state->statistic_count++;
+
+ state->last_time_us = current_time_us;
+ state->last_timestamp_ns = state->buffer->timestamp_ns;
+ g_async_queue_push (thread_data->output_queue, state->buffer);
+ state->buffer = NULL;
+}
+
+static void
_process_data_leader (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
{
- if (state->buffer != NULL)
- g_async_queue_push (thread_data->output_queue, state->buffer);
+ if (state->buffer != NULL) {
+ state->buffer->status = ARV_BUFFER_STATUS_ABORTED;
+ _close_buffer (thread_data, state);
+ }
+
state->buffer = g_async_queue_try_pop (thread_data->input_queue);
if (state->buffer == NULL) {
thread_data->n_underruns++;
return;
}
- arv_gv_stream_update_socket (thread_data, state->buffer);
+ _update_socket (thread_data, state->buffer);
state->buffer->x_offset = arv_gvsp_packet_get_x_offset (state->packet);
state->buffer->y_offset = arv_gvsp_packet_get_y_offset (state->packet);
@@ -155,7 +202,7 @@ _process_data_leader (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState
thread_data->timestamp_tick_frequency);
state->buffer->status = ARV_BUFFER_STATUS_FILLING;
- state->is_missing_blocks = FALSE;
+ state->n_missing_blocks = 0;
state->last_block_size = 0;
state->last_block_id = 0;
}
@@ -172,16 +219,40 @@ _process_data_block (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState
state->buffer->status != ARV_BUFFER_STATUS_FILLING)
return;
+ if (state->buffer->frame_id != arv_gvsp_packet_get_frame_id (state->packet))
+ return;
+
block_id = arv_gvsp_packet_get_block_id (state->packet);
+ if (block_id <= state->last_block_id) {
+ arv_debug ("stream-thread", "[GvStream::thread] Receive resent block (%d) frame %d",
+ block_id, state->buffer->frame_id);
+
+ block_size = arv_gvsp_packet_get_data_size (read_count);
+ block_offset = state->last_block_size * (block_id - 1);
+ block_end = block_size + block_offset;
+
+ if (block_end <= state->buffer->size)
+ memcpy (state->buffer->data + block_offset, &state->packet->data, block_size);
+
+ state->n_missing_blocks--;
+ thread_data->n_resent_blocks++;
+ return;
+ }
+
if (block_id != (state->last_block_id + 1)) {
+ gint32 n_misses;
+
+ n_misses = block_id - state->last_block_id;
+
arv_gvsp_packet_debug (state->packet, read_count);
- arv_debug ("stream", "[GvStream::thread] Missing block (expected %d - %d) frame %d",
+ arv_debug ("stream-thread", "[GvStream::thread] Missing block (expected %d - %d) frame %d",
state->last_block_id + 1,
- block_id,
- arv_gvsp_packet_get_frame_id (state->packet));
- thread_data->n_failures++;
- thread_data->n_missing_blocks++;
- state->is_missing_blocks = TRUE;
+ block_id, state->buffer->frame_id);
+ thread_data->n_missing_blocks += n_misses;
+ state->n_missing_blocks += n_misses;
+
+ _send_packet_request (thread_data, state->buffer->frame_id,
+ state->last_block_id + 1, block_id - 1);
}
block_size = arv_gvsp_packet_get_data_size (read_count);
@@ -191,7 +262,6 @@ _process_data_block (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState
if (block_end > state->buffer->size) {
arv_gvsp_packet_debug (state->packet, read_count);
state->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
- thread_data->n_failures++;
thread_data->n_size_mismatch_errors++;
return;
}
@@ -207,40 +277,13 @@ _process_data_block (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState
static void
_process_data_trailer (ArvGvStreamThreadData *thread_data, ArvGvStreamThreadState *state)
{
- GTimeVal current_time;
- gint64 current_time_us;
-
if (state->buffer == NULL)
return;
- if (state->buffer->status == ARV_BUFFER_STATUS_FILLING)
- state->buffer->status = ARV_BUFFER_STATUS_SIZE_MISMATCH;
- if (state->buffer->status == ARV_BUFFER_STATUS_SUCCESS)
- thread_data->n_processed_buffers++;
- if (state->is_missing_blocks)
- state->buffer->status = ARV_BUFFER_STATUS_MISSING_BLOCKS;
-
- if (thread_data->callback != NULL)
- thread_data->callback (thread_data->user_data,
- ARV_STREAM_CALLBACK_TYPE_BUFFER_DONE,
- state->buffer);
-
- g_get_current_time (¤t_time);
- current_time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
- if (state->statistic_count > 5) {
- arv_statistic_fill (thread_data->statistic, 0,
- (current_time_us - state->last_time_us),
- state->buffer->frame_id);
- arv_statistic_fill (thread_data->statistic, 1,
- (state->buffer->timestamp_ns - state->last_timestamp_ns) /
- 1000, state->buffer->frame_id);
- } else
- state->statistic_count++;
+ if (state->n_missing_blocks != 0)
+ return;
- state->last_time_us = current_time_us;
- state->last_timestamp_ns = state->buffer->timestamp_ns;
- g_async_queue_push (thread_data->output_queue, state->buffer);
- state->buffer = NULL;
+ _close_buffer (thread_data, state);
}
static void *
@@ -262,7 +305,7 @@ arv_gv_stream_thread (void *data)
state.buffer = NULL;
state.packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
- state.is_missing_blocks = FALSE;
+ state.n_missing_blocks = 0;
state.last_block_size = 0;
state.last_block_id = 0;
state.statistic_count = 0;
@@ -295,7 +338,7 @@ arv_gv_stream_thread (void *data)
if (state.buffer != NULL) {
state.buffer->status = ARV_BUFFER_STATUS_ABORTED;
- g_async_queue_push (thread_data->output_queue, state.buffer);
+ _close_buffer (thread_data, &state);
}
if (thread_data->callback != NULL)
@@ -378,11 +421,13 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
thread_data->packet_count = 1;
- thread_data->n_processed_buffers = 0;
+ thread_data->n_completed_buffers = 0;
thread_data->n_failures = 0;
thread_data->n_underruns = 0;
thread_data->n_size_mismatch_errors = 0;
thread_data->n_missing_blocks = 0;
+ thread_data->n_resent_blocks = 0;
+ thread_data->n_aborteds = 0;
thread_data->statistic = arv_statistic_new (2, 5000, 200, 0);
@@ -404,7 +449,7 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
static void
arv_gv_stream_get_statistics (ArvStream *stream,
- guint64 *n_processed_buffers,
+ guint64 *n_completed_buffers,
guint64 *n_failures,
guint64 *n_underruns)
{
@@ -413,7 +458,7 @@ arv_gv_stream_get_statistics (ArvStream *stream,
thread_data = gv_stream->thread_data;
- *n_processed_buffers = thread_data->n_processed_buffers;
+ *n_completed_buffers = thread_data->n_completed_buffers;
*n_failures = thread_data->n_failures;
*n_underruns = thread_data->n_underruns;
}
@@ -435,15 +480,19 @@ arv_gv_stream_finalize (GObject *object)
thread_data = gv_stream->thread_data;
arv_debug ("stream",
- "[GvStream::finalize] n_processed_buffers = %d", thread_data->n_processed_buffers);
+ "[GvStream::finalize] n_completed_buffers = %d", thread_data->n_completed_buffers);
arv_debug ("stream",
"[GvStream::finalize] n_failures = %d", thread_data->n_failures);
arv_debug ("stream",
+ "[GvStream::finalize] n_aborteds = %d", thread_data->n_aborteds);
+ arv_debug ("stream",
"[GvStream::finalize] n_underruns = %d", thread_data->n_underruns);
arv_debug ("stream",
"[GvStream::finalize] n_size_mismatch_errors = %d", thread_data->n_size_mismatch_errors);
arv_debug ("stream",
"[GvStream::finalize] n_missing_blocks = %d", thread_data->n_missing_blocks);
+ arv_debug ("stream",
+ "[GvStream::finalize] n_resent_blocks = %d", thread_data->n_resent_blocks);
thread_data->cancel = TRUE;
g_thread_join (gv_stream->thread);
diff --git a/src/arvstream.c b/src/arvstream.c
index be01763..104414a 100644
--- a/src/arvstream.c
+++ b/src/arvstream.c
@@ -53,21 +53,21 @@ arv_stream_get_n_available_buffers (ArvStream *stream)
void
arv_stream_get_statistics (ArvStream *stream,
- guint64 *n_processed_buffers,
+ guint64 *n_completed_buffers,
guint64 *n_failures,
guint64 *n_underruns)
{
ArvStreamClass *stream_class;
guint64 dummy;
- if (n_processed_buffers == NULL)
- n_processed_buffers = &dummy;
+ if (n_completed_buffers == NULL)
+ n_completed_buffers = &dummy;
if (n_failures == NULL)
n_failures = &dummy;
if (n_underruns == NULL)
n_underruns = &dummy;
- *n_processed_buffers = 0;
+ *n_completed_buffers = 0;
*n_failures = 0;
*n_underruns = 0;
@@ -75,7 +75,7 @@ arv_stream_get_statistics (ArvStream *stream,
stream_class = ARV_STREAM_GET_CLASS (stream);
if (stream_class->get_statistics != NULL)
- stream_class->get_statistics (stream, n_processed_buffers, n_failures, n_underruns);
+ stream_class->get_statistics (stream, n_completed_buffers, n_failures, n_underruns);
}
static void
diff --git a/src/arvstream.h b/src/arvstream.h
index 2cb89ce..063ee0b 100644
--- a/src/arvstream.h
+++ b/src/arvstream.h
@@ -55,7 +55,7 @@ struct _ArvStream {
struct _ArvStreamClass {
GObjectClass parent_class;
- void (*get_statistics) (ArvStream *stream, guint64 *n_processed_buffers,
+ void (*get_statistics) (ArvStream *stream, guint64 *n_completed_buffers,
guint64 *n_failures, guint64 *n_underruns);
};
@@ -65,7 +65,7 @@ void arv_stream_push_buffer (ArvStream *stream, ArvBuffer *buffer);
ArvBuffer * arv_stream_pop_buffer (ArvStream *stream);
int arv_stream_get_n_available_buffers (ArvStream *stream);
void arv_stream_get_statistics (ArvStream *stream,
- guint64 *n_processed_buffers,
+ guint64 *n_completed_buffers,
guint64 *n_failures,
guint64 *n_underruns);
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]