[glib/tls: 5/7] Add pollable input/output streams



commit 63c60ec892a7f5ff46621c7478a4fa29aa65c28c
Author: Dan Winship <danw gnome org>
Date:   Sat Sep 18 13:05:25 2010 -0400

    Add pollable input/output streams
    
    When interfacing with APIs that expect unix-style async I/O, it is
    useful to be able to tell in advance whether a read/write is going to
    block. This adds new interfaces GPollableInputStream and
    GPollableOutputStream that can be implemented by a GInputStream or
    GOutputStream to add _is_readable/_is_writable, _create_source, and
    _read_nonblocking/_write_nonblocking methods.
    
    Also, implement for GUnixInput/OutputStream and
    GSocketInput/OutputStream

 docs/reference/gio/gio-docs.xml     |    3 +
 docs/reference/gio/gio-sections.txt |   53 ++++++++
 docs/reference/gio/gio.types        |    3 +
 gio/Makefile.am                     |    6 +
 gio/gfiledescriptorbased.c          |    2 +-
 gio/gio.h                           |    3 +
 gio/gio.symbols                     |   25 ++++
 gio/giotypes.h                      |   19 +++
 gio/gpollableinputstream.c          |  196 ++++++++++++++++++++++++++++
 gio/gpollableinputstream.h          |   91 +++++++++++++
 gio/gpollableiostream.c             |  148 +++++++++++++++++++++
 gio/gpollableiostream.h             |   68 ++++++++++
 gio/gpollableoutputstream.c         |  196 ++++++++++++++++++++++++++++
 gio/gpollableoutputstream.h         |   91 +++++++++++++
 gio/gsocketconnection.c             |    6 +-
 gio/gsocketinputstream.c            |   59 ++++++++-
 gio/gsocketoutputstream.c           |   59 ++++++++-
 gio/gunixinputstream.c              |   50 +++++++-
 gio/gunixoutputstream.c             |   49 +++++++-
 gio/tests/.gitignore                |    1 +
 gio/tests/Makefile.am               |    4 +
 gio/tests/pollable.c                |  240 +++++++++++++++++++++++++++++++++++
 22 files changed, 1359 insertions(+), 13 deletions(-)
---
diff --git a/docs/reference/gio/gio-docs.xml b/docs/reference/gio/gio-docs.xml
index cb03759..a04cf8f 100644
--- a/docs/reference/gio/gio-docs.xml
+++ b/docs/reference/gio/gio-docs.xml
@@ -68,6 +68,9 @@
         <xi:include href="xml/gunixoutputstream.xml"/>
         <xi:include href="xml/gconverterinputstream.xml"/>
         <xi:include href="xml/gconverteroutputstream.xml"/>
+        <xi:include href="xml/gpollableiostream.xml"/>
+        <xi:include href="xml/gpollableinputstream.xml"/>
+        <xi:include href="xml/gpollableoutputstream.xml"/>
     </chapter>
     <chapter id="types">
         <title>File types and applications</title>
diff --git a/docs/reference/gio/gio-sections.txt b/docs/reference/gio/gio-sections.txt
index eda433d..18d31c9 100644
--- a/docs/reference/gio/gio-sections.txt
+++ b/docs/reference/gio/gio-sections.txt
@@ -2932,3 +2932,56 @@ G_IS_PERIODIC
 <SUBSECTION Private>
 g_periodic_get_type
 </SECTION>
+
+<SECTION>
+<FILE>gpollableiostream</FILE>
+<TITLE>GPollableIOStream</TITLE>
+GPollableIOStream
+GPollableIOStreamInterface
+<SUBSECTION>
+GPollableSourceFunc
+g_pollable_source_new
+<SUBSECTION Standard>
+G_POLLABLE_IO_STREAM
+G_POLLABLE_IO_STREAM_GET_INTERFACE
+G_IS_POLLABLE_IO_STREAM
+G_TYPE_POLLABLE_IO_STREAM
+<SUBSECTION Private>
+g_pollable_io_stream_get_type
+</SECTION>
+
+<SECTION>
+<FILE>gpollableinputstream</FILE>
+<TITLE>GPollableInputStream</TITLE>
+GPollableInputStream
+GPollableInputStreamInterface
+<SUBSECTION>
+g_pollable_input_stream_is_readable
+g_pollable_input_stream_create_source
+g_pollable_input_stream_read_nonblocking
+<SUBSECTION Standard>
+G_POLLABLE_INPUT_STREAM
+G_POLLABLE_INPUT_STREAM_GET_INTERFACE
+G_IS_POLLABLE_INPUT_STREAM
+G_TYPE_POLLABLE_INPUT_STREAM
+<SUBSECTION Private>
+g_pollable_input_stream_get_type
+</SECTION>
+
+<SECTION>
+<FILE>gpollableoutputstream</FILE>
+<TITLE>GPollableOutputStream</TITLE>
+GPollableOutputStream
+GPollableOutputStreamInterface
+<SUBSECTION>
+g_pollable_output_stream_is_writable
+g_pollable_output_stream_create_source
+g_pollable_output_stream_write_nonblocking
+<SUBSECTION Standard>
+G_POLLABLE_OUTPUT_STREAM
+G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE
+G_IS_POLLABLE_OUTPUT_STREAM
+G_TYPE_POLLABLE_OUTPUT_STREAM
+<SUBSECTION Private>
+g_pollable_output_stream_get_type
+</SECTION>
diff --git a/docs/reference/gio/gio.types b/docs/reference/gio/gio.types
index ec01b40..b047219 100644
--- a/docs/reference/gio/gio.types
+++ b/docs/reference/gio/gio.types
@@ -76,6 +76,9 @@ g_output_stream_splice_flags_get_type
 g_password_save_get_type
 g_periodic_get_type
 g_permission_get_type
+g_pollable_input_stream_get_type
+g_pollable_io_stream_get_type
+g_pollable_output_stream_get_type
 g_proxy_address_enumerator_get_type
 g_proxy_address_get_type
 g_proxy_get_type
diff --git a/gio/Makefile.am b/gio/Makefile.am
index 2aedfc1..fc2dad6 100644
--- a/gio/Makefile.am
+++ b/gio/Makefile.am
@@ -349,6 +349,9 @@ libgio_2_0_la_SOURCES =		\
 	goutputstream.c 	\
 	gperiodic.c		\
 	gpermission.c 		\
+	gpollableinputstream.c 	\
+	gpollableiostream.c 	\
+	gpollableoutputstream.c \
 	gpollfilemonitor.c 	\
 	gpollfilemonitor.h 	\
 	gproxyresolver.c	\
@@ -505,6 +508,9 @@ gio_headers =			\
 	goutputstream.h 	\
 	gperiodic.h		\
 	gpermission.h 		\
+	gpollableinputstream.h 	\
+	gpollableiostream.h 	\
+	gpollableoutputstream.h \
 	gproxyaddress.h         \
 	gproxy.h		\
 	gproxyaddressenumerator.h \
diff --git a/gio/gfiledescriptorbased.c b/gio/gfiledescriptorbased.c
index 6d4ef13..4dd7e91 100644
--- a/gio/gfiledescriptorbased.c
+++ b/gio/gfiledescriptorbased.c
@@ -29,7 +29,7 @@
  * SECTION:gfiledescriptorbased
  * @short_description: Interface for file descriptor based IO
  * @include: gio/gfiledescriptorbased.h
