[libsoup/giobased: 3/11] Push chunked decoding into SoupInputStream



commit 028024a7daf5b57c7fa77dab214dff738f0ebe19
Author: Dan Winship <danw gnome org>
Date:   Thu Dec 9 11:26:49 2010 +0100

    Push chunked decoding into SoupInputStream

 libsoup/soup-input-stream.c |  246 +++++++++++++++++++++++++++++++++++++------
 libsoup/soup-input-stream.h |    5 +
 libsoup/soup-message-io.c   |  248 ++++++++++++++++++-------------------------
 3 files changed, 320 insertions(+), 179 deletions(-)
---
diff --git a/libsoup/soup-input-stream.c b/libsoup/soup-input-stream.c
index 3f334df..2525126 100644
--- a/libsoup/soup-input-stream.c
+++ b/libsoup/soup-input-stream.c
@@ -9,14 +9,28 @@
 #include <config.h>
 #endif
 
+#include <stdlib.h>
 #include <string.h>
 #include <gio/gio.h>
 
 #include "soup-input-stream.h"
+#include "soup-message-headers.h"
+
+typedef enum {
+	SOUP_INPUT_STREAM_STATE_CHUNK_SIZE,
+	SOUP_INPUT_STREAM_STATE_CHUNK_END,
+	SOUP_INPUT_STREAM_STATE_CHUNK,
+	SOUP_INPUT_STREAM_STATE_TRAILERS,
+	SOUP_INPUT_STREAM_STATE_DONE
+} SoupInputStreamState;
 
 struct _SoupInputStreamPrivate {
 	GInputStream *base_stream;
-	GByteArray   *read_buf;
+	GByteArray   *buf;
+
+	SoupEncoding  encoding;
+	goffset       read_length;
+	SoupInputStreamState chunked_state;
 };
 
 static void soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
@@ -25,6 +39,7 @@ G_DEFINE_TYPE_WITH_CODE (SoupInputStream, soup_input_stream, G_TYPE_FILTER_INPUT
 			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
 						soup_input_stream_pollable_init))
 
