[libsoup] Fix asynchronicity of soup_session_queue_message() on plain SoupSession
- From: Dan Winship <danw src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup] Fix asynchronicity of soup_session_queue_message() on plain SoupSession
- Date: Sat, 28 Sep 2013 20:22:00 +0000 (UTC)
commit 3a308914423d4aa16e5546c1484c983c69fd2780
Author: Dan Winship <danw gnome org>
Date: Sat Sep 28 16:09:06 2013 -0400
Fix asynchronicity of soup_session_queue_message() on plain SoupSession
Messages sent via soup_session_queue_message() on a plain SoupSession
accidentally ended up using blocking I/O. Fix this (and also make
switching between sync and async ops during a streaming operation work
better).
https://bugzilla.gnome.org/show_bug.cgi?id=707711
libsoup/soup-client-input-stream.c | 4 +-
libsoup/soup-message-client-io.c | 2 +-
libsoup/soup-message-io.c | 88 ++++++++++++++++++++----------------
libsoup/soup-message-private.h | 3 +
libsoup/soup-session.c | 4 +-
5 files changed, 57 insertions(+), 44 deletions(-)
---
diff --git a/libsoup/soup-client-input-stream.c b/libsoup/soup-client-input-stream.c
index 3e533ca..d73fb00 100644
--- a/libsoup/soup-client-input-stream.c
+++ b/libsoup/soup-client-input-stream.c
@@ -129,7 +129,7 @@ soup_client_input_stream_close_fn (GInputStream *stream,
{
SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
- return soup_message_io_run_until_finish (cistream->priv->msg,
+ return soup_message_io_run_until_finish (cistream->priv->msg, TRUE,
cancellable, error);
}
@@ -150,7 +150,7 @@ close_async_ready (SoupMessage *msg, gpointer user_data)
SoupClientInputStream *cistream = g_task_get_source_object (task);
GError *error = NULL;
- if (!soup_message_io_run_until_finish (cistream->priv->msg,
+ if (!soup_message_io_run_until_finish (cistream->priv->msg, FALSE,
g_task_get_cancellable (task),
&error) &&
g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
diff --git a/libsoup/soup-message-client-io.c b/libsoup/soup-message-client-io.c
index 1d96729..b145bba 100644
--- a/libsoup/soup-message-client-io.c
+++ b/libsoup/soup-message-client-io.c
@@ -150,7 +150,7 @@ soup_message_send_request (SoupMessageQueueItem *item,
GMainContext *async_context;
GIOStream *iostream;
- if (SOUP_IS_SESSION_ASYNC (item->session)) {
+ if (!SOUP_IS_SESSION_SYNC (item->session)) {
async_context = soup_session_get_async_context (item->session);
if (!async_context)
async_context = g_main_context_default ();
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 497fd06..f5f4c51 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -60,7 +60,6 @@ typedef struct {
GOutputStream *ostream;
GOutputStream *body_ostream;
GMainContext *async_context;
- gboolean blocking;
SoupMessageIOState read_state;
SoupEncoding read_encoding;
@@ -167,7 +166,8 @@ soup_message_io_finished (SoupMessage *msg)
}
static gboolean
-read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
+read_headers (SoupMessage *msg, gboolean blocking,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -180,7 +180,7 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
nread = soup_filter_input_stream_read_line (io->istream,
io->read_header_buf->data + old_len,
RESPONSE_BLOCK_SIZE,
- io->blocking,
+ blocking,
&got_lf,
cancellable, error);
io->read_header_buf->len = old_len + MAX (nread, 0);
@@ -303,7 +303,8 @@ soup_message_setup_body_istream (GInputStream *body_stream,
* socket not writable, write is complete, etc).
*/
static gboolean
-io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
+io_write (SoupMessage *msg, gboolean blocking,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -322,7 +323,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
nwrote = g_pollable_stream_write (io->ostream,
io->write_buf->str + io->written,
io->write_buf->len - io->written,
- io->blocking,
+ blocking,
cancellable, error);
if (nwrote == -1)
return FALSE;
@@ -414,7 +415,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
nwrote = g_pollable_stream_write (io->body_ostream,
io->write_chunk->data + io->written,
io->write_chunk->length - io->written,
- io->blocking,
+ blocking,
cancellable, error);
if (nwrote == -1)
return FALSE;
@@ -486,7 +487,8 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
* socket not readable, read is complete, etc).
*/
static gboolean
-io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
+io_read (SoupMessage *msg, gboolean blocking,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -497,7 +499,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
switch (io->read_state) {
case SOUP_MESSAGE_IO_STATE_HEADERS:
- if (!read_headers (msg, cancellable, error))
+ if (!read_headers (msg, blocking, cancellable, error))
return FALSE;
status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
@@ -608,7 +610,8 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
const char *content_type;
GHashTable *params;
- if (!soup_content_sniffer_stream_is_ready (sniffer_stream, io->blocking, cancellable, error))
+ if (!soup_content_sniffer_stream_is_ready (sniffer_stream, blocking,
+ cancellable, error))
return FALSE;
content_type = soup_content_sniffer_stream_sniff (sniffer_stream, &params);
@@ -638,7 +641,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
nread = g_pollable_stream_read (io->body_istream,
(guchar *)buffer->data,
buffer->length,
- io->blocking,
+ blocking,
cancellable, error);
if (nread > 0) {
buffer->length = nread;
@@ -829,7 +832,7 @@ request_is_restartable (SoupMessage *msg, GError *error)
}
static gboolean
-io_run_until (SoupMessage *msg,
+io_run_until (SoupMessage *msg, gboolean blocking,
SoupMessageIOState read_state, SoupMessageIOState write_state,
GCancellable *cancellable, GError **error)
{
@@ -853,9 +856,9 @@ io_run_until (SoupMessage *msg,
(io->read_state < read_state || io->write_state < write_state)) {
if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
- progress = io_read (msg, cancellable, &my_error);
+ progress = io_read (msg, blocking, cancellable, &my_error);
else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
- progress = io_write (msg, cancellable, &my_error);
+ progress = io_write (msg, blocking, cancellable, &my_error);
else
progress = FALSE;
}
@@ -887,7 +890,7 @@ io_run_until (SoupMessage *msg,
done = (io->read_state >= read_state &&
io->write_state >= write_state);
- if (io->paused && !done) {
+ if (!blocking && !done) {
g_set_error_literal (error, G_IO_ERROR,
G_IO_ERROR_WOULD_BLOCK,
_("Operation would block"));
@@ -899,8 +902,17 @@ io_run_until (SoupMessage *msg,
return done;
}
+static void io_run (SoupMessage *msg, gboolean blocking);
+
static gboolean
-io_run (SoupMessage *msg, gpointer user_data)
+io_run_ready (SoupMessage *msg, gpointer user_data)
+{
+ io_run (msg, FALSE);
+ return FALSE;
+}
+
+static void
+io_run (SoupMessage *msg, gboolean blocking)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -916,14 +928,14 @@ io_run (SoupMessage *msg, gpointer user_data)
g_object_ref (msg);
cancellable = io->cancellable ? g_object_ref (io->cancellable) : NULL;
- if (io_run_until (msg,
+ if (io_run_until (msg, blocking,
SOUP_MESSAGE_IO_STATE_DONE,
SOUP_MESSAGE_IO_STATE_DONE,
cancellable, &error)) {
soup_message_io_finished (msg);
} else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_clear_error (&error);
- io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
+ io->io_source = soup_message_io_get_source (msg, NULL, io_run_ready, msg);
g_source_attach (io->io_source, io->async_context);
} else if (error && priv->io_data == io) {
if (g_error_matches (error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN))
@@ -942,25 +954,23 @@ io_run (SoupMessage *msg, gpointer user_data)
g_object_unref (msg);
g_clear_object (&cancellable);
-
- return FALSE;
}
gboolean
-soup_message_io_run_until_write (SoupMessage *msg,
+soup_message_io_run_until_write (SoupMessage *msg, gboolean blocking,
GCancellable *cancellable, GError **error)
{
- return io_run_until (msg,
+ return io_run_until (msg, blocking,
SOUP_MESSAGE_IO_STATE_ANY,
SOUP_MESSAGE_IO_STATE_BODY,
cancellable, error);
}
gboolean
-soup_message_io_run_until_read (SoupMessage *msg,
+soup_message_io_run_until_read (SoupMessage *msg, gboolean blocking,
GCancellable *cancellable, GError **error)
{
- return io_run_until (msg,
+ return io_run_until (msg, blocking,
SOUP_MESSAGE_IO_STATE_BODY,
SOUP_MESSAGE_IO_STATE_ANY,
cancellable, error);
@@ -968,12 +978,13 @@ soup_message_io_run_until_read (SoupMessage *msg,
gboolean
soup_message_io_run_until_finish (SoupMessage *msg,
+ gboolean blocking,
GCancellable *cancellable,
GError **error)
{
g_object_ref (msg);
- if (!io_run_until (msg,
+ if (!io_run_until (msg, blocking,
SOUP_MESSAGE_IO_STATE_DONE,
SOUP_MESSAGE_IO_STATE_DONE,
cancellable, error)) {
@@ -1045,11 +1056,8 @@ new_iostate (SoupMessage *msg, GIOStream *iostream,
io->istream = SOUP_FILTER_INPUT_STREAM (g_io_stream_get_input_stream (iostream));
io->ostream = g_io_stream_get_output_stream (iostream);
- if (async_context) {
+ if (async_context)
io->async_context = g_main_context_ref (async_context);
- io->blocking = FALSE;
- } else
- io->blocking = TRUE;
io->read_header_buf = g_byte_array_new ();
io->write_buf = g_string_new (NULL);
@@ -1088,8 +1096,13 @@ soup_message_io_client (SoupMessageQueueItem *item,
io->write_body = item->msg->request_body;
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- if (!item->new_api)
- io_run (item->msg, NULL);
+
+ if (!item->new_api) {
+ gboolean blocking =
+ SOUP_IS_SESSION_SYNC (item->session) ||
+ (!SOUP_IS_SESSION_ASYNC (item->session) && !item->async);
+ io_run (item->msg, blocking);
+ }
}
void
@@ -1112,7 +1125,7 @@ soup_message_io_server (SoupMessage *msg,
io->write_body = msg->response_body;
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (msg, NULL);
+ io_run (msg, FALSE);
}
void
@@ -1155,7 +1168,7 @@ io_unpause_internal (gpointer msg)
if (io->io_source)
return FALSE;
- io_run (msg, NULL);
+ io_run (msg, FALSE);
return FALSE;
}
@@ -1173,13 +1186,10 @@ soup_message_io_unpause (SoupMessage *msg)
return;
}
- if (!io->blocking) {
- if (!io->unpause_source) {
- io->unpause_source = soup_add_completion_reffed (
- io->async_context, io_unpause_internal, msg);
- }
- } else
- io_unpause_internal (msg);
+ if (!io->unpause_source) {
+ io->unpause_source = soup_add_completion_reffed (io->async_context,
+ io_unpause_internal, msg);
+ }
}
/**
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index 356b96d..35cc988 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -105,12 +105,15 @@ void soup_message_io_unpause (SoupMessage *msg);
gboolean soup_message_io_in_progress (SoupMessage *msg);
gboolean soup_message_io_run_until_write (SoupMessage *msg,
+ gboolean blocking,
GCancellable *cancellable,
GError **error);
gboolean soup_message_io_run_until_read (SoupMessage *msg,
+ gboolean blocking,
GCancellable *cancellable,
GError **error);
gboolean soup_message_io_run_until_finish (SoupMessage *msg,
+ gboolean blocking,
GCancellable *cancellable,
GError **error);
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 85ef1b2..a9258ef 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -3767,7 +3767,7 @@ try_run_until_read (SoupMessageQueueItem *item)
GError *error = NULL;
GInputStream *stream = NULL;
- if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+ if (soup_message_io_run_until_read (item->msg, FALSE, item->cancellable, &error))
stream = soup_message_io_get_response_istream (item->msg, &error);
if (stream) {
send_async_maybe_complete (item, stream);
@@ -4157,7 +4157,7 @@ soup_session_send (SoupSession *session,
break;
/* Send request, read headers */
- if (!soup_message_io_run_until_read (msg, item->cancellable, &my_error)) {
+ if (!soup_message_io_run_until_read (msg, TRUE, item->cancellable, &my_error)) {
if (g_error_matches (my_error, SOUP_HTTP_ERROR, SOUP_STATUS_TRY_AGAIN)) {
item->state = SOUP_MESSAGE_RESTARTING;
soup_message_io_finished (item->msg);
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]