[libsoup] SoupHTTPRequest: O brave new world!



commit 9effb5ca942412ecde9242c745f2df6da80853a3
Author: Dan Winship <danw gnome org>
Date:   Thu Jan 26 16:25:57 2012 -0500

    SoupHTTPRequest: O brave new world!
    
    Kill SoupHTTPInputStream, and have SoupHTTPRequest return the
    message's body_istream directly (with a little help from SoupSession
    and its subclasses). SoupHTTPRequest works synchronously now as well
    (though it's still the case that async only works with
    SoupSessionAsync and sync only works with SoupSessionSync).
    
    https://bugzilla.gnome.org/show_bug.cgi?id=591739

 libsoup/Makefile.am                |    4 +-
 libsoup/soup-body-input-stream.c   |   28 ++
 libsoup/soup-client-input-stream.c |  280 +++++++++++++
 libsoup/soup-client-input-stream.h |   46 ++
 libsoup/soup-http-input-stream.c   |  793 ------------------------------------
 libsoup/soup-http-input-stream.h   |   79 ----
 libsoup/soup-message-io.c          |  310 +++++++++++---
 libsoup/soup-message-private.h     |   19 +
 libsoup/soup-message-queue.c       |    2 +
 libsoup/soup-message-queue.h       |    2 +
 libsoup/soup-request-http.c        |   71 +++-
 libsoup/soup-session-async.c       |  239 +++++++++++-
 libsoup/soup-session-private.h     |   14 +
 libsoup/soup-session-sync.c        |  117 ++++++-
 libsoup/soup-session.c             |    7 +-
 po/POTFILES.in                     |    1 +
 16 files changed, 1047 insertions(+), 965 deletions(-)
---
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index ad85b07..8fd8ee5 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -101,6 +101,8 @@ libsoup_2_4_la_SOURCES =		\
 	soup-body-output-stream.c	\
 	soup-cache.c			\
 	soup-cache-private.h		\
+	soup-client-input-stream.h	\
+	soup-client-input-stream.c	\
 	soup-connection.h		\
 	soup-connection.c		\
 	soup-content-decoder.c		\
@@ -121,8 +123,6 @@ libsoup_2_4_la_SOURCES =		\
 	soup-filter-input-stream.h	\
 	soup-form.c			\
 	soup-headers.c			\
-	soup-http-input-stream.h	\
-	soup-http-input-stream.c	\
 	soup-logger.c			\
 	soup-marshal.h			\
 	soup-marshal.c			\
diff --git a/libsoup/soup-body-input-stream.c b/libsoup/soup-body-input-stream.c
index 2c5d16e..a635bd5 100644
--- a/libsoup/soup-body-input-stream.c
+++ b/libsoup/soup-body-input-stream.c
@@ -18,6 +18,7 @@
 #include "soup-body-input-stream.h"
 #include "soup-enum-types.h"
 #include "soup-filter-input-stream.h"
