[gst-debugger] [debugserver] use GThreadedSocketService for tcp server
- From: Marcin Kolny <mkolny src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gst-debugger] [debugserver] use GThreadedSocketService for tcp server
- Date: Thu, 5 Oct 2017 20:51:31 +0000 (UTC)
commit 50455a3a4b5cdb5c061967e185e218811d2f6595
Author: Marcin Kolny <marcin kolny gmail com>
Date: Thu Oct 5 21:48:59 2017 +0100
[debugserver] use GThreadedSocketService for tcp server
src/debugserver/gstdebugserver.c | 6 --
src/debugserver/gstdebugservertcp.c | 156 ++++++++++++++++++-----------------
src/debugserver/gstdebugservertcp.h | 6 +-
3 files changed, 83 insertions(+), 85 deletions(-)
---
diff --git a/src/debugserver/gstdebugserver.c b/src/debugserver/gstdebugserver.c
index de28d96..e9179a7 100644
--- a/src/debugserver/gstdebugserver.c
+++ b/src/debugserver/gstdebugserver.c
@@ -54,12 +54,6 @@ G_DEFINE_TYPE_WITH_CODE (GstDebugserverTracer, gst_debugserver_tracer,
#define DEFAULT_PORT 8080
-enum
-{
- PROP_0,
- PROP_PORT
-};
-
static void gst_debugserver_command_handler (GstDebugger__Command * command,
gpointer debugtracer, TcpClient * client);
diff --git a/src/debugserver/gstdebugservertcp.c b/src/debugserver/gstdebugservertcp.c
index ac7ffad..fb5864f 100644
--- a/src/debugserver/gstdebugservertcp.c
+++ b/src/debugserver/gstdebugservertcp.c
@@ -35,37 +35,51 @@ GST_DEBUG_CATEGORY_STATIC (gst_debugserver_tcp);
G_DEFINE_TYPE_WITH_CODE (GstDebugserverTcp, gst_debugserver_tcp,
G_TYPE_OBJECT, _do_init)
-static void
-gst_debugserver_tcp_finalize (GObject * tcp);
-
static gboolean
-gst_debugserver_tcp_incoming_callback (GSocketService * service,
+gst_debugserver_tcp_run (GThreadedSocketService * service,
GSocketConnection * connection, GObject * source_object, gpointer user_data);
-static gpointer
-gst_debugserver_tcp_handle_client (gpointer user_data);
-
static TcpClient*
gst_debugserver_tcp_add_client (GstDebugserverTcp * tcp, GSocketConnection * connection);
static gboolean
gst_debugserver_tcp_send_packet_to_all_clients (GstDebugserverTcp * tcp, GstDebugger__GStreamerData *
gst_data);
+
static void
-gst_debugserver_tcp_class_init (GstDebugserverTcpClass * klass)
+gst_debugserver_tcp_finalize (GObject * obj)
{
- GObjectClass *gobject_class;
+ GstDebugserverTcp * tcp = GST_DEBUGSERVER_TCP (obj);
- gobject_class = G_OBJECT_CLASS (klass);
- gobject_class->finalize = gst_debugserver_tcp_finalize;
+ if (tcp->service == NULL) {
+ return;
+ }
+
+ g_socket_service_stop (tcp->service);
+
+ g_mutex_lock (&tcp->clients_mutex);
+ while (tcp->clients) {
+ TcpClient* client = tcp->clients->data;
+ g_cancellable_cancel (client->cancel);
+ gint64 end_time = g_get_monotonic_time () + 5 * G_TIME_SPAN_MILLISECOND;
+ g_cond_wait_until (&tcp->client_removed_cond, &tcp->clients_mutex, end_time);
+ }
+
+ g_mutex_unlock (&tcp->clients_mutex);
+
+ g_socket_listener_close (G_SOCKET_LISTENER (tcp->service));
+ tcp->service = NULL;
+ g_mutex_clear (&tcp->clients_mutex);
+ g_cond_clear (&tcp->client_removed_cond);
}
static void
-gst_debugserver_tcp_finalize (GObject * obj)
+gst_debugserver_tcp_class_init (GstDebugserverTcpClass * klass)
{
- GstDebugserverTcp * tcp = GST_DEBUGSERVER_TCP (obj);
- gst_debugserver_tcp_stop_server (tcp);
- g_slist_free_full (tcp->clients, g_free);
+ GObjectClass *gobject_class;
+
+ gobject_class = G_OBJECT_CLASS (klass);
+ gobject_class->finalize = gst_debugserver_tcp_finalize;
}
static void
@@ -76,6 +90,8 @@ gst_debugserver_tcp_init (GstDebugserverTcp * self)
self->client_disconnected_handler = NULL;
self->command_handler = NULL;
self->owner = NULL;
+ g_mutex_init (&self->clients_mutex);
+ g_cond_init (&self->client_removed_cond);
}
GstDebugserverTcp * gst_debugserver_tcp_new (void)
@@ -90,7 +106,7 @@ gst_debugserver_tcp_start_server (GstDebugserverTcp * tcp, guint port)
{
GError *error = NULL;
- tcp->service = g_socket_service_new ();
+ tcp->service = g_threaded_socket_service_new (10); // TODO expose to config
g_socket_listener_add_inet_port ((GSocketListener *) tcp->service,
port, NULL, &error);
@@ -102,109 +118,95 @@ gst_debugserver_tcp_start_server (GstDebugserverTcp * tcp, guint port)
}
g_signal_connect (tcp->service,
- "incoming", G_CALLBACK (gst_debugserver_tcp_incoming_callback), tcp);
+ "run", G_CALLBACK (gst_debugserver_tcp_run), tcp);
g_socket_service_start (tcp->service);
return TRUE;
}
-void
-gst_debugserver_tcp_stop_server (GstDebugserverTcp * tcp)
-{
- if (tcp->service != NULL) {
- g_socket_service_stop (tcp->service);
- tcp->service = NULL;
- }
-}
-
static gboolean
-gst_debugserver_tcp_incoming_callback (GSocketService * service,
+gst_debugserver_tcp_run (GThreadedSocketService * service,
GSocketConnection * connection, GObject * source_object, gpointer user_data)
{
- GArray *user_data_array = g_array_new (FALSE, FALSE, sizeof (gpointer));
- TcpClient *client = gst_debugserver_tcp_add_client (GST_DEBUGSERVER_TCP (user_data), connection);
-
- g_array_insert_val (user_data_array, 0, client);
- g_array_insert_val (user_data_array, 1, user_data);
- g_object_ref (connection);
- g_thread_new ("connection",
- (GThreadFunc) gst_debugserver_tcp_handle_client, user_data_array);
-
- return TRUE;
-}
-
-static TcpClient*
-gst_debugserver_tcp_add_client (GstDebugserverTcp * tcp, GSocketConnection * connection)
-{
- TcpClient *client = (TcpClient*) g_malloc (sizeof (TcpClient));
-
- g_mutex_init (&client->mutex);
- client->connection = connection;
- tcp->clients = g_slist_append (tcp->clients, client);
+ GstDebugserverTcp *self = GST_DEBUGSERVER_TCP (user_data);
+ TcpClient *client = gst_debugserver_tcp_add_client (self, connection);
- return client;
-}
-
-static gpointer
-gst_debugserver_tcp_handle_client (gpointer user_data)
-{
- GArray *user_data_array = (GArray *) user_data;
- TcpClient *client = g_array_index (user_data_array, TcpClient *, 0);
- GstDebugserverTcp *self = g_array_index (user_data_array, GstDebugserverTcp*, 1);
- GInputStream *istream;
gchar buffer[1024];
gint size;
GstDebugger__Command *command;
- g_array_unref (user_data_array);
-
- istream = g_io_stream_get_input_stream (G_IO_STREAM (client->connection));
+ GInputStream *istream = g_io_stream_get_input_stream (G_IO_STREAM (client->connection));
GST_DEBUG_OBJECT (self, "Received connection from client!\n");
- while ((size = gst_debugger_protocol_utils_read_header (istream, NULL)) > 0) {
- assert (size <= 1024); // todo max message size in global file
- GST_DEBUG_OBJECT (self, "Received message of size: %d\n", size);
- gst_debugger_protocol_utils_read_requested_size (istream, size, buffer, NULL);
- command = gst_debugger__command__unpack (NULL, size, (uint8_t*) buffer);
- if (command == NULL) {
- g_print ("error unpacking incoming message\n");
- continue;
- }
+ while ((size = gst_debugger_protocol_utils_read_header (istream, client->cancel)) > 0) {
+ assert (size <= 1024); // todo max message size in global file
+ GST_DEBUG_OBJECT (self, "Received message of size: %d\n", size);
+ gst_debugger_protocol_utils_read_requested_size (istream, size, buffer, NULL);
+ command = gst_debugger__command__unpack (NULL, size, (uint8_t*) buffer);
+ if (command == NULL) {
+ g_print ("error unpacking incoming message\n");
+ continue;
+ }
- if (self->command_handler != NULL) {
- self->command_handler (command, self->owner, client);
- }
+ if (self->command_handler != NULL) {
+ self->command_handler (command, self->owner, client);
+ }
- gst_debugger__command__free_unpacked (command, NULL);
- }
+ gst_debugger__command__free_unpacked (command, NULL);
+ }
+ g_mutex_lock (&self->clients_mutex);
self->clients = g_slist_remove (self->clients, client);
+ g_mutex_unlock (&self->clients_mutex);
if (self->client_disconnected_handler)
self->client_disconnected_handler (client, self->owner);
+ g_mutex_clear (&client->mutex);
+ g_object_unref (client->cancel);
g_free (client);
GST_LOG_OBJECT (self, "Client disconnected");
- return NULL;
+ g_cond_signal (&self->client_removed_cond);
+
+ return FALSE;
+}
+
+static TcpClient*
+gst_debugserver_tcp_add_client (GstDebugserverTcp * tcp, GSocketConnection * connection)
+{
+ TcpClient *client = (TcpClient*) g_malloc (sizeof (TcpClient));
+
+ g_mutex_init (&client->mutex);
+ client->connection = connection;
+ client->cancel = g_cancellable_new ();
+
+ g_mutex_lock(&tcp->clients_mutex);
+ tcp->clients = g_slist_append (tcp->clients, client);
+ g_mutex_unlock(&tcp->clients_mutex);
+
+ return client;
}
TcpClient*
gst_debugserver_tcp_find_client (GstDebugserverTcp * tcp, GSocketConnection * connection)
{
+ g_mutex_lock(&tcp->clients_mutex);
GSList *client_list = tcp->clients;
TcpClient *client;
while (client_list != NULL) {
client = (TcpClient*) client_list->data;
if (client->connection == connection) {
+ g_mutex_unlock(&tcp->clients_mutex);
return client;
}
client_list = g_slist_next (client_list);
}
+ g_mutex_unlock(&tcp->clients_mutex);
return NULL;
}
@@ -267,15 +269,17 @@ gst_debugserver_tcp_send_packet (GstDebugserverTcp * tcp, TcpClient * client,
static gboolean
gst_debugserver_tcp_send_packet_to_all_clients (GstDebugserverTcp * tcp, GstDebugger__GStreamerData *
gst_data)
{
- GSList *clients = tcp->clients;
TcpClient *client;
gboolean ret = TRUE;
+ g_mutex_lock (&tcp->clients_mutex);
+ GSList *clients = tcp->clients;
while (clients != NULL) {
client = (TcpClient*)clients->data;
ret = ret && gst_debugserver_tcp_send_packet (tcp, client, gst_data);
clients = clients->next;
}
+ g_mutex_unlock (&tcp->clients_mutex);
return ret;
}
diff --git a/src/debugserver/gstdebugservertcp.h b/src/debugserver/gstdebugservertcp.h
index 6d14c59..96276b6 100644
--- a/src/debugserver/gstdebugservertcp.h
+++ b/src/debugserver/gstdebugservertcp.h
@@ -41,6 +41,7 @@ G_BEGIN_DECLS
typedef struct _TcpClient {
GSocketConnection * connection;
GMutex mutex;
+ GCancellable * cancel;
} TcpClient;
typedef void (*GstDebugserverTcpHandleCommandFunction)
@@ -61,8 +62,9 @@ struct _GstDebugserverTcp {
/*< private >*/
GSocketService * service;
- guint port;
GSList * clients;
+ GMutex clients_mutex;
+ GCond client_removed_cond;
};
struct _GstDebugserverTcpClass
@@ -76,8 +78,6 @@ GstDebugserverTcp * gst_debugserver_tcp_new (void);
gboolean gst_debugserver_tcp_start_server (GstDebugserverTcp * tcp, guint port);
-void gst_debugserver_tcp_stop_server (GstDebugserverTcp * tcp);
-
gboolean gst_debugserver_tcp_send_packet (GstDebugserverTcp * tcp, TcpClient * client,
GstDebugger__GStreamerData * gst_data);
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]