[gst-debugger] [debugserver] use GThreadedSocketService for tcp server



commit 50455a3a4b5cdb5c061967e185e218811d2f6595
Author: Marcin Kolny <marcin kolny gmail com>
Date:   Thu Oct 5 21:48:59 2017 +0100

    [debugserver] use GThreadedSocketService for tcp server

 src/debugserver/gstdebugserver.c    |    6 --
 src/debugserver/gstdebugservertcp.c |  156 ++++++++++++++++++-----------------
 src/debugserver/gstdebugservertcp.h |    6 +-
 3 files changed, 83 insertions(+), 85 deletions(-)
---
diff --git a/src/debugserver/gstdebugserver.c b/src/debugserver/gstdebugserver.c
index de28d96..e9179a7 100644
--- a/src/debugserver/gstdebugserver.c
+++ b/src/debugserver/gstdebugserver.c
@@ -54,12 +54,6 @@ G_DEFINE_TYPE_WITH_CODE (GstDebugserverTracer, gst_debugserver_tracer,
 
 #define DEFAULT_PORT 8080
 
-enum
-{
-  PROP_0,
-  PROP_PORT
-};
-
 static void gst_debugserver_command_handler (GstDebugger__Command * command,
     gpointer debugtracer, TcpClient * client);
 
diff --git a/src/debugserver/gstdebugservertcp.c b/src/debugserver/gstdebugservertcp.c
index ac7ffad..fb5864f 100644
--- a/src/debugserver/gstdebugservertcp.c
+++ b/src/debugserver/gstdebugservertcp.c
@@ -35,37 +35,51 @@ GST_DEBUG_CATEGORY_STATIC (gst_debugserver_tcp);
 G_DEFINE_TYPE_WITH_CODE (GstDebugserverTcp, gst_debugserver_tcp,
     G_TYPE_OBJECT, _do_init)
 
-static void
-gst_debugserver_tcp_finalize (GObject * tcp);
-
 static gboolean
-gst_debugserver_tcp_incoming_callback (GSocketService * service,
+gst_debugserver_tcp_run (GThreadedSocketService * service,
     GSocketConnection * connection, GObject * source_object, gpointer user_data);
 
-static gpointer
-gst_debugserver_tcp_handle_client (gpointer user_data);
-
 static TcpClient*
 gst_debugserver_tcp_add_client (GstDebugserverTcp * tcp, GSocketConnection * connection);
 
 static gboolean
 gst_debugserver_tcp_send_packet_to_all_clients (GstDebugserverTcp * tcp, GstDebugger__GStreamerData * gst_data);
 
+
 static void
-gst_debugserver_tcp_class_init (GstDebugserverTcpClass * klass)
+gst_debugserver_tcp_finalize (GObject * obj)
 {
-  GObjectClass *gobject_class;
+  GstDebugserverTcp * tcp = GST_DEBUGSERVER_TCP (obj);
 
-  gobject_class = G_OBJECT_CLASS (klass);
-  gobject_class->finalize = gst_debugserver_tcp_finalize;
+  if (tcp->service == NULL) {
+    return;
+  }
+
+  g_socket_service_stop (tcp->service);
+
+  g_mutex_lock (&tcp->clients_mutex);
+  while (tcp->clients) {
+    TcpClient* client = tcp->clients->data;
+    g_cancellable_cancel (client->cancel);
+    gint64 end_time = g_get_monotonic_time () + 5 * G_TIME_SPAN_MILLISECOND;
+    g_cond_wait_until (&tcp->client_removed_cond, &tcp->clients_mutex, end_time);
+  }
+
+  g_mutex_unlock (&tcp->clients_mutex);
+
+  g_socket_listener_close (G_SOCKET_LISTENER (tcp->service));
+  tcp->service = NULL;
+  g_mutex_clear (&tcp->clients_mutex);
+  g_cond_clear (&tcp->client_removed_cond);
 }
 
 static void
-gst_debugserver_tcp_finalize (GObject * obj)
+gst_debugserver_tcp_class_init (GstDebugserverTcpClass * klass)
 {
-  GstDebugserverTcp * tcp = GST_DEBUGSERVER_TCP (obj);
-  gst_debugserver_tcp_stop_server (tcp);
-  g_slist_free_full (tcp->clients, g_free);
+  GObjectClass *gobject_class;
+
+  gobject_class = G_OBJECT_CLASS (klass);
+  gobject_class->finalize = gst_debugserver_tcp_finalize;
 }
 
 static void
@@ -76,6 +90,8 @@ gst_debugserver_tcp_init (GstDebugserverTcp * self)
   self->client_disconnected_handler = NULL;
   self->command_handler = NULL;
   self->owner = NULL;
+  g_mutex_init (&self->clients_mutex);
+  g_cond_init (&self->client_removed_cond);
 }
 
 GstDebugserverTcp * gst_debugserver_tcp_new (void)
@@ -90,7 +106,7 @@ gst_debugserver_tcp_start_server (GstDebugserverTcp * tcp, guint port)
 {
   GError *error = NULL;
 
-  tcp->service = g_socket_service_new ();
+  tcp->service = g_threaded_socket_service_new (10); // TODO expose to config
 
   g_socket_listener_add_inet_port ((GSocketListener *) tcp->service,
       port, NULL, &error);
@@ -102,109 +118,95 @@ gst_debugserver_tcp_start_server (GstDebugserverTcp * tcp, guint port)
   }
 
   g_signal_connect (tcp->service,
-      "incoming", G_CALLBACK (gst_debugserver_tcp_incoming_callback), tcp);
+      "run", G_CALLBACK (gst_debugserver_tcp_run), tcp);
 
   g_socket_service_start (tcp->service);
 
   return TRUE;
 }
 