- * @see_also: #GInputStream, #GOutputStream
+ * @see_also: #GInputStream, #GOutputStream, #GPollableIOStream
  *
  * #GFileDescriptorBased is implemented by streams (implementations of
  * #GInputStream or #GOutputStream) that are based on file descriptors.
diff --git a/gio/gio.h b/gio/gio.h
index c18384f..e013942 100644
--- a/gio/gio.h
+++ b/gio/gio.h
@@ -95,6 +95,9 @@
 #include <gio/goutputstream.h>
 #include <gio/gperiodic.h>
 #include <gio/gpermission.h>
+#include <gio/gpollableinputstream.h>
+#include <gio/gpollableiostream.h>
+#include <gio/gpollableoutputstream.h>
 #include <gio/gproxy.h>
 #include <gio/gproxyaddress.h>
 #include <gio/gproxyaddressenumerator.h>
diff --git a/gio/gio.symbols b/gio/gio.symbols
index 73a41d0..eea5deb 100644
--- a/gio/gio.symbols
+++ b/gio/gio.symbols
@@ -1969,3 +1969,28 @@ g_periodic_remove
 g_periodic_unblock
 #endif
 #endif
+
+#if IN_HEADER(__G_POLLABLE_IO_STREAM_H__)
+#if IN_FILE(__G_POLLABLE_IO_STREAM_C__)
+g_pollable_io_stream_get_type G_GNUC_CONST
+g_pollable_source_new
+#endif
+#endif
+
+#if IN_HEADER(__G_POLLABLE_INPUT_STREAM_H__)
+#if IN_FILE(__G_POLLABLE_INPUT_STREAM_C__)
+g_pollable_input_stream_get_type G_GNUC_CONST
+g_pollable_input_stream_create_source
+g_pollable_input_stream_is_readable
+g_pollable_input_stream_read_nonblocking
+#endif
+#endif
+
+#if IN_HEADER(__G_POLLABLE_OUTPUT_STREAM_H__)
+#if IN_FILE(__G_POLLABLE_OUTPUT_STREAM_C__)
+g_pollable_output_stream_get_type G_GNUC_CONST
+g_pollable_output_stream_create_source
+g_pollable_output_stream_is_writable
+g_pollable_output_stream_write_nonblocking
+#endif
+#endif
diff --git a/gio/giotypes.h b/gio/giotypes.h
index 829d285..3c47a7c 100644
--- a/gio/giotypes.h
+++ b/gio/giotypes.h
@@ -124,6 +124,9 @@ typedef struct _GNetworkAddress               GNetworkAddress;
 typedef struct _GNetworkService               GNetworkService;
 typedef struct _GOutputStream                 GOutputStream;
 typedef struct _GIOStream                     GIOStream;
+typedef struct _GPollableInputStream          GPollableInputStream; /* Dummy typedef */
+typedef struct _GPollableIOStream             GPollableIOStream; /* Dummy typedef */
+typedef struct _GPollableOutputStream         GPollableOutputStream; /* Dummy typedef */
 typedef struct _GResolver                     GResolver;
 typedef struct _GSeekable                     GSeekable;
 typedef struct _GSimpleAsyncResult            GSimpleAsyncResult;
@@ -391,6 +394,22 @@ typedef struct _GDBusNodeInfo                 GDBusNodeInfo;
 typedef gboolean (*GCancellableSourceFunc) (GCancellable *cancellable,
 					    gpointer      user_data);
 
+/**
+ * GPollableSourceFunc:
+ * @pollable_stream: the #GPollableInputStream or #GPollableOutputStream
+ * @user_data: data passed in by the user.
+ *
+ * This is the function type of the callback used for the #GSource
+ * returned by g_pollable_input_stream_create_source() and
+ * g_pollable_output_stream_create_source().
+ *
+ * Returns: it should return %FALSE if the source should be removed.
+ *
+ * Since: 2.28
+ */
+typedef gboolean (*GPollableSourceFunc) (GObject  *pollable_stream,
+					 gpointer  user_data);
+
 G_END_DECLS
 
 #endif /* __GIO_TYPES_H__ */
