[glib] gio: implement GPollableInput/OutputStream in more stream types



commit 82ec4dcaed8107d436f76c45ec30645715b6dbef
Author: Dan Winship <danw gnome org>
Date:   Mon Feb 6 15:08:08 2012 -0500

    gio: implement GPollableInput/OutputStream in more stream types
    
    Implement GPollableInputStream in GMemoryInputStream and
    GConverterInputStream, and likewise implement GPollableOutputStream in
    the corresponding output streams.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=673997

 gio/gconverterinputstream.c  |  130 ++++++++++++++++++++++---
 gio/gconverteroutputstream.c |  119 ++++++++++++++++++++----
 gio/gmemoryinputstream.c     |   41 ++++++++-
 gio/gmemoryoutputstream.c    |   40 ++++++++-
 gio/tests/converter-stream.c |  214 ++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 508 insertions(+), 36 deletions(-)
---
diff --git a/gio/gconverterinputstream.c b/gio/gconverterinputstream.c
index 2fbf94d..1acf9a9 100644
--- a/gio/gconverterinputstream.c
+++ b/gio/gconverterinputstream.c
@@ -25,6 +25,7 @@
 #include <string.h>
 
 #include "gconverterinputstream.h"
+#include "gpollableinputstream.h"
 #include "gsimpleasyncresult.h"
 #include "gcancellable.h"
 #include "gioenumtypes.h"
@@ -41,6 +42,8 @@
  * Converter input stream implements #GInputStream and allows
  * conversion of data of various types during reading.
  *
+ * As of GLib 2.34, #GConverterInputStream implements
+ * #GPollableInputStream.
  **/
 
 #define INITIAL_BUFFER_SIZE 4096
