[gnome-network-displays/cc-tmp: 15/80] cc: async read works




commit 3973c991959cd623a027eeb48bc47986edec4064
Author: Anupam Kumar <kyteinsky gmail com>
Date:   Wed Jul 27 11:52:18 2022 +0530

    cc: async read works
    
    issue:
      app crashes even for handled errors
    todo:
      strong connect request

 src/cc/cc-comm.c | 173 +++++++++++++++++++++++++++++++++++++------------------
 src/cc/cc-comm.h |  17 +++---
 src/nd-cc-sink.c |  10 +++-
 3 files changed, 135 insertions(+), 65 deletions(-)
---
diff --git a/src/cc/cc-comm.c b/src/cc/cc-comm.c
index 79ca9a7..42c74ce 100644
--- a/src/cc/cc-comm.c
+++ b/src/cc/cc-comm.c
@@ -16,11 +16,14 @@
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+#include <math.h>
+
 #include "cc-comm.h"
 #include "cast_channel.pb-c.h"
 
 // function decl
-void cc_comm_listen (CcComm *comm);
+static void cc_comm_listen (CcComm *comm);
+static void cc_comm_read (CcComm *comm, gsize io_bytes, gboolean read_header);
 
 
 static void
@@ -44,19 +47,19 @@ cc_comm_dump_message (guint8 *msg, gsize length)
     g_debug ("%s", line->str);
 }
 