diff --git a/gio/gpollableinputstream.c b/gio/gpollableinputstream.c
new file mode 100644
index 0000000..4c42498
--- /dev/null
+++ b/gio/gpollableinputstream.c
@@ -0,0 +1,196 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "config.h"
+
+#include <errno.h>
+
+#include "gpollableinputstream.h"
+#include "gasynchelper.h"
+#include "gio-marshal.h"
+#include "gpollableiostream.h"
+#include "glibintl.h"
+
+/**
+ * SECTION:gpollableinputstream
+ * @short_description: Interface for pollable input streams
+ * @include: gio/gio.h
+ * @see_also: #GInputStream, #GPollableIOStream, #GPollableOutputStream,
+ *   #GFileDescriptorBased
+ *
+ * #GPollableInputStream is implemented by #GInputStream<!-- -->s that
+ * can be polled for readiness to read. This can be used when
+ * interfacing with a non-gio API that expects
+ * unix-file-descriptor-style asynchronous I/O rather than gio-style.
+ *
+ * Since: 2.28
+ */
+
+G_DEFINE_INTERFACE (GPollableInputStream, g_pollable_input_stream, G_TYPE_INPUT_STREAM)
+
+static gboolean g_pollable_input_stream_default_is_readable      (GPollableInputStream *stream);
+static GSource *g_pollable_input_stream_default_create_source    (GPollableInputStream    *stream,
+								  GCancellable *cancellable);
+static gssize   g_pollable_input_stream_default_read_nonblocking (GPollableInputStream  *stream,
+								  void                  *buffer,
+								  gsize                  size,
+								  GError               **error);
+/* Interface init: install the default implementations of all three vfuncs. */
+static void
+g_pollable_input_stream_default_init (GPollableInputStreamInterface *iface)
+{
+  iface->is_readable      = g_pollable_input_stream_default_is_readable;
+  iface->create_source    = g_pollable_input_stream_default_create_source;
+  iface->read_nonblocking = g_pollable_input_stream_default_read_nonblocking;
+}
+/* Default is_readable: unconditionally reports the stream as readable. */
+static gboolean
+g_pollable_input_stream_default_is_readable (GPollableInputStream *stream)
+{
+  return TRUE;
+}
+
+/**
+ * g_pollable_input_stream_is_readable:
+ * @stream: a #GPollableInputStream.
+ *
+ * Checks if @stream can be read.
+ *
+ * Note that some stream types may not be able to implement this 100%
+ * reliably, and it is possible that a call to g_input_stream_read()
+ * after this returns %TRUE would still block. To guarantee
+ * non-blocking behavior, you should always use
+ * g_pollable_input_stream_read_nonblocking(), which will return a
+ * %G_IO_ERROR_WOULD_BLOCK error rather than blocking.
+ *
+ * Returns: %TRUE if @stream is readable, %FALSE if not. If an error
+ *   has occurred on @stream, this will result in
+ *   g_pollable_input_stream_is_readable() returning %TRUE, and the
+ *   next attempt to read will return the error.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_input_stream_is_readable (GPollableInputStream *stream)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), FALSE);
+  /* Delegate to the is_readable vfunc found in @stream's interface table. */
+  return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->is_readable (stream);
+}
+/* Default create_source: a pollable source driven by an idle child source. */
+static GSource *
+g_pollable_input_stream_default_create_source (GPollableInputStream *stream,
+					       GCancellable         *cancellable)
+{
+  GSource *inner_source, *pollable_source;
+  /* Note: @cancellable is not consulted by this default implementation. */
+  pollable_source = g_pollable_source_new (G_OBJECT (stream));
+  inner_source = g_idle_source_new ();
+  g_source_add_child_source (pollable_source, inner_source);
+  g_source_unref (inner_source); /* drop the local reference to the child */
+
+  return pollable_source;
+}
+
+/**
+ * g_pollable_input_stream_create_source:
+ * @stream: a #GPollableInputStream.
+ * @cancellable: a #GCancellable, or %NULL
+ *
+ * Creates a #GSource that triggers when @stream can be read, or
+ * @cancellable is triggered or an error occurs. The callback on the
+ * source is of the #GPollableSourceFunc type.
+ *
+ * As with g_pollable_input_stream_is_readable(), it is possible that
+ * the stream may not actually be readable even after the source
+ * triggers, so you should use
+ * g_pollable_input_stream_read_nonblocking() rather than
+ * g_input_stream_read() from the callback.
+ *
+ * Returns: a new #GSource
+ *
+ * Since: 2.28
+ */
+GSource *
+g_pollable_input_stream_create_source (GPollableInputStream *stream,
+				       GCancellable         *cancellable)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), NULL);
+  /* @cancellable is handed to the vfunc as-is; it is not checked here. */
+  return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
+	  create_source (stream, cancellable);
+}
+
+static gssize
+g_pollable_input_stream_default_read_nonblocking (GPollableInputStream  *stream,
+						  void                  *buffer,
+						  gsize                  size,
+						  GError               **error)
+{
+  /* Default vfunc: -1 + G_IO_ERROR_WOULD_BLOCK unless readable, else a plain read. */
+  if (!g_pollable_input_stream_is_readable (stream))
+    {
+      /* _literal: the strerror() text is a message, not a printf format. */
+      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+			   g_strerror (EAGAIN));
+      return -1;
+    }
+  return g_input_stream_read (G_INPUT_STREAM (stream), buffer, size, NULL, error);
+}
+
+/**
+ * g_pollable_input_stream_read_nonblocking:
+ * @stream: a #GPollableInputStream
+ * @buffer: a buffer to read data into (which should be at least @size
+ *     bytes long).
+ * @size: the number of bytes you want to read
+ * @cancellable: a #GCancellable, or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Attempts to read up to @size bytes from @stream into @buffer, as
+ * with g_input_stream_read(). If @stream is not currently readable,
+ * this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can
+ * use g_pollable_input_stream_create_source() to create a #GSource
+ * that will be triggered when @stream is readable.
+ *
+ * Note that since this method never blocks, you cannot actually
+ * use @cancellable to cancel it. However, it will return an error
+ * if @cancellable has already been cancelled when you call, which
+ * may happen if you call this method after a source triggers due
+ * to having been cancelled.
+ *
+ * Return value: the number of bytes read, or -1 on error (including
+ *   %G_IO_ERROR_WOULD_BLOCK).
+ */
+gssize
+g_pollable_input_stream_read_nonblocking (GPollableInputStream  *stream,
+					  void                  *buffer,
+					  gsize                  size,
+					  GCancellable          *cancellable,
+					  GError               **error)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_INPUT_STREAM (stream), -1);
+  /* Fail right away if @cancellable was already cancelled before the call. */
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+  /* The vfunc takes no cancellable; the check above is all it gets. */
+  return G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
+    read_nonblocking (stream, buffer, size, error);
+}
diff --git a/gio/gpollableinputstream.h b/gio/gpollableinputstream.h
new file mode 100644
index 0000000..17becba
--- /dev/null
+++ b/gio/gpollableinputstream.h
@@ -0,0 +1,91 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __G_POLLABLE_INPUT_STREAM_H__
+#define __G_POLLABLE_INPUT_STREAM_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define G_TYPE_POLLABLE_INPUT_STREAM               (g_pollable_input_stream_get_type ())
+#define G_POLLABLE_INPUT_STREAM(obj)               (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStream))
+#define G_IS_POLLABLE_INPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_INPUT_STREAM))
+#define G_POLLABLE_INPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_INPUT_STREAM, GPollableInputStreamInterface))
+
+/**
+ * GPollableInputStream:
+ *
+ * An interface for a #GInputStream that can be polled for readability.
+ *
+ * Since: 2.28
+ */
+typedef struct _GPollableInputStreamInterface GPollableInputStreamInterface;
+
+/**
+ * GPollableInputStreamInterface:
+ * @g_iface: The parent interface.
+ * @is_readable: Checks if the stream is readable
+ * @create_source: Creates a #GSource to poll the stream
+ * @read_nonblocking: Does a non-blocking read or returns
+ *   %G_IO_ERROR_WOULD_BLOCK
+ *
+ * The interface for pollable input streams.
+ *
+ * The default implementation of @read_nonblocking calls
+ * g_pollable_input_stream_is_readable(), and then calls
+ * g_input_stream_read() if it returns %TRUE. This means you only need
+ * to override it if it is possible that your @is_readable
+ * implementation may return %TRUE when the stream is not actually
+ * readable.
+ *
+ * Since: 2.28
+ */
+struct _GPollableInputStreamInterface
+{
+  GTypeInterface g_iface;
+
+  /* Virtual Table */
+  gboolean     (*is_readable)      (GPollableInputStream  *stream);
+  GSource *    (*create_source)    (GPollableInputStream  *stream,
+				    GCancellable          *cancellable);
+  gssize       (*read_nonblocking) (GPollableInputStream  *stream,
+				    void                  *buffer,
+				    gsize                  size,
+				    GError               **error);
+};
+
+GType    g_pollable_input_stream_get_type         (void) G_GNUC_CONST;
+
+gboolean g_pollable_input_stream_is_readable      (GPollableInputStream  *stream);
+GSource *g_pollable_input_stream_create_source    (GPollableInputStream  *stream,
+						   GCancellable          *cancellable);
+
+gssize   g_pollable_input_stream_read_nonblocking (GPollableInputStream  *stream,
+						   void                  *buffer,
+						   gsize                  size,
+						   GCancellable          *cancellable,
+						   GError               **error);
+
+G_END_DECLS
+
+
+#endif /* __G_POLLABLE_INPUT_STREAM_H__ */
+
diff --git a/gio/gpollableiostream.c b/gio/gpollableiostream.c
new file mode 100644
index 0000000..db69d0a
--- /dev/null
+++ b/gio/gpollableiostream.c
@@ -0,0 +1,148 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "config.h"
+
+#include <errno.h>
+
+#include "gpollableiostream.h"
+#include "gio-marshal.h"
+#include "glibintl.h"
+
+/**
+ * SECTION:gpollableiostream
+ * @short_description: Interface for pollable I/O streams
+ * @include: gio/gio.h
+ * @see_also: #GIOStream, #GPollableInputStream, #GPollableOutputStream
+ *
+ * #GPollableIOStream is implemented by #GIOStream<!-- -->s whose
+ * input streams are #GPollableInputStream and whose output streams are
+ * #GPollableOutputStream.
+ *
+ * Since: 2.28
+ */
+
+G_DEFINE_INTERFACE (GPollableIOStream, g_pollable_io_stream, G_TYPE_IO_STREAM)
+/* Interface init: nothing to set up, the interface struct holds no vfuncs. */
+static void
+g_pollable_io_stream_default_init (GPollableIOStreamInterface *iface)
+{
+}
+/* GSource extended with the stream that is handed to the user callback. */
+typedef struct {
+  GSource       source;  /* first member: instances are cast to/from GSource */
+
+  GObject      *stream;  /* ref taken in g_pollable_source_new(), dropped in finalize */
+} GPollableSource;
+/* prepare slot: stores -1 in @timeout and never reports the source ready. */
+static gboolean
+pollable_source_prepare (GSource *source,
+			 gint    *timeout)
+{
+  *timeout = -1;
+  return FALSE;
+}
+/* check slot: never ready by itself; child sources are what make it trigger. */
+static gboolean
+pollable_source_check (GSource *source)
+{
+  return FALSE;
+}
+/* dispatch slot: call @callback as a GPollableSourceFunc on the stored stream. */
+static gboolean
+pollable_source_dispatch (GSource     *source,
+			  GSourceFunc  callback,
+			  gpointer     user_data)
+{
+  GPollableSourceFunc func = (GPollableSourceFunc)callback;
+  GPollableSource *pollable_source = (GPollableSource *)source;
+  /* NOTE(review): @callback is invoked without a NULL check -- confirm callers. */
+  return (*func) (pollable_source->stream, user_data);
+}
+/* finalize slot: release the stream reference taken in g_pollable_source_new(). */
+static void
+pollable_source_finalize (GSource *source)
+{
+  GPollableSource *pollable_source = (GPollableSource *)source;
+
+  g_object_unref (pollable_source->stream);
+}
+/* GPollableSourceFunc that invokes the GClosure passed as @data with @stream. */
+static gboolean
+pollable_source_closure_callback (GObject  *stream,
+				  gpointer  data)
+{
+  GClosure *closure = data;
+
+  GValue param = { 0, };
+  GValue result_value = { 0, };
+  gboolean result;
+  /* result_value receives the closure's boolean return value. */
+  g_value_init (&result_value, G_TYPE_BOOLEAN);
+  /* @stream is the single argument passed to the closure. */
+  g_value_init (&param, G_TYPE_OBJECT);
+  g_value_set_object (&param, stream);
+
+  g_closure_invoke (closure, &result_value, 1, &param, NULL);
+
+  result = g_value_get_boolean (&result_value);
+  g_value_unset (&result_value);
+  g_value_unset (&param);
+
+  return result;
+}
+/* Function table passed to g_source_new() by g_pollable_source_new(). */
+static GSourceFuncs pollable_source_funcs =
+{
+  pollable_source_prepare,
+  pollable_source_check,
+  pollable_source_dispatch,
+  pollable_source_finalize,
+  (GSourceFunc)pollable_source_closure_callback,
+  (GSourceDummyMarshal)_gio_marshal_BOOLEAN__VOID,
+};
+
+/**
+ * g_pollable_source_new:
+ * @pollable_stream: the stream associated with the new source
+ *
+ * Utility method for #GPollableInputStream and #GPollableOutputStream
+ * implementations. Creates a new #GSource that expects a callback of
+ * type #GPollableSourceFunc. The new source does not actually do
+ * anything on its own; use g_source_add_child_source() to add other
+ * sources to it to cause it to trigger.
+ *
+ * Return value: the new #GSource.
+ *
+ * Since: 2.28
+ */
+GSource *
+g_pollable_source_new (GObject *pollable_stream)
+{
+  GSource *source;
+  GPollableSource *pollable_source;
+  /* Size the allocation for GPollableSource so the stream field fits. */
+  source = g_source_new (&pollable_source_funcs, sizeof (GPollableSource));
+  g_source_set_name (source, "GPollableSource");
+  pollable_source = (GPollableSource *)source;
+  pollable_source->stream = g_object_ref (pollable_stream);
+  /* The stream reference is released again in pollable_source_finalize(). */
+  return source;
+}
diff --git a/gio/gpollableiostream.h b/gio/gpollableiostream.h
new file mode 100644
index 0000000..b245b95
--- /dev/null
+++ b/gio/gpollableiostream.h
@@ -0,0 +1,68 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __G_POLLABLE_IO_STREAM_H__
+#define __G_POLLABLE_IO_STREAM_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define G_TYPE_POLLABLE_IO_STREAM               (g_pollable_io_stream_get_type ())
+#define G_POLLABLE_IO_STREAM(obj)               (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_IO_STREAM, GPollableIOStream))
+#define G_IS_POLLABLE_IO_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_IO_STREAM))
+#define G_POLLABLE_IO_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_IO_STREAM, GPollableIOStreamInterface))
+
+/**
+ * GPollableIOStream:
+ *
+ * An interface for a #GIOStream whose input and output streams
+ * implement #GPollableInputStream and #GPollableOutputStream.
+ *
+ * Since: 2.28
+ */
+typedef struct _GPollableIOStreamInterface GPollableIOStreamInterface;
+
+/**
+ * GPollableIOStreamInterface:
+ * @g_iface: The parent interface.
+ *
+ * The interface for pollable #GIOStream<!-- -->s. There are no methods in
+ * this interface; implementing the interface is just a promise that
+ * the streams returned by g_io_stream_get_input_stream() and
+ * g_io_stream_get_output_stream() will themselves be pollable.
+ *
+ * Since: 2.28
+ */
+struct _GPollableIOStreamInterface
+{
+  GTypeInterface g_iface;
+
+};
+
+GType    g_pollable_io_stream_get_type   (void) G_GNUC_CONST;
+
+/* Helper method for stream implementations */
+GSource *g_pollable_source_new   (GObject *pollable_stream);
+
+G_END_DECLS
+
+#endif /* __G_POLLABLE_IO_STREAM_H__ */
+
diff --git a/gio/gpollableoutputstream.c b/gio/gpollableoutputstream.c
new file mode 100644
index 0000000..c1d124b
--- /dev/null
+++ b/gio/gpollableoutputstream.c
@@ -0,0 +1,196 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "config.h"
+
+#include <errno.h>
+
+#include "gpollableoutputstream.h"
+#include "gasynchelper.h"
+#include "gfiledescriptorbased.h"
+#include "gio-marshal.h"
+#include "gpollableiostream.h"
+#include "glibintl.h"
+
+/**
+ * SECTION:gpollableoutputstream
+ * @short_description: Interface for pollable output streams
+ * @include: gio/gio.h
+ * @see_also: #GOutputStream, #GFileDescriptorBased, #GPollableIOStream,
+ *   #GPollableInputStream
+ *
+ * #GPollableOutputStream is implemented by #GOutputStream<!-- -->s that
+ * can be polled for readiness to write. This can be used when
+ * interfacing with a non-gio API that expects
+ * unix-file-descriptor-style asynchronous I/O rather than gio-style.
+ *
+ * Since: 2.28
+ */
+
+G_DEFINE_INTERFACE (GPollableOutputStream, g_pollable_output_stream, G_TYPE_OUTPUT_STREAM)
+
+static gboolean g_pollable_output_stream_default_is_writable       (GPollableOutputStream *stream);
+static GSource *g_pollable_output_stream_default_create_source     (GPollableOutputStream    *stream,
+								    GCancellable *cancellable);
+static gssize   g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream  *stream,
+								    const void             *buffer,
+								    gsize                   size,
+								    GError                **error);
+/* Interface init: install the default implementations of all three vfuncs. */
+static void
+g_pollable_output_stream_default_init (GPollableOutputStreamInterface *iface)
+{
+  iface->is_writable       = g_pollable_output_stream_default_is_writable;
+  iface->create_source     = g_pollable_output_stream_default_create_source;
+  iface->write_nonblocking = g_pollable_output_stream_default_write_nonblocking;
+}
+/* Default is_writable: unconditionally reports the stream as writable. */
+static gboolean
+g_pollable_output_stream_default_is_writable (GPollableOutputStream *stream)
+{
+  return TRUE;
+}
+
+/**
+ * g_pollable_output_stream_is_writable:
+ * @stream: a #GPollableOutputStream.
+ *
+ * Checks if @stream can be written.
+ *
+ * Note that some stream types may not be able to implement this 100%
+ * reliably, and it is possible that a call to g_output_stream_write()
+ * after this returns %TRUE would still block. To guarantee
+ * non-blocking behavior, you should always use
+ * g_pollable_output_stream_write_nonblocking(), which will return a
+ * %G_IO_ERROR_WOULD_BLOCK error rather than blocking.
+ *
+ * Returns: %TRUE if @stream is writable, %FALSE if not. If an error
+ *   has occurred on @stream, this will result in
+ *   g_pollable_output_stream_is_writable() returning %TRUE, and the
+ *   next attempt to write will return the error.
+ *
+ * Since: 2.28
+ */
+gboolean
+g_pollable_output_stream_is_writable (GPollableOutputStream *stream)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), FALSE);
+  /* Delegate to the is_writable vfunc found in @stream's interface table. */
+  return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->is_writable (stream);
+}
+/* Default create_source: a pollable source driven by an idle child source. */
+static GSource *
+g_pollable_output_stream_default_create_source (GPollableOutputStream *stream,
+						GCancellable          *cancellable)
+{
+  GSource *inner_source, *pollable_source;
+  /* Note: @cancellable is not consulted by this default implementation. */
+  pollable_source = g_pollable_source_new (G_OBJECT (stream));
+  inner_source = g_idle_source_new ();
+  g_source_add_child_source (pollable_source, inner_source);
+  g_source_unref (inner_source); /* drop the local reference to the child */
+
+  return pollable_source;
+}
+
+/**
+ * g_pollable_output_stream_create_source:
+ * @stream: a #GPollableOutputStream.
+ * @cancellable: a #GCancellable, or %NULL
+ *
+ * Creates a #GSource that triggers when @stream can be written, or
+ * @cancellable is triggered or an error occurs. The callback on the
+ * source is of the #GPollableSourceFunc type.
+ *
+ * As with g_pollable_output_stream_is_writable(), it is possible that
+ * the stream may not actually be writable even after the source
+ * triggers, so you should use
+ * g_pollable_output_stream_write_nonblocking() rather than
+ * g_output_stream_write() from the callback.
+ *
+ * Returns: a new #GSource
+ *
+ * Since: 2.28
+ */
+GSource *
+g_pollable_output_stream_create_source (GPollableOutputStream *stream,
+					GCancellable          *cancellable)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), NULL);
+  /* @cancellable is handed to the vfunc as-is; it is not checked here. */
+  return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+	  create_source (stream, cancellable);
+}
+
+static gssize
+g_pollable_output_stream_default_write_nonblocking (GPollableOutputStream  *stream,
+						    const void             *buffer,
+						    gsize                   size,
+						    GError                **error)
+{
+  /* Default vfunc: -1 + G_IO_ERROR_WOULD_BLOCK unless writable, else a plain write. */
+  if (!g_pollable_output_stream_is_writable (stream))
+    {
+      /* _literal: the strerror() text is a message, not a printf format. */
+      g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK,
+			   g_strerror (EAGAIN));
+      return -1;
+    }
+  return g_output_stream_write (G_OUTPUT_STREAM (stream), buffer, size, NULL, error);
+}
+
+/**
+ * g_pollable_output_stream_write_nonblocking:
+ * @stream: a #GPollableOutputStream
+ * @buffer: a buffer to write data from
+ * @size: the number of bytes you want to write
+ * @cancellable: a #GCancellable, or %NULL
+ * @error: #GError for error reporting, or %NULL to ignore.
+ *
+ * Attempts to write up to @size bytes from @buffer to @stream, as
+ * with g_output_stream_write(). If @stream is not currently writable,
+ * this will immediately return %G_IO_ERROR_WOULD_BLOCK, and you can
+ * use g_pollable_output_stream_create_source() to create a #GSource
+ * that will be triggered when @stream is writable.
+ *
+ * Note that since this method never blocks, you cannot actually
+ * use @cancellable to cancel it. However, it will return an error
+ * if @cancellable has already been cancelled when you call, which
+ * may happen if you call this method after a source triggers due
+ * to having been cancelled.
+ *
+ * Return value: the number of bytes written, or -1 on error (including
+ *   %G_IO_ERROR_WOULD_BLOCK).
+ */
+gssize
+g_pollable_output_stream_write_nonblocking (GPollableOutputStream  *stream,
+					    const void             *buffer,
+					    gsize                   size,
+					    GCancellable           *cancellable,
+					    GError                **error)
+{
+  g_return_val_if_fail (G_IS_POLLABLE_OUTPUT_STREAM (stream), -1);
+  /* Fail right away if @cancellable was already cancelled before the call. */
+  if (g_cancellable_set_error_if_cancelled (cancellable, error))
+    return -1;
+  /* The vfunc takes no cancellable; the check above is all it gets. */
+  return G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+    write_nonblocking (stream, buffer, size, error);
+}
diff --git a/gio/gpollableoutputstream.h b/gio/gpollableoutputstream.h
new file mode 100644
index 0000000..6ea1167
--- /dev/null
+++ b/gio/gpollableoutputstream.h
@@ -0,0 +1,91 @@
+/* GIO - GLib Input, Output and Streaming Library
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __G_POLLABLE_OUTPUT_STREAM_H__
+#define __G_POLLABLE_OUTPUT_STREAM_H__
+
+#include <gio/gio.h>
+
+G_BEGIN_DECLS
+
+#define G_TYPE_POLLABLE_OUTPUT_STREAM               (g_pollable_output_stream_get_type ())
+#define G_POLLABLE_OUTPUT_STREAM(obj)               (G_TYPE_CHECK_INSTANCE_CAST ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStream))
+#define G_IS_POLLABLE_OUTPUT_STREAM(obj)            (G_TYPE_CHECK_INSTANCE_TYPE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM))
+#define G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE(obj) (G_TYPE_INSTANCE_GET_INTERFACE ((obj), G_TYPE_POLLABLE_OUTPUT_STREAM, GPollableOutputStreamInterface))
+
+/**
+ * GPollableOutputStream:
+ *
+ * An interface for a #GOutputStream that can be polled for writability.
+ *
+ * Since: 2.28
+ */
+typedef struct _GPollableOutputStreamInterface GPollableOutputStreamInterface;
+
+/**
+ * GPollableOutputStreamInterface:
+ * @g_iface: The parent interface.
+ * @is_writable: Checks if the stream is writable
+ * @create_source: Creates a #GSource to poll the stream
+ * @write_nonblocking: Does a non-blocking write or returns
+ *   %G_IO_ERROR_WOULD_BLOCK
+ *
+ * The interface for pollable output streams.
+ *
+ * The default implementation of @write_nonblocking calls
+ * g_pollable_output_stream_is_writable(), and then calls
+ * g_output_stream_write() if it returns %TRUE. This means you only
+ * need to override it if it is possible that your @is_writable
+ * implementation may return %TRUE when the stream is not actually
+ * writable.
+ *
+ * Since: 2.28
+ */
+struct _GPollableOutputStreamInterface
+{
+  GTypeInterface g_iface;
+
+  /* Virtual Table */
+  gboolean     (*is_writable)        (GPollableOutputStream  *stream);
+  GSource *    (*create_source)     (GPollableOutputStream  *stream,
+				     GCancellable           *cancellable);
+  gssize       (*write_nonblocking) (GPollableOutputStream  *stream,
+				     const void             *buffer,
+				     gsize                   size,
+				     GError                **error);
+};
+
+GType    g_pollable_output_stream_get_type          (void) G_GNUC_CONST;
+
+gboolean g_pollable_output_stream_is_writable       (GPollableOutputStream  *stream);
+GSource *g_pollable_output_stream_create_source     (GPollableOutputStream  *stream,
+						     GCancellable           *cancellable);
+
+gssize   g_pollable_output_stream_write_nonblocking (GPollableOutputStream  *stream,
+						     const void             *buffer,
+						     gsize                   size,
+						     GCancellable           *cancellable,
+						     GError                **error);
+
+G_END_DECLS
+
+
+#endif /* __G_POLLABLE_OUTPUT_STREAM_H__ */
+
diff --git a/gio/gsocketconnection.c b/gio/gsocketconnection.c
index 495f81c..4eddfb6 100644
--- a/gio/gsocketconnection.c
+++ b/gio/gsocketconnection.c
@@ -60,8 +60,10 @@
  * Since: 2.22
  */
 
