[aravis/wip/emmanuel/wakeup: 2/2] stream: new delete_buffers API



commit 5f4e27ff6cd709908c06c860021acce654d7b21a
Author: Emmanuel Pacaud <emmanuel gnome org>
Date:   Mon May 4 21:01:37 2015 +0200

    stream: new delete_buffers API

 src/arvgvstream.c |  176 ++++++++++++++++++++++++++++++++--------------------
 src/arvstream.c   |   38 +++++++++++
 src/arvstream.h   |    2 +
 3 files changed, 148 insertions(+), 68 deletions(-)
---
diff --git a/src/arvgvstream.c b/src/arvgvstream.c
index fbd7c23..114ec7f 100644
--- a/src/arvgvstream.c
+++ b/src/arvgvstream.c
@@ -28,6 +28,7 @@
 #include <arvgvstream.h>
 #include <arvstreamprivate.h>
 #include <arvbufferprivate.h>
+#include <arvwakeupprivate.h>
 #include <arvgvsp.h>
 #include <arvgvcp.h>
 #include <arvdebug.h>
@@ -101,7 +102,10 @@ typedef struct {
        guint64 timestamp_tick_frequency;
        guint data_size;
 
-       gboolean cancel;
+       ArvWakeup *cancel;
+       GCond cancel_cond;
+       GMutex cancel_mutex;
+       gboolean exit_on_cancel;
 
        guint16 packet_id;
 
@@ -584,8 +588,8 @@ arv_gv_stream_thread (void *data)
        guint32 frame_id;
        GTimeVal current_time;
        guint64 time_us;
-       GPollFD poll_fd;
-       size_t read_count;
+       GPollFD poll_fd[2];
+       gssize read_count;
        int timeout_ms;
        int n_events;
        int i;
@@ -601,9 +605,11 @@ arv_gv_stream_thread (void *data)
        if (thread_data->callback != NULL)
                thread_data->callback (thread_data->user_data, ARV_STREAM_CALLBACK_TYPE_INIT, NULL);
 
-       poll_fd.fd = g_socket_get_fd (thread_data->socket);
-       poll_fd.events =  G_IO_IN;
-       poll_fd.revents = 0;
+       poll_fd[0].fd = g_socket_get_fd (thread_data->socket);
+       poll_fd[0].events =  G_IO_IN;
+       poll_fd[0].revents = 0;
+
+       arv_wakeup_get_pollfd (thread_data->cancel, &poll_fd[1]);
 
        packet = g_malloc0 (ARV_GV_STREAM_INCOMING_BUFFER_SIZE);
 
@@ -613,81 +619,94 @@ arv_gv_stream_thread (void *data)
                else
                        timeout_ms = ARV_GV_STREAM_POLL_TIMEOUT_US / 1000;
 
-               n_events = g_poll (&poll_fd, 1, timeout_ms);
+               n_events = g_poll (poll_fd, 2, timeout_ms);
 
                g_get_current_time (&current_time);
                time_us = current_time.tv_sec * 1000000 + current_time.tv_usec;
 
                if (n_events > 0) {
-                       thread_data->n_received_packets++;
-
-                       read_count = g_socket_receive (thread_data->socket, (char *) packet,
-                                                      ARV_GV_STREAM_INCOMING_BUFFER_SIZE, NULL, NULL);
+                       read_count = g_socket_receive_with_blocking (thread_data->socket, (char *) packet,
+                                                                    ARV_GV_STREAM_INCOMING_BUFFER_SIZE, 
FALSE, NULL, NULL);
 
-                       frame_id = arv_gvsp_packet_get_frame_id (packet);
-                       packet_id = arv_gvsp_packet_get_packet_id (packet);
+                       if (read_count > 0) {
+                               thread_data->n_received_packets++;
 
-                       if (first_packet) {
-                               thread_data->last_frame_id = frame_id - 1;
-                               first_packet = FALSE;
-                       }
+                               frame_id = arv_gvsp_packet_get_frame_id (packet);
+                               packet_id = arv_gvsp_packet_get_packet_id (packet);
 
-                       frame = _find_frame_data (thread_data, frame_id, packet, packet_id, read_count, 
time_us);
-
-                       if (frame != NULL) {
-                               ArvGvspPacketType packet_type = arv_gvsp_packet_get_packet_type (packet);
-
-                               if (packet_type != ARV_GVSP_PACKET_TYPE_OK &&
-                                   packet_type != ARV_GVSP_PACKET_TYPE_RESEND) {
-                                       arv_debug_stream_thread ("[GvStream::stream_thread]"
-                                                                " Error packet at dt = %" G_GINT64_FORMAT ", 
packet id = %u"
-                                                                " frame id = %u",
-                                                                time_us - frame->first_packet_time_us,
-                                                                packet_id, frame->frame_id);
-                                       arv_gvsp_packet_debug (packet, read_count, ARV_DEBUG_LEVEL_DEBUG);
-                                       frame->error_packet_received = TRUE;
-
-                                       thread_data->n_error_packets++;
-                               } else {
-                                       /* Check for duplicated packets */
-                                       if (packet_id < frame->n_packets) {
-                                               if (frame->packet_data[packet_id].received)
-                                                       thread_data->n_duplicated_packets++;
-                                               else
-                                                       frame->packet_data[packet_id].received = TRUE;
-                                       }
+                               if (first_packet) {
+                                       thread_data->last_frame_id = frame_id - 1;
+                                       first_packet = FALSE;
+                               }
 
-                                       /* Keep track of last packet of a continuous block starting from 
packet 0 */
-                                       for (i = frame->last_valid_packet + 1; i < frame->n_packets; i++)
-                                               if (!frame->packet_data[i].received)
-                                                       break;
-                                       frame->last_valid_packet = i - 1;
-
-                                       switch (arv_gvsp_packet_get_content_type (packet)) {
-                                               case ARV_GVSP_CONTENT_TYPE_DATA_LEADER:
-                                                       _process_data_leader (thread_data, frame, packet, 
packet_id);
-                                                       break;
-                                               case ARV_GVSP_CONTENT_TYPE_DATA_BLOCK:
-                                                       _process_data_block (thread_data, frame, packet, 
packet_id,
-                                                                            read_count);
-                                                       break;
-                                               case ARV_GVSP_CONTENT_TYPE_DATA_TRAILER:
-                                                       _process_data_trailer (thread_data, frame, packet, 
packet_id);
-                                                       break;
-                                               default:
-                                                       thread_data->n_ignored_packets++;
-                                                       break;
+                               frame = _find_frame_data (thread_data, frame_id, packet, packet_id, 
read_count, time_us);
+
+                               if (frame != NULL) {
+                                       ArvGvspPacketType packet_type = arv_gvsp_packet_get_packet_type 
(packet);
+
+                                       if (packet_type != ARV_GVSP_PACKET_TYPE_OK &&
+                                           packet_type != ARV_GVSP_PACKET_TYPE_RESEND) {
+                                               arv_debug_stream_thread ("[GvStream::stream_thread]"
+                                                                        " Error packet at dt = %" 
G_GINT64_FORMAT ", packet id = %u"
+                                                                        " frame id = %u",
+                                                                        time_us - 
frame->first_packet_time_us,
+                                                                        packet_id, frame->frame_id);
+                                               arv_gvsp_packet_debug (packet, read_count, 
ARV_DEBUG_LEVEL_DEBUG);
+                                               frame->error_packet_received = TRUE;
+
+                                               thread_data->n_error_packets++;
+                                       } else {
+                                               /* Check for duplicated packets */
+                                               if (packet_id < frame->n_packets) {
+                                                       if (frame->packet_data[packet_id].received)
+                                                               thread_data->n_duplicated_packets++;
+                                                       else
+                                                               frame->packet_data[packet_id].received = TRUE;
+                                               }
+
+                                               /* Keep track of last packet of a continuous block starting 
from packet 0 */
+                                               for (i = frame->last_valid_packet + 1; i < frame->n_packets; 
i++)
+                                                       if (!frame->packet_data[i].received)
+                                                               break;
+                                               frame->last_valid_packet = i - 1;
+
+                                               switch (arv_gvsp_packet_get_content_type (packet)) {
+                                                       case ARV_GVSP_CONTENT_TYPE_DATA_LEADER:
+                                                               _process_data_leader (thread_data, frame, 
packet, packet_id);
+                                                               break;
+                                                       case ARV_GVSP_CONTENT_TYPE_DATA_BLOCK:
+                                                               _process_data_block (thread_data, frame, 
packet, packet_id,
+                                                                                    read_count);
+                                                               break;
+                                                       case ARV_GVSP_CONTENT_TYPE_DATA_TRAILER:
+                                                               _process_data_trailer (thread_data, frame, 
packet, packet_id);
+                                                               break;
+                                                       default:
+                                                               thread_data->n_ignored_packets++;
+                                                               break;
+                                               }
+
+                                               _missing_packet_check (thread_data, frame, packet_id, 
time_us);
                                        }
-
-                                       _missing_packet_check (thread_data, frame, packet_id, time_us);
-                               }
+                               } else
+                                       thread_data->n_ignored_packets++;
                        } else
-                               thread_data->n_ignored_packets++;
+                               frame = NULL;
+                       
+                       if (read_count < 1 || n_events > 1) {
+                               arv_wakeup_acknowledge (thread_data->cancel);
+                       }
                } else
                        frame = NULL;
 
                _check_frame_completion (thread_data, time_us, frame);
-       } while (!thread_data->cancel);
+
+               if (thread_data->frames == NULL) {
+                       g_mutex_lock (&thread_data->cancel_mutex);
+                       g_cond_signal (&thread_data->cancel_cond);
+                       g_mutex_unlock (&thread_data->cancel_mutex);
+               }
+       } while (!thread_data->exit_on_cancel);
 
        _flush_frames (thread_data);
 
@@ -699,6 +718,19 @@ arv_gv_stream_thread (void *data)
        return NULL;
 }
 
