[libsoup] Add SoupFilterInputStream



commit 6b9cbd9736486821d189aeaed1e8d327aed2b2a7
Author: Dan Winship <danw gnome org>
Date:   Sun Jan 15 10:17:21 2012 -0500

    Add SoupFilterInputStream
    
    SoupFilterInputStream is basically a subset of GDataInputStream, plus
    non-blocking read_line()/read_until().
    
    Wrap the existing socket istream member with a SoupFilterInputStream,
    and use its buffering rather than doing the buffering in SoupSocket.

 configure.ac                       |    5 +-
 libsoup/Makefile.am                |    2 +
 libsoup/soup-filter-input-stream.c |  257 ++++++++++++++++++++++++++++++++++++
 libsoup/soup-filter-input-stream.h |   56 ++++++++
 libsoup/soup-socket.c              |  163 +++++++++--------------
 5 files changed, 379 insertions(+), 104 deletions(-)
---
diff --git a/configure.ac b/configure.ac
index 84d3ec6..f52e6de 100644
--- a/configure.ac
+++ b/configure.ac
@@ -72,12 +72,13 @@ dnl ***********************
 dnl *** Checks for glib ***
 dnl ***********************
 
-GLIB_REQUIRED=2.31.7
+dnl FIXME: should actually be 2.33.0, but glib hasn't bumped yet
+GLIB_REQUIRED=2.32.2
 AM_PATH_GLIB_2_0($GLIB_REQUIRED,,,gobject gio)
 if test "$GLIB_LIBS" = ""; then
    AC_MSG_ERROR(GLIB $GLIB_REQUIRED or later is required to build libsoup)
 fi
-GLIB_CFLAGS="$GLIB_CFLAGS -DG_DISABLE_SINGLE_INCLUDES"
+GLIB_CFLAGS="$GLIB_CFLAGS -DGLIB_VERSION_MIN_REQUIRED=GLIB_VERSION_2_34"
 
 GLIB_MAKEFILE='$(top_srcdir)/Makefile.glib'
 AC_SUBST(GLIB_MAKEFILE)
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index 2b36a86..4526ca5 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -109,6 +109,8 @@ libsoup_2_4_la_SOURCES =		\
 	soup-directory-input-stream.c	\
 	soup-enum-types.h		\
 	soup-enum-types.c		\
+	soup-filter-input-stream.c	\
+	soup-filter-input-stream.h	\
 	soup-form.c			\
 	soup-headers.c			\
 	soup-http-input-stream.h	\
