[libsoup] soup-message-io: use gio streams rather than SoupSocket



commit c0414594616131e082e87b78b41542be6785158a
Author: Dan Winship <danw gnome org>
Date:   Wed Dec 8 15:56:37 2010 +0100

    soup-message-io: use gio streams rather than SoupSocket
    
    Use the socket's input/output streams for the base I/O, and add new
    SoupBodyInputStream and SoupBodyOutputStream that can be created from
    them to handle the body of a single message (including handling
    chunked encoding/decoding).
    
    Update chunk-test, which was assuming that the chunk_allocator
    callback would never be called if the message had a 0-length body;
    that's no longer true.

 libsoup/Makefile.am               |    4 +
 libsoup/soup-body-input-stream.c  |  362 +++++++++++++++
 libsoup/soup-body-input-stream.h  |   48 ++
 libsoup/soup-body-output-stream.c |  322 +++++++++++++
 libsoup/soup-body-output-stream.h |   47 ++
 libsoup/soup-message-io.c         |  920 +++++++++++++++----------------------
 libsoup/soup-socket.c             |   18 +-
 libsoup/soup-socket.h             |    2 +
 po/POTFILES.in                    |    1 +
 tests/chunk-test.c                |   10 -
 tests/connection-test.c           |   15 +-
 11 files changed, 1181 insertions(+), 568 deletions(-)
---
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index 4526ca5..5cfba04 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -95,6 +95,10 @@ libsoup_2_4_la_SOURCES =		\
 	soup-auth-manager.c		\
 	soup-auth-manager-ntlm.h	\
 	soup-auth-manager-ntlm.c	\
+	soup-body-input-stream.h	\
+	soup-body-input-stream.c	\
+	soup-body-output-stream.h	\
+	soup-body-output-stream.c	\
 	soup-cache.c			\
 	soup-cache-private.h		\
 	soup-connection.h		\