+#include "soup-marshal.h"
 #include "soup-message-headers.h"
 
 typedef enum {
@@ -38,6 +39,13 @@ struct _SoupBodyInputStreamPrivate {
 };
 
 enum {
+	CLOSED,
+	LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+enum {
 	PROP_0,
 
 	PROP_ENCODING,
@@ -270,6 +278,16 @@ soup_body_input_stream_read_fn (GInputStream  *stream,
 }
 
 static gboolean
+soup_body_input_stream_close_fn (GInputStream  *stream,
+				 GCancellable  *cancellable,
+				 GError       **error)
+{
+	g_signal_emit (stream, signals[CLOSED], 0);
+
+	return G_INPUT_STREAM_CLASS (soup_body_input_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+static gboolean
 soup_body_input_stream_is_readable (GPollableInputStream *stream)
 {
 	SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
@@ -321,6 +339,16 @@ soup_body_input_stream_class_init (SoupBodyInputStreamClass *stream_class)
 	object_class->get_property = get_property;
 
 	input_stream_class->read_fn = soup_body_input_stream_read_fn;
+	input_stream_class->close_fn = soup_body_input_stream_close_fn;
+
+	signals[CLOSED] =
+		g_signal_new ("closed",
+			      G_OBJECT_CLASS_TYPE (object_class),
+			      G_SIGNAL_RUN_LAST,
+			      0,
+			      NULL, NULL,
+			      _soup_marshal_NONE__NONE,
+			      G_TYPE_NONE, 0);
 
 	g_object_class_install_property (
 		object_class, PROP_ENCODING,
diff --git a/libsoup/soup-client-input-stream.c b/libsoup/soup-client-input-stream.c
new file mode 100644
index 0000000..8d1a2ea
--- /dev/null
+++ b/libsoup/soup-client-input-stream.c
@@ -0,0 +1,280 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-client-input-stream.c
+ *
+ * Copyright 2010-2012 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gio/gio.h>
+
+#include "soup-client-input-stream.h"
+#include "soup-marshal.h"
+#include "soup-message.h"
+#include "soup-message-private.h"
+
+struct _SoupClientInputStreamPrivate {
+	SoupMessage  *msg;
+};
+
+enum {
+	EOF,
+	LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+enum {
+	PROP_0,
+
+	PROP_MESSAGE
+};
+
+static GPollableInputStreamInterface *soup_client_input_stream_parent_pollable_interface;
+static void soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupClientInputStream, soup_client_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+						soup_client_input_stream_pollable_init))
+
+static void
+soup_client_input_stream_init (SoupClientInputStream *stream)
+{
+	stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
+						    SOUP_TYPE_CLIENT_INPUT_STREAM,
+						    SoupClientInputStreamPrivate);
+}
+
+static void
+set_property (GObject *object, guint prop_id,
+	      const GValue *value, GParamSpec *pspec)
+{
+	SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
+
+	switch (prop_id) {
+	case PROP_MESSAGE:
+		cistream->priv->msg = g_value_dup_object (value);
+		break;
+	default:
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+		break;
+	}
+}
+
+static void
+get_property (GObject *object, guint prop_id,
+	      GValue *value, GParamSpec *pspec)
+{
+	SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
+
+	switch (prop_id) {
+	case PROP_MESSAGE:
+		g_value_set_object (value, cistream->priv->msg);
+		break;
+	default:
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+		break;
+	}
+}
+
+static gssize
+soup_client_input_stream_read_fn (GInputStream  *stream,
+				  void          *buffer,
+				  gsize          count,
+				  GCancellable  *cancellable,
+				  GError       **error)
+{
+	gssize nread;
+
+	nread = G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+		read_fn (stream, buffer, count, cancellable, error);
+
+	if (nread == 0)
+		g_signal_emit (stream, signals[EOF], 0);
+
+	return nread;
+}
+
+static gssize
+soup_client_input_stream_read_nonblocking (GPollableInputStream  *stream,
+					   void                  *buffer,
+					   gsize                  count,
+					   GError               **error)
+{
+	gssize nread;
+
+	nread = soup_client_input_stream_parent_pollable_interface->
+		read_nonblocking (stream, buffer, count, error);
+
+	if (nread == 0)
+		g_signal_emit (stream, signals[EOF], 0);
+
+	return nread;
+}
+
+static gboolean
+soup_client_input_stream_close_fn (GInputStream  *stream,
+				   GCancellable  *cancellable,
+				   GError       **error)
+{
+	SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
+
+	if (!soup_message_io_run_until_finish (cistream->priv->msg,
+					       cancellable, error))
+		return FALSE;
+
+	return G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+typedef struct {
+	SoupClientInputStream *cistream;
+	gint priority;
+	GCancellable *cancellable;
+	GSimpleAsyncResult *result;
+} CloseAsyncData;
+
+static void
+close_async_data_free (CloseAsyncData *cad)
+{
+	if (cad->cancellable)
+		g_object_unref (cad->cancellable);
+	g_object_unref (cad->result);
+	g_slice_free (CloseAsyncData, cad);
+}
+
+static void
+base_stream_closed (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+	CloseAsyncData *cad = user_data;
+	GError *error = NULL;
+
+	if (G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+	    close_finish (G_INPUT_STREAM (cad->cistream), result, &error))
+		g_simple_async_result_set_op_res_gboolean (cad->result, TRUE);
+	else
+		g_simple_async_result_take_error (cad->result, error);
+
+	g_simple_async_result_complete_in_idle (cad->result);
+	close_async_data_free (cad);
+}
+
+static gboolean
+close_async_ready (SoupMessage *msg, gpointer user_data)
+{
+	CloseAsyncData *cad = user_data;
+	GError *error = NULL;
+
+	if (soup_message_io_run_until_finish (cad->cistream->priv->msg,
+					      cad->cancellable, &error)) {
+		G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+			close_async (G_INPUT_STREAM (cad->cistream),
+				     cad->priority,
+				     cad->cancellable,
+				     base_stream_closed,
+				     cad);
+		return FALSE; /* message I/O finished; base_stream_closed() now owns cad */
+	} else if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+		g_simple_async_result_take_error (cad->result, error);
+		g_simple_async_result_complete_in_idle (cad->result);
+		close_async_data_free (cad);
+		return FALSE; /* hard failure: reported through cad->result */
+	}
+	g_clear_error (&error); /* WOULD_BLOCK is never reported; free it instead of leaking it on every retry */
+	return TRUE;
+}
+
+static void
+soup_client_input_stream_close_async (GInputStream        *stream,
+				      gint                 priority,
+				      GCancellable        *cancellable,
+				      GAsyncReadyCallback  callback,
+				      gpointer             user_data)
+{
+	CloseAsyncData *cad;
+	GSource *source;
+
+	cad = g_slice_new (CloseAsyncData);
+	cad->cistream = SOUP_CLIENT_INPUT_STREAM (stream);
+	cad->result = g_simple_async_result_new (G_OBJECT (stream),
+						 callback, user_data,
+						 soup_client_input_stream_close_async);
+	cad->priority = priority;
+	cad->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+	source = soup_message_io_get_source (cad->cistream->priv->msg,
+					     cancellable,
+					     close_async_ready, cad);
+	g_source_set_priority (source, priority);
+	g_source_attach (source, g_main_context_get_thread_default ());
+	g_source_unref (source);
+}
+
+static gboolean
+soup_client_input_stream_close_finish (GInputStream  *stream,
+				       GAsyncResult  *result,
+				       GError       **error)
+{
+	GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+	if (g_simple_async_result_propagate_error (simple, error))
+		return FALSE;
+	else
+		return g_simple_async_result_get_op_res_gboolean (simple);
+}
+
+static void
+soup_client_input_stream_class_init (SoupClientInputStreamClass *stream_class)
+{
+	GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+	GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
+
+	g_type_class_add_private (stream_class, sizeof (SoupClientInputStreamPrivate));
+
+	object_class->set_property = set_property;
+	object_class->get_property = get_property;
+
+	input_stream_class->read_fn = soup_client_input_stream_read_fn;
+	input_stream_class->close_fn = soup_client_input_stream_close_fn;
+	input_stream_class->close_async = soup_client_input_stream_close_async;
+	input_stream_class->close_finish = soup_client_input_stream_close_finish;
+
+	signals[EOF] =
+		g_signal_new ("eof",
+			      G_OBJECT_CLASS_TYPE (object_class),
+			      G_SIGNAL_RUN_LAST,
+			      0,
+			      NULL, NULL,
+			      _soup_marshal_NONE__NONE,
+			      G_TYPE_NONE, 0);
+
+	g_object_class_install_property (
+		object_class, PROP_MESSAGE,
+		g_param_spec_object ("message",
+				     "Message",
+				     "Message",
+				     SOUP_TYPE_MESSAGE,
+				     G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+					gpointer interface_data)
+{
+	soup_client_input_stream_parent_pollable_interface =
+		g_type_interface_peek_parent (pollable_interface);
+
+	pollable_interface->read_nonblocking = soup_client_input_stream_read_nonblocking;
+}
+
+GInputStream *
+soup_client_input_stream_new (GInputStream *base_stream,
+			      SoupMessage  *msg)
+{
+	return g_object_new (SOUP_TYPE_CLIENT_INPUT_STREAM,
+			     "base-stream", base_stream,
+			     "message", msg,
+			     NULL);
+}
diff --git a/libsoup/soup-client-input-stream.h b/libsoup/soup-client-input-stream.h
new file mode 100644
index 0000000..098c607
--- /dev/null
+++ b/libsoup/soup-client-input-stream.h
@@ -0,0 +1,46 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright 2010-2012 Red Hat, Inc.
+ */
+
+#ifndef SOUP_CLIENT_INPUT_STREAM_H
+#define SOUP_CLIENT_INPUT_STREAM_H 1
+
+#include "soup-types.h"
+#include "soup-filter-input-stream.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_CLIENT_INPUT_STREAM            (soup_client_input_stream_get_type ())
+#define SOUP_CLIENT_INPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStream))
+#define SOUP_CLIENT_INPUT_STREAM_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStreamClass))
+#define SOUP_IS_CLIENT_INPUT_STREAM(obj)         (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM))
+#define SOUP_IS_CLIENT_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SOUP_TYPE_CLIENT_INPUT_STREAM))
+#define SOUP_CLIENT_INPUT_STREAM_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStreamClass))
+
+typedef struct _SoupClientInputStreamPrivate SoupClientInputStreamPrivate;
+
+typedef struct {
+	SoupFilterInputStream parent;
+
+	SoupClientInputStreamPrivate *priv;
+} SoupClientInputStream;
+
+typedef struct {
+	SoupFilterInputStreamClass parent_class;
+
+	/* Padding for future expansion */
+	void (*_libsoup_reserved1) (void);
+	void (*_libsoup_reserved2) (void);
+	void (*_libsoup_reserved3) (void);
+	void (*_libsoup_reserved4) (void);
+} SoupClientInputStreamClass;
+
+GType soup_client_input_stream_get_type (void);
+
+GInputStream *soup_client_input_stream_new (GInputStream *base_stream,
+					    SoupMessage  *msg);
+
+G_END_DECLS
+
+#endif /* SOUP_CLIENT_INPUT_STREAM_H */
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index e71a8ad..4e5c35f 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -12,8 +12,11 @@
 #include <stdlib.h>
 #include <string.h>
 
+#include <glib/gi18n-lib.h>
+
 #include "soup-body-input-stream.h"
 #include "soup-body-output-stream.h"
+#include "soup-client-input-stream.h"
 #include "soup-connection.h"
 #include "soup-content-sniffer-stream.h"
 #include "soup-converter-wrapper.h"
@@ -31,6 +34,7 @@ typedef enum {
 
 typedef enum {
 	SOUP_MESSAGE_IO_STATE_NOT_STARTED,
+	SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
 	SOUP_MESSAGE_IO_STATE_HEADERS,
 	SOUP_MESSAGE_IO_STATE_BLOCKING,
 	SOUP_MESSAGE_IO_STATE_BODY_START,
@@ -45,6 +49,9 @@ typedef enum {
 	(state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
 	 state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
 	 state != SOUP_MESSAGE_IO_STATE_DONE)
+#define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
+	(SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
+	 state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
 
 typedef struct {
 	SoupMessageQueueItem *item;
@@ -136,11 +143,13 @@ soup_message_io_stop (SoupMessage *msg)
 
 	if (io->io_source) {
 		g_source_destroy (io->io_source);
+		g_source_unref (io->io_source);
 		io->io_source = NULL;
 	}
 
 	if (io->unpause_source) {
 		g_source_destroy (io->unpause_source);
+		g_source_unref (io->unpause_source);
 		io->unpause_source = NULL;
 	}
 
@@ -215,8 +224,12 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
 							    &got_lf,
 							    cancellable, error);
 		io->read_header_buf->len = old_len + MAX (nread, 0);
-		if (nread == 0)
-			io_error (io->sock, msg, NULL);
+		if (nread == 0) {
+			soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
+			g_set_error_literal (error, G_IO_ERROR,
+					     G_IO_ERROR_PARTIAL_INPUT,
+					     _("Connection terminated unexpectedly"));
+		}
 		if (nread <= 0)
 			return FALSE;
 
@@ -257,9 +270,10 @@ setup_body_istream (SoupMessage *msg)
 	GInputStream *filter;
 	GSList *d;
 
-	io->body_istream = soup_body_input_stream_new (io->istream,
-						       io->read_encoding,
-						       io->read_length);
+	io->body_istream =
+		soup_body_input_stream_new (io->istream,
+					    io->read_encoding,
+					    io->read_length);
 
 	for (d = priv->decoders; d; d = d->next) {
 		decoder = d->data;
@@ -321,11 +335,6 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
 	gssize nwrote;
 
 	switch (io->write_state) {
-	case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
-	case SOUP_MESSAGE_IO_STATE_BLOCKING:
-		return FALSE;
-
-
 	case SOUP_MESSAGE_IO_STATE_HEADERS:
 		if (!io->write_buf->len) {
 			io->get_headers_cb (msg, io->write_buf,
@@ -416,6 +425,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
 		if (!io->write_chunk) {
 			io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
 			if (!io->write_chunk) {
+				g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
 				soup_message_io_pause (msg);
 				return FALSE;
 			}
@@ -486,7 +496,6 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
 		break;
 
 
-	case SOUP_MESSAGE_IO_STATE_DONE:
 	default:
 		g_return_val_if_reached (FALSE);
 	}
@@ -511,11 +520,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
 	guint status;
 
 	switch (io->read_state) {
-	case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
-	case SOUP_MESSAGE_IO_STATE_BLOCKING:
-		return FALSE;
-
-
 	case SOUP_MESSAGE_IO_STATE_HEADERS:
 		if (!read_headers (msg, cancellable, error))
 			return FALSE;
@@ -628,6 +632,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
 		if (priv->chunk_allocator) {
 			buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
 			if (!buffer) {
+				g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
 				soup_message_io_pause (msg);
 				return FALSE;
 			}
@@ -675,7 +680,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
 		break;
 
 
-	case SOUP_MESSAGE_IO_STATE_DONE:
 	default:
 		g_return_val_if_reached (FALSE);
 	}
@@ -683,43 +687,160 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
 	return TRUE;
 }
 
-static GSource *
+typedef struct {
+	GSource source;
+	SoupMessage *msg;
+} SoupMessageSource;
+
+static gboolean
+message_source_prepare (GSource *source,
+			gint    *timeout)
+{
+	*timeout = -1;
+	return FALSE;
+}
+
+static gboolean
+message_source_check (GSource *source)
+{
+	return FALSE;
+}
+
+static gboolean
+message_source_dispatch (GSource     *source,
+			 GSourceFunc  callback,
+			 gpointer     user_data)
+{
+  SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
+  SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+  return (*func) (message_source->msg, user_data);
+}
+
+static void
+message_source_finalize (GSource *source)
+{
+  SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+  g_object_unref (message_source->msg);
+}
+
+static gboolean
+message_source_closure_callback (SoupMessage *msg,
+				 gpointer     data)
+{
+  GClosure *closure = data;
+
+  GValue param = G_VALUE_INIT;
+  GValue result_value = G_VALUE_INIT;
+  gboolean result;
+
+  g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+  g_value_init (&param, SOUP_TYPE_MESSAGE);
+  g_value_set_object (&param, msg);
+
+  g_closure_invoke (closure, &result_value, 1, &param, NULL);
+
+  result = g_value_get_boolean (&result_value);
+  g_value_unset (&result_value);
+  g_value_unset (&param);
+
+  return result;
+}
+
+static GSourceFuncs message_source_funcs =
+{
+  message_source_prepare,
+  message_source_check,
+  message_source_dispatch,
+  message_source_finalize,
+  (GSourceFunc)message_source_closure_callback,
+  (GSourceDummyMarshal)g_cclosure_marshal_generic,
+};
+
+GSource *
 soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
-			    GSourceFunc callback, gpointer user_data)
+			    SoupMessageSourceFunc callback, gpointer user_data)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
-	GSource *source;
-
-	if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) {
-		source = g_pollable_input_stream_create_source (
-			G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
-	} else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) {
-		source = g_pollable_output_stream_create_source (
-			G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
+	GSource *base_source, *source;
+	SoupMessageSource *message_source;
+
+	if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
+		GPollableInputStream *istream;
+
+		if (io->body_istream)
+			istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
+		else
+			istream = G_POLLABLE_INPUT_STREAM (io->istream);
+		base_source = g_pollable_input_stream_create_source (istream, cancellable);
+	} else if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
+		GPollableOutputStream *ostream;
+
+		if (io->body_ostream)
+			ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
+		else
+			ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
+		base_source = g_pollable_output_stream_create_source (ostream, cancellable);
 	} else
-		g_return_val_if_reached (NULL);
-
-	g_source_set_callback (source, callback, user_data, NULL);
+		base_source = g_timeout_source_new (0);
+
+	g_source_set_dummy_callback (base_source);
+	source = g_source_new (&message_source_funcs,
+			       sizeof (SoupMessageSource));
+	g_source_set_name (source, "SoupMessageSource");
+	message_source = (SoupMessageSource *)source;
+	message_source->msg = g_object_ref (msg);
+
+	g_source_add_child_source (source, base_source);
+	g_source_unref (base_source);
+	g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
 	return source;
 }
 
-static gboolean io_run (GObject *stream, SoupMessage *msg);
-
-static void
-setup_io_source (SoupMessage *msg)
+static gboolean
+io_run_until (SoupMessage *msg,
+	      SoupMessageIOState read_state, SoupMessageIOState write_state,
+	      GCancellable *cancellable, GError **error)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
+	gboolean progress = TRUE, done;
+
+	if (g_cancellable_set_error_if_cancelled (cancellable, error))
+		return FALSE;
+	else if (!io) {
+		g_set_error_literal (error, G_IO_ERROR,
+				     G_IO_ERROR_CANCELLED,
+				     _("Operation was cancelled"));
+		return FALSE;
+	}
+
+	g_object_ref (msg);
 
-	io->io_source = soup_message_io_get_source (msg, NULL,
-						    (GSourceFunc)io_run, msg);
-	g_source_attach (io->io_source, io->async_context);
-	g_source_unref (io->io_source);
+	while (progress && priv->io_data == io && !io->paused &&
+	       (io->read_state < read_state || io->write_state < write_state)) {
+
+		if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
+			progress = io_read (msg, cancellable, error);
+		else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
+			progress = io_write (msg, cancellable, error);
+		else
+			progress = FALSE;
+	}
+
+	done = (priv->io_data == io &&
+		io->read_state >= read_state &&
+		io->write_state >= write_state);
+
+	g_object_unref (msg);
+	return done;
 }
 
 static gboolean
-io_run (GObject *stream, SoupMessage *msg)
+io_run (SoupMessage *msg, gpointer user_data)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
@@ -727,36 +848,99 @@ io_run (GObject *stream, SoupMessage *msg)
 
 	if (io->io_source) {
 		g_source_destroy (io->io_source);
+		g_source_unref (io->io_source);
 		io->io_source = NULL;
 	}
 
 	g_object_ref (msg);
 
-	while (priv->io_data == io && !io->paused) {
-		gboolean progress = FALSE;
+	if (io_run_until (msg,
+			  SOUP_MESSAGE_IO_STATE_DONE,
+			  SOUP_MESSAGE_IO_STATE_DONE,
+			  io->cancellable, &error)) {
+		soup_message_io_finished (msg);
+	} else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+		g_clear_error (&error);
+		io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
+		g_source_attach (io->io_source, io->async_context);
+	} else if (error) {
+		io_error (io->sock, msg, error);
+	}
 
-		if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
-			progress = io_read (msg, io->cancellable, &error);
-		else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
-			progress = io_write (msg, io->cancellable, &error);
+	g_object_unref (msg);
+	return FALSE;
+}
 
-		if (!progress)
-			break;
-	}
+gboolean
+soup_message_io_run_until_write (SoupMessage *msg,
+				 GCancellable *cancellable, GError **error)
+{
+	return io_run_until (msg,
+			     SOUP_MESSAGE_IO_STATE_ANY,
+			     SOUP_MESSAGE_IO_STATE_BODY,
+			     cancellable, error);
+}
 
-	if (error) {
-		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
-			g_clear_error (&error);
-			setup_io_source (msg);
-		} else
-			io_error (io->sock, msg, error);
-	} else if (priv->io_data == io &&
-		   io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
-		   io->write_state == SOUP_MESSAGE_IO_STATE_DONE)
-		soup_message_io_finished (msg);
+gboolean
+soup_message_io_run_until_read (SoupMessage *msg,
+				GCancellable *cancellable, GError **error)
+{
+	return io_run_until (msg,
+			     SOUP_MESSAGE_IO_STATE_BODY,
+			     SOUP_MESSAGE_IO_STATE_ANY,
+			     cancellable, error);
+}
 
+gboolean
+soup_message_io_run_until_finish (SoupMessage   *msg,
+				  GCancellable  *cancellable,
+				  GError       **error)
+{
+	gboolean done; /* TRUE only if both read and write states reached DONE */
+
+	g_object_ref (msg); /* hold a reference for the duration of the call */
+	done = io_run_until (msg,
+			     SOUP_MESSAGE_IO_STATE_DONE,
+			     SOUP_MESSAGE_IO_STATE_DONE,
+			     cancellable, error);
+	if (done)
+		soup_message_io_finished (msg);
 	g_object_unref (msg);
-	return FALSE;
+	return done;
+}
+
+static void
+client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
+{
+	SoupMessage *msg = user_data;
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupMessageIOData *io = priv->io_data;
+
+	if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
+		io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+}
+
+GInputStream *
+soup_message_io_get_response_istream (SoupMessage  *msg,
+				      GError      **error)
+{
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupMessageIOData *io = priv->io_data;
+	GInputStream *client_stream;
+
+	g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
+
+	if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+		g_set_error_literal (error, SOUP_HTTP_ERROR,
+				     msg->status_code, msg->reason_phrase);
+		return NULL;
+	}
+
+	client_stream = soup_client_input_stream_new (io->body_istream, msg);
+	g_signal_connect (client_stream, "eof",
+			  G_CALLBACK (client_stream_eof), msg);
+
+	return client_stream;
 }
 
 
@@ -839,7 +1023,8 @@ soup_message_io_client (SoupMessageQueueItem *item,
 	io->write_body      = item->msg->request_body;
 
 	io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
-	io_run (NULL, item->msg);
+	if (!item->new_api)
+		io_run (item->msg, NULL);
 }
 
 void
@@ -860,7 +1045,7 @@ soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
 	io->write_body      = msg->response_body;
 
 	io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
-	io_run (NULL, msg);
+	io_run (msg, NULL);
 }
 
 void  
@@ -873,6 +1058,7 @@ soup_message_io_pause (SoupMessage *msg)
 
 	if (io->io_source) {
 		g_source_destroy (io->io_source);
+		g_source_unref (io->io_source);
 		io->io_source = NULL;
 	}
 
@@ -897,7 +1083,7 @@ io_unpause_internal (gpointer msg)
 	if (io->io_source)
 		return FALSE;
 
-	io_run (NULL, msg);
+	io_run (msg, NULL);
 	return FALSE;
 }
 
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index 5625354..0d34833 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -94,6 +94,25 @@ void                soup_message_io_pause       (SoupMessage          *msg);
 void                soup_message_io_unpause     (SoupMessage          *msg);
 gboolean            soup_message_io_in_progress (SoupMessage          *msg);
 
+gboolean soup_message_io_run_until_write  (SoupMessage   *msg,
+					   GCancellable  *cancellable,
+					   GError       **error);
+gboolean soup_message_io_run_until_read   (SoupMessage   *msg,
+					   GCancellable  *cancellable,
+					   GError       **error);
+gboolean soup_message_io_run_until_finish (SoupMessage   *msg,
+					   GCancellable  *cancellable,
+					   GError       **error);
+
+typedef gboolean (*SoupMessageSourceFunc) (SoupMessage *, gpointer);
+GSource *soup_message_io_get_source       (SoupMessage           *msg,
+					   GCancellable          *cancellable,
+					   SoupMessageSourceFunc  callback,
+					   gpointer               user_data);
+
+GInputStream *soup_message_io_get_response_istream (SoupMessage  *msg,
+						    GError      **error);
+
 gboolean soup_message_disables_feature (SoupMessage *msg,
 					gpointer     feature);
 
diff --git a/libsoup/soup-message-queue.c b/libsoup/soup-message-queue.c
index 7b1e5dd..9715080 100644
--- a/libsoup/soup-message-queue.c
+++ b/libsoup/soup-message-queue.c
@@ -184,6 +184,8 @@ soup_message_queue_item_unref (SoupMessageQueueItem *item)
 		g_object_unref (item->proxy_addr);
 	if (item->proxy_uri)
 		soup_uri_free (item->proxy_uri);
+	if (item->result)
+		g_object_unref (item->result);
 	soup_message_queue_item_set_connection (item, NULL);
 	g_slice_free (SoupMessageQueueItem, item);
 }
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h
index a1ae663..a9242a1 100644
--- a/libsoup/soup-message-queue.h
+++ b/libsoup/soup-message-queue.h
@@ -45,8 +45,10 @@ struct _SoupMessageQueueItem {
 	SoupAddress *proxy_addr;
 	SoupURI *proxy_uri;
 	SoupConnection *conn;
+	GSimpleAsyncResult *result;
 
 	guint paused            : 1;
+	guint new_api           : 1;
 	guint redirection_count : 31;
 
 	SoupMessageQueueItemState state;
diff --git a/libsoup/soup-request-http.c b/libsoup/soup-request-http.c
index 89547e1..264bda0 100644
--- a/libsoup/soup-request-http.c
+++ b/libsoup/soup-request-http.c
@@ -32,9 +32,9 @@
 #include "soup-request-http.h"
 #include "soup-cache.h"
 #include "soup-cache-private.h"
-#include "soup-http-input-stream.h"
 #include "soup-message.h"
 #include "soup-session.h"
+#include "soup-session-private.h"
 #include "soup-uri.h"
 
 G_DEFINE_TYPE (SoupRequestHTTP, soup_request_http, SOUP_TYPE_REQUEST)
@@ -44,6 +44,11 @@ struct _SoupRequestHTTPPrivate {
 	char *content_type;
 };
 
+static void content_sniffed (SoupMessage *msg,
+			     const char  *content_type,
+			     GHashTable  *params,
+			     gpointer     user_data);
+
 static void
 soup_request_http_init (SoupRequestHTTP *http)
 {
@@ -61,6 +66,8 @@ soup_request_http_check_uri (SoupRequest  *request,
 		return FALSE;
 
 	http->priv->msg = soup_message_new_from_uri (SOUP_METHOD_GET, uri);
+	g_signal_connect (http->priv->msg, "content-sniffed",
+			  G_CALLBACK (content_sniffed), http);
 	return TRUE;
 }
 
@@ -69,8 +76,12 @@ soup_request_http_finalize (GObject *object)
 {
 	SoupRequestHTTP *http = SOUP_REQUEST_HTTP (object);
 
-	if (http->priv->msg)
+	if (http->priv->msg) {
+		g_signal_handlers_disconnect_by_func (http->priv->msg,
+						      G_CALLBACK (content_sniffed),
+						      http);
 		g_object_unref (http->priv->msg);
+	}
 
 	g_free (http->priv->content_type);
 
@@ -82,17 +93,11 @@ soup_request_http_send (SoupRequest          *request,
 			GCancellable         *cancellable,
 			GError              **error)
 {
-	GInputStream *httpstream;
 	SoupRequestHTTP *http = SOUP_REQUEST_HTTP (request);
 
-	httpstream = soup_http_input_stream_new (soup_request_get_session (request), http->priv->msg);
-	if (!soup_http_input_stream_send (SOUP_HTTP_INPUT_STREAM (httpstream),
-					  cancellable, error)) {
-		g_object_unref (httpstream);
-		return NULL;
-	}
-	http->priv->content_type = g_strdup (soup_http_input_stream_get_content_type (SOUP_HTTP_INPUT_STREAM (httpstream)));
-	return httpstream;
+	return soup_session_send_request (soup_request_get_session (request),
+					  http->priv->msg,
+					  cancellable, error);
 }
 
 
@@ -124,16 +129,15 @@ free_send_async_data (SendAsyncData *sadata)
 static void
 http_input_stream_ready_cb (GObject *source, GAsyncResult *result, gpointer user_data)
 {
-	SoupHTTPInputStream *httpstream = SOUP_HTTP_INPUT_STREAM (source);
 	SendAsyncData *sadata = user_data;
 	GError *error = NULL;
+	GInputStream *stream;
 
-	if (soup_http_input_stream_send_finish (httpstream, result, &error)) {
-		sadata->http->priv->content_type = g_strdup (soup_http_input_stream_get_content_type (httpstream));
-		g_simple_async_result_set_op_res_gpointer (sadata->simple, httpstream, g_object_unref);
+	stream = soup_session_send_request_finish (SOUP_SESSION (source), result, &error);
+	if (stream) {
+		g_simple_async_result_set_op_res_gpointer (sadata->simple, stream, g_object_unref);
 	} else {
 		g_simple_async_result_take_error (sadata->simple, error);
-		g_object_unref (httpstream);
 	}
 	g_simple_async_result_complete (sadata->simple);
 	free_send_async_data (sadata);
@@ -171,9 +175,8 @@ conditional_get_ready_cb (SoupSession *session, SoupMessage *msg, gpointer user_
 	/* The resource was modified, or else it mysteriously disappeared
 	 * from our cache. Either way we need to reload it now.
 	 */
-	stream = soup_http_input_stream_new (session, sadata->original);
-	soup_http_input_stream_send_async (SOUP_HTTP_INPUT_STREAM (stream), G_PRIORITY_DEFAULT,
-					   sadata->cancellable, http_input_stream_ready_cb, sadata);
+	soup_session_send_request_async (session, msg, sadata->cancellable,
+					 http_input_stream_ready_cb, sadata);
 }
 
 static gboolean
@@ -253,10 +256,8 @@ soup_request_http_send_async (SoupRequest          *request,
 		}
 	}
 
-	stream = soup_http_input_stream_new (session, http->priv->msg);
-	soup_http_input_stream_send_async (SOUP_HTTP_INPUT_STREAM (stream),
-					   G_PRIORITY_DEFAULT, cancellable,
-					   http_input_stream_ready_cb, sadata);
+	soup_session_send_request_async (session, http->priv->msg, cancellable,
+					 http_input_stream_ready_cb, sadata);
 }
 
 static GInputStream *
@@ -282,6 +283,30 @@ soup_request_http_get_content_length (SoupRequest *request)
 	return soup_message_headers_get_content_length (http->priv->msg->response_headers);
 }
 
+static void
+content_sniffed (SoupMessage *msg,	/* "content-sniffed" handler, connected in soup_request_http_check_uri() */
+		 const char  *content_type,
+		 GHashTable  *params,
+		 gpointer     user_data)
+{
+	SoupRequestHTTP *http = user_data;	/* the request given as user data at connect time */
+	GString *sniffed_type;
+
+	sniffed_type = g_string_new (content_type);	/* start from the bare type */
+	if (params) {
+		GHashTableIter iter;
+		gpointer key, value;
+
+		g_hash_table_iter_init (&iter, params);
+		while (g_hash_table_iter_next (&iter, &key, &value)) {
+			g_string_append (sniffed_type, "; ");	/* separator before each parameter */
+			soup_header_g_string_append_param (sniffed_type, key, value);
+		}
+	}
+	g_free (http->priv->content_type);	/* drop any previously stored value */
+	http->priv->content_type = g_string_free (sniffed_type, FALSE);	/* FALSE: keep the character data, free only the GString */
+}
+
 static const char *
 soup_request_http_get_content_type (SoupRequest *request)
 {
diff --git a/libsoup/soup-session-async.c b/libsoup/soup-session-async.c
index a58439e..90526dc 100644
--- a/libsoup/soup-session-async.c
+++ b/libsoup/soup-session-async.c
@@ -34,6 +34,10 @@
 static void run_queue (SoupSessionAsync *sa);
 static void do_idle_run_queue (SoupSession *session);
 
+static void send_request_running   (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_finished  (SoupSession *session, SoupMessageQueueItem *item);
+
 static void  queue_message   (SoupSession *session, SoupMessage *req,
 			      SoupSessionCallback callback, gpointer user_data);
 static guint send_message    (SoupSession *session, SoupMessage *req);
@@ -225,9 +229,10 @@ message_completed (SoupMessage *msg, gpointer user_data)
 {
 	SoupMessageQueueItem *item = user_data;
 
+	do_idle_run_queue (item->session);
+
 	if (item->state != SOUP_MESSAGE_RESTARTING)
 		item->state = SOUP_MESSAGE_FINISHING;
-	do_idle_run_queue (item->session);
 }
 
 static void
@@ -403,11 +408,15 @@ process_queue_item (SoupMessageQueueItem *item,
 		case SOUP_MESSAGE_READY:
 			item->state = SOUP_MESSAGE_RUNNING;
 			soup_session_send_queue_item (session, item, message_completed);
+			if (item->new_api)
+				send_request_running (session, item);
 			break;
 
 		case SOUP_MESSAGE_RESTARTING:
 			item->state = SOUP_MESSAGE_STARTING;
 			soup_message_restarted (item->msg);
+			if (item->new_api)
+				send_request_restarted (session, item);
 			break;
 
 		case SOUP_MESSAGE_FINISHING:
@@ -420,6 +429,8 @@ process_queue_item (SoupMessageQueueItem *item,
 			soup_session_unqueue_item (session, item);
 			if (item->callback)
 				item->callback (session, item->msg, item->callback_data);
+			else if (item->new_api)
+				send_request_finished (session, item);
 			g_object_unref (item->msg);
 			do_idle_run_queue (session);
 			g_object_unref (session);
@@ -611,3 +622,229 @@ kick (SoupSession *session)
 {
 	do_idle_run_queue (session);
 }
+
+
+static void
+send_request_return_result (SoupMessageQueueItem *item,
+			    gpointer stream, GError *error)
+{
+	GSimpleAsyncResult *simple;
+
+	simple = item->result;	/* detach the pending result; other paths test item->result for NULL */
+	item->result = NULL;
+
+	if (error)	/* an explicit error wins; stream is not touched on this path */
+		g_simple_async_result_take_error (simple, error);
+	else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+		if (stream)	/* transport failure: discard the stream and report the status instead */
+			g_object_unref (stream);
+		g_simple_async_result_set_error (simple,
+						 SOUP_HTTP_ERROR,
+						 item->msg->status_code,
+						 "%s",
+						 item->msg->reason_phrase);
+	} else	/* success: the result takes over the stream reference */
+		g_simple_async_result_set_op_res_gpointer (simple, stream, g_object_unref);
+
+	g_simple_async_result_complete (simple);
+	g_object_unref (simple);
+}
+
+static void
+send_request_restarted (SoupSession *session, SoupMessageQueueItem *item)
+{
+	/* The message is being restarted, so the body buffered for it is not needed: clear (and thereby unref) it. */
+	g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL);
+}
+
+/* Completes a new-API request whose message finished while its result was still pending. */
+static void
+send_request_finished (SoupSession *session, SoupMessageQueueItem *item)
+{
+	GMemoryOutputStream *mostream;
+	GInputStream *istream = NULL;
+
+	if (!item->result)	/* Something else already took care of it. */
+		return;
+
+	mostream = g_object_get_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream");
+	if (mostream && !SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+		gpointer data;
+		gssize size;
+
+		/* We thought it would be requeued, but it wasn't, so return the original body. */
+		size = g_memory_output_stream_get_data_size (mostream);
+		data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
+		istream = g_memory_input_stream_new_from_data (data, size, g_free);
+	} else {
+		/* Nothing usable was buffered: use an empty stream so a successful result is never NULL. */
+		istream = g_memory_input_stream_new ();
+	}
+
+	send_request_return_result (item, istream, NULL);	/* unrefs istream itself on transport errors */
+}
+
+/* Splice callback for a body being buffered because its message may be requeued; drops the splice op's item ref. */
+static void
+send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+	GInputStream *istream = g_object_get_data (source, "istream");
+	GError *error = NULL;
+
+	/* If the message was cancelled, it will be completed via other means */
+	if (g_cancellable_is_cancelled (item->cancellable) || !item->result) {
+		soup_message_queue_item_unref (item);
+		return;
+	}
+
+	if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
+					   result, &error) == -1) {
+		send_request_return_result (item, NULL, error);
+		soup_message_queue_item_unref (item);	/* release the splice op's ref on failure too */
+		return;
+	}
+
+	/* Otherwise either restarted or finished will eventually be called.
+	 * It should be safe to call the sync close() method here since
+	 * the message body has already been written.
+	 */
+	g_input_stream_close (istream, NULL, NULL);
+	do_idle_run_queue (item->session);
+	soup_message_queue_item_unref (item);
+}
+
+static void
+send_async_maybe_complete (SoupMessageQueueItem *item,
+			   GInputStream         *stream)
+{
+	if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
+	    item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
+	    soup_session_would_redirect (item->session, item->msg)) {
+		GOutputStream *ostream;
+
+		/* Message may be requeued, so gather the current message body in a memory buffer kept on the message... */
+		ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+		g_object_set_data_full (G_OBJECT (item->msg), "SoupSessionAsync:ostream",
+					ostream, g_object_unref);
+
+		g_object_set_data_full (G_OBJECT (ostream), "istream",
+					stream, g_object_unref);	/* the buffer now owns stream; send_async_spliced() looks it up */
+
+		/* Give the splice op its own ref on item */
+		soup_message_queue_item_ref (item);
+		g_output_stream_splice_async (ostream, stream,
+					      /* We can't use CLOSE_SOURCE because it
+					       * might get closed in the wrong thread.
+					       */
+					      G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+					      G_PRIORITY_DEFAULT,
+					      item->cancellable,
+					      send_async_spliced, item);
+		return;	/* the result stays pending until the message restarts or finishes */
+	}
+
+	send_request_return_result (item, stream, NULL);	/* no retry expected: hand over the live stream */
+}
+
+static void try_run_until_read (SoupMessageQueueItem *item);
+
+static gboolean
+read_ready_cb (SoupMessage *msg, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+	GError *error = NULL;
+
+	if (g_cancellable_set_error_if_cancelled (item->cancellable, &error)) {
+		send_request_return_result (item, NULL, error);	/* cancelled while waiting: complete with that error */
+		return FALSE;
+	}
+
+	try_run_until_read (item);	/* otherwise make another read attempt */
+	return FALSE;	/* NOTE(review): presumably removes this source (GSource convention) - confirm; a retry attaches a new one */
+}
+
+static void
+try_run_until_read (SoupMessageQueueItem *item)
+{
+	GError *error = NULL;
+	GInputStream *stream = NULL;
+	GSource *source;
+
+	if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+		stream = soup_message_io_get_response_istream (item->msg, &error);
+	if (stream) {	/* got a body stream: complete now or buffer it for a possible retry */
+		send_async_maybe_complete (item, stream);
+		return;
+	}
+
+	if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+		send_request_return_result (item, NULL, error);	/* anything but WOULD_BLOCK is reported to the caller */
+		return;
+	}
+
+	g_clear_error (&error);	/* WOULD_BLOCK only means "try again later" */
+	source = soup_message_io_get_source (item->msg, item->cancellable,
+					     read_ready_cb, item);
+	g_source_attach (source, soup_session_get_async_context (item->session));	/* read_ready_cb() calls back into this function */
+	g_source_unref (source);	/* the context keeps its own reference to the attached source */
+}
+
+static void
+send_request_running (SoupSession *session, SoupMessageQueueItem *item)
+{
+	try_run_until_read (item);	/* called by process_queue_item() right after a new-API item is sent */
+}
+
+void
+soup_session_send_request_async (SoupSession         *session,
+				 SoupMessage         *msg,
+				 GCancellable        *cancellable,
+				 GAsyncReadyCallback  callback,
+				 gpointer             user_data)
+{
+	SoupMessageQueueItem *item;
+	gboolean use_thread_context;
+
+	g_return_if_fail (SOUP_IS_SESSION_ASYNC (session));
+
+	g_object_get (G_OBJECT (session),
+		      SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
+		      NULL);
+	g_return_if_fail (use_thread_context);	/* refused unless the session has use-thread-context set */
+
+	/* Balance out the unref that queuing will eventually do */
+	g_object_ref (msg);
+
+	queue_message (session, msg, NULL, NULL);	/* no legacy callback: completion is reported through item->result */
+
+	item = soup_message_queue_lookup (soup_session_get_queue (session), msg);	/* NOTE(review): presumably returns a new ref, released by the op_res destroy notify set below - confirm */
+	g_return_if_fail (item != NULL);
+
+	item->new_api = TRUE;	/* makes process_queue_item() invoke the send_request_*() hooks */
+	item->result = g_simple_async_result_new (G_OBJECT (session),
+						  callback, user_data,
+						  soup_session_send_request_async);
+	g_simple_async_result_set_op_res_gpointer (item->result, item, (GDestroyNotify) soup_message_queue_item_unref);
+
+	if (cancellable) {	/* swap the item's cancellable for the caller's */
+		g_object_unref (item->cancellable);
+		item->cancellable = g_object_ref (cancellable);
+	}
+}
+
+GInputStream *
+soup_session_send_request_finish (SoupSession   *session,
+				  GAsyncResult  *result,
+				  GError       **error)
+{
+	GSimpleAsyncResult *simple;
+
+	g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
+	g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (session), soup_session_send_request_async), NULL);
+
+	simple = G_SIMPLE_ASYNC_RESULT (result);
+	if (g_simple_async_result_propagate_error (simple, error))
+		return NULL;	/* failed: the error has been handed to the caller */
+	return g_object_ref (g_simple_async_result_get_op_res_gpointer (simple));	/* new ref for the caller; requires a non-NULL stored stream */
+}
diff --git a/libsoup/soup-session-private.h b/libsoup/soup-session-private.h
index 7462c61..a72fb2b 100644
--- a/libsoup/soup-session-private.h
+++ b/libsoup/soup-session-private.h
@@ -31,6 +31,20 @@ void                  soup_session_set_item_status      (SoupSession          *s
 							 SoupMessageQueueItem *item,
 							 guint                 status_code);
 