@@ -55,6 +58,7 @@ typedef struct {
 struct _GConverterInputStreamPrivate {
   gboolean at_input_end;
   gboolean finished;
+  gboolean need_input;
   GConverter *converter;
   Buffer input_buffer;
   Buffer converted_buffer;
@@ -80,9 +84,24 @@ static gssize g_converter_input_stream_read         (GInputStream  *stream,
 						     GCancellable  *cancellable,
 						     GError       **error);
 
-G_DEFINE_TYPE (GConverterInputStream,
-	       g_converter_input_stream,
-	       G_TYPE_FILTER_INPUT_STREAM)
+static gboolean g_converter_input_stream_can_poll         (GPollableInputStream *stream);
+static gboolean g_converter_input_stream_is_readable      (GPollableInputStream *stream);
+static gssize   g_converter_input_stream_read_nonblocking (GPollableInputStream  *stream,
+							   void                  *buffer,
+							   gsize                  size,
+							   GError               **error);
+
+static GSource *g_converter_input_stream_create_source    (GPollableInputStream *stream,
+							   GCancellable          *cancellable);
+
+static void g_converter_input_stream_pollable_iface_init  (GPollableInputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (GConverterInputStream,
+			 g_converter_input_stream,
+			 G_TYPE_FILTER_INPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+						g_converter_input_stream_pollable_iface_init);
+			 )
 
 static void
 g_converter_input_stream_class_init (GConverterInputStreamClass *klass)
@@ -113,6 +132,15 @@ g_converter_input_stream_class_init (GConverterInputStreamClass *klass)
 }
 
 static void
+g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+  iface->can_poll = g_converter_input_stream_can_poll;
+  iface->is_readable = g_converter_input_stream_is_readable;
+  iface->read_nonblocking = g_converter_input_stream_read_nonblocking;
+  iface->create_source = g_converter_input_stream_create_source;
+}
+
+static void
 g_converter_input_stream_finalize (GObject *object)
 {
   GConverterInputStreamPrivate *priv;
@@ -320,6 +348,7 @@ buffer_ensure_space (Buffer *buffer,
 static gssize
 fill_input_buffer (GConverterInputStream  *stream,
 		   gsize                   at_least_size,
+		   gboolean                blocking,
 		   GCancellable           *cancellable,
 		   GError                **error)
 {
@@ -332,25 +361,30 @@ fill_input_buffer (GConverterInputStream  *stream,
   buffer_ensure_space (&priv->input_buffer, at_least_size);
 
   base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
-  nread = g_input_stream_read (base_stream,
-			       priv->input_buffer.data + priv->input_buffer.end,
-			       buffer_tailspace (&priv->input_buffer),
-			       cancellable,
-			       error);
+  nread = g_pollable_stream_read (base_stream,
+				  priv->input_buffer.data + priv->input_buffer.end,
+				  buffer_tailspace (&priv->input_buffer),
+				  blocking,
+				  cancellable,
+				  error);
 
   if (nread > 0)
-    priv->input_buffer.end += nread;
+    {
+      priv->input_buffer.end += nread;
+      priv->need_input = FALSE;
+    }
 
   return nread;
 }
 
 
 static gssize
-g_converter_input_stream_read (GInputStream *stream,
-			       void         *buffer,
-			       gsize         count,
-			       GCancellable *cancellable,
-			       GError      **error)
+read_internal (GInputStream *stream,
+	       void         *buffer,
+	       gsize         count,
+	       gboolean      blocking,
+	       GCancellable *cancellable,
+	       GError      **error)
 {
   GConverterInputStream *cstream;
   GConverterInputStreamPrivate *priv;
@@ -389,7 +423,7 @@ g_converter_input_stream_read (GInputStream *stream,
       total_bytes_read == 0 &&
       !priv->at_input_end)
     {
-      nread = fill_input_buffer (cstream, count, cancellable, error);
+      nread = fill_input_buffer (cstream, count, blocking, cancellable, error);
       if (nread < 0)
 	return -1;
       if (nread == 0)
@@ -497,6 +531,7 @@ g_converter_input_stream_read (GInputStream *stream,
 	  my_error2 = NULL;
 	  nread = fill_input_buffer (cstream,
 				     buffer_data_size (&priv->input_buffer) + 4096,
+				     blocking,
 				     cancellable,
 				     &my_error2);
 	  if (nread < 0)
@@ -504,6 +539,7 @@ g_converter_input_stream_read (GInputStream *stream,
 	      /* Can't read any more data, return that error */
 	      g_error_free (my_error);
 	      g_propagate_error (error, my_error2);
+	      priv->need_input = TRUE;
 	      return -1;
 	    }
 	  else if (nread == 0)
@@ -536,6 +572,70 @@ g_converter_input_stream_read (GInputStream *stream,
   g_assert_not_reached ();
 }
 
+static gssize
+g_converter_input_stream_read (GInputStream *stream,
+			       void         *buffer,
+			       gsize         count,
+			       GCancellable *cancellable,
+			       GError      **error)
+{
+  return read_internal (stream, buffer, count, TRUE, cancellable, error);
+}
+
+static gboolean
+g_converter_input_stream_can_poll (GPollableInputStream *stream)
+{
+  GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
+
+  return (G_IS_POLLABLE_INPUT_STREAM (base_stream) &&
+	  g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (base_stream)));
+}
+
+static gboolean
+g_converter_input_stream_is_readable (GPollableInputStream *stream)
+{
+  GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
+  GConverterInputStream *cstream = G_CONVERTER_INPUT_STREAM (stream);
+
+  if (buffer_data_size (&cstream->priv->converted_buffer))
+    return TRUE;
+  else if (buffer_data_size (&cstream->priv->input_buffer) &&
+	   !cstream->priv->need_input)
+    return TRUE;
+  else
+    return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (base_stream));
+}
+
+static gssize
+g_converter_input_stream_read_nonblocking (GPollableInputStream  *stream,
+					   void                  *buffer,
+					   gsize                  count,
+					   GError               **error)
+{
+  return read_internal (G_INPUT_STREAM (stream), buffer, count,
+			FALSE, NULL, error);
+}
+
+static GSource *
+g_converter_input_stream_create_source (GPollableInputStream *stream,
+					GCancellable         *cancellable)
+{
+  GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
+  GSource *base_source, *pollable_source;
+
+  if (g_pollable_input_stream_is_readable (stream))
+    base_source = g_timeout_source_new (0);
+  else
+    base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (base_stream), NULL);
+
+  pollable_source = g_pollable_source_new_full (stream, base_source,
+						cancellable);
+  g_source_unref (base_source);
+
+  return pollable_source;
+}
+
+
 /**
  * g_converter_input_stream_get_converter:
  * @converter_stream: a #GConverterInputStream
diff --git a/gio/gconverteroutputstream.c b/gio/gconverteroutputstream.c
index 5b1cbec..9199010 100644
--- a/gio/gconverteroutputstream.c
+++ b/gio/gconverteroutputstream.c
@@ -25,6 +25,7 @@
 #include <string.h>
 
 #include "gconverteroutputstream.h"
+#include "gpollableoutputstream.h"
 #include "gsimpleasyncresult.h"
 #include "gcancellable.h"
 #include "gioenumtypes.h"
@@ -41,6 +42,8 @@
  * Converter output stream implements #GOutputStream and allows
  * conversion of data of various types during reading.
  *
+ * As of GLib 2.34, #GConverterOutputStream implements
+ * #GPollableOutputStream.
  **/
 
 #define INITIAL_BUFFER_SIZE 4096
@@ -96,9 +99,24 @@ static gboolean g_converter_output_stream_flush      (GOutputStream  *stream,
 						      GCancellable   *cancellable,
 						      GError        **error);
 
-G_DEFINE_TYPE (GConverterOutputStream,
-	       g_converter_output_stream,
-	       G_TYPE_FILTER_OUTPUT_STREAM)
+static gboolean g_converter_output_stream_can_poll          (GPollableOutputStream *stream);
+static gboolean g_converter_output_stream_is_writable       (GPollableOutputStream *stream);
+static gssize   g_converter_output_stream_write_nonblocking (GPollableOutputStream  *stream,
+							     const void             *buffer,
+							     gsize                  size,
+							     GError               **error);
+
+static GSource *g_converter_output_stream_create_source     (GPollableOutputStream *stream,
+							     GCancellable          *cancellable);
+
+static void g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (GConverterOutputStream,
+			 g_converter_output_stream,
+			 G_TYPE_FILTER_OUTPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+						g_converter_output_stream_pollable_iface_init);
+			 )
 
 static void
 g_converter_output_stream_class_init (GConverterOutputStreamClass *klass)
@@ -130,6 +148,15 @@ g_converter_output_stream_class_init (GConverterOutputStreamClass *klass)
 }
 
 static void
+g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+  iface->can_poll = g_converter_output_stream_can_poll;
+  iface->is_writable = g_converter_output_stream_is_writable;
+  iface->write_nonblocking = g_converter_output_stream_write_nonblocking;
+  iface->create_source = g_converter_output_stream_create_source;
+}
+
+static void
 g_converter_output_stream_finalize (GObject *object)
 {
   GConverterOutputStreamPrivate *priv;
@@ -339,7 +366,7 @@ buffer_append (Buffer *buffer,
 
 static gboolean
 flush_buffer (GConverterOutputStream *stream,
-	      Buffer                 *buffer,
+	      gboolean                blocking,
 	      GCancellable           *cancellable,
 	      GError                **error)
 {
@@ -356,12 +383,13 @@ flush_buffer (GConverterOutputStream *stream,
   available = buffer_data_size (&priv->converted_buffer);
   if (available > 0)
     {
-      res = g_output_stream_write_all (base_stream,
-				       buffer_data (&priv->converted_buffer),
-				       available,
-				       &nwritten,
-				       cancellable,
-				       error);
+      res = g_pollable_stream_write_all (base_stream,
+					 buffer_data (&priv->converted_buffer),
+					 available,
+					 blocking,
+					 &nwritten,
+					 cancellable,
+					 error);
       buffer_consumed (&priv->converted_buffer, nwritten);
       return res;
     }
@@ -370,11 +398,12 @@ flush_buffer (GConverterOutputStream *stream,
 
 
 static gssize
-g_converter_output_stream_write (GOutputStream *stream,
-				 const void   *buffer,
-				 gsize         count,
-				 GCancellable *cancellable,
-				 GError      **error)
+write_internal (GOutputStream  *stream,
+		const void     *buffer,
+		gsize           count,
+		gboolean        blocking,
+		GCancellable   *cancellable,
+		GError        **error)
 {
   GConverterOutputStream *cstream;
   GConverterOutputStreamPrivate *priv;
@@ -392,7 +421,7 @@ g_converter_output_stream_write (GOutputStream *stream,
 
   /* Write out all available pre-converted data and fail if
      not possible */
-  if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error))
+  if (!flush_buffer (cstream, blocking, cancellable, error))
     return -1;
 
   if (priv->finished)
@@ -499,11 +528,21 @@ g_converter_output_stream_write (GOutputStream *stream,
      even if writing this to the base stream fails. If it does we'll just
      stop early and report this error when we try again on the next
      write call. */
-  flush_buffer (cstream, &priv->converted_buffer, cancellable, NULL);
+  flush_buffer (cstream, blocking, cancellable, NULL);
 
   return retval;
 }
 
+static gssize
+g_converter_output_stream_write (GOutputStream  *stream,
+				 const void     *buffer,
+				 gsize           count,
+				 GCancellable   *cancellable,
+				 GError        **error)
+{
+  return write_internal (stream, buffer, count, TRUE, cancellable, error);
+}
+
 static gboolean
 g_converter_output_stream_flush (GOutputStream  *stream,
 				 GCancellable   *cancellable,
@@ -525,7 +564,7 @@ g_converter_output_stream_flush (GOutputStream  *stream,
 
   /* Write out all available pre-converted data and fail if
      not possible */
-  if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error))
+  if (!flush_buffer (cstream, TRUE, cancellable, error))
     return FALSE;
 
   /* Ensure we have *some* initial target space */
@@ -590,12 +629,54 @@ g_converter_output_stream_flush (GOutputStream  *stream,
     }
 
   /* Now write all converted data to base stream */
-  if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error))
+  if (!flush_buffer (cstream, TRUE, cancellable, error))
     return FALSE;
 
   return TRUE;
 }
 
+static gboolean
+g_converter_output_stream_can_poll (GPollableOutputStream *stream)
+{
+  GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
+
+  return (G_IS_POLLABLE_OUTPUT_STREAM (base_stream) &&
+	  g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (base_stream)));
+}
+
+static gboolean
+g_converter_output_stream_is_writable (GPollableOutputStream *stream)
+{
+  GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
+
+  return g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (base_stream));
+}
+
+static gssize
+g_converter_output_stream_write_nonblocking (GPollableOutputStream  *stream,
+					     const void             *buffer,
+					     gsize                   count,
+					     GError                **error)
+{
+  return write_internal (G_OUTPUT_STREAM (stream), buffer, count, FALSE,
+			 NULL, error);
+}
+
+static GSource *
+g_converter_output_stream_create_source (GPollableOutputStream *stream,
+					 GCancellable          *cancellable)
+{
+  GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
+  GSource *base_source, *pollable_source;
+
+  base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (base_stream), NULL);
+  pollable_source = g_pollable_source_new_full (stream, base_source,
+						cancellable);
+  g_source_unref (base_source);
+
+  return pollable_source;
+}
+
 /**
  * g_converter_output_stream_get_converter:
  * @converter_stream: a #GConverterOutputStream
diff --git a/gio/gmemoryinputstream.c b/gio/gmemoryinputstream.c
index 1fed058..e657d5b 100644
--- a/gio/gmemoryinputstream.c
+++ b/gio/gmemoryinputstream.c
@@ -22,6 +22,7 @@
 
 #include "config.h"
 #include "gmemoryinputstream.h"
+#include "gpollableinputstream.h"
 #include "ginputstream.h"
 #include "gseekable.h"
 #include "string.h"
@@ -39,6 +40,8 @@
  * #GMemoryInputStream is a class for using arbitrary
  * memory chunks as input for GIO streaming input operations.
  *
+ * As of GLib 2.34, #GMemoryInputStream implements
+ * #GPollableInputStream.
  */
 
 typedef struct _Chunk Chunk;