-G_DEFINE_TYPE (GSocketConnection,
-	       g_socket_connection, G_TYPE_IO_STREAM);
+G_DEFINE_TYPE_WITH_CODE (GSocketConnection,
+			 g_socket_connection, G_TYPE_IO_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_IO_STREAM, NULL)
+			 );
 
 enum
 {
diff --git a/gio/gsocketinputstream.c b/gio/gsocketinputstream.c
index 4a27d90..5ae21e8 100644
--- a/gio/gsocketinputstream.c
+++ b/gio/gsocketinputstream.c
@@ -27,13 +27,19 @@
 #include "gsocketinputstream.h"
 #include "glibintl.h"
 
-#include <gio/gsimpleasyncresult.h>
-#include <gio/gcancellable.h>
-#include <gio/gioerror.h>
+#include "gsimpleasyncresult.h"
+#include "gcancellable.h"
+#include "gpollableinputstream.h"
+#include "gpollableiostream.h"
+#include "gioerror.h"
 
 
+static void g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+
 #define g_socket_input_stream_get_type _g_socket_input_stream_get_type
-G_DEFINE_TYPE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (GSocketInputStream, g_socket_input_stream, G_TYPE_INPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM, g_socket_input_stream_pollable_iface_init)
+			 )
 
 enum
 {
@@ -205,6 +211,43 @@ g_socket_input_stream_read_finish (GInputStream  *stream,
   return count;
 }
 
+static gboolean
+g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
+{
+  GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+
+  return g_socket_condition_check (input_stream->priv->socket, G_IO_IN);
+}
+
+static GSource *
+g_socket_input_stream_pollable_create_source (GPollableInputStream *pollable,
+					      GCancellable         *cancellable)
+{
+  GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+  GSource *socket_source, *pollable_source;
+
+  pollable_source = g_pollable_source_new (G_OBJECT (input_stream));
+  socket_source = g_socket_create_source (input_stream->priv->socket,
+					  G_IO_IN, cancellable);
+  g_source_add_child_source (pollable_source, socket_source);
+  g_source_unref (socket_source);
+
+  return pollable_source;
+}
+
+static gssize
+g_socket_input_stream_pollable_read_nonblocking (GPollableInputStream  *pollable,
+						 void                  *buffer,
+						 gsize                  size,
+						 GError               **error)
+{
+  GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
+
+  return g_socket_receive_with_blocking (input_stream->priv->socket,
+					 buffer, size, FALSE,
+					 NULL, error);
+}
+
 static void
 g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
 {
@@ -230,6 +273,14 @@ g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
 }
 
 static void
+g_socket_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+  iface->is_readable = g_socket_input_stream_pollable_is_readable;
+  iface->create_source = g_socket_input_stream_pollable_create_source;
+  iface->read_nonblocking = g_socket_input_stream_pollable_read_nonblocking;
+}
+
+static void
 g_socket_input_stream_init (GSocketInputStream *stream)
 {
   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_INPUT_STREAM, GSocketInputStreamPrivate);