+GInputStream         *soup_session_send_request         (SoupSession          *session,
+							 SoupMessage          *msg,
+							 GCancellable         *cancellable,
+							 GError              **error);
+
+void                  soup_session_send_request_async   (SoupSession          *session,
+							 SoupMessage          *msg,
+							 GCancellable         *cancellable,
+							 GAsyncReadyCallback   callback,
+							 gpointer              user_data);
+GInputStream         *soup_session_send_request_finish  (SoupSession          *session,
+							 GAsyncResult         *result,
+							 GError              **error);
+
 G_END_DECLS
 
 #endif /* SOUP_SESSION_PRIVATE_H */
diff --git a/libsoup/soup-session-sync.c b/libsoup/soup-session-sync.c
index 1a919c7..a09c5b4 100644
--- a/libsoup/soup-session-sync.c
+++ b/libsoup/soup-session-sync.c
@@ -240,6 +240,19 @@ try_again:
 	item->state = SOUP_MESSAGE_READY;
 }
 
+static void process_queue_item (SoupMessageQueueItem *item);
+
+static void
+new_api_message_completed (SoupMessage *msg, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;	/* completion callback passed to soup_session_send_queue_item() for new-API items */
+
+	if (item->state != SOUP_MESSAGE_RESTARTING) {	/* a restarting item is picked up again by soup_session_send_request()'s loop */
+		item->state = SOUP_MESSAGE_FINISHING;
+		process_queue_item (item);	/* resume the state machine that was left via "goto out" */
+	}
+}
+
 static void
 process_queue_item (SoupMessageQueueItem *item)
 {
@@ -249,7 +262,8 @@ process_queue_item (SoupMessageQueueItem *item)
 	SoupProxyURIResolver *proxy_resolver;
 	guint status;
 
-	item->state = SOUP_MESSAGE_STARTING;
+	soup_message_queue_item_ref (item);
+
 	do {
 		if (item->paused) {
 			g_mutex_lock (&priv->lock);
@@ -303,11 +317,22 @@ process_queue_item (SoupMessageQueueItem *item)
 
 		case SOUP_MESSAGE_READY:
 			item->state = SOUP_MESSAGE_RUNNING;
+
+			if (item->new_api) {
+				soup_session_send_queue_item (item->session, item, new_api_message_completed);
+				goto out;
+			}
+
 			soup_session_send_queue_item (item->session, item, NULL);
 			if (item->state != SOUP_MESSAGE_RESTARTING)
 				item->state = SOUP_MESSAGE_FINISHING;
 			break;
 
+		case SOUP_MESSAGE_RUNNING:
+			g_warn_if_fail (item->new_api);
+			item->state = SOUP_MESSAGE_FINISHING;
+			break;
+
 		case SOUP_MESSAGE_RESTARTING:
 			item->state = SOUP_MESSAGE_STARTING;
 			soup_message_restarted (item->msg);
@@ -326,6 +351,9 @@ process_queue_item (SoupMessageQueueItem *item)
 			break;
 		}
 	} while (item->state != SOUP_MESSAGE_FINISHED);
+
+ out:
+	soup_message_queue_item_unref (item);
 }
 
 static gboolean