@@ -108,11 +111,20 @@ static gboolean g_memory_input_stream_truncate            (GSeekable       *seek
                                                            goffset          offset,
                                                            GCancellable    *cancellable,
                                                            GError         **error);
+
+static void     g_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+static gboolean g_memory_input_stream_is_readable         (GPollableInputStream *stream);
+static GSource *g_memory_input_stream_create_source       (GPollableInputStream *stream,
+							   GCancellable          *cancellable);
+
 static void     g_memory_input_stream_finalize            (GObject         *object);
 
 G_DEFINE_TYPE_WITH_CODE (GMemoryInputStream, g_memory_input_stream, G_TYPE_INPUT_STREAM,
                          G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
-                                                g_memory_input_stream_seekable_iface_init))
+                                                g_memory_input_stream_seekable_iface_init);
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+                                                g_memory_input_stream_pollable_iface_init);
+			 )
 
 
 static void
@@ -175,6 +187,13 @@ g_memory_input_stream_seekable_iface_init (GSeekableIface *iface)
 }
 
 static void
+g_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+  iface->is_readable   = g_memory_input_stream_is_readable;
+  iface->create_source = g_memory_input_stream_create_source;
+}
+
+static void
 g_memory_input_stream_init (GMemoryInputStream *stream)
 {
   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
@@ -526,3 +545,23 @@ g_memory_input_stream_truncate (GSeekable     *seekable,
                        _("Cannot truncate GMemoryInputStream"));
   return FALSE;
 }
