[glib] gio: use GPollable* to implement fallback read_async/write_async



commit 00ee06e6a332d1415baf5533e34f05a83d64cb02
Author: Dan Winship <danw gnome org>
Date:   Sat Feb 4 16:46:29 2012 -0500

    gio: use GPollable* to implement fallback read_async/write_async
    
    If a GInputStream does not provide a read_async() implementation, but
    does implement GPollableInputStream, then instead of doing
    read-synchronously-in-a-thread, just use
    g_pollable_input_stream_read_nonblocking() and
    g_pollable_input_stream_create_source() to implement an async read in
    the same thread. Similarly for GOutputStream.
    
    Remove a bunch of existing read_async()/write_async() implementations
    that are basically equivalent to the new fallback method.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=673997

 gio/gbufferedinputstream.c  |  195 -------------------------------------------
 gio/gbufferedoutputstream.c |  108 ------------------------
 gio/ginputstream.c          |   71 +++++++++++++++-
 gio/gmemoryinputstream.c    |   57 -------------
 gio/gmemoryoutputstream.c   |   62 --------------
 gio/goutputstream.c         |   66 ++++++++++++++-
 gio/gsocketinputstream.c    |   91 --------------------
 gio/gsocketoutputstream.c   |  111 ++----------------------
 gio/gunixinputstream.c      |  143 ++------------------------------
 gio/gunixoutputstream.c     |  135 ------------------------------
 10 files changed, 150 insertions(+), 889 deletions(-)
---
diff --git a/gio/gbufferedinputstream.c b/gio/gbufferedinputstream.c
index dbe96c7..e62a3de 100644
--- a/gio/gbufferedinputstream.c
+++ b/gio/gbufferedinputstream.c
@@ -100,16 +100,6 @@ static gssize g_buffered_input_stream_read             (GInputStream          *s
                                                         gsize                  count,
                                                         GCancellable          *cancellable,
                                                         GError               **error);