-void
+static void
 cc_comm_parse_received_data(uint8_t * input_buffer, gssize input_size)
 {
   Castchannel__CastMessage *message;
 
-  message = castchannel__cast_message__unpack(NULL, input_size-4, input_buffer+4);
+  message = castchannel__cast_message__unpack(NULL, input_size, input_buffer);
   if (message == NULL)
   {
-    g_warning ("CCComm: Failed to unpack received data");
+    g_warning ("CcComm: Failed to unpack received data");
     return;
   }
 
-  g_debug("CCComm: Received data: { source_id: %s, destination_id: %s, namespace_: %s, payload_type: %d, payload_utf8: %s }",
+  g_debug("CcComm: Received data: { source_id: %s, destination_id: %s, namespace_: %s, payload_type: %d, payload_utf8: %s }",
            message->source_id,
            message->destination_id,
            message->namespace_,
@@ -92,8 +95,68 @@ cc_comm_accept_certificate (GTlsClientConnection *conn,
 
 // LISTENER
 
-// async callback for input stream read
-void
+/* Decode the 4-byte length prefix in comm->header_buffer (byte 0 is most significant). */
+static guint32
+cc_comm_to_message_size (CcComm *comm)
+{
+  const guint8 *b = comm->header_buffer; /* integer shifts instead of pow(): exact, no libm */
+
+  return ((guint32) b[0] << 24) | ((guint32) b[1] << 16) | ((guint32) b[2] << 8) | (guint32) b[3];
+}
+
+/*
+ * Async callback for the message-body read started by cc_comm_read();
+ * @user_data is the CcComm whose message_buffer received the bytes.
+ *
+ * On success the body is dumped, parsed and freed, then the next header read
+ * is armed. Failures are logged with g_warning() instead of g_error():
+ * g_error() always aborts the process, so the recovery code after it never
+ * ran. The listener is not re-armed after a failure because a broken or
+ * closed stream would fail again immediately.
+ */
+static void
+cc_comm_message_read_cb (GObject *source_object,
+                        GAsyncResult *res,
+                        gpointer user_data)
+{
+  CcComm *comm = (CcComm *) user_data;
+  g_autoptr(GError) error = NULL;
+  gboolean success;
+  gsize io_bytes = 0;
+
+  success = g_input_stream_read_all_finish (G_INPUT_STREAM (source_object), res, &io_bytes, &error);
+
+  // If we cancelled, just return immediately.
+  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    return;
+
+  // Result belongs to a replaced connection: leave the current one's buffers alone.
+  if (comm->con &&
+    g_io_stream_get_input_stream (G_IO_STREAM (comm->con)) != G_INPUT_STREAM (source_object))
+  {
+    g_warning ("CcComm: Error on old read connection, ignoring.");
+    return;
+  }
+
+  if (!success || io_bytes == 0)
+  {
+    g_warning ("CcComm: Error reading message from stream%s%s",
+               error ? ": " : ".", error ? error->message : "");
+    g_clear_pointer (&comm->message_buffer, g_free);
+    return;
+  }
+
+  // dump the received message and try to parse it
+  cc_comm_dump_message (comm->message_buffer, io_bytes);
+  cc_comm_parse_received_data (comm->message_buffer, io_bytes);
+  g_clear_pointer (&comm->message_buffer, g_free); // allocated per message by the header callback
+
+  // go for another round
+  cc_comm_listen (comm);
+}
+
+// async callback for header read
+static void
 cc_comm_header_read_cb (GObject *source_object,
                         GAsyncResult *res,
                         gpointer user_data) 
@@ -102,6 +165,7 @@ cc_comm_header_read_cb (GObject *source_object,
   g_autoptr(GError) error = NULL;
   gboolean success;
   gsize io_bytes;
+  guint32 message_size;
 
   success = g_input_stream_read_all_finish (G_INPUT_STREAM (source_object), res, &io_bytes, &error);
 
@@ -109,6 +173,7 @@ cc_comm_header_read_cb (GObject *source_object,
   if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
     return;
 
+  // XXX: should we give up or keep on retrying if errors pop up
   /*
    * If this error is for an old connection (that should be closed already),
    * then just give up immediately with a CLOSED error.
@@ -116,7 +181,7 @@ cc_comm_header_read_cb (GObject *source_object,
   if (comm->con &&
     g_io_stream_get_input_stream (G_IO_STREAM (comm->con)) != G_INPUT_STREAM (source_object))
   {
-    g_error ("CCComm: Error on old read connection, ignoring.");
+    g_error ("CcComm: Error on old read connection, ignoring.");
     cc_comm_listen (comm);
     return;
   }
@@ -125,62 +190,59 @@ cc_comm_header_read_cb (GObject *source_object,
   {
     if (error)
     {
-      g_error ("CCComm: Error reading from stream: %s", error->message);
+      g_error ("CcComm: Error reading header from stream: %s", error->message);
       cc_comm_listen (comm);
       return;
     }
-    g_error ("CCComm: Error reading from stream, couldn't read 4 bytes header.");
+    g_error ("CcComm: Error reading header from stream.");
     cc_comm_listen (comm);
     return;
   }
 
-  // TODO
   // if everything is well, read all `io_bytes`
-  // cc_comm_read (comm);
+  g_debug ("CcComm: Raw header dump:");
+  cc_comm_dump_message (comm->header_buffer, 4);
+
+  message_size = cc_comm_to_message_size (comm);
+  g_debug ("CcComm: Message size: %d", message_size);
+
+  comm->message_buffer = g_malloc0 (message_size);
+  cc_comm_read (comm, message_size, FALSE);
 }
 
-void
-cc_comm_read (CcComm *comm, uint8_t *buffer, gsize io_bytes)
+static void
+cc_comm_read (CcComm *comm, gsize io_bytes, gboolean read_header)
 {
   GInputStream *istream;
 
   istream = g_io_stream_get_input_stream (G_IO_STREAM (comm->con));
 
+  if (read_header)
+    {
+      g_input_stream_read_all_async (istream,
+                                    comm->header_buffer,
+                                    io_bytes,
+                                    G_PRIORITY_DEFAULT,
+                                    NULL,
+                                    cc_comm_header_read_cb,
+                                    comm);
+      return;
+    }
   g_input_stream_read_all_async (istream,
-                                 buffer,
-                                 io_bytes,
-                                 G_PRIORITY_DEFAULT,
-                                 NULL,
-                                 cc_comm_header_read_cb,
-                                 comm);
+                                comm->message_buffer,
+                                io_bytes,
+                                G_PRIORITY_DEFAULT,
+                                NULL,
+                                cc_comm_message_read_cb,
+                                comm);
 }
 
 // listen to all incoming messages from Chromecast
-void
+static void
 cc_comm_listen (CcComm *comm)
 {
-  // gssize io_bytes;
-  // g_autofree uint8_t buffer[MAX_MSG_SIZE];
-  g_autofree uint8_t header_buffer[4];
-
-  cc_comm_read (comm, header_buffer, 4);
-
-
-
-
-  // if (io_bytes <= 0)
-  // {
-  //   g_warning ("CCComm: Failed to read: %s", error->message);
-  //   g_clear_error (error);
-  //   return FALSE;
-  // }
-
-  // g_debug ("CCComm: Received %" G_GSSIZE_FORMAT " bytes", io_bytes);
-  // g_debug ("CCComm: Received data:");
-  // cc_comm_dump_message (buffer, io_bytes);
-
-  // cc_comm_parse_received_data (buffer, io_bytes);
-
+  comm->header_buffer = g_malloc0 (4);
+  cc_comm_read (comm, 4, TRUE);
 }
 
 gboolean
@@ -205,7 +267,7 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
   socket = g_socket_new (socket_family, socket_type, G_SOCKET_PROTOCOL_DEFAULT, error);
   if (socket == NULL)
   {
-    g_warning ("CCComm: Failed to create socket: %s", (*error)->message);
+    g_warning ("CcComm: Failed to create socket: %s", (*error)->message);
     return FALSE;
   }
 
@@ -215,7 +277,7 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
   connectable = g_network_address_parse (remote_address, 8009, error);
   if (connectable == NULL)
   {
-    g_warning ("CCComm: Failed to create connectable: %s", (*error)->message);
+    g_warning ("CcComm: Failed to create connectable: %s", (*error)->message);
     return FALSE;
   }
 
@@ -225,7 +287,7 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
     address = g_socket_address_enumerator_next (enumerator, NULL, error);
     if (address == NULL)
     {
-      g_warning ("CCComm: Failed to create address: %s", (*error)->message);
+      g_warning ("CcComm: Failed to create address: %s", (*error)->message);
       return FALSE;
     }
 
@@ -244,7 +306,7 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
   tls_conn = g_tls_client_connection_new (comm->con, connectable, error);
   if (tls_conn == NULL)
   {
-    g_warning ("CCComm: Failed to create TLS connection: %s", (*error)->message);
+    g_warning ("CcComm: Failed to create TLS connection: %s", (*error)->message);
     return FALSE;
   }
 
@@ -256,20 +318,19 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
   // see what should be done about cancellable
   if (!g_tls_connection_handshake (G_TLS_CONNECTION (tls_conn), NULL, error))
   {
-    g_warning ("CCComm: Failed to handshake: %s", (*error)->message);
+    g_warning ("CcComm: Failed to handshake: %s", (*error)->message);
     return FALSE;
   }
 
-  g_debug ("CCComm: Connected to %s", remote_address);
+  g_debug ("CcComm: Connected to %s", remote_address);
 
-  // TODO
   // start listening to all incoming messages
-  // cc_comm_listen (comm);
+  cc_comm_listen (comm);
 
   return TRUE;
 }
 
-gboolean
+static gboolean
 cc_comm_tls_send (CcComm        * comm,
                   uint8_t       * message,
                   gssize          size,
@@ -298,12 +359,12 @@ cc_comm_tls_send (CcComm        * comm,
 
     if (io_bytes <= 0)
     {
-      g_warning ("CCComm: Failed to write: %s", (*error)->message);
+      g_warning ("CcComm: Failed to write: %s", (*error)->message);
       g_clear_error (error);
       return FALSE;
     }
 
-    g_debug ("CCComm: Sent %" G_GSSIZE_FORMAT " bytes", io_bytes);
+    g_debug ("CcComm: Sent %" G_GSSIZE_FORMAT " bytes", io_bytes);
 
     size -= io_bytes;
   }
@@ -315,7 +376,7 @@ cc_comm_tls_send (CcComm        * comm,
 
 
 // builds message based on available types
-Castchannel__CastMessage
+static Castchannel__CastMessage
 cc_comm_build_message (gchar *namespace_,
                        Castchannel__CastMessage__PayloadType payload_type,
                        ProtobufCBinaryData * binary_payload,
@@ -354,7 +415,7 @@ cc_comm_send_request (CcComm * comm, enum MessageType message_type, char *utf8_p
   gboolean expect_input = TRUE;
   g_autofree uint8_t *sock_buffer = NULL;
 
-  g_debug("Send request: %d", message_type);
+  g_debug("CcComm: Send request: %d", message_type);
 
   switch (message_type)
   {
diff --git a/src/cc/cc-comm.h b/src/cc/cc-comm.h
index 6fe73fa..6c8b9ea 100644
--- a/src/cc/cc-comm.h
+++ b/src/cc/cc-comm.h
@@ -22,22 +22,25 @@
 
 G_BEGIN_DECLS
 
+#define MAX_MSG_SIZE (64 * 1024)
+
 struct _CcComm
 {
   /*< public >*/
   GIOStream *con;
+
+  guint8    *header_buffer;
+  guint8    *message_buffer;
 };
 
 typedef struct _CcComm CcComm;
 
-#define MAX_MSG_SIZE 64 * 1024
-
 enum MessageType {
-    MESSAGE_TYPE_CONNECT,
-    MESSAGE_TYPE_DISCONNECT,
-    MESSAGE_TYPE_PING,
-    MESSAGE_TYPE_PONG,
-    MESSAGE_TYPE_RECEIVER,
+  MESSAGE_TYPE_CONNECT,
+  MESSAGE_TYPE_DISCONNECT,
+  MESSAGE_TYPE_PING,
+  MESSAGE_TYPE_PONG,
+  MESSAGE_TYPE_RECEIVER,
 };
 
 gboolean cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error);
diff --git a/src/nd-cc-sink.c b/src/nd-cc-sink.c
index 9ae2638..a8ed37c 100644
--- a/src/nd-cc-sink.c
+++ b/src/nd-cc-sink.c
@@ -338,8 +338,6 @@ nd_cc_sink_sink_start_stream (NdSink *sink)
       return NULL;
     }
 
-  // TODO: listen to all incoming messages
-
   // open up a virtual connection to the device
   cc_comm_send_request(&self->comm, MESSAGE_TYPE_CONNECT, NULL, NULL);
 
@@ -350,8 +348,15 @@ nd_cc_sink_sink_start_stream (NdSink *sink)
   cc_comm_send_request(&self->comm, MESSAGE_TYPE_RECEIVER, "{\"type\": \"GET_STATUS\"}", NULL);
 
   // send req to open youtube
+  g_debug("NdCCSink: Launching YouTube");
   cc_comm_send_request(&self->comm, MESSAGE_TYPE_RECEIVER, "{ \"type\": \"LAUNCH\", \"appId\": \"YouTube\", \"requestId\": 1 }", NULL);
 
+  g_debug("NdCCSink: Get Status Again");
+  cc_comm_send_request(&self->comm, MESSAGE_TYPE_RECEIVER, "{\"type\": \"GET_STATUS\"}", NULL);
+
+  g_debug ("NdCCSink: Mute the TV");
+  cc_comm_send_request(&self->comm, MESSAGE_TYPE_RECEIVER, "{ \"type\": \"SET_VOLUME\", \"volume\": { \"muted\": true } }", NULL);
+
   self->server = wfd_server_new ();
   self->server_source_id = gst_rtsp_server_attach (GST_RTSP_SERVER (self->server), NULL);
 
@@ -404,6 +409,7 @@ nd_cc_sink_sink_stop_stream_int (NdCCSink *self)
   self->cancellable = g_cancellable_new ();
 
   /* Close the client connection
+   * FIX: this does not close the connection
    * TODO: This should be moved into cc-comm.c */
   if (self->comm.con != NULL)
     {


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]