+
+static gboolean
+g_memory_input_stream_is_readable (GPollableInputStream *stream)
+{
+  return TRUE;
+}
+
+static GSource *
+g_memory_input_stream_create_source (GPollableInputStream *stream,
+				     GCancellable         *cancellable)
+{
+  GSource *base_source, *pollable_source;
+
+  base_source = g_timeout_source_new (0);
+  pollable_source = g_pollable_source_new_full (stream, base_source,
+						cancellable);
+  g_source_unref (base_source);
+
+  return pollable_source;
+}
diff --git a/gio/gmemoryoutputstream.c b/gio/gmemoryoutputstream.c
index 08b4fba..b1da60d 100644
--- a/gio/gmemoryoutputstream.c
+++ b/gio/gmemoryoutputstream.c
@@ -25,6 +25,7 @@
 #include "config.h"
 #include "gmemoryoutputstream.h"
 #include "goutputstream.h"
+#include "gpollableoutputstream.h"
 #include "gseekable.h"
 #include "gsimpleasyncresult.h"
 #include "gioerror.h"
@@ -41,6 +42,8 @@
  * #GMemoryOutputStream is a class for using arbitrary
  * memory chunks as output for GIO streaming output operations.
  *
+ * As of GLib 2.34, #GMemoryOutputStream implements
+ * #GPollableOutputStream.
  */
 
 #define MIN_ARRAY_SIZE  16