diff --git a/libsoup/soup-filter-input-stream.c b/libsoup/soup-filter-input-stream.c
new file mode 100644
index 0000000..52ec6cd
--- /dev/null
+++ b/libsoup/soup-filter-input-stream.c
@@ -0,0 +1,257 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-filter-input-stream.c
+ *
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <stdlib.h>
+#include <string.h>
+#include <gio/gio.h>
+
+#include "soup-filter-input-stream.h"
+
+/* This is essentially a subset of GDataInputStream, except that we
+ * can do the equivalent of "fill_nonblocking()" on it. (We could use
+ * an actual GDataInputStream, and implement the nonblocking semantics
+ * via fill_async(), but that would be more work...)
+ */
+
+struct _SoupFilterInputStreamPrivate {
+	GByteArray *buf;
+};
+
+static void soup_filter_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupFilterInputStream, soup_filter_input_stream, G_TYPE_FILTER_INPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+						soup_filter_input_stream_pollable_init))
+
+static void
+soup_filter_input_stream_init (SoupFilterInputStream *stream)
+{
+	stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
+						    SOUP_TYPE_FILTER_INPUT_STREAM,
+						    SoupFilterInputStreamPrivate);
+}
+
+static void
+finalize (GObject *object)
+{
+	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (object);
+
+	if (fstream->priv->buf)
+		g_byte_array_free (fstream->priv->buf, TRUE);
+
+	G_OBJECT_CLASS (soup_filter_input_stream_parent_class)->finalize (object);
+}
+
+static gssize
+read_from_buf (SoupFilterInputStream *fstream, gpointer buffer, gsize count)
+{
+	GByteArray *buf = fstream->priv->buf;
+
+	if (buf->len < count)
+		count = buf->len;
+	memcpy (buffer, buf->data, count);
+
+	if (count == buf->len) {
+		g_byte_array_free (buf, TRUE);
+		fstream->priv->buf = NULL;
+	} else {
+		memmove (buf->data, buf->data + count,
+			 buf->len - count);
+		g_byte_array_set_size (buf, buf->len - count);
+	}
+
+	return count;
+}
+
+static gssize
+soup_filter_input_stream_read_fn (GInputStream  *stream,
+				  void          *buffer,
+				  gsize          count,
+				  GCancellable  *cancellable,
+				  GError       **error)
+{
+	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
+
+	if (fstream->priv->buf) {
+		return read_from_buf (fstream, buffer, count);
+	} else {
+		return g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream,
+					       buffer, count,
+					       TRUE, cancellable, error);
+	}
+}
+
+static gboolean
+soup_filter_input_stream_is_readable (GPollableInputStream *stream)
+{
+	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
+
+	if (fstream->priv->buf)
+		return TRUE;
+	else
+		return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (fstream)->base_stream));
+}
+
+static gssize
+soup_filter_input_stream_read_nonblocking (GPollableInputStream  *stream,
+					   void                  *buffer,
+					   gsize                  count,
+					   GError               **error)
+{
+	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
+
+	if (fstream->priv->buf) {
+		return read_from_buf (fstream, buffer, count);
+	} else {
+		return g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream,
+					       buffer, count,
+					       FALSE, NULL, error);
+	}
+}
+
+static GSource *
+soup_filter_input_stream_create_source (GPollableInputStream *stream,
+					GCancellable         *cancellable)
+{
+	SoupFilterInputStream *fstream = SOUP_FILTER_INPUT_STREAM (stream);
+	GSource *base_source, *pollable_source;
+
+	if (fstream->priv->buf)
+		base_source = g_timeout_source_new (0);
+	else
+		base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (G_FILTER_INPUT_STREAM (fstream)->base_stream), cancellable);
+
+	g_source_set_dummy_callback (base_source);
+	pollable_source = g_pollable_source_new (G_OBJECT (stream));
+	g_source_add_child_source (pollable_source, base_source);
+	g_source_unref (base_source);
+
+	return pollable_source;
+}
+
+static void
+soup_filter_input_stream_class_init (SoupFilterInputStreamClass *stream_class)
+{
+	GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+	GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
+
+	g_type_class_add_private (stream_class, sizeof (SoupFilterInputStreamPrivate));
+
+	object_class->finalize = finalize;
+
+	input_stream_class->read_fn = soup_filter_input_stream_read_fn;
+}
+
+static void
+soup_filter_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+					gpointer                       interface_data)
+{
+	pollable_interface->is_readable = soup_filter_input_stream_is_readable;
+	pollable_interface->read_nonblocking = soup_filter_input_stream_read_nonblocking;
+	pollable_interface->create_source = soup_filter_input_stream_create_source;
+}
+
+GInputStream *
+soup_filter_input_stream_new (GInputStream *base_stream)
+{
+	return g_object_new (SOUP_TYPE_FILTER_INPUT_STREAM,
+			     "base-stream", base_stream,
+			     "close-base-stream", FALSE,
+			     NULL);
+}
+
+gssize
+soup_filter_input_stream_read_line (SoupFilterInputStream  *fstream,
+				    void                   *buffer,
+				    gsize                   length,
+				    gboolean                blocking,
+				    gboolean               *got_line,
+				    GCancellable           *cancellable,
+				    GError                **error)
+{
+	return soup_filter_input_stream_read_until (fstream, buffer, length,
+						    "\n", 1, blocking,
+						    got_line,
+						    cancellable, error);
+}
+
+gssize
+soup_filter_input_stream_read_until (SoupFilterInputStream  *fstream,
+				     void                   *buffer,
+				     gsize                   length,
+				     const void             *boundary,
+				     gsize                   boundary_length,
+				     gboolean                blocking,
+				     gboolean               *got_boundary,
+				     GCancellable           *cancellable,
+				     GError                **error)
+{
+	gssize nread;
+	guint8 *p, *buf, *end;
+	gboolean eof = FALSE;
+
+	g_return_val_if_fail (SOUP_IS_FILTER_INPUT_STREAM (fstream), -1);
+	g_return_val_if_fail (boundary_length < length, -1);
+
+	*got_boundary = FALSE;
+
+	if (!fstream->priv->buf || fstream->priv->buf->len < boundary_length) {
+		guint prev_len;
+
+	fill_buffer:
+		if (!fstream->priv->buf)
+			fstream->priv->buf = g_byte_array_new ();
+		prev_len = fstream->priv->buf->len;
+		g_byte_array_set_size (fstream->priv->buf, length);
+		buf = fstream->priv->buf->data;
+
+		nread = g_pollable_stream_read (G_FILTER_INPUT_STREAM (fstream)->base_stream,
+						buf + prev_len, length - prev_len,
+						blocking,
+						cancellable, error);
+		if (nread <= 0) {
+			if (prev_len)
+				fstream->priv->buf->len = prev_len;
+			else {
+				g_byte_array_free (fstream->priv->buf, TRUE);
+				fstream->priv->buf = NULL;
+			}
+
+			if (nread == 0 && prev_len)
+				eof = TRUE;
+			else
+				return nread;
+		} else
+			fstream->priv->buf->len = prev_len + nread;
+	} else
+		buf = fstream->priv->buf->data;
+
+	/* Scan for the boundary */
+	end = buf + fstream->priv->buf->len;
+	if (!eof)
+		end -= boundary_length;
+	for (p = buf; p <= end; p++) {
+		if (!memcmp (p, boundary, boundary_length)) {
+			p += boundary_length;
+			*got_boundary = TRUE;
+			break;
+		}
+	}
+
+	if (!*got_boundary && fstream->priv->buf->len < length && !eof)
+		goto fill_buffer;
+
+	/* Return everything up to 'p' (which is either just after the
+	 * boundary, @boundary_len - 1 bytes before the end of the
+	 * buffer, or end-of-file).
+	 */
+	return read_from_buf (fstream, buffer, p - buf);
+}
diff --git a/libsoup/soup-filter-input-stream.h b/libsoup/soup-filter-input-stream.h
new file mode 100644
index 0000000..b86a476
--- /dev/null
+++ b/libsoup/soup-filter-input-stream.h
@@ -0,0 +1,56 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright 2012 Red Hat, Inc.
+ */
+
+#ifndef SOUP_FILTER_INPUT_STREAM_H
+#define SOUP_FILTER_INPUT_STREAM_H 1
+
+#include <libsoup/soup-types.h>
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_FILTER_INPUT_STREAM            (soup_filter_input_stream_get_type ())
+#define SOUP_FILTER_INPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_FILTER_INPUT_STREAM, SoupFilterInputStream))
+#define SOUP_FILTER_INPUT_STREAM_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_FILTER_INPUT_STREAM, SoupFilterInputStreamClass))
+#define SOUP_IS_FILTER_INPUT_STREAM(obj)         (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_FILTER_INPUT_STREAM))
+#define SOUP_IS_FILTER_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_FILTER_INPUT_STREAM))
+#define SOUP_FILTER_INPUT_STREAM_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_FILTER_INPUT_STREAM, SoupFilterInputStreamClass))
+
+typedef struct _SoupFilterInputStreamPrivate SoupFilterInputStreamPrivate;
+
+typedef struct {
+	GFilterInputStream parent;
+
+	SoupFilterInputStreamPrivate *priv;
+} SoupFilterInputStream;
+
+typedef struct {
+	GFilterInputStreamClass parent_class;
+
+} SoupFilterInputStreamClass;
+
+GType soup_filter_input_stream_get_type (void);
+
+GInputStream *soup_filter_input_stream_new        (GInputStream           *base_stream);
+
+gssize        soup_filter_input_stream_read_line  (SoupFilterInputStream  *fstream,
+						   void                   *buffer,
+						   gsize                   length,
+						   gboolean                blocking,
+						   gboolean               *got_line,
+						   GCancellable           *cancellable,
+						   GError                **error);
+gssize        soup_filter_input_stream_read_until (SoupFilterInputStream  *fstream,
+						   void                   *buffer,
+						   gsize                   length,
+						   const void             *boundary,
+						   gsize                   boundary_len,
+						   gboolean                blocking,
+						   gboolean               *got_boundary,
+						   GCancellable           *cancellable,
+						   GError                **error);
+
+G_END_DECLS
+
+#endif /* SOUP_FILTER_INPUT_STREAM_H */
diff --git a/libsoup/soup-socket.c b/libsoup/soup-socket.c
index 26c90c4..049be92 100644
--- a/libsoup/soup-socket.c
+++ b/libsoup/soup-socket.c
@@ -16,8 +16,9 @@
 #include <string.h>
 #include <unistd.h>
 