-void
-gst_debugserver_tcp_stop_server (GstDebugserverTcp * tcp)
-{
-  if (tcp->service != NULL) {
-    g_socket_service_stop (tcp->service);
-    tcp->service = NULL;
-  }
-}
-
 static gboolean
-gst_debugserver_tcp_incoming_callback (GSocketService * service,
+gst_debugserver_tcp_run (GThreadedSocketService * service,
     GSocketConnection * connection, GObject * source_object, gpointer user_data)
 {
-  GArray *user_data_array = g_array_new (FALSE, FALSE, sizeof (gpointer));
-  TcpClient *client = gst_debugserver_tcp_add_client (GST_DEBUGSERVER_TCP (user_data), connection);
-
-  g_array_insert_val (user_data_array, 0, client);
-  g_array_insert_val (user_data_array, 1, user_data);
-  g_object_ref (connection);
-  g_thread_new ("connection",
-      (GThreadFunc) gst_debugserver_tcp_handle_client, user_data_array);
-
-  return TRUE;
-}
-
-static TcpClient*
-gst_debugserver_tcp_add_client (GstDebugserverTcp * tcp, GSocketConnection * connection)
-{
-  TcpClient *client = (TcpClient*) g_malloc (sizeof (TcpClient));
-
-  g_mutex_init (&client->mutex);
-  client->connection = connection;
-  tcp->clients = g_slist_append (tcp->clients, client);
+  GstDebugserverTcp *self = GST_DEBUGSERVER_TCP (user_data);
+  TcpClient *client = gst_debugserver_tcp_add_client (self, connection);
 
-  return client;
-}
-
-static gpointer
-gst_debugserver_tcp_handle_client (gpointer user_data)
-{
-  GArray *user_data_array = (GArray *) user_data;
-  TcpClient *client = g_array_index (user_data_array, TcpClient *, 0);
-  GstDebugserverTcp *self = g_array_index (user_data_array, GstDebugserverTcp*, 1);
-  GInputStream *istream;
   gchar buffer[1024];
   gint size;
   GstDebugger__Command *command;
 
-  g_array_unref (user_data_array);
-
-  istream = g_io_stream_get_input_stream (G_IO_STREAM (client->connection));
+  GInputStream *istream = g_io_stream_get_input_stream (G_IO_STREAM (client->connection));
 
   GST_DEBUG_OBJECT (self, "Received connection from client!\n");
 
-  while ((size = gst_debugger_protocol_utils_read_header (istream, NULL)) > 0) {
-     assert (size <= 1024); // todo max message size in global file
-     GST_DEBUG_OBJECT (self, "Received message of size: %d\n", size);
-     gst_debugger_protocol_utils_read_requested_size (istream, size, buffer, NULL);
-     command = gst_debugger__command__unpack (NULL, size, (uint8_t*) buffer);
-     if (command == NULL) {
-       g_print ("error unpacking incoming message\n");
-       continue;
-     }
+  while ((size = gst_debugger_protocol_utils_read_header (istream, client->cancel)) > 0) {
+    assert (size <= 1024); // todo max message size in global file
+    GST_DEBUG_OBJECT (self, "Received message of size: %d\n", size);
+    gst_debugger_protocol_utils_read_requested_size (istream, size, buffer, NULL);
+    command = gst_debugger__command__unpack (NULL, size, (uint8_t*) buffer);
+    if (command == NULL) {
+      g_print ("error unpacking incoming message\n");
+      continue;
+    }
 
-     if (self->command_handler != NULL) {
-       self->command_handler (command, self->owner, client);
-     }
+    if (self->command_handler != NULL) {
+      self->command_handler (command, self->owner, client);
+    }
 
-     gst_debugger__command__free_unpacked (command, NULL);
-   }
+    gst_debugger__command__free_unpacked (command, NULL);
+  }
 
+  g_mutex_lock (&self->clients_mutex);
   self->clients = g_slist_remove (self->clients, client);
+  g_mutex_unlock (&self->clients_mutex);
 
   if (self->client_disconnected_handler)
     self->client_disconnected_handler (client, self->owner);
 
+  g_mutex_clear (&client->mutex);
+  g_object_unref (client->cancel);
   g_free (client);
 
   GST_LOG_OBJECT (self, "Client disconnected");
 
-  return NULL;
+  g_cond_signal (&self->client_removed_cond);
+
+  return FALSE;
+}
+
+static TcpClient*
+gst_debugserver_tcp_add_client (GstDebugserverTcp * tcp, GSocketConnection * connection)
+{
+  TcpClient *client = (TcpClient*) g_malloc (sizeof (TcpClient));
+
+  g_mutex_init (&client->mutex);
+  client->connection = connection;
+  client->cancel = g_cancellable_new ();
+
+  g_mutex_lock(&tcp->clients_mutex);
+  tcp->clients = g_slist_append (tcp->clients, client);
+  g_mutex_unlock(&tcp->clients_mutex);
+
+  return client;
 }
 
 TcpClient*
 gst_debugserver_tcp_find_client (GstDebugserverTcp * tcp, GSocketConnection * connection)
 {
+  g_mutex_lock(&tcp->clients_mutex);
   GSList *client_list = tcp->clients;
   TcpClient *client;
 
   while (client_list != NULL) {
     client = (TcpClient*) client_list->data;
     if (client->connection == connection) {
+      g_mutex_unlock(&tcp->clients_mutex);
       return client;
     }
     client_list = g_slist_next (client_list);
   }
+  g_mutex_unlock(&tcp->clients_mutex);
  return NULL;
 }
 
@@ -267,15 +269,17 @@ gst_debugserver_tcp_send_packet (GstDebugserverTcp * tcp, TcpClient * client,
 static gboolean
 gst_debugserver_tcp_send_packet_to_all_clients (GstDebugserverTcp * tcp, GstDebugger__GStreamerData * gst_data)
 {
-  GSList *clients = tcp->clients;
   TcpClient *client;
   gboolean ret = TRUE;
 
+  g_mutex_lock (&tcp->clients_mutex);
+  GSList *clients = tcp->clients;
   while (clients != NULL) {
     client = (TcpClient*)clients->data;
     ret = ret && gst_debugserver_tcp_send_packet (tcp, client, gst_data);
     clients = clients->next;
   }
+  g_mutex_unlock (&tcp->clients_mutex);
 
   return ret;
 }
diff --git a/src/debugserver/gstdebugservertcp.h b/src/debugserver/gstdebugservertcp.h
index 6d14c59..96276b6 100644
--- a/src/debugserver/gstdebugservertcp.h
+++ b/src/debugserver/gstdebugservertcp.h
@@ -41,6 +41,7 @@ G_BEGIN_DECLS
 typedef struct _TcpClient {
   GSocketConnection * connection;
   GMutex mutex;
+  GCancellable * cancel;
 } TcpClient;
 
 typedef void (*GstDebugserverTcpHandleCommandFunction)
@@ -61,8 +62,9 @@ struct _GstDebugserverTcp {
 
   /*< private >*/
   GSocketService * service;
-  guint port;
   GSList * clients;
+  GMutex clients_mutex;
+  GCond client_removed_cond;
 };
 
 struct _GstDebugserverTcpClass
@@ -76,8 +78,6 @@ GstDebugserverTcp * gst_debugserver_tcp_new (void);
 
 gboolean gst_debugserver_tcp_start_server (GstDebugserverTcp * tcp, guint port);
 
-void gst_debugserver_tcp_stop_server (GstDebugserverTcp * tcp);
-
 gboolean gst_debugserver_tcp_send_packet (GstDebugserverTcp * tcp, TcpClient * client,
   GstDebugger__GStreamerData * gst_data);
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]