[libsoup/carlosgc/request-body-stream: 1/3] message: add support for stream based request body




commit dbf1bb5037e58372d004d4b3d1e6bef8207c1b46
Author: Carlos Garcia Campos <cgarcia igalia com>
Date:   Tue Oct 6 14:52:06 2020 +0200

    message: add support for stream based request body
    
    Add soup_message_set_request_body() that takes a GInputStream and
    soup_message_set_request_body_from_bytes() for convenience that creates
    a GMemoryInputStream for the given bytes.

 docs/reference/libsoup-3.0-sections.txt |   2 +
 libsoup/soup-body-output-stream.c       |  32 ++-
 libsoup/soup-message-io.c               | 118 +++++++--
 libsoup/soup-message.c                  |  71 ++++++
 libsoup/soup-message.h                  |  10 +
 libsoup/soup-version.h.in               |   1 +
 tests/chunk-test.c                      | 407 --------------------------------
 tests/meson.build                       |   2 +-
 8 files changed, 212 insertions(+), 431 deletions(-)
---
diff --git a/docs/reference/libsoup-3.0-sections.txt b/docs/reference/libsoup-3.0-sections.txt
index ce2bcb75..b31a5fb9 100644
--- a/docs/reference/libsoup-3.0-sections.txt
+++ b/docs/reference/libsoup-3.0-sections.txt
@@ -7,6 +7,8 @@ SoupMessage
 soup_message_new
 soup_message_new_from_uri
 soup_message_set_request
+soup_message_set_request_body
+soup_message_set_request_body_from_bytes
 soup_message_set_response
 <SUBSECTION>
 SoupHTTPVersion
diff --git a/libsoup/soup-body-output-stream.c b/libsoup/soup-body-output-stream.c
index 7ba59dca..94fe2811 100644
--- a/libsoup/soup-body-output-stream.c
+++ b/libsoup/soup-body-output-stream.c
@@ -40,6 +40,14 @@ enum {
        PROP_CONTENT_LENGTH
 };
 