-#include "soup-address.h"
 #include "soup-socket.h"
+#include "soup-address.h"
+#include "soup-filter-input-stream.h"
 #include "soup-marshal.h"
 #include "soup-misc.h"
 #include "soup-misc-private.h"
@@ -70,8 +71,8 @@ typedef struct {
 	SoupAddress *local_addr, *remote_addr;
 	GIOStream *conn;
 	GSocket *gsock;
-	GPollableInputStream *istream;
-	GPollableOutputStream *ostream;
+	GInputStream *istream;
+	GOutputStream *ostream;
 	GTlsCertificateFlags tls_errors;
 
 	guint non_blocking:1;
@@ -86,7 +87,6 @@ typedef struct {
 	GMainContext   *async_context;
 	GSource        *watch_src;
 	GSource        *read_src, *write_src;
-	GByteArray     *read_buf;
 
 	GMutex iolock, addrlock;
 	guint timeout;
@@ -128,10 +128,9 @@ disconnect_internal (SoupSocket *sock, gboolean close)
 	if (priv->conn) {
 		if (G_IS_TLS_CONNECTION (priv->conn))
 			g_signal_handlers_disconnect_by_func (priv->conn, soup_socket_peer_certificate_changed, sock);
-		g_object_unref (priv->conn);
-		priv->conn = NULL;
-		priv->istream = NULL;
-		priv->ostream = NULL;
+		g_clear_object (&priv->conn);
+		g_clear_object (&priv->istream);
+		g_clear_object (&priv->ostream);
 	}
 
 	if (priv->read_src) {
@@ -160,6 +159,9 @@ finalize (GObject *object)
 		disconnect_internal (SOUP_SOCKET (object), TRUE);
 	}
 
+	g_clear_object (&priv->istream);
+	g_clear_object (&priv->ostream);
+
 	if (priv->local_addr)
 		g_object_unref (priv->local_addr);
 	if (priv->remote_addr)
@@ -173,9 +175,6 @@ finalize (GObject *object)
 	if (priv->async_context)
 		g_main_context_unref (priv->async_context);
 
-	if (priv->read_buf)
-		g_byte_array_free (priv->read_buf, TRUE);
-
 	g_mutex_clear (&priv->addrlock);
 	g_mutex_clear (&priv->iolock);
 
@@ -521,9 +520,9 @@ finish_socket_setup (SoupSocketPrivate *priv)
 	if (!priv->conn)
 		priv->conn = (GIOStream *)g_socket_connection_factory_create_connection (priv->gsock);
 	if (!priv->istream)
-		priv->istream = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (priv->conn));
+		priv->istream = soup_filter_input_stream_new (g_io_stream_get_input_stream (priv->conn));
 	if (!priv->ostream)
-		priv->ostream = G_POLLABLE_OUTPUT_STREAM (g_io_stream_get_output_stream (priv->conn));
+		priv->ostream = g_object_ref (g_io_stream_get_output_stream (priv->conn));
 
 	g_socket_set_timeout (priv->gsock, priv->timeout);
 }
