[gnome-network-displays/cc-tmp: 15/80] cc: async read works
- From: Benjamin Berg <bberg src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gnome-network-displays/cc-tmp: 15/80] cc: async read works
- Date: Fri, 9 Sep 2022 12:03:49 +0000 (UTC)
commit 3973c991959cd623a027eeb48bc47986edec4064
Author: Anupam Kumar <kyteinsky gmail com>
Date: Wed Jul 27 11:52:18 2022 +0530
cc: async read works
issue:
app crashes even for handled errors
todo:
strong connect request
src/cc/cc-comm.c | 173 +++++++++++++++++++++++++++++++++++++------------------
src/cc/cc-comm.h | 17 +++---
src/nd-cc-sink.c | 10 +++-
3 files changed, 135 insertions(+), 65 deletions(-)
---
diff --git a/src/cc/cc-comm.c b/src/cc/cc-comm.c
index 79ca9a7..42c74ce 100644
--- a/src/cc/cc-comm.c
+++ b/src/cc/cc-comm.c
@@ -16,11 +16,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <math.h>
+
#include "cc-comm.h"
#include "cast_channel.pb-c.h"
// function decl
-void cc_comm_listen (CcComm *comm);
+static void cc_comm_listen (CcComm *comm);
+static void cc_comm_read (CcComm *comm, gsize io_bytes, gboolean read_header);
static void
@@ -44,19 +47,19 @@ cc_comm_dump_message (guint8 *msg, gsize length)
g_debug ("%s", line->str);
}
-void
+static void
cc_comm_parse_received_data(uint8_t * input_buffer, gssize input_size)
{
Castchannel__CastMessage *message;
- message = castchannel__cast_message__unpack(NULL, input_size-4, input_buffer+4);
+ message = castchannel__cast_message__unpack(NULL, input_size, input_buffer);
if (message == NULL)
{
- g_warning ("CCComm: Failed to unpack received data");
+ g_warning ("CcComm: Failed to unpack received data");
return;
}
- g_debug("CCComm: Received data: { source_id: %s, destination_id: %s, namespace_: %s, payload_type: %d, payload_utf8: %s }",
+ g_debug("CcComm: Received data: { source_id: %s, destination_id: %s, namespace_: %s, payload_type: %d, payload_utf8: %s }",
message->source_id,
message->destination_id,
message->namespace_,
@@ -92,8 +95,68 @@ cc_comm_accept_certificate (GTlsClientConnection *conn,
// LISTENER
-// async callback for input stream read
-void
+/*
+ * Decode the 4-byte length prefix stored in comm->header_buffer.
+ *
+ * The bytes are interpreted most-significant first (big-endian), exactly as
+ * the previous pow(256, n) arithmetic did, but they are combined with integer
+ * shifts so no floating-point math (or libm) is involved.
+ *
+ * Returns the message size announced by the header.
+ */
+static guint32
+cc_comm_to_message_size (CcComm *comm)
+{
+  const guint8 *hdr = comm->header_buffer;
+
+  return ((guint32) hdr[0] << 24) |
+         ((guint32) hdr[1] << 16) |
+         ((guint32) hdr[2] << 8) |
+         (guint32) hdr[3];
+}
+
+/*
+ * Completion callback for the asynchronous message-body read that
+ * cc_comm_read() starts with comm->message_buffer as the target.
+ *
+ * source_object: the GInputStream the read was issued on
+ * res:           async result passed to g_input_stream_read_all_finish()
+ * user_data:     the CcComm whose message_buffer received the data
+ *
+ * On success the received bytes are dumped and parsed, the per-message buffer
+ * is released, and the next read cycle is started with cc_comm_listen().
+ *
+ * Failures are reported with g_warning() rather than g_error(): g_error() is
+ * always fatal in GLib, so it aborted the application on I/O errors that this
+ * callback is meant to handle.  After a failed read the listen loop is not
+ * re-armed, because retrying on a stream that just failed would only repeat
+ * the same failure.
+ */
+static void
+cc_comm_message_read_cb (GObject      *source_object,
+                         GAsyncResult *res,
+                         gpointer      user_data)
+{
+  CcComm *comm = (CcComm *) user_data;
+  g_autoptr(GError) error = NULL;
+  gboolean success;
+  gsize io_bytes = 0;
+
+  success = g_input_stream_read_all_finish (G_INPUT_STREAM (source_object), res, &io_bytes, &error);
+
+  // If we cancelled, just return immediately.
+  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
+    return;
+
+  /*
+   * If this result belongs to an old connection (one that is no longer
+   * comm->con), ignore it.  The buffers in comm are left untouched here since
+   * they may already be in use by a read on the current connection.
+   */
+  if (comm->con &&
+      g_io_stream_get_input_stream (G_IO_STREAM (comm->con)) != G_INPUT_STREAM (source_object))
+    {
+      g_warning ("CcComm: Error on old read connection, ignoring.");
+      return;
+    }
+
+  if (!success || io_bytes == 0)
+    {
+      if (error)
+        g_warning ("CcComm: Error reading message from stream: %s", error->message);
+      else
+        g_warning ("CcComm: Error reading message from stream.");
+
+      // the read is finished, so the per-message buffer can be released
+      g_clear_pointer (&comm->message_buffer, g_free);
+      return;
+    }
+
+  // dump the received message and try to parse it
+  cc_comm_dump_message (comm->message_buffer, io_bytes);
+  cc_comm_parse_received_data (comm->message_buffer, io_bytes);
+
+  // a fresh buffer is allocated for every message, so free this one now
+  g_clear_pointer (&comm->message_buffer, g_free);
+
+  // go for another round
+  cc_comm_listen (comm);
+}
+
+// async callback for header read
+static void
cc_comm_header_read_cb (GObject *source_object,
GAsyncResult *res,
gpointer user_data)
@@ -102,6 +165,7 @@ cc_comm_header_read_cb (GObject *source_object,
g_autoptr(GError) error = NULL;
gboolean success;
gsize io_bytes;
+ guint32 message_size;
success = g_input_stream_read_all_finish (G_INPUT_STREAM (source_object), res, &io_bytes, &error);
@@ -109,6 +173,7 @@ cc_comm_header_read_cb (GObject *source_object,
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
return;
+ // XXX: should we give up or keep on retrying if errors pop up
/*
* If this error is for an old connection (that should be closed already),
* then just give up immediately with a CLOSED error.
@@ -116,7 +181,7 @@ cc_comm_header_read_cb (GObject *source_object,
if (comm->con &&
g_io_stream_get_input_stream (G_IO_STREAM (comm->con)) != G_INPUT_STREAM (source_object))
{
- g_error ("CCComm: Error on old read connection, ignoring.");
+ g_error ("CcComm: Error on old read connection, ignoring.");
cc_comm_listen (comm);
return;
}
@@ -125,62 +190,59 @@ cc_comm_header_read_cb (GObject *source_object,
{
if (error)
{
- g_error ("CCComm: Error reading from stream: %s", error->message);
+ g_error ("CcComm: Error reading header from stream: %s", error->message);
cc_comm_listen (comm);
return;
}
- g_error ("CCComm: Error reading from stream, couldn't read 4 bytes header.");
+ g_error ("CcComm: Error reading header from stream.");
cc_comm_listen (comm);
return;
}
- // TODO
// if everything is well, read all `io_bytes`
- // cc_comm_read (comm);
+ g_debug ("CcComm: Raw header dump:");
+ cc_comm_dump_message (comm->header_buffer, 4);
+
+ message_size = cc_comm_to_message_size (comm);
+ g_debug ("CcComm: Message size: %d", message_size);
+
+ comm->message_buffer = g_malloc0 (message_size);
+ cc_comm_read (comm, message_size, FALSE);
}
-void
-cc_comm_read (CcComm *comm, uint8_t *buffer, gsize io_bytes)
+static void
+cc_comm_read (CcComm *comm, gsize io_bytes, gboolean read_header)
{
GInputStream *istream;
istream = g_io_stream_get_input_stream (G_IO_STREAM (comm->con));
+ if (read_header)
+ {
+ g_input_stream_read_all_async (istream,
+ comm->header_buffer,
+ io_bytes,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ cc_comm_header_read_cb,
+ comm);
+ return;
+ }
g_input_stream_read_all_async (istream,
- buffer,
- io_bytes,
- G_PRIORITY_DEFAULT,
- NULL,
- cc_comm_header_read_cb,
- comm);
+ comm->message_buffer,
+ io_bytes,
+ G_PRIORITY_DEFAULT,
+ NULL,
+ cc_comm_message_read_cb,
+ comm);
}
// listen to all incoming messages from Chromecast
-void
+static void
cc_comm_listen (CcComm *comm)
{
- // gssize io_bytes;
- // g_autofree uint8_t buffer[MAX_MSG_SIZE];
- g_autofree uint8_t header_buffer[4];
-
- cc_comm_read (comm, header_buffer, 4);
-
-
-
-
- // if (io_bytes <= 0)
- // {
- // g_warning ("CCComm: Failed to read: %s", error->message);
- // g_clear_error (error);
- // return FALSE;
- // }
-
- // g_debug ("CCComm: Received %" G_GSSIZE_FORMAT " bytes", io_bytes);
- // g_debug ("CCComm: Received data:");
- // cc_comm_dump_message (buffer, io_bytes);
-
- // cc_comm_parse_received_data (buffer, io_bytes);
-
+ comm->header_buffer = g_malloc0 (4);
+ cc_comm_read (comm, 4, TRUE);
}
gboolean
@@ -205,7 +267,7 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
socket = g_socket_new (socket_family, socket_type, G_SOCKET_PROTOCOL_DEFAULT, error);
if (socket == NULL)
{
- g_warning ("CCComm: Failed to create socket: %s", (*error)->message);
+ g_warning ("CcComm: Failed to create socket: %s", (*error)->message);
return FALSE;
}
@@ -215,7 +277,7 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
connectable = g_network_address_parse (remote_address, 8009, error);
if (connectable == NULL)
{
- g_warning ("CCComm: Failed to create connectable: %s", (*error)->message);
+ g_warning ("CcComm: Failed to create connectable: %s", (*error)->message);
return FALSE;
}
@@ -225,7 +287,7 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
address = g_socket_address_enumerator_next (enumerator, NULL, error);
if (address == NULL)
{
- g_warning ("CCComm: Failed to create address: %s", (*error)->message);
+ g_warning ("CcComm: Failed to create address: %s", (*error)->message);
return FALSE;
}
@@ -244,7 +306,7 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
tls_conn = g_tls_client_connection_new (comm->con, connectable, error);
if (tls_conn == NULL)
{
- g_warning ("CCComm: Failed to create TLS connection: %s", (*error)->message);
+ g_warning ("CcComm: Failed to create TLS connection: %s", (*error)->message);
return FALSE;
}
@@ -256,20 +318,19 @@ cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error)
// see what should be done about cancellable
if (!g_tls_connection_handshake (G_TLS_CONNECTION (tls_conn), NULL, error))
{
- g_warning ("CCComm: Failed to handshake: %s", (*error)->message);
+ g_warning ("CcComm: Failed to handshake: %s", (*error)->message);
return FALSE;
}
- g_debug ("CCComm: Connected to %s", remote_address);
+ g_debug ("CcComm: Connected to %s", remote_address);
- // TODO
// start listening to all incoming messages
- // cc_comm_listen (comm);
+ cc_comm_listen (comm);
return TRUE;
}
-gboolean
+static gboolean
cc_comm_tls_send (CcComm * comm,
uint8_t * message,
gssize size,
@@ -298,12 +359,12 @@ cc_comm_tls_send (CcComm * comm,
if (io_bytes <= 0)
{
- g_warning ("CCComm: Failed to write: %s", (*error)->message);
+ g_warning ("CcComm: Failed to write: %s", (*error)->message);
g_clear_error (error);
return FALSE;
}
- g_debug ("CCComm: Sent %" G_GSSIZE_FORMAT " bytes", io_bytes);
+ g_debug ("CcComm: Sent %" G_GSSIZE_FORMAT " bytes", io_bytes);
size -= io_bytes;
}
@@ -315,7 +376,7 @@ cc_comm_tls_send (CcComm * comm,
// builds message based on available types
-Castchannel__CastMessage
+static Castchannel__CastMessage
cc_comm_build_message (gchar *namespace_,
Castchannel__CastMessage__PayloadType payload_type,
ProtobufCBinaryData * binary_payload,
@@ -354,7 +415,7 @@ cc_comm_send_request (CcComm * comm, enum MessageType message_type, char *utf8_p
gboolean expect_input = TRUE;
g_autofree uint8_t *sock_buffer = NULL;
- g_debug("Send request: %d", message_type);
+ g_debug("CcComm: Send request: %d", message_type);
switch (message_type)
{
diff --git a/src/cc/cc-comm.h b/src/cc/cc-comm.h
index 6fe73fa..6c8b9ea 100644
--- a/src/cc/cc-comm.h
+++ b/src/cc/cc-comm.h
@@ -22,22 +22,25 @@
G_BEGIN_DECLS
+#define MAX_MSG_SIZE (64 * 1024)
+
struct _CcComm
{
/*< public >*/
GIOStream *con;
+
+ guint8 *header_buffer;
+ guint8 *message_buffer;
};
typedef struct _CcComm CcComm;
-#define MAX_MSG_SIZE 64 * 1024
-
enum MessageType {
- MESSAGE_TYPE_CONNECT,
- MESSAGE_TYPE_DISCONNECT,
- MESSAGE_TYPE_PING,
- MESSAGE_TYPE_PONG,
- MESSAGE_TYPE_RECEIVER,
+ MESSAGE_TYPE_CONNECT,
+ MESSAGE_TYPE_DISCONNECT,
+ MESSAGE_TYPE_PING,
+ MESSAGE_TYPE_PONG,
+ MESSAGE_TYPE_RECEIVER,
};
gboolean cc_comm_make_connection (CcComm *comm, gchar *remote_address, GError **error);
diff --git a/src/nd-cc-sink.c b/src/nd-cc-sink.c
index 9ae2638..a8ed37c 100644
--- a/src/nd-cc-sink.c
+++ b/src/nd-cc-sink.c
@@ -338,8 +338,6 @@ nd_cc_sink_sink_start_stream (NdSink *sink)
return NULL;
}
- // TODO: listen to all incoming messages
-
// open up a virtual connection to the device
cc_comm_send_request(&self->comm, MESSAGE_TYPE_CONNECT, NULL, NULL);
@@ -350,8 +348,15 @@ nd_cc_sink_sink_start_stream (NdSink *sink)
cc_comm_send_request(&self->comm, MESSAGE_TYPE_RECEIVER, "{\"type\": \"GET_STATUS\"}", NULL);
// send req to open youtube
+ g_debug("NdCCSink: Launching YouTube");
cc_comm_send_request(&self->comm, MESSAGE_TYPE_RECEIVER, "{ \"type\": \"LAUNCH\", \"appId\": \"YouTube\", \"requestId\": 1 }", NULL);
+ g_debug("NdCCSink: Get Status Again");
+ cc_comm_send_request(&self->comm, MESSAGE_TYPE_RECEIVER, "{\"type\": \"GET_STATUS\"}", NULL);
+
+ g_debug ("NdCCSink: Mute the TV");
+ cc_comm_send_request(&self->comm, MESSAGE_TYPE_RECEIVER, "{ \"type\": \"SET_VOLUME\", \"volume\": { \"muted\": true } }", NULL);
+
self->server = wfd_server_new ();
self->server_source_id = gst_rtsp_server_attach (GST_RTSP_SERVER (self->server), NULL);
@@ -404,6 +409,7 @@ nd_cc_sink_sink_stop_stream_int (NdCCSink *self)
self->cancellable = g_cancellable_new ();
/* Close the client connection
+ * FIX: this does not close the connection
* TODO: This should be moved into cc-comm.c */
if (self->comm.con != NULL)
{
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]