[libsoup/giobased: 8/11] Make SoupInputStream do non-blocking I/O via blocking interfaces when desired



commit 73dae1cbd9c3cf0f808eb81202c4f96c010dc7dd
Author: Dan Winship <danw gnome org>
Date:   Thu Dec 23 15:32:14 2010 -0500

    Make SoupInputStream do non-blocking I/O via blocking interfaces when desired

 libsoup/soup-input-stream.c |   98 +++++++++++++++++++++++++++----------------
 libsoup/soup-input-stream.h |   11 +----
 libsoup/soup-message-io.c   |   21 ++++-----
 3 files changed, 74 insertions(+), 56 deletions(-)
---
diff --git a/libsoup/soup-input-stream.c b/libsoup/soup-input-stream.c
index dab9b9a..0de12e0 100644
--- a/libsoup/soup-input-stream.c
+++ b/libsoup/soup-input-stream.c
@@ -26,6 +26,7 @@ typedef enum {
 
 struct _SoupInputStreamPrivate {
 	GInputStream *base_stream;
+	gboolean      blocking;
 	GByteArray   *buf;
 
 	SoupEncoding  encoding;
@@ -33,6 +34,12 @@ struct _SoupInputStreamPrivate {
 	SoupInputStreamState chunked_state;
 };
 
+enum {
+	PROP_0,
+
+	PROP_BLOCKING
+};
+
 static void soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
 
 G_DEFINE_TYPE_WITH_CODE (SoupInputStream, soup_input_stream, G_TYPE_FILTER_INPUT_STREAM,
@@ -57,6 +64,38 @@ constructed (GObject *object)
 }
 
 static void
+set_property (GObject *object, guint prop_id,
+	      const GValue *value, GParamSpec *pspec)
+{
+	SoupInputStream *sstream = SOUP_INPUT_STREAM (object);
+
+	switch (prop_id) {
+	case PROP_BLOCKING:
+		sstream->priv->blocking = g_value_get_boolean (value);
+		break;
+	default:
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+		break;
+	}
+}
+
+static void
+get_property (GObject *object, guint prop_id,
+	      GValue *value, GParamSpec *pspec)
+{
+	SoupInputStream *sstream = SOUP_INPUT_STREAM (object);
+
+	switch (prop_id) {
+	case PROP_BLOCKING:
+		g_value_set_boolean (value, sstream->priv->blocking);
+		break;
+	default:
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+		break;
+	}
+}
+
+static void
 finalize (GObject *object)
 {
 	SoupInputStream *sstream = SOUP_INPUT_STREAM (object);
@@ -92,7 +131,6 @@ static gssize
 soup_input_stream_read_raw (SoupInputStream  *sstream,
 			    void             *buffer,
 			    gsize             count,
-			    gboolean          blocking,
 			    GCancellable     *cancellable,
 			    GError          **error)
 {
@@ -100,7 +138,7 @@ soup_input_stream_read_raw (SoupInputStream  *sstream,
 
 	if (sstream->priv->buf) {
 		return read_from_buf (sstream, buffer, count);
-	} else if (blocking) {
+	} else if (sstream->priv->blocking) {
 		nread = g_input_stream_read (sstream->priv->base_stream,
 					     buffer, count,
 					     cancellable, error);
@@ -121,7 +159,6 @@ static gssize
 soup_input_stream_read_chunked (SoupInputStream  *sstream,
 				void             *buffer,
 				gsize             count,
-				gboolean          blocking,
 				GCancellable     *cancellable,
 				GError          **error)
 {
@@ -133,7 +170,6 @@ again:
 	case SOUP_INPUT_STREAM_STATE_CHUNK_SIZE:
 		nread = soup_input_stream_read_line (sstream,
 						     metabuf, sizeof (metabuf),
-						     blocking,
 						     cancellable, error);
 		if (nread <= 0)
 			return nread;
@@ -150,7 +186,7 @@ again:
 	case SOUP_INPUT_STREAM_STATE_CHUNK:
 		nread = soup_input_stream_read_raw (sstream, buffer,
 						    MIN (count, sstream->priv->read_length),
-						    blocking, cancellable, error);
+						    cancellable, error);
 		if (nread > 0) {
 			sstream->priv->read_length -= nread;
 			if (sstream->priv->read_length == 0)
@@ -161,7 +197,6 @@ again:
 	case SOUP_INPUT_STREAM_STATE_CHUNK_END:
 		nread = soup_input_stream_read_line (sstream,
 						     metabuf, sizeof (metabuf),
-						     blocking,
 						     cancellable, error);
 		if (nread <= 0)
 			return nread;
@@ -173,7 +208,7 @@ again:
 
 	case SOUP_INPUT_STREAM_STATE_TRAILERS:
 		nread = soup_input_stream_read_line (sstream, buffer, count,
-						     blocking, cancellable, error);
+						     cancellable, error);
 		if (nread <= 0)
 			return nread;
 
@@ -201,22 +236,21 @@ soup_input_stream_read_fn (GInputStream  *stream,
 	switch (sstream->priv->encoding) {
 	case SOUP_ENCODING_CHUNKED:
 		return soup_input_stream_read_chunked (sstream, buffer, count,
-						       TRUE, cancellable,
-						       error);
+						       cancellable, error);
 
 	case SOUP_ENCODING_CONTENT_LENGTH:
 		count = MIN (count, sstream->priv->read_length);
 		if (count == 0)
 			return 0;
 		nread = soup_input_stream_read_raw (sstream, buffer, count,
-						    TRUE, cancellable, error);
+						    cancellable, error);
 		if (nread > 0)
 			sstream->priv->read_length -= nread;
 		return nread;
 
 	case SOUP_ENCODING_EOF:
 		return soup_input_stream_read_raw (sstream, buffer, count,
-						   TRUE, cancellable, error);
+						   cancellable, error);
 
 	case SOUP_ENCODING_NONE:
 	default:
@@ -267,21 +301,21 @@ soup_input_stream_read_nonblocking (GPollableInputStream  *stream,
 	switch (sstream->priv->encoding) {
 	case SOUP_ENCODING_CHUNKED:
 		return soup_input_stream_read_chunked (sstream, buffer, count,
-						       FALSE, NULL, error);
+						       NULL, error);
 
 	case SOUP_ENCODING_CONTENT_LENGTH:
 		count = MIN (count, sstream->priv->read_length);
 		if (count == 0)
 			return 0;
 		nread = soup_input_stream_read_raw (sstream, buffer, count,
-						    FALSE, NULL, error);
+						    NULL, error);
 		if (nread > 0)
 			sstream->priv->read_length -= nread;
 		return nread;
 
 	case SOUP_ENCODING_EOF:
 		return soup_input_stream_read_raw (sstream, buffer, count,
-						   FALSE, NULL, error);
+						   NULL, error);
 
 	case SOUP_ENCODING_NONE:
 	default:
@@ -298,9 +332,19 @@ soup_input_stream_class_init (SoupInputStreamClass *stream_class)
 	g_type_class_add_private (stream_class, sizeof (SoupInputStreamPrivate));
 
 	object_class->constructed = constructed;
+	object_class->set_property = set_property;
+	object_class->get_property = get_property;
 	object_class->finalize = finalize;
 
 	input_stream_class->read_fn = soup_input_stream_read_fn;
+
+	g_object_class_install_property (
+		object_class, PROP_BLOCKING,
+		g_param_spec_boolean ("blocking",
+				      "Blocking",
+				      "Whether the stream uses blocking I/O",
+				      TRUE,
+				      G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
 }
 
 static void
@@ -313,11 +357,13 @@ soup_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interfa
 }
 
 SoupInputStream *
-soup_input_stream_new (GInputStream *base_stream)
+soup_input_stream_new (GInputStream *base_stream,
+		       gboolean      blocking)
 {
 	return g_object_new (SOUP_TYPE_INPUT_STREAM,
 			     "base-stream", base_stream,
 			     "close-base-stream", FALSE,
+			     "blocking", blocking,
 			     NULL);
 }
 
@@ -333,29 +379,9 @@ soup_input_stream_set_encoding (SoupInputStream *sstream,
 }
 
 gssize
-soup_input_stream_read (SoupInputStream       *sstream,
-			void                  *buffer,
-			gsize                  count,
-			gboolean               blocking,
-			GCancellable          *cancellable,
-			GError               **error)
-{
-	if (blocking) {
-		return g_input_stream_read (G_INPUT_STREAM (sstream),
-					    buffer, count,
-					    cancellable, error);
-	} else {
-		return g_pollable_input_stream_read_nonblocking (
-			G_POLLABLE_INPUT_STREAM (sstream),
-			buffer, count, cancellable, error);
-	}
-}
-
-gssize
 soup_input_stream_read_line (SoupInputStream       *sstream,
 			     void                  *buffer,
 			     gsize                  length,
-			     gboolean               blocking,
 			     GCancellable          *cancellable,
 			     GError               **error)
 {
@@ -372,7 +398,7 @@ soup_input_stream_read_line (SoupInputStream       *sstream,
 		return read_from_buf (sstream, buffer, nread);
 	}
 
-	if (blocking) {
+	if (sstream->priv->blocking) {
 		nread = g_input_stream_read (G_INPUT_STREAM (sstream->priv->base_stream),
 					     buffer, length,
 					     cancellable, error);
diff --git a/libsoup/soup-input-stream.h b/libsoup/soup-input-stream.h
index b70d876..2643d5e 100644
--- a/libsoup/soup-input-stream.h
+++ b/libsoup/soup-input-stream.h
@@ -38,23 +38,16 @@ typedef struct {
 
 GType soup_input_stream_get_type (void);
 
-SoupInputStream *soup_input_stream_new          (GInputStream     *base_stream);
+SoupInputStream *soup_input_stream_new          (GInputStream     *base_stream,
+						 gboolean          blocking);
 
 void             soup_input_stream_set_encoding (SoupInputStream  *sstream,
 						 SoupEncoding      encoding,
 						 goffset           content_length);
 
-gssize           soup_input_stream_read         (SoupInputStream  *sstream,
-						 void             *buffer,
-						 gsize             count,
-						 gboolean          blocking,
-						 GCancellable     *cancellable,
-						 GError          **error);
-
 gssize           soup_input_stream_read_line    (SoupInputStream  *sstream,
 						 void             *buffer,
 						 gsize             length,
-						 gboolean          blocking,
 						 GCancellable     *cancellable,
 						 GError          **error);
 
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 444ec2e..bca9052 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -326,7 +326,6 @@ read_metadata (SoupMessage *msg, gboolean to_blank)
 	while (1) {
 		nread = soup_input_stream_read_line (io->istream, read_buf,
 						     sizeof (read_buf),
-						     io->blocking,
 						     io->cancellable, &error);
 
 		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
@@ -487,11 +486,10 @@ read_body_chunk (SoupMessage *msg)
 						  RESPONSE_BLOCK_SIZE);
 		}
 
-		nread = soup_input_stream_read (io->istream,
-						(guchar *)buffer->data,
-						buffer->length,
-						io->blocking,
-						io->cancellable, &error);
+		nread = g_input_stream_read (G_INPUT_STREAM (io->istream),
+					     (guchar *)buffer->data,
+					     buffer->length,
+					     io->cancellable, &error);
 		if (nread > 0) {
 			buffer->length = nread;
 			io->read_length -= nread;
@@ -1021,17 +1019,18 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
 	io->completion_data  = completion_data;
 
 	io->sock = g_object_ref (sock);
-	iostream = soup_socket_get_iostream (sock);
-	if (iostream) {
-		io->istream = soup_input_stream_new (g_io_stream_get_input_stream (iostream));
-		io->ostream = soup_output_stream_new (g_io_stream_get_output_stream (iostream));
-	}
 	g_object_get (io->sock,
 		      SOUP_SOCKET_FLAG_NONBLOCKING, &non_blocking,
 		      SOUP_SOCKET_ASYNC_CONTEXT, &io->async_context,
 		      NULL);
 	io->blocking = !non_blocking;
 
+	iostream = soup_socket_get_iostream (sock);
+	if (iostream) {
+		io->istream = soup_input_stream_new (g_io_stream_get_input_stream (iostream), io->blocking);
+		io->ostream = soup_output_stream_new (g_io_stream_get_output_stream (iostream));
+	}
+
 	io->read_meta_buf    = g_byte_array_new ();
 	io->write_buf        = g_string_new (NULL);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]