@@ -862,9 +861,9 @@ soup_socket_create_watch (SoupSocketPrivate *priv, GIOCondition cond,
 	GMainContext *async_context;
 
 	if (cond == G_IO_IN)
-		watch = g_pollable_input_stream_create_source (priv->istream, cancellable);
+		watch = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (priv->istream), cancellable);
 	else
-		watch = g_pollable_output_stream_create_source (priv->ostream, cancellable);
+		watch = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (priv->ostream), cancellable);
 	g_source_set_callback (watch, (GSourceFunc)callback, user_data, NULL);
 
 	if (priv->use_thread_context)
@@ -1089,8 +1088,13 @@ soup_socket_start_proxy_ssl (SoupSocket *sock, const char *ssl_host,
 	g_signal_connect (priv->conn, "notify::peer-certificate",
 			  G_CALLBACK (soup_socket_peer_certificate_changed), sock);
 
-	priv->istream = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (priv->conn));
-	priv->ostream = G_POLLABLE_OUTPUT_STREAM (g_io_stream_get_output_stream (priv->conn));
+	if (priv->istream)
+		g_object_unref (priv->istream);
+	if (priv->ostream)
+		g_object_unref (priv->ostream);
+
+	priv->istream = soup_filter_input_stream_new (g_io_stream_get_input_stream (priv->conn));
+	priv->ostream = g_object_ref (g_io_stream_get_output_stream (priv->conn));
 	return TRUE;
 }
 	
