[libsoup] Fix asynchronicity of soup_session_queue_message() on plain SoupSession



commit 3a308914423d4aa16e5546c1484c983c69fd2780
Author: Dan Winship <danw gnome org>
Date:   Sat Sep 28 16:09:06 2013 -0400

    Fix asynchronicity of soup_session_queue_message() on plain SoupSession
    
    Messages sent via soup_session_queue_message() on a plain SoupSession
    accidentally ended up using blocking I/O. Fix this (and also make
    switching between sync and async ops during a streaming operation work
    better).
    
    https://bugzilla.gnome.org/show_bug.cgi?id=707711

 libsoup/soup-client-input-stream.c |    4 +-
 libsoup/soup-message-client-io.c   |    2 +-
 libsoup/soup-message-io.c          |   88 ++++++++++++++++++++----------------
 libsoup/soup-message-private.h     |    3 +
 libsoup/soup-session.c             |    4 +-
 5 files changed, 57 insertions(+), 44 deletions(-)
---
diff --git a/libsoup/soup-client-input-stream.c b/libsoup/soup-client-input-stream.c
index 3e533ca..d73fb00 100644
--- a/libsoup/soup-client-input-stream.c
+++ b/libsoup/soup-client-input-stream.c
@@ -129,7 +129,7 @@ soup_client_input_stream_close_fn (GInputStream  *stream,
 {
        SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
 
-       return soup_message_io_run_until_finish (cistream->priv->msg,
+       return soup_message_io_run_until_finish (cistream->priv->msg, TRUE,
                                                 cancellable, error);
 }
 
@@ -150,7 +150,7 @@ close_async_ready (SoupMessage *msg, gpointer user_data)
        SoupClientInputStream *cistream = g_task_get_source_object (task);
        GError *error = NULL;
 
-       if (!soup_message_io_run_until_finish (cistream->priv->msg,
+       if (!soup_message_io_run_until_finish (cistream->priv->msg, FALSE,
                                               g_task_get_cancellable (task),
                                               &error) &&
            g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
diff --git a/libsoup/soup-message-client-io.c b/libsoup/soup-message-client-io.c
index 1d96729..b145bba 100644
--- a/libsoup/soup-message-client-io.c
+++ b/libsoup/soup-message-client-io.c
@@ -150,7 +150,7 @@ soup_message_send_request (SoupMessageQueueItem      *item,
        GMainContext *async_context;
        GIOStream *iostream;
 
-       if (SOUP_IS_SESSION_ASYNC (item->session)) {
+       if (!SOUP_IS_SESSION_SYNC (item->session)) {
                async_context = soup_session_get_async_context (item->session);
                if (!async_context)
                        async_context = g_main_context_default ();
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 497fd06..f5f4c51 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -60,7 +60,6 @@ typedef struct {
        GOutputStream          *ostream;
        GOutputStream          *body_ostream;
        GMainContext           *async_context;
-       gboolean                blocking;
 
        SoupMessageIOState    read_state;
        SoupEncoding          read_encoding;
@@ -167,7 +166,8 @@ soup_message_io_finished (SoupMessage *msg)
 }
 
 static gboolean
-read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
+read_headers (SoupMessage *msg, gboolean blocking,
+             GCancellable *cancellable, GError **error)
 {
        SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
        SoupMessageIOData *io = priv->io_data;
@@ -180,7 +180,7 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
                nread = soup_filter_input_stream_read_line (io->istream,
                                                            io->read_header_buf->data + old_len,
                                                            RESPONSE_BLOCK_SIZE,
-                                                           io->blocking,
+                                                           blocking,
                                                            &got_lf,
                                                            cancellable, error);
                io->read_header_buf->len = old_len + MAX (nread, 0);
@@ -303,7 +303,8 @@ soup_message_setup_body_istream (GInputStream *body_stream,
  * socket not writable, write is complete, etc).
  */
 static gboolean
-io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
+io_write (SoupMessage *msg, gboolean blocking,
+         GCancellable *cancellable, GError **error)
 {
        SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
        SoupMessageIOData *io = priv->io_data;
@@ -322,7 +323,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
                        nwrote = g_pollable_stream_write (io->ostream,
                                                          io->write_buf->str + io->written,
                                                          io->write_buf->len - io->written,
-                                                         io->blocking,
+                                                         blocking,
                                                          cancellable, error);
                        if (nwrote == -1)
                                return FALSE;
@@ -414,7 +415,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
                nwrote = g_pollable_stream_write (io->body_ostream,
                                                  io->write_chunk->data + io->written,
                                                  io->write_chunk->length - io->written,
-                                                 io->blocking,
+                                                 blocking,
                                                  cancellable, error);
                if (nwrote == -1)
                        return FALSE;
@@ -486,7 +487,8 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
  * socket not readable, read is complete, etc).
  */
 static gboolean
-io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
+io_read (SoupMessage *msg, gboolean blocking,
+        GCancellable *cancellable, GError **error)
 {
        SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
        SoupMessageIOData *io = priv->io_data;
@@ -497,7 +499,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
 
        switch (io->read_state) {
        case SOUP_MESSAGE_IO_STATE_HEADERS:
-               if (!read_headers (msg, cancellable, error))
+               if (!read_headers (msg, blocking, cancellable, error))
                        return FALSE;
 
                status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
@@ -608,7 +610,8 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
                        const char *content_type;
                        GHashTable *params;
 
-                       if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, 
error))
+                       if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking,
+                                                                  cancellable, error))
                                return FALSE;
 
                        content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
@@ -638,7 +641,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
                nread = g_pollable_stream_read (io->body_istream,
                                                (guchar *)buffer->data,
                                                buffer->length,
-                                               io->blocking,
+                                               blocking,
                                                cancellable, error);
                if (nread > 0) {
                        buffer->length = nread;
@@ -829,7 +832,7 @@ request_is_restartable (SoupMessage *msg, GError *error)
 }
 
 static gboolean
-io_run_until (SoupMessage *msg,
+io_run_until (SoupMessage *msg, gboolean blocking,
              SoupMessageIOState read_state, SoupMessageIOState write_state,
              GCancellable *cancellable, GError **error)
 {
@@ -853,9 +856,9 @@ io_run_until (SoupMessage *msg,
               (io->read_state < read_state || io->write_state < write_state)) {
 
                if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
-                       progress = io_read (msg, cancellable, &my_error);
+                       progress = io_read (msg, blocking, cancellable, &my_error);
                else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
-                       progress = io_write (msg, cancellable, &my_error);
+                       progress = io_write (msg, blocking, cancellable, &my_error);
                else
                        progress = FALSE;
        }
@@ -887,7 +890,7 @@ io_run_until (SoupMessage *msg,
        done = (io->read_state >= read_state &&
                io->write_state >= write_state);
 
-       if (io->paused && !done) {
+       if (!blocking && !done) {
                g_set_error_literal (error, G_IO_ERROR,
                                     G_IO_ERROR_WOULD_BLOCK,
                                     _("Operation would block"));
@@ -899,8 +902,17 @@ io_run_until (SoupMessage *msg,
        return done;
 }
 
+static void io_run (SoupMessage *msg, gboolean blocking);
+
 static gboolean
-io_run (SoupMessage *msg, gpointer user_data)
+io_run_ready (SoupMessage *msg, gpointer user_data)
+{
+       io_run (msg, FALSE);
+       return FALSE;
+}
+
+static void
+io_run (SoupMessage *msg, gboolean blocking)
 {
        SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
        SoupMessageIOData *io = priv->io_data;
@@ -916,14 +928,14 @@ io_run (SoupMessage *msg, gpointer user_data)
        g_object_ref (msg);
        cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
 
-       if (io_run_until (msg,
+       if (io_run_until (msg, blocking,
                          SOUP_MESSAGE_IO_STATE_DONE,
                          SOUP_MESSAGE_IO_STATE_DONE,
                          cancellable, &error)) {
                soup_message_io_finished (msg);
        } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
                g_clear_error (&error);
-               io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
+               io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg);
                g_source_attach (io->io_source, io->async_context);
        } else if (error && priv->io_data == io) {
                if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
@@ -942,25 +954,23 @@ io_run (SoupMessage *msg, gpointer user_data)
 
        g_object_unref (msg);
        g_clear_object (&cancellable);
-
-       return FALSE;
 }
 
 gboolean
-soup_message_io_run_until_write (SoupMessage *msg,
+soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking,
                                 GCancellable *cancellable, GError **error)
 {
-       return io_run_until (msg,
+       return io_run_until (msg, blocking,
                             SOUP_MESSAGE_IO_STATE_ANY,
                             SOUP_MESSAGE_IO_STATE_BODY,
                             cancellable, error);
 }
 
 gboolean
-soup_message_io_run_until_read (SoupMessage *msg,
+soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking,
                                GCancellable *cancellable, GError **error)
 {
-       return io_run_until (msg,
+       return io_run_until (msg, blocking,
                             SOUP_MESSAGE_IO_STATE_BODY,
                             SOUP_MESSAGE_IO_STATE_ANY,
                             cancellable, error);
@@ -968,12 +978,13 @@ soup_message_io_run_until_read (SoupMessage *msg,
 
 gboolean
 soup_message_io_run_until_finish (SoupMessage   *msg,
+                                 gboolean       blocking,
                                  GCancellable  *cancellable,
                                  GError       **error)
 {
        g_object_ref (msg);
 
-       if (!io_run_until (msg,
+       if (!io_run_until (msg, blocking,
                           SOUP_MESSAGE_IO_STATE_DONE,
                           SOUP_MESSAGE_IO_STATE_DONE,
                           cancellable, error)) {
@@ -1045,11 +1056,8 @@ new_iostate (SoupMessage *msg, GIOStream *iostream,
        io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
        io->ostream = g_io_stream_get_output_stream (iostream);
 
-       if (async_context) {
+       if (async_context)
                io->async_context = g_main_context_ref (async_context);
-               io->blocking = FALSE;
-       } else
-               io->blocking = TRUE;
 
        io->read_header_buf = g_byte_array_new ();
        io->write_buf       = g_string_new (NULL);
@@ -1088,8 +1096,13 @@ soup_message_io_client (SoupMessageQueueItem *item,
        io->write_body      = item->msg->request_body;
 
        io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
-       if (!item->new_api)
-               io_run (item->msg, NULL);
+
+       if (!item->new_api) {
+               gboolean blocking =
+                       SOUP_IS_SESSION_SYNC (item->session) ||
+                       (!SOUP_IS_SESSION_ASYNC (item->session) && !item->async);
+               io_run (item->msg, blocking);
+       }
 }
 
 void
@@ -1112,7 +1125,7 @@ soup_message_io_server (SoupMessage *msg,
        io->write_body      = msg->response_body;
 
        io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
-       io_run (msg, NULL);
+       io_run (msg, FALSE);
 }
 
 void  
@@ -1155,7 +1168,7 @@ io_unpause_internal (gpointer msg)
        if (io->io_source)
                return FALSE;
 
-       io_run (msg, NULL);
+       io_run (msg, FALSE);
        return FALSE;
 }
 
@@ -1173,13 +1186,10 @@ soup_message_io_unpause (SoupMessage *msg)
                return;
        }
 
-       if (!io->blocking) {
-               if (!io->unpause_source) {
-                       io->unpause_source = soup_add_completion_reffed (
-                               io->async_context, io_unpause_internal, msg);
-               }
-       } else
-               io_unpause_internal (msg);
+       if (!io->unpause_source) {
+               io->unpause_source = soup_add_completion_reffed (io->async_context,
+                                                                io_unpause_internal, msg);
+       }
 }
 
 /**
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index 356b96d..35cc988 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -105,12 +105,15 @@ void                soup_message_io_unpause     (SoupMessage          *msg);
 gboolean            soup_message_io_in_progress (SoupMessage          *msg);
 
 gboolean soup_message_io_run_until_write  (SoupMessage   *msg,
+                                          gboolean       blocking,
                                           GCancellable  *cancellable,
                                           GError       **error);
 gboolean soup_message_io_run_until_read   (SoupMessage   *msg,
+                                          gboolean       blocking,
                                           GCancellable  *cancellable,
                                           GError       **error);
 gboolean soup_message_io_run_until_finish (SoupMessage   *msg,
+                                          gboolean       blocking,
                                           GCancellable  *cancellable,
                                           GError       **error);
 
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 85ef1b2..a9258ef 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -3767,7 +3767,7 @@ try_run_until_read (SoupMessageQueueItem *item)
        GError *error = NULL;
        GInputStream *stream = NULL;
 
-       if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+       if (soup_message_io_run_until_read (item->msg, FALSE, item->cancellable, &error))
                stream = soup_message_io_get_response_istream (item->msg, &error);
        if (stream) {
                send_async_maybe_complete (item, stream);
@@ -4157,7 +4157,7 @@ soup_session_send (SoupSession   *session,
                        break;
 
                /* Send request, read headers */
-               if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) {
+               if (!soup_message_io_run_until_read (msg, TRUE, item->cancellable, &my_error)) {
                        if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
                                item->state = SOUP_MESSAGE_RESTARTING;
                                soup_message_io_finished (item->msg);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]