@@ -119,9 +122,17 @@ static gboolean g_memory_output_stream_truncate            (GSeekable       *see
                                                            GCancellable    *cancellable,
                                                            GError         **error);
 
+static gboolean g_memory_output_stream_is_writable       (GPollableOutputStream *stream);
+static GSource *g_memory_output_stream_create_source     (GPollableOutputStream *stream,
+							  GCancellable          *cancellable);
+
+static void g_memory_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
 G_DEFINE_TYPE_WITH_CODE (GMemoryOutputStream, g_memory_output_stream, G_TYPE_OUTPUT_STREAM,
                          G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
-                                                g_memory_output_stream_seekable_iface_init))
+                                                g_memory_output_stream_seekable_iface_init);
+                         G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+                                                g_memory_output_stream_pollable_iface_init))
 
 
 static void
@@ -225,6 +236,13 @@ g_memory_output_stream_class_init (GMemoryOutputStreamClass *klass)
 }
 
 static void
+g_memory_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+  iface->is_writable = g_memory_output_stream_is_writable;
+  iface->create_source = g_memory_output_stream_create_source;
+}
+
+static void
 g_memory_output_stream_set_property (GObject      *object,
                                      guint         prop_id,
                                      const GValue *value,
@@ -800,3 +818,23 @@ g_memory_output_stream_truncate (GSeekable     *seekable,
 
   return TRUE;
 }
+
+static gboolean
+g_memory_output_stream_is_writable (GPollableOutputStream *stream)
+{
+  return TRUE;
+}
+
+static GSource *
+g_memory_output_stream_create_source (GPollableOutputStream *stream,
+				      GCancellable          *cancellable)
+{
+  GSource *base_source, *pollable_source;
+
+  base_source = g_timeout_source_new (0);
+  pollable_source = g_pollable_source_new_full (stream, base_source,
+						cancellable);
+  g_source_unref (base_source);
+
+  return pollable_source;
+}
diff --git a/gio/tests/converter-stream.c b/gio/tests/converter-stream.c
index 8017015..ae1bdfb 100644
--- a/gio/tests/converter-stream.c
+++ b/gio/tests/converter-stream.c
@@ -724,6 +724,219 @@ test_charset (gconstpointer data)
   g_object_unref (conv);
 }
 
