[libsoup/giobased: 3/11] Push chunked decoding into SoupInputStream
- From: Dan Winship <danw src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/giobased: 3/11] Push chunked decoding into SoupInputStream
- Date: Tue, 22 Mar 2011 13:06:53 +0000 (UTC)
commit 028024a7daf5b57c7fa77dab214dff738f0ebe19
Author: Dan Winship <danw gnome org>
Date: Thu Dec 9 11:26:49 2010 +0100
Push chunked decoding into SoupInputStream
libsoup/soup-input-stream.c | 246 +++++++++++++++++++++++++++++++++++++------
libsoup/soup-input-stream.h | 5 +
libsoup/soup-message-io.c | 248 ++++++++++++++++++-------------------------
3 files changed, 320 insertions(+), 179 deletions(-)
---
diff --git a/libsoup/soup-input-stream.c b/libsoup/soup-input-stream.c
index 3f334df..2525126 100644
--- a/libsoup/soup-input-stream.c
+++ b/libsoup/soup-input-stream.c
@@ -9,14 +9,28 @@
#include <config.h>
#endif
+#include <stdlib.h>
#include <string.h>
#include <gio/gio.h>
#include "soup-input-stream.h"
+#include "soup-message-headers.h"
+
+typedef enum {
+ SOUP_INPUT_STREAM_STATE_CHUNK_SIZE,
+ SOUP_INPUT_STREAM_STATE_CHUNK_END,
+ SOUP_INPUT_STREAM_STATE_CHUNK,
+ SOUP_INPUT_STREAM_STATE_TRAILERS,
+ SOUP_INPUT_STREAM_STATE_DONE
+} SoupInputStreamState;
struct _SoupInputStreamPrivate {
GInputStream *base_stream;
- GByteArray *read_buf;
+ GByteArray *buf;
+
+ SoupEncoding encoding;
+ goffset read_length;
+ SoupInputStreamState chunked_state;
};
static void soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
@@ -25,6 +39,7 @@ G_DEFINE_TYPE_WITH_CODE (SoupInputStream, soup_input_stream, G_TYPE_FILTER_INPUT
G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
soup_input_stream_pollable_init))
+
static void
soup_input_stream_init (SoupInputStream *stream)
{
@@ -46,8 +61,8 @@ finalize (GObject *object)
{
SoupInputStream *sstream = SOUP_INPUT_STREAM (object);
- if (sstream->priv->read_buf)
- g_byte_array_free (sstream->priv->read_buf, TRUE);
+ if (sstream->priv->buf)
+ g_byte_array_free (sstream->priv->buf, TRUE);
G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize (object);
}
@@ -55,25 +70,126 @@ finalize (GObject *object)
static gssize
read_from_buf (SoupInputStream *sstream, gpointer buffer, gsize count)
{
- GByteArray *read_buf = sstream->priv->read_buf;
+ GByteArray *buf = sstream->priv->buf;
- if (read_buf->len < count)
- count = read_buf->len;
- memcpy (buffer, read_buf->data, count);
+ if (buf->len < count)
+ count = buf->len;
+ memcpy (buffer, buf->data, count);
- if (count == read_buf->len) {
- g_byte_array_free (read_buf, TRUE);
- sstream->priv->read_buf = NULL;
+ if (count == buf->len) {
+ g_byte_array_free (buf, TRUE);
+ sstream->priv->buf = NULL;
} else {
- memmove (read_buf->data, read_buf->data + count,
- read_buf->len - count);
- g_byte_array_set_size (read_buf, read_buf->len - count);
+ memmove (buf->data, buf->data + count,
+ buf->len - count);
+ g_byte_array_set_size (buf, buf->len - count);
}
return count;
}
static gssize
+soup_input_stream_read_raw (SoupInputStream *sstream,
+ void *buffer,
+ gsize count,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gssize nread;
+
+ if (sstream->priv->buf) {
+ return read_from_buf (sstream, buffer, count);
+ } else if (blocking) {
+ nread = g_input_stream_read (sstream->priv->base_stream,
+ buffer, count,
+ cancellable, error);
+ } else {
+ nread = g_pollable_input_stream_read_nonblocking (
+ G_POLLABLE_INPUT_STREAM (sstream->priv->base_stream),
+ buffer, count, cancellable, error);
+ }
+
+ if (nread == 0 && sstream->priv->encoding != SOUP_ENCODING_EOF) {
+ g_set_error (error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT, NULL);
+ return -1;
+ }
+ return nread;
+}
+
+static gssize
+soup_input_stream_read_chunked (SoupInputStream *sstream,
+ void *buffer,
+ gsize count,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
+{
+ char metabuf[128];
+ gssize nread;
+
+again:
+ switch (sstream->priv->chunked_state) {
+ case SOUP_INPUT_STREAM_STATE_CHUNK_SIZE:
+ case SOUP_INPUT_STREAM_STATE_CHUNK_END:
+ if (blocking) {
+ nread = soup_input_stream_read_line (
+ sstream, metabuf, sizeof (metabuf),
+ cancellable, error);
+ } else {
+ nread = soup_input_stream_read_line_nonblocking (
+ sstream, metabuf, sizeof (metabuf),
+ cancellable, error);
+ }
+ if (nread <= 0)
+ return nread;
+ if (metabuf[nread - 1] != '\n')
+ return -1;
+
+ if (sstream->priv->chunked_state == SOUP_INPUT_STREAM_STATE_CHUNK_SIZE) {
+ sstream->priv->read_length = strtoul (metabuf, NULL, 16);
+ if (sstream->priv->read_length > 0)
+ sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_CHUNK;
+ else
+ sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_TRAILERS;
+ } else
+ sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_CHUNK_SIZE;
+ break;
+
+ case SOUP_INPUT_STREAM_STATE_CHUNK:
+ nread = soup_input_stream_read_raw (sstream, buffer,
+ MIN (count, sstream->priv->read_length),
+ blocking, cancellable, error);
+ if (nread > 0) {
+ sstream->priv->read_length -= nread;
+ if (sstream->priv->read_length == 0)
+ sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_CHUNK_END;
+ }
+ return nread;
+
+ case SOUP_INPUT_STREAM_STATE_TRAILERS:
+ if (blocking) {
+ nread = soup_input_stream_read_line (
+ sstream, buffer, count, cancellable, error);
+ } else {
+ nread = soup_input_stream_read_line_nonblocking (
+ sstream, buffer, count, cancellable, error);
+ }
+ if (nread <= 0)
+ return nread;
+
+ if (strncmp (buffer, "\r\n", nread) || strncmp (buffer, "\n", nread))
+ sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_DONE;
+ break;
+
+ case SOUP_INPUT_STREAM_STATE_DONE:
+ return 0;
+ }
+
+ goto again;
+}
+
+static gssize
soup_input_stream_read (GInputStream *stream,
void *buffer,
gsize count,
@@ -81,13 +197,31 @@ soup_input_stream_read (GInputStream *stream,
GError **error)
{
SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
+ gssize nread;
- if (sstream->priv->read_buf) {
- return read_from_buf (sstream, buffer, count);
- } else {
- return g_input_stream_read (sstream->priv->base_stream,
- buffer, count,
- cancellable, error);
+ switch (sstream->priv->encoding) {
+ case SOUP_ENCODING_CHUNKED:
+ return soup_input_stream_read_chunked (sstream, buffer, count,
+ TRUE, cancellable,
+ error);
+
+ case SOUP_ENCODING_CONTENT_LENGTH:
+ count = MIN (count, sstream->priv->read_length);
+ if (count == 0)
+ return 0;
+ nread = soup_input_stream_read_raw (sstream, buffer, count,
+ TRUE, cancellable, error);
+ if (nread > 0)
+ sstream->priv->read_length -= nread;
+ return nread;
+
+ case SOUP_ENCODING_EOF:
+ return soup_input_stream_read_raw (sstream, buffer, count,
+ TRUE, cancellable, error);
+
+ case SOUP_ENCODING_NONE:
+ default:
+ return 0;
}
}
@@ -96,7 +230,7 @@ soup_input_stream_is_readable (GPollableInputStream *stream)
{
SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
- if (sstream->priv->read_buf)
+ if (sstream->priv->buf)
return TRUE;
else
return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (sstream->priv->base_stream));
@@ -109,7 +243,7 @@ soup_input_stream_create_source (GPollableInputStream *stream,
SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
GSource *base_source, *pollable_source;
- if (sstream->priv->read_buf)
+ if (sstream->priv->buf)
base_source = g_idle_source_new ();
else
base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (sstream->priv->base_stream), cancellable);
@@ -122,6 +256,40 @@ soup_input_stream_create_source (GPollableInputStream *stream,
return pollable_source;
}
+static gssize
+soup_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error)
+{
+ SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
+ gssize nread;
+
+ switch (sstream->priv->encoding) {
+ case SOUP_ENCODING_CHUNKED:
+ return soup_input_stream_read_chunked (sstream, buffer, count,
+ FALSE, NULL, error);
+
+ case SOUP_ENCODING_CONTENT_LENGTH:
+ count = MIN (count, sstream->priv->read_length);
+ if (count == 0)
+ return 0;
+ nread = soup_input_stream_read_raw (sstream, buffer, count,
+ FALSE, NULL, error);
+ if (nread > 0)
+ sstream->priv->read_length -= nread;
+ return nread;
+
+ case SOUP_ENCODING_EOF:
+ return soup_input_stream_read_raw (sstream, buffer, count,
+ FALSE, NULL, error);
+
+ case SOUP_ENCODING_NONE:
+ default:
+ return 0;
+ }
+}
+
static void
soup_input_stream_class_init (SoupInputStreamClass *stream_class)
{
@@ -142,6 +310,7 @@ soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interfa
{
pollable_interface->is_readable = soup_input_stream_is_readable;
pollable_interface->create_source = soup_input_stream_create_source;
+ pollable_interface->read_nonblocking = soup_input_stream_read_nonblocking;
}
GInputStream *
@@ -153,6 +322,17 @@ soup_input_stream_new (GInputStream *base_stream)
NULL);
}
+void
+soup_input_stream_set_encoding (SoupInputStream *sstream,
+ SoupEncoding encoding,
+ goffset content_length)
+{
+ sstream->priv->encoding = encoding;
+ sstream->priv->read_length = content_length;
+ if (encoding == SOUP_ENCODING_CHUNKED)
+ sstream->priv->chunked_state = SOUP_INPUT_STREAM_STATE_CHUNK_SIZE;
+}
+
gssize
soup_input_stream_read_line (SoupInputStream *sstream,
void *buffer,
@@ -165,11 +345,11 @@ soup_input_stream_read_line (SoupInputStream *sstream,
g_return_val_if_fail (SOUP_IS_INPUT_STREAM (sstream), -1);
- if (sstream->priv->read_buf) {
- GByteArray *read_buf = sstream->priv->read_buf;
+ if (sstream->priv->buf) {
+ GByteArray *buf = sstream->priv->buf;
- p = memchr (read_buf->data, '\n', read_buf->len);
- nread = p ? p + 1 - read_buf->data : read_buf->len;
+ p = memchr (buf->data, '\n', buf->len);
+ nread = p ? p + 1 - buf->data : buf->len;
return read_from_buf (sstream, buffer, nread);
}
@@ -184,8 +364,8 @@ soup_input_stream_read_line (SoupInputStream *sstream,
return nread;
p++;
- sstream->priv->read_buf = g_byte_array_new ();
- g_byte_array_append (sstream->priv->read_buf,
+ sstream->priv->buf = g_byte_array_new ();
+ g_byte_array_append (sstream->priv->buf,
p, nread - (p - buf));
return p - buf;
}
@@ -202,11 +382,11 @@ soup_input_stream_read_line_nonblocking (SoupInputStream *sstream,
g_return_val_if_fail (SOUP_IS_INPUT_STREAM (sstream), -1);
- if (sstream->priv->read_buf) {
- GByteArray *read_buf = sstream->priv->read_buf;
+ if (sstream->priv->buf) {
+ GByteArray *buf = sstream->priv->buf;
- p = memchr (read_buf->data, '\n', read_buf->len);
- nread = p ? p + 1 - read_buf->data : read_buf->len;
+ p = memchr (buf->data, '\n', buf->len);
+ nread = p ? p + 1 - buf->data : buf->len;
return read_from_buf (sstream, buffer, nread);
}
@@ -221,8 +401,8 @@ soup_input_stream_read_line_nonblocking (SoupInputStream *sstream,
return nread;
p++;
- sstream->priv->read_buf = g_byte_array_new ();
- g_byte_array_append (sstream->priv->read_buf,
+ sstream->priv->buf = g_byte_array_new ();
+ g_byte_array_append (sstream->priv->buf,
p, nread - (p - buf));
return p - buf;
}
diff --git a/libsoup/soup-input-stream.h b/libsoup/soup-input-stream.h
index ac35b07..91141f5 100644
--- a/libsoup/soup-input-stream.h
+++ b/libsoup/soup-input-stream.h
@@ -7,6 +7,7 @@
#define SOUP_INPUT_STREAM_H 1
#include <libsoup/soup-types.h>
+#include <libsoup/soup-message-headers.h>
G_BEGIN_DECLS
@@ -39,6 +40,10 @@ GType soup_input_stream_get_type (void);
GInputStream *soup_input_stream_new (GInputStream *base_stream);
+void soup_input_stream_set_encoding (SoupInputStream *sstream,
+ SoupEncoding encoding,
+ goffset content_length);
+
gssize soup_input_stream_read_line (SoupInputStream *sstream,
void *buffer,
gsize length,
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 40f8d54..31c043c 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -180,8 +180,8 @@ soup_message_io_finished (SoupMessage *msg)
g_object_unref (msg);
}
-static void io_read (SoupSocket *sock, SoupMessage *msg);
-static void io_write (SoupSocket *sock, SoupMessage *msg);
+static gboolean io_read (GInputStream *stream, SoupMessage *msg);
+static gboolean io_write (GOutputStream *stream, SoupMessage *msg);
static gboolean
request_is_idempotent (SoupMessage *msg)
@@ -218,19 +218,6 @@ io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
}
static gboolean
-io_readable (GPollableInputStream *stream, gpointer msg)
-{
- SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
- SoupMessageIOData *io = priv->io_data;
-
- g_source_destroy (io->read_source);
- io->read_source = NULL;
-
- io_read (io->sock, msg);
- return FALSE;
-}
-
-static gboolean
io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
@@ -279,6 +266,33 @@ io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
return TRUE;
}
+static void
+setup_read_source (SoupMessage *msg)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+
+ io->read_source = g_pollable_input_stream_create_source (
+ G_POLLABLE_INPUT_STREAM (io->istream), NULL);
+ g_source_set_callback (io->read_source,
+ (GSourceFunc) io_read, msg, NULL);
+ g_source_attach (io->read_source, io->async_context);
+ g_source_unref (io->read_source);
+}
+
+static void
+setup_write_source (SoupMessage *msg)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+
+ io->write_source = g_pollable_output_stream_create_source (
+ G_POLLABLE_OUTPUT_STREAM (io->ostream),
+ NULL);
+ g_source_set_callback (io->write_source, (GSourceFunc) io_write, msg, NULL);
+ g_source_attach (io->write_source, io->async_context);
+}
+
/* Reads data from io->sock into io->read_meta_buf. If @to_blank is
* %TRUE, it reads up until a blank line ("CRLF CRLF" or "LF LF").
* Otherwise, it reads up until a single CRLF or LF.
@@ -326,12 +340,7 @@ read_metadata (SoupMessage *msg, gboolean to_blank)
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_error_free (error);
- io->read_source = g_pollable_input_stream_create_source (
- G_POLLABLE_INPUT_STREAM (io->istream),
- NULL);
- g_source_set_callback (io->read_source, (GSourceFunc) io_readable, msg, NULL);
- g_source_attach (io->read_source, io->async_context);
- g_source_unref (io->read_source);
+ setup_read_source (msg);
return FALSE;
}
@@ -473,8 +482,6 @@ read_body_chunk (SoupMessage *msg)
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
guchar *stack_buf = NULL;
- gsize len;
- gboolean read_to_eof = (io->read_encoding == SOUP_ENCODING_EOF);
gssize nread;
GError *error = NULL;
SoupBuffer *buffer;
@@ -484,10 +491,15 @@ read_body_chunk (SoupMessage *msg)
return FALSE;
}
+ if (io->read_encoding == SOUP_ENCODING_NONE ||
+ (io->read_encoding == SOUP_ENCODING_CONTENT_LENGTH &&
+ io->read_length == 0))
+ return TRUE;
+
if (!io_handle_sniffing (msg, FALSE))
return FALSE;
- while (read_to_eof || io->read_length > 0) {
+ while (TRUE) {
if (priv->chunk_allocator) {
buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
if (!buffer) {
@@ -502,19 +514,15 @@ read_body_chunk (SoupMessage *msg)
RESPONSE_BLOCK_SIZE);
}
- if (read_to_eof)
- len = buffer->length;
- else
- len = MIN (buffer->length, io->read_length);
-
if (io->non_blocking) {
nread = g_pollable_input_stream_read_nonblocking (
G_POLLABLE_INPUT_STREAM (io->istream),
- (guchar *)buffer->data, len,
+ (guchar *)buffer->data, buffer->length,
io->cancellable, &error);
} else {
nread = g_input_stream_read (io->istream,
- (guchar *)buffer->data, len,
+ (guchar *)buffer->data,
+ buffer->length,
io->cancellable, &error);
}
@@ -546,18 +554,15 @@ read_body_chunk (SoupMessage *msg)
soup_buffer_free (buffer);
+ if (nread == 0)
+ return TRUE;
+
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_error_free (error);
- io->read_source = g_pollable_input_stream_create_source (
- G_POLLABLE_INPUT_STREAM (io->istream),
- NULL);
- g_source_set_callback (io->read_source, (GSourceFunc) io_readable, msg, NULL);
- g_source_attach (io->read_source, io->async_context);
- g_source_unref (io->read_source);
+ setup_read_source (msg);
return FALSE;
- }
-
- if (nread == 0) {
+ } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT)) {
+ g_clear_error (&error);
if (io->read_eof_ok) {
io->read_length = 0;
return TRUE;
@@ -568,21 +573,6 @@ read_body_chunk (SoupMessage *msg)
io_error (io->sock, msg, error);
return FALSE;
}
-
- return TRUE;
-}
-
-static gboolean
-io_writable (GPollableOutputStream *stream, gpointer msg)
-{
- SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
- SoupMessageIOData *io = priv->io_data;
-
- g_source_destroy (io->write_source);
- io->write_source = NULL;
-
- io_write (io->sock, msg);
- return FALSE;
}
/* Attempts to write @len bytes from @data. See the note at
@@ -618,11 +608,7 @@ write_data (SoupMessage *msg, const char *data, guint len, gboolean body)
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
g_error_free (error);
- io->write_source = g_pollable_output_stream_create_source (
- G_POLLABLE_OUTPUT_STREAM (io->ostream),
- NULL);
- g_source_set_callback (io->write_source, (GSourceFunc) io_writable, msg, NULL);
- g_source_attach (io->write_source, io->async_context);
+ setup_write_source (msg);
g_source_unref (io->write_source);
return FALSE;
} else if (error) {
@@ -685,16 +671,21 @@ io_body_state (SoupEncoding encoding)
* W:DONE / R:DONE R:DONE / W:DONE
*/
-static void
-io_write (SoupSocket *sock, SoupMessage *msg)
+static gboolean
+io_write (GOutputStream *stream, SoupMessage *msg)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
+ if (io->write_source) {
+ g_source_destroy (io->write_source);
+ io->write_source = NULL;
+ }
+
write_more:
switch (io->write_state) {
case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- return;
+ return FALSE;
case SOUP_MESSAGE_IO_STATE_HEADERS:
@@ -704,13 +695,13 @@ io_write (SoupSocket *sock, SoupMessage *msg)
io->header_data);
if (!io->write_buf->len) {
soup_message_io_pause (msg);
- return;
+ return FALSE;
}
}
if (!write_data (msg, io->write_buf->str,
io->write_buf->len, FALSE))
- return;
+ return FALSE;
g_string_truncate (io->write_buf, 0);
@@ -727,7 +718,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
/* Stop and wait for the body now */
io->write_state =
SOUP_MESSAGE_IO_STATE_BLOCKING;
- io->read_state = io_body_state (io->read_encoding);
+ io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
} else {
/* We just wrote a 1xx response
* header, so stay in STATE_HEADERS.
@@ -760,12 +751,12 @@ io_write (SoupSocket *sock, SoupMessage *msg)
soup_message_cleanup_response (msg);
} else
soup_message_wrote_headers (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+ SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
break;
case SOUP_MESSAGE_IO_STATE_BLOCKING:
- io_read (sock, msg);
+ io_read (io->istream, msg);
/* If io_read reached a point where we could write
* again, it would have recursively called io_write.
@@ -773,7 +764,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
* (b) we can't anyway, because msg may have been
* destroyed.
*/
- return;
+ return FALSE;
case SOUP_MESSAGE_IO_STATE_BODY:
@@ -783,7 +774,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
soup_message_wrote_body (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+ SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
break;
}
@@ -791,7 +782,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
if (!io->write_chunk) {
soup_message_io_pause (msg);
- return;
+ return FALSE;
}
if (io->write_chunk->length > io->write_length &&
io->write_encoding != SOUP_ENCODING_EOF) {
@@ -810,7 +801,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
if (!write_data (msg, io->write_chunk->data,
io->write_chunk->length, TRUE))
- return;
+ return FALSE;
if (io->mode == SOUP_MESSAGE_IO_SERVER)
soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
@@ -820,7 +811,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
soup_message_wrote_chunk (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+ SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
break;
case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE:
@@ -828,7 +819,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
if (!io->write_chunk) {
soup_message_io_pause (msg);
- return;
+ return FALSE;
}
g_string_append_printf (io->write_buf, "%lx\r\n",
(unsigned long) io->write_chunk->length);
@@ -837,7 +828,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
if (!write_data (msg, io->write_buf->str,
io->write_buf->len, FALSE))
- return;
+ return FALSE;
g_string_truncate (io->write_buf, 0);
@@ -854,7 +845,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
case SOUP_MESSAGE_IO_STATE_CHUNK:
if (!write_data (msg, io->write_chunk->data,
io->write_chunk->length, TRUE))
- return;
+ return FALSE;
if (io->mode == SOUP_MESSAGE_IO_SERVER)
soup_message_body_wrote_chunk (io->write_body, io->write_chunk);
@@ -865,7 +856,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
soup_message_wrote_chunk (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+ SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
/* fall through */
@@ -873,7 +864,7 @@ io_write (SoupSocket *sock, SoupMessage *msg)
case SOUP_MESSAGE_IO_STATE_CHUNK_END:
if (!write_data (msg, SOUP_MESSAGE_IO_EOL,
SOUP_MESSAGE_IO_EOL_LEN, FALSE))
- return;
+ return FALSE;
io->write_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
break;
@@ -882,13 +873,13 @@ io_write (SoupSocket *sock, SoupMessage *msg)
case SOUP_MESSAGE_IO_STATE_TRAILERS:
if (!write_data (msg, SOUP_MESSAGE_IO_EOL,
SOUP_MESSAGE_IO_EOL_LEN, FALSE))
- return;
+ return FALSE;
io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
soup_message_wrote_body (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+ SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
/* fall through */
@@ -901,36 +892,41 @@ io_write (SoupSocket *sock, SoupMessage *msg)
if (io->mode == SOUP_MESSAGE_IO_CLIENT) {
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_read (sock, msg);
+ io_read (io->istream, msg);
} else
soup_message_io_finished (msg);
- return;
+ return FALSE;
case SOUP_MESSAGE_IO_STATE_DONE:
default:
- g_return_if_reached ();
+ g_return_val_if_reached (FALSE);
}
goto write_more;
}
-static void
-io_read (SoupSocket *sock, SoupMessage *msg)
+static gboolean
+io_read (GInputStream *stream, SoupMessage *msg)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
guint status;
+ if (io->read_source) {
+ g_source_destroy (io->read_source);
+ io->read_source = NULL;
+ }
+
read_more:
switch (io->read_state) {
case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- return;
+ return FALSE;
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!read_metadata (msg, TRUE))
- return;
+ return FALSE;
/* We need to "rewind" io->read_meta_buf back one line.
* That SHOULD be two characters (CR LF), but if the
@@ -983,6 +979,10 @@ io_read (SoupSocket *sock, SoupMessage *msg)
}
}
+ soup_input_stream_set_encoding (SOUP_INPUT_STREAM (io->istream),
+ io->read_encoding,
+ io->read_length);
+
if (io->mode == SOUP_MESSAGE_IO_CLIENT &&
SOUP_STATUS_IS_INFORMATIONAL (msg->status_code)) {
if (msg->status_code == SOUP_STATUS_CONTINUE &&
@@ -1006,7 +1006,7 @@ io_read (SoupSocket *sock, SoupMessage *msg)
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
io->read_state = SOUP_MESSAGE_IO_STATE_BLOCKING;
} else {
- io->read_state = io_body_state (io->read_encoding);
+ io->read_state = SOUP_MESSAGE_IO_STATE_BODY;
/* If the client was waiting for a Continue
* but got something else, then it's done
@@ -1022,27 +1022,26 @@ io_read (SoupSocket *sock, SoupMessage *msg)
SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
soup_message_got_informational (msg);
soup_message_cleanup_response (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+ SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
} else {
SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
soup_message_got_headers (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
+ SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
}
break;
case SOUP_MESSAGE_IO_STATE_BLOCKING:
- io_write (sock, msg);
+ io_write (io->ostream, msg);
/* As in the io_write case, we *must* return here. */
- return;
+ return FALSE;
case SOUP_MESSAGE_IO_STATE_BODY:
if (!read_body_chunk (msg))
- return;
+ return FALSE;
- got_body:
if (!io_handle_sniffing (msg, TRUE)) {
/* If the message was paused (as opposed to
* cancelled), we need to make sure we wind up
@@ -1055,57 +1054,14 @@ io_read (SoupSocket *sock, SoupMessage *msg)
io->read_encoding = SOUP_ENCODING_CONTENT_LENGTH;
io->read_length = 0;
}
- return;
+ return FALSE;
}
io->read_state = SOUP_MESSAGE_IO_STATE_FINISHING;
SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
soup_message_got_body (msg);
- SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED;
- break;
-
-
- case SOUP_MESSAGE_IO_STATE_CHUNK_SIZE:
- if (!read_metadata (msg, FALSE))
- return;
-
- io->read_length = strtoul ((char *)io->read_meta_buf->data, NULL, 16);
- g_byte_array_set_size (io->read_meta_buf, 0);
-
- if (io->read_length > 0)
- io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK;
- else
- io->read_state = SOUP_MESSAGE_IO_STATE_TRAILERS;
- break;
-
-
- case SOUP_MESSAGE_IO_STATE_CHUNK:
- if (!read_body_chunk (msg))
- return;
-
- io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_END;
- break;
-
-
- case SOUP_MESSAGE_IO_STATE_CHUNK_END:
- if (!read_metadata (msg, FALSE))
- return;
-
- g_byte_array_set_size (io->read_meta_buf, 0);
- io->read_state = SOUP_MESSAGE_IO_STATE_CHUNK_SIZE;
- break;
-
-
- case SOUP_MESSAGE_IO_STATE_TRAILERS:
- if (!read_metadata (msg, FALSE))
- return;
-
- if (io->read_meta_buf->len <= SOUP_MESSAGE_IO_EOL_LEN)
- goto got_body;
-
- /* FIXME: process trailers */
- g_byte_array_set_size (io->read_meta_buf, 0);
+ SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
break;
@@ -1118,15 +1074,15 @@ io_read (SoupSocket *sock, SoupMessage *msg)
if (io->mode == SOUP_MESSAGE_IO_SERVER) {
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_write (sock, msg);
+ io_write (io->ostream, msg);
} else
soup_message_io_finished (msg);
- return;
+ return FALSE;
case SOUP_MESSAGE_IO_STATE_DONE:
default:
- g_return_if_reached ();
+ g_return_val_if_reached (FALSE);
}
goto read_more;
@@ -1222,7 +1178,7 @@ soup_message_io_client (SoupMessageQueueItem *item,
io->write_body = item->msg->request_body;
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_write (sock, item->msg);
+ io_write (io->ostream, item->msg);
}
void
@@ -1243,7 +1199,7 @@ soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
io->write_body = msg->response_body;
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_read (sock, msg);
+ io_read (io->istream, msg);
}
void
@@ -1285,9 +1241,9 @@ io_unpause_internal (gpointer msg)
return FALSE;
if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
- io_write (io->sock, msg);
+ io_write (io->ostream, msg);
else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
- io_read (io->sock, msg);
+ io_read (io->istream, msg);
return FALSE;
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]