diff --git a/libsoup/soup-body-input-stream.c b/libsoup/soup-body-input-stream.c
new file mode 100644
index 0000000..2c5d16e
--- /dev/null
+++ b/libsoup/soup-body-input-stream.c
@@ -0,0 +1,362 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-body-input-stream.c
+ *
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <string.h>
+#include <gio/gio.h>
+
+#include <glib/gi18n-lib.h>
+
+#include "soup-body-input-stream.h"
+#include "soup-enum-types.h"
+#include "soup-filter-input-stream.h"
+#include "soup-message-headers.h"
+
+typedef enum {
+	SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE,
+	SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END,
+	SOUP_BODY_INPUT_STREAM_STATE_CHUNK,
+	SOUP_BODY_INPUT_STREAM_STATE_TRAILERS,
+	SOUP_BODY_INPUT_STREAM_STATE_DONE
+} SoupBodyInputStreamState;
+
+struct _SoupBodyInputStreamPrivate {
+	GInputStream *base_stream;
+
+	SoupEncoding  encoding;
+	goffset       read_length;
+	SoupBodyInputStreamState chunked_state;
+	gboolean      eof;
+};
+
+enum {
+	PROP_0,
+
+	PROP_ENCODING,
+	PROP_CONTENT_LENGTH
+};
+
+static void soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupBodyInputStream, soup_body_input_stream, G_TYPE_FILTER_INPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+						soup_body_input_stream_pollable_init))
+
+static void
+soup_body_input_stream_init (SoupBodyInputStream *bistream)
+{
+	bistream->priv = G_TYPE_INSTANCE_GET_PRIVATE (bistream,
+						      SOUP_TYPE_BODY_INPUT_STREAM,
+						      SoupBodyInputStreamPrivate);
+	bistream->priv->encoding = SOUP_ENCODING_NONE;
+}
+
+static void
+constructed (GObject *object)
+{
+	SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
+
+	bistream->priv->base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (bistream));
+
+	if (bistream->priv->encoding == SOUP_ENCODING_NONE ||
+	    (bistream->priv->encoding == SOUP_ENCODING_CONTENT_LENGTH &&
+	     bistream->priv->read_length == 0))
+		bistream->priv->eof = TRUE;
+}
+
+static void
+set_property (GObject *object, guint prop_id,
+	      const GValue *value, GParamSpec *pspec)
+{
+	SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
+
+	switch (prop_id) {
+	case PROP_ENCODING:
+		bistream->priv->encoding = g_value_get_enum (value);
+		if (bistream->priv->encoding == SOUP_ENCODING_CHUNKED)
+			bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE;
+		break;
+	case PROP_CONTENT_LENGTH:
+		bistream->priv->read_length = g_value_get_int64 (value);
+		break;
+	default:
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+		break;
+	}
+}
+
+static void
+get_property (GObject *object, guint prop_id,
+	      GValue *value, GParamSpec *pspec)
+{
+	SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (object);
+
+	switch (prop_id) {
+	case PROP_ENCODING:
+		g_value_set_enum (value, bistream->priv->encoding);
+		break;
+	default:
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+		break;
+	}
+}
+
+static gssize
+soup_body_input_stream_read_raw (SoupBodyInputStream  *bistream,
+				 void                 *buffer,
+				 gsize                 count,
+				 gboolean              blocking,
+				 GCancellable         *cancellable,
+				 GError              **error)
+{
+	gssize nread;
+
+	nread = g_pollable_stream_read (bistream->priv->base_stream,
+					buffer, count,
+					blocking,
+					cancellable, error);
+	if (nread == 0) {
+		bistream->priv->eof = TRUE;
+		if (bistream->priv->encoding != SOUP_ENCODING_EOF) {
+			g_set_error_literal (error, G_IO_ERROR,
+					     G_IO_ERROR_PARTIAL_INPUT,
+					     _("Connection terminated unexpectedly"));
+			return -1;
+		}
+	}
+	return nread;
+}
+
+static gssize
+soup_body_input_stream_read_chunked (SoupBodyInputStream  *bistream,
+				     void                 *buffer,
+				     gsize                 count,
+				     gboolean              blocking,
+				     GCancellable         *cancellable,
+				     GError              **error)
+{
+	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream);
+	char metabuf[128];
+	gssize nread;
+	gboolean got_line;
+
+again:
+	switch (bistream->priv->chunked_state) {
+	case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE:
+		nread = soup_filter_input_stream_read_line (
+			fstream, metabuf, sizeof (metabuf), blocking,
+			&got_line, cancellable, error);
+		if (nread <= 0)
+			return nread;
+		if (!got_line) {
+			g_set_error_literal (error, G_IO_ERROR,
+					     G_IO_ERROR_PARTIAL_INPUT,
+					     _("Connection terminated unexpectedly"));
+			return -1;
+		}
+
+		bistream->priv->read_length = strtoul (metabuf, NULL, 16);
+		if (bistream->priv->read_length > 0)
+			bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK;
+		else
+			bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_TRAILERS;
+		break;
+
+	case SOUP_BODY_INPUT_STREAM_STATE_CHUNK:
+		nread = soup_body_input_stream_read_raw (
+			bistream, buffer,
+			MIN (count, bistream->priv->read_length),
+			blocking, cancellable, error);
+		if (nread > 0) {
+			bistream->priv->read_length -= nread;
+			if (bistream->priv->read_length == 0)
+				bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END;
+		}
+		return nread;
+
+	case SOUP_BODY_INPUT_STREAM_STATE_CHUNK_END:
+		nread = soup_filter_input_stream_read_line (
+			SOUP_FILTER_INPUT_STREAM (bistream->priv->base_stream),
+			metabuf, sizeof (metabuf), blocking,
+			&got_line, cancellable, error);
+		if (nread <= 0)
+			return nread;
+		if (!got_line) {
+			g_set_error_literal (error, G_IO_ERROR,
+					     G_IO_ERROR_PARTIAL_INPUT,
+					     _("Connection terminated unexpectedly"));
+			return -1;
+		}
+
+		bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_CHUNK_SIZE;
+		break;
+
+	case SOUP_BODY_INPUT_STREAM_STATE_TRAILERS:
+		nread = soup_filter_input_stream_read_line (
+			fstream, buffer, count, blocking,
+			&got_line, cancellable, error);
+		if (nread <= 0)
+			return nread;
+
+		if (strncmp (buffer, "\r\n", nread) || strncmp (buffer, "\n", nread))
+			bistream->priv->chunked_state = SOUP_BODY_INPUT_STREAM_STATE_DONE;
+		break;
+
+	case SOUP_BODY_INPUT_STREAM_STATE_DONE:
+		return 0;
+	}
+
+	goto again;
+}
+
+static gssize
+read_internal (GInputStream  *stream,
+	       void          *buffer,
+	       gsize          count,
+	       gboolean       blocking,
+	       GCancellable  *cancellable,
+	       GError       **error)
+{
+	SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
+	gssize nread;
+
+	if (bistream->priv->eof)
+		return 0;
+
+	switch (bistream->priv->encoding) {
+	case SOUP_ENCODING_NONE:
+		return 0;
+
+	case SOUP_ENCODING_CHUNKED:
+		return soup_body_input_stream_read_chunked (bistream, buffer, count,
+							    blocking, cancellable, error);
+
+	case SOUP_ENCODING_CONTENT_LENGTH:
+	case SOUP_ENCODING_EOF:
+		if (bistream->priv->read_length != -1) {
+			count = MIN (count, bistream->priv->read_length);
+			if (count == 0)
+				return 0;
+		}
+
+		nread = soup_body_input_stream_read_raw (bistream, buffer, count,
+							 blocking, cancellable, error);
+		if (bistream->priv->read_length != -1 && nread > 0)
+			bistream->priv->read_length -= nread;
+		return nread;
+
+	default:
+		g_return_val_if_reached (-1);
+	}
+}
+
+static gssize
+soup_body_input_stream_read_fn (GInputStream  *stream,
+				void          *buffer,
+				gsize          count,
+				GCancellable  *cancellable,
+				GError       **error)
+{
+	return read_internal (stream, buffer, count, TRUE,
+			      cancellable, error);
+}
+
+static gboolean
+soup_body_input_stream_is_readable (GPollableInputStream *stream)
+{
+	SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
+
+	return bistream->priv->eof ||
+		g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream));
+}
+
+static gssize
+soup_body_input_stream_read_nonblocking (GPollableInputStream  *stream,
+					 void                  *buffer,
+					 gsize                  count,
+					 GError               **error)
+{
+	return read_internal (G_INPUT_STREAM (stream), buffer, count, FALSE,
+			      NULL, error);
+}
+
+static GSource *
+soup_body_input_stream_create_source (GPollableInputStream *stream,
+				      GCancellable *cancellable)
+{
+	SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
+	GSource *base_source, *pollable_source;
+
+	if (bistream->priv->eof)
+		base_source = g_timeout_source_new (0);
+	else
+		base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (bistream->priv->base_stream), cancellable);
+	g_source_set_dummy_callback (base_source);
+
+	pollable_source = g_pollable_source_new (G_OBJECT (stream));
+	g_source_add_child_source (pollable_source, base_source);
+	g_source_unref (base_source);
+
+	return pollable_source;
+}
+
+static void
+soup_body_input_stream_class_init (SoupBodyInputStreamClass *stream_class)
+{
+	GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+	GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
+
+	g_type_class_add_private (stream_class, sizeof (SoupBodyInputStreamPrivate));
+
+	object_class->constructed = constructed;
+	object_class->set_property = set_property;
+	object_class->get_property = get_property;
+
+	input_stream_class->read_fn = soup_body_input_stream_read_fn;
+
+	g_object_class_install_property (
+		object_class, PROP_ENCODING,
+		g_param_spec_enum ("encoding",
+				   "Encoding",
+				   "Message body encoding",
+				   SOUP_TYPE_ENCODING,
+				   SOUP_ENCODING_NONE,
+				   G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+	g_object_class_install_property (
+		object_class, PROP_CONTENT_LENGTH,
+		g_param_spec_int64 ("content-length",
+				    "Content-Length",
+				    "Message body Content-Length",
+				    -1, G_MAXINT64, -1,
+				    G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_body_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+				 gpointer interface_data)
+{
+	pollable_interface->is_readable = soup_body_input_stream_is_readable;
+	pollable_interface->read_nonblocking = soup_body_input_stream_read_nonblocking;
+	pollable_interface->create_source = soup_body_input_stream_create_source;
+}
+
+GInputStream *
+soup_body_input_stream_new (SoupFilterInputStream *base_stream,
+			    SoupEncoding           encoding,
+			    goffset                content_length)
+{
+	return g_object_new (SOUP_TYPE_BODY_INPUT_STREAM,
+			     "base-stream", base_stream,
+			     "close-base-stream", FALSE,
+			     "encoding", encoding,
+			     "content-length", content_length,
+			     NULL);
+}
diff --git a/libsoup/soup-body-input-stream.h b/libsoup/soup-body-input-stream.h
new file mode 100644
index 0000000..9e0c08e
--- /dev/null
+++ b/libsoup/soup-body-input-stream.h
@@ -0,0 +1,48 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifndef SOUP_BODY_INPUT_STREAM_H
+#define SOUP_BODY_INPUT_STREAM_H 1
+
+#include "soup-types.h"
+#include "soup-filter-input-stream.h"
+#include "soup-message-headers.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_BODY_INPUT_STREAM            (soup_body_input_stream_get_type ())
+#define SOUP_BODY_INPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_BODY_INPUT_STREAM, SoupBodyInputStream))
+#define SOUP_BODY_INPUT_STREAM_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_BODY_INPUT_STREAM, SoupBodyInputStreamClass))
+#define SOUP_IS_BODY_INPUT_STREAM(obj)         (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_BODY_INPUT_STREAM))
+#define SOUP_IS_BODY_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_BODY_INPUT_STREAM))
+#define SOUP_BODY_INPUT_STREAM_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_BODY_INPUT_STREAM, SoupBodyInputStreamClass))
+
+typedef struct _SoupBodyInputStreamPrivate SoupBodyInputStreamPrivate;
+
+typedef struct {
+	GFilterInputStream parent;
+
+	SoupBodyInputStreamPrivate *priv;
+} SoupBodyInputStream;
+
+typedef struct {
+	GFilterInputStreamClass parent_class;
+
+	/* Padding for future expansion */
+	void (*_libsoup_reserved1) (void);
+	void (*_libsoup_reserved2) (void);
+	void (*_libsoup_reserved3) (void);
+	void (*_libsoup_reserved4) (void);
+} SoupBodyInputStreamClass;
+
+GType soup_body_input_stream_get_type (void);
+
+GInputStream *soup_body_input_stream_new (SoupFilterInputStream *base_stream,
+					  SoupEncoding           encoding,
+					  goffset                content_length);
+
+G_END_DECLS
+
+#endif /* SOUP_BODY_INPUT_STREAM_H */
diff --git a/libsoup/soup-body-output-stream.c b/libsoup/soup-body-output-stream.c
new file mode 100644
index 0000000..269ec71
--- /dev/null
+++ b/libsoup/soup-body-output-stream.c
@@ -0,0 +1,322 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-body-output-stream.c
+ *
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <gio/gio.h>
+
+#include "soup-body-output-stream.h"
+#include "soup-enum-types.h"
+#include "soup-message-headers.h"
+
+typedef enum {
+	SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE,
+	SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_END,
+	SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK,
+	SOUP_BODY_OUTPUT_STREAM_STATE_TRAILERS,
+	SOUP_BODY_OUTPUT_STREAM_STATE_DONE
+} SoupBodyOutputStreamState;
+
+struct _SoupBodyOutputStreamPrivate {
+	GOutputStream *base_stream;
+	char           buf[20];
+
+	SoupEncoding   encoding;
+	goffset        write_length;
+	goffset        written;
+	SoupBodyOutputStreamState chunked_state;
+	gboolean       eof;
+};
+
+enum {
+	PROP_0,
+
+	PROP_ENCODING,
+	PROP_CONTENT_LENGTH
+};
+
+static void soup_body_output_stream_pollable_init (GPollableOutputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupBodyOutputStream, soup_body_output_stream, G_TYPE_FILTER_OUTPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+						soup_body_output_stream_pollable_init))
+
+
+static void
+soup_body_output_stream_init (SoupBodyOutputStream *stream)
+{
+	stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
+						    SOUP_TYPE_BODY_OUTPUT_STREAM,
+						    SoupBodyOutputStreamPrivate);
+}
+
+static void
+constructed (GObject *object)
+{
+	SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (object);
+
+	bostream->priv->base_stream = g_filter_output_stream_get_base_stream (G_FILTER_OUTPUT_STREAM (bostream));
+}
+
+static void
+set_property (GObject *object, guint prop_id,
+	      const GValue *value, GParamSpec *pspec)
+{
+	SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (object);
+
+	switch (prop_id) {
+	case PROP_ENCODING:
+		bostream->priv->encoding = g_value_get_enum (value);
+		if (bostream->priv->encoding == SOUP_ENCODING_CHUNKED)
+			bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE;
+		break;
+	case PROP_CONTENT_LENGTH:
+		bostream->priv->write_length = g_value_get_uint64 (value);
+		break;
+	default:
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+		break;
+	}
+}
+
+static void
+get_property (GObject *object, guint prop_id,
+	      GValue *value, GParamSpec *pspec)
+{
+	SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (object);
+
+	switch (prop_id) {
+	case PROP_ENCODING:
+		g_value_set_enum (value, bostream->priv->encoding);
+		break;
+	default:
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+		break;
+	}
+}
+
+static gssize
+soup_body_output_stream_write_raw (SoupBodyOutputStream  *bostream,
+				   const void            *buffer,
+				   gsize                  count,
+				   GCancellable          *cancellable,
+				   GError               **error)
+{
+	gssize nwrote, my_count;
+
+	/* If the caller tries to write too much to a Content-Length
+	 * encoded stream, we truncate at the right point, but keep
+	 * accepting additional data until they stop.
+	 */
+	if (bostream->priv->write_length) {
+		my_count = MIN (count, bostream->priv->write_length - bostream->priv->written);
+		if (my_count == 0) {
+			bostream->priv->eof = TRUE;
+			return count;
+		}
+	} else
+		my_count = count;
+
+	nwrote = g_output_stream_write (bostream->priv->base_stream,
+					buffer, my_count,
+					cancellable, error);
+
+	if (nwrote > 0 && bostream->priv->write_length)
+		bostream->priv->written += nwrote;
+
+	if (nwrote == my_count && my_count != count)
+		nwrote = count;
+
+	return nwrote;
+}
+
+static gssize
+soup_body_output_stream_write_chunked (SoupBodyOutputStream  *bostream,
+				       const void            *buffer,
+				       gsize                  count,
+				       GCancellable          *cancellable,
+				       GError               **error)
+{
+	char *buf = bostream->priv->buf;
+	gssize nwrote, len;
+
+again:
+	len = strlen (buf);
+	if (len) {
+		nwrote = g_output_stream_write (bostream->priv->base_stream,
+						buf, len, cancellable, error);
+		if (nwrote < 0)
+			return nwrote;
+		memmove (buf, buf + nwrote, len + 1 - nwrote);
+		goto again;
+	}
+
+	switch (bostream->priv->chunked_state) {
+	case SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE:
+		snprintf (buf, sizeof (bostream->priv->buf),
+			  "%lx\r\n", (gulong)count);
+		len = strlen (buf);
+
+		if (count > 0)
+			bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK;
+		else
+			bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_TRAILERS;
+		break;
+
+	case SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK:
+		nwrote = g_output_stream_write (bostream->priv->base_stream,
+						buffer, count, cancellable, error);
+		if (nwrote < (gssize)count)
+			return nwrote;
+
+		bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_END;
+		break;
+
+	case SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_END:
+		strncpy (buf, "\r\n", sizeof (bostream->priv->buf));
+		len = 2;
+		bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_DONE;
+		break;
+
+	case SOUP_BODY_OUTPUT_STREAM_STATE_TRAILERS:
+		strncpy (buf, "\r\n", sizeof (bostream->priv->buf));
+		len = 2;
+		bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_DONE;
+		break;
+
+	case SOUP_BODY_OUTPUT_STREAM_STATE_DONE:
+		bostream->priv->chunked_state = SOUP_BODY_OUTPUT_STREAM_STATE_CHUNK_SIZE;
+		return count;
+	}
+
+	goto again;
+}
+
+static gssize
+soup_body_output_stream_write_fn (GOutputStream  *stream,
+				  const void     *buffer,
+				  gsize           count,
+				  GCancellable   *cancellable,
+				  GError        **error)
+{
+	SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream);
+
+	if (bostream->priv->eof)
+		return count;
+
+	switch (bostream->priv->encoding) {
+	case SOUP_ENCODING_CHUNKED:
+		return soup_body_output_stream_write_chunked (bostream, buffer, count,
+							      cancellable, error);
+
+	default:
+		return soup_body_output_stream_write_raw (bostream, buffer, count,
+							  cancellable, error);
+	}
+}
+
+static gboolean
+soup_body_output_stream_close_fn (GOutputStream  *stream,
+				  GCancellable   *cancellable,
+				  GError        **error)
+{
+	SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream);
+
+	if (bostream->priv->encoding == SOUP_ENCODING_CHUNKED) {
+		if (soup_body_output_stream_write_chunked (bostream, NULL, 0, cancellable, error) == -1)
+			return FALSE;
+	}
+
+	return G_OUTPUT_STREAM_CLASS (soup_body_output_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+static gboolean
+soup_body_output_stream_is_writable (GPollableOutputStream *stream)
+{
+	SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream);
+
+	return bostream->priv->eof ||
+		g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (bostream->priv->base_stream));
+}
+
+static GSource *
+soup_body_output_stream_create_source (GPollableOutputStream *stream,
+				       GCancellable *cancellable)
+{
+	SoupBodyOutputStream *bostream = SOUP_BODY_OUTPUT_STREAM (stream);
+	GSource *base_source, *pollable_source;
+
+	if (bostream->priv->eof)
+		base_source = g_timeout_source_new (0);
+	else
+		base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (bostream->priv->base_stream), cancellable);
+	g_source_set_dummy_callback (base_source);
+
+	pollable_source = g_pollable_source_new (G_OBJECT (stream));
+	g_source_add_child_source (pollable_source, base_source);
+	g_source_unref (base_source);
+
+	return pollable_source;
+}
+
+static void
+soup_body_output_stream_class_init (SoupBodyOutputStreamClass *stream_class)
+{
+	GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+	GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (stream_class);
+
+	g_type_class_add_private (stream_class, sizeof (SoupBodyOutputStreamPrivate));
+
+	object_class->constructed = constructed;
+	object_class->set_property = set_property;
+	object_class->get_property = get_property;
+
+	output_stream_class->write_fn = soup_body_output_stream_write_fn;
+	output_stream_class->close_fn = soup_body_output_stream_close_fn;
+
+	g_object_class_install_property (
+		object_class, PROP_ENCODING,
+		g_param_spec_enum ("encoding",
+				   "Encoding",
+				   "Message body encoding",
+				   SOUP_TYPE_ENCODING,
+				   SOUP_ENCODING_NONE,
+				   G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+	g_object_class_install_property (
+		object_class, PROP_CONTENT_LENGTH,
+		g_param_spec_uint64 ("content-length",
+				     "Content-Length",
+				     "Message body Content-Length",
+				     0, G_MAXUINT64, 0,
+				     G_PARAM_WRITABLE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_body_output_stream_pollable_init (GPollableOutputStreamInterface *pollable_interface,
+				       gpointer interface_data)
+{
+	pollable_interface->is_writable = soup_body_output_stream_is_writable;
+	pollable_interface->create_source = soup_body_output_stream_create_source;
+}
+
+GOutputStream *
+soup_body_output_stream_new (GOutputStream *base_stream,
+			     SoupEncoding   encoding,
+			     goffset        content_length)
+{
+	return g_object_new (SOUP_TYPE_BODY_OUTPUT_STREAM,
+			     "base-stream", base_stream,
+			     "close-base-stream", FALSE,
+			     "encoding", encoding,
+			     "content-length", content_length,
+			     NULL);
+}
diff --git a/libsoup/soup-body-output-stream.h b/libsoup/soup-body-output-stream.h
new file mode 100644
index 0000000..8bd8970
--- /dev/null
+++ b/libsoup/soup-body-output-stream.h
@@ -0,0 +1,47 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifndef SOUP_BODY_OUTPUT_STREAM_H
+#define SOUP_BODY_OUTPUT_STREAM_H 1
+
+#include "soup-types.h"
+#include "soup-message-headers.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_BODY_OUTPUT_STREAM            (soup_body_output_stream_get_type ())
+#define SOUP_BODY_OUTPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM, SoupBodyOutputStream))
+#define SOUP_BODY_OUTPUT_STREAM_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_BODY_OUTPUT_STREAM, SoupBodyOutputStreamClass))
+#define SOUP_IS_BODY_OUTPUT_STREAM(obj)         (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM))
+#define SOUP_IS_BODY_OUTPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM))
+#define SOUP_BODY_OUTPUT_STREAM_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_BODY_OUTPUT_STREAM, SoupBodyOutputStreamClass))
+
+typedef struct _SoupBodyOutputStreamPrivate SoupBodyOutputStreamPrivate;
+
+typedef struct {
+	GFilterOutputStream parent;
+
+	SoupBodyOutputStreamPrivate *priv;
+} SoupBodyOutputStream;
+
+typedef struct {
+	GFilterOutputStreamClass parent_class;
+
+	/* Padding for future expansion */
+	void (*_libsoup_reserved1) (void);
+	void (*_libsoup_reserved2) (void);
+	void (*_libsoup_reserved3) (void);
+	void (*_libsoup_reserved4) (void);
+} SoupBodyOutputStreamClass;
+
+GType soup_body_output_stream_get_type (void);
+
+GOutputStream *soup_body_output_stream_new (GOutputStream *base_stream,
+					    SoupEncoding   encoding,
+					    goffset        content_length);
+
+G_END_DECLS
+
+#endif /* SOUP_BODY_OUTPUT_STREAM_H */
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index cf2a2e3..aabb902 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -12,7 +12,10 @@
 #include <stdlib.h>
 #include <string.h>
 
+#include "soup-body-input-stream.h"
+#include "soup-body-output-stream.h"
 #include "soup-connection.h"
+#include "soup-filter-input-stream.h"
 #include "soup-message.h"
 #include "soup-message-private.h"
 #include "soup-message-queue.h"
@@ -28,11 +31,10 @@ typedef enum {
 	SOUP_MESSAGE_IO_STATE_NOT_STARTED,
 	SOUP_MESSAGE_IO_STATE_HEADERS,
 	SOUP_MESSAGE_IO_STATE_BLOCKING,
+	SOUP_MESSAGE_IO_STATE_BODY_START,
 	SOUP_MESSAGE_IO_STATE_BODY,
-	SOUP_MESSAGE_IO_STATE_CHUNK_SIZE,
-	SOUP_MESSAGE_IO_STATE_CHUNK,
-	SOUP_MESSAGE_IO_STATE_CHUNK_END,
-	SOUP_MESSAGE_IO_STATE_TRAILERS,
+	SOUP_MESSAGE_IO_STATE_BODY_DATA,
+	SOUP_MESSAGE_IO_STATE_BODY_DONE,
 	SOUP_MESSAGE_IO_STATE_FINISHING,
 	SOUP_MESSAGE_IO_STATE_DONE
 } SoupMessageIOState;
@@ -43,17 +45,23 @@ typedef enum {
 	 state != SOUP_MESSAGE_IO_STATE_DONE)
 
 typedef struct {
-	SoupSocket           *sock;
 	SoupMessageQueueItem *item;
 	SoupMessageIOMode     mode;
 	GCancellable         *cancellable;
 
+	SoupSocket             *sock;
+	SoupFilterInputStream  *istream;
+	GInputStream           *body_istream;
+	GOutputStream          *ostream;
+	GOutputStream          *body_ostream;
+	GMainContext           *async_context;
+	gboolean                blocking;
+
 	SoupMessageIOState    read_state;
 	SoupEncoding          read_encoding;
-	GByteArray           *read_meta_buf;
+	GByteArray           *read_header_buf;
 	SoupMessageBody      *read_body;
 	goffset               read_length;
-	gboolean              read_eof_ok;
 
 	gboolean              need_content_sniffed, need_got_chunk;
 	SoupMessageBody      *sniff_data;
@@ -67,8 +75,9 @@ typedef struct {
 	goffset               write_length;
 	goffset               written;
 
-	guint read_tag, write_tag;
+	GSource *io_source;
 	GSource *unpause_source;
+	gboolean paused;
 
 	SoupMessageGetHeadersFn   get_headers_cb;
 	SoupMessageParseHeadersFn parse_headers_cb;
@@ -83,8 +92,8 @@ typedef struct {
  */
 #define dummy_to_make_emacs_happy {
 #define SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK { gboolean cancelled; g_object_ref (msg);
-#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return; }
-#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return val; }
+#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return; }
+#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return val; }
 
 #define RESPONSE_BLOCK_SIZE 8192
 
@@ -103,10 +112,20 @@ soup_message_io_cleanup (SoupMessage *msg)
 
 	if (io->sock)
 		g_object_unref (io->sock);
+	if (io->istream)
+		g_object_remove_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
+	if (io->ostream)
+		g_object_remove_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
+	if (io->body_istream)
+		g_object_unref (io->body_istream);
+	if (io->body_ostream)
+		g_object_unref (io->body_ostream);
+	if (io->async_context)
+		g_main_context_unref (io->async_context);
 	if (io->item)
 		soup_message_queue_item_unref (io->item);
 
-	g_byte_array_free (io->read_meta_buf, TRUE);
+	g_byte_array_free (io->read_header_buf, TRUE);
 
 	g_string_free (io->write_buf, TRUE);
 	if (io->write_chunk)
@@ -127,13 +146,9 @@ soup_message_io_stop (SoupMessage *msg)
 	if (!io)
 		return;
 
-	if (io->read_tag) {
-		g_signal_handler_disconnect (io->sock, io->read_tag);
-		io->read_tag = 0;
-	}
-	if (io->write_tag) {
-		g_signal_handler_disconnect (io->sock, io->write_tag);
-		io->write_tag = 0;
+	if (io->io_source) {
+		g_source_destroy (io->io_source);
+		io->io_source = NULL;
 	}
 
 	if (io->unpause_source) {
@@ -145,9 +160,6 @@ soup_message_io_stop (SoupMessage *msg)
 		soup_socket_disconnect (io->sock);
 }
 
-#define SOUP_MESSAGE_IO_EOL            "\r\n"
-#define SOUP_MESSAGE_IO_EOL_LEN        2
-
 void
 soup_message_io_finished (SoupMessage *msg)
 {
@@ -163,8 +175,6 @@ soup_message_io_finished (SoupMessage *msg)
 	g_object_unref (msg);
 }
 
-static void io_read (SoupSocket *sock, SoupMessage *msg);
-
 static gboolean
 request_is_idempotent (SoupMessage *msg)
 {
@@ -184,7 +194,7 @@ io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
 					      error->message);
 	} else if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
 		   io->read_state <= SOUP_MESSAGE_IO_STATE_HEADERS &&
-		   io->read_meta_buf->len == 0 &&
+		   io->read_header_buf->len == 0 &&
 		   soup_connection_get_ever_used (io->item->conn) &&
 		   !g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT) &&
 		   request_is_idempotent (msg)) {
@@ -248,87 +258,54 @@ io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
 	return TRUE;
 }
 
-/* Reads data from io->sock into io->read_meta_buf. If @to_blank is
- * %TRUE, it reads up until a blank line ("CRLF CRLF" or "LF LF").
- * Otherwise, it reads up until a single CRLF or LF.
- *
- * This function is used to read metadata, and read_body_chunk() is
- * used to read the message body contents.
- *
- * read_metadata, read_body_chunk, and write_data all use the same
- * convention for return values: if they return %TRUE, it means
- * they've completely finished the requested read/write, and the
- * caller should move on to the next step. If they return %FALSE, it
- * means that either (a) the socket returned SOUP_SOCKET_WOULD_BLOCK,
- * so the caller should give up for now and wait for the socket to
- * emit a signal, or (b) the socket returned an error, and io_error()
- * was called to process it and cancel the I/O. So either way, if the
- * function returns %FALSE, the caller should return immediately.
- */
 static gboolean
-read_metadata (SoupMessage *msg, gboolean to_blank)
+read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
-	SoupSocketIOStatus status;
-	guchar read_buf[RESPONSE_BLOCK_SIZE];
-	gsize nread;
+	gssize nread, old_len;
 	gboolean got_lf;
-	GError *error = NULL;
 
 	while (1) {
-		status = soup_socket_read_until (io->sock, read_buf,
-						 sizeof (read_buf),
-						 "\n", 1, &nread, &got_lf,
-						 io->cancellable, &error);
-		switch (status) {
-		case SOUP_SOCKET_OK:
-			g_byte_array_append (io->read_meta_buf, read_buf, nread);
-			break;
-
-		case SOUP_SOCKET_EOF:
-			/* More lame server handling... deal with
-			 * servers that don't send the final chunk.
-			 */
-			if (io->read_state == SOUP_MESSAGE_IO_STATE_CHUNK_SIZE &&
-			    io->read_meta_buf->len == 0) {
-				g_byte_array_append (io->read_meta_buf,
-						     (guchar *)"0\r\n", 3);
-				got_lf = TRUE;
-				break;
-			} else if (io->read_state == SOUP_MESSAGE_IO_STATE_TRAILERS &&
-				   io->read_meta_buf->len == 0) {
-				g_byte_array_append (io->read_meta_buf,
-						     (guchar *)"\r\n", 2);
-				got_lf = TRUE;
-				break;
-			}
-			/* else fall through */
-
-		case SOUP_SOCKET_ERROR:
-			io_error (io->sock, msg, error);
-			return FALSE;
-
-		case SOUP_SOCKET_WOULD_BLOCK:
+		old_len = io->read_header_buf->len;
+		g_byte_array_set_size (io->read_header_buf, old_len + RESPONSE_BLOCK_SIZE);
+		nread = soup_filter_input_stream_read_line (io->istream,
+							    io->read_header_buf->data + old_len,
+							    RESPONSE_BLOCK_SIZE,
+							    io->blocking,
+							    &got_lf,
+							    cancellable, error);
+		io->read_header_buf->len = old_len + MAX (nread, 0);
+		if (nread == 0)
+			io_error (io->sock, msg, NULL);
+		if (nread <= 0)
 			return FALSE;
-		}
 
 		if (got_lf) {
-			if (!to_blank)
-				break;
-			if (nread == 1 && io->read_meta_buf->len >= 2 &&
-			    !strncmp ((char *)io->read_meta_buf->data +
-				      io->read_meta_buf->len - 2,
+			if (nread == 1 && old_len >= 2 &&
+			    !strncmp ((char *)io->read_header_buf->data +
+				      io->read_header_buf->len - 2,
 				      "\n\n", 2))
 				break;
-			else if (nread == 2 && io->read_meta_buf->len >= 3 &&
-				 !strncmp ((char *)io->read_meta_buf->data +
-					   io->read_meta_buf->len - 3,
+			else if (nread == 2 && old_len >= 3 &&
+				 !strncmp ((char *)io->read_header_buf->data +
+					   io->read_header_buf->len - 3,
 					   "\n\r\n", 3))
 				break;
 		}
 	}
 
+	/* We need to "rewind" io->read_header_buf back one line.
+	 * That SHOULD be two characters (CR LF), but if the
+	 * web server was stupid, it might only be one.
+	 */
+	if (io->read_header_buf->len < 3 ||
+	    io->read_header_buf->data[io->read_header_buf->len - 2] == '\n')
+		io->read_header_buf->len--;
+	else
+		io->read_header_buf->len -= 2;
+	io->read_header_buf->data[io->read_header_buf->len] = '\0';
+
 	return TRUE;
 }
 
@@ -445,165 +422,6 @@ content_decode (SoupMessage *msg, SoupBuffer *buf)
 	return buf;
 }
 
-/* Reads as much message body data as is available on io->sock (but no
- * further than the end of the current message body or chunk). On a
- * successful read, emits "got_chunk" (possibly multiple times), and
- * (unless told not to) appends the chunk to io->read_body.
- *
- * See the note at read_metadata() for an explanation of the return
- * value.
- */
-static gboolean
-read_body_chunk (SoupMessage *msg)
-{
-	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
-	SoupMessageIOData *io = priv->io_data;
-	SoupSocketIOStatus status;
-	guchar *stack_buf = NULL;
-	gsize len;
-	gboolean read_to_eof = (io->read_encoding == SOUP_ENCODING_EOF);
-	gsize nread;
-	GError *error = NULL;
-	SoupBuffer *buffer;
-
-	if (!io_handle_sniffing (msg, FALSE))
-		return FALSE;
-
-	while (read_to_eof || io->read_length > 0) {
-		if (priv->chunk_allocator) {
-			buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
-			if (!buffer) {
-				soup_message_io_pause (msg);
-				return FALSE;
-			}
-		} else {
-			if (!stack_buf)
-				stack_buf = alloca (RESPONSE_BLOCK_SIZE);
-			buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
-						  stack_buf,
-						  RESPONSE_BLOCK_SIZE);
-		}
-
-		if (read_to_eof)
-			len = buffer->length;
-		else
-			len = MIN (buffer->length, io->read_length);
-
-		status = soup_socket_read (io->sock,
-					   (guchar *)buffer->data, len,
-					   &nread, io->cancellable, &error);
-
-		if (status == SOUP_SOCKET_OK && nread) {
-			buffer->length = nread;
-			io->read_length -= nread;
-
-			buffer = content_decode (msg, buffer);
-			if (!buffer)
-				continue;
-
-			soup_message_body_got_chunk (io->read_body, buffer);
-
-			if (io->need_content_sniffed) {
-				soup_message_body_append_buffer (io->sniff_data, buffer);
-				soup_buffer_free (buffer);
-				io->need_got_chunk = TRUE;
-				if (!io_handle_sniffing (msg, FALSE))
-					return FALSE;
-				continue;
-			}
-
-			SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-			soup_message_got_chunk (msg, buffer);
-			soup_buffer_free (buffer);
-			SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
-			continue;
-		}
-
-		soup_buffer_free (buffer);
-		switch (status) {
-		case SOUP_SOCKET_OK:
-			break;
-
-		case SOUP_SOCKET_EOF:
-			if (io->read_eof_ok) {
-				io->read_length = 0;
-				return TRUE;
-			}
-			/* else fall through */
-
-		case SOUP_SOCKET_ERROR:
-			io_error (io->sock, msg, error);
-			return FALSE;
-
-		case SOUP_SOCKET_WOULD_BLOCK:
-			return FALSE;
-		}
-	}
-
-	return TRUE;
-}
-
-/* Attempts to write @len bytes from @data. See the note at
- * read_metadata() for an explanation of the return value.
- */
-static gboolean
-write_data (SoupMessage *msg, const char *data, guint len, gboolean body)
-{
-	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
-	SoupMessageIOData *io = priv->io_data;
-	SoupSocketIOStatus status;
-	gsize nwrote;
-	GError *error = NULL;
-	SoupBuffer *chunk;
-	const char *start;
-
-	while (len > io->written) {
-		status = soup_socket_write (io->sock,
-					    data + io->written,
-					    len - io->written,
-					    &nwrote,
-					    io->cancellable, &error);
-		switch (status) {
-		case SOUP_SOCKET_EOF:
-		case SOUP_SOCKET_ERROR:
-			io_error (io->sock, msg, error);
-			return FALSE;
-
-		case SOUP_SOCKET_WOULD_BLOCK:
-			return FALSE;
-
-		case SOUP_SOCKET_OK:
-			start = data + io->written;
-			io->written += nwrote;
-
-			if (body) {
-				if (io->write_length)
-					io->write_length -= nwrote;
-
-				chunk = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
-							 start, nwrote);
-				SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-				soup_message_wrote_body_data (msg, chunk);
-				soup_buffer_free (chunk);
-				SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
-			}
-			break;
-		}
-	}
-
-	io->written = 0;
-	return TRUE;
-}
-
-static inline SoupMessageIOState
-io_body_state (SoupEncoding encoding)
-{
-	if (encoding == SOUP_ENCODING_CHUNKED)
-		return SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
-	else
-		return SOUP_MESSAGE_IO_STATE_BODY;
-}
-
 /*
  * There are two request/response formats: the basic request/response,
  * possibly with one or more unsolicited informational responses (such
@@ -630,16 +448,24 @@ io_body_state (SoupEncoding encoding)
  *      W:DONE     / R:DONE               R:DONE     / W:DONE
  */
 
-static void
-io_write (SoupSocket *sock, SoupMessage *msg)
+/* Attempts to push forward the writing side of @msg's I/O. Returns
+ * %TRUE if it manages to make some progress, and it is likely that
+ * further progress can be made. Returns %FALSE if it has reached a
+ * stopping point of some sort (need input from the application,
+ * socket not writable, write is complete, etc).
+ */
+static gboolean
+io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
+	SoupBuffer *chunk;
+	gssize nwrote;
 
- write_more:
 	switch (io->write_state) {
 	case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
-		return;
+	case SOUP_MESSAGE_IO_STATE_BLOCKING:
+		return FALSE;
 
 
 	case SOUP_MESSAGE_IO_STATE_HEADERS:
@@ -647,32 +473,29 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 			io->get_headers_cb (msg, io->write_buf,
 					    &io->write_encoding,
 					    io->header_data);
-			if (!io->write_buf->len) {
-				soup_message_io_pause (msg);
-				return;
-			}
 		}
 
-		if (!write_data (msg, io->write_buf->str,
-				 io->write_buf->len, FALSE))
-			return;
+		while (io->written < io->write_buf->len) {
+			nwrote = g_pollable_stream_write (io->ostream,
+							  io->write_buf->str + io->written,
+							  io->write_buf->len - io->written,
+							  io->blocking,
+							  cancellable, error);
+			if (nwrote == -1)
+				return FALSE;
+			io->written += nwrote;
+		}
 
+		io->written = 0;
 		g_string_truncate (io->write_buf, 0);
 
-		if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
-			SoupMessageHeaders *hdrs =
-				(io->mode == SOUP_MESSAGE_IO_CLIENT) ?
-				msg->request_headers : msg->response_headers;
-			io->write_length = soup_message_headers_get_content_length (hdrs);
-		}
-
 		if (io->mode == SOUP_MESSAGE_IO_SERVER &&
 		    SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
 			if (msg->status_code == SOUP_STATUS_CONTINUE) {
 				/* Stop and wait for the body now */
 				io->write_state =
 					SOUP_MESSAGE_IO_STATE_BLOCKING;
-				io->read_state = io_body_state (io->read_encoding);
+				io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
 			} else {
 				/* We just wrote a 1xx response
 				 * header, so stay in STATE_HEADERS.
@@ -682,13 +505,26 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 				 * response.)
 				 */
 			}
-		} else if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
-			   soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
+
+			soup_message_wrote_informational (msg);
+			soup_message_cleanup_response (msg);
+			break;
+		}
+
+		if (io->write_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
+			SoupMessageHeaders *hdrs =
+				(io->mode == SOUP_MESSAGE_IO_CLIENT) ?
+				msg->request_headers : msg->response_headers;
+			io->write_length = soup_message_headers_get_content_length (hdrs);
+		}
+
+		if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
+		    soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
 			/* Need to wait for the Continue response */
 			io->write_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
 			io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
 		} else {
-			io->write_state = io_body_state (io->write_encoding);
+			io->write_state = SOUP_MESSAGE_IO_STATE_BODY_START;
 
 			/* If the client was waiting for a Continue
 			 * but we sent something else, then they're
@@ -696,39 +532,26 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 			 */
 			if (io->mode == SOUP_MESSAGE_IO_SERVER &&
 			    io->read_state == SOUP_MESSAGE_IO_STATE_BLOCKING)
-				io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+				io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
 		}
 
-		SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-		if (SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
-			soup_message_wrote_informational (msg);
-			soup_message_cleanup_response (msg);
-		} else
-			soup_message_wrote_headers (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+		soup_message_wrote_headers (msg);
 		break;
 
 
-	case SOUP_MESSAGE_IO_STATE_BLOCKING:
-		io_read (sock, msg);
-
-		/* If io_read reached a point where we could write
-		 * again, it would have recursively called io_write.
-		 * So (a) we don't need to try to keep writing, and
-		 * (b) we can't anyway, because msg may have been
-		 * destroyed.
-		 */
-		return;
+	case SOUP_MESSAGE_IO_STATE_BODY_START:
+		io->body_ostream = soup_body_output_stream_new (io->ostream,
+								io->write_encoding,
+								io->write_length);
+		io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
+		break;
 
 
 	case SOUP_MESSAGE_IO_STATE_BODY:
-		if (!io->write_length && io->write_encoding != SOUP_ENCODING_EOF) {
-		wrote_body:
-			io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
-
-			SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-			soup_message_wrote_body (msg);
-			SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+		if (!io->write_length &&
+		    io->write_encoding != SOUP_ENCODING_EOF &&
+		    io->write_encoding != SOUP_ENCODING_CHUNKED) {
+			io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
 			break;
 		}
 
@@ -736,164 +559,114 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 			io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
 			if (!io->write_chunk) {
 				soup_message_io_pause (msg);
-				return;
+				return FALSE;
+			}
+			if (!io->write_chunk->length) {
+				io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+				break;
 			}
-			if (io->write_chunk->length > io->write_length &&
-			    io->write_encoding != SOUP_ENCODING_EOF) {
-				/* App is trying to write more than it
-				 * claimed it would; we have to truncate.
-				 */
-				SoupBuffer *truncated =
-					soup_buffer_new_subbuffer (io->write_chunk,
-								   0, io->write_length);
-				soup_buffer_free (io->write_chunk);
-				io->write_chunk = truncated;
-			} else if (io->write_encoding == SOUP_ENCODING_EOF &&
-				   !io->write_chunk->length)
-				goto wrote_body;
 		}
 
-		if (!write_data (msg, io->write_chunk->data,
-				 io->write_chunk->length, TRUE))
-			return;
-
-		if (io->mode == SOUP_MESSAGE_IO_SERVER ||
-		    priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
-			soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
-		io->write_body_offset += io->write_chunk->length;
-		soup_buffer_free (io->write_chunk);
-		io->write_chunk = NULL;
+		nwrote = g_pollable_stream_write (io->body_ostream,
+						  io->write_chunk->data + io->written,
+						  io->write_chunk->length - io->written,
+						  io->blocking,
+						  cancellable, error);
+		if (nwrote == -1)
+			return FALSE;
 
-		SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-		soup_message_wrote_chunk (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
-		break;
+		chunk = soup_buffer_new_subbuffer (io->write_chunk,
+						   io->written, nwrote);
+		io->written += nwrote;
+		if (io->write_length)
+			io->write_length -= nwrote;
 
-	case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE:
-		if (!io->write_chunk) {
-			io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
-			if (!io->write_chunk) {
-				soup_message_io_pause (msg);
-				return;
-			}
-			g_string_append_printf (io->write_buf, "%lx\r\n",
-						(unsigned long) io->write_chunk->length);
-			io->write_body_offset += io->write_chunk->length;
-		}
+		if (io->written == io->write_chunk->length)
+			io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DATA;
 
-		if (!write_data (msg, io->write_buf->str,
-				 io->write_buf->len, FALSE))
-			return;
+		soup_message_wrote_body_data (msg, chunk);
+		soup_buffer_free (chunk);
+		break;
 
-		g_string_truncate (io->write_buf, 0);
 
+	case SOUP_MESSAGE_IO_STATE_BODY_DATA:
+		io->written = 0;
 		if (io->write_chunk->length == 0) {
-			/* The last chunk has no CHUNK_END... */
-			io->write_state = SOUP_MESSAGE_IO_STATE_TRAILERS;
+			io->write_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
 			break;
 		}
 
-		io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK;
-		/* fall through */
-
-
-	case SOUP_MESSAGE_IO_STATE_CHUNK:
-		if (!write_data (msg, io->write_chunk->data,
-				 io->write_chunk->length, TRUE))
-			return;
-
 		if (io->mode == SOUP_MESSAGE_IO_SERVER ||
 		    priv->msg_flags & SOUP_MESSAGE_CAN_REBUILD)
 			soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
+		io->write_body_offset += io->write_chunk->length;
 		soup_buffer_free (io->write_chunk);
 		io->write_chunk = NULL;
 
-		io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK_END;
-
-		SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
+		io->write_state = SOUP_MESSAGE_IO_STATE_BODY;
 		soup_message_wrote_chunk (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
-
-		/* fall through */
-
-
-	case SOUP_MESSAGE_IO_STATE_CHUNK_END:
-		if (!write_data (msg, SOUP_MESSAGE_IO_EOL,
-				 SOUP_MESSAGE_IO_EOL_LEN, FALSE))
-			return;
-
-		io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
 		break;
 
 
-	case SOUP_MESSAGE_IO_STATE_TRAILERS:
-		if (!write_data (msg, SOUP_MESSAGE_IO_EOL,
-				 SOUP_MESSAGE_IO_EOL_LEN, FALSE))
-			return;
+	case SOUP_MESSAGE_IO_STATE_BODY_DONE:
+		if (io->body_ostream) {
+			if (!g_output_stream_close (io->body_ostream, cancellable, error))
+				return FALSE;
+			g_clear_object (&io->body_ostream);
+		}
 
 		io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
-
-		SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
 		soup_message_wrote_body (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
-		/* fall through */
+		break;
 
 
 	case SOUP_MESSAGE_IO_STATE_FINISHING:
-		if (io->write_tag) {
-			g_signal_handler_disconnect (io->sock, io->write_tag);
-			io->write_tag = 0;
-		}
 		io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
 
-		if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
+		if (io->mode == SOUP_MESSAGE_IO_CLIENT)
 			io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
-			io_read (sock, msg);
-		} else
-			soup_message_io_finished (msg);
-		return;
+		break;
 
 
 	case SOUP_MESSAGE_IO_STATE_DONE:
 	default:
-		g_return_if_reached ();
+		g_return_val_if_reached (FALSE);
 	}
 
-	goto write_more;
+	return TRUE;
 }
 
-static void
-io_read (SoupSocket *sock, SoupMessage *msg)
+/* Attempts to push forward the reading side of @msg's I/O. Returns
+ * %TRUE if it manages to make some progress, and it is likely that
+ * further progress can be made. Returns %FALSE if it has reached a
+ * stopping point of some sort (need input from the application,
+ * socket not readable, read is complete, etc).
+ */
+static gboolean
+io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
+	guchar *stack_buf = NULL;
+	gssize nread;
+	SoupBuffer *buffer;
 	guint status;
 
- read_more:
 	switch (io->read_state) {
 	case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
-		return;
+	case SOUP_MESSAGE_IO_STATE_BLOCKING:
+		return FALSE;
 
 
 	case SOUP_MESSAGE_IO_STATE_HEADERS:
-		if (!read_metadata (msg, TRUE))
-			return;
-
-		/* We need to "rewind" io->read_meta_buf back one line.
-		 * That SHOULD be two characters (CR LF), but if the
-		 * web server was stupid, it might only be one.
-		 */
-		if (io->read_meta_buf->len < 3 ||
-		    io->read_meta_buf->data[io->read_meta_buf->len - 2] == '\n')
-			io->read_meta_buf->len--;
-		else
-			io->read_meta_buf->len -= 2;
-		io->read_meta_buf->data[io->read_meta_buf->len] = '\0';
-		status = io->parse_headers_cb (msg, (char *)io->read_meta_buf->data,
-					       io->read_meta_buf->len,
+		if (!read_headers (msg, cancellable, error))
+			return FALSE;
+
+		status = io->parse_headers_cb (msg, (char *)io->read_header_buf->data,
+					       io->read_header_buf->len,
 					       &io->read_encoding,
 					       io->header_data);
-		g_byte_array_set_size (io->read_meta_buf, 0);
+		g_byte_array_set_size (io->read_header_buf, 0);
 
 		if (status != SOUP_STATUS_OK) {
 			/* Either we couldn't parse the headers, or they
@@ -910,26 +683,6 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 			break;
 		}
 
-		if (io->read_encoding == SOUP_ENCODING_EOF)
-			io->read_eof_ok = TRUE;
-
-		if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
-			SoupMessageHeaders *hdrs =
-				(io->mode == SOUP_MESSAGE_IO_CLIENT) ?
-				msg->response_headers : msg->request_headers;
-			io->read_length = soup_message_headers_get_content_length (hdrs);
-
-			if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
-			    !soup_message_is_keepalive (msg)) {
-				/* Some servers suck and send
-				 * incorrect Content-Length values, so
-				 * allow EOF termination in this case
-				 * (iff the message is too short) too.
-				 */
-				io->read_eof_ok = TRUE;
-			}
-		}
-
 		if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
 		    SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
 			if (msg->status_code == SOUP_STATUS_CONTINUE &&
@@ -938,11 +691,18 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 				io->read_state =
 					SOUP_MESSAGE_IO_STATE_BLOCKING;
 				io->write_state =
-					io_body_state (io->write_encoding);
+					SOUP_MESSAGE_IO_STATE_BODY_START;
 			} else {
 				/* Just stay in HEADERS */
 				io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
 			}
+
+			/* Informational responses have no bodies, so
+			 * bail out here rather than parsing encoding, etc
+			 */
+			soup_message_got_informational (msg);
+			soup_message_cleanup_response (msg);
+			break;
 		} else if (io->mode == SOUP_MESSAGE_IO_SERVER &&
 			   soup_message_headers_get_expectations (msg->request_headers) & SOUP_EXPECTATION_CONTINUE) {
 			/* The client requested a Continue response. The
@@ -953,7 +713,7 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 			io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
 			io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
 		} else {
-			io->read_state = io_body_state (io->read_encoding);
+			io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
 
 			/* If the client was waiting for a Continue
 			 * but got something else, then it's done
@@ -964,121 +724,187 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 				io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
 		}
 
-		if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
-		    SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
-			SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-			soup_message_got_informational (msg);
-			soup_message_cleanup_response (msg);
-			SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
-		} else {
-			SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-			soup_message_got_headers (msg);
-			SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
-		}
-		break;
-
+		if (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH) {
+			SoupMessageHeaders *hdrs =
+				(io->mode == SOUP_MESSAGE_IO_CLIENT) ?
+				msg->response_headers : msg->request_headers;
+			io->read_length = soup_message_headers_get_content_length (hdrs);
 
-	case SOUP_MESSAGE_IO_STATE_BLOCKING:
-		io_write (sock, msg);
+			if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
+			    !soup_message_is_keepalive (msg)) {
+				/* Some servers suck and send
+				 * incorrect Content-Length values, so
+				 * allow EOF termination in this case
+				 * (iff the message is too short) too.
+				 */
+				io->read_encoding = SOUP_ENCODING_EOF;
+			}
+		} else
+			io->read_length = -1;
 
-		/* As in the io_write case, we *must* return here. */
-		return;
+		io->body_istream = soup_body_input_stream_new (SOUP_FILTER_INPUT_STREAM (io->istream),
+							       io->read_encoding,
+							       io->read_length);
+		soup_message_got_headers (msg);
+		break;
 
 
 	case SOUP_MESSAGE_IO_STATE_BODY:
-		if (!read_body_chunk (msg))
-			return;
-
-	got_body:
-		if (!io_handle_sniffing (msg, TRUE)) {
-			/* If the message was paused (as opposed to
-			 * cancelled), we need to make sure we wind up
-			 * back here when it's unpaused, even if it
-			 * was doing a chunked or EOF-terminated read
-			 * before.
-			 */
-			if (io == priv->io_data) {
-				io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
-				io->read_encoding = SOUP_ENCODING_CONTENT_LENGTH;
-				io->read_length = 0;
+		if (!io_handle_sniffing (msg, FALSE))
+			return FALSE;
+
+		if (priv->chunk_allocator) {
+			buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
+			if (!buffer) {
+				soup_message_io_pause (msg);
+				return FALSE;
 			}
-			return;
+		} else {
+			if (!stack_buf)
+				stack_buf = alloca (RESPONSE_BLOCK_SIZE);
+			buffer = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
+						  stack_buf,
+						  RESPONSE_BLOCK_SIZE);
 		}
 
-		io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+		nread = g_pollable_stream_read (io->body_istream,
+						(guchar *)buffer->data,
+						buffer->length,
+						io->blocking,
+						cancellable, error);
+		if (nread > 0) {
+			buffer->length = nread;
+			buffer = content_decode (msg, buffer);
+			if (!buffer)
+				break;
 
-		SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-		soup_message_got_body (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
-		break;
+			soup_message_body_got_chunk (io->read_body, buffer);
 
+			if (io->need_content_sniffed) {
+				soup_message_body_append_buffer (io->sniff_data, buffer);
+				soup_buffer_free (buffer);
+				io->need_got_chunk = TRUE;
+				if (!io_handle_sniffing (msg, FALSE))
+					return FALSE;
+				break;
+			}
 
-	case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE:
-		if (!read_metadata (msg, FALSE))
-			return;
+			soup_message_got_chunk (msg, buffer);
+			soup_buffer_free (buffer);
+			break;
+		}
 
-		io->read_length = strtoul ((char *)io->read_meta_buf->data, NULL, 16);
-		g_byte_array_set_size (io->read_meta_buf, 0);
+		soup_buffer_free (buffer);
+		if (nread == -1)
+			return FALSE;
 
-		if (io->read_length > 0)
-			io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK;
-		else
-			io->read_state = SOUP_MESSAGE_IO_STATE_TRAILERS;
+		/* else nread == 0 */
+		io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
 		break;
 
 
-	case SOUP_MESSAGE_IO_STATE_CHUNK:
-		if (!read_body_chunk (msg))
-			return;
+	case SOUP_MESSAGE_IO_STATE_BODY_DONE:
+		if (!io_handle_sniffing (msg, TRUE))
+			return FALSE;
 
-		io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_END;
+		io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
+		soup_message_got_body (msg);
 		break;
 
 
-	case SOUP_MESSAGE_IO_STATE_CHUNK_END:
-		if (!read_metadata (msg, FALSE))
-			return;
+	case SOUP_MESSAGE_IO_STATE_FINISHING:
+		io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
 
-		g_byte_array_set_size (io->read_meta_buf, 0);
-		io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
+		if (io->mode == SOUP_MESSAGE_IO_SERVER)
+			io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
 		break;
 
 
-	case SOUP_MESSAGE_IO_STATE_TRAILERS:
-		if (!read_metadata (msg, FALSE))
-			return;
+	case SOUP_MESSAGE_IO_STATE_DONE:
+	default:
+		g_return_val_if_reached (FALSE);
+	}
 
-		if (io->read_meta_buf->len <= SOUP_MESSAGE_IO_EOL_LEN)
-			goto got_body;
+	return TRUE;
+}
 
-		/* FIXME: process trailers */
-		g_byte_array_set_size (io->read_meta_buf, 0);
-		break;
+static GSource *
+soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
+			    GSourceFunc callback, gpointer user_data)
+{
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupMessageIOData *io = priv->io_data;
+	GSource *source;
+
+	if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) {
+		source = g_pollable_input_stream_create_source (
+			G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
+	} else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) {
+		source = g_pollable_output_stream_create_source (
+			G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
+	} else
+		g_return_val_if_reached (NULL);
 
+	g_source_set_callback (source, callback, user_data, NULL);
+	return source;
+}
 
-	case SOUP_MESSAGE_IO_STATE_FINISHING:
-		if (io->read_tag) {
-			g_signal_handler_disconnect (io->sock, io->read_tag);
-			io->read_tag = 0;
-		}
-		io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
+static gboolean io_run (GObject *stream, SoupMessage *msg);
 
-		if (io->mode == SOUP_MESSAGE_IO_SERVER) {
-			io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
-			io_write (sock, msg);
-		} else
-			soup_message_io_finished (msg);
-		return;
+static void
+setup_io_source (SoupMessage *msg)
+{
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupMessageIOData *io = priv->io_data;
 
+	io->io_source = soup_message_io_get_source (msg, NULL,
+						    (GSourceFunc)io_run, msg);
+	g_source_attach (io->io_source, io->async_context);
+	g_source_unref (io->io_source);
+}
 
-	case SOUP_MESSAGE_IO_STATE_DONE:
-	default:
-		g_return_if_reached ();
+static gboolean
+io_run (GObject *stream, SoupMessage *msg)
+{
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupMessageIOData *io = priv->io_data;
+	GError *error = NULL;
+
+	if (io->io_source) {
+		g_source_destroy (io->io_source);
+		io->io_source = NULL;
+	}
+
+	g_object_ref (msg);
+
+	while (priv->io_data == io && !io->paused) {
+		gboolean progress = FALSE;
+
+		if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
+			progress = io_read (msg, io->cancellable, &error);
+		else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
+			progress = io_write (msg, io->cancellable, &error);
+
+		if (!progress)
+			break;
 	}
 
-	goto read_more;
+	if (error) {
+		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+			g_clear_error (&error);
+			setup_io_source (msg);
+		} else
+			io_error (io->sock, msg, error);
+	} else if (priv->io_data == io &&
+		   io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
+		   io->write_state == SOUP_MESSAGE_IO_STATE_DONE)
+		soup_message_io_finished (msg);
+
+	g_object_unref (msg);
+	return FALSE;
 }
 
+
 static SoupMessageIOData *
 new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
 	     SoupMessageGetHeadersFn get_headers_cb,
@@ -1089,9 +915,9 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io;
+	gboolean non_blocking, use_thread_context;
 
 	io = g_slice_new0 (SoupMessageIOData);
-	io->sock = g_object_ref (sock);
 	io->mode = mode;
 	io->get_headers_cb   = get_headers_cb;
 	io->parse_headers_cb = parse_headers_cb;
@@ -1099,13 +925,32 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
 	io->completion_cb    = completion_cb;
 	io->completion_data  = completion_data;
 
-	io->read_meta_buf    = g_byte_array_new ();
-	io->write_buf        = g_string_new (NULL);
+	io->sock = g_object_ref (sock);
+	io->istream = SOUP_FILTER_INPUT_STREAM (soup_socket_get_input_stream (sock));
+	if (io->istream)
+		g_object_add_weak_pointer (G_OBJECT (io->istream), (gpointer *)&io->istream);
+	io->ostream = soup_socket_get_output_stream (sock);
+	if (io->ostream)
+		g_object_add_weak_pointer (G_OBJECT (io->ostream), (gpointer *)&io->ostream);
+
+	g_object_get (io->sock,
+		      SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking,
+		      SOUP_SOCKET_USE_THREAD_CONTEXT, &use_thread_context,
+		      NULL);
+	io->blocking = !non_blocking;
+
+	if (use_thread_context) {
+		io->async_context = g_main_context_get_thread_default ();
+		if (io->async_context)
+			g_main_context_ref (io->async_context);
+	} else {
+		g_object_get (io->sock,
+			      SOUP_SOCKET_ASYNC_CONTEXT, &io->async_context,
+			      NULL);
+	}
 
-	io->read_tag  = g_signal_connect (io->sock, "readable",
-					  G_CALLBACK (io_read), msg);
-	io->write_tag = g_signal_connect (io->sock, "writable",
-					  G_CALLBACK (io_write), msg);
+	io->read_header_buf = g_byte_array_new ();
+	io->write_buf       = g_string_new (NULL);
 
 	io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
 	io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
@@ -1139,7 +984,7 @@ soup_message_io_client (SoupMessageQueueItem *item,
 	io->write_body      = item->msg->request_body;
 
 	io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
-	io_write (sock, item->msg);
+	io_run (NULL, item->msg);
 }
 
 void
@@ -1160,7 +1005,7 @@ soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
 	io->write_body      = msg->response_body;
 
 	io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
-	io_read (sock, msg);
+	io_run (NULL, msg);
 }
 
 void  
@@ -1171,19 +1016,17 @@ soup_message_io_pause (SoupMessage *msg)
 
 	g_return_if_fail (io != NULL);
 
-	if (io->write_tag) {
-		g_signal_handler_disconnect (io->sock, io->write_tag);
-		io->write_tag = 0;
-	}
-	if (io->read_tag) {
-		g_signal_handler_disconnect (io->sock, io->read_tag);
-		io->read_tag = 0;
+	if (io->io_source) {
+		g_source_destroy (io->io_source);
+		io->io_source = NULL;
 	}
 
 	if (io->unpause_source) {
 		g_source_destroy (io->unpause_source);
 		io->unpause_source = NULL;
 	}
+
+	io->paused = TRUE;
 }
 
 static gboolean
@@ -1194,25 +1037,12 @@ io_unpause_internal (gpointer msg)
 
 	g_return_val_if_fail (io != NULL, FALSE);
 	io->unpause_source = NULL;
+	io->paused = FALSE;
 
-	if (io->write_tag || io->read_tag)
+	if (io->io_source)
 		return FALSE;
 
-	if (io->write_state != SOUP_MESSAGE_IO_STATE_DONE) {
-		io->write_tag = g_signal_connect (io->sock, "writable",
-						  G_CALLBACK (io_write), msg);
-	}
-
-	if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE) {
-		io->read_tag = g_signal_connect (io->sock, "readable",
-						 G_CALLBACK (io_read), msg);
-	}
-
-	if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
-		io_write (io->sock, msg);
-	else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
-		io_read (io->sock, msg);
-
+	io_run (NULL, msg);
 	return FALSE;
 }
 