+enum {
+       WROTE_DATA,
+
+       LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
 static void soup_body_output_stream_pollable_init (GPollableOutputStreamInterface *pollable_interface, 
gpointer interface_data);
 
 G_DEFINE_TYPE_WITH_CODE (SoupBodyOutputStream, soup_body_output_stream, G_TYPE_FILTER_OUTPUT_STREAM,
@@ -99,6 +107,13 @@ soup_body_output_stream_get_property (GObject *object, guint prop_id,
        }
 }
 
+static void
+soup_body_output_stream_wrote_data (SoupBodyOutputStream *bostream,
+                                   gsize                 count)
+{
+       g_signal_emit (bostream, signals[WROTE_DATA], 0, count);
+}
+
 static gssize
 soup_body_output_stream_write_raw (SoupBodyOutputStream  *bostream,
                                   const void            *buffer,
@@ -126,8 +141,10 @@ soup_body_output_stream_write_raw (SoupBodyOutputStream  *bostream,
                                          buffer, my_count,
                                          blocking, cancellable, error);
 
-       if (nwrote > 0 && bostream->priv->write_length)
+       if (nwrote > 0 && bostream->priv->write_length) {
                bostream->priv->written += nwrote;
+               soup_body_output_stream_wrote_data (bostream, nwrote);
+       }
 
        if (nwrote == my_count && my_count != count)
                nwrote = count;
@@ -173,6 +190,9 @@ again:
                nwrote = g_pollable_stream_write (bostream->priv->base_stream,
                                                  buffer, count, blocking,
                                                  cancellable, error);
+               if (nwrote > 0)
+                       soup_body_output_stream_wrote_data (bostream, nwrote);
+
                if (nwrote < (gssize)count)
                        return nwrote;
 
@@ -300,6 +320,16 @@ soup_body_output_stream_class_init (SoupBodyOutputStreamClass *stream_class)
        output_stream_class->write_fn = soup_body_output_stream_write_fn;
        output_stream_class->close_fn = soup_body_output_stream_close_fn;
 
+        signals[WROTE_DATA] =
+                g_signal_new ("wrote-data",
+                              G_OBJECT_CLASS_TYPE (object_class),
+                              G_SIGNAL_RUN_LAST,
+                              0,
+                              NULL, NULL,
+                              NULL,
+                              G_TYPE_NONE, 1,
+                              G_TYPE_UINT);
+
        g_object_class_install_property (
                object_class, PROP_ENCODING,
                g_param_spec_enum ("encoding",
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 1d85daa8..cfbba0f7 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -85,8 +85,8 @@ typedef struct {
        GSource *unpause_source;
        gboolean paused;
 
-       GCancellable *async_close_wait;
-       GError       *async_close_error;
+       GCancellable *async_wait;
+       GError       *async_error;
 
        SoupMessageGetHeadersFn   get_headers_cb;
        SoupMessageParseHeadersFn parse_headers_cb;
@@ -130,11 +130,11 @@ soup_message_io_cleanup (SoupMessage *msg)
        g_string_free (io->write_buf, TRUE);
         g_clear_pointer (&io->write_chunk, g_bytes_unref);
 
-       if (io->async_close_wait) {
-               g_cancellable_cancel (io->async_close_wait);
-               g_clear_object (&io->async_close_wait);
+       if (io->async_wait) {
+               g_cancellable_cancel (io->async_wait);
+               g_clear_object (&io->async_wait);
        }
-       g_clear_error (&io->async_close_error);
+       g_clear_error (&io->async_error);
 
        g_slice_free (SoupMessageIOData, io);
 }
@@ -321,6 +321,50 @@ soup_message_setup_body_istream (GInputStream *body_stream,
        return istream;
 }
 
+static void
+request_body_stream_wrote_data_cb (SoupMessage *msg,
+                                  guint        count)
+{
+       GBytes *chunk;
+
+       /* FIXME: Change SoupMessage::wrote-body-data to pass just the size */
+       chunk = g_bytes_new_static ("", count);
+       soup_message_wrote_body_data (msg, chunk);
+       g_bytes_unref (chunk);
+}
+
+static void
+request_body_stream_wrote_cb (GOutputStream *ostream,
+                             GAsyncResult  *result,
+                             SoupMessage   *msg)
+{
+       SoupMessageIOData *io;
+       gssize nwrote;
+       GCancellable *async_wait;
+       GError *error = NULL;
+
+       nwrote = g_output_stream_splice_finish (ostream, result, &error);
+
+       io = soup_message_get_io_data (msg);
+       if (!io || !io->async_wait || io->body_ostream != ostream) {
+               g_clear_error (&error);
+               g_object_unref (msg);
+               return;
+       }
+
+       if (nwrote != -1)
+               io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+
+       if (error)
+               g_propagate_error (&io->async_error, error);
+       async_wait = io->async_wait;
+       io->async_wait = NULL;
+       g_cancellable_cancel (async_wait);
+       g_object_unref (async_wait);
+
+       g_object_unref (msg);
+}
+
 static void
 closed_async (GObject      *source,
              GAsyncResult *result,
@@ -329,21 +373,21 @@ closed_async (GObject      *source,
        GOutputStream *body_ostream = G_OUTPUT_STREAM (source);
        SoupMessage *msg = user_data;
        SoupMessageIOData *io;
-       GCancellable *async_close_wait;
+       GCancellable *async_wait;
 
        io = soup_message_get_io_data (msg);
-       if (!io || !io->async_close_wait || io->body_ostream != body_ostream) {
+       if (!io || !io->async_wait || io->body_ostream != body_ostream) {
                g_object_unref (msg);
                return;
        }
 
-       g_output_stream_close_finish (body_ostream, result, &io->async_close_error);
+       g_output_stream_close_finish (body_ostream, result, &io->async_error);
        g_clear_object (&io->body_ostream);
 
-       async_close_wait = io->async_close_wait;
-       io->async_close_wait = NULL;
-       g_cancellable_cancel (async_close_wait);
-       g_object_unref (async_close_wait);
+       async_wait = io->async_wait;
+       io->async_wait = NULL;
+       g_cancellable_cancel (async_wait);
+       g_object_unref (async_wait);
 
        g_object_unref (msg);
 }
@@ -388,11 +432,11 @@ io_write (SoupMessage *msg, gboolean blocking,
        GBytes *chunk;
        gssize nwrote;
 
-       if (io->async_close_error) {
-               g_propagate_error (error, io->async_close_error);
-               io->async_close_error = NULL;
+       if (io->async_error) {
+               g_propagate_error (error, io->async_error);
+               io->async_error = NULL;
                return FALSE;
-       } else if (io->async_close_wait) {
+       } else if (io->async_wait) {
                g_set_error_literal (error, G_IO_ERROR,
                                     G_IO_ERROR_WOULD_BLOCK,
                                     _("Operation would block"));
@@ -503,6 +547,36 @@ io_write (SoupMessage *msg, gboolean blocking,
                        break;
                }
 
+               if (io->mode == SOUP_MESSAGE_IO_CLIENT && msg->request_body_stream) {
+                       g_signal_connect_object (io->body_ostream,
+                                                "wrote-data",
+                                                G_CALLBACK (request_body_stream_wrote_data_cb),
+                                                msg, G_CONNECT_SWAPPED);
+                       if (blocking) {
+                               nwrote = g_output_stream_splice (io->body_ostream,
+                                                                msg->request_body_stream,
+                                                                G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
+                                                                cancellable,
+                                                                error);
+                               if (nwrote == -1)
+                                       return FALSE;
+                               io->write_state = SOUP_MESSAGE_IO_STATE_BODY_FLUSH;
+                               break;
+                       } else {
+                               io->async_wait = g_cancellable_new ();
+                               g_main_context_push_thread_default (io->async_context);
+                               g_output_stream_splice_async (io->body_ostream,
+                                                             msg->request_body_stream,
+                                                             G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
+                                                             G_PRIORITY_DEFAULT,
+                                                             cancellable,
+                                                             
(GAsyncReadyCallback)request_body_stream_wrote_cb,
+                                                             g_object_ref (msg));
+                               g_main_context_pop_thread_default (io->async_context);
+                               return FALSE;
+                       }
+               }
+
                if (!io->write_chunk) {
                        io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
                        if (!io->write_chunk) {
@@ -562,7 +636,7 @@ io_write (SoupMessage *msg, gboolean blocking,
                                        return FALSE;
                                g_clear_object (&io->body_ostream);
                        } else {
-                               io->async_close_wait = g_cancellable_new ();
+                               io->async_wait = g_cancellable_new ();
                                g_main_context_push_thread_default (io->async_context);
                                g_output_stream_close_async (io->body_ostream,
                                                             G_PRIORITY_DEFAULT, cancellable,
@@ -883,8 +957,8 @@ soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
                base_source = g_timeout_source_new (0);
        } else if (io->paused) {
                base_source = NULL;
-       } else if (io->async_close_wait) {
-               base_source = g_cancellable_source_new (io->async_close_wait);
+       } else if (io->async_wait) {
+               base_source = g_cancellable_source_new (io->async_wait);
        } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
                GPollableInputStream *istream;
 
@@ -958,7 +1032,7 @@ io_run_until (SoupMessage *msg, gboolean blocking,
 
        g_object_ref (msg);
 
-       while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_close_wait &&
+       while (progress && soup_message_get_io_data (msg) == io && !io->paused && !io->async_wait &&
               (io->read_state < read_state || io->write_state < write_state)) {
 
                if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
@@ -988,7 +1062,7 @@ io_run_until (SoupMessage *msg, gboolean blocking,
                                     _("Operation was cancelled"));
                g_object_unref (msg);
                return FALSE;
-       } else if (!io->async_close_wait &&
+       } else if (!io->async_wait &&
                   g_cancellable_set_error_if_cancelled (cancellable, error)) {
                g_object_unref (msg);
                return FALSE;
diff --git a/libsoup/soup-message.c b/libsoup/soup-message.c
index 393dbd35..a9a55902 100644
--- a/libsoup/soup-message.c
+++ b/libsoup/soup-message.c
@@ -1129,6 +1129,75 @@ soup_message_set_response (SoupMessage    *msg,
        }
 }
 
+/**
+ * soup_message_set_request_body:
+ * @msg: the message
+ * @content_type: (allow-none): MIME Content-Type of the body
+ * @stream: (allow-none): a #GInputStream to read the request body from
+ * @content_length: the byte length of @tream or -1 if unknown
+ *
+ * Set the request body of a #SoupMessage. If
+ * @content_type is %NULL, the request body must be empty (or @stream %NULL) as well.
+ * The request body needs to be set again in case @msg is restarted
+ * (in case of redirection or authentication).
+ */
+void
+soup_message_set_request_body (SoupMessage  *msg,
+                               const char   *content_type,
+                               GInputStream *stream,
+                               gssize        content_length)
+{
+        g_return_if_fail (SOUP_IS_MESSAGE (msg));
+        g_return_if_fail (content_type != NULL || content_length == 0);
+        g_return_if_fail (content_type == NULL || G_IS_INPUT_STREAM (stream));
+
+        g_clear_object (&msg->request_body_stream);
+
+        if (content_type) {
+                g_warn_if_fail (strchr (content_type, '/') != NULL);
+
+                soup_message_headers_replace (msg->request_headers, "Content-Type", content_type);
+                if (content_length == -1)
+                        soup_message_headers_set_encoding (msg->request_headers, SOUP_ENCODING_CHUNKED);
+                else
+                        soup_message_headers_set_content_length (msg->request_headers, content_length);
+                msg->request_body_stream = g_object_ref (stream);
+        } else {
+                soup_message_headers_remove (msg->request_headers, "Content-Type");
+                soup_message_headers_remove (msg->request_headers, "Content-Length");
+        }
+}
+
+/**
+ * soup_message_set_request_body_from_bytes:
+ * @msg: the message
+ * @content_type: (allow-none): MIME Content-Type of the body
+ * @bytes: (allow-none): a #GBytes with the request body data
+ *
+ * Set the request body of a #SoupMessage from #GBytes. If
+ * @content_type is %NULL, the request body must be empty (or @bytes %NULL) as well.
+ * The request body needs to be set again in case @msg is restarted
+ * (in case of redirection or authentication).
+ */
+void
+soup_message_set_request_body_from_bytes (SoupMessage  *msg,
+                                          const char   *content_type,
+                                          GBytes       *bytes)
+{
+        g_return_if_fail (SOUP_IS_MESSAGE (msg));
+        g_return_if_fail (content_type == NULL || bytes != NULL);
+        g_return_if_fail (content_type != NULL || g_bytes_get_size (bytes) == 0);
+
+       if (bytes) {
+               GInputStream *stream;
+
+               stream = g_memory_input_stream_new_from_bytes (bytes);
+               soup_message_set_request_body (msg, content_type, stream, g_bytes_get_size (bytes));
+               g_object_unref (stream);
+       } else
+               soup_message_set_request_body (msg, NULL, NULL, 0);
+}
+
 void
 soup_message_wrote_informational (SoupMessage *msg)
 {
@@ -1203,6 +1272,8 @@ soup_message_restarted (SoupMessage *msg)
        if (priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
                soup_message_body_truncate (msg->request_body);
 
+       g_clear_object (&msg->request_body_stream);
+
        g_signal_emit (msg, signals[RESTARTED], 0);
 }
 
diff --git a/libsoup/soup-message.h b/libsoup/soup-message.h
index 70a0becb..91cd5f7d 100644
--- a/libsoup/soup-message.h
+++ b/libsoup/soup-message.h
@@ -30,6 +30,7 @@ struct _SoupMessage {
        char               *reason_phrase;
 
        SoupMessageBody    *request_body;
+       GInputStream       *request_body_stream;
        SoupMessageHeaders *request_headers;
 
        SoupMessageBody    *response_body;
@@ -100,6 +101,15 @@ void           soup_message_set_response        (SoupMessage       *msg,
                                                 SoupMemoryUse      resp_use,
                                                 const char        *resp_body,
                                                 gsize              resp_length);
+SOUP_AVAILABLE_IN_ALL
+void           soup_message_set_request_body    (SoupMessage       *msg,
+                                                const char        *content_type,
+                                                GInputStream      *stream,
+                                                gssize             content_length);
+SOUP_AVAILABLE_IN_ALL
+void           soup_message_set_request_body_from_bytes (SoupMessage  *msg,
+                                                        const char   *content_type,
+                                                        GBytes       *bytes);
 
 typedef enum {
        SOUP_HTTP_1_0 = 0, /*< nick=http-1-0 >*/
diff --git a/libsoup/soup-version.h.in b/libsoup/soup-version.h.in
index 53605dfa..308c2f27 100644
--- a/libsoup/soup-version.h.in
+++ b/libsoup/soup-version.h.in
@@ -109,6 +109,7 @@ G_BEGIN_DECLS
 #error "SOUP_VERSION_MIN_REQUIRED must be >= SOUP_VERSION_2_24"
 #endif
 
+#define SOUP_AVAILABLE_IN_ALL                   _SOUP_EXTERN
 #define SOUP_AVAILABLE_IN_2_4                   _SOUP_EXTERN
 
 #if SOUP_VERSION_MIN_REQUIRED >= SOUP_VERSION_2_24
diff --git a/tests/meson.build b/tests/meson.build
index 84e679ea..b01397cd 100644
--- a/tests/meson.build
+++ b/tests/meson.build
@@ -35,7 +35,6 @@ endif
 # ['name', is_parallel, extra_deps]
 tests = [
   ['cache', true, []],
-  ['chunk', true, []],
   ['chunk-io', true, []],
   ['coding', true, []],
   ['context', true, []],
@@ -51,6 +50,7 @@ tests = [
   ['no-ssl', true, []],
   ['ntlm', true, []],
   ['redirect', true, []],
+  ['request-body', true, []],
   ['resource', true, []],
   ['samesite', true, []],
   ['session', true, []],


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]