diff --git a/gio/gsocketoutputstream.c b/gio/gsocketoutputstream.c
index 4febf88..797d1a1 100644
--- a/gio/gsocketoutputstream.c
+++ b/gio/gsocketoutputstream.c
@@ -28,14 +28,20 @@
 #include "gsocketoutputstream.h"
 #include "gsocket.h"
 
-#include <gio/gsimpleasyncresult.h>
-#include <gio/gcancellable.h>
-#include <gio/gioerror.h>
+#include "gsimpleasyncresult.h"
+#include "gcancellable.h"
+#include "gpollableoutputstream.h"
+#include "gpollableiostream.h"
+#include "gioerror.h"
 #include "glibintl.h"
 
 
+static void g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
+
 #define g_socket_output_stream_get_type _g_socket_output_stream_get_type
-G_DEFINE_TYPE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM);
+G_DEFINE_TYPE_WITH_CODE (GSocketOutputStream, g_socket_output_stream, G_TYPE_OUTPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM, g_socket_output_stream_pollable_iface_init)
+			 )
 
 enum
 {
@@ -207,6 +213,43 @@ g_socket_output_stream_write_finish (GOutputStream  *stream,
   return count;
 }
 
+static gboolean
+g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
+{
+  GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
+
+  return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
+}
+
+static GSource *
+g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable,
+					       GCancellable          *cancellable)
+{
+  GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
+  GSource *socket_source, *pollable_source;
+
+  pollable_source = g_pollable_source_new (G_OBJECT (output_stream));
+  socket_source = g_socket_create_source (output_stream->priv->socket,
+					  G_IO_OUT, cancellable);
+  g_source_add_child_source (pollable_source, socket_source);
+  g_source_unref (socket_source);
+
+  return pollable_source;
+}
+
+static gssize
+g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream  *pollable,
+						   const void             *buffer,
+						   gsize                   size,
+						   GError                **error)
+{
+  GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
+
+  return g_socket_send_with_blocking (output_stream->priv->socket,
+				      buffer, size, FALSE,
+				      NULL, error);
+}
+
 static void
 g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
 {
@@ -232,6 +275,14 @@ g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
 }
 
 static void