@@ -476,3 +504,90 @@ kick (SoupSession *session)
 	g_cond_broadcast (&priv->cond);
 	g_mutex_unlock (&priv->lock);
 }
+
+
+/* Synchronously send @msg and return a stream of its response body, or NULL
+ * on failure. Bodies of responses that end up being requeued are discarded. */
+GInputStream *
+soup_session_send_request (SoupSession   *session,
+			   SoupMessage   *msg,
+			   GCancellable  *cancellable,
+			   GError       **error)
+{
+	SoupMessageQueueItem *item;
+	GInputStream *stream = NULL;
+	GOutputStream *ostream;
+	GMemoryOutputStream *mostream;
+	gssize size;
+
+	g_return_val_if_fail (SOUP_IS_SESSION_SYNC (session), NULL);
+
+	SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
+
+	item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+	g_return_val_if_fail (item != NULL, NULL);
+
+	item->new_api = TRUE;
+
+	while (!stream) {
+		/* Get a connection, etc */
+		process_queue_item (item);
+		if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
+			break;
+
+		/* Send request, read headers */
+		if (!soup_message_io_run_until_read (msg, cancellable, error))
+			break;
+
+		stream = soup_message_io_get_response_istream (msg, error);
+		if (!stream)
+			break;
+
+		/* Break if the message doesn't look likely-to-be-requeued */
+		if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
+		    msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
+		    !soup_session_would_redirect (session, msg))
+			break;
+
+		/* Gather the current message body into memory; reading it to the end lets the message complete */
+		ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+		if (g_output_stream_splice (ostream, stream,
+					    G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+					    G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+					    cancellable, error) == -1) {
+			g_object_unref (stream);
+			g_object_unref (ostream);
+			stream = NULL;
+			break;
+		}
+		g_object_unref (stream);
+		stream = NULL;
+
+		/* If the message was requeued, drop the buffered body and loop */
+		if (item->state == SOUP_MESSAGE_RESTARTING) {
+			g_object_unref (ostream);
+			continue;
+		}
+
+		/* Not requeued, so return the original body (an empty stream if it had no data) */
+		mostream = G_MEMORY_OUTPUT_STREAM (ostream);
+		size = g_memory_output_stream_get_data_size (mostream);
+		stream = g_memory_input_stream_new ();
+		if (size)
+			g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
+							g_memory_output_stream_steal_data (mostream),
+							size, g_free);
+		g_object_unref (ostream);
+	}
+
+	if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+		if (stream)
+			g_object_unref (stream);
+		stream = NULL;
+		if (error && !*error)	/* never set a GError over one already reported above */
+			g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code, msg->reason_phrase);
+	}
+
+	soup_message_queue_item_unref (item);
+	return stream;
+}
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index fa95f7e..200a25a 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -1739,10 +1739,7 @@ redirect_handler (SoupMessage *msg, gpointer user_data)
 
 		if (new_uri)
 			soup_uri_free (new_uri);