@@ -1334,34 +1338,18 @@ socket_read_watch (GObject *pollable, gpointer user_data)
 }
 
 static SoupSocketIOStatus
-read_from_network (SoupSocket *sock, gpointer buffer, gsize len,
-		   gsize *nread, GCancellable *cancellable, GError **error)
+translate_read_status (SoupSocket *sock, GCancellable *cancellable,
+		       gssize my_nread, gsize *nread,
+		       GError *my_err, GError **error)
 {
 	SoupSocketPrivate *priv = SOUP_SOCKET_GET_PRIVATE (sock);
-	GError *my_err = NULL;
-	gssize my_nread;
-
-	*nread = 0;
-
-	if (!priv->conn)
-		return SOUP_SOCKET_EOF;
-
-	if (!priv->non_blocking) {
-		my_nread = g_input_stream_read (G_INPUT_STREAM (priv->istream),
-						buffer, len,
-						cancellable, &my_err);
-	} else {
-		my_nread = g_pollable_input_stream_read_nonblocking (
-			priv->istream, buffer, len,
-			cancellable, &my_err);
-	}
 
 	if (my_nread > 0) {
-		g_clear_error (&my_err);
+		g_assert_no_error (my_err);
 		*nread = my_nread;
 		return SOUP_SOCKET_OK;
 	} else if (my_nread == 0) {
-		g_clear_error (&my_err);
+		g_assert_no_error (my_err);
 		*nread = my_nread;
 		return SOUP_SOCKET_EOF;
 	} else if (g_error_matches (my_err, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
@@ -1379,27 +1367,6 @@ read_from_network (SoupSocket *sock, gpointer buffer, gsize len,
 	return SOUP_SOCKET_ERROR;
 }
 
-static SoupSocketIOStatus
-read_from_buf (SoupSocket *sock, gpointer buffer, gsize len, gsize *nread)
-{
-	SoupSocketPrivate *priv = SOUP_SOCKET_GET_PRIVATE (sock);
-	GByteArray *read_buf = priv->read_buf;
-
-	*nread = MIN (read_buf->len, len);
-	memcpy (buffer, read_buf->data, *nread);
-
-	if (*nread == read_buf->len) {
-		g_byte_array_free (read_buf, TRUE);
-		priv->read_buf = NULL;
-	} else {
-		memmove (read_buf->data, read_buf->data + *nread, 
-			 read_buf->len - *nread);
-		g_byte_array_set_size (read_buf, read_buf->len - *nread);
-	}
-
-	return SOUP_SOCKET_OK;
-}
-
 /**
  * SoupSocketIOStatus:
  * @SOUP_SOCKET_OK: Success
@@ -1443,6 +1410,8 @@ soup_socket_read (SoupSocket *sock, gpointer buffer, gsize len,
 {
 	SoupSocketPrivate *priv;
 	SoupSocketIOStatus status;
+	gssize my_nread;
+	GError *my_err = NULL;
 
 	g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
 	g_return_val_if_fail (nread != NULL, SOUP_SOCKET_ERROR);
@@ -1450,10 +1419,24 @@ soup_socket_read (SoupSocket *sock, gpointer buffer, gsize len,
 	priv = SOUP_SOCKET_GET_PRIVATE (sock);
 
 	g_mutex_lock (&priv->iolock);
-	if (priv->read_buf)
-		status = read_from_buf (sock, buffer, len, nread);
-	else
-		status = read_from_network (sock, buffer, len, nread, cancellable, error);
+
+	if (!priv->istream) {
+		status = SOUP_SOCKET_EOF;
+		goto out;
+	}
+
+	if (!priv->non_blocking) {
+		my_nread = g_input_stream_read (priv->istream, buffer, len,
+						cancellable, &my_err);
+	} else {
+		my_nread = g_pollable_input_stream_read_nonblocking (G_POLLABLE_INPUT_STREAM (priv->istream),
+								     buffer, len,
+								     cancellable, &my_err);
+	}
+	status = translate_read_status (sock, cancellable,
+					my_nread, nread, my_err, error);
+
+out:
 	g_mutex_unlock (&priv->iolock);
 
 	return status;
@@ -1495,9 +1478,8 @@ soup_socket_read_until (SoupSocket *sock, gpointer buffer, gsize len,
 {
 	SoupSocketPrivate *priv;
 	SoupSocketIOStatus status;
-	GByteArray *read_buf;
-	guint match_len, prev_len;
-	guint8 *p, *end;
+	gssize my_nread;
+	GError *my_err = NULL;
 
 	g_return_val_if_fail (SOUP_IS_SOCKET (sock), SOUP_SOCKET_ERROR);
 	g_return_val_if_fail (nread != NULL, SOUP_SOCKET_ERROR);
@@ -1509,41 +1491,18 @@ soup_socket_read_until (SoupSocket *sock, gpointer buffer, gsize len,
 
 	*got_boundary = FALSE;
 
-	if (!priv->read_buf)
-		priv->read_buf = g_byte_array_new ();
-	read_buf = priv->read_buf;
-
-	if (read_buf->len < boundary_len) {
-		prev_len = read_buf->len;
-		g_byte_array_set_size (read_buf, len);
-		status = read_from_network (sock,
-					    read_buf->data + prev_len,
-					    len - prev_len, nread, cancellable, error);
-		read_buf->len = prev_len + *nread;
-
-		if (status != SOUP_SOCKET_OK) {
-			g_mutex_unlock (&priv->iolock);
-			return status;
-		}
-	}
-
-	/* Scan for the boundary */
-	end = read_buf->data + read_buf->len;
-	for (p = read_buf->data; p <= end - boundary_len; p++) {
-		if (!memcmp (p, boundary, boundary_len)) {
-			p += boundary_len;
-			*got_boundary = TRUE;
-			break;
-		}
+	if (!priv->istream)
+		status = SOUP_SOCKET_EOF;
+	else {
+		my_nread = soup_filter_input_stream_read_until (
+			SOUP_FILTER_INPUT_STREAM (priv->istream),
+			buffer, len, boundary, boundary_len,
+			!priv->non_blocking,
+			got_boundary, cancellable, &my_err);
+		status = translate_read_status (sock, cancellable,
+						my_nread, nread, my_err, error);
 	}
 
-	/* Return everything up to 'p' (which is either just after the
-	 * boundary, or @boundary_len - 1 bytes before the end of the
-	 * buffer).
-	 */
-	match_len = p - read_buf->data;
-	status = read_from_buf (sock, buffer, MIN (len, match_len), nread);
-
 	g_mutex_unlock (&priv->iolock);
 	return status;
 }
@@ -1611,13 +1570,13 @@ soup_socket_write (SoupSocket *sock, gconstpointer buffer,
 	}
 
 	if (!priv->non_blocking) {
-		my_nwrote = g_output_stream_write (G_OUTPUT_STREAM (priv->ostream),
+		my_nwrote = g_output_stream_write (priv->ostream,
 						   buffer, len,
 						   cancellable, &my_err);
 	} else {
 		my_nwrote = g_pollable_output_stream_write_nonblocking (
-			priv->ostream, buffer, len,
-			cancellable, &my_err);
+			G_POLLABLE_OUTPUT_STREAM (priv->ostream),
+			buffer, len, cancellable, &my_err);
 	}
 
 	if (my_nwrote > 0) {



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]