[glib] gio: implement GPollableInput/OutputStream in more stream types
- From: Dan Winship <danw src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [glib] gio: implement GPollableInput/OutputStream in more stream types
- Date: Tue, 17 Apr 2012 16:33:41 +0000 (UTC)
commit 82ec4dcaed8107d436f76c45ec30645715b6dbef
Author: Dan Winship <danw gnome org>
Date: Mon Feb 6 15:08:08 2012 -0500
gio: implement GPollableInput/OutputStream in more stream types
Implement GPollableInputStream in GMemoryInputStream and
GConverterInputStream, and likewise implement GPollableOutputStream in
the corresponding output streams.
https://bugzilla.gnome.org/show_bug.cgi?id=673997
gio/gconverterinputstream.c | 130 ++++++++++++++++++++++---
gio/gconverteroutputstream.c | 119 ++++++++++++++++++++----
gio/gmemoryinputstream.c | 41 ++++++++-
gio/gmemoryoutputstream.c | 40 ++++++++-
gio/tests/converter-stream.c | 214 ++++++++++++++++++++++++++++++++++++++++++
5 files changed, 508 insertions(+), 36 deletions(-)
---
diff --git a/gio/gconverterinputstream.c b/gio/gconverterinputstream.c
index 2fbf94d..1acf9a9 100644
--- a/gio/gconverterinputstream.c
+++ b/gio/gconverterinputstream.c
@@ -25,6 +25,7 @@
#include <string.h>
#include "gconverterinputstream.h"
+#include "gpollableinputstream.h"
#include "gsimpleasyncresult.h"
#include "gcancellable.h"
#include "gioenumtypes.h"
@@ -41,6 +42,8 @@
* Converter input stream implements #GInputStream and allows
* conversion of data of various types during reading.
*
+ * As of GLib 2.34, #GConverterInputStream implements
+ * #GPollableInputStream.
**/
#define INITIAL_BUFFER_SIZE 4096
@@ -55,6 +58,7 @@ typedef struct {
struct _GConverterInputStreamPrivate {
gboolean at_input_end;
gboolean finished;
+ gboolean need_input;
GConverter *converter;
Buffer input_buffer;
Buffer converted_buffer;
@@ -80,9 +84,24 @@ static gssize g_converter_input_stream_read (GInputStream *stream,
GCancellable *cancellable,
GError **error);
-G_DEFINE_TYPE (GConverterInputStream,
- g_converter_input_stream,
- G_TYPE_FILTER_INPUT_STREAM)
+static gboolean g_converter_input_stream_can_poll (GPollableInputStream *stream);
+static gboolean g_converter_input_stream_is_readable (GPollableInputStream *stream);
+static gssize g_converter_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize size,
+ GError **error);
+
+static GSource *g_converter_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable);
+
+static void g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (GConverterInputStream,
+ g_converter_input_stream,
+ G_TYPE_FILTER_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ g_converter_input_stream_pollable_iface_init);
+ )
static void
g_converter_input_stream_class_init (GConverterInputStreamClass *klass)
@@ -113,6 +132,15 @@ g_converter_input_stream_class_init (GConverterInputStreamClass *klass)
}
static void
+g_converter_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+ iface->can_poll = g_converter_input_stream_can_poll;
+ iface->is_readable = g_converter_input_stream_is_readable;
+ iface->read_nonblocking = g_converter_input_stream_read_nonblocking;
+ iface->create_source = g_converter_input_stream_create_source;
+}
+
+static void
g_converter_input_stream_finalize (GObject *object)
{
GConverterInputStreamPrivate *priv;
@@ -320,6 +348,7 @@ buffer_ensure_space (Buffer *buffer,
static gssize
fill_input_buffer (GConverterInputStream *stream,
gsize at_least_size,
+ gboolean blocking,
GCancellable *cancellable,
GError **error)
{
@@ -332,25 +361,30 @@ fill_input_buffer (GConverterInputStream *stream,
buffer_ensure_space (&priv->input_buffer, at_least_size);
base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
- nread = g_input_stream_read (base_stream,
- priv->input_buffer.data + priv->input_buffer.end,
- buffer_tailspace (&priv->input_buffer),
- cancellable,
- error);
+ nread = g_pollable_stream_read (base_stream,
+ priv->input_buffer.data + priv->input_buffer.end,
+ buffer_tailspace (&priv->input_buffer),
+ blocking,
+ cancellable,
+ error);
if (nread > 0)
- priv->input_buffer.end += nread;
+ {
+ priv->input_buffer.end += nread;
+ priv->need_input = FALSE;
+ }
return nread;
}
static gssize
-g_converter_input_stream_read (GInputStream *stream,
- void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error)
+read_internal (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
{
GConverterInputStream *cstream;
GConverterInputStreamPrivate *priv;
@@ -389,7 +423,7 @@ g_converter_input_stream_read (GInputStream *stream,
total_bytes_read == 0 &&
!priv->at_input_end)
{
- nread = fill_input_buffer (cstream, count, cancellable, error);
+ nread = fill_input_buffer (cstream, count, blocking, cancellable, error);
if (nread < 0)
return -1;
if (nread == 0)
@@ -497,6 +531,7 @@ g_converter_input_stream_read (GInputStream *stream,
my_error2 = NULL;
nread = fill_input_buffer (cstream,
buffer_data_size (&priv->input_buffer) + 4096,
+ blocking,
cancellable,
&my_error2);
if (nread < 0)
@@ -504,6 +539,7 @@ g_converter_input_stream_read (GInputStream *stream,
/* Can't read any more data, return that error */
g_error_free (my_error);
g_propagate_error (error, my_error2);
+ priv->need_input = TRUE;
return -1;
}
else if (nread == 0)
@@ -536,6 +572,70 @@ g_converter_input_stream_read (GInputStream *stream,
g_assert_not_reached ();
}
+static gssize
+g_converter_input_stream_read (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ return read_internal (stream, buffer, count, TRUE, cancellable, error);
+}
+
+static gboolean
+g_converter_input_stream_can_poll (GPollableInputStream *stream)
+{
+ GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
+
+ return (G_IS_POLLABLE_INPUT_STREAM (base_stream) &&
+ g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (base_stream)));
+}
+
+static gboolean
+g_converter_input_stream_is_readable (GPollableInputStream *stream)
+{
+ GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
+ GConverterInputStream *cstream = G_CONVERTER_INPUT_STREAM (stream);
+
+ if (buffer_data_size (&cstream->priv->converted_buffer))
+ return TRUE;
+ else if (buffer_data_size (&cstream->priv->input_buffer) &&
+ !cstream->priv->need_input)
+ return TRUE;
+ else
+ return g_pollable_input_stream_is_readable (G_POLLABLE_INPUT_STREAM (base_stream));
+}
+
+static gssize
+g_converter_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error)
+{
+ return read_internal (G_INPUT_STREAM (stream), buffer, count,
+ FALSE, NULL, error);
+}
+
+static GSource *
+g_converter_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable)
+{
+ GInputStream *base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
+ GSource *base_source, *pollable_source;
+
+ if (g_pollable_input_stream_is_readable (stream))
+ base_source = g_timeout_source_new (0);
+ else
+ base_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (base_stream), NULL);
+
+ pollable_source = g_pollable_source_new_full (stream, base_source,
+ cancellable);
+ g_source_unref (base_source);
+
+ return pollable_source;
+}
+
+
/**
* g_converter_input_stream_get_converter:
* @converter_stream: a #GConverterInputStream
diff --git a/gio/gconverteroutputstream.c b/gio/gconverteroutputstream.c
index 5b1cbec..9199010 100644
--- a/gio/gconverteroutputstream.c
+++ b/gio/gconverteroutputstream.c
@@ -25,6 +25,7 @@
#include <string.h>
#include "gconverteroutputstream.h"
+#include "gpollableoutputstream.h"
#include "gsimpleasyncresult.h"
#include "gcancellable.h"
#include "gioenumtypes.h"
@@ -41,6 +42,8 @@
* Converter output stream implements #GOutputStream and allows
* conversion of data of various types during reading.
*
+ * As of GLib 2.34, #GConverterOutputStream implements
+ * #GPollableOutputStream.
**/
#define INITIAL_BUFFER_SIZE 4096
@@ -96,9 +99,24 @@ static gboolean g_converter_output_stream_flush (GOutputStream *stream,
GCancellable *cancellable,
GError **error);
-G_DEFINE_TYPE (GConverterOutputStream,
- g_converter_output_stream,
- G_TYPE_FILTER_OUTPUT_STREAM)
+static gboolean g_converter_output_stream_can_poll (GPollableOutputStream *stream);
+static gboolean g_converter_output_stream_is_writable (GPollableOutputStream *stream);
+static gssize g_converter_output_stream_write_nonblocking (GPollableOutputStream *stream,
+ const void *buffer,
+ gsize size,
+ GError **error);
+
+static GSource *g_converter_output_stream_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable);
+
+static void g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (GConverterOutputStream,
+ g_converter_output_stream,
+ G_TYPE_FILTER_OUTPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+ g_converter_output_stream_pollable_iface_init);
+ )
static void
g_converter_output_stream_class_init (GConverterOutputStreamClass *klass)
@@ -130,6 +148,15 @@ g_converter_output_stream_class_init (GConverterOutputStreamClass *klass)
}
static void
+g_converter_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+ iface->can_poll = g_converter_output_stream_can_poll;
+ iface->is_writable = g_converter_output_stream_is_writable;
+ iface->write_nonblocking = g_converter_output_stream_write_nonblocking;
+ iface->create_source = g_converter_output_stream_create_source;
+}
+
+static void
g_converter_output_stream_finalize (GObject *object)
{
GConverterOutputStreamPrivate *priv;
@@ -339,7 +366,7 @@ buffer_append (Buffer *buffer,
static gboolean
flush_buffer (GConverterOutputStream *stream,
- Buffer *buffer,
+ gboolean blocking,
GCancellable *cancellable,
GError **error)
{
@@ -356,12 +383,13 @@ flush_buffer (GConverterOutputStream *stream,
available = buffer_data_size (&priv->converted_buffer);
if (available > 0)
{
- res = g_output_stream_write_all (base_stream,
- buffer_data (&priv->converted_buffer),
- available,
- &nwritten,
- cancellable,
- error);
+ res = g_pollable_stream_write_all (base_stream,
+ buffer_data (&priv->converted_buffer),
+ available,
+ blocking,
+ &nwritten,
+ cancellable,
+ error);
buffer_consumed (&priv->converted_buffer, nwritten);
return res;
}
@@ -370,11 +398,12 @@ flush_buffer (GConverterOutputStream *stream,
static gssize
-g_converter_output_stream_write (GOutputStream *stream,
- const void *buffer,
- gsize count,
- GCancellable *cancellable,
- GError **error)
+write_internal (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ gboolean blocking,
+ GCancellable *cancellable,
+ GError **error)
{
GConverterOutputStream *cstream;
GConverterOutputStreamPrivate *priv;
@@ -392,7 +421,7 @@ g_converter_output_stream_write (GOutputStream *stream,
/* Write out all available pre-converted data and fail if
not possible */
- if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error))
+ if (!flush_buffer (cstream, blocking, cancellable, error))
return -1;
if (priv->finished)
@@ -499,11 +528,21 @@ g_converter_output_stream_write (GOutputStream *stream,
even if writing this to the base stream fails. If it does we'll just
stop early and report this error when we try again on the next
write call. */
- flush_buffer (cstream, &priv->converted_buffer, cancellable, NULL);
+ flush_buffer (cstream, blocking, cancellable, NULL);
return retval;
}
+static gssize
+g_converter_output_stream_write (GOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ return write_internal (stream, buffer, count, TRUE, cancellable, error);
+}
+
static gboolean
g_converter_output_stream_flush (GOutputStream *stream,
GCancellable *cancellable,
@@ -525,7 +564,7 @@ g_converter_output_stream_flush (GOutputStream *stream,
/* Write out all available pre-converted data and fail if
not possible */
- if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error))
+ if (!flush_buffer (cstream, TRUE, cancellable, error))
return FALSE;
/* Ensure we have *some* initial target space */
@@ -590,12 +629,54 @@ g_converter_output_stream_flush (GOutputStream *stream,
}
/* Now write all converted data to base stream */
- if (!flush_buffer (cstream, &priv->converted_buffer, cancellable, error))
+ if (!flush_buffer (cstream, TRUE, cancellable, error))
return FALSE;
return TRUE;
}
+static gboolean
+g_converter_output_stream_can_poll (GPollableOutputStream *stream)
+{
+ GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
+
+ return (G_IS_POLLABLE_OUTPUT_STREAM (base_stream) &&
+ g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (base_stream)));
+}
+
+static gboolean
+g_converter_output_stream_is_writable (GPollableOutputStream *stream)
+{
+ GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
+
+ return g_pollable_output_stream_is_writable (G_POLLABLE_OUTPUT_STREAM (base_stream));
+}
+
+static gssize
+g_converter_output_stream_write_nonblocking (GPollableOutputStream *stream,
+ const void *buffer,
+ gsize count,
+ GError **error)
+{
+ return write_internal (G_OUTPUT_STREAM (stream), buffer, count, FALSE,
+ NULL, error);
+}
+
+static GSource *
+g_converter_output_stream_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable)
+{
+ GOutputStream *base_stream = G_FILTER_OUTPUT_STREAM (stream)->base_stream;
+ GSource *base_source, *pollable_source;
+
+ base_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (base_stream), NULL);
+ pollable_source = g_pollable_source_new_full (stream, base_source,
+ cancellable);
+ g_source_unref (base_source);
+
+ return pollable_source;
+}
+
/**
* g_converter_output_stream_get_converter:
* @converter_stream: a #GConverterOutputStream
diff --git a/gio/gmemoryinputstream.c b/gio/gmemoryinputstream.c
index 1fed058..e657d5b 100644
--- a/gio/gmemoryinputstream.c
+++ b/gio/gmemoryinputstream.c
@@ -22,6 +22,7 @@
#include "config.h"
#include "gmemoryinputstream.h"
+#include "gpollableinputstream.h"
#include "ginputstream.h"
#include "gseekable.h"
#include "string.h"
@@ -39,6 +40,8 @@
* #GMemoryInputStream is a class for using arbitrary
* memory chunks as input for GIO streaming input operations.
*
+ * As of GLib 2.34, #GMemoryInputStream implements
+ * #GPollableInputStream.
*/
typedef struct _Chunk Chunk;
@@ -108,11 +111,20 @@ static gboolean g_memory_input_stream_truncate (GSeekable *seek
goffset offset,
GCancellable *cancellable,
GError **error);
+
+static void g_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+static gboolean g_memory_input_stream_is_readable (GPollableInputStream *stream);
+static GSource *g_memory_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable);
+
static void g_memory_input_stream_finalize (GObject *object);
G_DEFINE_TYPE_WITH_CODE (GMemoryInputStream, g_memory_input_stream, G_TYPE_INPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
- g_memory_input_stream_seekable_iface_init))
+ g_memory_input_stream_seekable_iface_init);
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ g_memory_input_stream_pollable_iface_init);
+ )
static void
@@ -175,6 +187,13 @@ g_memory_input_stream_seekable_iface_init (GSeekableIface *iface)
}
static void
+g_memory_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+ iface->is_readable = g_memory_input_stream_is_readable;
+ iface->create_source = g_memory_input_stream_create_source;
+}
+
+static void
g_memory_input_stream_init (GMemoryInputStream *stream)
{
stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
@@ -526,3 +545,23 @@ g_memory_input_stream_truncate (GSeekable *seekable,
_("Cannot truncate GMemoryInputStream"));
return FALSE;
}
+
+static gboolean
+g_memory_input_stream_is_readable (GPollableInputStream *stream)
+{
+ return TRUE;
+}
+
+static GSource *
+g_memory_input_stream_create_source (GPollableInputStream *stream,
+ GCancellable *cancellable)
+{
+ GSource *base_source, *pollable_source;
+
+ base_source = g_timeout_source_new (0);
+ pollable_source = g_pollable_source_new_full (stream, base_source,
+ cancellable);
+ g_source_unref (base_source);
+
+ return pollable_source;
+}
diff --git a/gio/gmemoryoutputstream.c b/gio/gmemoryoutputstream.c
index 08b4fba..b1da60d 100644
--- a/gio/gmemoryoutputstream.c
+++ b/gio/gmemoryoutputstream.c
@@ -25,6 +25,7 @@
#include "config.h"
#include "gmemoryoutputstream.h"
#include "goutputstream.h"
+#include "gpollableoutputstream.h"
#include "gseekable.h"
#include "gsimpleasyncresult.h"
#include "gioerror.h"
@@ -41,6 +42,8 @@
* #GMemoryOutputStream is a class for using arbitrary
* memory chunks as output for GIO streaming output operations.
*
+ * As of GLib 2.34, #GMemoryOutputStream implements
+ * #GPollableOutputStream.
*/
#define MIN_ARRAY_SIZE 16
@@ -119,9 +122,17 @@ static gboolean g_memory_output_stream_truncate (GSeekable *see
GCancellable *cancellable,
GError **error);
+static gboolean g_memory_output_stream_is_writable (GPollableOutputStream *stream);
+static GSource *g_memory_output_stream_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable);
+
+static void g_memory_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
G_DEFINE_TYPE_WITH_CODE (GMemoryOutputStream, g_memory_output_stream, G_TYPE_OUTPUT_STREAM,
G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
- g_memory_output_stream_seekable_iface_init))
+ g_memory_output_stream_seekable_iface_init);
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+ g_memory_output_stream_pollable_iface_init))
static void
@@ -225,6 +236,13 @@ g_memory_output_stream_class_init (GMemoryOutputStreamClass *klass)
}
static void
+g_memory_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+ iface->is_writable = g_memory_output_stream_is_writable;
+ iface->create_source = g_memory_output_stream_create_source;
+}
+
+static void
g_memory_output_stream_set_property (GObject *object,
guint prop_id,
const GValue *value,
@@ -800,3 +818,23 @@ g_memory_output_stream_truncate (GSeekable *seekable,
return TRUE;
}
+
+static gboolean
+g_memory_output_stream_is_writable (GPollableOutputStream *stream)
+{
+ return TRUE;
+}
+
+static GSource *
+g_memory_output_stream_create_source (GPollableOutputStream *stream,
+ GCancellable *cancellable)
+{
+ GSource *base_source, *pollable_source;
+
+ base_source = g_timeout_source_new (0);
+ pollable_source = g_pollable_source_new_full (stream, base_source,
+ cancellable);
+ g_source_unref (base_source);
+
+ return pollable_source;
+}
diff --git a/gio/tests/converter-stream.c b/gio/tests/converter-stream.c
index 8017015..ae1bdfb 100644
--- a/gio/tests/converter-stream.c
+++ b/gio/tests/converter-stream.c
@@ -724,6 +724,219 @@ test_charset (gconstpointer data)
g_object_unref (conv);
}
+
+static void
+client_connected (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ GSocketClient *client = G_SOCKET_CLIENT (source);
+ GSocketConnection **conn = user_data;
+ GError *error = NULL;
+
+ *conn = g_socket_client_connect_finish (client, result, &error);
+ g_assert_no_error (error);
+}
+
+static void
+server_connected (GObject *source,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ GSocketListener *listener = G_SOCKET_LISTENER (source);
+ GSocketConnection **conn = user_data;
+ GError *error = NULL;
+
+ *conn = g_socket_listener_accept_finish (listener, result, NULL, &error);
+ g_assert_no_error (error);
+}
+
+static void
+make_socketpair (GIOStream **left,
+ GIOStream **right)
+{
+ GInetAddress *iaddr;
+ GSocketAddress *saddr, *effective_address;
+ GSocketListener *listener;
+ GSocketClient *client;
+ GError *error = NULL;
+ GSocketConnection *client_conn = NULL, *server_conn = NULL;
+
+ iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4);
+ saddr = g_inet_socket_address_new (iaddr, 0);
+ g_object_unref (iaddr);
+
+ listener = g_socket_listener_new ();
+ g_socket_listener_add_address (listener, saddr,
+ G_SOCKET_TYPE_STREAM,
+ G_SOCKET_PROTOCOL_TCP,
+ NULL,
+ &effective_address,
+ &error);
+ g_assert_no_error (error);
+ g_object_unref (saddr);
+
+ client = g_socket_client_new ();
+
+ g_socket_client_connect_async (client,
+ G_SOCKET_CONNECTABLE (effective_address),
+ NULL, client_connected, &client_conn);
+ g_socket_listener_accept_async (listener, NULL,
+ server_connected, &server_conn);
+
+ while (!client_conn || !server_conn)
+ g_main_context_iteration (NULL, TRUE);
+
+ g_object_unref (client);
+ g_object_unref (listener);
+ g_object_unref (effective_address);
+
+ *left = G_IO_STREAM (client_conn);
+ *right = G_IO_STREAM (server_conn);
+}
+
+static void
+test_converter_pollable (void)
+{
+ GIOStream *left, *right;
+ guint8 *converted, *inptr;
+ guint8 *expanded, *outptr, *expanded_end;
+ gsize n_read, expanded_size;
+ gsize total_read;
+ gssize res;
+ gboolean is_readable;
+ GConverterResult cres;
+ GInputStream *cstream;
+ GPollableInputStream *pollable_in;
+ GOutputStream *socket_out, *mem_out, *cstream_out;
+ GPollableOutputStream *pollable_out;
+ GConverter *expander, *compressor;
+ GError *error;
+ int i;
+
+ expander = g_expander_converter_new ();
+ expanded = g_malloc (100*1000); /* Large enough */
+ cres = g_converter_convert (expander,
+ unexpanded_data, sizeof(unexpanded_data),
+ expanded, 100*1000,
+ G_CONVERTER_INPUT_AT_END,
+ &n_read, &expanded_size, NULL);
+ g_assert (cres == G_CONVERTER_FINISHED);
+ g_assert (n_read == 11);
+ g_assert (expanded_size == 41030);
+ expanded_end = expanded + expanded_size;
+
+ make_socketpair (&left, &right);
+
+ compressor = g_compressor_converter_new ();
+
+ converted = g_malloc (100*1000); /* Large enough */
+
+ cstream = g_converter_input_stream_new (g_io_stream_get_input_stream (left),
+ compressor);
+ pollable_in = G_POLLABLE_INPUT_STREAM (cstream);
+ g_assert (g_pollable_input_stream_can_poll (pollable_in));
+
+ socket_out = g_io_stream_get_output_stream (right);
+
+ total_read = 0;
+ outptr = expanded;
+ inptr = converted;
+ while (TRUE)
+ {
+ error = NULL;
+
+ if (outptr < expanded_end)
+ {
+ res = g_output_stream_write (socket_out,
+ outptr,
+ MIN (1000, (expanded_end - outptr)),
+ NULL, &error);
+ g_assert_cmpint (res, >, 0);
+ outptr += res;
+ }
+ else if (socket_out)
+ {
+ g_object_unref (right);
+ socket_out = NULL;
+ }
+
+ is_readable = g_pollable_input_stream_is_readable (pollable_in);
+ res = g_pollable_input_stream_read_nonblocking (pollable_in,
+ inptr, 1,
+ NULL, &error);
+
+ /* is_readable can be a false positive, but not a false negative */
+ if (!is_readable)
+ g_assert_cmpint (res, ==, -1);
+
+ /* After closing the write end, we can't get WOULD_BLOCK any more */
+ if (!socket_out)
+ g_assert_cmpint (res, !=, -1);
+
+ if (res == -1)
+ {
+ g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+ g_error_free (error);
+
+ continue;
+ }
+
+ if (res == 0)
+ break;
+ inptr += res;
+ total_read += res;
+ }
+
+ g_assert (total_read == n_read - 1); /* Last 2 zeros are combined */
+ g_assert (memcmp (converted, unexpanded_data, total_read) == 0);
+
+ g_object_unref (cstream);
+ g_object_unref (left);
+
+ g_converter_reset (compressor);
+
+ /* This doesn't actually test the behavior on
+ * G_IO_ERROR_WOULD_BLOCK; to do that we'd need to implement a
+ * custom GOutputStream that we could control blocking on.
+ */
+
+ mem_out = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+ cstream_out = g_converter_output_stream_new (mem_out, compressor);
+ g_object_unref (mem_out);
+ pollable_out = G_POLLABLE_OUTPUT_STREAM (cstream_out);
+
+ for (i = 0; i < expanded_size; i++)
+ {
+ error = NULL;
+ res = g_pollable_output_stream_write_nonblocking (pollable_out,
+ expanded + i, 1,
+ NULL, &error);
+ g_assert (res != -1);
+ if (res == 0)
+ {
+ g_assert (i == expanded_size -1);
+ break;
+ }
+ g_assert (res == 1);
+ }
+
+ g_output_stream_close (cstream_out, NULL, NULL);
+
+ g_assert (g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (mem_out)) == n_read - 1); /* Last 2 zeros are combined */
+ g_assert (memcmp (g_memory_output_stream_get_data (G_MEMORY_OUTPUT_STREAM (mem_out)),
+ unexpanded_data,
+ g_memory_output_stream_get_data_size (G_MEMORY_OUTPUT_STREAM (mem_out))) == 0);
+
+ g_object_unref (cstream_out);
+
+ g_free (expanded);
+ g_free (converted);
+ g_object_unref (expander);
+ g_object_unref (compressor);
+}
+
+
int
main (int argc,
char *argv[])
@@ -759,6 +972,7 @@ main (int argc,
for (i = 0; i < G_N_ELEMENTS (charset_tests); i++)
g_test_add_data_func (charset_tests[i].path, &charset_tests[i], test_charset);
+ g_test_add_func ("/converter-stream/pollable", test_converter_pollable);
return g_test_run();
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]