-		if (invalid) {
-			/* Really we should just leave the status as-is,
-			 * but that would be an API break.
-			 */
+		if (invalid && !item->new_api) {
 			soup_message_set_status_full (msg,
 						      SOUP_STATUS_MALFORMED,
 						      "Invalid Redirect URL");
@@ -2247,6 +2244,7 @@ soup_session_pause_message (SoupSession *session,
 	priv = SOUP_SESSION_GET_PRIVATE (session);
 	item = soup_message_queue_lookup (priv->queue, msg);
 	g_return_if_fail (item != NULL);
+	g_return_if_fail (!item->new_api);
 
 	item->paused = TRUE;
 	if (item->state == SOUP_MESSAGE_RUNNING)
@@ -2279,6 +2277,7 @@ soup_session_unpause_message (SoupSession *session,
 	priv = SOUP_SESSION_GET_PRIVATE (session);
 	item = soup_message_queue_lookup (priv->queue, msg);
 	g_return_if_fail (item != NULL);
+	g_return_if_fail (!item->new_api);
 
 	item->paused = FALSE;
 	if (item->state == SOUP_MESSAGE_RUNNING)
diff --git a/po/POTFILES.in b/po/POTFILES.in
index b35ee88..80db055 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,4 +1,5 @@
 libsoup/soup-body-input-stream.c
 libsoup/soup-converter-wrapper.c
+libsoup/soup-message-io.c
 libsoup/soup-request.c
 libsoup/soup-requester.c



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]