[libsoup] io-http2: use the item cancellable for send data operations



commit e02749b4d3b3f26c300b537145c444130022c38a
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Mon May 24 16:10:53 2021 +0200

    io-http2: use the item cancellable for send data operations
    
    Also use the data source cancellable to create the IO source that waits
    for pending send data operations.

 libsoup/http2/soup-client-message-io-http2.c | 34 ++++++++++++++--------------
 tests/http2-test.c                           |  2 +-
 2 files changed, 18 insertions(+), 18 deletions(-)
---
diff --git a/libsoup/http2/soup-client-message-io-http2.c b/libsoup/http2/soup-client-message-io-http2.c
index 4eaf4d43..a70d41f1 100644
--- a/libsoup/http2/soup-client-message-io-http2.c
+++ b/libsoup/http2/soup-client-message-io-http2.c
@@ -562,6 +562,9 @@ on_data_readable (GInputStream *stream,
 {
         SoupHTTP2MessageData *data = (SoupHTTP2MessageData*)user_data;
 
+        g_cancellable_cancel (data->data_source_cancellable);
+        g_clear_object (&data->data_source_cancellable);
+
         NGCHECK (nghttp2_session_resume_data (data->io->session, data->stream_id));
 
         g_clear_pointer (&data->data_source_poll, g_source_unref);
@@ -579,6 +582,9 @@ on_data_read (GInputStream *source,
 
         h2_debug (data->io, data, "[SEND_BODY] Read %zd", read);
 
+        g_cancellable_cancel (data->data_source_cancellable);
+        g_clear_object (&data->data_source_cancellable);
+
         /* This operation may have outlived the message data in which
            case this will have been cancelled. */
         if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED)) {
@@ -625,13 +631,6 @@ on_data_source_read_callback (nghttp2_session     *session,
         SoupHTTP2MessageData *data = nghttp2_session_get_stream_user_data (session, stream_id);
         SoupClientMessageIOHTTP2 *io = get_io_data (data->msg);
 
-        /* This cancellable is only used for async data source operations,
-         * only exists while reading is happening, and will be cancelled
-         * at any point if the data is freed.
-         */
-        if (!data->data_source_cancellable)
-                data->data_source_cancellable = g_cancellable_new ();
-
         /* We support pollable streams in the best case because they
          * should perform better with one fewer copy of each buffer and no threading. */
         if (G_IS_POLLABLE_INPUT_STREAM (source->ptr) && g_pollable_input_stream_can_poll 
(G_POLLABLE_INPUT_STREAM (source->ptr))) {
@@ -650,12 +649,14 @@ on_data_source_read_callback (nghttp2_session     *session,
                                 g_assert (data->data_source_poll == NULL);
 
                                 h2_debug (io, data, "[SEND_BODY] Polling");
-                                data->data_source_poll = g_pollable_input_stream_create_source (in_stream, 
data->data_source_cancellable);
+                                data->data_source_poll = g_pollable_input_stream_create_source (in_stream, 
data->cancellable);
                                 g_source_set_callback (data->data_source_poll, 
(GSourceFunc)on_data_readable, data, NULL);
                                 g_source_set_priority (data->data_source_poll, get_data_io_priority (data));
                                 g_source_attach (data->data_source_poll, g_main_context_get_thread_default 
());
 
                                 g_error_free (error);
+                                g_assert (!data->data_source_cancellable);
+                                data->data_source_cancellable = g_cancellable_new ();
                                 return NGHTTP2_ERR_DEFERRED;
                         }
 
@@ -678,7 +679,7 @@ on_data_source_read_callback (nghttp2_session     *session,
                 if (!data->data_source_buffer)
                         data->data_source_buffer = g_byte_array_new ();
 
-                gsize buffer_len = data->data_source_buffer->len;
+                guint buffer_len = data->data_source_buffer->len;
                 if (buffer_len) {
                         h2_debug (io, data, "[SEND_BODY] Sending %zu", buffer_len);
                         g_assert (buffer_len <= length); /* QUESTION: Maybe not reliable */
@@ -688,19 +689,19 @@ on_data_source_read_callback (nghttp2_session     *session,
                         return buffer_len;
                 } else if (data->data_source_eof) {
                         h2_debug (io, data, "[SEND_BODY] EOF");
-                        g_clear_object (&data->data_source_cancellable);
                         *data_flags |= NGHTTP2_DATA_FLAG_EOF;
                         return 0;
                 } else if (data->data_source_error) {
-                        g_clear_object (&data->data_source_cancellable);
                         set_error_for_data (data, g_steal_pointer (&data->data_source_error));
                         return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
                 } else {
                         h2_debug (io, data, "[SEND_BODY] Reading async");
                         g_byte_array_set_size (data->data_source_buffer, length);
+                        g_assert (!data->data_source_cancellable);
+                        data->data_source_cancellable = g_cancellable_new ();
                         g_input_stream_read_async (in_stream, data->data_source_buffer->data, length,
                                                    get_data_io_priority (data),
-                                                   data->data_source_cancellable,
+                                                   data->cancellable,
                                                    (GAsyncReadyCallback)on_data_read, data);
                         return NGHTTP2_ERR_DEFERRED;
                 }
@@ -754,10 +755,7 @@ soup_http2_message_data_free (SoupHTTP2MessageData *data)
         g_clear_error (&data->data_source_error);
         g_clear_pointer (&data->data_source_buffer, g_byte_array_unref);
 
-        if (data->data_source_cancellable) {
-                g_cancellable_cancel (data->data_source_cancellable);
-                g_clear_object (&data->data_source_cancellable);
-        }
+        g_clear_object (&data->data_source_cancellable);
 
         g_clear_error (&data->error);
 
@@ -1024,6 +1022,8 @@ soup_client_message_io_http2_get_source (SoupMessage             *msg,
         /* TODO: Handle mixing writes in? */
         if (data->paused)
                 base_source = cancellable ? g_cancellable_source_new (cancellable) : NULL;
+        else if (data->state < STATE_WRITE_DONE && data->data_source_cancellable)
+                base_source = g_cancellable_source_new (data->data_source_cancellable);
         else if (data->state < STATE_WRITE_DONE && nghttp2_session_want_write (io->session))
                 base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM 
(io->ostream), cancellable);
         else if (data->state < STATE_READ_DONE && data->decoded_data_istream)
@@ -1196,7 +1196,7 @@ io_run_until (SoupClientMessageIOHTTP2 *io,
 
        g_object_ref (msg);
 
-       while (progress && get_io_data (msg) == io && !data->paused && data->state < state)
+       while (progress && get_io_data (msg) == io && !data->paused && !data->data_source_cancellable && 
data->state < state)
                 progress = io_run (data, blocking, cancellable, &my_error);
 
         if (my_error) {
diff --git a/tests/http2-test.c b/tests/http2-test.c
index 13e2e4c6..dd3214a6 100644
--- a/tests/http2-test.c
+++ b/tests/http2-test.c
@@ -314,7 +314,7 @@ do_post_blocked_async_test (Test *test, gconstpointer data)
                         soup_body_input_stream_http2_add_data (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream), 
(guint8*)" Part 2", 8);
                         soup_body_input_stream_http2_complete (SOUP_BODY_INPUT_STREAM_HTTP2 (in_stream));
                 }
-                g_main_context_iteration (async_context, FALSE);
+                g_main_context_iteration (async_context, TRUE);
         }
 
         g_assert_cmpstr (g_bytes_get_data (response, NULL), ==, "Part 1 - Part 2");


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]