+
 static void
 soup_input_stream_init (SoupInputStream *stream)
 {
@@ -46,8 +61,8 @@ finalize (GObject *object)
 {
 	SoupInputStream *sstream = SOUP_INPUT_STREAM (object);
 
-	if (sstream->priv->read_buf)
-		g_byte_array_free (sstream->priv->read_buf, TRUE);
+	if (sstream->priv->buf)
+		g_byte_array_free (sstream->priv->buf, TRUE);
 
 	G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize (object);
 }
@@ -55,25 +70,126 @@ finalize (GObject *object)
 static gssize
 read_from_buf (SoupInputStream *sstream, gpointer buffer, gsize count)
 {
-	GByteArray *read_buf = sstream->priv->read_buf;
+	GByteArray *buf = sstream->priv->buf;
 
-	if (read_buf->len < count)
-		count = read_buf->len;
-	memcpy (buffer, read_buf->data, count);
+	if (buf->len < count)
+		count = buf->len;
+	memcpy (buffer, buf->data, count);
 
-	if (count == read_buf->len) {
-		g_byte_array_free (read_buf, TRUE);
-		sstream->priv->read_buf = NULL;
+	if (count == buf->len) {
+		g_byte_array_free (buf, TRUE);
+		sstream->priv->buf = NULL;
 	} else {
-		memmove (read_buf->data, read_buf->data + count,
-			 read_buf->len - count);
-		g_byte_array_set_size (read_buf, read_buf->len - count);
+		memmove (buf->data, buf->data + count,
+			 buf->len - count);
+		g_byte_array_set_size (buf, buf->len - count);
 	}
 
 	return count;
 }
 
 static gssize
+soup_input_stream_read_raw (SoupInputStream  *sstream,
+			    void             *buffer,
+			    gsize             count,
+			    gboolean          blocking,
+			    GCancellable     *cancellable,
+			    GError          **error)
+{
+	gssize nread;
+
+	if (sstream->priv->buf) {
+		return read_from_buf (sstream, buffer, count);
+	} else if (blocking) {
+		nread = g_input_stream_read (sstream->priv->base_stream,
+					     buffer, count,
+					     cancellable, error);
+	} else {
+		nread = g_pollable_input_stream_read_nonblocking (
+			G_POLLABLE_INPUT_STREAM (sstream->priv->base_stream),
+			buffer, count, cancellable, error);
+	}
+
+	if (nread == 0 && sstream->priv->encoding != SOUP_ENCODING_EOF) {
+		g_set_error (error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT, NULL);
+		return -1;
+	}
+	return nread;
+}
+
+static gssize
+soup_input_stream_read_chunked (SoupInputStream  *sstream,
+				void             *buffer,
+				gsize             count,
+				gboolean          blocking,
+				GCancellable     *cancellable,
+				GError          **error)
+{
+	char metabuf[128];
+	gssize nread;
+
+again:
+	switch (sstream->priv->chunked_state) {
+	case SOUP_INPUT_STREAM_STATE_CHUNK_SIZE:
+	case SOUP_INPUT_STREAM_STATE_CHUNK_END:
+		if (blocking) {
+			nread = soup_input_stream_read_line (
+				sstream, metabuf, sizeof (metabuf),
+				cancellable, error);
+		} else {
+			nread = soup_input_stream_read_line_nonblocking (
+				sstream, metabuf, sizeof (metabuf),
+				cancellable, error);
+		}
+		if (nread <= 0)
+			return nread;
+		if (metabuf[nread - 1] != '\n')
+			return -1;
+
+		if (sstream->priv->chunked_state == SOUP_INPUT_STREAM_STATE_CHUNK_SIZE) {
+			sstream->priv->read_length = strtoul (metabuf, NULL, 16);
+			if (sstream->priv->read_length > 0)
+				sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_CHUNK;
+			else
+				sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_TRAILERS;
+		} else
+			sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_CHUNK_SIZE;
+		break;
+
+	case SOUP_INPUT_STREAM_STATE_CHUNK:
+		nread = soup_input_stream_read_raw (sstream, buffer,
+						    MIN (count, sstream->priv->read_length),
+						    blocking, cancellable, error);
+		if (nread > 0) {
+			sstream->priv->read_length -= nread;
+			if (sstream->priv->read_length == 0)
+				sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_CHUNK_END;
+		}
+		return nread;
+
+	case SOUP_INPUT_STREAM_STATE_TRAILERS:
+		if (blocking) {
+			nread = soup_input_stream_read_line (
+				sstream, buffer, count, cancellable, error);
+		} else {
+			nread = soup_input_stream_read_line_nonblocking (
+				sstream, buffer, count, cancellable, error);
+		}
+		if (nread <= 0)
+			return nread;
+
+		if (strncmp (buffer, "\r\n", nread) || strncmp (buffer, "\n", nread))
+			sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_DONE;
+		break;
+		
+	case SOUP_INPUT_STREAM_STATE_DONE:
+		return 0;
+	}
+
+	goto again;
+}
+
+static gssize
 soup_input_stream_read (GInputStream  *stream,
 			void          *buffer,
 			gsize          count,
@@ -81,13 +197,31 @@ soup_input_stream_read (GInputStream  *stream,
 			GError       **error)
 {
 	SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
+	gssize nread;
 
-	if (sstream->priv->read_buf) {
-		return read_from_buf (sstream, buffer, count);
-	} else {
-		return g_input_stream_read (sstream->priv->base_stream,
-					    buffer, count,
-					    cancellable, error);
+	switch (sstream->priv->encoding) {
+	case SOUP_ENCODING_CHUNKED:
+		return soup_input_stream_read_chunked (sstream, buffer, count,
+						       TRUE, cancellable,
+						       error);
+
+	case SOUP_ENCODING_CONTENT_LENGTH:
+		count = MIN (count, sstream->priv->read_length);
+		if (count == 0)
+			return 0;
+		nread = soup_input_stream_read_raw (sstream, buffer, count,
+						    TRUE, cancellable, error);
+		if (nread > 0)
+			sstream->priv->read_length -= nread;
+		return nread;
+
+	case SOUP_ENCODING_EOF:
+		return soup_input_stream_read_raw (sstream, buffer, count,
+						   TRUE, cancellable, error);
+
+	case SOUP_ENCODING_NONE:
+	default:
+		return 0;
 	}
 }
 
@@ -96,7 +230,7 @@ soup_input_stream_is_readable (GPollableInputStream *stream)
 {
 	SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
 
-	if (sstream->priv->read_buf)
+	if (sstream->priv->buf)
 		return TRUE;
 	else
 		return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (sstream->priv->base_stream));
@@ -109,7 +243,7 @@ soup_input_stream_create_source (GPollableInputStream *stream,
 	SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
 	GSource *base_source, *pollable_source;
 
-	if (sstream->priv->read_buf)
+	if (sstream->priv->buf)
 		base_source = g_idle_source_new ();
 	else
 		base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (sstream->priv->base_stream), cancellable);
@@ -122,6 +256,40 @@ soup_input_stream_create_source (GPollableInputStream *stream,
 	return pollable_source;
 }
 
+static gssize
+soup_input_stream_read_nonblocking (GPollableInputStream  *stream,
+				    void          *buffer,
+				    gsize          count,
+				    GError       **error)
+{
+	SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
+	gssize nread;
+
+	switch (sstream->priv->encoding) {
+	case SOUP_ENCODING_CHUNKED:
+		return soup_input_stream_read_chunked (sstream, buffer, count,
+						       FALSE, NULL, error);
+
+	case SOUP_ENCODING_CONTENT_LENGTH:
+		count = MIN (count, sstream->priv->read_length);
+		if (count == 0)
+			return 0;
+		nread = soup_input_stream_read_raw (sstream, buffer, count,
+						    FALSE, NULL, error);
+		if (nread > 0)
+			sstream->priv->read_length -= nread;
+		return nread;
+
+	case SOUP_ENCODING_EOF:
+		return soup_input_stream_read_raw (sstream, buffer, count,
+						   FALSE, NULL, error);
+
+	case SOUP_ENCODING_NONE:
+	default:
+		return 0;
+	}
+}
+
 static void
 soup_input_stream_class_init (SoupInputStreamClass *stream_class)
 {
@@ -142,6 +310,7 @@ soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interfa
 {
 	pollable_interface->is_readable = soup_input_stream_is_readable;
 	pollable_interface->create_source = soup_input_stream_create_source;
+	pollable_interface->read_nonblocking = soup_input_stream_read_nonblocking;
 }
 
 GInputStream *
@@ -153,6 +322,17 @@ soup_input_stream_new (GInputStream *base_stream)
 			     NULL);
 }
 
+void
+soup_input_stream_set_encoding (SoupInputStream *sstream,
+				SoupEncoding     encoding,
+				goffset          content_length)
+{
+	sstream->priv->encoding = encoding;
+	sstream->priv->read_length = content_length;
+	if (encoding == SOUP_ENCODING_CHUNKED)
+		sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_CHUNK_SIZE;
+}
+
 gssize
 soup_input_stream_read_line (SoupInputStream       *sstream,
 			     void                  *buffer,
@@ -165,11 +345,11 @@ soup_input_stream_read_line (SoupInputStream       *sstream,
 
 	g_return_val_if_fail (SOUP_IS_INPUT_STREAM (sstream), -1);
 
-	if (sstream->priv->read_buf) {
-		GByteArray *read_buf = sstream->priv->read_buf;
+	if (sstream->priv->buf) {
+		GByteArray *buf = sstream->priv->buf;
 
-		p = memchr (read_buf->data, '\n', read_buf->len);
-		nread = p ? p + 1 - read_buf->data : read_buf->len;
+		p = memchr (buf->data, '\n', buf->len);
+		nread = p ? p + 1 - buf->data : buf->len;
 		return read_from_buf (sstream, buffer, nread);
 	}
 
@@ -184,8 +364,8 @@ soup_input_stream_read_line (SoupInputStream       *sstream,
 		return nread;
 
 	p++;
-	sstream->priv->read_buf = g_byte_array_new ();
-	g_byte_array_append (sstream->priv->read_buf,
+	sstream->priv->buf = g_byte_array_new ();
+	g_byte_array_append (sstream->priv->buf,
 			     p, nread - (p - buf));
 	return p - buf;
 }
@@ -202,11 +382,11 @@ soup_input_stream_read_line_nonblocking (SoupInputStream       *sstream,
 
 	g_return_val_if_fail (SOUP_IS_INPUT_STREAM (sstream), -1);
 
-	if (sstream->priv->read_buf) {
-		GByteArray *read_buf = sstream->priv->read_buf;
+	if (sstream->priv->buf) {
+		GByteArray *buf = sstream->priv->buf;
 
-		p = memchr (read_buf->data, '\n', read_buf->len);
-		nread = p ? p + 1 - read_buf->data : read_buf->len;
+		p = memchr (buf->data, '\n', buf->len);
+		nread = p ? p + 1 - buf->data : buf->len;
 		return read_from_buf (sstream, buffer, nread);
 	}
 
@@ -221,8 +401,8 @@ soup_input_stream_read_line_nonblocking (SoupInputStream       *sstream,
 		return nread;
 
 	p++;
-	sstream->priv->read_buf = g_byte_array_new ();
-	g_byte_array_append (sstream->priv->read_buf,
+	sstream->priv->buf = g_byte_array_new ();
+	g_byte_array_append (sstream->priv->buf,
 			     p, nread - (p - buf));
 	return p - buf;
 }
diff --git a/libsoup/soup-input-stream.h b/libsoup/soup-input-stream.h
index ac35b07..91141f5 100644
--- a/libsoup/soup-input-stream.h
+++ b/libsoup/soup-input-stream.h
@@ -7,6 +7,7 @@
 #define SOUP_INPUT_STREAM_H 1
 
 #include <libsoup/soup-types.h>
+#include <libsoup/soup-message-headers.h>
 
 G_BEGIN_DECLS
 
@@ -39,6 +40,10 @@ GType soup_input_stream_get_type (void);
 
 GInputStream *soup_input_stream_new                   (GInputStream          *base_stream);
 
+void          soup_input_stream_set_encoding          (SoupInputStream       *sstream,
+						       SoupEncoding           encoding,
+						       goffset                content_length);
+
 gssize        soup_input_stream_read_line             (SoupInputStream       *sstream,
 						       void                  *buffer,
 						       gsize                  length,
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 40f8d54..31c043c 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -180,8 +180,8 @@ soup_message_io_finished (SoupMessage *msg)
 	g_object_unref (msg);
 }
 
-static void io_read (SoupSocket *sock, SoupMessage *msg);
-static void io_write (SoupSocket *sock, SoupMessage *msg);
+static gboolean io_read (GInputStream *stream, SoupMessage *msg);
+static gboolean io_write (GOutputStream *stream, SoupMessage *msg);
 
 static gboolean
 request_is_idempotent (SoupMessage *msg)
@@ -218,19 +218,6 @@ io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
 }
 
 static gboolean
-io_readable (GPollableInputStream *stream, gpointer msg)
-{
-	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
-	SoupMessageIOData *io = priv->io_data;
-
-	g_source_destroy (io->read_source);
-	io->read_source = NULL;
-
-	io_read (io->sock, msg);
-	return FALSE;
-}
-
-static gboolean
 io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
@@ -279,6 +266,33 @@ io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
 	return TRUE;
 }
 
+static void
+setup_read_source (SoupMessage *msg)
+{
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupMessageIOData *io = priv->io_data;
+
+	io->read_source = g_pollable_input_stream_create_source (
+		G_POLLABLE_INPUT_STREAM (io->istream), NULL);
+	g_source_set_callback (io->read_source,
+			       (GSourceFunc) io_read, msg, NULL);
+	g_source_attach (io->read_source, io->async_context);
+	g_source_unref (io->read_source);
+}
+
+static void
+setup_write_source (SoupMessage *msg)
+{
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupMessageIOData *io = priv->io_data;
+
+	io->write_source = g_pollable_output_stream_create_source (
+		G_POLLABLE_OUTPUT_STREAM (io->ostream),
+		NULL);
+	g_source_set_callback (io->write_source, (GSourceFunc) io_write, msg, NULL);
+	g_source_attach (io->write_source, io->async_context);
+}
+
 /* Reads data from io->sock into io->read_meta_buf. If @to_blank is
  * %TRUE, it reads up until a blank line ("CRLF CRLF" or "LF LF").
  * Otherwise, it reads up until a single CRLF or LF.
@@ -326,12 +340,7 @@ read_metadata (SoupMessage *msg, gboolean to_blank)
 
 		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
 			g_error_free (error);
-			io->read_source = g_pollable_input_stream_create_source (
-				G_POLLABLE_INPUT_STREAM (io->istream),
-				NULL);
-			g_source_set_callback (io->read_source, (GSourceFunc) io_readable, msg, NULL);
-			g_source_attach (io->read_source, io->async_context);
-			g_source_unref (io->read_source);
+			setup_read_source (msg);
 			return FALSE;
 		}
 
@@ -473,8 +482,6 @@ read_body_chunk (SoupMessage *msg)
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
 	guchar *stack_buf = NULL;
-	gsize len;
-	gboolean read_to_eof = (io->read_encoding == SOUP_ENCODING_EOF);
 	gssize nread;
 	GError *error = NULL;
 	SoupBuffer *buffer;
@@ -484,10 +491,15 @@ read_body_chunk (SoupMessage *msg)
 		return FALSE;
 	}
 
+	if (io->read_encoding == SOUP_ENCODING_NONE ||
+	    (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH &&
+	     io->read_length == 0))	    
+		return TRUE;
+
 	if (!io_handle_sniffing (msg, FALSE))
 		return FALSE;
 
-	while (read_to_eof || io->read_length > 0) {
+	while (TRUE) {
 		if (priv->chunk_allocator) {
 			buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
 			if (!buffer) {
@@ -502,19 +514,15 @@ read_body_chunk (SoupMessage *msg)
 						  RESPONSE_BLOCK_SIZE);
 		}
 
-		if (read_to_eof)
-			len = buffer->length;
-		else
-			len = MIN (buffer->length, io->read_length);
-
 		if (io->non_blocking) {
 			nread = g_pollable_input_stream_read_nonblocking (
 				G_POLLABLE_INPUT_STREAM (io->istream),
-				(guchar *)buffer->data, len,
+				(guchar *)buffer->data, buffer->length,
 				io->cancellable, &error);
 		} else {
 			nread = g_input_stream_read (io->istream,
-						     (guchar *)buffer->data, len,
+						     (guchar *)buffer->data,
+						     buffer->length,
 						     io->cancellable, &error);
 		}
 
@@ -546,18 +554,15 @@ read_body_chunk (SoupMessage *msg)
 
 		soup_buffer_free (buffer);
 
+		if (nread == 0)
+			return TRUE;
+
 		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
 			g_error_free (error);
-			io->read_source = g_pollable_input_stream_create_source (
-				G_POLLABLE_INPUT_STREAM (io->istream),
-				NULL);
-			g_source_set_callback (io->read_source, (GSourceFunc) io_readable, msg, NULL);
-			g_source_attach (io->read_source, io->async_context);
-			g_source_unref (io->read_source);
+			setup_read_source (msg);
 			return FALSE;
-		}
-
-		if (nread == 0) {
+		} else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT)) {
+			g_clear_error (&error);
 			if (io->read_eof_ok) {
 				io->read_length = 0;
 				return TRUE;
@@ -568,21 +573,6 @@ read_body_chunk (SoupMessage *msg)
 		io_error (io->sock, msg, error);
 		return FALSE;
 	}
-
-	return TRUE;
-}
-
-static gboolean
-io_writable (GPollableOutputStream *stream, gpointer msg)
-{
-	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
-	SoupMessageIOData *io = priv->io_data;
-
-	g_source_destroy (io->write_source);
-	io->write_source = NULL;
-
-	io_write (io->sock, msg);
-	return FALSE;
 }
 
 /* Attempts to write @len bytes from @data. See the note at
@@ -618,11 +608,7 @@ write_data (SoupMessage *msg, const char *data, guint len, gboolean body)
 
 		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
 			g_error_free (error);
-			io->write_source = g_pollable_output_stream_create_source (
-				G_POLLABLE_OUTPUT_STREAM (io->ostream),
-				NULL);
-			g_source_set_callback (io->write_source, (GSourceFunc) io_writable, msg, NULL);
-			g_source_attach (io->write_source, io->async_context);
+			setup_write_source (msg);
 			g_source_unref (io->write_source);
 			return FALSE;
 		} else if (error) {
@@ -685,16 +671,21 @@ io_body_state (SoupEncoding encoding)
  *      W:DONE     / R:DONE               R:DONE     / W:DONE
  */
 
-static void
-io_write (SoupSocket *sock, SoupMessage *msg)
+static gboolean
+io_write (GOutputStream *stream, SoupMessage *msg)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
 
+	if (io->write_source) {
+		g_source_destroy (io->write_source);
+		io->write_source = NULL;
+	}
+
  write_more:
 	switch (io->write_state) {
 	case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
-		return;
+		return FALSE;
 
 
 	case SOUP_MESSAGE_IO_STATE_HEADERS:
@@ -704,13 +695,13 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 					    io->header_data);
 			if (!io->write_buf->len) {
 				soup_message_io_pause (msg);
-				return;
+				return FALSE;
 			}
 		}
 
 		if (!write_data (msg, io->write_buf->str,
 				 io->write_buf->len, FALSE))
-			return;
+			return FALSE;
 
 		g_string_truncate (io->write_buf, 0);
 
@@ -727,7 +718,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 				/* Stop and wait for the body now */
 				io->write_state =
 					SOUP_MESSAGE_IO_STATE_BLOCKING;
-				io->read_state = io_body_state (io->read_encoding);
+				io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
 			} else {
 				/* We just wrote a 1xx response
 				 * header, so stay in STATE_HEADERS.
@@ -760,12 +751,12 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 			soup_message_cleanup_response (msg);
 		} else
 			soup_message_wrote_headers (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+		SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
 		break;
 
 
 	case SOUP_MESSAGE_IO_STATE_BLOCKING:
-		io_read (sock, msg);
+		io_read (io->istream, msg);
 
 		/* If io_read reached a point where we could write
 		 * again, it would have recursively called io_write.
@@ -773,7 +764,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 		 * (b) we can't anyway, because msg may have been
 		 * destroyed.
 		 */
-		return;
+		return FALSE;
 
 
 	case SOUP_MESSAGE_IO_STATE_BODY:
@@ -783,7 +774,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 
 			SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
 			soup_message_wrote_body (msg);
-			SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+			SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
 			break;
 		}
 
@@ -791,7 +782,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 			io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
 			if (!io->write_chunk) {
 				soup_message_io_pause (msg);
-				return;
+				return FALSE;
 			}
 			if (io->write_chunk->length > io->write_length &&
 			    io->write_encoding != SOUP_ENCODING_EOF) {
@@ -810,7 +801,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 
 		if (!write_data (msg, io->write_chunk->data,
 				 io->write_chunk->length, TRUE))
-			return;
+			return FALSE;
 
 		if (io->mode == SOUP_MESSAGE_IO_SERVER)
 			soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
@@ -820,7 +811,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 
 		SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
 		soup_message_wrote_chunk (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+		SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
 		break;
 
 	case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE:
@@ -828,7 +819,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 			io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
 			if (!io->write_chunk) {
 				soup_message_io_pause (msg);
-				return;
+				return FALSE;
 			}
 			g_string_append_printf (io->write_buf, "%lx\r\n",
 						(unsigned long) io->write_chunk->length);
@@ -837,7 +828,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 
 		if (!write_data (msg, io->write_buf->str,
 				 io->write_buf->len, FALSE))
-			return;
+			return FALSE;
 
 		g_string_truncate (io->write_buf, 0);
 
@@ -854,7 +845,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 	case SOUP_MESSAGE_IO_STATE_CHUNK:
 		if (!write_data (msg, io->write_chunk->data,
 				 io->write_chunk->length, TRUE))
-			return;
+			return FALSE;
 
 		if (io->mode == SOUP_MESSAGE_IO_SERVER)
 			soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
@@ -865,7 +856,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 
 		SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
 		soup_message_wrote_chunk (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+		SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
 
 		/* fall through */
 
@@ -873,7 +864,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 	case SOUP_MESSAGE_IO_STATE_CHUNK_END:
 		if (!write_data (msg, SOUP_MESSAGE_IO_EOL,
 				 SOUP_MESSAGE_IO_EOL_LEN, FALSE))
-			return;
+			return FALSE;
 
 		io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
 		break;
@@ -882,13 +873,13 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 	case SOUP_MESSAGE_IO_STATE_TRAILERS:
 		if (!write_data (msg, SOUP_MESSAGE_IO_EOL,
 				 SOUP_MESSAGE_IO_EOL_LEN, FALSE))
-			return;
+			return FALSE;
 
 		io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
 
 		SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
 		soup_message_wrote_body (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+		SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
 		/* fall through */
 
 
@@ -901,36 +892,41 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 
 		if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
 			io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
-			io_read (sock, msg);
+			io_read (io->istream, msg);
 		} else
 			soup_message_io_finished (msg);
-		return;
+		return FALSE;
 
 
 	case SOUP_MESSAGE_IO_STATE_DONE:
 	default:
-		g_return_if_reached ();
+		g_return_val_if_reached (FALSE);
 	}
 
 	goto write_more;
 }
 
-static void
-io_read (SoupSocket *sock, SoupMessage *msg)
+static gboolean
+io_read (GInputStream *stream, SoupMessage *msg)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
 	guint status;
 
+	if (io->read_source) {
+		g_source_destroy (io->read_source);
+		io->read_source = NULL;
+	}
+
  read_more:
 	switch (io->read_state) {
 	case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
-		return;
+		return FALSE;
 
 
 	case SOUP_MESSAGE_IO_STATE_HEADERS:
 		if (!read_metadata (msg, TRUE))
-			return;
+			return FALSE;
 
 		/* We need to "rewind" io->read_meta_buf back one line.
 		 * That SHOULD be two characters (CR LF), but if the
@@ -983,6 +979,10 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 			}
 		}
 
+		soup_input_stream_set_encoding (SOUP_INPUT_STREAM (io->istream),
+						io->read_encoding,
+						io->read_length);
+
 		if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
 		    SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
 			if (msg->status_code == SOUP_STATUS_CONTINUE &&
@@ -1006,7 +1006,7 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 			io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
 			io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
 		} else {
-			io->read_state = io_body_state (io->read_encoding);
+			io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
 
 			/* If the client was waiting for a Continue
 			 * but got something else, then it's done
@@ -1022,27 +1022,26 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 			SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
 			soup_message_got_informational (msg);
 			soup_message_cleanup_response (msg);
-			SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+			SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
 		} else {
 			SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
 			soup_message_got_headers (msg);
-			SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+			SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
 		}
 		break;
 
 
 	case SOUP_MESSAGE_IO_STATE_BLOCKING:
-		io_write (sock, msg);
+		io_write (io->ostream, msg);
 
 		/* As in the io_write case, we *must* return here. */
-		return;
+		return FALSE;
 
 
 	case SOUP_MESSAGE_IO_STATE_BODY:
 		if (!read_body_chunk (msg))
-			return;
+			return FALSE;
 
-	got_body:
 		if (!io_handle_sniffing (msg, TRUE)) {
 			/* If the message was paused (as opposed to
 			 * cancelled), we need to make sure we wind up
@@ -1055,57 +1054,14 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 				io->read_encoding = SOUP_ENCODING_CONTENT_LENGTH;
 				io->read_length = 0;
 			}
-			return;
+			return FALSE;
 		}
 
 		io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
 
 		SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
 		soup_message_got_body (msg);
-		SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
-		break;
-
-
-	case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE:
-		if (!read_metadata (msg, FALSE))
-			return;
-
-		io->read_length = strtoul ((char *)io->read_meta_buf->data, NULL, 16);
-		g_byte_array_set_size (io->read_meta_buf, 0);
-
-		if (io->read_length > 0)
-			io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK;
-		else
-			io->read_state = SOUP_MESSAGE_IO_STATE_TRAILERS;
-		break;
-
-
-	case SOUP_MESSAGE_IO_STATE_CHUNK:
-		if (!read_body_chunk (msg))
-			return;
-
-		io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_END;
-		break;
-
-
-	case SOUP_MESSAGE_IO_STATE_CHUNK_END:
-		if (!read_metadata (msg, FALSE))
-			return;
-
-		g_byte_array_set_size (io->read_meta_buf, 0);
-		io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
-		break;
-
-
-	case SOUP_MESSAGE_IO_STATE_TRAILERS:
-		if (!read_metadata (msg, FALSE))
-			return;
-
-		if (io->read_meta_buf->len <= SOUP_MESSAGE_IO_EOL_LEN)
-			goto got_body;
-
-		/* FIXME: process trailers */
-		g_byte_array_set_size (io->read_meta_buf, 0);
+		SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
 		break;
 
 
@@ -1118,15 +1074,15 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 
 		if (io->mode == SOUP_MESSAGE_IO_SERVER) {
 			io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
-			io_write (sock, msg);
+			io_write (io->ostream, msg);
 		} else
 			soup_message_io_finished (msg);
-		return;
+		return FALSE;
 
 
 	case SOUP_MESSAGE_IO_STATE_DONE:
 	default:
-		g_return_if_reached ();
+		g_return_val_if_reached (FALSE);
 	}
 
 	goto read_more;
@@ -1222,7 +1178,7 @@ soup_message_io_client (SoupMessageQueueItem *item,
 	io->write_body      = item->msg->request_body;
 
 	io->write_state     = SOUP_MESSAGE_IO_STATE_HEADERS;
-	io_write (sock, item->msg);
+	io_write (io->ostream, item->msg);
 }
 
 void
@@ -1243,7 +1199,7 @@ soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
 	io->write_body      = msg->response_body;
 
 	io->read_state      = SOUP_MESSAGE_IO_STATE_HEADERS;
-	io_read (sock, msg);
+	io_read (io->istream, msg);
 }
 
 void  
@@ -1285,9 +1241,9 @@ io_unpause_internal (gpointer msg)
 		return FALSE;
 
 	if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
-		io_write (io->sock, msg);
+		io_write (io->ostream, msg);
 	else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
-		io_read (io->sock, msg);
+		io_read (io->istream, msg);
 
 	return FALSE;
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]