+g_socket_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+  iface->is_writable = g_socket_output_stream_pollable_is_writable;
+  iface->create_source = g_socket_output_stream_pollable_create_source;
+  iface->write_nonblocking = g_socket_output_stream_pollable_write_nonblocking;
+}
+
+static void
 g_socket_output_stream_init (GSocketOutputStream *stream)
 {
   stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream, G_TYPE_SOCKET_OUTPUT_STREAM, GSocketOutputStreamPrivate);
diff --git a/gio/gunixinputstream.c b/gio/gunixinputstream.c
index e1ee34a..6ada540 100644
--- a/gio/gunixinputstream.c
+++ b/gio/gunixinputstream.c
@@ -60,7 +60,12 @@ enum {
   PROP_CLOSE_FD
 };
 
-G_DEFINE_TYPE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM);
+static void g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface);
+
+G_DEFINE_TYPE_WITH_CODE (GUnixInputStream, g_unix_input_stream, G_TYPE_INPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+						g_unix_input_stream_pollable_iface_init)
+			 );
 
 struct _GUnixInputStreamPrivate {
   int fd;
@@ -111,6 +116,9 @@ static gboolean g_unix_input_stream_close_finish (GInputStream         *stream,
 						  GAsyncResult         *result,
 						  GError              **error);
 
+static gboolean g_unix_input_stream_pollable_is_readable   (GPollableInputStream *stream);
+static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
+							    GCancellable         *cancellable);
 
 static void
 g_unix_input_stream_finalize (GObject *object)