-static void   g_buffered_input_stream_read_async       (GInputStream          *stream,
-                                                        void                  *buffer,
-                                                        gsize                  count,
-                                                        int                    io_priority,
-                                                        GCancellable          *cancellable,
-                                                        GAsyncReadyCallback    callback,
-                                                        gpointer               user_data);
-static gssize g_buffered_input_stream_read_finish      (GInputStream          *stream,
-                                                        GAsyncResult          *result,
-                                                        GError               **error);
 static gssize g_buffered_input_stream_real_fill        (GBufferedInputStream  *stream,
                                                         gssize                 count,
                                                         GCancellable          *cancellable,
@@ -150,8 +140,6 @@ g_buffered_input_stream_class_init (GBufferedInputStreamClass *klass)
   istream_class->skip_async  = g_buffered_input_stream_skip_async;
   istream_class->skip_finish = g_buffered_input_stream_skip_finish;
   istream_class->read_fn = g_buffered_input_stream_read;
-  istream_class->read_async  = g_buffered_input_stream_read_async;
-  istream_class->read_finish = g_buffered_input_stream_read_finish;
 
   bstream_class = G_BUFFERED_INPUT_STREAM_CLASS (klass);
   bstream_class->fill = g_buffered_input_stream_real_fill;
@@ -1019,189 +1007,6 @@ g_buffered_input_stream_real_fill_finish (GBufferedInputStream *stream,
 
 typedef struct
 {
-  gssize bytes_read;
-  gssize count;
-  void *buffer;
-} ReadAsyncData;
-
-static void
-free_read_async_data (gpointer _data)
-{
-  ReadAsyncData *data = _data;
-  g_slice_free (ReadAsyncData, data);
-}
-
-static void
-large_read_callback (GObject *source_object,
-                     GAsyncResult *result,
-                     gpointer user_data)
-{
-  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
-  ReadAsyncData *data;
-  GError *error;
-  gssize nread;
-
-  data = g_simple_async_result_get_op_res_gpointer (simple);
-
-  error = NULL;
-  nread = g_input_stream_read_finish (G_INPUT_STREAM (source_object),
-                                      result, &error);
-
-  /* Only report the error if we've not already read some data */
-  if (nread < 0 && data->bytes_read == 0)
-    g_simple_async_result_take_error (simple, error);
-  else if (error)
-    g_error_free (error);
-
-  if (nread > 0)
-    data->bytes_read += nread;
-
-  /* Complete immediately, not in idle, since we're already
-   * in a mainloop callout
-   */
-  g_simple_async_result_complete (simple);
-  g_object_unref (simple);
-}
-
-static void
-read_fill_buffer_callback (GObject      *source_object,
-                           GAsyncResult *result,
-                           gpointer      user_data)
-{
-  GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (user_data);
-  GBufferedInputStream *bstream;
-  GBufferedInputStreamPrivate *priv;
-  ReadAsyncData *data;
-  GError *error;
-  gssize nread;
-  gsize available;
-
-  bstream = G_BUFFERED_INPUT_STREAM (source_object);
-  priv = bstream->priv;
-
-  data = g_simple_async_result_get_op_res_gpointer (simple);
-
-  error = NULL;
-  nread = g_buffered_input_stream_fill_finish (bstream,
-                                               result, &error);
-
-  if (nread < 0 && data->bytes_read == 0)
-    g_simple_async_result_take_error (simple, error);
-  else if (error)
-    g_error_free (error);
-
-  if (nread > 0)
-    {
-      available = priv->end - priv->pos;
-      data->count = MIN (data->count, available);
-
-      memcpy ((char *)data->buffer + data->bytes_read, (char *)priv->buffer + priv->pos, data->count);
-      data->bytes_read += data->count;
-      priv->pos += data->count;
-    }
-
-  /* Complete immediately, not in idle, since we're already
-   * in a mainloop callout
-   */
-  g_simple_async_result_complete (simple);
-  g_object_unref (simple);
-}
-
-static void
-g_buffered_input_stream_read_async (GInputStream        *stream,
-                                    void                *buffer,
-                                    gsize                count,
-                                    int                  io_priority,
-                                    GCancellable        *cancellable,
-                                    GAsyncReadyCallback  callback,
-                                    gpointer             user_data)
-{
-  GBufferedInputStream *bstream;
-  GBufferedInputStreamPrivate *priv;
-  GBufferedInputStreamClass *class;
-  GInputStream *base_stream;
-  gsize available;
-  GSimpleAsyncResult *simple;
-  ReadAsyncData *data;
-
-  bstream = G_BUFFERED_INPUT_STREAM (stream);
-  priv = bstream->priv;
-
-  data = g_slice_new (ReadAsyncData);
-  data->buffer = buffer;
-  data->bytes_read = 0;
-  simple = g_simple_async_result_new (G_OBJECT (stream),
-                                      callback, user_data,
-                                      g_buffered_input_stream_read_async);
-  g_simple_async_result_set_op_res_gpointer (simple, data, free_read_async_data);
-
-  available = priv->end - priv->pos;
-
-  if (count <= available)
-    {
-      memcpy (buffer, priv->buffer + priv->pos, count);
-      priv->pos += count;
-      data->bytes_read = count;
-
-      g_simple_async_result_complete_in_idle (simple);
-      g_object_unref (simple);
-      return;
-    }
-
-
-  /* Full request not available, read all currently available
-   * and request refill for more
-   */
-
-  memcpy (buffer, priv->buffer + priv->pos, available);
-  priv->pos = 0;
-  priv->end = 0;
-
-  count -= available;
-
-  data->bytes_read = available;
-  data->count = count;
-
-  if (count > priv->len)
-    {
-      /* Large request, shortcut buffer */
-
-      base_stream = G_FILTER_INPUT_STREAM (stream)->base_stream;
-
-      g_input_stream_read_async (base_stream,
-                                 (char *)buffer + data->bytes_read,
-                                 count,
-                                 io_priority, cancellable,
-                                 large_read_callback,
-                                 simple);
-    }
-  else
-    {
-      class = G_BUFFERED_INPUT_STREAM_GET_CLASS (stream);
-      class->fill_async (bstream, priv->len, io_priority, cancellable,
-                         read_fill_buffer_callback, simple);
-    }
-}
-
-static gssize
-g_buffered_input_stream_read_finish (GInputStream   *stream,
-                                     GAsyncResult   *result,
-                                     GError        **error)
-{
-  GSimpleAsyncResult *simple;
-  ReadAsyncData *data;
-
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-
-  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_buffered_input_stream_read_async);
-
-  data = g_simple_async_result_get_op_res_gpointer (simple);
-
-  return data->bytes_read;
-}
-
-typedef struct
-{
   gssize bytes_skipped;
   gssize count;
 } SkipAsyncData;
diff --git a/gio/gbufferedoutputstream.c b/gio/gbufferedoutputstream.c
index df8178e..f624d25 100644
--- a/gio/gbufferedoutputstream.c
+++ b/gio/gbufferedoutputstream.c
@@ -88,16 +88,6 @@ static gboolean g_buffered_output_stream_close        (GOutputStream  *stream,
                                                        GCancellable   *cancellable,
                                                        GError        **error);
 
-static void     g_buffered_output_stream_write_async  (GOutputStream        *stream,
-                                                       const void           *buffer,
-                                                       gsize                 count,
-                                                       int                   io_priority,
-                                                       GCancellable         *cancellable,
-                                                       GAsyncReadyCallback   callback,
-                                                       gpointer              data);
-static gssize   g_buffered_output_stream_write_finish (GOutputStream        *stream,
-                                                       GAsyncResult         *result,
-                                                       GError              **error);
 static void     g_buffered_output_stream_flush_async  (GOutputStream        *stream,
                                                        int                   io_priority,
                                                        GCancellable         *cancellable,
@@ -137,8 +127,6 @@ g_buffered_output_stream_class_init (GBufferedOutputStreamClass *klass)
   ostream_class->write_fn = g_buffered_output_stream_write;
   ostream_class->flush = g_buffered_output_stream_flush;
   ostream_class->close_fn = g_buffered_output_stream_close;
-  ostream_class->write_async  = g_buffered_output_stream_write_async;
-  ostream_class->write_finish = g_buffered_output_stream_write_finish;
   ostream_class->flush_async  = g_buffered_output_stream_flush_async;
   ostream_class->flush_finish = g_buffered_output_stream_flush_finish;
   ostream_class->close_async  = g_buffered_output_stream_close_async;
@@ -578,102 +566,6 @@ flush_buffer_thread (GSimpleAsyncResult *result,
     g_simple_async_result_take_error (result, error);
 }
 
-typedef struct {
-    
-  FlushData fdata;
-
-  gsize  count;
-  const void  *buffer;
-
-} WriteData;
-
-static void 
-free_write_data (gpointer data)
-{
-  g_slice_free (WriteData, data);
-}
-
-static void
-g_buffered_output_stream_write_async (GOutputStream        *stream,
-                                      const void           *buffer,
-                                      gsize                 count,
-                                      int                   io_priority,
-                                      GCancellable         *cancellable,
-                                      GAsyncReadyCallback   callback,
-                                      gpointer              data)
-{
-  GBufferedOutputStream *buffered_stream;
-  GBufferedOutputStreamPrivate *priv;
-  GSimpleAsyncResult *res;
-  WriteData *wdata;
-
-  buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
-  priv = buffered_stream->priv;
-
-  wdata = g_slice_new (WriteData);
-  wdata->count  = count;
-  wdata->buffer = buffer;
-
-  res = g_simple_async_result_new (G_OBJECT (stream),
-                                   callback,
-                                   data,
-                                   g_buffered_output_stream_write_async);
-
-  g_simple_async_result_set_op_res_gpointer (res, wdata, free_write_data);
-
-  /* if we have space left directly call the
-   * callback (from idle) otherwise schedule a buffer 
-   * flush in the thread. In both cases the actual
-   * copying of the data to the buffer will be done in
-   * the write_finish () func since that should
-   * be fast enough */
-  if (priv->len - priv->pos > 0)
-    {
-      g_simple_async_result_complete_in_idle (res);
-    }
-  else
-    {
-      wdata->fdata.flush_stream = FALSE;
-      wdata->fdata.close_stream = FALSE;
-      g_simple_async_result_run_in_thread (res,
-                                           flush_buffer_thread,
-                                           io_priority,
-                                           cancellable);
-    }
-    g_object_unref (res);
-}
-
-static gssize
-g_buffered_output_stream_write_finish (GOutputStream        *stream,
-                                       GAsyncResult         *result,
-                                       GError              **error)
-{
-  GBufferedOutputStreamPrivate *priv;
-  GBufferedOutputStream        *buffered_stream;
-  GSimpleAsyncResult *simple;
-  WriteData          *wdata;
-  gssize              count;
-
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-  buffered_stream = G_BUFFERED_OUTPUT_STREAM (stream);
-  priv = buffered_stream->priv;
-
-  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == 
-            g_buffered_output_stream_write_async);
-
-  wdata = g_simple_async_result_get_op_res_gpointer (simple);
-
-  /* Now do the real copying of data to the buffer */
-  count = priv->len - priv->pos; 
-  count = MIN (wdata->count, count);
-
-  memcpy (priv->buffer + priv->pos, wdata->buffer, count);
-  
-  priv->pos += count;
-
-  return count;
-}
-
 static void
 g_buffered_output_stream_flush_async (GOutputStream        *stream,
                                       int                   io_priority,
diff --git a/gio/ginputstream.c b/gio/ginputstream.c
index 7160c23..c410d52 100644
--- a/gio/ginputstream.c
+++ b/gio/ginputstream.c
@@ -30,7 +30,7 @@
 #include "gasyncresult.h"
 #include "gsimpleasyncresult.h"
 #include "gioerror.h"
-
+#include "gpollableinputstream.h"
 
 /**
  * SECTION:ginputstream
@@ -925,6 +925,10 @@ typedef struct {
   void              *buffer;
   gsize              count_requested;
   gssize             count_read;
+
+  GCancellable      *cancellable;
+  gint               io_priority;
+  gboolean           need_idle;
 } ReadData;
 
 static void
@@ -947,6 +951,60 @@ read_async_thread (GSimpleAsyncResult *res,
     g_simple_async_result_take_error (res, error);
 }
 
+static void read_async_pollable (GPollableInputStream *stream,
+				 GSimpleAsyncResult   *result);
+
+static gboolean
+read_async_pollable_ready (GPollableInputStream *stream,
+			   gpointer              user_data)
+{
+  GSimpleAsyncResult *result = user_data;
+
+  read_async_pollable (stream, result);
+  return FALSE;
+}
+
+static void
+read_async_pollable (GPollableInputStream *stream,
+		     GSimpleAsyncResult   *result)
+{
+  GError *error = NULL;
+  ReadData *op = g_simple_async_result_get_op_res_gpointer (result);
+
+  if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
+    op->count_read = -1;
+  else
+    {
+      op->count_read = G_POLLABLE_INPUT_STREAM_GET_INTERFACE (stream)->
+	read_nonblocking (stream, op->buffer, op->count_requested, &error);
+    }
+
+  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+    {
+      GSource *source;
+
+      g_error_free (error);
+      op->need_idle = FALSE;
+
+      source = g_pollable_input_stream_create_source (stream, op->cancellable);
+      g_source_set_callback (source,
+			     (GSourceFunc) read_async_pollable_ready,
+			     g_object_ref (result), g_object_unref);
+      g_source_set_priority (source, op->io_priority);
+      g_source_attach (source, g_main_context_get_thread_default ());
+      g_source_unref (source);
+      return;
+    }
+
+  if (op->count_read == -1)
+    g_simple_async_result_take_error (result, error);
+
+  if (op->need_idle)
+    g_simple_async_result_complete_in_idle (result);
+  else
+    g_simple_async_result_complete (result);
+}
+
 static void
 g_input_stream_real_read_async (GInputStream        *stream,
 				void                *buffer,
@@ -964,8 +1022,15 @@ g_input_stream_real_read_async (GInputStream        *stream,
   g_simple_async_result_set_op_res_gpointer (res, op, g_free);
   op->buffer = buffer;
   op->count_requested = count;
-  
-  g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
+  op->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+  op->io_priority = io_priority;
+  op->need_idle = TRUE;
+
+  if (G_IS_POLLABLE_INPUT_STREAM (stream) &&
+      g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (stream)))
+    read_async_pollable (G_POLLABLE_INPUT_STREAM (stream), res);
+  else
+    g_simple_async_result_run_in_thread (res, read_async_thread, io_priority, cancellable);
   g_object_unref (res);
 }
 
diff --git a/gio/gmemoryinputstream.c b/gio/gmemoryinputstream.c
index e657d5b..dac0ac1 100644
--- a/gio/gmemoryinputstream.c
+++ b/gio/gmemoryinputstream.c
@@ -70,16 +70,6 @@ static gssize   g_memory_input_stream_skip         (GInputStream         *stream
 static gboolean g_memory_input_stream_close        (GInputStream         *stream,
 						    GCancellable         *cancellable,
 						    GError              **error);
-static void     g_memory_input_stream_read_async   (GInputStream         *stream,
-						    void                 *buffer,
-						    gsize                 count,
-						    int                   io_priority,
-						    GCancellable         *cancellable,
-						    GAsyncReadyCallback   callback,
-						    gpointer              user_data);
-static gssize   g_memory_input_stream_read_finish  (GInputStream         *stream,
-						    GAsyncResult         *result,
-						    GError              **error);
 static void     g_memory_input_stream_skip_async   (GInputStream         *stream,
 						    gsize                 count,
 						    int                   io_priority,
@@ -143,8 +133,6 @@ g_memory_input_stream_class_init (GMemoryInputStreamClass *klass)
   istream_class->skip  = g_memory_input_stream_skip;
   istream_class->close_fn = g_memory_input_stream_close;
 
-  istream_class->read_async  = g_memory_input_stream_read_async;
-  istream_class->read_finish  = g_memory_input_stream_read_finish;
   istream_class->skip_async  = g_memory_input_stream_skip_async;
   istream_class->skip_finish  = g_memory_input_stream_skip_finish;
   istream_class->close_async = g_memory_input_stream_close_async;
@@ -353,51 +341,6 @@ g_memory_input_stream_close (GInputStream  *stream,
 }
 
 static void
-g_memory_input_stream_read_async (GInputStream        *stream,
-                                  void                *buffer,
-                                  gsize                count,
-                                  int                  io_priority,
-                                  GCancellable        *cancellable,
-                                  GAsyncReadyCallback  callback,
-                                  gpointer             user_data)
-{
-  GSimpleAsyncResult *simple;
-  GError *error = NULL;
-  gssize nread;
-
-  nread = G_INPUT_STREAM_GET_CLASS (stream)->read_fn (stream,
-						      buffer,
-						      count,
-						      cancellable,
-						      &error);
-  simple = g_simple_async_result_new (G_OBJECT (stream),
-				      callback,
-				      user_data,
-				      g_memory_input_stream_read_async);
-  if (error)
-    g_simple_async_result_take_error (simple, error);
-  else
-    g_simple_async_result_set_op_res_gssize (simple, nread);
-  g_simple_async_result_complete_in_idle (simple);
-  g_object_unref (simple);
-}
-
-static gssize
-g_memory_input_stream_read_finish (GInputStream  *stream,
-                                   GAsyncResult  *result,
-                                   GError       **error)
-{
-  GSimpleAsyncResult *simple;
-  gssize nread;
-
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_memory_input_stream_read_async);
-  
-  nread = g_simple_async_result_get_op_res_gssize (simple);
-  return nread;
-}
-
-static void
 g_memory_input_stream_skip_async (GInputStream        *stream,
                                   gsize                count,
                                   int                  io_priority,
diff --git a/gio/gmemoryoutputstream.c b/gio/gmemoryoutputstream.c
index b1da60d..5a62fbb 100644
--- a/gio/gmemoryoutputstream.c
+++ b/gio/gmemoryoutputstream.c
@@ -89,16 +89,6 @@ static gboolean g_memory_output_stream_close       (GOutputStream  *stream,
                                                     GCancellable   *cancellable,
                                                     GError        **error);
 
-static void     g_memory_output_stream_write_async  (GOutputStream        *stream,
-                                                     const void           *buffer,
-                                                     gsize                 count,
-                                                     int                   io_priority,
-                                                     GCancellable         *cancellable,
-                                                     GAsyncReadyCallback   callback,
-                                                     gpointer              data);
-static gssize   g_memory_output_stream_write_finish (GOutputStream        *stream,
-                                                     GAsyncResult         *result,
-                                                     GError              **error);
 static void     g_memory_output_stream_close_async  (GOutputStream        *stream,
                                                      int                   io_priority,
                                                      GCancellable         *cancellable,
@@ -152,8 +142,6 @@ g_memory_output_stream_class_init (GMemoryOutputStreamClass *klass)
 
   ostream_class->write_fn = g_memory_output_stream_write;
   ostream_class->close_fn = g_memory_output_stream_close;
-  ostream_class->write_async  = g_memory_output_stream_write_async;
-  ostream_class->write_finish = g_memory_output_stream_write_finish;
   ostream_class->close_async  = g_memory_output_stream_close_async;
   ostream_class->close_finish = g_memory_output_stream_close_finish;
 
@@ -629,56 +617,6 @@ g_memory_output_stream_close (GOutputStream  *stream,
 }
 
 static void
-g_memory_output_stream_write_async (GOutputStream       *stream,
-                                    const void          *buffer,
-                                    gsize                count,
-                                    int                  io_priority,
-                                    GCancellable        *cancellable,
-                                    GAsyncReadyCallback  callback,
-                                    gpointer             data)
-{
-  GSimpleAsyncResult *simple;
-  GError *error = NULL;
-  gssize nwritten;
-
-  nwritten = G_OUTPUT_STREAM_GET_CLASS (stream)->write_fn (stream,
-							   buffer,
-							   count,
-							   cancellable,
-							   &error);
-
-  simple = g_simple_async_result_new (G_OBJECT (stream),
-                                      callback,
-                                      data,
-                                      g_memory_output_stream_write_async);
-
-  if (error)
-    g_simple_async_result_take_error (simple, error);
-  else
-    g_simple_async_result_set_op_res_gssize (simple, nwritten);
-  g_simple_async_result_complete_in_idle (simple);
-  g_object_unref (simple);
-}
-
-static gssize
-g_memory_output_stream_write_finish (GOutputStream  *stream,
-                                     GAsyncResult   *result,
-                                     GError        **error)
-{
-  GSimpleAsyncResult *simple;
-  gssize nwritten;
-
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-
-  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) ==
-                  g_memory_output_stream_write_async);
-
-  nwritten = g_simple_async_result_get_op_res_gssize (simple);
-
-  return nwritten;
-}
-
-static void
 g_memory_output_stream_close_async (GOutputStream       *stream,
                                     int                  io_priority,
                                     GCancellable        *cancellable,
diff --git a/gio/goutputstream.c b/gio/goutputstream.c
index d971046..9d3815d 100644
--- a/gio/goutputstream.c
+++ b/gio/goutputstream.c
@@ -28,7 +28,7 @@
 #include "ginputstream.h"
 #include "gioerror.h"
 #include "glibintl.h"
-
+#include "gpollableoutputstream.h"
 
 /**
  * SECTION:goutputstream
@@ -1266,6 +1266,10 @@ typedef struct {
   const void         *buffer;
   gsize               count_requested;
   gssize              count_written;
+
+  GCancellable       *cancellable;
+  gint                io_priority;
+  gboolean            need_idle;
 } WriteData;
 
 static void
@@ -1285,6 +1289,60 @@ write_async_thread (GSimpleAsyncResult *res,
     g_simple_async_result_take_error (res, error);
 }
 
+static void write_async_pollable (GPollableOutputStream *stream,
+				  GSimpleAsyncResult    *result);
+
+static gboolean
+write_async_pollable_ready (GPollableOutputStream *stream,
+			    gpointer               user_data)
+{
+  GSimpleAsyncResult *result = user_data;
+
+  write_async_pollable (stream, result);
+  return FALSE;
+}
+
+static void
+write_async_pollable (GPollableOutputStream *stream,
+		      GSimpleAsyncResult    *result)
+{
+  GError *error = NULL;
+  WriteData *op = g_simple_async_result_get_op_res_gpointer (result);
+
+  if (g_cancellable_set_error_if_cancelled (op->cancellable, &error))
+    op->count_written = -1;
+  else
+    {
+      op->count_written = G_POLLABLE_OUTPUT_STREAM_GET_INTERFACE (stream)->
+	write_nonblocking (stream, op->buffer, op->count_requested, &error);
+    }
+
+  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
+    {
+      GSource *source;
+
+      g_error_free (error);
+      op->need_idle = FALSE;
+
+      source = g_pollable_output_stream_create_source (stream, op->cancellable);
+      g_source_set_callback (source,
+			     (GSourceFunc) write_async_pollable_ready,
+			     g_object_ref (result), g_object_unref);
+      g_source_set_priority (source, op->io_priority);
+      g_source_attach (source, g_main_context_get_thread_default ());
+      g_source_unref (source);
+      return;
+    }
+
+  if (op->count_written == -1)
+    g_simple_async_result_take_error (result, error);
+
+  if (op->need_idle)
+    g_simple_async_result_complete_in_idle (result);
+  else
+    g_simple_async_result_complete (result);
+}
+
 static void
 g_output_stream_real_write_async (GOutputStream       *stream,
                                   const void          *buffer,
@@ -1303,7 +1361,11 @@ g_output_stream_real_write_async (GOutputStream       *stream,
   op->buffer = buffer;
   op->count_requested = count;
   
-  g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
+  if (G_IS_POLLABLE_OUTPUT_STREAM (stream) &&
+      g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (stream)))
+    write_async_pollable (G_POLLABLE_OUTPUT_STREAM (stream), res);
+  else
+    g_simple_async_result_run_in_thread (res, write_async_thread, io_priority, cancellable);
   g_object_unref (res);
 }
 
diff --git a/gio/gsocketinputstream.c b/gio/gsocketinputstream.c
index 3736616..89e8a84 100644
--- a/gio/gsocketinputstream.c
+++ b/gio/gsocketinputstream.c
@@ -133,95 +133,6 @@ g_socket_input_stream_read (GInputStream  *stream,
 }
 
 static gboolean
-g_socket_input_stream_read_ready (GSocket *socket,
-                                  GIOCondition condition,
-				  GSocketInputStream *stream)
-{
-  GSimpleAsyncResult *simple;
-  GError *error = NULL;
-  gssize result;
-
-  result = g_socket_receive_with_blocking (stream->priv->socket,
-					   stream->priv->buffer,
-					   stream->priv->count,
-					   FALSE,
-					   stream->priv->cancellable,
-					   &error);
-
-  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    return TRUE;
-
-  simple = stream->priv->result;
-  stream->priv->result = NULL;
-
-  if (result >= 0)
-    g_simple_async_result_set_op_res_gssize (simple, result);
-
-  if (error)
-    g_simple_async_result_take_error (simple, error);
-
-  if (stream->priv->cancellable)
-    g_object_unref (stream->priv->cancellable);
-
-  g_simple_async_result_complete (simple);
-  g_object_unref (simple);
-
-  return FALSE;
-}
-
-static void
-g_socket_input_stream_read_async (GInputStream        *stream,
-                                  void                *buffer,
-                                  gsize                count,
-                                  gint                 io_priority,
-                                  GCancellable        *cancellable,
-                                  GAsyncReadyCallback  callback,
-                                  gpointer             user_data)
-{
-  GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (stream);
-  GSource *source;
-
-  g_assert (input_stream->priv->result == NULL);
-
-  input_stream->priv->result =
-    g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
-                               g_socket_input_stream_read_async);
-  if (cancellable)
-    g_object_ref (cancellable);
-  input_stream->priv->cancellable = cancellable;
-  input_stream->priv->buffer = buffer;
-  input_stream->priv->count = count;
-
-  source = g_socket_create_source (input_stream->priv->socket,
-				   G_IO_IN | G_IO_HUP | G_IO_ERR,
-				   cancellable);
-  g_source_set_callback (source,
-			 (GSourceFunc) g_socket_input_stream_read_ready,
-			 g_object_ref (input_stream), g_object_unref);
-  g_source_attach (source, g_main_context_get_thread_default ());
-  g_source_unref (source);
-}
-
-static gssize
-g_socket_input_stream_read_finish (GInputStream  *stream,
-                                   GAsyncResult  *result,
-                                   GError       **error)
-{
-  GSimpleAsyncResult *simple;
-  gssize count;
-
-  g_return_val_if_fail (G_IS_SOCKET_INPUT_STREAM (stream), -1);
-
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-
-  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_input_stream_read_async);
-
-  count = g_simple_async_result_get_op_res_gssize (simple);
-
-  return count;
-}
-
-static gboolean
 g_socket_input_stream_pollable_is_readable (GPollableInputStream *pollable)
 {
   GSocketInputStream *input_stream = G_SOCKET_INPUT_STREAM (pollable);
@@ -282,8 +193,6 @@ g_socket_input_stream_class_init (GSocketInputStreamClass *klass)
   gobject_class->set_property = g_socket_input_stream_set_property;
 
   ginputstream_class->read_fn = g_socket_input_stream_read;
-  ginputstream_class->read_async = g_socket_input_stream_read_async;
-  ginputstream_class->read_finish = g_socket_input_stream_read_finish;
 
   g_object_class_install_property (gobject_class, PROP_SOCKET,
 				   g_param_spec_object ("socket",
diff --git a/gio/gsocketoutputstream.c b/gio/gsocketoutputstream.c
index 2320b17..145f009 100644
--- a/gio/gsocketoutputstream.c
+++ b/gio/gsocketoutputstream.c
@@ -137,100 +137,24 @@ g_socket_output_stream_write (GOutputStream  *stream,
 }
 
 static gboolean
-g_socket_output_stream_write_ready (GSocket *socket,
-                                    GIOCondition condition,
-				    GSocketOutputStream *stream)
+g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
 {
-  GSimpleAsyncResult *simple;
-  GError *error = NULL;
-  gssize result;
-
-  result = g_socket_send_with_blocking (stream->priv->socket,
-					stream->priv->buffer,
-					stream->priv->count,
-					FALSE,
-					stream->priv->cancellable,
-					&error);
-
-  if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
-    return TRUE;
-
-  simple = stream->priv->result;
-  stream->priv->result = NULL;
-
-  if (result >= 0)
-    g_simple_async_result_set_op_res_gssize (simple, result);
-
-  if (error)
-    g_simple_async_result_take_error (simple, error);
-
-  if (stream->priv->cancellable)
-    g_object_unref (stream->priv->cancellable);
-
-  g_simple_async_result_complete (simple);
-  g_object_unref (simple);
-
-  return FALSE;
-}
+  GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
 
-static void
-g_socket_output_stream_write_async (GOutputStream        *stream,
-                                    const void           *buffer,
-                                    gsize                 count,
-                                    gint                  io_priority,
-                                    GCancellable         *cancellable,
-                                    GAsyncReadyCallback   callback,
-                                    gpointer              user_data)
-{
-  GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (stream);
-  GSource *source;
-
-  g_assert (output_stream->priv->result == NULL);
-
-  output_stream->priv->result =
-    g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
-                               g_socket_output_stream_write_async);
-  if (cancellable)
-    g_object_ref (cancellable);
-  output_stream->priv->cancellable = cancellable;
-  output_stream->priv->buffer = buffer;
-  output_stream->priv->count = count;
-
-  source = g_socket_create_source (output_stream->priv->socket,
-				   G_IO_OUT | G_IO_HUP | G_IO_ERR,
-				   cancellable);
-  g_source_set_callback (source,
-			 (GSourceFunc) g_socket_output_stream_write_ready,
-			 g_object_ref (output_stream), g_object_unref);
-  g_source_attach (source, g_main_context_get_thread_default ());
-  g_source_unref (source);
+  return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
 }
 
 static gssize
-g_socket_output_stream_write_finish (GOutputStream  *stream,
-                                     GAsyncResult   *result,
-                                     GError        **error)
-{
-  GSimpleAsyncResult *simple;
-  gssize count;
-
-  g_return_val_if_fail (G_IS_SOCKET_OUTPUT_STREAM (stream), -1);
-
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-
-  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_socket_output_stream_write_async);
-
-  count = g_simple_async_result_get_op_res_gssize (simple);
-
-  return count;
-}
-
-static gboolean
-g_socket_output_stream_pollable_is_writable (GPollableOutputStream *pollable)
+g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream  *pollable,
+						   const void             *buffer,
+						   gsize                   size,
+						   GError                **error)
 {
   GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
 
-  return g_socket_condition_check (output_stream->priv->socket, G_IO_OUT);
+  return g_socket_send_with_blocking (output_stream->priv->socket,
+				      buffer, size, FALSE,
+				      NULL, error);
 }
 
 static GSource *
@@ -250,19 +174,6 @@ g_socket_output_stream_pollable_create_source (GPollableOutputStream *pollable,
   return pollable_source;
 }
 
-static gssize
-g_socket_output_stream_pollable_write_nonblocking (GPollableOutputStream  *pollable,
-						   const void             *buffer,
-						   gsize                   size,
-						   GError                **error)
-{
-  GSocketOutputStream *output_stream = G_SOCKET_OUTPUT_STREAM (pollable);
-
-  return g_socket_send_with_blocking (output_stream->priv->socket,
-				      buffer, size, FALSE,
-				      NULL, error);
-}
-
 #ifdef G_OS_UNIX
 static int
 g_socket_output_stream_get_fd (GFileDescriptorBased *fd_based)
@@ -286,8 +197,6 @@ g_socket_output_stream_class_init (GSocketOutputStreamClass *klass)
   gobject_class->set_property = g_socket_output_stream_set_property;
 
   goutputstream_class->write_fn = g_socket_output_stream_write;
-  goutputstream_class->write_async = g_socket_output_stream_write_async;
-  goutputstream_class->write_finish = g_socket_output_stream_write_finish;
 
   g_object_class_install_property (gobject_class, PROP_SOCKET,
 				   g_param_spec_object ("socket",
diff --git a/gio/gunixinputstream.c b/gio/gunixinputstream.c
index 65a0dbd..a1639f6 100644
--- a/gio/gunixinputstream.c
+++ b/gio/gunixinputstream.c
@@ -95,16 +95,6 @@ static gssize   g_unix_input_stream_read         (GInputStream         *stream,
 static gboolean g_unix_input_stream_close        (GInputStream         *stream,
 						  GCancellable         *cancellable,
 						  GError              **error);
-static void     g_unix_input_stream_read_async   (GInputStream         *stream,
-						  void                 *buffer,
-						  gsize                 count,
-						  int                   io_priority,
-						  GCancellable         *cancellable,
-						  GAsyncReadyCallback   callback,
-						  gpointer              data);
-static gssize   g_unix_input_stream_read_finish  (GInputStream         *stream,
-						  GAsyncResult         *result,
-						  GError              **error);
 static void     g_unix_input_stream_skip_async   (GInputStream         *stream,
 						  gsize                 count,
 						  int                   io_priority,
@@ -123,6 +113,7 @@ static gboolean g_unix_input_stream_close_finish (GInputStream         *stream,
 						  GAsyncResult         *result,
 						  GError              **error);
 
+static gboolean g_unix_input_stream_pollable_can_poll      (GPollableInputStream *stream);
 static gboolean g_unix_input_stream_pollable_is_readable   (GPollableInputStream *stream);
 static GSource *g_unix_input_stream_pollable_create_source (GPollableInputStream *stream,
 							    GCancellable         *cancellable);
@@ -147,8 +138,6 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
 
   stream_class->read_fn = g_unix_input_stream_read;
   stream_class->close_fn = g_unix_input_stream_close;
-  stream_class->read_async = g_unix_input_stream_read_async;
-  stream_class->read_finish = g_unix_input_stream_read_finish;
   if (0)
     {
       /* TODO: Implement instead of using fallbacks */
@@ -192,6 +181,7 @@ g_unix_input_stream_class_init (GUnixInputStreamClass *klass)
 static void
 g_unix_input_stream_pollable_iface_init (GPollableInputStreamInterface *iface)
 {
+  iface->can_poll = g_unix_input_stream_pollable_can_poll;
   iface->is_readable = g_unix_input_stream_pollable_is_readable;
   iface->create_source = g_unix_input_stream_pollable_create_source;
 }
@@ -454,129 +444,6 @@ g_unix_input_stream_close (GInputStream  *stream,
   return res != -1;
 }
 
-typedef struct {
-  gsize count;
-  void *buffer;
-  GAsyncReadyCallback callback;
-  gpointer user_data;
-  GCancellable *cancellable;
-  GUnixInputStream *stream;
-} ReadAsyncData;
-
-static gboolean
-read_async_cb (int            fd,
-               GIOCondition   condition,
-               ReadAsyncData *data)
-{
-  GSimpleAsyncResult *simple;
-  GError *error = NULL;
-  gssize count_read;
-
-  /* We know that we can read from fd once without blocking */
-  while (1)
-    {
-      if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
-	{
-	  count_read = -1;
-	  break;
-	}
-      count_read = read (data->stream->priv->fd, data->buffer, data->count);
-      if (count_read == -1)
-	{
-          int errsv = errno;
-
-	  if (errsv == EINTR || errsv == EAGAIN)
-	    return TRUE;
-	  
-	  g_set_error (&error, G_IO_ERROR,
-		       g_io_error_from_errno (errsv),
-		       _("Error reading from file descriptor: %s"),
-		       g_strerror (errsv));
-	}
-      break;
-    }
-
-  simple = g_simple_async_result_new (G_OBJECT (data->stream),
-				      data->callback,
-				      data->user_data,
-				      g_unix_input_stream_read_async);
-
-  g_simple_async_result_set_op_res_gssize (simple, count_read);
-
-  if (count_read == -1)
-    g_simple_async_result_take_error (simple, error);
-
-  /* Complete immediately, not in idle, since we're already in a mainloop callout */
-  g_simple_async_result_complete (simple);
-  g_object_unref (simple);
-
-  return FALSE;
-}
-
-static void
-g_unix_input_stream_read_async (GInputStream        *stream,
-				void                *buffer,
-				gsize                count,
-				int                  io_priority,
-				GCancellable        *cancellable,
-				GAsyncReadyCallback  callback,
-				gpointer             user_data)
-{
-  GSource *source;
-  GUnixInputStream *unix_stream;
-  ReadAsyncData *data;
-
-  unix_stream = G_UNIX_INPUT_STREAM (stream);
-
-  if (!unix_stream->priv->is_pipe_or_socket)
-    {
-      G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
-	read_async (stream, buffer, count, io_priority,
-		    cancellable, callback, user_data);
-      return;
-    }
-
-  data = g_new0 (ReadAsyncData, 1);
-  data->count = count;
-  data->buffer = buffer;
-  data->callback = callback;
-  data->user_data = user_data;
-  data->cancellable = cancellable;
-  data->stream = unix_stream;
-
-  source = _g_fd_source_new (unix_stream->priv->fd,
-			     G_IO_IN,
-			     cancellable);
-  g_source_set_name (source, "GUnixInputStream");
-  
-  g_source_set_callback (source, (GSourceFunc)read_async_cb, data, g_free);
-  g_source_attach (source, g_main_context_get_thread_default ());
- 
-  g_source_unref (source);
-}
-
-static gssize
-g_unix_input_stream_read_finish (GInputStream  *stream,
-				 GAsyncResult  *result,
-				 GError       **error)
-{
-  GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
-  GSimpleAsyncResult *simple;
-  gssize nread;
-
-  if (!unix_stream->priv->is_pipe_or_socket)
-    {
-      return G_INPUT_STREAM_CLASS (g_unix_input_stream_parent_class)->
-	read_finish (stream, result, error);
-    }
-
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_input_stream_read_async);
-  
-  nread = g_simple_async_result_get_op_res_gssize (simple);
-  return nread;
-}
-
 static void
 g_unix_input_stream_skip_async (GInputStream        *stream,
 				gsize                count,
@@ -696,6 +563,12 @@ g_unix_input_stream_close_finish (GInputStream  *stream,
 }
 
 static gboolean
+g_unix_input_stream_pollable_can_poll (GPollableInputStream *stream)
+{
+  return G_UNIX_INPUT_STREAM (stream)->priv->is_pipe_or_socket;
+}
+
+static gboolean
 g_unix_input_stream_pollable_is_readable (GPollableInputStream *stream)
 {
   GUnixInputStream *unix_stream = G_UNIX_INPUT_STREAM (stream);
diff --git a/gio/gunixoutputstream.c b/gio/gunixoutputstream.c
index 86f5277..abd14a4 100644
--- a/gio/gunixoutputstream.c
+++ b/gio/gunixoutputstream.c
@@ -95,16 +95,6 @@ static gssize   g_unix_output_stream_write        (GOutputStream        *stream,
 static gboolean g_unix_output_stream_close        (GOutputStream        *stream,
 						   GCancellable         *cancellable,
 						   GError              **error);
-static void     g_unix_output_stream_write_async  (GOutputStream        *stream,
-						   const void           *buffer,
-						   gsize                 count,
-						   int                   io_priority,
-						   GCancellable         *cancellable,
-						   GAsyncReadyCallback   callback,
-						   gpointer              data);
-static gssize   g_unix_output_stream_write_finish (GOutputStream        *stream,
-						   GAsyncResult         *result,
-						   GError              **error);
 static void     g_unix_output_stream_close_async  (GOutputStream        *stream,
 						   int                   io_priority,
 						   GCancellable         *cancellable,
@@ -138,8 +128,6 @@ g_unix_output_stream_class_init (GUnixOutputStreamClass *klass)
 
   stream_class->write_fn = g_unix_output_stream_write;
   stream_class->close_fn = g_unix_output_stream_close;
-  stream_class->write_async = g_unix_output_stream_write_async;
-  stream_class->write_finish = g_unix_output_stream_write_finish;
   stream_class->close_async = g_unix_output_stream_close_async;
   stream_class->close_finish = g_unix_output_stream_close_finish;
 
@@ -441,129 +429,6 @@ g_unix_output_stream_close (GOutputStream  *stream,
 }
 
 typedef struct {
-  gsize count;
-  const void *buffer;
-  GAsyncReadyCallback callback;
-  gpointer user_data;
-  GCancellable *cancellable;
-  GUnixOutputStream *stream;
-} WriteAsyncData;
-
-static gboolean
-write_async_cb (int             fd,
-		GIOCondition    condition,
-		WriteAsyncData *data)
-{
-  GSimpleAsyncResult *simple;
-  GError *error = NULL;
-  gssize count_written;
-
-  while (1)
-    {
-      if (g_cancellable_set_error_if_cancelled (data->cancellable, &error))
-	{
-	  count_written = -1;
-	  break;
-	}
-      
-      count_written = write (data->stream->priv->fd, data->buffer, data->count);
-      if (count_written == -1)
-	{
-          int errsv = errno;
-
-	  if (errsv == EINTR || errsv == EAGAIN)
-	    return TRUE;
-	  
-	  g_set_error (&error, G_IO_ERROR,
-		       g_io_error_from_errno (errsv),
-		       _("Error writing to file descriptor: %s"),
-		       g_strerror (errsv));
-	}
-      break;
-    }
-
-  simple = g_simple_async_result_new (G_OBJECT (data->stream),
-				      data->callback,
-				      data->user_data,
-				      g_unix_output_stream_write_async);
-  
-  g_simple_async_result_set_op_res_gssize (simple, count_written);
-
-  if (count_written == -1)
-    g_simple_async_result_take_error (simple, error);
-
-  /* Complete immediately, not in idle, since we're already in a mainloop callout */
-  g_simple_async_result_complete (simple);
-  g_object_unref (simple);
-
-  return FALSE;
-}
-
-static void
-g_unix_output_stream_write_async (GOutputStream       *stream,
-				  const void          *buffer,
-				  gsize                count,
-				  int                  io_priority,
-				  GCancellable        *cancellable,
-				  GAsyncReadyCallback  callback,
-				  gpointer             user_data)
-{
-  GSource *source;
-  GUnixOutputStream *unix_stream;
-  WriteAsyncData *data;
-
-  unix_stream = G_UNIX_OUTPUT_STREAM (stream);
-
-  if (!unix_stream->priv->is_pipe_or_socket)
-    {
-      G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
-	write_async (stream, buffer, count, io_priority,
-		     cancellable, callback, user_data);
-      return;
-    }
-
-  data = g_new0 (WriteAsyncData, 1);
-  data->count = count;
-  data->buffer = buffer;
-  data->callback = callback;
-  data->user_data = user_data;
-  data->cancellable = cancellable;
-  data->stream = unix_stream;
-
-  source = _g_fd_source_new (unix_stream->priv->fd,
-			     G_IO_OUT,
-			     cancellable);
-  g_source_set_name (source, "GUnixOutputStream");
-  
-  g_source_set_callback (source, (GSourceFunc)write_async_cb, data, g_free);
-  g_source_attach (source, g_main_context_get_thread_default ());
-  
-  g_source_unref (source);
-}
-
-static gssize
-g_unix_output_stream_write_finish (GOutputStream  *stream,
-				   GAsyncResult   *result,
-				   GError        **error)
-{
-  GUnixOutputStream *unix_stream = G_UNIX_OUTPUT_STREAM (stream);
-  GSimpleAsyncResult *simple;
-  gssize nwritten;
-
-  if (!unix_stream->priv->is_pipe_or_socket)
-    {
-      return G_OUTPUT_STREAM_CLASS (g_unix_output_stream_parent_class)->
-	write_finish (stream, result, error);
-    }
-
-  simple = G_SIMPLE_ASYNC_RESULT (result);
-  g_warn_if_fail (g_simple_async_result_get_source_tag (simple) == g_unix_output_stream_write_async);
-  
-  nwritten = g_simple_async_result_get_op_res_gssize (simple);
-  return nwritten;
-}
-
-typedef struct {
   GOutputStream *stream;
   GAsyncReadyCallback callback;
   gpointer user_data;



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]