@@ -1221,32 +1051,16 @@ soup_message_io_unpause (SoupMessage *msg)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
-	gboolean non_blocking, use_thread_context;
-	GMainContext *async_context;
 
 	g_return_if_fail (io != NULL);
 
-	g_object_get (io->sock,
-		      SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking,
-		      SOUP_SOCKET_USE_THREAD_CONTEXT, &use_thread_context,
-		      NULL);
-	if (use_thread_context)
-		async_context = g_main_context_ref_thread_default ();
-	else {
-		g_object_get (io->sock,
-			      SOUP_SOCKET_ASYNC_CONTEXT, &async_context,
-			      NULL);
-	}
-
-	if (non_blocking) {
+	if (!io->blocking) {
 		if (!io->unpause_source) {
 			io->unpause_source = soup_add_completion (
-				async_context, io_unpause_internal, msg);
+				io->async_context, io_unpause_internal, msg);
 		}
 	} else
 		io_unpause_internal (msg);
-	if (async_context)
-		g_main_context_unref (async_context);
 }
 
 /**
diff --git a/libsoup/soup-socket.c b/libsoup/soup-socket.c
index 049be92..2d72b38 100644
--- a/libsoup/soup-socket.c
+++ b/libsoup/soup-socket.c
@@ -129,8 +129,6 @@ disconnect_internal (SoupSocket *sock, gboolean close)
 		if (G_IS_TLS_CONNECTION (priv->conn))
 			g_signal_handlers_disconnect_by_func (priv->conn, soup_socket_peer_certificate_changed, sock);
 		g_clear_object (&priv->conn);
-		g_clear_object (&priv->istream);
-		g_clear_object (&priv->ostream);
 	}
 
 	if (priv->read_src) {
@@ -1325,6 +1323,22 @@ soup_socket_get_remote_address (SoupSocket *sock)
 	return priv->remote_addr;
 }
 
+GInputStream *
+soup_socket_get_input_stream (SoupSocket *sock)
+{
+	g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
+
+	return SOUP_SOCKET_GET_PRIVATE (sock)->istream;
+}
+
+GOutputStream *
+soup_socket_get_output_stream (SoupSocket *sock)
+{
+	g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
+
+	return SOUP_SOCKET_GET_PRIVATE (sock)->ostream;
+}
+
 
 static gboolean
 socket_read_watch (GObject *pollable, gpointer user_data)
diff --git a/libsoup/soup-socket.h b/libsoup/soup-socket.h
index dc6b59c..5cbf14a 100644
--- a/libsoup/soup-socket.h
+++ b/libsoup/soup-socket.h
@@ -85,6 +85,8 @@ gboolean       soup_socket_is_connected       (SoupSocket         *sock);
 SoupAddress   *soup_socket_get_local_address  (SoupSocket         *sock);
 SoupAddress   *soup_socket_get_remote_address (SoupSocket         *sock);
 
+GInputStream  *soup_socket_get_input_stream   (SoupSocket         *sock);
+GOutputStream *soup_socket_get_output_stream  (SoupSocket         *sock);
 
 typedef enum {
 	SOUP_SOCKET_OK,
diff --git a/po/POTFILES.in b/po/POTFILES.in
index c43b943..4115bb0 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,2 +1,3 @@
+libsoup/soup-body-input-stream.c
 libsoup/soup-request.c
 libsoup/soup-requester.c
diff --git a/tests/chunk-test.c b/tests/chunk-test.c
index 3805fb7..c3eecc4 100644
--- a/tests/chunk-test.c
+++ b/tests/chunk-test.c
@@ -21,15 +21,6 @@ typedef struct {
 	gboolean streaming;
 } PutTestData;
 
-static SoupBuffer *
-error_chunk_allocator (SoupMessage *msg, gsize max_len, gpointer user_data)
-{
-	/* This should never be called, because there is no response body. */
-	debug_printf (1, "  error_chunk_allocator called!\n");
-	errors++;
-	return soup_buffer_new (SOUP_MEMORY_TAKE, g_malloc (100), 100);
-}
-
 static void
 write_next_chunk (SoupMessage *msg, gpointer user_data)
 {
@@ -191,7 +182,6 @@ do_request_test (SoupSession *session, SoupURI *base_uri, RequestTestFlags flags
 	msg = soup_message_new_from_uri ("PUT", uri);
 	soup_message_headers_set_encoding (msg->request_headers, SOUP_ENCODING_CHUNKED);
 	soup_message_body_set_accumulate (msg->request_body, FALSE);
-	soup_message_set_chunk_allocator (msg, error_chunk_allocator, NULL, NULL);
 	if (flags & HACKY_STREAMING) {
 		g_signal_connect (msg, "wrote_chunk",
 				  G_CALLBACK (write_next_chunk_streaming_hack), &ptd);
diff --git a/tests/connection-test.c b/tests/connection-test.c
index 545bf10..7c6fb5a 100644
--- a/tests/connection-test.c
+++ b/tests/connection-test.c
@@ -23,11 +23,20 @@ static void
 close_socket (SoupMessage *msg, gpointer user_data)
 {
 	SoupSocket *sock = user_data;
+	int sockfd;
 
-	soup_socket_disconnect (sock);
-
-	/* But also add the missing data to the message now, so
-	 * SoupServer can clean up after itself properly.
+	/* Actually calling soup_socket_disconnect() here would cause
+	 * us to leak memory, so just shutdown the socket instead.
+	 */
+	sockfd = soup_socket_get_fd (sock);
+#ifdef G_OS_WIN32
+	shutdown (sockfd, SD_SEND);
+#else
+	shutdown (sockfd, SHUT_WR);
+#endif
+
+	/* Then add the missing data to the message now, so SoupServer
+	 * can clean up after itself properly.
 	 */
 	soup_message_body_append (msg->response_body, SOUP_MEMORY_STATIC,
 				  "foo", 3);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]