+static void
+_release_buffers (ArvStream *stream)
+{
+       ArvGvStreamThreadData *thread_data = ARV_GV_STREAM (stream)->priv->thread_data;
+       /* Wake the stream thread, then block until it signals cancel_cond. Locking
+        * first matters: the thread signals under cancel_mutex, so its signal can no
+        * longer fire before we wait. NOTE(review): spurious wakeups are not handled. */
+       g_mutex_lock (&thread_data->cancel_mutex);
+       arv_wakeup_signal (thread_data->cancel);
+       g_cond_wait (&thread_data->cancel_cond, &thread_data->cancel_mutex);
+       g_mutex_unlock (&thread_data->cancel_mutex);
+}
+
 /* ArvGvStream implemenation */
 
 guint16
@@ -767,7 +799,10 @@ arv_gv_stream_new (GInetAddress *device_address, guint16 port,
        thread_data->frame_retention_us = ARV_GV_STREAM_FRAME_RETENTION_US_DEFAULT;
        thread_data->timestamp_tick_frequency = timestamp_tick_frequency;
        thread_data->data_size = packet_size - ARV_GVSP_PACKET_PROTOCOL_OVERHEAD;
-       thread_data->cancel = FALSE;
+       thread_data->cancel = arv_wakeup_new ();
+       g_mutex_init (&thread_data->cancel_mutex);
+       g_cond_init (&thread_data->cancel_cond);
+       thread_data->exit_on_cancel = FALSE;
 
        thread_data->packet_id = 65300;
        thread_data->last_frame_id = 0;
@@ -920,8 +955,12 @@ arv_gv_stream_finalize (GObject *object)
 
                thread_data = gv_stream->priv->thread_data;
 
-               thread_data->cancel = TRUE;
+               thread_data->exit_on_cancel = TRUE;
+               arv_wakeup_signal (thread_data->cancel);
+
                g_thread_join (gv_stream->priv->thread);
+               
+               g_clear_pointer (&thread_data->cancel, arv_wakeup_free);
 
                g_object_unref (thread_data->device_address);
 
@@ -988,6 +1027,7 @@ arv_gv_stream_class_init (ArvGvStreamClass *gv_stream_class)
        object_class->set_property = arv_gv_stream_set_property;
        object_class->get_property = arv_gv_stream_get_property;
 
+       stream_class->release_buffers = _release_buffers;
        stream_class->get_statistics = _get_statistics;
 
        g_object_class_install_property (
diff --git a/src/arvstream.c b/src/arvstream.c
index 2b643eb..e04572f 100644
--- a/src/arvstream.c
+++ b/src/arvstream.c
@@ -216,6 +216,44 @@ arv_stream_get_n_buffers (ArvStream *stream, gint *n_input_buffers, gint *n_outp
 }
 
 /**
+ * arv_stream_delete_buffers:
+ * @stream: a #ArvStream
+ *
+ * Free all buffers in input and output queues, calling the class release_buffers hook in between when one is set.
+ *
+ * Since: 0.3.8
+ */
+
+void
+arv_stream_delete_buffers (ArvStream *stream)
+{
+       ArvStreamClass *stream_class;
+       unsigned int n_deleted = 0;
+
+       g_return_if_fail (ARV_IS_STREAM (stream));
+
+       g_async_queue_lock (stream->priv->input_queue);
+       while (g_async_queue_length_unlocked (stream->priv->input_queue) > 0) {
+               g_object_unref (g_async_queue_pop_unlocked (stream->priv->input_queue));
+               n_deleted++;
+       }
+       g_async_queue_unlock (stream->priv->input_queue);
+       /* Input queue is empty now; give the stream class a chance to release buffers. */
+       stream_class = ARV_STREAM_GET_CLASS (stream);
+       if (stream_class->release_buffers)
+               stream_class->release_buffers (stream);
+       /* Drain the output queue last, after the class hook has returned. */
+       g_async_queue_lock (stream->priv->output_queue);
+       while (g_async_queue_length_unlocked (stream->priv->output_queue) > 0) {
+               g_object_unref (g_async_queue_pop_unlocked (stream->priv->output_queue));
+               n_deleted++;
+       }
+       g_async_queue_unlock (stream->priv->output_queue);
+
+       arv_debug_stream ("[Stream::delete_buffers] Number of deleted buffers = %u", n_deleted);
+}
+
+/**
  * arv_stream_get_statistics:
  * @stream: a #ArvStream
  * @n_completed_buffers: (out) (allow-none): number of complete received buffers
diff --git a/src/arvstream.h b/src/arvstream.h
index 99a2897..6d534e7 100644
--- a/src/arvstream.h
+++ b/src/arvstream.h
@@ -65,6 +65,7 @@ struct _ArvStream {
 struct _ArvStreamClass {
        GObjectClass parent_class;
 
+       void            (*release_buffers)      (ArvStream *stream);
        void            (*get_statistics)       (ArvStream *stream, guint64 *n_completed_buffers,
                                                 guint64 *n_failures, guint64 *n_underruns);
 
@@ -81,6 +82,7 @@ ArvBuffer *   arv_stream_timeout_pop_buffer           (ArvStream *stream, guint64 timeout
 void           arv_stream_get_n_buffers                (ArvStream *stream,
                                                         gint *n_input_buffers,
                                                         gint *n_output_buffers);
+void           arv_stream_delete_buffers               (ArvStream *stream);
 void           arv_stream_get_statistics               (ArvStream *stream,
                                                         guint64 *n_completed_buffers,
                                                         guint64 *n_failures,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]