@@ -175,6 +183,13 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
 }
 
 static void
+g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
+{
+  iface->is_readable = g_unix_input_stream_pollable_is_readable;
+  iface->create_source = g_unix_input_stream_pollable_create_source;
+}
+
+static void
 g_unix_input_stream_set_property (GObject         *object,
 				  guint            prop_id,
 				  const GValue    *value,
@@ -637,3 +652,36 @@ g_unix_input_stream_close_finish (GInputStream  *stream,
   /* Failures handled in generic close_finish code */
   return TRUE;
 }
+
+static gboolean
+g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
+{
+  GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
+  GPollFD poll_fd;
+  gint result;
+
+  poll_fd.fd = unix_stream->priv->fd;
+  poll_fd.events = G_IO_IN;
+  poll_fd.revents = 0; /* keep the result defined if g_poll() fails outright */
+  do
+    result = g_poll (&poll_fd, 1, 0); /* zero timeout: sample state, never block */
+  while (result == -1 && errno == EINTR);
+
+  return poll_fd.revents != 0;
+}
+
+static GSource *
+g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
+					    GCancellable         *cancellable)
+{
+  GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
+  GSource *inner_source, *pollable_source;
+ 
+  pollable_source = g_pollable_source_new (G_OBJECT (stream));
+
+  inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_IN, cancellable);
+  g_source_add_child_source (pollable_source, inner_source);
+  g_source_unref (inner_source);
+
+  return pollable_source;
+}
diff --git a/gio/gunixoutputstream.c b/gio/gunixoutputstream.c
index a0acc31..1b905de 100644
--- a/gio/gunixoutputstream.c
+++ b/gio/gunixoutputstream.c
@@ -60,8 +60,12 @@ enum {
   PROP_CLOSE_FD
 };
 
-G_DEFINE_TYPE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM);
+static void g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface);
 
+G_DEFINE_TYPE_WITH_CODE (GUnixOutputStream, g_unix_output_stream, G_TYPE_OUTPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_OUTPUT_STREAM,
+						g_unix_output_stream_pollable_iface_init)
+			 );
 
 struct _GUnixOutputStreamPrivate {
   int fd;
@@ -103,6 +107,9 @@ static gboolean g_unix_output_stream_close_finish (GOutputStream        *stream,
 						   GAsyncResult         *result,
 						   GError              **error);
 
+static gboolean g_unix_output_stream_pollable_is_writable   (GPollableOutputStream *stream);
+static GSource *g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
+							     GCancellable         *cancellable);
 
 static void
 g_unix_output_stream_finalize (GObject *object)
@@ -161,6 +168,13 @@ g_unix_output_stream_class_init (GUnixOutputStreamClass *klass)
 }
 
 static void