+
+static void
+client_connected (GObject      *source,
+		  GAsyncResult *result,
+		  gpointer      user_data)
+{
+  GSocketClient *client = G_SOCKET_CLIENT (source);
+  GSocketConnection **conn = user_data;
+  GError *error = NULL;
+
+  *conn = g_socket_client_connect_finish (client, result, &error);
+  g_assert_no_error (error);
+}
+
+static void
+server_connected (GObject      *source,
+		  GAsyncResult *result,
+		  gpointer      user_data)
+{
+  GSocketListener *listener = G_SOCKET_LISTENER (source);
+  GSocketConnection **conn = user_data;
+  GError *error = NULL;
+
+  *conn = g_socket_listener_accept_finish (listener, result, NULL, &error);
+  g_assert_no_error (error);
+}
+
+static void
+make_socketpair (GIOStream **left,
+		 GIOStream **right)
+{
+  GInetAddress *iaddr;
+  GSocketAddress *saddr, *effective_address;
+  GSocketListener *listener;
+  GSocketClient *client;
+  GError *error = NULL;
+  GSocketConnection *client_conn = NULL, *server_conn = NULL;
+
+  iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4);
+  saddr = g_inet_socket_address_new (iaddr, 0);
+  g_object_unref (iaddr);
+
+  listener = g_socket_listener_new ();
+  g_socket_listener_add_address (listener, saddr,
+				 G_SOCKET_TYPE_STREAM,
+				 G_SOCKET_PROTOCOL_TCP,
+				 NULL,
+				 &effective_address,
+				 &error);
+  g_assert_no_error (error);
+  g_object_unref (saddr);
+
+  client = g_socket_client_new ();
+
+  g_socket_client_connect_async (client,
+				 G_SOCKET_CONNECTABLE (effective_address),
+				 NULL, client_connected, &client_conn);
+  g_socket_listener_accept_async (listener, NULL,
+				  server_connected, &server_conn);
+
+  while (!client_conn || !server_conn)
+    g_main_context_iteration (NULL, TRUE);
+
+  g_object_unref (client);
+  g_object_unref (listener);
+  g_object_unref (effective_address);
+
+  *left = G_IO_STREAM (client_conn);
+  *right = G_IO_STREAM (server_conn);
+}
+
+static void
+test_converter_pollable (void)
+{
+  GIOStream *left, *right;
+  guint8 *converted, *inptr;
+  guint8 *expanded, *outptr, *expanded_end;
+  gsize n_read, expanded_size;
+  gsize total_read;
+  gssize res;
+  gboolean is_readable;
+  GConverterResult cres;
+  GInputStream *cstream;
+  GPollableInputStream *pollable_in;
+  GOutputStream *socket_out, *mem_out, *cstream_out;
+  GPollableOutputStream *pollable_out;
+  GConverter *expander, *compressor;
+  GError *error;
+  int i;
+
+  expander = g_expander_converter_new ();
+  expanded = g_malloc (100*1000); /* Large enough */
+  cres = g_converter_convert (expander,
+			      unexpanded_data, sizeof(unexpanded_data),
+			      expanded, 100*1000,
+			      G_CONVERTER_INPUT_AT_END,
+			      &n_read, &expanded_size, NULL);
+  g_assert (cres == G_CONVERTER_FINISHED);
+  g_assert (n_read == 11);
+  g_assert (expanded_size == 41030);
+  expanded_end = expanded + expanded_size;
+
+  make_socketpair (&left, &right);
+
+  compressor = g_compressor_converter_new ();
+
+  converted = g_malloc (100*1000); /* Large enough */
+
+  cstream = g_converter_input_stream_new (g_io_stream_get_input_stream (left),
+					  compressor);
+  pollable_in = G_POLLABLE_INPUT_STREAM (cstream);
+  g_assert (g_pollable_input_stream_can_poll (pollable_in));
+
+  socket_out = g_io_stream_get_output_stream (right);
+
+  total_read = 0;
+  outptr = expanded;
+  inptr = converted;
+  while (TRUE)
+    {
+      error = NULL;
+
+      if (outptr < expanded_end)
+	{
+	  res = g_output_stream_write (socket_out,
+				       outptr,
+				       MIN (1000, (expanded_end - outptr)),
+				       NULL, &error);
+	  g_assert_cmpint (res, >, 0);
+	  outptr += res;
+	}
+      else if (socket_out)
+	{
+	  g_object_unref (right);
+	  socket_out = NULL;
+	}
+
+      is_readable = g_pollable_input_stream_is_readable (pollable_in);
+      res = g_pollable_input_stream_read_nonblocking (pollable_in,
+						      inptr, 1,
+						      NULL, &error);
+
+      /* is_readable can be a false positive, but not a false negative */
+      if (!is_readable)
+	g_assert_cmpint (res, ==, -1);
+
+      /* After closing the write end, we can't get WOULD_BLOCK any more */
+      if (!socket_out)
+	g_assert_cmpint (res, !=, -1);
+
+      if (res == -1)
+	{
+	  g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+	  g_error_free (error);
+
+	  continue;
+	}
+
+      if (res == 0)
+	break;
+      inptr += res;
+      total_read += res;
+    }
+
+  g_assert (total_read == n_read - 1); /* Last 2 zeros are combined */
+  g_assert (memcmp (converted, unexpanded_data, total_read)  == 0);
+
+  g_object_unref (cstream);
+  g_object_unref (left);
+
+  g_converter_reset (compressor);
+
+  /* The output-stream half of this test doesn't actually exercise
+   * G_IO_ERROR_WOULD_BLOCK; to do that we'd need to implement a
+   * custom GOutputStream whose blocking we could control.
+   */
+
+  mem_out = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+  cstream_out = g_converter_output_stream_new (mem_out, compressor);
+  g_object_unref (mem_out);
+  pollable_out = G_POLLABLE_OUTPUT_STREAM (cstream_out);
+
+  for (i = 0; i < expanded_size; i++)
+    {
+      error = NULL;
+      res = g_pollable_output_stream_write_nonblocking (pollable_out,
+							expanded + i, 1,
+							NULL, &error);
+      g_assert (res != -1);
+      if (res == 0)
+	{
+	  g_assert (i == expanded_size -1);
+	  break;
+	}
+      g_assert (res == 1);
+    }
+
+  g_output_stream_close (cstream_out, NULL, NULL);
+
+  g_assert (g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (mem_out)) == n_read - 1); /* Last 2 zeros are combined */
+  g_assert (memcmp (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (mem_out)),
+		    unexpanded_data,
+		    g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (mem_out)))  == 0);
+
+  g_object_unref (cstream_out);
+
+  g_free (expanded);
+  g_free (converted);
+  g_object_unref (expander);
+  g_object_unref (compressor);
+}
+
+
 int
 main (int   argc,
       char *argv[])
@@ -759,6 +972,7 @@ main (int   argc,
   for (i = 0; i < G_N_ELEMENTS (charset_tests); i++)
     g_test_add_data_func (charset_tests[i].path, &charset_tests[i], test_charset);
 
+  g_test_add_func ("/converter-stream/pollable", test_converter_pollable);
 
   return g_test_run();
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]