[libsoup/giobased: 2/11] Use gio instead of soup_socket_read*



commit e4b5ed759235423ee736790f623c7235be77234a
Author: Dan Winship <danw gnome org>
Date:   Wed Dec 8 18:19:27 2010 +0100

    Use gio instead of soup_socket_read*

 libsoup/Makefile.am         |    2 +
 libsoup/soup-input-stream.c |  228 +++++++++++++++++++++++++++++++++++++++++++
 libsoup/soup-input-stream.h |   55 ++++++++++
 libsoup/soup-message-io.c   |  156 ++++++++++++++++++-----------
 4 files changed, 383 insertions(+), 58 deletions(-)
---
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index a26d820..aec7948 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -139,6 +139,8 @@ libsoup_2_4_la_SOURCES =		\
 	soup-headers.c			\
 	soup-http-input-stream.h	\
 	soup-http-input-stream.c	\
+	soup-input-stream.h		\
+	soup-input-stream.c		\
 	soup-logger.c			\
 	soup-message.c			\
 	soup-message-body.c		\
diff --git a/libsoup/soup-input-stream.c b/libsoup/soup-input-stream.c
new file mode 100644
index 0000000..3f334df
--- /dev/null
+++ b/libsoup/soup-input-stream.c
@@ -0,0 +1,228 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-input-stream.c
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <string.h>
+#include <gio/gio.h>
+
+#include "soup-input-stream.h"
+
+struct _SoupInputStreamPrivate {
+	GInputStream *base_stream;
+	GByteArray   *read_buf;
+};
+
+static void soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupInputStream, soup_input_stream, G_TYPE_FILTER_INPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+						soup_input_stream_pollable_init))
+
+static void
+soup_input_stream_init (SoupInputStream *stream)
+{
+	stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
+						    SOUP_TYPE_INPUT_STREAM,
+						    SoupInputStreamPrivate);
+}
+
+static void
+constructed (GObject *object)
+{
+	SoupInputStream *sstream = SOUP_INPUT_STREAM (object);
+
+	sstream->priv->base_stream = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (sstream));
+}
+
+static void
+finalize (GObject *object)
+{
+	SoupInputStream *sstream = SOUP_INPUT_STREAM (object);
+
+	if (sstream->priv->read_buf)
+		g_byte_array_free (sstream->priv->read_buf, TRUE);
+
+	G_OBJECT_CLASS (soup_input_stream_parent_class)->finalize (object);
+}
+
+static gssize
+read_from_buf (SoupInputStream *sstream, gpointer buffer, gsize count)
+{
+	GByteArray *read_buf = sstream->priv->read_buf;
+
+	if (read_buf->len < count)
+		count = read_buf->len;
+	memcpy (buffer, read_buf->data, count);
+
+	if (count == read_buf->len) {
+		g_byte_array_free (read_buf, TRUE);
+		sstream->priv->read_buf = NULL;
+	} else {
+		memmove (read_buf->data, read_buf->data + count,
+			 read_buf->len - count);
+		g_byte_array_set_size (read_buf, read_buf->len - count);
+	}
+
+	return count;
+}
+
+static gssize
+soup_input_stream_read (GInputStream  *stream,
+			void          *buffer,
+			gsize          count,
+			GCancellable  *cancellable,
+			GError       **error)
+{
+	SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
+
+	if (sstream->priv->read_buf) {
+		return read_from_buf (sstream, buffer, count);
+	} else {
+		return g_input_stream_read (sstream->priv->base_stream,
+					    buffer, count,
+					    cancellable, error);
+	}
+}
+
+static gboolean
+soup_input_stream_is_readable (GPollableInputStream *stream)
+{
+	SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
+
+	if (sstream->priv->read_buf)
+		return TRUE;
+	else
+		return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (sstream->priv->base_stream));
+}
+
+static GSource *
+soup_input_stream_create_source (GPollableInputStream *stream,
+				 GCancellable *cancellable)
+{
+	SoupInputStream *sstream = SOUP_INPUT_STREAM (stream);
+	GSource *base_source, *pollable_source;
+
+	if (sstream->priv->read_buf)
+		base_source = g_idle_source_new ();
+	else
+		base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (sstream->priv->base_stream), cancellable);
+
+	g_source_set_dummy_callback (base_source);
+	pollable_source = g_pollable_source_new (G_OBJECT (stream));
+	g_source_add_child_source (pollable_source, base_source);
+	g_source_unref (base_source);
+
+	return pollable_source;
+}
+
+static void
+soup_input_stream_class_init (SoupInputStreamClass *stream_class)
+{
+	GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+	GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
+
+	g_type_class_add_private (stream_class, sizeof (SoupInputStreamPrivate));
+
+	object_class->constructed = constructed;
+	object_class->finalize = finalize;
+
+	input_stream_class->read_fn = soup_input_stream_read;
+}
+
+static void
+soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+				 gpointer interface_data)
+{
+	pollable_interface->is_readable = soup_input_stream_is_readable;
+	pollable_interface->create_source = soup_input_stream_create_source;
+}
+
+GInputStream *
+soup_input_stream_new (GInputStream *base_stream)
+{
+	return g_object_new (SOUP_TYPE_INPUT_STREAM,
+			     "base-stream", base_stream,
+			     "close-base-stream", FALSE,
+			     NULL);
+}
+
+gssize
+soup_input_stream_read_line (SoupInputStream       *sstream,
+			     void                  *buffer,
+			     gsize                  length,
+			     GCancellable          *cancellable,
+			     GError               **error)
+{
+	gssize nread;
+	guint8 *p, *buf = buffer;
+
+	g_return_val_if_fail (SOUP_IS_INPUT_STREAM (sstream), -1);
+
+	if (sstream->priv->read_buf) {
+		GByteArray *read_buf = sstream->priv->read_buf;
+
+		p = memchr (read_buf->data, '\n', read_buf->len);
+		nread = p ? p + 1 - read_buf->data : read_buf->len;
+		return read_from_buf (sstream, buffer, nread);
+	}
+
+	nread = g_input_stream_read (sstream->priv->base_stream,
+				     buffer, length,
+				     cancellable, error);
+	if (nread <= 0)
+		return nread;
+
+	p = memchr (buffer, '\n', nread);
+	if (!p || p == buf + nread - 1)
+		return nread;
+
+	p++;
+	sstream->priv->read_buf = g_byte_array_new ();
+	g_byte_array_append (sstream->priv->read_buf,
+			     p, nread - (p - buf));
+	return p - buf;
+}
+
+gssize
+soup_input_stream_read_line_nonblocking (SoupInputStream       *sstream,
+					 void                  *buffer,
+					 gsize                  length,
+					 GCancellable          *cancellable,
+					 GError               **error)
+{
+	gssize nread;
+	guint8 *p, *buf = buffer;
+
+	g_return_val_if_fail (SOUP_IS_INPUT_STREAM (sstream), -1);
+
+	if (sstream->priv->read_buf) {
+		GByteArray *read_buf = sstream->priv->read_buf;
+
+		p = memchr (read_buf->data, '\n', read_buf->len);
+		nread = p ? p + 1 - read_buf->data : read_buf->len;
+		return read_from_buf (sstream, buffer, nread);
+	}
+
+	nread = g_pollable_input_stream_read_nonblocking (
+		G_POLLABLE_INPUT_STREAM (sstream->priv->base_stream),
+		buffer, length, cancellable, error);
+	if (nread <= 0)
+		return nread;
+
+	p = memchr (buffer, '\n', nread);
+	if (!p || p == buf + nread - 1)
+		return nread;
+
+	p++;
+	sstream->priv->read_buf = g_byte_array_new ();
+	g_byte_array_append (sstream->priv->read_buf,
+			     p, nread - (p - buf));
+	return p - buf;
+}
diff --git a/libsoup/soup-input-stream.h b/libsoup/soup-input-stream.h
new file mode 100644
index 0000000..ac35b07
--- /dev/null
+++ b/libsoup/soup-input-stream.h
@@ -0,0 +1,55 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2010 Red Hat, Inc.
+ */
+
+#ifndef SOUP_INPUT_STREAM_H
+#define SOUP_INPUT_STREAM_H 1
+
+#include <libsoup/soup-types.h>
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_INPUT_STREAM            (soup_input_stream_get_type ())
+#define SOUP_INPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_INPUT_STREAM, SoupInputStream))
+#define SOUP_INPUT_STREAM_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_INPUT_STREAM, SoupInputStreamClass))
+#define SOUP_IS_INPUT_STREAM(obj)         (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_INPUT_STREAM))
+#define SOUP_IS_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_INPUT_STREAM))
+#define SOUP_INPUT_STREAM_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_INPUT_STREAM, SoupInputStreamClass))
+
+typedef struct _SoupInputStreamPrivate SoupInputStreamPrivate;
+
+typedef struct {
+	GFilterInputStream parent;
+
+	SoupInputStreamPrivate *priv;
+} SoupInputStream;
+
+typedef struct {
+	GFilterInputStreamClass parent_class;
+
+	/* Padding for future expansion */
+	void (*_libsoup_reserved1) (void);
+	void (*_libsoup_reserved2) (void);
+	void (*_libsoup_reserved3) (void);
+	void (*_libsoup_reserved4) (void);
+} SoupInputStreamClass;
+
+GType soup_input_stream_get_type (void);
+
+GInputStream *soup_input_stream_new                   (GInputStream          *base_stream);
+
+gssize        soup_input_stream_read_line             (SoupInputStream       *sstream,
+						       void                  *buffer,
+						       gsize                  length,
+						       GCancellable          *cancellable,
+						       GError               **error);
+gssize        soup_input_stream_read_line_nonblocking (SoupInputStream       *sstream,
+						       void                  *buffer,
+						       gsize                  length,
+						       GCancellable          *cancellable,
+						       GError               **error);
+
+G_END_DECLS
+
+#endif /* SOUP_INPUT_STREAM_H */
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index b5d9840..40f8d54 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -13,6 +13,7 @@
 #include <string.h>
 
 #include "soup-connection.h"