+g_unix_output_stream_pollable_iface_init (GPollableOutputStreamInterface *iface)
+{
+  iface->is_writable = g_unix_output_stream_pollable_is_writable;
+  iface->create_source = g_unix_output_stream_pollable_create_source;
+}
+
+static void
 g_unix_output_stream_set_property (GObject         *object,
 				   guint            prop_id,
 				   const GValue    *value,
@@ -593,3 +607,36 @@ g_unix_output_stream_close_finish (GOutputStream  *stream,
   /* Failures handled in generic close_finish code */
   return TRUE;
 }
+
+static gboolean
+g_unix_output_stream_pollable_is_writable (GPollableOutputStream *stream)
+{
+  GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
+  GPollFD poll_fd;
+  gint result;
+
+  poll_fd.fd = unix_stream->priv->fd;
+  poll_fd.events = G_IO_OUT;
+  poll_fd.revents = 0; /* keep the result defined if g_poll() fails outright */
+  do
+    result = g_poll (&poll_fd, 1, 0); /* zero timeout: sample state, never block */
+  while (result == -1 && errno == EINTR);
+
+  return poll_fd.revents != 0;
+}
+
+static GSource *
+g_unix_output_stream_pollable_create_source (GPollableOutputStream *stream,
+					     GCancellable          *cancellable)
+{
+  GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
+  GSource *inner_source, *pollable_source;
+ 
+  pollable_source = g_pollable_source_new (G_OBJECT (stream));
+
+  inner_source = _g_fd_source_new (unix_stream->priv->fd, G_IO_OUT, cancellable);
+  g_source_add_child_source (pollable_source, inner_source);
+  g_source_unref (inner_source);
+
+  return pollable_source;
+}
diff --git a/gio/tests/.gitignore b/gio/tests/.gitignore
index 2613ef2..13d9403 100644
--- a/gio/tests/.gitignore
+++ b/gio/tests/.gitignore
@@ -56,6 +56,7 @@ memory-input-stream
 memory-output-stream
 network-address
 org.gtk.test.enums.xml
+pollable
 proxy
 readwrite
 resolver
diff --git a/gio/tests/Makefile.am b/gio/tests/Makefile.am
index 5e65c08..ad2273a 100644
--- a/gio/tests/Makefile.am
+++ b/gio/tests/Makefile.am
@@ -43,6 +43,7 @@ TEST_PROGS +=	 		\
 	network-address		\
 	gdbus-message		\
 	socket			\
+	pollable		\
 	$(NULL)
 
 if OS_UNIX
@@ -204,6 +205,9 @@ network_address_LDADD	  = $(progs_ldadd)
 socket_SOURCE		  = socket.c
 socket_LDADD		  = $(progs_ldadd)
 
+pollable_SOURCES	  = pollable.c
+pollable_LDADD		  = $(progs_ldadd)
+
 contexts_SOURCES	  = contexts.c
 contexts_LDADD		  = $(progs_ldadd) \
 	$(top_builddir)/gthread/libgthread-2.0.la
diff --git a/gio/tests/pollable.c b/gio/tests/pollable.c
new file mode 100644
index 0000000..8669e2b
--- /dev/null
+++ b/gio/tests/pollable.c
@@ -0,0 +1,240 @@
+/* GLib testing framework examples and tests
+ *
+ * Copyright (C) 2010 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <gio/gio.h>
+
+#ifdef G_OS_UNIX
+#include <gio/gunixinputstream.h>
+#include <gio/gunixoutputstream.h>
+#endif
+
+GMainLoop *loop;
+GPollableInputStream *in;
+GOutputStream *out;
+
+static gboolean
+poll_source_callback (GPollableInputStream *in,
+		      gpointer              user_data)
+{
+  GError *error = NULL;
+  char buf[2];
+  gssize nread;
+  gboolean *success = user_data;
+
+  nread = g_pollable_input_stream_read_nonblocking (in, buf, 2, NULL, &error);
+  g_assert_no_error (error);
+  g_assert_cmpint (nread, ==, 2);
+  g_assert_cmpstr (buf, ==, "x");
+
+  *success = TRUE;
+  return FALSE;
+}
+
+static gboolean
+check_source_readability_callback (gpointer user_data)
+{
+  gboolean expected = GPOINTER_TO_INT (user_data);
+  gboolean readable;
+
+  readable = g_pollable_input_stream_is_readable (in);
+  g_assert_cmpint (readable, ==, expected);
+  return FALSE;
+}
+
+static gboolean
+write_callback (gpointer user_data)
+{
+  const char *buf = "x"; /* sent together with its NUL terminator: 2 bytes */
+  gssize nwrote;
+  GError *error = NULL;
+
+  nwrote = g_output_stream_write (out, buf, 2, NULL, &error);
+  g_assert_no_error (error);
+  g_assert_cmpint (nwrote, ==, 2);
+
+  check_source_readability_callback (GINT_TO_POINTER (TRUE)); /* @in must now be readable */
+
+  return FALSE; /* one-shot idle source */
+}
+
+static gboolean
+check_source_and_quit_callback (gpointer user_data)
+{
+  check_source_readability_callback (user_data);
+  g_main_loop_quit (loop);
+  return FALSE;
+}
+
+static void
+test_streams (void)
+{
+  gboolean readable;
+  GError *error = NULL;
+  char buf[1];
+  gssize nread;
+  GSource *poll_source;
+  gboolean success = FALSE;
+
+  readable = g_pollable_input_stream_is_readable (in);
+  g_assert (!readable);
+
+  nread = g_pollable_input_stream_read_nonblocking (in, buf, 1, NULL, &error);
+  g_assert_cmpint (nread, ==, -1);
+  g_assert_error (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK);
+  g_clear_error (&error);
+
+  /* Create 4 sources, in decreasing order of priority:
+   *   1. poll source on @in
+   *   2. idle source that checks if @in is readable once
+   *      (it won't be) and then removes itself
+   *   3. idle source that writes a byte to @out, checks that
+   *      @in is now readable, and removes itself
+   *   4. idle source that checks if @in is readable once
+   *      (it won't be, since the poll source will fire before
+   *      this one does) and then quits the loop.
+   *
+   * If the poll source triggers before it should, then it will get a
+   * %G_IO_ERROR_WOULD_BLOCK, and if check() fails in either
+   * direction, we will catch it at some point.
+   */
+
+  poll_source = g_pollable_input_stream_create_source (in, NULL);
+  g_source_set_priority (poll_source, 1);
+  g_source_set_callback (poll_source, (GSourceFunc) poll_source_callback, &success, NULL);
+  g_source_attach (poll_source, NULL);
+  g_source_unref (poll_source);
+
+  g_idle_add_full (2, check_source_readability_callback, GINT_TO_POINTER (FALSE), NULL);
+  g_idle_add_full (3, write_callback, NULL, NULL);
+  g_idle_add_full (4, check_source_and_quit_callback, GINT_TO_POINTER (FALSE), NULL);
+
+  loop = g_main_loop_new (NULL, FALSE);
+  g_main_loop_run (loop);
+  g_main_loop_unref (loop);
+
+  g_assert_cmpint (success, ==, TRUE);
+}
+
+#ifdef G_OS_UNIX
+static void
+test_pollable_unix (void)
+{
+  int pipefds[2], status;
+
+  status = pipe (pipefds);
+  g_assert_cmpint (status, ==, 0);
+
+  in = G_POLLABLE_INPUT_STREAM (g_unix_input_stream_new (pipefds[0], TRUE));
+  out = g_unix_output_stream_new (pipefds[1], TRUE);
+
+  test_streams ();
+
+  g_object_unref (in);
+  g_object_unref (out);
+}
+#endif
+
+static void
+client_connected (GObject      *source,
+		  GAsyncResult *result,
+		  gpointer      user_data)
+{
+  GSocketClient *client = G_SOCKET_CLIENT (source);
+  GSocketConnection **conn = user_data;
+  GError *error = NULL;
+
+  *conn = g_socket_client_connect_finish (client, result, &error);
+  g_assert_no_error (error);
+}
+
+static void
+server_connected (GObject      *source,
+		  GAsyncResult *result,
+		  gpointer      user_data)
+{
+  GSocketListener *listener = G_SOCKET_LISTENER (source);
+  GSocketConnection **conn = user_data;
+  GError *error = NULL;
+
+  *conn = g_socket_listener_accept_finish (listener, result, NULL, &error);
+  g_assert_no_error (error);
+}
+
+static void
+test_pollable_socket (void)
+{
+  GInetAddress *iaddr;
+  GSocketAddress *saddr, *effective_address;
+  GSocketListener *listener;
+  GSocketClient *client;
+  GError *error = NULL;
+  GSocketConnection *client_conn = NULL, *server_conn = NULL;
+
+  iaddr = g_inet_address_new_loopback (G_SOCKET_FAMILY_IPV4);
+  saddr = g_inet_socket_address_new (iaddr, 0);
+  g_object_unref (iaddr);
+
+  listener = g_socket_listener_new ();
+  g_socket_listener_add_address (listener, saddr,
+				 G_SOCKET_TYPE_STREAM,
+				 G_SOCKET_PROTOCOL_TCP,
+				 NULL,
+				 &effective_address,
+				 &error);
+  g_assert_no_error (error);
+  g_object_unref (saddr);
+
+  client = g_socket_client_new ();
+  g_socket_client_connect_async (client,
+				 G_SOCKET_CONNECTABLE (effective_address),
+				 NULL, client_connected, &client_conn);
+  g_socket_listener_accept_async (listener, NULL,
+				  server_connected, &server_conn);
+
+  while (!client_conn || !server_conn)
+    g_main_context_iteration (NULL, TRUE);
+
+  in = G_POLLABLE_INPUT_STREAM (g_io_stream_get_input_stream (G_IO_STREAM (client_conn)));
+  out = g_io_stream_get_output_stream (G_IO_STREAM (server_conn));
+
+  test_streams ();
+
+  g_object_unref (client_conn);
+  g_object_unref (server_conn);
+  g_object_unref (client);
+  g_object_unref (listener);
+  g_object_unref (effective_address); /* add_address() returned a reference we own */
+}
+
+int
+main (int   argc,
+      char *argv[])
+{
+  g_type_init ();
+  g_test_init (&argc, &argv, NULL);
+
+#ifdef G_OS_UNIX
+  g_test_add_func ("/pollable/unix", test_pollable_unix);
+#endif
+  g_test_add_func ("/pollable/socket", test_pollable_socket);
+
+  return g_test_run();
+}
+



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]