+#include "soup-input-stream.h"
 #include "soup-message.h"
 #include "soup-message-private.h"
 #include "soup-message-queue.h"
@@ -73,8 +74,8 @@ typedef struct {
 	goffset               write_length;
 	goffset               written;
 
-	guint read_tag, tls_signal_id;
-	GSource *write_source;
+	guint tls_signal_id;
+	GSource *read_source, *write_source;
 	GSource *unpause_source;
 	gboolean paused;
 
@@ -113,6 +114,8 @@ soup_message_io_cleanup (SoupMessage *msg)
 		g_signal_handler_disconnect (io->sock, io->tls_signal_id);
 	if (io->sock)
 		g_object_unref (io->sock);
+	if (io->istream)
+		g_object_unref (io->istream);
 	if (io->async_context)
 		g_main_context_unref (io->async_context);
 	if (io->item)
@@ -139,9 +142,9 @@ soup_message_io_stop (SoupMessage *msg)
 	if (!io)
 		return;
 
-	if (io->read_tag) {
-		g_signal_handler_disconnect (io->sock, io->read_tag);
-		io->read_tag = 0;
+	if (io->read_source) {
+		g_source_destroy (io->read_source);
+		io->read_source = NULL;
 	}
 	if (io->write_source) {
 		g_source_destroy (io->write_source);
@@ -215,6 +218,19 @@ io_error (SoupSocket *sock, SoupMessage *msg, GError *error)
 }
 
 static gboolean
+io_readable (GPollableInputStream *stream, gpointer msg)
+{
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupMessageIOData *io = priv->io_data;
+
+	g_source_destroy (io->read_source);
+	io->read_source = NULL;
+
+	io_read (io->sock, msg);
+	return FALSE;
+}
+
+static gboolean
 io_handle_sniffing (SoupMessage *msg, gboolean done_reading)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
@@ -285,23 +301,44 @@ read_metadata (SoupMessage *msg, gboolean to_blank)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
-	SoupSocketIOStatus status;
 	guchar read_buf[RESPONSE_BLOCK_SIZE];
-	gsize nread;
+	gssize nread;
 	gboolean got_lf;
 	GError *error = NULL;
 
+	if (!io->istream) {
+		io_error (io->sock, msg, NULL);
+		return FALSE;
+	}
+
 	while (1) {
-		status = soup_socket_read_until (io->sock, read_buf,
-						 sizeof (read_buf),
-						 "\n", 1, &nread, &got_lf,
-						 io->cancellable, &error);
-		switch (status) {
-		case SOUP_SOCKET_OK:
-			g_byte_array_append (io->read_meta_buf, read_buf, nread);
-			break;
+		if (io->non_blocking) {
+			nread = soup_input_stream_read_line_nonblocking (
+				SOUP_INPUT_STREAM (io->istream),
+				read_buf, sizeof (read_buf),
+				io->cancellable, &error);
+		} else {
+			nread = soup_input_stream_read_line (
+				SOUP_INPUT_STREAM (io->istream),
+				read_buf, sizeof (read_buf),
+				io->cancellable, &error);
+		}
 
-		case SOUP_SOCKET_EOF:
+		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+			g_error_free (error);
+			io->read_source = g_pollable_input_stream_create_source (
+				G_POLLABLE_INPUT_STREAM (io->istream),
+				NULL);
+			g_source_set_callback (io->read_source, (GSourceFunc) io_readable, msg, NULL);
+			g_source_attach (io->read_source, io->async_context);
+			g_source_unref (io->read_source);
+			return FALSE;
+		}
+
+		if (nread > 0) {
+			g_byte_array_append (io->read_meta_buf, read_buf, nread);
+			got_lf = memchr (read_buf, '\n', nread) != NULL;
+		} else if (nread == 0) {
 			/* More lame server handling... deal with
 			 * servers that don't send the final chunk.
 			 */
@@ -310,22 +347,18 @@ read_metadata (SoupMessage *msg, gboolean to_blank)
 				g_byte_array_append (io->read_meta_buf,
 						     (guchar *)"0\r\n", 3);
 				got_lf = TRUE;
-				break;
 			} else if (io->read_state == SOUP_MESSAGE_IO_STATE_TRAILERS &&
 				   io->read_meta_buf->len == 0) {
 				g_byte_array_append (io->read_meta_buf,
 						     (guchar *)"\r\n", 2);
 				got_lf = TRUE;
-				break;
+			} else {
+				io_error (io->sock, msg, NULL);
+				return FALSE;
 			}
-			/* else fall through */
-
-		case SOUP_SOCKET_ERROR:
+		} else {
 			io_error (io->sock, msg, error);
 			return FALSE;
-
-		case SOUP_SOCKET_WOULD_BLOCK:
-			return FALSE;
 		}
 
 		if (got_lf) {
@@ -439,14 +472,18 @@ read_body_chunk (SoupMessage *msg)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
-	SoupSocketIOStatus status;
 	guchar *stack_buf = NULL;
 	gsize len;
 	gboolean read_to_eof = (io->read_encoding == SOUP_ENCODING_EOF);
-	gsize nread;
+	gssize nread;
 	GError *error = NULL;
 	SoupBuffer *buffer;
 
+	if (!io->istream) {
+		io_error (io->sock, msg, NULL);
+		return FALSE;
+	}
+
 	if (!io_handle_sniffing (msg, FALSE))
 		return FALSE;
 
@@ -470,11 +507,18 @@ read_body_chunk (SoupMessage *msg)
 		else
 			len = MIN (buffer->length, io->read_length);
 
-		status = soup_socket_read (io->sock,
-					   (guchar *)buffer->data, len,
-					   &nread, io->cancellable, &error);
+		if (io->non_blocking) {
+			nread = g_pollable_input_stream_read_nonblocking (
+				G_POLLABLE_INPUT_STREAM (io->istream),
+				(guchar *)buffer->data, len,
+				io->cancellable, &error);
+		} else {
+			nread = g_input_stream_read (io->istream,
+						     (guchar *)buffer->data, len,
+						     io->cancellable, &error);
+		}
 
-		if (status == SOUP_SOCKET_OK && nread) {
+		if (nread > 0) {
 			buffer->length = nread;
 			io->read_length -= nread;
 
@@ -501,24 +545,28 @@ read_body_chunk (SoupMessage *msg)
 		}
 
 		soup_buffer_free (buffer);
-		switch (status) {
-		case SOUP_SOCKET_OK:
-			break;
 
-		case SOUP_SOCKET_EOF:
+		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+			g_error_free (error);
+			io->read_source = g_pollable_input_stream_create_source (
+				G_POLLABLE_INPUT_STREAM (io->istream),
+				NULL);
+			g_source_set_callback (io->read_source, (GSourceFunc) io_readable, msg, NULL);
+			g_source_attach (io->read_source, io->async_context);
+			g_source_unref (io->read_source);
+			return FALSE;
+		}
+
+		if (nread == 0) {
 			if (io->read_eof_ok) {
 				io->read_length = 0;
 				return TRUE;
 			}
-			/* else fall through */
-
-		case SOUP_SOCKET_ERROR:
-			io_error (io->sock, msg, error);
-			return FALSE;
-
-		case SOUP_SOCKET_WOULD_BLOCK:
-			return FALSE;
+			/* else... */
 		}
+
+		io_error (io->sock, msg, error);
+		return FALSE;
 	}
 
 	return TRUE;
@@ -1062,9 +1110,9 @@ io_read (SoupSocket *sock, SoupMessage *msg)
 
 
 	case SOUP_MESSAGE_IO_STATE_FINISHING:
-		if (io->read_tag) {
-			g_signal_handler_disconnect (io->sock, io->read_tag);
-			io->read_tag = 0;
+		if (io->read_source) {
+			g_source_destroy (io->read_source);
+			io->read_source = NULL;
 		}
 		io->read_state = SOUP_MESSAGE_IO_STATE_DONE;
 
@@ -1126,7 +1174,7 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
 	io->sock = g_object_ref (sock);
 	iostream = soup_socket_get_iostream (sock);
 	if (iostream) {
-		io->istream = g_io_stream_get_input_stream (iostream);
+		io->istream = soup_input_stream_new (g_io_stream_get_input_stream (iostream));
 		io->ostream = g_io_stream_get_output_stream (iostream);
 	}
 	g_object_get (io->sock,
@@ -1137,9 +1185,6 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
 	io->read_meta_buf    = g_byte_array_new ();
 	io->write_buf        = g_string_new (NULL);
 
-	io->read_tag  = g_signal_connect (io->sock, "readable",
-					  G_CALLBACK (io_read), msg);
-
 	io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
 	io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
 
@@ -1213,9 +1258,9 @@ soup_message_io_pause (SoupMessage *msg)
 		g_source_destroy (io->write_source);
 		io->write_source = NULL;
 	}
-	if (io->read_tag) {
-		g_signal_handler_disconnect (io->sock, io->read_tag);
-		io->read_tag = 0;
+	if (io->read_source) {
+		g_source_destroy (io->read_source);
+		io->read_source = NULL;
 	}
 
 	if (io->unpause_source) {
@@ -1236,14 +1281,9 @@ io_unpause_internal (gpointer msg)
 	io->unpause_source = NULL;
 	io->paused = FALSE;
 
-	if (io->write_source || io->read_tag)
+	if (io->write_source || io->read_source)
 		return FALSE;
 
-	if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE) {
-		io->read_tag = g_signal_connect (io->sock, "readable",
-						 G_CALLBACK (io_read), msg);
-	}
-
 	if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
 		io_write (io->sock, msg);
 	else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]